Partition assignment in a consumer group
When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all the active consumers in the group from the group coordinator and is responsible for assigning a subset of partitions to each consumer. It uses an implementation of the PartitionAssignor interface (or a language- or framework-appropriate equivalent) to decide which partitions each consumer should handle.
After deciding on the partition assignment, the group leader sends the list of assignments back to the group coordinator, which then sends the information to all consumers. Each consumer sees only its own assignment; the leader is the only client process that has the full list of consumers in the group.
This partition assignment process happens whenever there is a rebalance event.
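The leader-side step described above can be sketched as a simple round-robin deal. This is only an illustration, assuming a single topic and identical subscriptions; `assign_round_robin` and the member names are hypothetical and are not the assignor that Kafka or brod actually ships.

```python
from typing import Dict, List


def assign_round_robin(members: List[str], partitions: List[int]) -> Dict[str, List[int]]:
    """Deal partitions out to members one at a time, in sorted order.

    Hypothetical helper for illustration only; real assignors also handle
    multiple topics and per-member subscriptions.
    """
    ordered = sorted(members)
    assignment: Dict[str, List[int]] = {m: [] for m in ordered}
    if not ordered:
        return assignment
    for i, partition in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(partition)
    return assignment


# The leader computes the full map; the coordinator then hands each
# member only its own entry.
full = assign_round_robin(["node-a", "node-b"], list(range(10)))
print(full["node-a"])  # [0, 2, 4, 6, 8]
print(full["node-b"])  # [1, 3, 5, 7, 9]
```

Only the leader ever holds `full`; every other member would see just its own list.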
Consumers maintain their membership in a consumer group, and their ownership of the partitions assigned to them, by sending heartbeats to the Kafka broker that is designated as the group coordinator.
The group coordinator broker can be different for different consumer groups.
If a consumer stops sending heartbeats for long enough, its session will time out, and the group coordinator will consider it dead and trigger a rebalance.
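A toy model of the coordinator-side bookkeeping may make the timeout rule concrete. It is a sketch under stated assumptions: `SessionTracker`, its methods, and the 10-second timeout are hypothetical, timestamps are passed in explicitly to keep the example deterministic, and a real coordinator does considerably more (for example, waiting for members to rejoin before completing a rebalance).

```python
from typing import Dict, List


class SessionTracker:
    """Hypothetical model of how a coordinator might track member liveness."""

    def __init__(self, session_timeout: float) -> None:
        self.session_timeout = session_timeout
        self.last_seen: Dict[str, float] = {}
        self.generation = 0  # bumped on every membership change (rebalance)

    def join(self, member: str, now: float) -> None:
        self.last_seen[member] = now
        self.generation += 1

    def heartbeat(self, member: str, now: float) -> None:
        # Heartbeats from unknown (already expired) members are ignored.
        if member in self.last_seen:
            self.last_seen[member] = now

    def expire(self, now: float) -> List[str]:
        """Drop members whose last heartbeat is older than the timeout."""
        dead = sorted(
            m for m, seen in self.last_seen.items()
            if now - seen > self.session_timeout
        )
        for member in dead:
            del self.last_seen[member]
        if dead:
            self.generation += 1  # losing a member triggers a rebalance
        return dead


tracker = SessionTracker(session_timeout=10.0)
tracker.join("node-a", now=0.0)
tracker.join("node-b", now=1.0)
tracker.heartbeat("node-b", now=9.0)   # node-a has gone silent
print(tracker.expire(now=12.0))        # ['node-a']
print(tracker.generation)              # 3
```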
from kaffe.
Consumer Group behavior in Kaffe
I spun up a simple Kaffe project to see how it handles consumer groups in nodes that are completely isolated from each other except for the connection to Kafka. Things look good!
I changed my local whitelist topic to have 10 partitions, then joined a single kafkatest node to the kafkatest1 consumer group. Next I joined a second node to the same consumer group. Finally I killed the original node (the group leader) to verify that the second node took over its partitions.
single consumer node started
$ iex -S mix
Erlang/OTP 18 [erts-7.2.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]
Compiling 1 file (.ex)
Generated kafkatest app
Interactive Elixir (1.3.4) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
12:23:16.290 [info] group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
failed to join group
reason::GroupCoordinatorNotAvailable
12:23:16.290 [info] group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
re-joining group, reason::GroupCoordinatorNotAvailable
12:23:16.329 [info] group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
failed to join group
reason::GroupCoordinatorNotAvailable
12:23:17.334 [info] group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
re-joining group, reason::GroupCoordinatorNotAvailable
12:23:17.343 [info] group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
connected to group coordinator kafka:9092
12:23:17.363 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=1,pid=#PID<0.171.0>):
elected=true
12:23:17.418 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=1,pid=#PID<0.171.0>):
assignments received:
whitelist:
partition=0 begin_offset=undefined
partition=1 begin_offset=undefined
partition=2 begin_offset=undefined
partition=3 begin_offset=undefined
partition=4 begin_offset=undefined
partition=5 begin_offset=undefined
partition=6 begin_offset=undefined
partition=7 begin_offset=undefined
partition=8 begin_offset=undefined
partition=9 begin_offset=undefined
12:23:17.423 [info] client :kafkatest1 connected to kafka:9092
second consumer node started
12:24:10.323 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=1,pid=#PID<0.171.0>):
re-joining group, reason::RebalanceInProgress
12:24:10.327 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=2,pid=#PID<0.171.0>):
elected=true
12:24:10.333 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=2,pid=#PID<0.171.0>):
assignments received:
whitelist:
partition=0 begin_offset=undefined
partition=2 begin_offset=undefined
partition=4 begin_offset=undefined
partition=6 begin_offset=undefined
partition=8 begin_offset=undefined
$ iex -S mix
Erlang/OTP 18 [erts-7.2.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]
Interactive Elixir (1.3.4) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
12:24:09.544 [info] group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.142.0>):
connected to group coordinator kafka:9092
12:24:10.328 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=2,pid=#PID<0.142.0>):
elected=false
12:24:10.330 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=2,pid=#PID<0.142.0>):
assignments received:
whitelist:
partition=1 begin_offset=undefined
partition=3 begin_offset=undefined
partition=5 begin_offset=undefined
partition=7 begin_offset=undefined
partition=9 begin_offset=undefined
12:24:10.340 [info] client :kafkatest1 connected to kafka:9092
original node (group leader) killed
12:25:25.716 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=2,pid=#PID<0.142.0>):
re-joining group, reason::RebalanceInProgress
12:25:25.719 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=3,pid=#PID<0.142.0>):
elected=true
12:25:25.722 [info] group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=3,pid=#PID<0.142.0>):
assignments received:
whitelist:
partition=0 begin_offset=undefined
partition=1 begin_offset=undefined
partition=2 begin_offset=undefined
partition=3 begin_offset=undefined
partition=4 begin_offset=undefined
partition=5 begin_offset=undefined
partition=6 begin_offset=undefined
partition=7 begin_offset=undefined
partition=8 begin_offset=undefined
partition=9 begin_offset=undefined
It appears that group coordination is indeed correctly happening through Kafka. 👍
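The same check can be made mechanically rather than by eye. The sketch below, with hypothetical helper names, pulls partition numbers out of log text in the format shown above and confirms that the members of one generation together own every partition exactly once.

```python
import re
from typing import Dict, List

PARTITION_RE = re.compile(r"partition=(\d+)")


def partitions_in(log_text: str) -> List[int]:
    """Extract partition numbers from 'partition=N begin_offset=...' lines."""
    return [int(n) for n in PARTITION_RE.findall(log_text)]


def covers_exactly_once(assignments: Dict[str, List[int]], partition_count: int) -> bool:
    """True if every partition 0..partition_count-1 has exactly one owner."""
    seen = [p for parts in assignments.values() for p in parts]
    return sorted(seen) == list(range(partition_count))


# Generation 2 assignments, copied from the two nodes' logs above.
first_node = """
partition=0 begin_offset=undefined
partition=2 begin_offset=undefined
partition=4 begin_offset=undefined
partition=6 begin_offset=undefined
partition=8 begin_offset=undefined
"""
second_node = """
partition=1 begin_offset=undefined
partition=3 begin_offset=undefined
partition=5 begin_offset=undefined
partition=7 begin_offset=undefined
partition=9 begin_offset=undefined
"""
generation_2 = {
    "first": partitions_in(first_node),
    "second": partitions_in(second_node),
}
print(covers_exactly_once(generation_2, partition_count=10))  # True
```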
Good background info (not that we doubted Kafka's ability to coordinate consumers)!
I had some doubts. 😄