xuxueli / xxl-mq Goto Github PK
View Code? Open in Web Editor NEWA lightweight distributed message queue framework.(分布式消息队列XXL-MQ)
Home Page: http://www.xuxueli.com/xxl-mq/
License: GNU General Public License v3.0
A lightweight distributed message queue framework.(分布式消息队列XXL-MQ)
Home Page: http://www.xuxueli.com/xxl-mq/
License: GNU General Public License v3.0
你好,我在com.xxl.mq.client.XxlMqProducer类中增添了新的方法produceCachedMsg,然后在example工程中增加一个controller action调用该方法,但是会报以下错误:
for servlet [springmvc] in context with path [/xxl-mq-example] threw exception [Handler processing failed; nested exception is java.lang.NoSuchMethodError: com.xxl.mq.client.XxlMqProducer.produceCachedMsg(Ljava/lang/String;Ljava/util/Map;)V] with root cause
java.lang.NoSuchMethodError: com.xxl.mq.client.XxlMqProducer.produceCachedMsg(Ljava/lang/String;Ljava/util/Map;)V
然而奇怪的是,如果我在一个tomcat上同时启动broker和example两个项目,那么就可以成功的调用该方法。如果在server1中部署了broker,在server2上部署example,就会报上述错误
看了代码里面留有大量的Executors,还是建议手动设定方式。毕竟中间件还是要求更好的质量。
List<XxlMqMessage> list = xxlMqMessageDao.pullNewMessage(XxlMqMessageStatus.NEW.name(), topic, group, consumerRank, consumerTotal, pagesize);
对应sql
SELECT <include refid="Base_Column_List" />
FROM xxl_mq_message AS t
WHERE t.topic = #{topic}
AND t.group = #{group}
AND t.status = #{newStatus}
AND t.effectTime <![CDATA[ < ]]> NOW()
<if test="consumerTotal > 1">
AND (
(
t.shardingId <![CDATA[ = ]]> 0
AND
MOD(t.id, #{consumerTotal}) = #{consumerRank}
)
OR
(
t.shardingId <![CDATA[ > ]]> 0
AND
MOD(t.shardingId, #{consumerTotal}) = #{consumerRank}
)
)
</if>
ORDER BY t.id ASC
LIMIT #{pagesize}
/**
* zk config file
*/
private static final String ZK_ADDRESS_FILE = "/data/webapps/xxl-conf.properties";
配置地址可否开放出来,可以自定义
jdk17的用不了 。不维护了吧 。。。
在延迟消息未被消费前,有没有API可以取消该消息,只在控制台看到有delete方法,client端的API貌似没有这个
基于zk广播的确简单也容易理解,它的qps大吗? 性能应该不太好吧 ,为什么不在有多个client,让那个server发送client呢 接受到数据处理呢????
在server端部署时,是不是除了部署xxl-mq-broker这个子项目,还应该部署xxl-mq-client子项目下的broker类,因为只有这样XxlMqBroker才能通过Spring的依赖注入获得xxlMqMessageDao,并且执行init方法?
初次接触分布式CS架构,所以有些细节不是很熟悉,见笑了
var temp = "
" + new Date().format("yyyy-MM-dd HH:mm:ss") + ": ";
shoule be
var temp = "
" + new Date().format("yyyy-MM-dd hh:mm:ss") + ": ";
2022-05-28 11:11:33,694 sak-mq-comsumer: INFO -- rpc.remoting.invoker.reference.XxlRpcReferenceBean -- >>>>>>>>>>> xxl-rpc, invoke error, address:192.168.0.150:7080, XxlRpcRequestXxlRpcRequest{requestId='348c4076-2f68-46e0-acf1-1cd760cdb846', createMillisTime=1653707483694, accessToken='null', className='com.xxl.mq.client.broker.IXxlMqBroker', methodName='pullNewMessage', parameterTypes=[class java.lang.String, class java.lang.String, int, int, int], parameters=[site_group_push_product_batch, DEFAULT, 1, 3, 100], version='null'}
目前的xxl-rpc-core: 1.3.0
尝试升级到1.5.0,XxlRpcReferenceBean的构造方法已被移除 netType参数也被移除了,
您好,我在安装并阅读代码时,在xx-mq-client包的XxlMqClient 类中发现getXxlMqService的方法中使用CountDownlatch时,countdown方法在await方法之前,这似乎违背了countDownLatch的使用方法,同步会失效。可以劳烦您解释一下吗?
你的这个是那种license
多机多网卡部署,Brocker提示注册成功
“>>>>>>>>>>> xxl-rpc registe service item, ...........”
但Service端提示:
“>>>>>>>>>>> xxl-rpc,no address from service:........”
跟作者沟通过,怀疑时zk没注册成功。但把它换到同一个环境下也不行,应该是多网卡问题。希望得到解决。
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /xxl-mq/rpc
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
at com.xxl.mq.client.rpc.util.ZkServiceRegistry.registerServices(ZkServiceRegistry.java:80)
at com.xxl.mq.client.rpc.netcom.NetComServerFactory$1.run(NetComServerFactory.java:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
加了消息过期机制,发了一个PR,帮忙review下吧
我在阅读XxlMqConsumer类的代码时,发现如果注入一个TopicComsumer,XxlMqConsumer类在初始化时会watchTopic,但是在initTopicConsumer方法的136行中,似乎执行了ZK注册consumer的逻辑,而且用的是queueConsumerRespository,这里似乎不需要再注册consumer了吧,因为之前已经watchTopic了,即使需要注册,也应该使用topicConsumerRespository吧?
PS:现在代码的逻辑中,key使用的是consumer的topic名称,所以如果在一台机器上,启动两个监听同一Topic的TopicConsumer是不是会有问题?
如题,不知道大佬们有没有这个问题
现在mp 和 xxl-rpc 1.7.0 一起用不了
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.