Comments (30)

mrkamel avatar mrkamel commented on September 14, 2024 1

yeah, that fixed it. thx a lot!

from ruby-kafka.

dasch avatar dasch commented on September 14, 2024

I'm not quite sure why, but it seems that sockets opened by a consumer are sometimes closed by the broker when the consumer is kicked from its group. Make sure not to sleep for longer than the session timeout – you can also increase this timeout when calling consumer:

consumer = kafka.consumer(group_id: "x", session_timeout: 45)

Note that there are broker-defined limits to the timeout value, so you may get an error if you set it too high or too low.

dasch avatar dasch commented on September 14, 2024

Actually, are you even using the new consumer code or are you calling kafka.fetch_messages directly? Note that you may have to implement your own retry logic if you want to go down this path. I would recommend using the new consumer if you're running Kafka 0.9.

mrkamel avatar mrkamel commented on September 14, 2024

For the use case i'm talking about, i'm using fetch_messages directly, because i need to do batch processing, so consumer.each_message isn't appropriate. Having access to the batch (fetch_batch) would be a valuable extension for me as well.

dasch avatar dasch commented on September 14, 2024

I'm considering adding a batch option to Consumer as well. Under the hood, a set of message batches are fetched and each message successively yielded in each_message. What kind of processing do you need to do?

mrkamel avatar mrkamel commented on September 14, 2024

... i'm indexing messages/records into ElasticSearch. Not using bulk/batch would kill performance.
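To illustrate why batching matters here, the following is a minimal sketch that groups messages into bulk requests instead of sending one request per message. `FakeIndexer`, `bulk_index` and `index_in_bulk` are hypothetical names made up for this example; they are not part of ruby-kafka or of any Elasticsearch client.

```ruby
# Hypothetical stand-in for a bulk-capable indexer; a real client would send
# one network request per call to #bulk_index.
class FakeIndexer
  attr_reader :requests

  def initialize
    @requests = 0
    @documents = []
  end

  def bulk_index(documents)
    @requests += 1
    @documents.concat(documents)
  end

  def size
    @documents.size
  end
end

# Index messages in groups of `batch_size` instead of one request per message.
def index_in_bulk(messages, indexer, batch_size: 500)
  messages.each_slice(batch_size) do |slice|
    indexer.bulk_index(slice)
  end
end

indexer = FakeIndexer.new
index_in_bulk((1..1200).map { |i| "doc-#{i}" }, indexer, batch_size: 500)
puts "#{indexer.size} documents in #{indexer.requests} requests"
```

With one request per message the same input would need 1200 round trips, which is the cost the comment above refers to.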

And for ES <-> DB consistency, i have a second process which additionally "delays" the messages to re-check (after some time has passed) whether everything got indexed correctly, and re-indexes if not. That's why i'm sleeping and getting the EOFError.

dasch avatar dasch commented on September 14, 2024

Hmm. Would it still make sense to use a balanced consumer rather than a single process, for fault tolerance? And are you tracking offsets in ES as well, or are you using idempotent writes and at-least-once processing?

I've also been thinking about how an un-coordinated, single-process consumer would work. Would this API work for you https://gist.github.com/dasch/34f40482e242abbcf896?

dasch avatar dasch commented on September 14, 2024

By the way, does it work if you rescue ConnectionError and do a retry?

mrkamel avatar mrkamel commented on September 14, 2024

currently, i'm fetching a batch (fetch_messages) and index this batch. If, and only if, it finishes successfully i'm then storing the current offset within zookeeper. If an exception gets raised, i rescue it, log and notify, then sleep for a few seconds and retry (so: yes, when i retry after the EOFError, ruby-kafka seems to reconnect automatically and everything continues to work again). For fault tolerance, i'm doing leader election via zookeeper myself. Sure, i'd appreciate having this implemented for me instead of doing it on my own, but i need the batch processing.

Moreover, for the "delay message for re-check" use case, i'm not quite sure if kafka's consumer group feature is even applicable, because the delaying-process won't get the same messages the indexing-process gets. Thus, i have to use the low level fetch_messages method within the delaying-process, right? So please don't remove it from the API, i need it :-)
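The rescue, log, sleep and retry loop described in this comment could be sketched roughly as follows. `with_retries` and `FakeConnectionError` are hypothetical names used so that the example runs without the gem; in real code the error to rescue would presumably be `Kafka::ConnectionError`, and the block would call `fetch_messages` and index the batch.

```ruby
# Runs the block, retrying up to `attempts` times when one of `errors` is raised.
# `pause` is the number of seconds to sleep between attempts.
def with_retries(errors:, attempts: 5, pause: 2)
  tries = 0
  begin
    tries += 1
    yield
  rescue *errors => e
    raise if tries >= attempts
    warn "attempt #{tries} failed (#{e.class}: #{e.message}), retrying"
    sleep pause
    retry
  end
end

# Hypothetical stand-in for Kafka::ConnectionError so the sketch runs without the gem.
class FakeConnectionError < StandardError; end

# Simulated fetch that fails twice before succeeding.
calls = 0
batch = with_retries(errors: [FakeConnectionError], pause: 0) do
  calls += 1
  raise FakeConnectionError, "EOFError" if calls < 3
  ["message-1", "message-2"]
end

# Only after the batch has been processed successfully would the offset be stored.
puts "fetched #{batch.size} messages after #{calls} attempts"
```

Storing the offset only after the block returns keeps the at-least-once behaviour described above: a failed batch is fetched again from the last stored offset.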

mrkamel avatar mrkamel commented on September 14, 2024

well, it's applicable by using a separate delay topic. So, i'd be pretty happy if the new consumer feature gets a batch option, so that i can then remove all my manual offset handling and leader election :-)

dasch avatar dasch commented on September 14, 2024

Regarding the first part, the indexer: I think you should use the new consumer group feature with the built-in offset commit handling. I want to expand the API to allow for batch processing. The main difficulties are:

  1. A nice API for manually committing offsets (i.e., after the ES transaction returns)
  2. Heartbeats need to be sent with a somewhat high frequency, ~30 seconds.
  3. The API should be simple to use but still expose enough information.

There are basically two ways to go: have a batch-oriented loop that sends heartbeats and commits offsets automatically after yielding a batch, or a more manual poll based API where the user must poll with some frequency in order to stay in the group, and offset commits would perhaps be triggered manually.

The first case could look like this:

consumer.each_batch do |batch|
  # Pseudo-ES-code:
  txn = ES::Transaction.new
  batch.messages.each {|message| txn << message.value }
  txn.commit
end

The second case could look like this:

# Do this in a loop.

# A "batch" is a set of messages from the same topic/partition.
batches = consumer.fetch_batches

batches.each do |batch|
  txn = ES::Transaction.new
  batch.messages.each do |message|
    txn << message.value

    # Marks the message as processed; the next offset commit will include the offset.
    consumer.mark_message_as_processed(message)
  end
  txn.commit
end

consumer.commit_offsets
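To make the "mark as processed, commit later" idea from the second case more concrete, here is a minimal sketch of the bookkeeping it implies. `ProcessedOffsets` and its methods are hypothetical and only illustrate the idea; this is not how ruby-kafka implements it.

```ruby
# Hypothetical helper: remembers the highest processed offset per topic/partition
# and reports the offsets that a commit would send.
class ProcessedOffsets
  def initialize
    @offsets = {}
  end

  def mark_as_processed(topic, partition, offset)
    key = [topic, partition]
    current = @offsets[key]
    @offsets[key] = offset if current.nil? || offset > current
  end

  # Kafka convention: the committed offset is the offset of the next message to read.
  def offsets_to_commit
    @offsets.transform_values { |offset| offset + 1 }
  end
end

tracker = ProcessedOffsets.new
[41, 42, 43].each { |offset| tracker.mark_as_processed("page-views", 0, offset) }
tracker.mark_as_processed("page-views", 1, 7)
p tracker.offsets_to_commit
```

Separating "mark" from "commit" lets the caller decide when a commit is safe, for example only after the external write has returned.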

dasch avatar dasch commented on September 14, 2024

Somewhat related to #132

mrkamel avatar mrkamel commented on September 14, 2024

am i right that a heartbeat frequency of 30 seconds means: a crashed consumer (currently leader) gets removed after 30 seconds? if yes, then that seems quite infrequent to me, since i'm trying to use ES for realtime purposes. 5 seconds seems reasonable to me, which afaik is zookeeper's heartbeat frequency and thus corresponds to my current (manual leader election) setup.

dasch avatar dasch commented on September 14, 2024

The session timeout basically means that a consumer group member must send a heartbeat every n seconds, otherwise it gets kicked from the group, and the group needs to re-balance (which is costly in terms of time). Setting it low increases the risk of instability; setting it high means that a consumer can be dead for a while before the group notices. In case of a high value there's also the risk of a group rebalance not being noticed by a member, causing it to keep processing a partition that the group has now re-assigned to a different member. By setting the heartbeat interval to a lower value that risk can be mitigated, but there will never be any hard guarantee that two processes won't simultaneously process the same message.

UPDATE: The heartbeat interval is not currently configurable (it's set to half the session timeout) but that could easily be changed.
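The relationship between session timeout and heartbeat interval can be sketched as a small timer. This is only an illustration under the assumption stated above (interval defaults to half the session timeout); `HeartbeatSchedule` and its methods are hypothetical names and not part of ruby-kafka.

```ruby
# Tracks when the next heartbeat is due. The interval defaults to half the
# session timeout; `now` is injectable so the timing can be tested.
class HeartbeatSchedule
  attr_reader :interval

  def initialize(session_timeout:, interval: session_timeout / 2.0, now: Time.now)
    @session_timeout = session_timeout
    @interval = interval
    @last_sent = now
  end

  # True once a heartbeat should be sent to stay comfortably inside the session.
  def due?(now = Time.now)
    now - @last_sent >= @interval
  end

  # True once the broker would consider the member dead.
  def expired?(now = Time.now)
    now - @last_sent > @session_timeout
  end

  def sent!(now = Time.now)
    @last_sent = now
  end
end

start = Time.at(0)
schedule = HeartbeatSchedule.new(session_timeout: 30, now: start)
puts schedule.due?(start + 10)
puts schedule.due?(start + 15)
puts schedule.expired?(start + 45)
```

A lower `interval:` makes a member notice a rebalance sooner, while `session_timeout:` bounds how long a dead member can go unnoticed.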

dasch avatar dasch commented on September 14, 2024

Take a look at #132 and see if that would be helpful.

mrkamel avatar mrkamel commented on September 14, 2024

thx. very fast failover is important to me and it's good to have full control over everything for this use case, such that i'm fine if i have to manage the low level details (leader election, offset handling, fetch_messages) myself here, but i'll for sure experiment with it when the batch feature is available. and for less important use cases, i'll of course use the consumer groups feature. Regarding the heartbeats: wouldn't it make sense to push this into a separate ruby thread?

dasch avatar dasch commented on September 14, 2024

> thx. very fast failover is important to me and it's good to have full control over everything for this use case, such that i'm fine if i have to manage the low level details (leader election, offset handling, fetch_messages) myself here, but i'll for sure experiment with it when the batch feature is available. and for less important use cases, i'll of course use the consumer groups feature.

I'm still actively developing the consumer groups API, and I think your use case is somewhat typical, so I want to make this work for you.

> Regarding the heartbeats: wouldn't it make sense to push this into a separate ruby thread?

Not really – the heartbeat should indicate that the consumer isn't dead or stuck. If a separate thread just continues to send heartbeats while the consumer thread is deadlocked there would be no point.

dasch avatar dasch commented on September 14, 2024

Regarding the "checker" consumer – is there no way to consistently write to ES? What kind of consistency guarantees are made, if any?

mrkamel avatar mrkamel commented on September 14, 2024

huh, long story. i try to keep it short. it's not the consistency of ES i'm worried about (writes to ES should be consistent and ES supports optimistic concurrency control via incrementing versions, etc). it's about achieving DB <-> ES consistency.

class Model
  after_save :queue_for_recheck
  after_destroy :queue_for_recheck
  after_commit :queue_for_immediate_index
end

Within the DB transaction (after_save, after_destroy) i queue for recheck, because the background indexer can't see the changes to the DB record yet, since we're still in a DB transaction, and the process could crash right before after_commit. So, if it crashes, the recheck will correct the DB <-> ES inconsistency.
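A minimal sketch of what such a delayed recheck might do, using plain hashes as stand-ins for the database and the search index. All names here (`recheck`, `recheck_queue`, the data) are hypothetical and only illustrate the idea; the real setup reads the recheck requests from Kafka.

```ruby
# In-memory stand-ins for the database and the search index (both hypothetical).
db    = { 1 => "alice", 2 => "bob" }
index = { 1 => "alice", 3 => "stale" } # 2 was never indexed, 3 was deleted from the DB

# Bring the index in line with the database for the given ids.
def recheck(ids, db, index)
  ids.each do |id|
    if db.key?(id)
      index[id] = db[id] unless index[id] == db[id]
    else
      index.delete(id)
    end
  end
end

# Each entry is [record id, time at which the recheck becomes due].
recheck_queue = [[2, 100], [3, 100], [1, 500]]
now = 200

# Process only the entries whose delay has passed; keep the rest queued.
due, recheck_queue = recheck_queue.partition { |_id, due_at| due_at <= now }
recheck(due.map(&:first), db, index)

puts "index matches db: #{index == db}"
```

Because the recheck always copies the current DB state, running it more than once for the same id is harmless.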

dasch avatar dasch commented on September 14, 2024

Hmm... that's essentially a two-phase commit, right? That's very hard to get right. Have you considered using something like Maxwell, which is based on the SQL database binlog?

Anyway, the consumer group API should cover your use case. I think "write this batch of messages atomically to this remote store" is a common consumer use case. When writes are idempotent, it shouldn't be difficult, and performance should be fine. Combining a batch API with a configurable heartbeat interval should make this a good fit for your use case.

mrkamel avatar mrkamel commented on September 14, 2024

yes, i have considered listening to the binlog, thx for pointing me to maxwell. re-check is still necessary, though, because deletions are an issue unless you do "soft deletes" (only mark a record as deleted): a long GC pause, virtual machine pause or whatever just before an update operation could undo a delete operation:

  1. leader A is paused right before an update operation
  2. leader B takes over, updates the record, then deletes the record
  3. leader A wakes up again, still thinks it is the leader, updates the record, and is then killed because it is no longer leader -> inconsistency between DB/ES

So, since the binlog doesn't provide an immense benefit here, i don't want to add another dependency to the stack.

Thanks for your support! really appreciate it and looking forward to the consumer group batch support.

dasch avatar dasch commented on September 14, 2024

Maxwell will include deletions in its topic, as far as I know, so you'll get a serialized stream of operations – as long as you process them in-order, you should be fine.

mrkamel avatar mrkamel commented on September 14, 2024

in a distributed environment it's not possible to guarantee that you process them in order at all times unless the datastore you're replicating to provides features to support it (ES does not, no datastore i know of does).
that's what i tried to say with:

  1. leader A paused right before update operation
  2. leader B takes over, leader B updates record, leader B deletes record
  3. leader A wakes up again, thinks still leader, updates record, then killed because no longer leader -> inconsistency between DB/ES
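The three steps can be replayed with two plain hashes standing in for the database and the index. This is only a single-threaded illustration of the interleaving, with made-up variable names; it does not model real leader election.

```ruby
db    = {}
index = {}

# Leader A reads the record from the DB, then stalls (GC pause, VM pause, ...)
# before it can write to the index.
db[1] = "v1"
stale_copy_held_by_a = db[1]

# Leader B takes over, indexes the update, then processes the deletion.
db[1] = "v2"
index[1] = db[1]
db.delete(1)
index.delete(1)

# Leader A wakes up, still believes it is the leader, and writes its stale copy.
index[1] = stale_copy_held_by_a

# Ids whose state differs between the database and the index.
inconsistent_ids = (db.keys | index.keys).reject { |id| db[id] == index[id] }
puts "inconsistent ids: #{inconsistent_ids.inspect}"
```

The record is gone from the database but present again in the index, which is the kind of drift the delayed recheck is meant to repair.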

dasch avatar dasch commented on September 14, 2024

Yes and no – if your datastore has transactions, you can use optimistic locking to ensure no other agent has modified the data set since last time you saw it. Let's say your input Kafka topic is partitioned based on the ES document id, and that ES provides transactions (replace ES with Postgres for a more realistic scenario...). If the Kafka client was able to seek to a specific offset (which is certainly possible to implement) you could store the offset of the last message processed for a given partition within the data store itself. On startup, you'd query the data store; continue from that offset; and start fetching batches. When storing a batch in the store, you'd fail if the partition/offset has been changed since you did the lookup.

This of course only works for data stores that provide consistency and CAS semantics.

dasch avatar dasch commented on September 14, 2024

A concrete example could be a page view counter: all page views for a given page must of course be written to the same partition for this to work:

consumer = kafka.consumer(group_id: "page_view_counter")
consumer.subscribe("page-views")

kafka.partitions_for("page-views").each do |partition|
  last_processed_offset = db.query("SELECT offset FROM kafka_commits WHERE partition = ?", partition)

  # This doesn't exist yet.
  consumer.seek("page-views", partition: partition, offset: last_processed_offset)
end

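consumer.each_batch do |batch|
  partition = batch.partition
  # Offset this consumer believes was committed last for the partition.
  last_offset = db.query("SELECT offset FROM kafka_commits WHERE partition = ?", partition)
  new_offset = batch.messages.last.offset

  db.transaction do |txn|
    batch.messages.each do |message|
      url = JSON.parse(message.value)
      txn.execute("UPDATE page_view_counts SET count = count + 1 WHERE url = ?", url)
    end

    # Compare-and-set: matches no row if another consumer has already advanced the offset.
    # The transaction must be rolled back in that case (e.g. by checking the affected row count).
    txn.execute("UPDATE kafka_commits SET offset = ? WHERE partition = ? AND offset = ?", new_offset, partition, last_offset)
  end
end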

If two consumers process the same partition concurrently, only one would succeed in committing the transaction.
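The "only one would succeed" behaviour can be tried out without a database. The sketch below uses a hypothetical in-memory `OffsetStore` with a mutex in place of a real transaction; the class and its methods are assumptions made for illustration, not an existing API.

```ruby
# Hypothetical in-memory store standing in for a transactional database.
class OffsetStore
  class Conflict < StandardError; end

  def initialize
    @lock = Mutex.new
    @offsets = Hash.new(0)
    @counts = Hash.new(0)
  end

  def offset_for(partition)
    @lock.synchronize { @offsets[partition] }
  end

  def count_for(url)
    @lock.synchronize { @counts[url] }
  end

  # Applies the batch and advances the offset atomically, but only if the
  # stored offset still equals `expected_offset` (compare-and-set).
  def commit_batch(partition, expected_offset:, new_offset:, urls:)
    @lock.synchronize do
      unless @offsets[partition] == expected_offset
        raise Conflict, "partition #{partition} has moved past offset #{expected_offset}"
      end
      urls.each { |url| @counts[url] += 1 }
      @offsets[partition] = new_offset
    end
  end
end

store = OffsetStore.new
urls  = ["/home", "/home", "/about"]

# Two consumers both looked up offset 0 and now try to commit the same batch.
results = 2.times.map do
  begin
    store.commit_batch(0, expected_offset: 0, new_offset: 3, urls: urls)
    :committed
  rescue OffsetStore::Conflict
    :rejected
  end
end
p results
```

The second attempt is rejected before any counter is touched, so each page view is counted once even though the batch was delivered twice.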

mrkamel avatar mrkamel commented on September 14, 2024

right, transactions are a means to get this done. unfortunately, ES and other stores i want to replicate to don't support transactions.

dasch avatar dasch commented on September 14, 2024

Yeah :-/

mrkamel avatar mrkamel commented on September 14, 2024

wondering ... did you ever encounter something similar? fetching offset 73580 works, 73581 ... 73641 raises EOFError, fetching offset 73642 is working again.

irb(main):011:0> client.fetch_messages(:topic => "my-topic", :offset => 73580, :partition => 0, :max_wait_time => 10).last
=> #<Kafka::FetchedMessage:0x000000077a0b28 ...>

irb(main):003:0> client.fetch_messages(:topic => "my-topic", :offset => 73581, :partition => 0, :max_wait_time => 10).last
Kafka::ConnectionError: Connection error: EOFError

irb(main):004:0> client.fetch_messages(:topic => "my-topic", :offset => 73582, :partition => 0, :max_wait_time => 10).last
Kafka::ConnectionError: Connection error: EOFError

...

irb(main):007:0> client.fetch_messages(:topic => "my-topic", :offset => 73641, :partition => 0, :max_wait_time => 10).first
Kafka::ConnectionError: Connection error: EOFError

irb(main):005:0> client.fetch_messages(:topic => "my-topic", :offset => 73642, :partition => 0, :max_wait_time => 10).last
=> #<Kafka::FetchedMessage:0x000000072a2a40 ...>

the respective kafka logfiles:

kafka kafka 12256 Mär 15 07:54 00000000000000000000.index
kafka kafka 6392949 Mär 15 07:54 00000000000000000000.log
kafka kafka 10485760 Mär 15 07:54 00000000000000073581.index
kafka kafka 1520909 Mär 15 09:02 00000000000000073581.log

Seems to be a kafka bug? Or am i doing something wrong?

dasch avatar dasch commented on September 14, 2024

@mrkamel can you try that with the latest ruby-kafka master? There was an issue regarding partial messages at the end of message sets causing EOFError (#134).
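For context, a broker may end a message set with a partially transmitted message, and a client is expected to drop it rather than fail. The sketch below shows one way to do that for the classic message set layout (8-byte offset, 4-byte size, then the message bytes). `decode_message_set` is a hypothetical helper written for this illustration; it is not ruby-kafka's decoder and not the actual fix from #134.

```ruby
# Decodes [offset (int64), size (int32), message bytes] entries from `bytes`
# and silently drops a truncated entry at the end instead of raising.
def decode_message_set(bytes)
  entries = []
  pos = 0
  header_size = 12

  while bytes.bytesize - pos >= header_size
    offset, size = bytes.byteslice(pos, header_size).unpack("q>l>")
    # Stop if the message body was cut off by the end of the response.
    break if bytes.bytesize - pos - header_size < size

    entries << [offset, bytes.byteslice(pos + header_size, size)]
    pos += header_size + size
  end

  entries
end

complete  = [73580, 3].pack("q>l>") + "abc"
truncated = ([73581, 10].pack("q>l>") + "0123456789").byteslice(0, 15)

entries = decode_message_set(complete + truncated)
p entries.map(&:first)
```

Only the complete entry is returned; the truncated one would be fetched again with the next request, starting at its offset.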

dasch avatar dasch commented on September 14, 2024

Hmm, I should really cut a new release...
