
Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.

Home Page: https://rocketmq.apache.org/

License: Apache License 2.0

Shell 0.46% Java 98.75% Batchfile 0.17% Python 0.13% Starlark 0.49%
cloud-native eventing hacktoberfest java messaging rocketmq streaming

rocketmq's Introduction

Apache RocketMQ


Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.

It offers a variety of features:

  • Messaging patterns including publish/subscribe, request/reply and streaming
  • Financial grade transactional message
  • Built-in fault tolerance and high availability configuration options based on DLedger Controller
  • Built-in message tracing capability, with OpenTracing support
  • Versatile big-data and streaming ecosystem integration
  • Message retroactivity by time or offset
  • Reliable FIFO and strict ordered messaging in the same queue
  • Efficient pull and push consumption model
  • Million-level message accumulation capacity in a single queue
  • Multiple messaging protocols like gRPC, MQTT, JMS and OpenMessaging
  • Flexible distributed scale-out deployment architecture
  • Lightning-fast batch message exchange system
  • Various message filter mechanisms such as SQL and Tag
  • Docker images for isolated testing and cloud isolated clusters
  • Feature-rich administrative dashboard for configuration, metrics and monitoring
  • Authentication and authorization
  • Free open source connectors, for both sources and sinks
  • Lightweight real-time computing

Quick Start

This section guides you through the steps of installing RocketMQ in different ways. For local development and testing, only one instance of each component is created.

Run RocketMQ locally

RocketMQ runs on all major operating systems and requires only a Java JDK version 8 or higher to be installed. To check, run java -version:

$ java -version
java version "1.8.0_121"

For Windows users, click here to download the 5.2.0 RocketMQ binary release and unpack it to your local disk, such as D:\rocketmq. For macOS and Linux users, execute the following commands:

# Download release from the Apache mirror
$ wget https://dist.apache.org/repos/dist/release/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip

# Unpack the release
$ unzip rocketmq-all-5.2.0-bin-release.zip

Prepare a terminal and change to the extracted bin directory:

$ cd rocketmq-all-5.2.0-bin-release/bin

1) Start NameServer

NameServer listens on 0.0.0.0:9876. Make sure the port is not already in use on the local machine, then proceed as follows.

For macOS and Linux users:

### start Name Server
$ nohup sh mqnamesrv &

### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

For Windows users, you need to set the environment variable first:

  • From the desktop, right click the Computer icon.
  • Choose Properties from the context menu.
  • Click the Advanced system settings link.
  • Click Environment Variables.
  • Add the environment variable ROCKETMQ_HOME="D:\rocketmq".

Then change to the rocketmq directory and run:

$ mqnamesrv.cmd
The Name Server boot success...

2) Start Broker

For macOS and Linux users:

### start Broker
$ nohup sh mqbroker -n localhost:9876 &

### check whether Broker is successfully started, e.g. Broker's IP is 192.168.1.2, Broker's name is broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 192.168.1.2:10911] boot success...

For Windows users:

$ mqbroker.cmd -n localhost:9876
The broker[broker-a, 192.168.1.2:10911] boot success...

Run RocketMQ in Docker

You can run RocketMQ on your own machine within Docker containers; the host network is used to expose the listening ports of the containers.

1) Start NameServer

$ docker run -it --net=host apache/rocketmq ./mqnamesrv

2) Start Broker

$ docker run -it --net=host --mount type=bind,source=/tmp/store,target=/home/rocketmq/store apache/rocketmq ./mqbroker -n localhost:9876

Run RocketMQ in Kubernetes

You can also run a RocketMQ cluster within a Kubernetes cluster using RocketMQ Operator. Before you start, make sure that kubectl and the related kubeconfig file are installed on your machine.

1) Install CRDs

### install CRDs
$ git clone https://github.com/apache/rocketmq-operator
$ cd rocketmq-operator && make deploy

### check whether the CRDs are successfully installed
$ kubectl get crd | grep rocketmq.apache.org
brokers.rocketmq.apache.org                 2022-05-12T09:23:18Z
consoles.rocketmq.apache.org                2022-05-12T09:23:19Z
nameservices.rocketmq.apache.org            2022-05-12T09:23:18Z
topictransfers.rocketmq.apache.org          2022-05-12T09:23:19Z

### check whether operator is running
$ kubectl get pods | grep rocketmq-operator
rocketmq-operator-6f65c77c49-8hwmj   1/1     Running   0          93s

2) Create Cluster Instance

### create RocketMQ cluster resource
$ cd example && kubectl create -f rocketmq_v1alpha1_rocketmq_cluster.yaml

### check whether the cluster resources are running
$ kubectl get sts
NAME                 READY   AGE
broker-0-master      1/1     107m
broker-0-replica-1   1/1     107m
name-service         1/1     107m

Apache RocketMQ Community


Learn it & Contact us


Contributing

We always welcome new contributions, whether for trivial cleanups or big new features; for more details, see here.


License

Apache License, Version 2.0 Copyright (C) Apache Software Foundation


Export Control Notice

This distribution includes cryptographic software. The country in which you currently reside may have restrictions on the import, possession, use, and/or re-export to another country, of encryption software. BEFORE using any encryption software, please check your country's laws, regulations and policies concerning the import, possession, or use, and re-export of encryption software, to see if this is permitted. See http://www.wassenaar.org/ for more information.

The U.S. Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache Software Foundation distribution makes it eligible for export under the License Exception ENC Technology Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for both object code and source code.

The following provides more details on the included cryptographic software:

This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to support authentication, and encryption and decryption of data sent across the network between services.

rocketmq's People

Contributors

aaron-ai, allenzhu, areyouok, chenzlalvin, dongeforever, drpmma, duhenglucky, francisoliverlee, fuyou001, hzh0425, lizhanhui, lizhimins, lollipopjin, mxsm, oliverwqcwrw, qqeasonchen, rongtongjin, shadowyspirits, shannonding, shroman, ther1sing3un, vintagewang, vongosling, wlliqipeng, xdkxlk, yangjodie, yuz10, zhangjidi2016, zhouxinyu, zongtanghu


rocketmq's Issues

How to modify broker port?

The broker uses three ports, but only listenPort has an entry in the configuration file; the other two are derived directly as listenPort+2 and listenPort-1. Is there no way to modify them?

MessageStore IndexService performance improvement


FEATURE REQUEST

  1. Please describe the feature you are requesting.
    https://issues.apache.org/jira/projects/ROCKETMQ/issues/ROCKETMQ-267?filter=allopenissues
    When using the page cache (mmap) for the commit log and index service, send requests see higher latency; replacing it with RandomAccessFile reads and writes gives a good improvement for sending.

  2. Provide any additional detail on your proposed use case for this feature.

Performance improvement data: the old version uses an mmap page cache for index reads and writes, while the new version uses RandomAccessFile reads and writes. The [with direct io write] variant aims to use direct I/O. (See the sketch at the end of this report.)

24C96G  Linux rs6f15396.et2sqa 2.6.32-220.23.2.el6.x86_64 #1 SMP Mon Jan 28 17:12:52 CST 2013 x86_64 x86_64 x86_64 GNU/Linux

[old-version]
[<=0ms]:13222248 [0~10ms]:199848 [10~50ms]:5 [50~100ms]:8 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:10213863 [0~10ms]:195070 [10~50ms]:104 [50~100ms]:88 [100~200ms]:8 [200~500ms]:4 [500ms~1s]:4 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:10025096 [0~10ms]:197015 [10~50ms]:192 [50~100ms]:96 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:10218673 [0~10ms]:201210 [10~50ms]:151 [50~100ms]:48 [100~200ms]:4 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:10401378 [0~10ms]:198450 [10~50ms]:172 [50~100ms]:72 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:10498649 [0~10ms]:200961 [10~50ms]:124 [50~100ms]:60 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:10565265 [0~10ms]:204717 [10~50ms]:24 [50~100ms]:64 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:10228801 [0~10ms]:194281 [10~50ms]:150 [50~100ms]:142 [100~200ms]:0 [200~500ms]:4 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:9844542 [0~10ms]:196146 [10~50ms]:176 [50~100ms]:124 [100~200ms]:4 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:9586154 [0~10ms]:187757 [10~50ms]:344 [50~100ms]:120 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:4 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0

Time           ---cpu-- ---mem-- ---tcp-- -----traffic---- --sda--- --sdb--- --sdc--- --sdd--- --sde--- --sdf--- --sdg--- --sdh--- --sdi--- --sdj--- --sdk--- --sdl--- --sdm---  ---load-
29/03/18-16:52   0.15     8.93    23.27     1.5K    1.0K     1.73     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      0.05
29/03/18-16:53  11.46    17.92    20.13     1.4K  931.00    49.53     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      2.64
29/03/18-16:54  21.07    17.92    19.80     1.6K    1.2K    99.64     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      5.11
29/03/18-16:55  20.92    17.92     2.34     1.3K    1.4K    99.71     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      6.15
29/03/18-16:56  20.74    17.93     0.43     1.4K    1.4K   100.00     0.05     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      7.03
29/03/18-16:57  21.39    17.94     1.09     1.4K    1.5K    99.80     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      7.26
29/03/18-16:58  20.69    17.95     0.47     1.3K    1.3K    99.83     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      7.27
29/03/18-16:59  20.88    17.97     0.49     1.3K    1.2K    99.80     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      7.17
29/03/18-17:00  20.92    17.98     0.46     1.3K    1.4K    99.48     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      7.12
29/03/18-17:01  20.71    17.98     0.47     1.3K    1.4K    99.03     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      7.00
29/03/18-17:02  17.42    17.98     0.70     1.7K   10.2K    85.76     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      6.41
29/03/18-17:03   3.91    17.98     0.40     1.5K    3.0K    14.05     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      3.16
29/03/18-17:04   4.08    17.98     1.05     1.5K    1.5K    14.09     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      2.22

[device performance]

Device:         rrqm/s   wrqm/s     r/s     w/s    rMB/s    wMB/s avgrq-sz avgqu-sz   await  svctm  %util
sda               0.00 12300.00    0.00  237.00     0.00    48.97   423.16     1.25    5.18   4.19  99.30
sda               0.00 12864.00    0.00  266.00     0.00    51.29   394.89     1.31    4.95   3.73  99.20
sda               0.00 13762.00    0.00  263.00     0.00    54.78   426.59     1.46    5.58   3.78  99.50
sda               0.00 13545.00    0.00  242.00     0.00    53.85   455.74     1.31    5.45   4.09  98.90

[new-version]
[<=0ms]:13974308 [0~10ms]:199764 [10~50ms]:7 [50~100ms]:4 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:14625146 [0~10ms]:195935 [10~50ms]:0 [50~100ms]:12 [100~200ms]:4 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:13871723 [0~10ms]:186522 [10~50ms]:0 [50~100ms]:120 [100~200ms]:20 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:13451393 [0~10ms]:185738 [10~50ms]:0 [50~100ms]:184 [100~200ms]:4 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:13424794 [0~10ms]:185283 [10~50ms]:0 [50~100ms]:168 [100~200ms]:20 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:13889762 [0~10ms]:190237 [10~50ms]:0 [50~100ms]:92 [100~200ms]:12 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:13953812 [0~10ms]:192415 [10~50ms]:0 [50~100ms]:52 [100~200ms]:16 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:13867076 [0~10ms]:191798 [10~50ms]:0 [50~100ms]:68 [100~200ms]:12 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:12797578 [0~10ms]:177276 [10~50ms]:23 [50~100ms]:265 [100~200ms]:28 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:4323033 [0~10ms]:59581 [10~50ms]:12 [50~100ms]:48 [100~200ms]:28 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0


Time           ---cpu-- ---mem-- ---tcp-- -----traffic---- --sda--- --sdb--- --sdc--- --sdd--- --sde--- --sdf--- --sdg--- --sdh--- --sdi--- --sdj--- --sdk--- --sdl--- --sdm---  ---load-
29/03/18-17:26   0.38    22.26     0.14     3.4K    4.6K     1.60     0.51     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      0.19
29/03/18-17:27   0.37    22.35     0.11   130.1K    8.1K     1.84     0.50     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      0.10
29/03/18-17:28   0.28    22.35     0.14     3.7K    6.5K     1.95     0.61     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      0.04
29/03/18-17:29  14.59    31.34     0.21     4.0K    5.0K    63.04     0.60     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      3.03
29/03/18-17:30  21.45    31.29     0.13     3.9K    5.0K    99.25     0.58     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      5.35
29/03/18-17:31  20.79    31.30     0.13     6.6K    7.7K   100.00     0.61     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      6.48
29/03/18-17:32  20.75    31.31     0.22    38.2K    6.7K    99.67     0.54     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      6.51
29/03/18-17:33  20.70    31.31     0.14     3.9K    5.0K    99.70     0.61     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      6.66
29/03/18-17:34  20.70    31.32     0.14     4.0K    4.9K    99.73     0.59     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      6.47
29/03/18-17:35  20.85    31.34     7.01     4.5K    5.1K    99.61     0.59     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      6.67
29/03/18-17:36  20.92    31.34    20.98    35.9K   10.1K    98.28     3.31     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      6.78
29/03/18-17:37  20.90    31.33    14.69    95.9K   14.1K    98.75     5.21     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      7.10
29/03/18-17:38  20.19    31.31    21.47    79.3K   16.9K    99.55     6.30     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      7.10
29/03/18-17:39   8.10    22.31    21.38    57.9K   16.8K    39.35     7.67     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00     0.00      4.06

[device performance]

Device:         rrqm/s   wrqm/s     r/s     w/s    rMB/s    wMB/s avgrq-sz avgqu-sz   await  svctm  %util
sda               0.00 15422.00    0.00  325.00     0.00    59.09   372.33     1.59    4.98   3.06  99.30
sda               0.00 12941.00    0.00  322.00     0.00    54.23   344.92     1.94    5.89   3.09  99.60
sda               0.00 13319.00    0.00  317.00     0.00    53.27   344.15     2.11    6.51   3.14  99.40

with direct io write [direct io fix](https://github.com/lindzh/rocketmq/blob/index_direct_io/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java)

[Device perf]
Device:         rrqm/s   wrqm/s     r/s     w/s    rMB/s    wMB/s avgrq-sz avgqu-sz   await  svctm  %util
sda               0.00 14543.00 1271.00 1411.00     4.96    54.57    45.46    13.17    5.05   0.37 100.00
sda               0.00 13314.00 1438.00 1579.00     5.62    56.45    42.13    12.05    3.92   0.33  99.90
sda               0.00 10633.00 1207.00 1364.00     4.71    60.58    52.01    13.48    5.63   0.39 100.00
sda               0.00 15780.00 1379.00 1527.00     5.39    57.97    44.65    12.06    4.20   0.34 100.00
sda               0.00 11500.00 1874.00 2030.00     7.32    62.47    36.61    12.73    3.28   0.26 100.00
sda               0.00 12055.00 1421.00 1554.00     5.55    52.41    39.90    11.12    3.75   0.34 100.00

[write perf]
[<=0ms]:13615826 [0~10ms]:198861 [10~50ms]:4 [50~100ms]:12 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:14935568 [0~10ms]:197179 [10~50ms]:6 [50~100ms]:8 [100~200ms]:4 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:14880945 [0~10ms]:197794 [10~50ms]:0 [50~100ms]:0 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:14754942 [0~10ms]:197580 [10~50ms]:0 [50~100ms]:4 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:14633515 [0~10ms]:193760 [10~50ms]:16 [50~100ms]:16 [100~200ms]:24 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:14254780 [0~10ms]:187715 [10~50ms]:12 [50~100ms]:24 [100~200ms]:64 [200~500ms]:8 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:14730283 [0~10ms]:194008 [10~50ms]:12 [50~100ms]:4 [100~200ms]:20 [200~500ms]:4 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
[<=0ms]:6990788 [0~10ms]:93172 [10~50ms]:12 [50~100ms]:20 [100~200ms]:24 [200~500ms]:4 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
  3. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?

should-have

  4. If there are sub-tasks, use - [ ] for each subtask and create a corresponding issue to map to the sub-task:
  • no sub-tasks
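
A minimal sketch of the alternative described in this report: reading and writing an index slot through RandomAccessFile instead of a memory-mapped page cache. The 20-byte slot size and flat layout are simplifications, not RocketMQ's real index file format.

import java.io.IOException;
import java.io.RandomAccessFile;

// Sketch only: position directly in the file and read/write the slot bytes,
// bypassing the mmap page-cache path used by the current IndexFile.
class RandomAccessIndexFileSketch {
    private static final int SLOT_SIZE = 20;
    private final RandomAccessFile file;

    RandomAccessIndexFileSketch(String path) throws IOException {
        this.file = new RandomAccessFile(path, "rw");
    }

    void putSlot(int slotPos, byte[] slot) throws IOException {
        file.seek((long) slotPos * SLOT_SIZE);
        file.write(slot, 0, SLOT_SIZE);
    }

    byte[] getSlot(int slotPos) throws IOException {
        byte[] slot = new byte[SLOT_SIZE];
        file.seek((long) slotPos * SLOT_SIZE);
        file.readFully(slot);
        return slot;
    }
}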

WARN ReputMessageService - found a illegal magic code 0x

version
rocketmq 3.2.6

The master broker crashed, and then the slave broker printed the following warning:

WARN ReputMessageService - found a illegal magic code 0x696e4d73

After restarting the slave, the error disappeared.

The error appears when the commit log is damaged. I have two questions:

  1. Under what conditions does the error happen?
  2. Why does the error disappear after restarting the broker?

ConsumeMessageOrderlyService can use ReentrantLock

The ConsumeRequest class code is:

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    ......
}

This blocks the messageQueue and delays the same thread's processing of other messageQueues; we could use ReentrantLock.tryLock() instead.

Support internal and external network separation settings

Let's start with the background. We are building a message queuing service whose underlying platform is based on k8s or OpenStack. Looking at the source code, the slave synchronizes data from the master via brokerIP2:10912, while producers, consumers and the console use brokerIP1:10911 or brokerIP1:10909. Our master, slave and console are deployed on an internal network, but the business side's producers and consumers may not be on that network. If I set brokerIP1 to an internal address, external access is obviously impossible; if I set brokerIP1 to an external address, extra work is needed and the slave's and console's interaction with the master goes through at least one extra network forward. Since we plan to do network isolation, we do not want that. We want a solution where the business side's producers and consumers use an external address while the slave and console use the intranet address, and a single brokerIP1 cannot satisfy both. My idea is to configure both the internal and external addresses and register them with namesrv, carrying the requester's role when each component queries namesrv, so namesrv can return the appropriate address for that role; alternatively the selection logic could live on the requesting component, which would choose the appropriate address according to its own role. Either way there are a lot of code changes. What is your plan? Aliyun presumably has the same problem. I sincerely hope to receive your suggestions!

New consumer may consume all messages


FEATURE REQUEST

  1. Please describe the feature you are requesting.

Assume an MQ broker cluster whose commit log minimum offset is zero. When a new consumer starts, it begins consuming from offset zero of the consume queue, which is a problem. (See the context sketch after this report.)

  2. Provide any additional detail on your proposed use case for this feature.

  3. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?

  • must-have

  4. If there are sub-tasks, use - [ ] for each subtask and create a corresponding issue to map to the sub-task:
  • refactor the org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset method
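
For context, a new consumer group's starting position is governed by ConsumeFromWhere; the report above is that CONSUME_FROM_LAST_OFFSET still ends up at offset zero when the broker's commit-log minimum offset is zero. A minimal configuration sketch using the standard Java push consumer (group and topic names are illustrative):

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class NewConsumerStartSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("new_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        // Ask to start from the latest offset; the report above is that a brand-new group
        // still falls back to offset zero when the commit log's minimum offset is zero.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("TestTopic", "*");
        // registerMessageListener(...) and start() omitted; this only shows the configuration.
    }
}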

Messages with multiple tags cannot be consumed


BUG REPORT

  1. Please describe the issue you observed:
  • What did you do (The steps to reproduce)?
    The producer attached multiple tags to a single message.

  • What did you expect to see?
    The consumer subscribes to one of the tags and is expected to consume normally.

  • What did you see instead?
    The producer sends normally, but the consumer never consumes the message.

  2. Please tell us about your environment:
    RocketMQ 4.1.0, JDK 1.8

A question about ordered message processing

I have a question about ordered message processing.

In ConsumeMessageOrderlyService, the ConsumeRequest class handles it like this:

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    ......
}

To acquire the messageQueue's lock, the thread waits indefinitely, which also hurts that thread's efficiency when processing other messageQueues.

Could the following approach be used instead? Change messageQueueLock to hold a ReentrantLock per messageQueue:

ReentrantLock objLock = messageQueueLock.fetchLockObject(this.messageQueue);
if (objLock.tryLock()) {
    try {
        .....
    } finally {
        objLock.unlock();
    }
} else {
    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(this.messageQueue, this.processQueue, 10);
}

If the lock of this messageQueue is acquired, the messages are consumed; if not, the request is resubmitted to the thread pool, so the thread does not block and can continue processing other requests in the pool. This should improve throughput considerably.

Are there any hidden risks with such a change?

broker register nameserver default force register topicrouter data


BUG REPORT

  1. Please describe the issue you observed:
    If only the broker is upgraded, broker registration will fail,
    because of the issues

BufferUnderflowException:null


Why does the send-message thread pool start only one worker thread by default?

thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.

Starting more threads would improve the broker's capacity to receive messages. But only one worker thread is started by default here; is that to guarantee the ordering of messages from the producer side?

Support message statistical log on DLQ for security audit


FEATURE REQUEST

  1. Please describe the feature you are requesting.
    Add a DLQ message statistics log for security audit, containing topic, consumerGroup and msgId.
  2. Provide any additional detail on your proposed use case for this feature.
    When consumeMessage fails, we need to determine the exact scope of impact, which depends on the log rather than printing the message.
  3. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?
    should-have
  4. If there are sub-tasks, use - [ ] for each subtask and create a corresponding issue to map to the sub-task:

RocketMQ with Pinpoint

Our project makes extensive use of RocketMQ. Now we want to use Pinpoint to monitor distributed call tracing. How can we attach Pinpoint's traceId to a RocketMQ message without modifying the contents of the message body? Is there an extra extension parameter available?

Thanks!
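
One common way to carry a trace id without touching the body is a user-defined message property, which travels in the message's properties rather than its payload. A minimal sketch using the standard Java client, with an illustrative property name traceId:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TracePropertySketch {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("trace_demo_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TestTopic", "hello".getBytes());
        // Carry the trace id as a user property instead of touching the body;
        // consumers can read it back with msg.getUserProperty("traceId").
        msg.putUserProperty("traceId", "pinpoint-trace-id-example");

        producer.send(msg);
        producer.shutdown();
    }
}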

The client config consumeThreadMax doesn't work


BUG REPORT

  1. Please describe the issue you observed:

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl("ConsumeMessageThread_"));

The consumeRequestQueue is a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE, which means the thread count of consumeExecutor won't grow until the queue reaches its limit, which is practically impossible.

  • What did you do (The steps to reproduce)?

Set ConsumeThreadMax to a larger value than ConsumeThreadMin; you will find that the thread count of consumeExecutor always stays the same. (See the sketch below.)

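
A small self-contained demonstration of the underlying ThreadPoolExecutor behavior: with an unbounded work queue, the pool never grows past its core size, which is why consumeThreadMax has no effect unless it equals consumeThreadMin or the queue is bounded.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {
    public static void main(String[] args) throws Exception {
        // Core size 2, max size 8, unbounded queue -- mirrors the consumer's executor setup.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 100; i++) {
            executor.submit(() -> {
                try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
            });
        }
        Thread.sleep(200);
        // Prints 2: extra tasks queue up instead of triggering new threads, because a
        // ThreadPoolExecutor only grows past the core size when the queue rejects an offer,
        // and an unbounded LinkedBlockingQueue never does.
        System.out.println("pool size = " + executor.getPoolSize());
        executor.shutdownNow();
    }
}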

WaitNotifyObject class may have a wake-up bug

Description:

WaitNotifyObject is a utility class that makes threads wait or wakes them up. However, I found that it does not update the cached flags when waking up the thread group. This may be a bug.

public void wakeupAll() {
    synchronized (this) {
        boolean needNotify = false;
        for (Boolean value : this.waitingThreadTable.values()) {
            needNotify = needNotify || !value;
            value = true;
        }
        if (needNotify) {
            this.notifyAll();
        }
    }
}

A value of 'false' marks a waiting thread and 'true' marks a woken one, but reassigning the loop variable (value = true) never writes back into the map. Maybe it can be corrected in the following way:

        for (Long key : this.waitingThreadTable.keySet()) {
        	needNotify = needNotify || !waitingThreadTable.get(key);
        	waitingThreadTable.put(key, true);
        }
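
An equivalent sketch written against entrySet(), which also writes the flag back into the map and avoids the extra get()/put() lookups; the field type mirrors WaitNotifyObject's HashMap<Long, Boolean> table:

import java.util.HashMap;
import java.util.Map;

class WakeupAllSketch {
    private final HashMap<Long, Boolean> waitingThreadTable = new HashMap<>(16);

    public synchronized void wakeupAll() {
        boolean needNotify = false;
        for (Map.Entry<Long, Boolean> entry : waitingThreadTable.entrySet()) {
            needNotify = needNotify || !entry.getValue();
            entry.setValue(true);   // written back into the map, unlike the loop-variable assignment
        }
        if (needNotify) {
            this.notifyAll();
        }
    }
}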

No route info of this topic

When I was running a RocketMQ cluster test, the following problem appeared: Exception in thread "main" com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, MQ_TOPIC

Schedule pull request only when the thread pool has not yet been shut down


BUG REPORT

  1. Please describe the issue you observed:
    Tasks added to the thread pools of PullRequestService are rejected.
  • What did you do (The steps to reproduce)?
    Shut down the push consumers after running for some time.

  • What did you expect to see?
    No unnecessary exceptions raised.

  • What did you see instead?
    Rejection of scheduled tasks.

  2. Please tell us about your environment:
    NA
  3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):


Resetting the consume offset by timestamp does not take effect? (Message rewind)

The command is as follows:
resetOffsetByTime -s now -g groupname -t topicname -n nameserver:port
Use case: a new consumer group subscribes to an old topic but does not want to process the old messages; it should start consuming from the specified time.
Steps: I stopped the consumers of the new consumer group, executed the command above, then started the consumers, and found that they still consumed the messages produced before they were stopped.

java.nio.BufferUnderflowException: null

Continuously deleting topics causes the error below:

java.nio.BufferUnderflowException: null
at java.nio.DirectByteBuffer.get(DirectByteBuffer.java:271) ~[na:1.8.0_161]
at java.nio.ByteBuffer.get(ByteBuffer.java:715) ~[na:1.8.0_161]
at org.apache.rocketmq.remoting.protocol.RemotingCommand.decode(RemotingCommand.java:150) ~[classes/:na]
at org.apache.rocketmq.remoting.netty.NettyDecoder.decode(NettyDecoder.java:50) ~[classes/:na]
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:385) [classes/:na]
at org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:327) [classes/:na]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:36) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:358) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:41) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) [netty-all-4.0.42.Final.jar:4.0.42.Final]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
2018-04-12 05:32:01 INFO NettyServerNIOSelector_3_3 - closeChannel: close the connection to remote address[127.0.0.1:64537] result: true

After a consumer restart, historical messages are pushed again

In clustering mode with only one consumer application, if the application is restarted right after it finishes consuming, the previously consumed data is pushed to it again. If we wait a while after consuming before restarting the consumer application, the problem does not occur. Please help. Version: 4.2.0

RocketMQ depends on netty-all; it would be better to use the split artifacts such as netty-common

Reasons:
1. Using netty-all binds the project directly to the netty-all jar dependency.
2. When the project also uses Elasticsearch, which depends on split Netty artifacts such as netty-common loaded on demand, an integration problem arises: the two sides use different Netty jars, so the same class may come from different jars. Even after excluding netty-all, RocketMQ still cannot use the classes from netty-common, for example: java.lang.NoClassDefFoundError: io/netty/util/concurrent/DefaultPromise$1
3. In the end we had to repackage RocketMQ's common, client or all modules ourselves, which is a very poor experience.
4. Or switch to another similar MQ.

Thanks!

About a reject request bug

An error occurred while sending a message; subsequent requests are rejected by the server and an exception is thrown: [REJECTREQUEST]system busy, start flow control for a while.

  • Reading the source code, I found the error occurs in mappedFile.appendMessage() within commitLog.putMessage().
  • commitLog.putMessage() exits without resetting the member variable beginTimeInLock, so DefaultMessageStore.isOSPageCacheBusy() keeps returning true.
  • The method NettyRemotingAbstract.processRequestCommand() then invokes SendMessageProcessor.rejectRequest(), which returns true.
  • NettyRemotingAbstract.processRequestCommand() returns the response "[REJECTREQUEST]system busy, start flow control for a while".

So I think it is a bug. The member variable beginTimeInLock should be reset to 0 in a finally block, as sketched below.
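
A minimal sketch of the suggested fix pattern; beginTimeInLock and putMessage follow the names used in this report, everything else is illustrative:

import java.util.concurrent.locks.ReentrantLock;

class PutMessageSketch {
    private final ReentrantLock putMessageLock = new ReentrantLock();
    private volatile long beginTimeInLock = 0;

    void putMessage(Runnable appendToMappedFile) {
        putMessageLock.lock();
        try {
            beginTimeInLock = System.currentTimeMillis();
            appendToMappedFile.run();   // may throw, as in the reported case
        } finally {
            beginTimeInLock = 0;        // always reset, even on the error path
            putMessageLock.unlock();
        }
    }

    boolean isOSPageCacheBusy() {
        long begin = beginTimeInLock;
        return begin != 0 && System.currentTimeMillis() - begin > 1000;   // threshold illustrative
    }
}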

Questions about maxTransferBytesOnMessageInMemory in DefaultMessageStore#isTheBatchFull

DefaultMessageStore#isTheBatchFull
private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {

    if (0 == bufferTotal || 0 == messageTotal) {
        return false;
    }

    if (maxMsgNums <= messageTotal) {
        return true;
    }

    if (isInDisk) {
        if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
            return true;
        }

        if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
            return true;
        }
    } else {
        if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
            return true;
        }

        if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
            return true;
        }
    }

    return false;
}
First, an explanation of the parameters:
sizePy: the byte length of the current message
maxMsgNums: the number of messages to pull in this batch
bufferTotal: the total byte length of the messages already pulled, excluding the current message
messageTotal: the total number of messages already pulled
isInDisk: whether the current message resides on disk

The processing logic:
1) If both bufferTotal and messageTotal are 0, this pull has obviously just started and the batch is not yet complete, so return false.
2) If maxMsgNums <= messageTotal, return true, meaning the batch is complete.
3) Next, disk and memory are treated differently.
If the message is on disk rather than in memory:
If the bytes already pulled plus the length of the current message exceed maxTransferBytesOnMessageInDisk (MessageStoreConfig, default 64 KB), stop pulling this message and report that the pull is finished.
If the number of messages already pulled exceeds maxTransferCountOnMessageInDisk (MessageStoreConfig, default 8), also stop; that is, when the messages are on disk, a single pull fetches at most 8 messages.
If the message is in memory, the corresponding parameters are maxTransferBytesOnMessageInMemory and maxTransferCountOnMessageInMemory.
Why is it done this way? There are three points I do not understand:
1. Is checkInDiskByCommitOffset itself reasonable? It only consults the configured accessMessageInMemoryMaxRatio value, yet RocketMQ never swaps in-memory messages to disk based on this value; the whole message store is based on MappedByteBuffer memory mapping, so how is this mechanism actually realized?
2. Even if a message is on disk, what is the point of this transfer threshold? The messages have to be pulled eventually, and a dedicated pull thread does the work. Even if the transfer is slow, what problem would it cause?
3. I also do not understand why there is a threshold for memory; reading straight from memory is fast, so why limit it? Isn't that exactly what we want?

I would appreciate guidance from the experts. Thank you.

Master-slave sync model performance improvement

FEATURE REQUEST

  1. The performance of the SYNC_MASTER model can be improved by decoupling the processor threads from waiting for the slave broker to fetch the message.

  2. In scenarios that require high message reliability, such as order or finance systems, only the SYNC_MASTER model is allowed. After I ran a performance test on RocketMQ, the monitoring data showed a maximum of 300,000 messages per minute, and once I increased the number of producer clients, the average duration doubled.

  3. After reading the broker's source code for processing produce requests, I think the process can be improved by decoupling the processor threads from waiting for the slave broker to fetch the message. My understanding of the current flow is:

    1. The producer client sends a produce request to the broker.
    2. The broker allocates a processor thread for this request; after SendMessageProcessor, DefaultMessageStore and CommitLog process it, the message is written to the local disk.
    3. After writing to the local disk, the processor thread calls handleHA: if the broker is configured as SYNC_MASTER, the request is packaged into a GroupCommitRequest and put into the queue of GroupTransferService, and the processor thread starts waiting (at most 5 s).
    4. GroupTransferService is an independent thread that constantly checks whether any request can be responded to (timed out, or the message's offset is less than or equal to the slave broker's offset). Once a request can be responded to, the GroupTransferService thread notifies the waiting processor thread to respond to the client.
    5. The client receives the response from the broker.

If, in step 3.4, the slave broker fetches messages slightly more slowly because of network delay or the slave's disk-write latency, the master broker's processor threads spend more time waiting, and the master broker's throughput drops.

  4. Proposed optimization (see the sketch after this list):
    1. After step 3.2, the processor thread should return instead of waiting for the slave broker to fetch the message.
    2. Waiting for the slave broker can be handed over to the GroupTransferService thread, whose data structure changes as follows:
      old data structure:
      private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
      private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
      private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();
      new data structure:
      private ConcurrentSkipListMap<Long, CommitLog.GroupCommitRequest> groupCommitRequestConcurrentSkipListMap = new ConcurrentSkipListMap<>();
      The work flow also changes: GroupTransferService constantly iterates the skip list to check whether any request can be responded to (timed out, or the slave has fetched the message). If so, the GroupTransferService thread responds to that request.
    3. In step 3.3, before the request is put into the skip list, check whether push2SlaveMaxOffset is greater than the offset the request needs. If it is, respond to the request immediately.
  5. Simple optimization result:
    before the optimization: 300,000 messages per minute, average cost 10 ms
    after the optimization: 5,000,000 messages per minute, average cost 1.5 ms
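
A minimal sketch of the proposed decoupled wait, keyed by the offset each request needs the slave to reach. All names are illustrative; the real GroupCommitRequest and GroupTransferService carry more state, and timeout handling is omitted.

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

class AsyncGroupTransferSketch {
    private final ConcurrentSkipListMap<Long, Runnable> pending = new ConcurrentSkipListMap<>();
    private volatile long push2SlaveMaxOffset;

    // Called by the processor thread instead of blocking: if the slave already has the
    // data, respond at once; otherwise park the response callback keyed by the needed offset.
    void submit(long nextOffset, Runnable respondOk) {
        if (push2SlaveMaxOffset >= nextOffset) {
            respondOk.run();
        } else {
            pending.put(nextOffset, respondOk);
        }
    }

    // Called when the slave acknowledges a new offset: complete every request whose
    // required offset is now covered, then drop those entries from the skip list.
    void onSlaveAck(long ackedOffset) {
        push2SlaveMaxOffset = Math.max(push2SlaveMaxOffset, ackedOffset);
        Map<Long, Runnable> ready = pending.headMap(ackedOffset, true);
        for (Map.Entry<Long, Runnable> e : ready.entrySet()) {
            e.getValue().run();
        }
        ready.clear();
    }
}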

No route info of this topic, TopicTest (NameServer and broker are deployed and running normally)


There is a concurrent issue in StoreStatsService


BUG REPORT

  1. Please describe the issue you observed:
  • What did you do (The steps to reproduce)?
    StoreStatsService#getSinglePutMessageTopicSizeTotal and StoreStatsService#getSinglePutMessageTopicTimesTotal are not thread safe and could return different instances in a highly concurrent scenario. Normally this won't happen, because the cost of creating an AtomicLong and putting it into the map is negligible, but it can happen, and I think it should be fixed.
  • What did you expect to see?
    StoreStatsService#getSinglePutMessageTopicSizeTotal and StoreStatsService#getSinglePutMessageTopicTimesTotal always return the same instance for a given topic.
  • What did you see instead?
  2. Please tell us about your environment:
    Mac OS X
  3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
    public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) {
        AtomicLong rs = putMessageTopicTimesTotal.get(topic);
        if (null == rs) {
            rs = new AtomicLong(0);
            putMessageTopicTimesTotal.put(topic, rs);
        }
        return rs;
    }
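
A minimal sketch of one possible fix, assuming the map is (or is changed to) a ConcurrentMap: computeIfAbsent makes the get-or-create step atomic, so every caller observes the same AtomicLong for a topic even when two threads race on the first access.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

class TopicStatsSketch {
    private final ConcurrentMap<String, AtomicLong> putMessageTopicTimesTotal = new ConcurrentHashMap<>();

    public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) {
        // Atomic get-or-create: never returns two different instances for the same topic.
        return putMessageTopicTimesTotal.computeIfAbsent(topic, k -> new AtomicLong(0));
    }
}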

Delete the unused feature that monitors invalid pull offsets


FEATURE REQUEST

  1. Please describe the feature you are requesting.
    Delete the unused feature that monitors invalid pull offsets.
  2. Provide any additional detail on your proposed use case for this feature.
    Delete the unused feature that monitors invalid pull offsets.
  3. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?
    should-have
  4. If there are sub-tasks, use - [ ] for each subtask and create a corresponding issue to map to the sub-task:
    delete some code

How to update topic's router?

There are two brokers, 173 and 174, forming a two-master, no-slave cluster. If 173 shuts down, a newly created topic is routed only to broker 174, and even after 173 restarts, the topic's route info is not updated automatically. This causes two problems:
1. The topic is not load balanced.
2. If 174 then shuts down, the topic can neither be produced to nor consumed.
So how can the topic's route info be updated? Thanks!

Can't start broker.sh

version: 4.2.0
environment: CentOS 7 on VMware
memory size: 2 GB
The NameServer can be started, but broker.sh can't start; here is the log:

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)

There is insufficient memory for the Java Runtime Environment to continue.

Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.

Fix Broker and NameServer startup


BUG REPORT

  1. Please describe the issue you observed:
    Broker and NameServer fail to start because slf4j is missing.
  • What did you do (The steps to reproduce)?
    Switch the branch to develop and build a release as follows:
mvn -Prelease-all -DskipTests clean install -U

Install Apache RocketMQ and run it with sh mqnamesrv &

  • What did you expect to see?
    Startup succeeds.
  • What did you see instead?
java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory
	at org.apache.rocketmq.namesrv.NamesrvStartup.main0(NamesrvStartup.java:94)
	at org.apache.rocketmq.namesrv.NamesrvStartup.main(NamesrvStartup.java:47)
Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 2 more
  2. Please tell us about your environment:
    Mac Pro, JDK 8

  3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):

RocketMQ Feature Request: Transaction-Aware Message

Messaging is a major approach to decoupling distributed systems. However, when trying to decouple system modules whose boundaries are associated with transactions, it is subtle to determine when to send messages. For example, when module A completes a database transaction, it needs to send a message to notify modules B, C, and D to perform subsequent actions. The problem is what the best point to send the message is relative to the database transaction. If we send the message before the database transaction, what happens if the application or database server crashes, and likewise if the transaction rolls back for any reason? If we send the message after the transaction commits, there is a chance the application server crashes before the message is delivered. Thus, to maintain consistency of the whole system, we must either implement an extra module that periodically checks that the messages of each committed transaction were sent (given that messages are designed to be sent after the commit) and supplementally sends the missed ones to achieve eventual consistency, or make our messaging system transaction aware.

The first option is viable, but it excessively complicates business application systems and brings about noticeable overhead as the extra module has to repeatedly check committed transactions and messages sent are in accordance after a period of time. The latter choice, however, confines complexities to the messaging system itself and once achieved, all application developers may enjoy simplicity and consistency.

To achieve the goal described above, we define a new type of message: transaction aware message. A transaction-aware message is composed of a pair of deliverable messages: prepare-message and commit/rollback message. Prepare messages are supposed to be sent before local transaction commits/roll-back, which are not visible to consumers. Commit/rollback messages are sent after transaction commits/rolls back. Brokers are responsible to inquire application servers, which share the same role with those prepare-message senders, to handle pending prepare messages after the defined period of time. On receiving of the commit message, the transaction aware message is made visible to consumers; Similarly, on receiving of rollback message, pending status of the corresponding prepare message is canceled quietly and consumers are completely unaware in the whole process.
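
For reference, a transactional-message API along these lines exists in later RocketMQ Java clients (4.3+). A minimal sketch against that API, with illustrative group and topic names: the half (prepare) message stays invisible until the listener returns a commit state, and the broker checks back for unresolved ones.

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionalSendSketch {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("module_a_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // Run the local database transaction here; return ROLLBACK_MESSAGE or UNKNOW on failure.
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // The broker calls back here for pending prepare messages whose outcome is still unknown.
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        producer.sendMessageInTransaction(new Message("OrderTopic", "order-created".getBytes()), null);
        producer.shutdown();
    }
}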

FlushConsumeQueueService reports an out-of-bounds error

When the broker sets mapedFileSizeConsumeQueue=300000 and the total number of messages sent by the broker reaches this value, the following error occurs in the storeerror.log

2018-03-12 15:16:36 WARN FlushConsumeQueueService - Offset for /opt/rocketmq/store/consumequeue/xxx_xxxx_test/6/00000000000000000000 not matched. Request offset: 300000, index: 1, mappedFileSize: 300000, mappedFiles count: 1
2018-03-12 15:16:36 WARN FlushConsumeQueueService - findMappedFileByOffset failure.
java.lang.ArrayIndexOutOfBoundsException: 1
at java.util.concurrent.CopyOnWriteArrayList.get(CopyOnWriteArrayList.java:387) ~[na:1.8.0_121]
at java.util.concurrent.CopyOnWriteArrayList.get(CopyOnWriteArrayList.java:396) ~[na:1.8.0_121]
at org.apache.rocketmq.store.MappedFileQueue.findMappedFileByOffset(MappedFileQueue.java:478) [rocketmq-store-4.2.0.jar:4.2.0]
at org.apache.rocketmq.store.MappedFileQueue.flush(MappedFileQueue.java:427) [rocketmq-store-4.2.0.jar:4.2.0]
at org.apache.rocketmq.store.ConsumeQueue.flush(ConsumeQueue.java:324) [rocketmq-store-4.2.0.jar:4.2.0]
at org.apache.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.doFlush(DefaultMessageStore.java:1664) [rocketmq-store-4.2.0.jar:4.2.0]
at org.apache.rocketmq.store.DefaultMessageStore$FlushConsumeQueueService.run(DefaultMessageStore.java:1684) [rocketmq-store-4.2.0.jar:4.2.0]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

Spinlock has bad performance under high contention.


BUG REPORT

  1. Please describe the issue you observed:
  • What did you do (The steps to reproduce)?

  • What did you expect to see?
    The spinlock performs well under high contention.

  • What did you see instead?
    The spinlock performs badly under high contention.

  2. Please tell us about your environment:

  3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
    A CLH spinlock would be a better choice; see the sketch below.
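
For illustration, a minimal CLH queue-lock sketch in Java: each thread enqueues a node and spins only on its predecessor's flag, so contention is spread across nodes instead of all threads hammering a single atomic variable. This is a textbook sketch, not the spinlock RocketMQ actually uses.

import java.util.concurrent.atomic.AtomicReference;

class CLHLock {
    private static final class Node { volatile boolean locked; }

    private final AtomicReference<Node> tail = new AtomicReference<>(new Node());
    private final ThreadLocal<Node> myNode = ThreadLocal.withInitial(Node::new);
    private final ThreadLocal<Node> myPred = new ThreadLocal<>();

    void lock() {
        Node node = myNode.get();
        node.locked = true;
        Node pred = tail.getAndSet(node);   // enqueue behind the current tail
        myPred.set(pred);
        while (pred.locked) { }             // spin only on the predecessor's flag
    }

    void unlock() {
        Node node = myNode.get();
        node.locked = false;                // releases the successor
        myNode.set(myPred.get());           // reuse the predecessor's node next time
    }
}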


Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message

BUG REPORT

  1. Please describe the issue you observed:
  • What did you do (The steps to reproduce)?
    When I configure a BROADCASTING consumer, messages are consumed correctly, but checking the consumption status with the shell command (sh mqadmin consumerProgress -n XXX -g XXX) reports the error shown in the title.
    I am sure the messages were consumed correctly and the consumer returned the correct status (ConsumeConcurrentlyStatus.CONSUME_SUCCESS).
    However, when I change messageModel to CLUSTERING, the problem does not occur.
    I would like to know whether this is a RocketMQ bug.

  • What did you expect to see?

  • What did you see instead?
    I need to use RocketMQ's broadcast consumption feature and to monitor the messages, so I need to be able to check the consumption status.

  2. Please tell us about your environment:
    CentOS release 6.2
    Server version: 4.1.0-incubating
    Client version: 4.1.0-incubating
  3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
