Comments (30)
yeaah, that fixed it. thx a lot!
from ruby-kafka.
I'm not quite sure why, but it seems like sockets opened by a consumer are sometimes closed by the broker when the consumer is kicked from its group. Make sure not to sleep for longer than the session timeout; you can also increase this timeout when calling kafka.consumer:
consumer = kafka.consumer(group_id: "x", session_timeout: 45)
Note that there are broker-defined limits to the timeout value, so you may get an error if you set it too high or too low.
Actually, are you even using the new consumer code, or are you calling kafka.fetch_messages directly? Note that you may have to implement your own retry logic if you want to go down this path. I would recommend using the new consumer if you're running Kafka 0.9.
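If you do stay with fetch_messages, the retry logic can be as small as the sketch below. It is plain Ruby, not part of ruby-kafka: with_retries and its parameters are hypothetical names, and the sleeper is injectable only so the exponential backoff can be tested without actually sleeping.

```ruby
# Hypothetical helper (not a ruby-kafka API): runs the block and, if it raises
# one of `errors`, sleeps with exponential backoff and tries again, giving up
# (re-raising the last error) after `attempts` tries.
def with_retries(attempts: 5, base_delay: 1.0, errors: [StandardError], sleeper: ->(s) { sleep(s) })
  tries = 0
  begin
    tries += 1
    yield
  rescue *errors
    raise if tries >= attempts
    # 1st retry waits base_delay, 2nd waits 2 * base_delay, and so on.
    sleeper.call(base_delay * (2**(tries - 1)))
    retry
  end
end
```

With ruby-kafka you would pass errors: [Kafka::ConnectionError] and call kafka.fetch_messages(topic: "my-topic", partition: 0, offset: offset) inside the block; as reported later in this thread, the client reconnects on the next call after an EOFError.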
For the use case i'm talking about, i'm using fetch_messages directly, because i need to do batch processing, so consumer.each_message isn't appropriate. Having access to the batch (fetch_batch) would be a valuable extension for me as well.
I'm considering adding a batch option to Consumer as well. Under the hood, a set of message batches is fetched and each message is then yielded in turn by each_message. What kind of processing do you need to do?
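The relationship described above (batches fetched as a unit, each_message flattening them) can be pictured with the toy class below. SketchConsumer and Batch are hypothetical stand-ins, not ruby-kafka's actual Consumer; the point is only that a batch API is the more primitive one and each_message can be layered on top of it.

```ruby
# Hypothetical stand-ins: a batch is the set of messages fetched for one topic/partition.
Batch = Struct.new(:topic, :partition, :messages)

class SketchConsumer
  def initialize(batches)
    @batches = batches # stands in for the message sets returned by a fetch
  end

  # Batch-oriented API: yields one topic/partition batch at a time.
  def each_batch
    @batches.each { |batch| yield batch }
  end

  # Message-oriented API built on top of each_batch.
  def each_message
    each_batch do |batch|
      batch.messages.each { |message| yield message }
    end
  end
end
```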
... i'm indexing messages/records into ElasticSearch. Not using bulk/batch would kill performance.
And for ES <-> DB consistency, i have a second process which additionally "delays" the messages so that, after some time has passed, it can re-check whether everything got indexed correctly, or re-index. That's why i'm sleeping and getting the EOFError.
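A delaying process like this mostly boils down to "wait until the message is old enough, then re-check". Below is a minimal sketch under stated assumptions: each message carries a hypothetical enqueued_at timestamp (epoch seconds), RECHECK_DELAY is an arbitrary example value, and the clock and sleeper are injected so the logic runs without real sleeping. Because messages in a partition arrive roughly in enqueue order, waiting on the oldest pending message is enough.

```ruby
# Assumed delay before a message is re-checked (example value only).
RECHECK_DELAY = 60 # seconds

# Seconds left until a message enqueued at `enqueued_at` is due (0 if already due).
def seconds_until_due(enqueued_at, now, delay: RECHECK_DELAY)
  [enqueued_at + delay - now, 0].max
end

# Yields messages in order, waiting until each one is due.
# `clock` returns the current time; `sleeper` sleeps for the given seconds.
def recheck_in_order(messages, clock:, sleeper:, delay: RECHECK_DELAY)
  messages.each do |msg|
    wait = seconds_until_due(msg[:enqueued_at], clock.call, delay: delay)
    sleeper.call(wait) if wait > 0
    yield msg
  end
end
```

Keeping each individual wait short (or reconnecting via a retry) matters here, since long sleeps between fetches are what led to the EOFError.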
Hmm. Would it still make sense to use a balanced consumer rather than a single process, for fault tolerance? And are you tracking offsets in ES as well, or are you using idempotent writes and at-least-once processing?
I've also been thinking about how an un-coordinated, single-process consumer would work. Would this API work for you https://gist.github.com/dasch/34f40482e242abbcf896?
By the way, does it work if you rescue ConnectionError and do a retry?
currently, i'm fetching a batch (fetch_messages) and indexing this batch. If, and only if, that finishes successfully, i then store the current offset in zookeeper. If an exception gets raised, i rescue it, log and notify, then sleep for a few seconds and retry (so: yes, when i retry after the EOFError, ruby-kafka seems to reconnect automatically and everything works again). For fault tolerance, i'm doing leader election via zookeeper myself. Sure, i'd appreciate having this implemented for me instead of doing it on my own, but i need the batch processing.
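The fetch, index, then store-offset cycle described above can be sketched in plain Ruby with injected stand-ins. Everything here is hypothetical: fetch returns [offset, value] pairs starting at a given offset (fetch_messages in the real setup), index bulk-indexes values (ES), and offsets is a Hash standing in for ZooKeeper. The key property is that the stored offset only advances after indexing succeeded, so a failed batch is simply fetched again.

```ruby
# One iteration: fetch from the stored offset, index, and only then advance the offset.
def process_one_batch(fetch:, index:, offsets:, key:)
  start = offsets.fetch(key, 0)
  batch = fetch.call(start)
  return 0 if batch.empty?

  index.call(batch.map { |(_offset, value)| value })
  # Only reached if indexing did not raise: store the offset after the last message.
  offsets[key] = batch.last[0] + 1
  batch.size
end

# Outer loop: on failure, log, pause and try the same batch again.
# Bounded by max_iterations here; a real process would loop forever.
def run_loop(max_iterations:, sleeper:, errors: [StandardError], pause: 5, **opts)
  max_iterations.times do
    begin
      process_one_batch(**opts)
    rescue *errors => e
      warn "batch failed: #{e.class}: #{e.message}; retrying in #{pause}s"
      sleeper.call(pause)
    end
  end
end
```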
Moreover, for the "delay message for re-check" use case, i'm not quite sure if kafka's consumer group feature is even applicable, because the delaying process won't get the same messages the indexing process gets. Thus, i have to use the low-level fetch_messages method within the delaying process, right? So please don't remove it from the API, i need it :-)
well, it's applicable by using a separate delay topic. So, i'd be pretty happy if the new consumer feature gets a batch option, so that i can then remove all my manual offset handling and leader election :-)
Regarding the first part, the indexer: I think you should use the new consumer group feature with the built-in offset commit handling. I want to expand the API to allow for batch processing. The main difficulties are:
- A nice API for manually committing offsets (i.e., after the ES transaction returns)
- Heartbeats need to be sent with a somewhat high frequency, ~30 seconds.
- The API should be simple to use but still expose enough information.
There are basically two ways to go: have a batch-oriented loop that sends heartbeats and commits offsets automatically after yielding a batch, or a more manual poll based API where the user must poll with some frequency in order to stay in the group, and offset commits would perhaps be triggered manually.
The first case could look like this:
consumer.each_batch do |batch|
  # Pseudo-ES-code:
  txn = ES::Transaction.new
  batch.messages.each {|message| txn << message.value }
  txn.commit
end
The second case could look like this:
# Do this in a loop.
# A "batch" is a set of messages from the same topic/partition.
batches = consumer.fetch_batches
batches.each do |batch|
  txn = ES::Transaction.new
  batch.messages.each do |message|
    txn << message.value
    # Marks the message as processed; the next offset commit will include the offset.
    consumer.mark_message_as_processed(message)
  end
  txn.commit
end
consumer.commit_offsets
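The bookkeeping behind a "mark, then commit" API like the second one is small. The sketch below uses a hypothetical OffsetTracker class, not ruby-kafka's internals, and follows Kafka's convention that the committed offset is the offset of the next message to consume, so marking offset N records N + 1. Only the latest mark per topic/partition matters, and nothing is committed until commit_offsets is called.

```ruby
# Hypothetical message shape for the sketch.
Message = Struct.new(:topic, :partition, :offset)

class OffsetTracker
  def initialize(&committer)
    @committer = committer # e.g. would send an offset commit request; a block here
    @processed = {}        # [topic, partition] => next offset to consume
  end

  def mark_message_as_processed(message)
    @processed[[message.topic, message.partition]] = message.offset + 1
  end

  # Commits everything marked since the last commit; no-op if nothing is pending.
  def commit_offsets
    return if @processed.empty?
    @committer.call(@processed.dup)
    @processed.clear
  end
end
```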
Somewhat related to #132
am i right that a heartbeat frequency of 30 seconds means a crashed consumer (currently the leader) gets removed after 30 seconds? if yes, that seems quite infrequent to me, since i'm trying to use ES for realtime purposes. 5 seconds seems reasonable to me, which afaik is zookeeper's heartbeat frequency and thus corresponds to my current (manual leader election) setup.
The session timeout basically means that a consumer group member must send a heartbeat every n seconds, otherwise it gets kicked from the group, and the group needs to rebalance (which is costly in terms of time). Setting it low increases the risk of instability; setting it high means that a consumer can be dead for a while before the group notices. With a high value there's also the risk of a group rebalance not being noticed by a member, causing it to keep processing a partition that the group has since reassigned to a different member. Setting the heartbeat interval to a lower value mitigates that risk, but there will never be any hard guarantee that two processes won't simultaneously process the same message.
UPDATE: The heartbeat interval is not currently configurable (it's set to half the session timeout) but that could easily be changed.
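A sketch of how a configurable interval could work, assuming heartbeats are sent from the processing loop itself rather than a side thread. HeartbeatTimer is a hypothetical class; its default interval of half the session timeout mirrors the current behavior described above, and the clock is injectable for testing. Roughly speaking, a member that dies is noticed about one session timeout after its last heartbeat, which is the failover delay asked about above.

```ruby
# Hypothetical sketch: call send_if_necessary between units of work; it only
# sends a heartbeat once `interval` seconds have passed since the last one.
class HeartbeatTimer
  def initialize(session_timeout:, interval: session_timeout / 2.0,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }, &send_heartbeat)
    raise ArgumentError, "interval must be shorter than the session timeout" unless interval < session_timeout
    @interval = interval
    @clock = clock
    @send_heartbeat = send_heartbeat
    @last_sent = @clock.call
  end

  # Returns true if a heartbeat was sent, false if none was due yet.
  def send_if_necessary
    now = @clock.call
    return false if now - @last_sent < @interval
    @send_heartbeat.call
    @last_sent = now
    true
  end
end
```

Because the check runs inside the processing loop, a stuck or deadlocked loop also stops heartbeating, which is exactly the signal the group needs.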
Take a look at #132 and see if that would be helpful.
thx. very fast failover is important to me and it's good to have full control over everything for this use case, such that i'm fine if i have to manage the low level details (leader election, offset handling, fetch_messages) myself here, but i'll for sure experiment with it when the batch feature is available. and for less important use cases, i'll of course use the consumer groups feature. Regarding the heartbeats: wouldn't it make sense to push this into a separate ruby thread?
> thx. very fast failover is important to me and it's good to have full control over everything for this use case, such that i'm fine if i have to manage the low level details (leader election, offset handling, fetch_messages) myself here, but i'll for sure experiment with it when the batch feature is available. and for less important use cases, i'll of course use the consumer groups feature.
I'm still actively developing the consumer groups API, and I think your use case is somewhat typical, so I want to make this work for you.
> Regarding the heartbeats: wouldn't it make sense to push this into a separate ruby thread?
Not really – the heartbeat should indicate that the consumer isn't dead or stuck. If a separate thread just continues to send heartbeats while the consumer thread is deadlocked there would be no point.
Regarding the "checker" consumer – is there no way to consistently write to ES? What kind of consistency guarantees are made, if any?
huh, long story. i'll try to keep it short. it's not the consistency of ES i'm worried about (writes to ES should be consistent, and ES supports optimistic concurrency control via incrementing versions, etc). it's about achieving DB <-> ES consistency.
class Model
  after_save :queue_for_recheck
  after_destroy :queue_for_recheck
  after_commit :queue_for_immediate_index
end
Within the DB transaction (after_save, after_destroy) i queue for recheck, because the background indexer can't see the changes to the DB record yet while we're inside the DB transaction, and the process could crash right before after_commit. So, if it crashes, the recheck will correct the DB <-> ES inconsistency.
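The recheck step itself can be sketched as "re-read the source of truth and make ES match it", instead of trusting whatever the queued message contained. This is a hypothetical illustration: recheck is an invented helper, and the db and es Hashes (id => attributes) stand in for the real database and index.

```ruby
# Hypothetical recheck: compares the DB record with the ES document and repairs ES.
def recheck(id, db:, es:)
  record = db[id]
  if record.nil?
    # Record deleted (or its transaction rolled back): drop any stale ES copy.
    es.delete(id) ? :deleted : :ok
  elsif es[id] != record
    # Missing or outdated in ES, e.g. the process crashed before after_commit ran.
    es[id] = record
    :reindexed
  else
    :ok
  end
end
```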
Hmm... that's essentially a two-phase commit, right? That's very hard to get right. Have you considered using something like Maxwell, which is based on the SQL database binlog?
Anyway, the consumer group API should cover your use case. I think "write this batch of messages atomically to this remote store" is a common consumer use case. When writes are idempotent, it shouldn't be difficult, and performance should be fine. Combining a batch API with a configurable heartbeat interval should make this a good fit for your use case.
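What "idempotent writes" buys you with at-least-once delivery can be shown in a few lines: if a batch is redelivered (say, the process crashed after writing to the store but before committing the offset), applying it again must not change the result. A minimal sketch, assuming each message carries hypothetical id and version fields and a Hash stands in for the store; the "only write if newer" rule is similar in spirit to ES's external versioning. In this sketch a delete would also drop the stored version, so deletes are not covered; that is the case the thread turns to next.

```ruby
# Applies a batch of { id:, version:, body: } messages; a write is skipped
# unless its version is newer than what the store already holds.
def apply_batch(store, messages)
  messages.each do |msg|
    current = store[msg[:id]]
    next if current && current[:version] >= msg[:version]
    store[msg[:id]] = { version: msg[:version], body: msg[:body] }
  end
  store
end
```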
yes, i have considered listening to the binlog, thx for pointing me to maxwell. re-check is still necessary, though, because deletions are an issue unless you do "soft deletes" (only mark a record as deleted). A long GC pause, virtual machine pause or whatever just before an update operation could undo a delete operation:
1. leader A pauses right before an update operation
2. leader B takes over, updates the record, then deletes the record
3. leader A wakes up again, thinks it's still the leader, updates the record, and is then killed because it's no longer the leader -> inconsistency between DB/ES
So, since the binlog doesn't provide an immense benefit here, i don't want to add another dependency to the stack.
Thanks for your support! really appreciate it and looking forward to the consumer group batch support.
Maxwell will include deletions in its topic, as far as I know, so you'll get a serialized stream of operations – as long as you process them in-order, you should be fine.
in a distributed environment it's not possible to guarantee that you process them in order at all times unless the datastore you're replicating to provides features to support it (ES does not, no datastore i know of does).
that's what i tried to say with:
> 1. leader A paused right before update operation
> 2. leader B takes over, leader B updates record, leader B deletes record
> 3. leader A wakes up again, thinks still leader, updates record, then killed because no longer leader -> inconsistency between DB/ES
Yes and no – if your datastore has transactions, you can use optimistic locking to ensure no other agent has modified the data set since last time you saw it. Let's say your input Kafka topic is partitioned based on the ES document id, and that ES provides transactions (replace ES with Postgres for a more realistic scenario...). If the Kafka client was able to seek to a specific offset (which is certainly possible to implement) you could store the offset of the last message processed for a given partition within the data store itself. On startup, you'd query the data store; continue from that offset; and start fetching batches. When storing a batch in the store, you'd fail if the partition/offset has been changed since you did the lookup.
This of course only works for data stores that provide consistency and CAS semantics.
A concrete example could be a page view counter: all page views for a given page must of course be written to the same partition for this to work:
require "json"
consumer = kafka.consumer(group_id: "page_view_counter")
consumer.subscribe("page-views")
# partitions_for returns the number of partitions, so iterate over 0...n.
kafka.partitions_for("page-views").times do |partition|
  last_processed_offset = db.query("SELECT offset FROM kafka_commits WHERE partition = ?", partition)
  # This doesn't exist yet. Resume right after the last offset stored in the DB.
  consumer.seek("page-views", partition: partition, offset: last_processed_offset + 1)
end
consumer.each_batch do |batch|
  partition = batch.partition
  last_offset = db.query("SELECT offset FROM kafka_commits WHERE partition = ?", partition)
  new_offset = batch.messages.last.offset
  db.transaction do |txn|
    batch.messages.each do |message|
      url = JSON.parse(message.value)
      txn.execute("UPDATE page_view_counts SET count = count + 1 WHERE url = ?", url)
    end
    # Compare-and-set on the stored offset (assumes execute returns the affected row count).
    updated = txn.execute("UPDATE kafka_commits SET offset = ? WHERE partition = ? AND offset = ?", new_offset, partition, last_offset)
    # Another consumer moved the offset first: abort so the counter updates roll back too.
    raise "partition #{partition} was processed concurrently" if updated.zero?
  end
end
If two consumers process the same partition concurrently, only one would succeed in committing the transaction.
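The compare-and-set at the heart of that example can be demonstrated without a database. OffsetTable below is a hypothetical in-memory stand-in for the kafka_commits table: two consumers that both read the same offset race to advance it, and only the first update "affects a row". In a real database the counter updates and this CAS live in one transaction, so the loser's counter updates roll back along with it.

```ruby
# In-memory stand-in for the kafka_commits table (hypothetical).
class OffsetTable
  def initialize
    @offsets = Hash.new(-1) # partition => last processed offset (-1: nothing yet)
    @lock = Mutex.new
  end

  def read(partition)
    @lock.synchronize { @offsets[partition] }
  end

  # Like: UPDATE kafka_commits SET offset = new WHERE partition = p AND offset = expected
  # Returns true if the "row" was updated, false if someone else moved the offset first.
  def compare_and_set(partition, expected:, new_offset:)
    @lock.synchronize do
      return false unless @offsets[partition] == expected
      @offsets[partition] = new_offset
      true
    end
  end
end
```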
right, transactions are a means to get this done. unfortunately, ES and other stores i want to replicate to don't support transactions.
Yeah :-/
wondering ... did you ever encounter something similar? fetching offset 73580 works, 73581 ... 73641 raises EOFError, fetching offset 73642 is working again.
irb(main):011:0> client.fetch_messages(:topic => "my-topic", :offset => 73580, :partition => 0, :max_wait_time => 10).last
=> #<Kafka::FetchedMessage:0x000000077a0b28 ...>
irb(main):003:0> client.fetch_messages(:topic => "my-topic", :offset => 73581, :partition => 0, :max_wait_time => 10).last
Kafka::ConnectionError: Connection error: EOFError
irb(main):004:0> client.fetch_messages(:topic => "my-topic", :offset => 73582, :partition => 0, :max_wait_time => 10).last
Kafka::ConnectionError: Connection error: EOFError
...
irb(main):007:0> client.fetch_messages(:topic => "my-topic", :offset => 73641, :partition => 0, :max_wait_time => 10).first
Kafka::ConnectionError: Connection error: EOFError
irb(main):005:0> client.fetch_messages(:topic => "my-topic", :offset => 73642, :partition => 0, :max_wait_time => 10).last
=> #<Kafka::FetchedMessage:0x000000072a2a40 ...>
the respective kafka logfiles:
kafka kafka 12256 Mär 15 07:54 00000000000000000000.index
kafka kafka 6392949 Mär 15 07:54 00000000000000000000.log
kafka kafka 10485760 Mär 15 07:54 00000000000000073581.index
kafka kafka 1520909 Mär 15 09:02 00000000000000073581.log
Seems to be a kafka bug? Or am i doing something wrong?
@mrkamel can you try that with the latest ruby-kafka master? There was an issue regarding partial messages at the end of message sets causing EOFError (#134).
Hmm, I should really cut a new release...