Giter VIP home page Giter VIP logo

zeromq's Introduction

ZeroMQ

ZeroMQ 项目是一个轻量级的消息队列系统,使用 Netty 进行网络通讯,项目的具体功能如下:

  • 支持多个消费者和多个生产者之间的消息,网络通信依赖 Netty
  • 生产者和消费者的关系可以是一对多、多对一、多对多的关系
  • 多个消费者可以组成一个消费者集群,从而生产者可以向这个消费者集群投递消息
  • 目前支持的路由策略是关键字的精确匹配,支持简单的消费者负载均衡策略
  • Broker 的消息派发、负载均衡、应答处理基于异步多线程模型进行开发设计。
  • Broker 消息的投递,目前仅仅支持严格的顺序消息。其中 Broker 还支持消息的缓冲派发,Broker 会缓存一定数量的消息 之后,再批量分配给对此消息感兴趣的消费者

本项目是在 AvatarMQ 的基础上进行了一些改进,并且添加了大量注释,供自己和他人学习使用,下面是 tangjie 的 github 与博客地址:


ZeroMQ 项目更加详细的分析可以查看 ZeroMQ 结构分析

zeromq's People

Contributors

xuweilin2014 avatar

Stargazers

 avatar  avatar

Watchers

 avatar

zeromq's Issues

ZeroMQ 结构分析

ZeroMQ 结构分析

0x01 ZeroMQ 项目总体结构

目前业界流行的分布式消息队列系统(或者可以叫做消息中间件)种类繁多,比如,基于Erlang的RabbitMQ、基于Java的ActiveMQ/Apache Kafka等等,都能进行大批量的消息路由转发。它们的共同特点是,都有一个消息中转路由节点,按照消息队列里面的专业术语,这个角色应该是broker。整个消息系统通过这个broker节点,进行从消息生产者Producer到消费者Consumer的消息路由。当然了,生产者和消费者可以是多对多的关系。消息路由的时候,可以根据关键字(专业的术语叫topic),进行关键字精确匹配、模糊匹配、广播方式的消息路由。

我们所写的极简消息队列由以下几个部分组成:

  • Broker:简单来说就是消息队列服务器实体
  • Producer:消息的生产者,主要用来发送消息给消费者
  • Consumer:消息的消费者,主要用来接收生产者的消息
  • Routing Key:路由关键字(Topic),主要用来控制生产者和消费者之间的发送与接收消息的对应关系

所写的 ZeroMQ 项目具体的特性如下:

  • ZeroMQ 基于 Java 语言进行编写,网络通讯依赖 Netty
  • 生产者和消费者的关系可以是一对多、多对一、多对多的关系。若干个消费者可以组成消费者集群,生产者可以向这个消费者集群投递消息。
  • 消费者集群对于有共同关注点的消费者支持消息的负载均衡策略。
  • 目前仅仅支持关键字的精确匹配路由,后续会逐渐完善。
  • 消息队列服务器 Broker 基于 Netty 的主从事件线程池模型开发设计。
  • 网络消息序列化采用 Kryo 进行消息的网络序列化传输。
  • Broker 的消息派发、负载均衡、应答处理(ACK)基于异步多线程模型进行开发设计。
  • Broker 消息的投递,目前支持严格的消息顺序。其中 Broker 还支持消息的缓冲派发,即 Broker 会缓存一定数量的消息之后,再批量分配给对此消息感兴趣的消费者。

具体的结构图如下:

ZeroMQ 结构

在 ZeroMQ 消息队列中,传递的消息种类有以下几种:

  • Subscribe:Subscribe 消息是 Consumer 发送给 Broker 端的,用来说明 Consumer 订阅的消息主题。
  • Subscribe Ack:Broker 在接收到 Consumer 发送的 Subscribe 消息并且进行处理之后,会返回一个 Subscribe Ack 消息给 Consumer。
  • Unsubscribe:Consumer 发送 Unsubscribe 消息给 Broker 端的,用来说明 Consumer 取消订阅的主题
  • Unsubscribe Ack:Broker 在接收到 Consumer 发送的 Unsubscribe 消息并且进行处理之后,会返回一个 Unsubscribe Ack 消息给 Consumer。
  • Message:Producer 发送 Message 消息给 Broker,然后 Broker 会把这个 Message 消息转发给 Consumer 端
  • Producer Ack:Broker 在接收到 Producer 端的 Message 之后,会返回一个 Producer Ack 消息给 Producer
  • Consumer Ack:Consumer 在接收到 Broker 转发的 Message 之后,就会返回一个 Consumer Ack 消息

接下来分别对 Consumer、Producer、Broker 这三个模块进行讲解。

0x02 Consumer

ZeroMQ 中使用 Consumer 订阅并且等待接收消息的例子如下:

public class ZeroMQConsumer1 {

    // 由用户自定义,对 producer 发送过来的消息进行处理
    private static MessageConsumeHook hook = new MessageConsumeHook() {
        public ConsumerAckMessage consumeMessage(Message message) {
            System.out.printf("ZeroMQConsumer1 收到消息编号:%s,消息内容:%s\n", message.getMsgId(), new String(message.getBody()));
            // 返回 ConsumerAckMessage 消息给 broker 服务端
            ConsumerAckMessage result = new ConsumerAckMessage();
            // 设置消息消费结果为 SUCCESS
            result.setStatus(ConsumerAckMessage.SUCCESS);
            return result;
        }
    };

    public static void main(String[] args) {
        ZeroMQConsumer consumer = new ZeroMQConsumer("127.0.0.1:18888", "ZeroMQ-Topic-1", hook);
        consumer.setClusterId("ZeroMQCluster1");
        consumer.start();
    }

}

上面的 MessageConsumeHook 是一个接口,具体由 Consumer 端实现,表明 Consumer 对 Broker 端发送过来的 Message 的处理方式。在上面,只是简单地打印发送过来的消息信息,然后返回 ConsumerAckMessage 响应给 Broker。在实现了 MessageConsumeHook 接口之后,就会创建一个 Consumer 对象。创建时,要指定 Broker 端的地址,以及要订阅的主题,最后还有 Consumer 实现的 MessageConsumerHook 接口对象。最后调用 start 方法启动 Consumer。start 方法主要完成的工作如下:

  • 创建一个 ConsumerHandler 对象,并且将其设置到 pipeline 之中。ConsumerHandler 用来对 Broker 返回的消息进行处理。
  • 连接到 Broker 服务器端
  • 连接建立之后向 Broker 发送 Subscribe 消息,表明当前 Broker 订阅的主题,接着阻塞等待响应(这里使用同步发送的方式)
  • 当 Broker 返回 SubscribeAckMessage 之后,会唤醒 Consumer 中阻塞的线程,根据 SubscribeAckMessage 中的 status 属性判断订阅主题是否成功打印相应的日志。

Consumer 端的 pipeline 网络结构如下所示:

consumer pipeline

MessageObjectEncoder 和 MessageObjectDecoder 主要是对消息进行编解码和序列化,会在后面的章节进行讲解。ConsumerHandler 主要用来对 Broker 发送过来的消息进行处理。Broker 会返回 SubscribeAckMessage 消息以及 Message 消息,ConsumerHandler 中对这两种消息的处理都是封装成一个 Runnable 对象,然后放入到线程池中去处理,这样就不会阻塞 netty 的 io 线程:

  • SubscribeAckMessage:获取到阻塞的 CallBackFuture 对象,会唤醒阻塞的 consumer 线程,并且如果订阅成功的话,打印日志信息;如果订阅失败的话,同样会打印失败的原因。
  • Message:将 Broker 转发过来的 Message 交给 MessageConsumeHook 进行消费处理,并且向 Broker 返回一个响应 ConsumerAckMessage。

0x03 Producer

在 ZeroMQ 项目中使用 Producer 发送消息到 Broker 端的代码如下:

public class ZeroMQProducer1 {

    public static void main(String[] args) throws InterruptedException {
        ZeroMQProducer producer = new ZeroMQProducer("127.0.0.1:18888", "ZeroMQ-Topic-1");
        producer.setClusterId("zeromq cluster");
        producer.start();

        System.out.println(StringUtils.center("zeromq producer1 消息发送开始", 50, "*"));

        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            String str = "hello zeromq from producer1[" + i + "]";
            message.setBody(str.getBytes());
            ProducerAckMessage result = producer.deliver(message);
            if (result.getStatus() == (ProducerAckMessage.SUCCESS)) {
                System.out.printf("zeromq producer1 发送消息编号:%s\n", result.getMsgId());
            }

            Thread.sleep(100);
        }

        producer.shutdown();
        System.out.println(StringUtils.center("zeromq producer1 消息发送完毕", 50, "*"));
    }

}

和 Consumer 一样,也是先创建一个 Producer 对象,并且在创建的时候指明 Broker 地址,要发送的消息主题,然后调用 start 方法启动 Producer 方法。Start 方法的流程如下:

  • 创建一个 ProducerHandler 对象,并且将其设置到 pipeline 中,ProducerHandler 会对 Broker 返回的消息进行处理。
  • 连接到 Broker 端

接着就是创建好 Message 消息对象,调用 deliver 方法将消息发送到 Broker 端,delive 方法的流程如下:

  • 如果 producer 与 broker 之间没有建立连接或者连接已经关闭了,或者 producer 不处于运行状态,就会直接返回一个 ProducerAckMessage 对象,只不过将对象的 status 设置为 FAIL,表明发送失败。
  • 将 Message 消息发送到 Broker 端,并且发送线程阻塞(使用同步发送方式),等到 Broker 端返回的响应信息 ProducerAckMessage,然后打印相应日志

Producer 端的 pipeline 网络结构如下:

producer pipeline

Broker 返回给 producer 的消息 ProducerAckMessage 会被 ProducerHandler 进行处理。

0x04 Broker

Broker 端的逻辑比较复杂,ZeroMQServer 启动的时候会创建以下三个任务提交到线程池中不断执行:

  • SendMessageController:SendMessageController 不断循环从 MessageTaskQueue 中取出 MessageDispatchTask(MessageDispatchTask 是对 message, clusterId, topic 这三个变量的封装),这里取出 MessageDispatchTask 中的 message,然后将其转发给客户端 consumer,并且阻塞等待客户端返回 ConsumerAckMessage。注意,这里转发 Message 到 consumer 端是多个线程并发发送的,比如有 6 条消息,然后有 3 个线程,那么就由这 3 个线程并发地发送消息,每条线程发送 2 条消息。
  • SendAckController:从 AckTaskQueue 中获取保存的 ProducerAckMessage,并且将其发送到 producer 端,表示 broker 已经收到消息。
  • TransferAckController:在 BrokerProducerMessageStrategy 中会将 producer 发送过来的消息的 msgId 生成标识,保存到 AckMessageCache 中,另外如果 producer 发送过来的消息如果没有消费者进行订阅,就会跳过这一步,直接生成一个 ProducerAckMessage,保存到 AckTaskQueue 中。因此,这里 AckPushMessageController 的任务就是根据 AckMessageCache 保存的消息的标识,并发地创建 ProducerAckMessage,然后将其分发到 AckTaskQueue 中,等待 SendAckController 将其发送到 producer 端。比如有有 6 个标识,然后有 3 个线程,那么就由这 3 个线程来并发地创建 ProducerAckMessage,并且 ack 消息发送到 producer 端,每个线程处理 2 条标识。

ZeroMQServer 继承了 BrokerParallelServer,具体启动代码如下:

public class BrokerParallelServer implements RemotingServer {

     // parallel 是核心线程数,也就是处理器的数目 * 2
     protected int parallel = NettyClustersConfig.getWorkerThreads();

     protected ExecutorService executor = Executors.newFixedThreadPool(parallel);

     public BrokerParallelServer() {
     }

    public void start() {
         for (int i = 0; i < parallel; i++) {
                 executor.submit(new SendMessageController());
                 executor.submit(new SendAckController());
                 executor.submit(new TransferAckController());
             }
         }

    public void shutdown() {
         executor.shutdown();
    }
}

Broker 端的 pipeline 结构如下所示:

broker pipeline

Broker 会接受到 4 种类型的消息:Subscribe、Message、Unsubscribe、ConsumerAckMessage。然后会分别对其进行处理:

  • Message:Producer 发送到 broker 的消息,由 ProducerStrategy 进行处理,处理流程如下:
    1. 获取消息的主题,并且得到订阅了该主题的消费者集群
    2. 如果没有消费者集群订阅该消息主题,那么直接创建一个 ProducerAckMessage,保存到 AckTaskQueue,然后返回。然后等待 SendAckController 从 AckTaskQueue 中取出 ProducerAckMessage,然后发送到 producer。
    3. 如果有消费者集群订阅该消息主题
      1. 为每一个订阅了该主题的消费者集群创建一个 MessageDispatchTask(一个消费者集群对应一个 task),然后将这些 task 保存到 MessageTaskQueue. 这个 MessageDispatchTask 会把消息分发给消费者集群中的一个消费者(注意,不是所有的消费者,只是其中一个)
      2. 根据这个消息的 msgId 创建一个标识,然后将其保存到 AckMessageCache 中,broker 会根据 AckMessageCache 中的标识并发地创建 ProducerAck,将其保存到 AckTaskQueue 中,最后将其分发到对应的 producer 端
  • ConsumerAckMessage:consumer 接收到 broker 发送的 Message 消息之后,会返回 ConsumerAckMessage,由 ConsumerStrategy 进行处理。在 broker 使用 SendMessageLauncher 发送消息到 consumer 端时,会创建一个 CallBackFuture,然后阻塞。当 consumer 接收到发送过去的消息,并且进行了处理之后,就会返回一个 ConsumerAckMessage,然后在这里被接收到,然后唤醒前面阻塞在 CallBackFuture 中的线程。
  • Subscribe:Consumer 发送给 Broker 的订阅消息,由 SubscribeStrategy 进行处理,会把 Consumer 的信息添加到 broker 中,具体如下:
    1. 如果 broker 没有此 consumer 对应的消费者集群,创建一个 ConsumeCluster:
      1. 把 <consumerId, channel> 连接信息保存到 channelMap 中,channel 是 consumer 端到 broker 端的连接
      2. 把 <topic, subscription> 保存到 subMap 中
      3. 把 channel 保存到 channelList 中
    2. 如果在消费者集群中找到了 consumerId 对应的消费者,删除这个消费者对应的连接,替换成新连接
    3. 如果在消费者集群中没有找到 consumerId 对应的消费者,则添加 consumerId 的相应信息到 ConsumerCluster 中
  • Unsubscribe:Consumer 发送给 Broker 的取消订阅消息,由 UnsubscribeStrategy 进行处理。只是从 broker 端删除掉这个 consumer 的信息。

0x05 编解码

在 Consumer,Producer 以及 Broker 端的消息结构如下:

消息体分为两部分,第一部分是 4 字节的数据,表示后面消息数据的长度;第二部分就是发送的各种消息的字节数据。MessageObjectDecoder 和 MessageObjectEncoder 就是消息的编解码器,使用 kryo 作为序列化器:

  • MessageObjectEncoder:首先使用 kryo 对消息对象进行序列化,并且获取到序列化后的消息长度(int 类型)。接着首先写入 int 类型的长度数据,再写入消息的字节数据。
  • MessageObjectDecoder:首先判断获取到的字节数目是否小于 4,如果小于 4,直接返回。如果大于 4,读取 4 个字节的数据,表示消息数据的长度 len,然后判断可以获取到的字节数目是否小于 len,如果小于 len,那么同样直接返回,否则读取字节流,利用 kryo 反序列化成 Object 对象。

0x06 连接池

在 Consumer 和 producer 调用 start 方法启动的时候需要从连接池中获取和 broker 的连接,并且在 shutdown 之后需要归还连接到连接池中,这样就形成了连接的复用。连接池的对象为 GenericKeyedObjectPool<String, Connection>,其中 String 可以看成是 broker 的地址,Connection 可以看成是建立的到 broker 的连接。当 consumer 或者 producer 从 pool 中 borrow 连接时,如果 pool 中已经存在要连接的 broker 端的地址,那么就会直接从 pool 中获取到连接对象 Connection,如果没有 broker 端的地址的话,就会先创建一些 Connection 对象(默认创建 8 个),保存到连接池 pool 中,然后再去获取。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.