Giter VIP home page Giter VIP logo

ttstringgolangiot's Introduction

#smartgo SmartGo

smartgo整体架构图

smartgo是什么?

SmartGo是能够支持主流消息队列功能及满足物联网MQTT数千万长连接设备推送消息使用golang语言全新开发的一款分布式、队列模型的智能中间件,具有以下特点:

  • 支持point-point、pub-sub、request-reply等多种模式
  • 支持严格的消息顺序
  • 支持数百万长连接
  • 亿级消息堆积能力
  • 比较友好的分布式特性

当前最新版本功能支持:

  1. 将整个项目命名为smartgo-1.0.0
  2. 将项目中所有子工程命名为stg-*

如何开始?


开发规范必读

  • 源文件使用Unix换行、UTF-8文件编码,遵照golang内置格式化代码规范
  • 请在git clone命令之前执行git config --global core.autocrlf false,确保本地代码使用Unix换行格式
  • 请在非主干分支上开发,禁止提交本地未测试运行通过代码到线上分支
  • 每次提交及之前(正常来说需要先pull --rebase,解决冲突),对代码进行修改必须有相对应的解释说明
  • 正常组内开发人员提交代码,需要经过经过审核后方可提交(且需要有统一格式注释,参照注释类型3)

原始文档请参考:smartgo/docs/doc/目录:

SmartGo文档截图示例:

SmartGo-Store:

SmartGo-Broker:

SmartGo-Net:

SmartGo-Register:

SmartGo-Client:

Markdown文档概述(由于Markdown格式会缺失图片,细节请参考原始文档)

SmartGo-Store技术文档说明

针对版本V1.0.0

©

成都基础平台架构

2017/11/21

目 录

1 存储 4

1.1 概述 4

1.2 零拷贝技术 4

1.3 CommitLog 4

1.4 ConsumeQueue 7

1.5 索引 9

1.6 主从同步 11

1.7 刷盘 13

1.8 文件清理 13

附件一Smartgo开发者联系方式 15

  1. 1存储
  2. 1.1概述

存储模块主要包含存储Producer生产的消息、ConsumeQueue、索引等数据以及主从同步、刷盘、清理服务等。

  1. 1.2零拷贝技术

零拷贝是通过将文件映射到内存上,直接操作文件,相比于传统的io(首先要调用系统IO,然后要将数据从内核空间传输到用户空间),避免了很多不必要的数据拷贝,提高存储性能。 存储消息,使用了零拷贝,零拷贝包含以下两种方式:

方式 优点 缺点
mmap + write 即使频繁调用,使用小块文件传输,效率也很高 不能很好的利用 DMA 方式,会比 sendfile 多消耗 CPU,内存安全性控制复杂
sendfile 可以利用 DMA 方式,消耗 CPU 较少,大块文件传输效率高,无内存安全新问题 小块文件效率低于 mmap 方式

SmartGo采用mmap+write方式,因为有小块数据传输的需求,效果会比sendfile更好。

  1. 1.3CommitLog

CommitLog用于存储真实消息数据。CommitLog路径默认为用户工作目录/store/commitlog。

CommitLog存储目录结构:

commitlog

    - 00000000000000000000

    - 00000000001073741824

commitlog文件名生成的规则:

文件名的长度为20位,左边补零,剩余的为文件起始偏移量(第一个文件起始偏移量为0);

文件名字根据指定commitlog文件大小(默认文件大小为1G,可以通过MessageStoreConfig的mapedFileSizieCommitLog进行配置)递增,文件大小单位为字节。

例如:

默认commitlog文件大小为1G=1073741824b

第一文件的起始偏移量为0,不足20位进行补零,故文件名00000000000000000000,当第 一文件写满,第二文件的起始偏移量为1073741824,不足20位进行补零,故文件名为00000000001073741824,后面的文件名以此类推。

文件n起始偏移量 = size * (n- 1)

文件1起始偏移量 = 1073741824 * (1 - 1) = 0

文件2起始偏移量 = 1073741824 * (2 - 1) = 1073741824

通过commitlog文件名能够方便快速定位信息所在的文件。

文件Index = (消息的起始物理偏移量-最早的文件的起始偏移量)/文件大小,即 (1073741827-0)/1073741824=1,可得知该消息在队列中的第二个文件中:

commitlog文件的消息结构:

序号 字段 说明 字节数 备注
1 TotalSize 消息总长度 4
2 MagicCode MagicCode 4 MagicCode分为:MessageMagicCode、BlankMagicCode。MessageMagicCode表示正确的消息内容;BlankMagicCode表示CommitLog文件空间不足,采用空字节占位写满文件。
3 BodyCRC 消息内容CRC 4 BodyCRC的值是对消息内容(body)进行CRC32生成的32bit冗余校验码,用于确保消息的正确性。
4 QueueId 消息队列编号 4
5 Flag 消息标志 4
6 QueueOffset 消息队列位置 8 自增值,消息队列逻辑位置,通过该值才能查找到consume queue中的数据;QueueOffset * 20才是消息队列的物理偏移量。
7 PhysicalOffset 物理位置 8
8 SysFlag MessageSysFlag 4
9 BornTimestamp 生产消息时间戳 8
10 BornHost 生产消息的地址+端口 8
11 StoreTimestamp 存储消息时间戳 8
12 StoreHost 存储消息的地址+端口 8
13 ReconsumeTimes 重新消费消息次数 4
14 PreparedTransationOffset 8
15 BodyLength 消息内容长度 4
16 Body 消息内容 bodyLength
17 TopicLength Topic长度 1
18 Topic topic topicLength
19 PropertiesLength 附加属性长度 2
20 Properties 附加属性 propertiesLength

添加CommitLog数据,将数据写入到MapedFile,每个MapedFile对应着一个储存消息的二进制文件,MapedFile在创建时会映射到内存上,添加消息时将需要保存的数据写入内存,后续有刷盘服务会将内存中数据持久化到二进制物理文件中,下图是添加CommitLog数据的主要业务流程:

查询CommitLog数据,直接从映射的内存中根据物理偏移量以及数据大小,获取数据,下图是查询CommitLog数据的主要业务流程:

  1. 1.4ConsumeQueue

消费者逻辑队列,对应/store/consumequeue文件夹,每个消费队列文件目录机构如下:

consumequeue

-- topic

    -- queue id

            -- 00000000000000000000

            -- 00000000000000001040

            -- 00000000000000002080

consumequeue文件名生成规则:

commitlog文件名生成规则一致,需要注意的是:maped文件大小为=向上取整(指定size/消息位置信息size)* 消息位置信息size

例如:

指定消费队列文件大小=1024

消息位置信息size = 20

mapedFileSize = 向上取整(1024 / 20) * 20

mapedFileSize = 1040

consumequeue文件结构:

ConsumeQueue中并不需要存储消息的内容,而存储的是消息在CommitLog中的offset。也就是说,ConsumeQuue其实是CommitLog的一个索引文件。

序号 字段 说明 字节数 备注
1 CommitLog Offset CommitLog的起始物理偏移量physical offset 8
2 Size 消息的大小 4
3 Message Tag Hashcode 消息Tag的哈希值 8 用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)

ConsumeQueue是定长的结构,每条数据大小为20个字节,每个文件默认大小为600万个字节。Consumer消费消息的时候,需要2个步骤:首先读取ConsumeQueue得到offset,然后读取CommitLog得到消息内容。

添加消息时,添加消息到commitLog后会向分发服务添加一个分发请求,分发服务调用MessageStore添加消息位置信息,根据消息的Topic、QueueId获取ConsumeQueue,消息的位置信息追加到对应的消费队列中,最终保存的二进制文件中,主要流程如下图:

获取消息时, 首先根据Topic、QueueId获取ConsumeQueue,然后根据消息逻辑offset,获取消息的物理偏移量、消息的Size,最后根据消息的物理偏移量、消息的Size获取CommitLog数据,主要流程如下图:

  1. 1.5索引

IndexService(索引服务)

IndexService用于创建索引文件集合,当用户想要查询某个topic下某个key的消息时,能够快速响应。

Index File(索引文件)

IndexFile存储消息索引的文件,文件结构如下:

索引文件由三个部分组成:Header(索引文件头信息)、Slot Table(槽位信息)、Index Linked List(消息的索引内容)

Index Header:索引文件头信息由40个字节的数据组成。

序号 字段 说明 字节数 备注
1 BeginTimestamp 索引文件开始时间 8 第一个索引创建的时间
2 EndTimestamp 索引文件结束时间 8 最后一个索引创建的时间
3 BeginPhyOffset 索引文件开始的物理偏移量 8 第一个索引对应的CommitLog物理偏移量
4 EndPhyOffset 索引文件结束的物理偏移量 8 最后一个索引对应的CommitLog物理偏移量
5 HashSlotCount 索引文件占用的槽位数 4
6 IndexCount 索引的个数 4

Slot Table

Index Linked List:消息的索引内容链表,默认每个文件有2000W消息索引内容组成,每个消息索引内容为20个字节的数据。

序号 字段 说明 字节数 备注
1 KeyHash key的哈希值 4 topic-key(key是消息的key)的hashCode组成
2 PhyOffset commitLog的物理偏移量 8
3 Timestamp 索引创建的时间 4
4 NextIndexOffset 下一个索引的索引地址 4

IndexFile的创建过程:

首先在DispatchMessageService写入ConsumeQueue后,会再调用indexService.putRequest,添加索引请求;IndexService定时获取创建索引请求,调用IndexService的buildIndex进行创建索引。

  1. 1.6主从同步

在集群模式的部署方式中,Master与Slave配对是通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数。一个Master下面可以挂载多个Slave,同一个Master下的多个Slave通过指定不同的BrokerId来区分。

主从同步服务

存储模块启动时,会启动主从同步服务,主从同步服务主要的组成部分是:主从同步服务端、主从同步客户端

主从同步服务端

接收slave节点的连接请求,接收到请求后会建立主从连接,接受和传递主从之间数据。

主从连接

主从连接主要由主从写服务、主从读服务组成,主从写服务主要用于master传输同步数据,主从读服务主要用于接受slave节点发送的offset信息。

主从写服务传输的数据结构:

序号 字段 说明 字节数 备注
1 Offset commitLog物理偏移量 8 同步commitLog物理偏移量
2 BodySize 传输数据的大小 4
3 BodyData 传输数据的内容 BodyLength

主从同步客户端

连接master节点,定时上报offset以及接收master节点传输的同步数据。

主从同步客户端上报的数据结构:

序号 字段 说明 字节数 备注
1 Offset commitLog物理偏移量 8 slave节点的最大commitLog物理偏移量

主从同步客户端上报offset时,会获取当前最大CommitLog文件物理偏移量。如果HAClient是首次上报offset,并且上报的offset为0,master节点会获取最后一个CommitLog文件进行传输,其余的CommitLog文件不会进行同步。上报的offset不为0,master节点会从上报的offset进行同步。

  1. 1.7刷盘

RocketMQ刷盘有两种方式,分为:同步刷盘、异步刷盘。

同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。

异步刷盘:数据到达内存之后,返回producer说数据已经发送成功,然后再写入commitlog日志。

RocketMQ默认是使用异步刷盘。

逻辑队列刷盘服务(FlushConsumeQueueService):用于将ConsumeQueue的File文件写入入里磁盘,

首先判断是否到达了刷盘时间,如果到达了,那么全盘通刷;否则,遍历所有的ConsumeQueue,调用cq.commit(flushConsumeQueueLeastPages)进行刷盘,flushConsumeQueueLeastPages是目前文件的未刷盘大小达到flushConsumeQueueLeastPages*OS_PAGE_SIZE(1024*4)个,才进行刷盘。

逻辑队列刷盘服务:定时将ConsumeQueue的数据从内存写入到文件。

  1. 1.8文件清理

存储服务启动时,会启动定时清理文件服务,定时清除服务会每分钟定时清理CommitLog、ConsumeQueue文件。

清理CommitLog文件服务

清理CommitLog文件,需要满足以下任意一条件:

1、消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。

2、消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。

3、磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

清理ConsumeQueue文件服务

定时清理小于最小CommitLog物理偏移量的ConsumeQueue的文件。

SmartGo-Broker 技术文档说明

针对版本V1.0.0

©

成都基础平台架构

2017/11/21

目 录

1 概述 4

2 Borker模块交互 4

2.1 Registry 4

2.2 Client 4

2.3 Net 4

2.4 Store 5

2.5 Broker 5

3 专业术语 5

3.1 Topic 5

3.1 ConsumerOffset 5

3.2 SubscriptionGroup 5

4 Broker实现原理 5

4.1 Topic 管理 5

4.2 ConsumerOffset管理 7

4.3 SubscriptionGroup管理 9

4.4 发送消息 10

4.5 消费消息 11

4.6 主从同步 12

4.7 Hold 13

4.8 消息统计 14

4.9 Producer、Consumer连接管理 15

附件一 Smargo开发者联系方式 16

  1. 1概述

Broker消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。Broker通过自身实现方法并且发布,提供Client调用。也就是说相对于Client,Broker是一个Service。

Broker在Smargo的角色很多,它是给Producer、Consumer提供服务的Service、又是调用Registry服务的Client、而它自身还承载着运维接口、消息统计等一系列的功能。

  1. 2Borker模块交互
  2. 2.1Registry

每个Broker与每个Registry保持长连接。

启动时会向每一个Registry注册,启动过后Broker每隔30秒向Register发送心跳,注册和发送心跳都包含了将自身的clusterName,brokerName,topic信息发。如果Broker 2分钟内没有发送心跳数据,则断开连接。

 Broker挂掉或者断开,Registry会有自动感应,会更新删除该Broker与Topic的关系。

  1. 2.2Client

每个Client通过Registry拿到BrokerList地址,Client与BrokerList保持长连接。

Producer向Broker发送消息,Broker负者处理解析消息,然后转发到Stroe进行消息持久化。

Consumer从Broker拉取消息进行消费,Broker会维护Consumer与Topic之间订阅关系,并且会维护与Topic消费的Offset。

  1. 2.3Net

Broker通过Net创建Service(目前端口为10911),注册并发布服务,供Client调用。

Broker通过Net创建Client,调用Registry的方法。

  1. 2.4Store

Broker收到消息,经过一些列的验证,解析,重新封装后将消息交给Store做后续的处理。

  1. 2.5Broker

Broker主节点之间没有交互,主节点与备节点同步Topic信息,Consumer Offset,延迟队列的Offset,订阅关系等。

  1. 3专业术语
  2. 3.1Topic

Topic是一个消息主题,一个在线Producer实例只能对应一个Topic,一个在线Consumer实例可以对应多个Topic,一条消息必须属于一个Topic。

  1. 1.1ConsumerOffset

ConsumerOffset主要记录了Consumer GroupName与Topic每个Queue的消费进度。

  1. 3.2SubscriptionGroup

SubscriptionGroup用来管理订阅组的订阅信息,包含订阅权限、重试队列,重试次数等。

  1. 4Broker实现原理
  2. 4.1Topic 管理

 默认Topic

目前Broker启动时会生成六个默认的Topic,OFFSET_MOVED_EVENT、SELF_TEST_TOPIC、DEFAULT_TOPIC、BENCHMARK_TOPIC、集群名称Topic、Broker名称Topic。其中DEFAULT_TOPIC最为关键,应为Topic的创建会以DEFAULT_TOPIC为模板进行创建。目前Smargo中DEFAULT_TOPIC的读写队列默认为16个;并且是一个可读可写Topic。

 持久化

每个Broker会将其下的每一个Topic进行统一的持久化,这些Topic被全部保存到一个以JSON的形式都保存到一个文件中,Smargo保存Topic文件的路径为/当前用户目录下/store/config/topic.json文件。该文件主要保存了每一个Topic的主要信息如:TopicName(topic名称)、ReadQueueNums(读队列个数)、WriteQueueNums(写队列个数)、Perm(topic权限)、Order(是否为顺序Topic)、topicSysFlag(系统标识)。

文件存储内容如下:

{

"topicConfigTable": {

    "topicConfigTable": {

        "%RETRY%consumerGroupId-example-200": {

            "SEPARATOR": "",

            "topicName": "%RETRY%consumerGroupId-example-200",

            "readQueueNums": 1,

            "writeQueueNums": 1,

            "perm": 6,

            "topicFilterType": 0,

            "topicSysFlag": 0,

            "order": false

        }

    }

},

"dataVersion": {

    "timestamp": 1511333414604049700,

    "counter": 2023

}

}

 初始化

在Broker启动时,Broker会将Topic.json文件进行加载,在内存中维护一套Topic名称与Topic对象之间的关系,对Topic进行任何操作,都会更新内存所维护的关系以及Topic.json的文件。

 创建Topic

创建Topic由Client发起,Broker没有检测到Client所需要发送的Topic,其创建如图所示:

在创建Topic的过程中,会将创建的Topic的队列数与DefaultTopic队列数对比,取其小的队列数为新建Topic的队列数。创建成功后会立马向所有Registry注册。

 其他操作

如果对原有的Topic进行了操作,会第一时间将内存维护的信息进行更新并且会刷入磁盘中。Broker启动时会开启一个线程,每隔30秒向Registry注册,将更新的Topic维护到Registry中。

  1. 4.2ConsumerOffset管理

ConsumerOffset主要管理的是订阅组与Topic Queue消费进度的管理。具体流程如下:

 初始化

在Broker启动时,Broker会将ConsumerOffset.json文件进行加载,在内存中维护一套以订阅组名称与Topic名称组合为key,以当前Topic队列消费offset为value的关系。

 持久化

在Broker启动时候,Broker会开启一个线程每个5秒对Client上报的Consumer与Topic Offset进行持久化。

Smargo保存consumerOffset文件的路径为/当前用户目录下/store/config/consumerOffset.json文件。存储结构如下:

{

"offsets": {

    groupName@TopicName: {

        queue 1: offset,

    …

    queue x: offset,

    }

}

}

  1. 4.3SubscriptionGroup管理

SubscriptionGroup用来管理订阅组的订阅信息,包含订阅权限、重试队列,重试次数等。其流程如下:

Consumer通过心跳服务进行对SubscriptionGroup来维护。

 持久化

每当Broker维护SubscriptionGroup关系发生改变,都会进行一次持久化。Smargo保存subscriptionGroup文件的路径为/当前用户目录下/store/config/subscriptionGroup.json。存储结构如下:

{

topicName: {

        "groupName": "xx", //订阅组名称

        "consumeEnable": true, //是否可以消费

        "consumeFromMinEnable": true, //是否允许从队列最小位置开始消费,线上默认会设置为false

        "consumeBroadcastEnable": true, //是否允许广播方式消费

        "retryQueueNums": 1, //消费失败的消息放到一个重试队列,每个订阅组配置几个重试队列

        "retryMaxTimes": 16, //重试消费最大次数,超过则投递到死信队列,不再投递,并报警

        "brokerId": 0,  //从哪个Broker开始消费

        "whichBrokerWhenConsumeSlowly": 0  //发现消息堆积后,将Consumer的消费请求重定向到另外一台Slave机器

    }

},

"dataVersion": {

    "timestamp": 1511342161274071800,

    "counter": 3

}

}

  1. 4.4发送消息

整个消息的发送流程分为两个步骤流程:

 Consumer回退消息

针对Consumer消费失败消息投放重试队列,Broker接收到消息检测到如果是消费失败的Consumer端回退消息。会经历一下流程:

  1. a)检测当前消息的订阅组是否存在。
  2. b)检测当前Broker是否有写入权限
  3. c)获取到重试队列Topic(重试队列Topic一般为%RETRY%+groupName),计算QueueID。
  4. d)如果当前消息消费次数大于设置重试消费次数则投入死信队列(死信队列Topic一般为%DLQ% +GroupName)消息将不会再被消费。
  5. e)如果当前消息消费次数小于设置重试消费次数,则会将当前消费次数+3个等级延迟,延迟该消息的消费。
  6. f)重新组装消息对象调用store服务。
  7. g)统计

 Producter发送正常消息

正常消息的发送情况远没有重试消息流程复杂,其流程如下:

  1. a)检查发送消息的合法性(Topic、Broker权限等)。

  2. b)重新组装消息对象调用store服务。

  3. c)统计

  4. 4.5消费消息

Bolt在consumer端的消费方式有两种:一种push(推)、一种pull(拉)。不管是pull与push对Broker而言都是由Consumer主动发起的pull操作。其主要流程如下:

![](

ttstringgolangiot's People

Contributors

wjl2017 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.