Giter VIP home page Giter VIP logo

joyqueue's Issues

并行消费问题

开启并行消费,例如窗口大小5,批量大小默认10,消费一段时间后,如果主题生产者停止发送消息后,消费会立刻停止,一直卡住不消费,直到如果后续生产继续那么消费也会开始消费,可以进行反复操作复现该问题。

消费时,不应读取到未提交的消息

只要写入的消息,就可以读取到。考虑到Raft复制协议,在极端情况下,有可能出现已写入但未提交的消息被删除的情况,所以,消费时,只应读到已提交的消息。

请问相比kafka的优势在哪里

基于文档中描述,分布式协议是raft实现,请教一下相比kafka的核心优势在哪里?是否还保留kafka分区的概念,集群的线性扩容能否提高吞吐,是如何做到的呢?

批量写的建议

  1. PartitionGroupStoreManager.write 方法可以同时写多条消息,但为什么每处理一条消息后都会唤醒刷新进程一次,而不是把这批消息处理完成后统一唤醒一次。
  2. 当前性能的瓶颈在于频繁的刷新上,而消息分组处理在ProduceManager.writeMessagesAsync, 如果可以把这部分工作后移到PartitionGroupStoreManager.write里面,那么同一分组上可以同时处理的消息会更多,它会减少flush次数,提升系统性能。

PositioningStore可能导致数据丢失

joyqueue的代码我看的比较早,记得之前store在flush的时候都只是通过write方法将数据写入FileChannel,然后并没有fsync到磁盘。
更新代码之后发现2020年4月,有提交新逻辑,支持在上一个文件写满了,然后第一次写新文件的时候,对上一个文件做一次force将其数据fsync到磁盘上(如下PositioningStore第414行)。
if (!storeFile.isClean()) { // 在文件第一次刷盘之前,需要把上一个文件fsync到磁盘上,避免服务器宕机导致文件不连续 if (flushForce && storeFile.flushPosition() == 0) { Map.Entry<Long, StoreFile<T>> prevEntry = storeFileMap.floorEntry(entry.getKey() - 1); if(null != prevEntry) { prevEntry.getValue().force(); } } storeFile.flush(); }

但是此时似乎还是会存在PositioningStore中数据丢失的问题。即一个Store文件在写入的过程中(没有写满),此时如果宕机,那么这个文件的部分数据会因为没有执行force导致数据没有刷新到磁盘(此时还在内核的page cache中),从而导致数据丢失!!!!
针对这个问题,我参考过其他涉及到WAL方式实现的各类开源项目,其几乎都是每写一次盘,然后force一次数据。
请问joyqueue目前这种实现是出于什么考虑呢?

Ctrl+ C 停止服务器时,导致文件破坏而无法重新启动

com.jd.joyqueue.store.StoreInitializeException: java.io.IOException: 请求的操作无法在使用用户映射区域打开的文件上执行。
at com.jd.joyqueue.store.PartitionGroupStoreManager.recover(PartitionGroupStoreManager.java:191) ~[joyqueue-store-core-4.1.0.jar:?]
at com.jd.joyqueue.store.Store.restorePartitionGroup(Store.java:275) ~[joyqueue-store-core-4.1.0.jar:?]
at com.jd.joyqueue.broker.store.StoreManager.doStart(StoreManager.java:82) ~[joyqueue-broker-core-4.1.0.jar:?]
at com.jd.joyqueue.toolkit.service.Activity.start(Activity.java:55) ~[joyqueue-toolkit-4.1.0.jar:?]
at com.jd.joyqueue.toolkit.service.Service.start(Service.java:39) ~[joyqueue-toolkit-4.1.0.jar:?]
at com.jd.joyqueue.broker.BrokerService.startIfNecessary(BrokerService.java:380) ~[joyqueue-broker-core-4.1.0.jar:?]
at com.jd.joyqueue.broker.BrokerService.doStart(BrokerService.java:320) ~[joyqueue-broker-core-4.1.0.jar:?]
at com.jd.joyqueue.toolkit.service.Activity.start(Activity.java:55) ~[joyqueue-toolkit-4.1.0.jar:?]
at com.jd.joyqueue.toolkit.service.Service.start(Service.java:39) ~[joyqueue-toolkit-4.1.0.jar:?]
at com.jd.joyqueue.broker.Launcher.main(Launcher.java:33) [joyqueue-server-runtime-4.1.0.jar:?]
Caused by: java.io.IOException: 请求的操作无法在使用用户映射区域打开的文件上执行。
at sun.nio.ch.FileDispatcherImpl.truncate0(Native Method) ~[?:1.8.0_202]
at sun.nio.ch.FileDispatcherImpl.truncate(FileDispatcherImpl.java:97) ~[?:1.8.0_202]
at sun.nio.ch.FileChannelImpl.truncate(FileChannelImpl.java:357) ~[?:1.8.0_202]
at com.jd.joyqueue.store.file.StoreFileImpl.rollback(StoreFileImpl.java:375) ~[joyqueue-store-core-4.1.0.jar:?]
at com.jd.joyqueue.store.file.PositioningStore.rollbackFiles(PositioningStore.java:175) ~[joyqueue-store-core-4.1.0.jar:?]
at com.jd.joyqueue.store.file.PositioningStore.setRight(PositioningStore.java:135) ~[joyqueue-store-core-4.1.0.jar:?]
at com.jd.joyqueue.store.PartitionGroupStoreManager$Partition.rollbackTo(PartitionGroupStoreManager.java:1383) ~[joyqueue-store-core-4.1.0.jar:?]
at com.jd.joyqueue.store.PartitionGroupStoreManager$Partition.access$500(PartitionGroupStoreManager.java:1362) ~[joyqueue-store-core-4.1.0.jar:?]
at com.jd.joyqueue.store.PartitionGroupStoreManager.recoverIndices(PartitionGroupStoreManager.java:199) ~[joyqueue-store-core-4.1.0.jar:?]
at com.jd.joyqueue.store.PartitionGroupStoreManager.recover(PartitionGroupStoreManager.java:189) ~[joyqueue-store-core-4.1.0.jar:?]
... 9 more

存储增加CheckPoint机制

现状

由于消息的索引是异步创建异步刷盘,所以重启后,需要扫描消息文件,恢复索引。当前对于PG索引的恢复策略是:找到每个partition的最后一条索引对应的消息尾部的位置(消息全局位置 + 消息长度),取这些位置的最小值,可以保证,这个位置之前的所有消息都有索引,这个位置之后的消息有可能有索引,有可能没有,这个位置成为安全索引位置。所以,目前的策略是,启动后,从安全索引位置开始,重建所有消息的索引。

问题

目前遇到的问题是,在指定分区发送消息,并且数据严重倾斜的情况下,Broker重启后,可能需要较长的时间恢复索引。极端情况下,需要重建全部索引,耗时较长。比如,一个PG有n个partition,如果其中一个partition一条消息都没有写入过,重启时,这个安全索引位置为0,需要重建全部索引。

解决方案

存储层,每个PG增加一个CheckPoint文件,定期记录当前的安全索引位置。重启时,首先读取CheckPoint中的安全索引位置,从这个位置来恢复索引。如果CheckPoint文件不存在或者读取失败,继续按照现有策略恢复索引。

CheckPoint文件位于PG目录下,文件名固定为checkpoint。文件采用JSON格式,目的是为了便于修改。例子:

{
  "Version": 0,
  "IndexPosition": 88888, 
  "partitions": [
    {"partition": 0, "index": 666},
    {"partition": 1, "index": 333},
    {"partition": 2, "index": 22}
  ]
}

其中Version为CheckPoint文件的版本号,目前固定为0,后续修改文件格式后顺序递增版本号。IndexPosition属性记录安全索引位置。partitions 记录当前每个分区的下一条索引序号。

需要做的修改

  1. 更新属性:PartitionGroupStoreManager#indexPosition

    1. 索引刷盘之后:需要同步或异步更新indexPosition
    2. 回滚的时候:回滚的时候需要同步回滚indexPosition
  2. PartitionGroupStoreManager中启动一个定时器,定时写入CheckPoint文件,默认的周期是60S。

  3. 修改PartitionGroupStoreManager#recoverPartitions恢复索引逻辑,优先读取CheckPoint文件,以CheckPoint文件中记录的IndexPosition为准返回安全索引位置。

  4. 修改PartitionGroupStoreManager#stop方法,在停止刷盘线程和写入CheckPoint定时器之后,再执行一次写入CheckPoint。

  5. 恢复PartitionGroup时,PartitionGroupStoreManager#recoverIndices方法中,增加如下逻辑:

    1. 读取checkpoint文件,检查每个分区的下一条索引序号是否大于等于checkpoint中记录的索引序号;
    2. 如果是,使用checkpoint中记录的indexPosition继续恢复索引;
    3. 否则,使用目前的逻辑计算出的indexPosition继续恢复索引。

讨论

由于目前采用异步刷盘,并且不会调用force强制刷盘,极端情况下(比如服务器掉电或者磁盘满),已刷盘的数据也不能保证一定安全写入到磁盘中。如何保证CheckPoint中记录的安全刷盘位置之前的索引数据一定安全的写入到磁盘中了呢?

archive issue: consume log exception

归档功能异常:消费归档功能里,在批量进行读取消费记录日志文件的时候,如果读取是在两个文件里进行的(比如批量1000条,500条读取完后再进行下一个文件的500条读取),操作hbase存储如果出现异常的话进行位置回退处理,此时的回退处理是对1000条整体位置的偏移量进行回退,但偏移量横跨两个文件进行读取计算的,所以会导致回退偏移量设置异常,从而影响整个消费记录日志的存储线程。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.