rdkafka-ruby's Introduction

Rdkafka

Build Status Gem Version Join the chat at https://slack.karafka.io

Note

The rdkafka-ruby gem was created and developed by AppSignal. Their impactful contributions have significantly shaped the Ruby Kafka and Karafka ecosystems. For robust monitoring, we highly recommend AppSignal.


The rdkafka gem is a modern Kafka client library for Ruby based on librdkafka. It wraps the production-ready C client using the ffi gem and targets Kafka 1.0+ and Ruby versions under security or active maintenance. We remove a Ruby version from our CI builds when it reaches EOL.

rdkafka was written at AppSignal out of the need for a reliable Ruby client that supports modern Kafka. AppSignal runs it in production on very high-traffic systems.

The most essential pieces of a Kafka client are implemented, and we aim to provide all relevant consumer, producer, and admin APIs.

Project Scope

While rdkafka-ruby aims to simplify the use of librdkafka in Ruby applications, it's important to understand the limitations of this library:

  • No Complex Producers/Consumers: This library does not intend to offer complex producers or consumers. The aim is to stick closely to the functionalities provided by librdkafka itself.

  • Focus on librdkafka Capabilities: Features that can be achieved directly in Ruby, without specific needs from librdkafka, are outside the scope of this library.

  • Existing High-Level Functionalities: Certain high-level functionalities like producer metadata cache and simple consumer are already part of the library. Although they fall slightly outside the primary goal, they will remain part of the contract, given their existing usage.

Installation

When installed, this gem downloads and compiles librdkafka. If you have any problems installing the gem, please open an issue.

Usage

Please see the documentation for full details on how to use this gem. Below are two quick examples.

Unless you are seeking specific low-level capabilities, we strongly recommend using Karafka and WaterDrop when working with Kafka. These are higher-level libraries built on rdkafka-ruby that we also maintain.

Consuming Messages

Subscribe to a topic and get messages. Kafka will automatically spread the available partitions over consumers with the same group id.

config = {
  :"bootstrap.servers" => "localhost:9092",
  :"group.id" => "ruby-test"
}
consumer = Rdkafka::Config.new(config).consumer
consumer.subscribe("ruby-test-topic")

consumer.each do |message|
  puts "Message received: #{message}"
end

Producing Messages

Produce several messages, put the delivery handles in an array, and wait for them before exiting. This way the messages will be batched and efficiently sent to Kafka.

config = {:"bootstrap.servers" => "localhost:9092"}
producer = Rdkafka::Config.new(config).producer
delivery_handles = []

100.times do |i|
  puts "Producing message #{i}"
  delivery_handles << producer.produce(
      topic:   "ruby-test-topic",
      payload: "Payload #{i}",
      key:     "Key #{i}"
  )
end

delivery_handles.each(&:wait)

Note that creating a producer consumes some resources that are not released until #close is explicitly called, so be sure to call Config#producer only as necessary.
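
For long-lived producers, an ensure block is a simple way to guarantee cleanup. A minimal sketch, assuming config is the hash from the example above:

producer = Rdkafka::Config.new(config).producer
begin
  # Produce and wait so the message is flushed before we close
  producer.produce(topic: "ruby-test-topic", payload: "bye").wait
ensure
  # Release the native client's threads and memory
  producer.close
end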

Higher Level Libraries

Currently, there are two actively developed frameworks based on rdkafka-ruby that provide a higher-level API for working with Kafka messages, and one library for publishing messages.

Message Processing Frameworks

  • Karafka - Ruby and Rails efficient Kafka processing framework.
  • Racecar - A simple framework for Kafka consumers in Ruby

Message Publishing Libraries

  • WaterDrop – Standalone Karafka library for producing Kafka messages.

Forking

When working with rdkafka-ruby, it's essential to know that the underlying librdkafka library does not support fork-safe operations, even though it is thread-safe. Forking a process after initializing librdkafka clients can lead to unpredictable behavior due to inherited file descriptors and memory states. This limitation requires careful handling, especially in Ruby applications that rely on forking.

To address this, it's highly recommended to (see the sketch after this list):

  • Never initialize any rdkafka-ruby producers or consumers before forking to avoid state corruption.
  • Before forking, always close any open producers or consumers if you've opened any.
  • Use high-level libraries like WaterDrop and Karafka, which provide abstractions for handling librdkafka's intricacies.
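
A minimal sketch of the close-before-fork pattern, assuming config is a valid configuration hash:

# Close any client created in the parent before forking
producer&.close

pid = fork do
  # Create clients only inside the child, after the fork
  child_producer = Rdkafka::Config.new(config).producer
  child_producer.produce(topic: "ruby-test-topic", payload: "from child").wait
  child_producer.close
end
Process.wait(pid)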

Development

Contributors are encouraged to focus on enhancements that align with the core goal of the library. We appreciate contributions but will likely not accept pull requests for features that:

  • Implement functionalities that can be achieved using standard Ruby capabilities without changes to the underlying rdkafka-ruby bindings.
  • Deviate significantly from the primary aim of providing librdkafka bindings with Ruby-friendly interfaces.

A Docker Compose file is included to run Kafka. To run that:

docker-compose up

Run bundle and cd ext && bundle exec rake && cd .. to download and compile librdkafka.

You can then run bundle exec rspec to run the tests. To see rdkafka debug output:

DEBUG_PRODUCER=true bundle exec rspec
DEBUG_CONSUMER=true bundle exec rspec

After running the tests, you can bring the cluster down to start with a clean slate:

docker-compose down

Example

To see everything working, run these in separate tabs:

bundle exec rake consume_messages
bundle exec rake produce_messages

Versions

rdkafka-ruby        | librdkafka         | patches
0.19.0 (Unreleased) | 2.5.0 (2024-06-10) | yes
0.18.0 (2024-09-02) | 2.5.0 (2024-06-10) | yes
0.17.0 (2024-08-03) | 2.4.0 (2024-05-07) | no
0.16.0 (2024-06-13) | 2.3.0 (2023-10-25) | no
0.15.0 (2023-12-03) | 2.3.0 (2023-10-25) | no
0.14.0 (2023-11-21) | 2.2.0 (2023-07-12) | no
0.13.0 (2023-07-24) | 2.0.2 (2023-01-20) | no
0.12.0 (2022-06-17) | 1.9.0 (2022-06-16) | no
0.11.0 (2021-11-17) | 1.8.2 (2021-10-18) | no
0.10.0 (2021-09-07) | 1.5.0 (2020-07-20) | no

rdkafka-ruby's Issues

Get position of a consumer

Support getting the position of a consumer (rd_kafka_position in librdkafka). The result should also be usable when computing lag.
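
If bound along the lines of librdkafka's API, usage might look like the sketch below; Consumer#position and the TopicPartitionList return type are assumptions here, not a confirmed interface, and config is a valid configuration hash:

consumer = Rdkafka::Config.new(config).consumer
consumer.subscribe("ruby-test-topic")
consumer.poll(250)

# Hypothetical: returns a TopicPartitionList with the next offset per partition
consumer.position.to_h.each do |topic, partitions|
  partitions.each { |p| puts "#{topic}/#{p.partition} => #{p.offset}" }
end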

Verify the integrity of the downloaded 'librdkafka' tarball

I think it would be a good thing (at least from a security point of view) to verify the integrity of resources downloaded from external (potentially untrusted) sources. Think, for example, of a CDN compromise.

It would be as easy as adding a sha256 checksum to the file hash:

recipe.files << {
  url: "https://github.com/edenhill/librdkafka/archive/v#{Rdkafka::LIBRDKAFKA_VERSION}.tar.gz",
  sha256: '2b96d7ed71470b0d0027bd9f0b6eb8fb68ed979f8092611c148771eb01abb72c'
}

Unfortunately, 'librdkafka' does not publish "official" checksums (confluentinc/librdkafka#1759), but a hash could be trivially calculated when updating the 'librdkafka' version.
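
For example, the checksum could be computed with Ruby's standard library after downloading the tarball (a sketch; assumes the file sits in the working directory):

require "digest"

# Print the sha256 to paste into the recipe when bumping the librdkafka version
puts Digest::SHA256.file("v#{Rdkafka::LIBRDKAFKA_VERSION}.tar.gz").hexdigest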

Producing not working + crash for forked processes

A C rdkafka instance does not survive a fork. Producing or polling does not work and rd_kafka_destroy has a failing assertion that crashes the process when it is called via https://github.com/appsignal/rdkafka-ruby/blob/master/lib/rdkafka/config.rb#L149

A fix to not let rd_kafka_destroy crash in this scenario was added in librdkafka: confluentinc/librdkafka@8c67e42

We should move to this release when it is out at the end of February. For the failing produces and polls I see a few options (a sketch of the first follows the list):

  • Raise an exception if our pid changed
  • Recreate the rdkafka instance if our pid changed
  • Add a forked hook you need to call after forking, possibly with a Unicorn integration
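
A minimal sketch of the pid-check guard, assuming the client records the pid of the process that created it (all names here are hypothetical):

# Inside the client wrapper
def initialize(native_kafka)
  @native_kafka = native_kafka
  @pid = Process.pid # remember which process created this instance
end

def ensure_not_forked!
  # Raise a clear Ruby error instead of crashing inside librdkafka
  raise "rdkafka client used after fork" if Process.pid != @pid
end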

Got a log error when close consumer

Environment

OS type: MacOS
OS version: 10.14.5
rdkafka version: 0.7.0
Kafka docker image: confluentinc/cp-kafka:5.0.0
Zookeeper docker image: zookeeper:3.5
Cluster topology: 1 zookeeper, 1 broker

Current behavior

When I close the consumer, I get this error:

E, [2019-11-03T18:23:32.298450 #53262] ERROR -- : rdkafka: [thrd:GroupCoordinator]: 1/1 brokers are down

Expected behavior

There should be no error log when I close the consumer.

Reproduction

config = Rdkafka::Config.new
config["enable.auto.commit"] = false
config["group.id"] = "test"
config["session.timeout.ms"] = 10_000
config["enable.partition.eof"] = false
config["bootstrap.servers"] = "localhost:9092"

consumer = config.consumer
consumer.subscribe("topic.a", "topic.b")

10.times { consumer.poll(250) }

consumer.close

I have cloned the repository and run rspec; the same error appears, and it also shows up in the Travis log, for example in this build at line 582.

After investigating, I still don't understand why I get this error log. Do you have any information about it? Let me know if you need more details.

New release with changes in master

Would it be possible to make a new release from master? I'm particularly interested in the changes from #31, which haven't been released yet.

We're currently depending on rdkafka-ruby via a git dependency in our Gemfile, but would prefer to fetch it from RubyGems as we start to use it in more services.

Thanks again for doing all the work to maintain this! It's a huge help. :-)

Useless pending delivery waiting upon unsendable message

Given a setup:

  • message.send.max.retries set to 2
  • bootstrap.servers set to invalid:9092

What happens upon producing and waiting is:

E, [2019-08-07T18:01:48.096491 #23972] ERROR -- : rdkafka: [thrd:invalid:9092/bootstrap]: invalid:9092/bootstrap: Failed to resolve 'invalid:9092': Name or service not known (after 21ms in state CONNECT)
E, [2019-08-07T18:01:48.096606 #23972] ERROR -- : rdkafka: [thrd:invalid:9092/bootstrap]: invalid:9092/bootstrap: Failed to resolve 'invalid:9092': Name or service not known (after 21ms in state CONNECT)
E, [2019-08-07T18:01:48.096654 #23972] ERROR -- : rdkafka: [thrd:invalid:9092/bootstrap]: 1/1 brokers are down

After that, librdkafka will not retry sending, while rdkafka-ruby keeps waiting (here: https://github.com/appsignal/rdkafka-ruby/blob/master/lib/rdkafka/producer/delivery_handle.rb#L49) until the default timeout of 60 seconds.

Even if the invalid URL were eventually resolved by DNS, librdkafka would not retry sending, as it has already reached the maximum number of attempts.

code:

config = {:"bootstrap.servers" => "invalid:9092"}
delivery_handles = []
producer = Rdkafka::Config.new(config).producer

producer.produce(
  topic:   "ruby-test-topic",
  payload: "Payload" * 1000,
  key:     "Key"
).wait

There is no point in waiting when we've used max retry attempts.

What I would suggest doing instead: raise an RdkafkaError as soon as we have run out of attempts.
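
In the meantime, the wait can be bounded so a dead handle fails fast. A sketch; max_wait_timeout is the keyword DeliveryHandle#wait accepts in recent versions, and the exact timeout error class name has varied across releases:

handle = producer.produce(topic: "ruby-test-topic", payload: "Payload", key: "Key")
begin
  # Give up after 5 seconds instead of waiting out the 60-second default
  handle.wait(max_wait_timeout: 5)
rescue Rdkafka::AbstractHandle::WaitTimeoutError
  # Delivery did not complete in time; handle or re-raise
end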

Support Time for timestamp attribute on produce

I tried to produce messages with timestamps, but the timestamp needs to be an integer:

2018-03-07 13:01:19 - TypeError - no implicit conversion of Time into Integer:
        /usr/local/bundle/gems/ffi-1.9.23/lib/ffi/variadic.rb:56:in `invoke'
        /usr/local/bundle/gems/ffi-1.9.23/lib/ffi/variadic.rb:56:in `call'
        (eval):3:in `rd_kafka_producev'

I would expect this library to support Time instances instead of integers (which should be millisecond unix timestamps, I guess?).
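
As a workaround until Time is accepted directly, the conversion is straightforward (a sketch, assuming millisecond unix timestamps are indeed what librdkafka expects):

# Convert a Time into a millisecond unix timestamp
timestamp_ms = (Time.now.to_f * 1000).to_i

producer.produce(
  topic:     "ruby-test-topic",
  payload:   "Payload",
  timestamp: timestamp_ms
)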

About performance

I recently did some tests for performance, but the results were very unsatisfactory.

My config file is as follows:
"batch.num.messages": 1000000,
"queue.buffering.max.ms": 15000,
"request.required.acks": 1,
"queue.buffering.max.messages":10000000,
"queue.buffering.max.kbytes": 4000000,
"socket.keepalive.enable": true,
"socket.send.buffer.bytes": 0,
"socket.blocking.max.ms": 2,
"message.max.bytes": 1000000000

The producer and the Kafka server are on the same network, so there is no network delay. The test loads 400,000 records from a file, each about 500 bytes. The average transmission time is 35s, which works out to a little over 10,000 records per second. From the trace log I can see that most of the time is spent processing the data; once it is packaged into a MessageSet, sending it to Kafka takes very little time. Does anyone have experience in this area? Please help.

Error compiling librdkafka

I tried using this gem but I got this error message when compiling:

Downloading v0.11.5.tar.gz ( 98%) 
Downloading v0.11.5.tar.gz (100%)
Extracting v0.11.5.tar.gz into tmp/x86_64-apple-darwin17.7.0/ports/librdkafka/0.11.5... OK
Running 'configure' for librdkafka 0.11.5... OK
Running 'compile' for librdkafka 0.11.5... ERROR, review '/Users/thedude/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/bundler/gems/rdkafka-ruby-86ee6932b2e2/ext/tmp/x86_64-apple-darwin17.7.0/ports/librdkafka/0.11.5/compile.log' to see
what happened. Last lines are:
========================================================================
      _rd_kafka_sasl_scram_conf_validate in rdkafka_sasl_scram.o
  "_SHA512", referenced from:
      _rd_kafka_sasl_scram_conf_validate in rdkafka_sasl_scram.o
  "_X509_STORE_set_flags", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
  "_X509_free", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
      _rd_kafka_transport_io_serve in rdkafka_transport.o
  "_X509_new", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
  "_X509_verify_cert_error_string", referenced from:
      _rd_kafka_transport_io_serve in rdkafka_transport.o
  "_d2i_PKCS12_fp", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
  "_sk_pop_free", referenced from:
      _rd_kafka_transport_ssl_ctx_init in rdkafka_transport.o
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[1]: *** [librdkafka.1.dylib] Error 1
make: *** [libs] Error 2
========================================================================
rake aborted!
Failed to complete compile task

Tasks: TOP => default
(See full trace by running task with --trace)

rake failed, exit code 1

Gem files will remain installed in /Users/thedude/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/bundler/gems/rdkafka-ruby-86ee6932b2e2 for inspection.
Results logged to /Users/thedude/.rbenv/versions/2.5.1/lib/ruby/gems/2.5.0/bundler/gems/extensions/x86_64-darwin-17/2.5.0-static/rdkafka-ruby-86ee6932b2e2/gem_make.out

I'm using the latest macOS and Xcode. I get the same message on the released version and on the latest master.

Handle EOF partition error properly in Consumer#each

When trying this library I got the following error:

/app/vendor/bundle/ruby/2.4.0/gems/rdkafka-0.3.3/lib/rdkafka/consumer.rb:186:in `poll': Broker: No more messages (partition_eof) (Rdkafka::RdkafkaError)
	from /app/vendor/bundle/ruby/2.4.0/gems/rdkafka-0.3.3/lib/rdkafka/consumer.rb:207:in `block in each'
	from /app/vendor/bundle/ruby/2.4.0/gems/rdkafka-0.3.3/lib/rdkafka/consumer.rb:206:in `loop'
	from /app/vendor/bundle/ruby/2.4.0/gems/rdkafka-0.3.3/lib/rdkafka/consumer.rb:206:in `each'

I think the #each method should handle these errors (ignore them, or pass the notifications to the block). Currently, #each itself raises an exception when you reach the end of a partition, which is probably not what people expect from this library.
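
A workaround sketch until #each handles this: drive poll directly and ignore partition EOF errors, assuming they surface as Rdkafka::RdkafkaError with is_partition_eof?:

loop do
  message = consumer.poll(250)
  puts "Message received: #{message}" if message
rescue Rdkafka::RdkafkaError => e
  # Reaching the end of a partition is expected; keep polling
  raise unless e.is_partition_eof?
end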

No provider for SASL mechanism GSSAPI

The ruby script is like this:

require 'rdkafka'

config = {
        :"bootstrap.servers" => "192.168.0.238:9092" ,
        :"security.protocol" => "SASL_PLAINTEXT" ,
        :"sasl.mechanisms"   => "GSSAPI" ,
        :"sasl.kerberos.principal" => "kafka/[email protected]" ,
        :"sasl.kerberos.keytab" => "/fluentd/etc/ssl/project_c-gwqnp_project-4nsmt_kafka.keytab" ,
}

rdkafka = Rdkafka::Config.new(config)
producer = rdkafka.producer
producer.produce(topic: "test", payload: "Hello World!")

When I execute this script, the following error occurs:

root@fluentd-t9nqb:/fluentd/etc/ssl# ruby ~/test.rb
Traceback (most recent call last):
        2: from /root/test.rb:12:in `<main>'
        1: from /var/lib/gems/2.5.0/gems/rdkafka-0.4.1/lib/rdkafka/config.rb:114:in `producer'
/var/lib/gems/2.5.0/gems/rdkafka-0.4.1/lib/rdkafka/config.rb:164:in `native_kafka': No provider for SASL mechanism GSSAPI: recompile librdkafka with libsasl2 or openssl support. Current build options: PLAIN (Rdkafka::Config::ClientCreationError)

So I need some help with this.

Abort trap: 6 Assertion failed: (r == 0), function rwlock_wrlock, file tinycthread_extra.c

I run into this issue when running specs locally. I checked on 1.2.2-RC1, 1.2.0 and 1.1.0. Any ideas?

balrog:rdkafka-ruby n$ be rspec --profile 20 ./spec/rdkafka/consumer_spec.rb
.......F.......F.....FFFFFFF..F.....F....Assertion failed: (r == 0), function rwlock_wrlock, file tinycthread_extra.c, line 138.
Abort trap: 6
balrog:rdkafka-ruby n$ ruby -v
ruby 2.6.5p114 (2019-10-01 revision 67812) [x86_64-darwin18]

balrog:rdkafka-ruby n$ brew config
HOMEBREW_VERSION: 2.1.15-107-g5da322d
ORIGIN: https://github.com/Homebrew/brew
HEAD: 5da322d6be314e5f8b1729e91e60be743a1e7d8e
Last commit: 31 hours ago
Core tap ORIGIN: https://github.com/Homebrew/homebrew-core
Core tap HEAD: 28d42cb0f3160a0c8e169ce1a3aa30578d901c19
Core tap last commit: 26 hours ago
HOMEBREW_PREFIX: /usr/local
HOMEBREW_AWS_ACCESS_KEY_ID: set
HOMEBREW_AWS_SECRET_ACCESS_KEY: set
HOMEBREW_BINTRAY_KEY: set
HOMEBREW_BINTRAY_USER: nijikon
HOMEBREW_DEVELOPER: 1
HOMEBREW_SANDBOX: 1
HOMEBREW_VERBOSE: 1
CPU: quad-core 64-bit haswell
Homebrew Ruby: 2.6.3 => /usr/local/Homebrew/Library/Homebrew/vendor/portable-ruby/2.6.3/bin/ruby
Clang: 11.0 build 1100
Git: 2.23.0 => /usr/local/bin/git
Curl: 7.54.0 => /usr/bin/curl
Java: 13, 11.0.1, 1.8.0_45
macOS: 10.14.6-x86_64
CLT: 10.3.0.0.1.1562985497
Xcode: 11.2
CLT headers: 10.3.0.0.1.1562985497
XQuartz: 2.7.11 => /opt/X11

Consumer.each does not return after call to consumer.close

I'm giving rdkafka-ruby a try and was surprised that consumer.each didn't return after calling consumer.close or consumer.unsubscribe.

config = Rdkafka::Config.new({
  "bootstrap.servers": brokers,
  "security.protocol": "ssl",
  "ssl.ca.location": File.expand_path("../ca.pem", __FILE__),
  "ssl.certificate.location": File.expand_path("../client.pem", __FILE__),
  "ssl.key.location": File.expand_path("../client.key", __FILE__),
  "group.id": prefix + "rdkafka-test",
  "enable.partition.eof": false,
})

consumer = config.consumer
consumer.subscribe("topic")

trap("INT")  { consumer.close }
trap("TERM") { consumer.unsubscribe }

# Never exits
consumer.each do |m|
  puts m.inspect
end

It looks like Consumer#each is an infinite loop and never checks to see if the connection is still open or valid.

See: https://github.com/appsignal/rdkafka-ruby/blob/master/lib/rdkafka/consumer.rb#L244
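
A workaround sketch until #each checks for closure: drive poll yourself behind a flag, and only close after the loop exits (calling close from inside a trap handler is risky anyway):

running = true
trap("INT")  { running = false }
trap("TERM") { running = false }

while running
  message = consumer.poll(250)
  puts message.inspect if message
end

consumer.close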

Ability to pause partitions

When there is a business logic error (e.g. due to unexpected message content), I would like to make progress at least on the other partitions. Therefore I propose the following pseudo-code as a draft, implementing the pause method on Rdkafka::Consumer:

loop do
  message = consumer.poll
  process(message) if message
rescue Rdkafka::RdkafkaError => e
  raise unless e.is_partition_eof?
rescue => e
  consumer.pause(message.topic, message.partition)
end

For reference:

How are offset commits handled?

It looks, from reading the code, like Consumer#each does not do any committing -- if I call Consumer#commit with no arguments... what gets committed?
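
For reference, librdkafka's rd_kafka_commit treats a NULL offset list as "commit the current positions of the whole assignment", so the no-argument form most likely maps to that; this is an inference from the C API, not a documented guarantee of the bindings:

# Presumably commits the current position of every assigned partition
consumer.commit

# Commits only the offsets in the given TopicPartitionList
consumer.commit(topic_partition_list)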

Delivery callback called with an invalid error when producing without ACL permissions

Version: current master

Reproduction:

config = {
    :'bootstrap.servers' => '127.0.0.1:9092',
    :'request.required.acks' => 1,
    :'sasl.mechanisms' => 'PLAIN',
    :'sasl.username' => 'edward',
    :'sasl.password' => 'edward',
    :'security.protocol' => 'sasl_plaintext',
    :'message.timeout.ms' => 100
  }
producer = Rdkafka::Config.new(config).producer
delivery_handles = []

100.times do |i|
  puts "Producing message #{i}"
  delivery_handles << producer.produce(
      topic:   "ruby-test-topic",
      payload: "Payload #{i}",
      key:     "Key #{i}"
  )
end

delivery_handles.each(&:wait)

Rdkafka::RdkafkaError (Local: Message timed out (msg_timed_out))

We never get a proper failure (something like an authorization error). Instead, we (via librdkafka) wait until message.timeout.ms elapses.

This behavior can be really confusing as one would expect to get a clear error message indicating a lack of permissions.

ref karafka/waterdrop#108

Support manual offset store

With autocommit enabled but automatic offset storing disabled, you can determine yourself when a message is done. This maps to rd_kafka_offset_store in librdkafka.
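
A sketch of how this could look once bound; Consumer#store_offset is the name later versions settled on, and process is a hypothetical handler:

config = Rdkafka::Config.new(
  :"bootstrap.servers"        => "localhost:9092",
  :"group.id"                 => "ruby-test",
  :"enable.auto.commit"       => true,  # autocommit stays on
  :"enable.auto.offset.store" => false  # offsets are only stored by hand
)
consumer = config.consumer
consumer.subscribe("ruby-test-topic")

consumer.each do |message|
  process(message)               # hypothetical processing step
  consumer.store_offset(message) # mark it done; autocommit picks it up
end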

Random installation failures

I was attempting to install rdkafka-ruby with gem install rdkafka -v '0.3.5'. It failed several times in a row with:

Building native extensions. This could take a while...
ERROR:  Error installing rdkafka:
	ERROR: Failed to build gem native extension.

    current directory: /home/lukas/.rvm/gems/ruby-2.5.0/gems/rdkafka-0.3.5/ext
/usr/share/rvm/rubies/ruby-2.5.0/bin/ruby -rrubygems /home/lukas/.rvm/gems/ruby-2.5.0@global/gems/rake-12.3.0/exe/rake RUBYARCHDIR=/home/lukas/.rvm/gems/ruby-2.5.0/extensions/x86_64-linux/2.5.0/rdkafka-0.3.5 RUBYLIBDIR=/home/lukas/.rvm/gems/ruby-2.5.0/extensions/x86_64-linux/2.5.0/rdkafka-0.3.5
2 retrie(s) left for v0.11.3.tar.gz
1 retrie(s) left for v0.11.3.tar.gz
0 retrie(s) left for v0.11.3.tar.gz
nil can't be coerced into Integer
Extracting v0.11.3.tar.gz into tmp/x86_64-linux-gnu/ports/librdkafka/0.11.3... ERROR, review '/home/lukas/.rvm/gems/ruby-2.5.0/gems/rdkafka-0.3.5/ext/tmp/x86_64-linux-gnu/ports/librdkafka/0.11.3/extract.log' to see what happened. Last lines are:
========================================================================
tar (child): ports/archives/v0.11.3.tar.gz: Cannot open: No such file or directory
tar (child): Error is not recoverable: exiting now
tar: Child returned status 2
tar: Error is not recoverable: exiting now
========================================================================
rake aborted!
Failed to complete extract task
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:400:in `block in execute'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:371:in `chdir'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:371:in `execute'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:365:in `extract_file'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:61:in `block in extract'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:60:in `each'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:60:in `extract'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/mini_portile2-2.2.0/lib/mini_portile2/mini_portile.rb:150:in `cook'
/home/lukas/.rvm/gems/ruby-2.5.0/gems/rdkafka-0.3.5/ext/Rakefile:10:in `block in <top (required)>'
/home/lukas/.rvm/gems/ruby-2.5.0@global/gems/rake-12.3.0/exe/rake:27:in `<main>'
Tasks: TOP => default
(See full trace by running task with --trace)

rake failed, exit code 1

Gem files will remain installed in /home/lukas/.rvm/gems/ruby-2.5.0/gems/rdkafka-0.3.5 for inspection.
Results logged to /home/lukas/.rvm/gems/ruby-2.5.0/extensions/x86_64-linux/2.5.0/rdkafka-0.3.5/gem_make.out

Out of the blue it ended up working with the same command:

gem install rdkafka -v '0.3.5'
Building native extensions. This could take a while...
Successfully installed rdkafka-0.3.5
Parsing documentation for rdkafka-0.3.5
Installing ri documentation for rdkafka-0.3.5
Done installing documentation for rdkafka after 0 seconds
1 gem installed

It may or may not be worth noting that I'm running elementary OS 0.4.1 Loki (Ubuntu).

dyld: Symbol not found: _timespec_get

I'm getting the following error when trying to open a Rails console in a project with rdkafka 0.6.0 on macOS 10.14.6.

Could it be related to a recent macOS update from 10.14.4/10.14.5?

dyld: lazy symbol binding failed: Symbol not found: _timespec_get
  Referenced from: /Users/USER/.rvm/gems/ruby-2.6.0@service/gems/rdkafka-0.6.0/lib/rdkafka/../../ext/librdkafka.dylib
  Expected in: /usr/lib/libSystem.B.dylib

dyld: Symbol not found: _timespec_get
  Referenced from: /Users/USER/.rvm/gems/ruby-2.6.0@indexing-service/gems/rdkafka-0.6.0/lib/rdkafka/../../ext/librdkafka.dylib
  Expected in: /usr/lib/libSystem.B.dylib

Abort trap: 6

Rollback the offset if an error is raised during processing

In the following code, the next offset is fetched even though an error is raised during processing of the previous message.

Should we change this behavior and rescue exceptions to mark the message as unprocessed even with auto commit enabled, or leave this to the users?

def rdkafka_config(config_overrides={})
  config = {
    :"api.version.request" => false,
    :"broker.version.fallback" => "1.0",
    :"bootstrap.servers" => "localhost:9092",
    :"group.id" => "ruby-test-#{Random.new.rand(0..1_000_000)}",
    :"auto.offset.reset" => "earliest",
    :"enable.partition.eof" => false,
  }
  if ENV["DEBUG_PRODUCER"]
    config[:debug] = "broker,topic,msg"
  elsif ENV["DEBUG_CONSUMER"]
    config[:debug] = "cgrp,topic,fetch"
  end
  config.merge!(config_overrides)
  Rdkafka::Config.new(config)
end

producer = rdkafka_config.producer
consumer = rdkafka_config.consumer

topic = "test1234"
consumer.subscribe(topic)

4.times { |i| producer.produce(topic: topic, payload: (i+1).to_s) }

consumer.each  {|m| p m; raise }

rwlock_wrlock assertion fails on Mac OS

I'm using the following docker-compose.yml

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka1:
    image: wurstmeister/kafka:1.0.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.xx.xx.xxx
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "consume_test_topic:3:1,empty_test_topic:3:1,load_test_topic:3:1,produce_test_topic:3:1,rake_test_topic:3:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  kafka2:
    image: wurstmeister/kafka:1.0.1
    ports:
      - "9093:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.xx.xx.xxx
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "consume_test_topic:3:1,empty_test_topic:3:1,load_test_topic:3:1,produce_test_topic:3:1,rake_test_topic:3:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

and then running the specs with bundle exec rspec and I'm getting the following error

Assertion failed: (r == 0), function rwlock_wrlock, file tinycthread.c, line 1019.

The issue is happening in the spec/rdkafka/consumer_spec.rb. I'm running MacOS 10.12.6.

Non global logger and statistics callback

The majority of this library's settings are per-instance.

Is there any reason why logger and statistics_callback aren't? It is really problematic for systems that run several rdkafka instances for producing and consuming, each with different instrumentation policies.

Release new version

Hi!

Is it possible to release a new version? The last version was released in September, according to RubyGems.

We are currently using the master version (because of #86)

Building Native Extensions Fails on Alpine 3.8

Terminal Output:

bash-4.4$ gem install rdkafka --version="0.6.0"
Building native extensions. This could take a while...
/usr/lib/ruby/2.5.0/rubygems/ext/builder.rb:76: warning: Insecure world writable dir /usr/local/bundle in PATH, mode 040777
ERROR:  Error installing rdkafka:
	ERROR: Failed to build gem native extension.

    current directory: /usr/local/bundle/gems/rdkafka-0.6.0/ext
/usr/bin/ruby -rrubygems /usr/local/bundle/gems/rake-12.3.2/exe/rake RUBYARCHDIR=/usr/local/bundle/extensions/x86_64-linux/2.5.0/rdkafka-0.6.0 RUBYLIBDIR=/usr/local/bundle/extensions/x86_64-linux/2.5.0/rdkafka-0.6.0
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:351: warning: Insecure world writable dir /usr/local/bundle in PATH, mode 040777

Extracting v1.1.0 into tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0... OK
Running 'configure' for librdkafka 1.1.0... OK
Running 'compile' for librdkafka 1.1.0... ERROR, review '/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/compile.log' to see what happened. Last lines are:
========================================================================
Creating static library librdkafka++.a
ar rcs librdkafka++.a RdKafka.o ConfImpl.o HandleImpl.o ConsumerImpl.o ProducerImpl.o KafkaConsumerImpl.o TopicImpl.o TopicPartitionImpl.o MessageImpl.o HeadersImpl.o QueueImpl.o MetadataImpl.o
Creating librdkafka++.so symlink
rm -f "librdkafka++.so" && ln -s "librdkafka++.so.1" "librdkafka++.so"
Generating pkg-config file rdkafka++.pc
Generating pkg-config file rdkafka++-static.pc
Checking librdkafka++ integrity
librdkafka++.so.1              OK
librdkafka++.a                 OK
make[1]: Leaving directory '/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/librdkafka-1.1.0/src-cpp'
make -C examples
make[1]: Entering directory '/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/librdkafka-1.1.0/examples'
gcc -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align  -I../src rdkafka_example.c -o rdkafka_example  \
	../src/librdkafka.a -lm -lssl  -lcrypto  -L/lib -lz  -ldl -lpthread -lrt -lpthread -lrt
../src/librdkafka.a(rdkafka_ssl.o): In function `rd_kafka_transport_ssl_set_endpoint_id':
/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/librdkafka-1.1.0/src/rdkafka_ssl.c:429: undefined reference to `SSL_set1_host'
collect2: error: ld returned 1 exit status
make[1]: *** [Makefile:18: rdkafka_example] Error 1
make[1]: Leaving directory '/usr/local/bundle/gems/rdkafka-0.6.0/ext/tmp/x86_64-alpine-linux-musl/ports/librdkafka/1.1.0/librdkafka-1.1.0/examples'
make: *** [Makefile:42: examples] Error 2
========================================================================
rake aborted!
Failed to complete compile task
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:402:in `block in execute'
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:373:in `chdir'
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:373:in `execute'
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:115:in `compile'
/usr/local/bundle/gems/mini_portile2-2.4.0/lib/mini_portile2/mini_portile.rb:154:in `cook'
/usr/local/bundle/gems/rdkafka-0.6.0/ext/Rakefile:36:in `block in <top (required)>'
/usr/local/bundle/gems/rake-12.3.2/exe/rake:27:in `<main>'
Tasks: TOP => default
(See full trace by running task with --trace)

rake failed, exit code 1

Gem files will remain installed in /usr/local/bundle/gems/rdkafka-0.6.0 for inspection.
Results logged to /usr/local/bundle/extensions/x86_64-linux/2.5.0/rdkafka-0.6.0/gem_make.out


RdkafkaError should have a message with more information

For example, when querying watermarks, the error message looks like this:

Local: Unknown partition (unknown_partition)

This doesn't tell you the topic and offset, which would be very handy to debug. RdkafkaError should have an optional extra message to add this kind of context when appropriate.

Allow establishing TCP connection for producer before the delivery of a first message

When starting to produce messages, there is a certain delay while the TCP connection to Kafka is established. It would be really good if this library could, if possible, provide a way to establish this connection before the first message is sent.

For a high-volume system with a background producer, this delay can cause buffer overflows when the background queue is large enough to handle regular traffic but not the additional backlog the initial delay introduces.

Instrumentation support

To do some deeper introspection into what is going on when receiving or publishing messages, it would be useful to have an instrumentation interface compatible with Active Support Instrumentation. The default might be just a NullInstrumenter that discards all information (see the sketch after this list). For an idea of what might actually be useful to instrument, take inspiration from ruby-kafka:

  • message producing
  • message delivery
  • message polling
  • join/leave consumer group
  • (re-)assign partitions within consumer group
  • offset changes
  • consumer heartbeat
  • connection updates
  • probably more...
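
A minimal sketch of a no-op instrumenter compatible with the ActiveSupport::Notifications#instrument contract; the config.instrumenter wiring is hypothetical, as rdkafka does not expose such a hook today:

class NullInstrumenter
  # Same signature as ActiveSupport's instrument: run the block, record nothing
  def instrument(name, payload = {})
    yield payload if block_given?
  end
end

# Hypothetical wiring
config.instrumenter = NullInstrumenter.new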

How to enable gzip compression on Amazon Linux 2?

I'm using fluentd on Amazon Linux 2 and need the Kafka output plugin to push messages to Kafka. gzip compression would be good for push performance, so I installed rdkafka-ruby with gem install. But rdkafka does not enable gzip at the install/compile stage, so when I add the "gzip" option to the fluentd config, it fails with "wrong value for compress.type". How can I enable gzip at the "gem install rdkafka" step, or is there another way to enable it? I do have zlib-devel on the machine ("Package zlib-devel-1.2.8-7.18.amzn1.x86_64 already installed and latest version").

Consume In Batches API

I know that it's currently possible to consume in batches by polling in a loop (see the poll in batches issue); however, I believe the API for this could be significantly improved. One of the main draws of this library is the simplicity with which a Kafka consumer can be implemented. Consuming messages individually has become too slow for my current use case, and as I began the transition to consuming in batches, I found the work required more challenging than I feel is strictly necessary.

I think the users of this library would find an API similar to the code below far superior to the current interface (a sketch of a possible implementation follows the list).

consumer.each_in_batches(batch_size, timeout) do |batch|
  batch.each do |message|
    puts "Message received: #{message}"
  end
end
  • batch_size -> the number of messages to be consumed before the block of code is run.
  • timeout -> the time in milliseconds to wait before a batch of more than zero but fewer than batch_size messages is passed to the code block.
  • It would be expected that any errors that occur propagate up, and that the offsets for those messages are not committed.
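
A rough sketch of how such a method could be layered on top of poll; everything here is hypothetical, including the method itself:

def each_in_batches(consumer, batch_size, timeout_ms)
  loop do
    batch = []
    deadline = Time.now + timeout_ms / 1000.0
    # Collect until the batch is full or the timeout elapses
    while batch.size < batch_size && Time.now < deadline
      message = consumer.poll(100)
      batch << message if message
    end
    yield batch unless batch.empty?
  end
end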

Session timeout for long message processing

Hi,

The processing time of one message can be several minutes in my app. Should I set a higher session.timeout.ms to avoid multiple consumers processing the same partition? I really need to be sure that my messages are processed one after the other, and that if an error occurs during processing, the message is retried until I fix the bug that causes the error.

Right now I'm seeing that sometimes the processing of the next message starts before the processing of the previous message has ended. I suspect that it is due to the session timeout.
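
Raising the relevant timeouts is a reasonable starting point. A sketch; the values are illustrative, and max.poll.interval.ms (the setting that actually governs the allowed time between polls) requires librdkafka >= 1.0:

config = {
  :"bootstrap.servers"    => "localhost:9092",
  :"group.id"             => "ruby-test",
  # Allow up to 10 minutes between polls before the consumer is evicted
  :"max.poll.interval.ms" => 600_000,
  :"session.timeout.ms"   => 30_000
}
consumer = Rdkafka::Config.new(config).consumer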

ruby 2.3.7p456 run crash

ENV:

Debian GNU/Linux 8
ruby 2.3.7p456


/usr/local/bundle/gems/rdkafka-0.3.5/lib/rdkafka/consumer.rb:33: [BUG] Segmentation fault at 0x0000000000000000
ruby 2.3.7p456 (2018-03-28 revision 63024) [x86_64-linux]

-- Control frame information -----------------------------------------------
c:0042 p:---- s:0194 e:000193 CFUNC  :rd_kafka_topic_partition_list_add
c:0041 p:0022 s:0188 e:000187 BLOCK  /usr/local/bundle/gems/rdkafka-0.3.5/lib/rdkafka/consumer.rb:33 [FINISH]
c:0040 p:---- s:0185 e:000184 CFUNC  :each
c:0039 p:0031 s:0182 e:000181 METHOD /usr/local/bundle/gems/rdkafka-0.3.5/lib/rdkafka/consumer.rb:32
c:0038 p:0013 s:0176 e:000175 BLOCK  /app/lib/kafka_client/consumer.rb:31 [FINISH]
c:0037 p:---- s:0173 e:000172 CFUNC  :each
c:0036 p:0010 s:0170 e:000169 METHOD /app/lib/kafka_client/consumer.rb:31
c:0035 p:0071 s:0166 e:000165 METHOD /app/lib/kafka_client/consumer.rb:6
c:0034 p:0030 s:0157 e:000156 METHOD /app/app/daemons/pdm_sync_daemon/task.rb:4
c:0033 p:0124 s:0154 e:000153 METHOD /app/app/daemons/pdm_sync_daemon/control.rb:17
c:0032 p:0017 s:0151 e:000150 TOP    scripts/pdm_sync.rb:1 [FINISH]
c:0031 p:---- s:0149 e:000148 CFUNC  :load
c:0030 p:0123 s:0145 e:000144 METHOD /usr/local/bundle/gems/railties-5.1.4/lib/rails/commands/runner/runner_command.rb:34
c:0029 p:0078 s:0139 e:000138 METHOD /usr/local/bundle/bundler/gems/thor-0b137514427a/lib/thor/command.rb:27
c:0028 p:0058 s:0132 e:000131 METHOD /usr/local/bundle/bundler/gems/thor-0b137514427a/lib/thor/invocation.rb:126
c:0027 p:0303 s:0126 E:000f58 METHOD /usr/local/bundle/bundler/gems/thor-0b137514427a/lib/thor.rb:387
"/tmp/2" 2813L, 221648C

Unable to install rdkafka-0.3.4 - error during native extension

I am unable to install the gem with gem install rdkafka-0.3.4.gem --local; the failure comes when the native extension is being built and the process attempts to download a file from GitHub. The host where I am trying to install this cannot pull files down from the internet during install. Is there any work-around where I can provide the needed file locally?

> td-agent-enterprise-gem install rdkafka-0.3.4.gem --local
Building native extensions.  This could take a while...
ERROR:  Error installing rdkafka-0.3.4.gem:
        ERROR: Failed to build gem native extension.

    current directory: /opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/gems/rdkafka-0.3.4/ext
/opt/td-agent-enterprise/embedded/bin/ruby -rubygems /opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/gems/rake-12.0.0/exe/rake RUBYARCHDIR=/opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/extensions/x86_64-linux/2.4.0/rdkafka-0.3.4 RUBYLIBDIR=/opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/extensions/x86_64-linux/2.4.0/rdkafka-0.3.4
2 retrie(s) left for v0.11.3.tar.gz
1 retrie(s) left for v0.11.3.tar.gz
0 retrie(s) left for v0.11.3.tar.gz
execution expired
Extracting v0.11.3.tar.gz into tmp/x86_64-redhat-linux/ports/librdkafka/0.11.3... ERROR, review '/opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/gems/rdkafka-0.3.4/ext/tmp/x86_64-redhat-linux/ports/librdkafka/0.11.3/extract.log' to see what happened. Last lines are:
========================================================================
tar (child): ports/archives/v0.11.3.tar.gz: Cannot open: No such file or directory
tar (child): Error is not recoverable: exiting now
gtar: Child returned status 2
gtar: Error is not recoverable: exiting now
========================================================================
rake aborted!
Failed to complete extract task

Tasks: TOP => default
(See full trace by running task with --trace)

rake failed, exit code 1

Gem files will remain installed in /opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/gems/rdkafka-0.3.4 for inspection.
Results logged to /opt/td-agent-enterprise/embedded/lib/ruby/gems/2.4.0/extensions/x86_64-linux/2.4.0/rdkafka-0.3.4/gem_make.out

Poll in batches

Handling the offset correctly is cumbersome, especially when I want to consume a few messages, do a batch insert into the database, and avoid any offset change before the batch insert has succeeded. To do so, I've disabled auto.commit and commit manually after the batch has been processed successfully (see zendesk/racecar@d00dbc7).

This can probably be implemented using the rd_kafka_consume_batch binding, which allows committing automatically after a callback has been processed successfully.
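
Until such a binding exists, a manual-commit loop along the lines of the racecar patch can be sketched like this; batch_insert is a hypothetical database write:

config["enable.auto.commit"] = false
consumer = config.consumer
consumer.subscribe("ruby-test-topic")

batch = []
loop do
  message = consumer.poll(250)
  batch << message if message
  next if batch.size < 100

  batch_insert(batch) # hypothetical: write the whole batch in one statement
  consumer.commit     # commit only after the insert succeeded
  batch.clear
end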

Consumer defaults encourage bad design

While the producer defaults and examples correctly encourage batching out of the gate, the current consumer defaults (and all readily accessible examples) encourage a non-batch design. These defaults and examples should be updated to support the 99% use case, which is batching.

Encouraging handling messages one at a time, even if the polling piece is batched under the hood, in turn encourages secondary side effects like writing to a database, writing to another kafka topic, etc to be performed one at a time rather than in batches, which would be a bad design choice for users of this library.

I propose rethinking the defaults / examples of the consumer to be batch-based, like those of the producer.
