Giter VIP home page Giter VIP logo

rocketmq-spring-boot-starter's Introduction

spring boot starter for RocketMQ Build Status Coverage Status

Maven CentralGitHub release

项目介绍

Rocketmq 是由阿里巴巴团队开发并捐赠给apache团队的优秀消息中间件,承受过历年双十一大促的考验。

你可以通过本项目轻松的集成Rocketmq到你的SpringBoot项目中。 本项目主要包含以下特性

  • 同步发送消息
  • 异步发送消息
  • 广播发送消息
  • 有序发送和消费消息
  • 发送延时消息
  • 消息tag和key支持
  • 自动序列化和反序列化消息体
  • 消息的实际消费方IP追溯
  • 发送事务消息(NEW)
  • ...
  • 发送即忘消息(可能由于直接抛弃所有异常导致消息静默丢失,弃用)
  • 拉取方式消费(配置方式复杂,位点可能发生偏移,弃用)

简单入门实例

1. 添加maven依赖:
<dependency>
    <groupId>com.maihaoche</groupId>
    <artifactId>spring-boot-starter-rocketmq</artifactId>
    <version>0.1.0</version>
</dependency>
2. 添加配置:
spring:
    rocketmq:
      name-server-address: 172.21.10.111:9876
      # 可选, 如果无需发送消息则忽略该配置
      producer-group: local_pufang_producer
      # 发送超时配置毫秒数, 可选, 默认3000
      send-msg-timeout: 5000
      # 追溯消息具体消费情况的开关,默认打开
      #trace-enabled: false
      # 是否启用VIP通道,默认打开
      #vip-channel-enabled: false
3. 程序入口添加注解开启自动装配

在springboot应用主入口添加@EnableMQConfiguration注解开启自动装配:

@SpringBootApplication
@EnableMQConfiguration
class DemoApplication {
}
4. 构建消息体

通过我们提供的Builder类创建消息对象,详见wiki

MessageBuilder.of(new MSG_POJO()).topic("some-msg-topic").build();
5. 创建发送方

详见wiki

@MQProducer
public class DemoProducer extends AbstractMQProducer{
}
6. 创建消费方

详见wiki支持springEL风格配置项解析,如存在suclogger-test-cluster配置项,会优先将topic解析为配置项对应的值。

@MQConsumer(topic = "${suclogger-test-cluster}", consumerGroup = "local_sucloger_dev")
public class DemoConsumer extends AbstractMQPushConsumer {

    @Override
    public boolean process(Object message, Map extMap) {
        // extMap 中包含messageExt中的属性和message.properties中的属性
        System.out.println(message);
        return true;
    }
}
7. 发送消息:
// 注入发送者
@Autowired
private DemoProducer demoProducer;
    
...
    
// 发送
demoProducer.syncSend(msg)
    

发送事务消息###

Since 0.1.0

5.1 事务消息发送方#####
@MQTransactionProducer(producerGroup = "${camaro.mq.transactionProducerGroup}")
public class DemoTransactionProducer extends AbstractMQTransactionProducer {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // executeLocalTransaction
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // LocalTransactionState.ROLLBACK_MESSAGE
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

rocketmq-spring-boot-starter's People

Contributors

suclogger avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rocketmq-spring-boot-starter's Issues

在SpringBoot3.1.5中无法使用

如题,我在Spring Boot3.1.5中尝试使用rocketmq-spring-boot-starter2.2.3,但发现无法使用
@Autowired
private RocketMQTemplate template;
自动注入RocketMQTemplate,但将Spring Boot切换为2.7.11后可以正常注入

关于消息幂等的处理

由于rocketmq-spring-boot-starter为我们处理了消息Body的解析工作,但是导致messageExt对象对我们而言是不可见的了,之前是设想使用messageExt.getBodyCRC()来处理消息幂等性。那现在我能想到的是用md5hash(JSON.toJSONString(message)),将该哈希值作为key存入redis,并设置有效时间比如10分钟。当第一次消费的时候就将该值存到redis;如果有重复消费的情况时,当发现redis中已经有存在该key则忽略此条消息,来防止重复消费

想问下作者,你们是如何处理消息幂等的?

ons自动切换

阿里是ons,一般线下测完要上ONS,能否把ons集成进来。

关于 ConsumerGroup、Topic、Tags 的验证逻辑

看到下面这段代码:

// 检查consumerGroup
if(!StringUtils.isEmpty(validConsumerMap.get(consumerGroup))) {
    String exist = validConsumerMap.get(consumerGroup);
    throw new RuntimeException("消费组重复订阅,请新增消费组用于新的topic和tag组合: " + consumerGroup + "已经订阅了" + exist);
} else {
    validConsumerMap.put(consumerGroup, topic + "-" + tags);
}

两个问题:

  1. key 是否可以设计成 consumerGroup + topic ,有以下场景:
    consumerGroup:cg,topic:tp01,tags:tag01 || tag02
    consumerGroup:cg,topic:tp01,tags:tag03 || tag04
    如果按照原逻辑,就算 topic 相同,tags 不同,也必须新增一个 consumerGroup。

  2. consumerGroup 和 topic、tags 的关系是 1:1 吗?

我是初学者,还请指教,感谢。

消费者每次只能接受到发送消息的四分之一

每次消费者都只能接收到发送消息的四分之一,并且序号都是间隔3;因为每个topic默认是初始化4个队列,怀疑是不是只是从一个队列里去接受消息;
用官方demo跑了一下,消息是能全部获取到的;
是不是在哪里有特殊的配置?

事务消息demo

业务代码 是不是要写在 executeLocalTransaction/checkLocalTransaction 里面?能否提供具体的demo

AbstractMQTransactionProducer beanObj = AbstractMQTransactionProducer.class.cast(transactionProducer.getValue());
MQTransactionProducer anno = beanObj.getClass().getAnnotation(MQTransactionProducer.class);

            TransactionMQProducer producer = new TransactionMQProducer(environment.resolvePlaceholders(anno.producerGroup()));
            producer.setNamesrvAddr(mqProperties.getNameServerAddress());
            producer.setExecutorService(executorService);
            producer.setTransactionListener(beanObj);
            producer.start();
            beanObj.setProducer(producer);

这个producer和beanObje 循环引用 会不会释放不掉的

建议可通过配置设置VipChannelEnabled

rocketmq服务端3.2.6版本(或许其他也有类似情况),在producer和consumer连接时连接的是vip端口,导致发送、启动失败;建议在MQProducerAutoConfiguration 和MQConsumerAutoConfiguration中对producer和consumer进行配置参数读取并设置VipChannelEnabled的,以便通过配置即可对低版本服务端的支持

同步刷盘timeout,项目是进行重新推送,但doAfterSyncSend却没有消息的内容无法重新推送

我们想用rocketMQ的消息为任务,推送其他服务消费任务指令。所以我们可能丢不起。所以,配置rocketMQ为SYNC_FLUSH.

SYNC_FLUSH的SendStatus有对应几个状态:SEND_OK、FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT、SLAVE_NOT_AVAILABLE

对于状态是FLUSH_DISK_TIMEOUT和FLUSH_SLAVE_TIMEOUT是需要重推的,但是给的业务后续处理的方法只有SendResult没有Message参数,是不是可以改一下,满足要求?

  /**
     * 重写此方法处理发送后的逻辑
     *
     * @param sendResult  发送结果
     */
    public void doAfterSyncSend(SendResult sendResult) {}

RocketMq消息轨迹的问题

@suclogger 您好,关于消息轨迹的几个疑惑点,方便加微信沟通一下吗?我的微信是Software_King,期待您的回复。

  • 问个问题哈:RocketMq消息轨迹只能通过这个org.apache.rocketmq.client.hook.ConsumeMessageHook去实现吗?,自己实现consumeMessageBefore和consumeMessageAfter

  • 如果要实现从Producer到Broker,再到Consumer的消息轨迹,有接口吗?

关于这次版本升级,提出的几点问题

1.AbstractMQPushConsumer的dealMessage方法,感觉还是保留重试次数信息的日志打印比较好呢
2. 我看到了@MQProducer标注的类的实例会调用registerBean而@MQConsumer标注的类的实例没有调用registerBean。我不是很清楚这个registerBean的用意,因为之前没有用到么,似乎不加这一句也没有影响。。。
// register default mq producer to spring context registerBean(DefaultMQProducer.class.getName(), producer);
3.还有那个问题就是将MessageExt对象转换成Map<String,Object> extMap,我觉得不是很合理。我个人建议还是直接把MessageExt就给使用者暴露出来吧
4.消息的轨迹这个特性挺不错的哈
5.还有我看到加了延迟发消息的特性,不错。不过我记得这个level是可以修改配置的,后续可以改成从namesrv取到配置好的level值,而不是将DELAY_ARRAY写死

设置消费线程池

目前项目无法通过配置修改消费线程池大小,希望通过在MQProperties.java里面添加相关属性,然后修改自动化配置代码实现!

启动后报循环依赖的问题

┌─────┐
| mqProducer (field private org.apache.rocketmq.client.producer.DefaultMQProducer com.maihaoche.starter.mq.base.AbstractMQProducer.producer)
↑ ↓
| exposeProducer defined in class path resource [com/maihaoche/starter/mq/config/MQProducerAutoConfiguration.class]
└─────┘

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.