
librdkafka - the Apache Kafka C/C++ client library

Copyright (c) 2012-2022, Magnus Edenhill; 2023, Confluent Inc.

https://github.com/confluentinc/librdkafka

librdkafka is a C library implementation of the Apache Kafka protocol, providing Producer, Consumer and Admin clients. It was designed with message delivery reliability and high performance in mind; current figures exceed 1 million msgs/second for the producer and 3 million msgs/second for the consumer.

librdkafka is licensed under the 2-clause BSD license.

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by librdkafka. librdkafka has no affiliation with and is not endorsed by The Apache Software Foundation.

Features

  • Full Exactly-Once-Semantics (EOS) support
  • High-level producer, including Idempotent and Transactional producers
  • High-level balanced KafkaConsumer (requires broker >= 0.9)
  • Simple (legacy) consumer
  • Admin client
  • Compression: snappy, gzip, lz4, zstd
  • SSL support
  • SASL (GSSAPI/Kerberos/SSPI, PLAIN, SCRAM, OAUTHBEARER) support
  • Full list of supported KIPs
  • Broker version support: >=0.8 (see Broker version compatibility)
  • Guaranteed API stability for C & C++ APIs (ABI safety guaranteed for C)
  • Statistics metrics
  • Debian package: librdkafka1 and librdkafka-dev in Debian and Ubuntu
  • RPM package: librdkafka and librdkafka-devel
  • Gentoo package: dev-libs/librdkafka
  • Portable: runs on Linux, MacOS X, Windows, Solaris, FreeBSD, AIX, ...

Documentation

NOTE: The master branch is actively developed; use the latest release for production use.

Installation

Installing prebuilt packages

On Mac OSX, install librdkafka with Homebrew:

$ brew install librdkafka

On Debian and Ubuntu, install librdkafka from the Confluent APT repositories (see instructions here), then install librdkafka:

$ apt install librdkafka-dev

On RedHat, CentOS and Fedora, install librdkafka from the Confluent YUM repositories (instructions here), then install librdkafka:

$ yum install librdkafka-devel

On Windows, reference librdkafka.redist NuGet package in your Visual Studio project.

For other platforms, follow the source building instructions below.

Installing librdkafka using vcpkg

You can download and install librdkafka using the vcpkg dependency manager:

# Install vcpkg if not already installed
$ git clone https://github.com/Microsoft/vcpkg.git
$ cd vcpkg
$ ./bootstrap-vcpkg.sh
$ ./vcpkg integrate install

# Install librdkafka
$ vcpkg install librdkafka

The librdkafka package in vcpkg is kept up to date by Microsoft team members and community contributors. If the version is out of date, please create an issue or pull request on the vcpkg repository.

Build from source

Requirements

The GNU toolchain
GNU make
pthreads
zlib-dev (optional, for gzip compression support)
libssl-dev (optional, for SSL and SASL SCRAM support)
libsasl2-dev (optional, for SASL GSSAPI support)
libzstd-dev (optional, for ZStd compression support)
libcurl-dev (optional, for SASL OAUTHBEARER OIDC support)

NOTE: Static linking of ZStd (requires zstd >= 1.2.1) in the producer enables encoding the original size in the compression frame header, which will speed up the consumer. Use STATIC_LIB_libzstd=/path/to/libzstd.a ./configure --enable-static to enable static ZStd linking. MacOSX example: STATIC_LIB_libzstd=$(brew ls -v zstd | grep libzstd.a$) ./configure --enable-static

Building

  ./configure
  # Or, to automatically install dependencies using the system's package manager:
  # ./configure --install-deps
  # Or, build dependencies from source:
  # ./configure --install-deps --source-deps-only

  make
  sudo make install

NOTE: See README.win32 for instructions on how to build on Windows with Microsoft Visual Studio.

NOTE: See CMake instructions for experimental CMake build (unsupported).

Usage in code

See Getting Started with Apache Kafka and C/C++ for a basic tutorial.

  1. Refer to the examples directory for code using:

    • Producers: basic producers, idempotent producers, transactional producers.
    • Consumers: basic consumers, reading batches of messages.
    • Performance and latency testing tools.
  2. Refer to the examples GitHub repo for code connecting to a cloud streaming data service based on Apache Kafka

  3. Link your program with -lrdkafka (C) or -lrdkafka++ (C++).
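Putting those pieces together, a minimal producer might look like the following sketch. It uses the current librdkafka API; the broker address and topic name are placeholders, and error handling is abbreviated.

```c
#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <string.h>

/* Minimal producer sketch. "localhost:9092" and "my-topic" are
 * placeholders; substitute your own broker list and topic. */
int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        /* rd_kafka_new() takes ownership of conf on success. */
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "my-topic", NULL);
        const char *msg = "hello";

        /* RD_KAFKA_PARTITION_UA lets the partitioner pick a partition. */
        if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                             (void *)msg, strlen(msg), NULL, 0, NULL) == -1)
                fprintf(stderr, "produce failed: %s\n",
                        rd_kafka_err2str(rd_kafka_last_error()));

        rd_kafka_poll(rk, 0);          /* serve delivery callbacks */
        rd_kafka_flush(rk, 10 * 1000); /* wait for outstanding messages */
        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        return 0;
}
```

Build with `cc producer.c -lrdkafka`, as noted in step 3.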

Commercial support

Commercial support is available from Confluent Inc.

Community support

Only the latest official release is supported for community members.

File bug reports and feature requests using GitHub Issues.

Questions and discussions are welcome on the Discussions forum, and on the Confluent Community slack #clients channel.

Language bindings

See Powered by librdkafka for an incomplete list of librdkafka users.


librdkafka's Issues

librdkafka 0.8 cannot produce messages when the leader of the topic partition has failed

Hi, Magnus.
I find that librdkafka 0.8 cannot produce messages to the Kafka brokers when the leader of the topic partition has failed. Even after I have checked that a new leader is elected, the problem persists.
I use the rdkafka_example.c in the example folder and create a topic with 3 replicas and 1 partition.
After I restart the producer (rdkafka_example.c, which I use as a producer), the problem is solved.
I am not sure whether this is a bug in librdkafka 0.8.
Thanks.

Broke Kafka replication when requesting ack != -1 and a message larger than 1000000 bytes

I'm not sure if this is a librdkafka issue or a kafka general one. I did the following steps:

  • Changed Kafka broker to have max message size of 4000000
  • Ran the rdkafka_performance binary and specified a size of 3000 bytes and an ack of 1. Ran successfully
  • Ran the rdkafka_performance binary and specified a size of 2000000 bytes and an ack of 1. Ran successfully
  • Ran the rdkafka_performance binary and specified a size of 2000000 and an ack of -1. The binary reports a failure and times out after 5000 ms.
    Any attempt to run the rdkafka_performance binary (regardless of msg size) using an ack of anything other than 1 fails thereafter with a timeout of 5000 ms. Replication appears to be broken from then on.

Was able to reproduce this at will, using a new topic each time. I wasn't able to reproduce this when using a cluster max message size of 1000000 (the default). So I suspect there's some sort of issue when the msg is > 1000000 but less than the max msg size configured.

Was wondering if you could reproduce this and if so had any thoughts?

kafka v0.6 support

Hi edenhill.

I'm trying to send messages between a producer and a consumer version of rdkafka_example through a Kafka server, and no messages are delivered to the consumer. Kafka's version is 0.6, and I think that the problem is right there.

Is there a way to send messages using this version of Kafka with your library? If not, can I download a previous version, or another C library, to do so? The language is very important because I want to integrate Kafka into an existing project.

Thanks and regards

crashes on too large messages

I want to use librdkafka in my project, but when I test it, it causes my app to crash.
My situation: the main thread creates the necessary objects, and multiple threads then call rd_kafka_produce to send messages to the Kafka server.

After the application has been running for some time, it crashes. The call stack looks like the following:

Core was generated by `nelo2-thrift                                          '.
Program terminated with signal 11, Segmentation fault.
#0  0x00000000005ec47d in rd_kafka_broker_produce_toppar (rkb=0x7f8124000ef0) at rdkafka_broker.c:1944
1944            rkbuf->rkbuf_ts_timeout =
Missing separate debuginfos, use: debuginfo-install glibc-2.12-1.80.el6_3.3.x86_64 libgcc-4.4.6-4.el6.x86_64 libgcrypt-1.4.5-9.el6_2.2.x86_64 libgpg-error-1.7-4.el6.x86_64 libstdc++-4.4.6-4.el6.x86_64 ncurses-libs-5.7-3.20090208.el6.x86_64 zlib-1.2.3-27.el6.x86_64
(gdb) bt
#0  0x00000000005ec47d in rd_kafka_broker_produce_toppar (rkb=0x7f8124000ef0) at rdkafka_broker.c:1944
#1  rd_kafka_broker_producer_serve (rkb=0x7f8124000ef0) at rdkafka_broker.c:2105
#2  0x00000000005ed1d8 in rd_kafka_broker_thread_main (arg=0x7f8124000ef0) at rdkafka_broker.c:3002
#3  0x0000003e86e07851 in start_thread () from /lib64/libpthread.so.0
#4  0x0000003e86ae76dd in clone () from /lib64/libc.so.6

Librdkafka does not compile on OSX 10.8.5

Hi
I try to build the library on my Mac, but it fails. My Mac info is:

Configured with: --prefix=/Applications/Xcode.app/Contents/Developer/usr --with-gxx-include-dir=/usr/include/c++/4.2.1
Apple LLVM version 5.0 (clang-500.2.79) (based on LLVM 3.3svn)
Target: x86_64-apple-darwin12.5.0
Thread model: posix

The error message is:

cc -MD -MP -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -fPIC -I. -g -DSG -c rdkafka.c
rdkafka.c:1064:29: error: use of GNU old-style field designator extension [-Werror,-Wgnu-designator]
        struct consume_ctx ctx = { consume_cb: consume_cb, opaque: opaque };
                                   ^~~~~~~~~~~
                                   .consume_cb = 
rdkafka.c:1064:53: error: use of GNU old-style field designator extension [-Werror,-Wgnu-designator]
        struct consume_ctx ctx = { consume_cb: consume_cb, opaque: opaque };
                                                           ^~~~~~~
                                                           .opaque = 
2 errors generated.
make: *** [rdkafka.o] Error 1

Cannot send messages when the topic is created long after the broker is added

The message producer only works if you create the topic a short time after the broker addition. If you just put a sleep(1) between rd_kafka_brokers_add and rd_kafka_topic_new, rdkafka_example cannot send messages anymore.

Here is the debug output: Without the sleep:
%7|1376053795.949|PARTCNT|0x7ff2168bf700|default#producer-0| Ignore unknown topic rb_event
%7|1376053795.949|METADATA|0x7ff2168bf700|default#producer-0| Topic #2/3: rb_event partition 0 Leader 0
%7|1376053795.949|TOPICUPD|0x7ff2168bf700|default#producer-0| Ignoring topic rb_event: not found locally
%7|1376053795.949|METADATA|0x7ff2168bf700|default#producer-0| Topic #2/3: rb_event partition 1 Leader 0
%7|1376053795.949|TOPICUPD|0x7ff2168bf700|default#producer-0| Ignoring topic rb_event: not found locally
%7|1376053795.949|METADATA|0x7ff2168bf700|default#producer-0| Topic #2/3: rb_event partition 2 Leader 0
%7|1376053795.949|TOPICUPD|0x7ff2168bf700|default#producer-0| Ignoring topic rb_event: not found locally
%7|1376053795.949|PARTCNT|0x7ff2168bf700|default#producer-0| Ignore unknown topic rb_event
%7|1376053799.159|TOPIC|0x7ff2184d1020|default#producer-0| new topic: rb_event

and when I add the sleep:
%7|1376053831.161|PARTCNT|0x7f762d015700|default#producer-0| No change in partition count for topic rb_event
%7|1376053831.161|METADATA|0x7f762d015700|default#producer-0| Topic #2/3: rb_event partition 0 Leader 0
%7|1376053831.161|TOPICUPD|0x7f762d015700|default#producer-0| No leader change for topic rb_event [0] with leader 0
%7|1376053831.161|METADATA|0x7f762d015700|default#producer-0| Topic #2/3: rb_event partition 1 Leader 0
%7|1376053831.161|TOPICUPD|0x7f762d015700|default#producer-0| No leader change for topic rb_event [1] with leader 0
%7|1376053831.161|METADATA|0x7f762d015700|default#producer-0| Topic #2/3: rb_event partition 2 Leader 0
%7|1376053831.161|TOPICUPD|0x7f762d015700|default#producer-0| No leader change for topic rb_event [2] with leader 0
%7|1376053831.161|PARTCNT|0x7f762d015700|default#producer-0| Partitioning 0 unassigned messages in topic rb_event to 3 partitions
%7|1376053831.161|UAS|0x7f762d015700|default#producer-0| 0/0 messages were partitioned

PHP extension on top of librdkafka - Undefined symbol: snappy_compress_iov

Hello, I'm trying to build a PHP extension on top of this librdkafka C library. Unfortunately, after linking, compiling and installing I'm getting this error:

PHP Warning: PHP Startup: Unable to load dynamic library '/usr/lib/php5/2010052
5+lfs/kafka.so' - /usr/lib/php5/20100525+lfs/kafka.so: undefined symbol: snappy_
compress_iov
in Unknown on line 0

Because I'm not so good with C, maybe someone can help me to figure out what is happening. Here is my repo: https://github.com/salebab/phpkafka and here is compile log http://pastebin.com/FZnzz7JU

Thanks!

make error: rdkafka_example

The make rdkafka_performance works fine, but
make rdkafka_example gives this:
cc1: warnings being treated as errors
rdkafka_example.c: In function ‘hexdump’:
rdkafka_example.c:57: error: format ‘%lu’ expects type ‘long unsigned int’, but argument 4 has type ‘size_t’
make: *** [rdkafka_example] Error 1

Suggestion? (I don't do a lot of C work)

Thanks,
Chris

static linking of librdkafka.a needs dynamic pthreads linking

Seeing this on Red Hat Linux 6. It's possible there's something in my code leading up to it, but the stack trace seems to show only librdkafka code:
#0 0x0000003456832885 in raise () from /lib64/libc.so.6

Missing separate debuginfos, use: debuginfo-install glibc-2.12-1.47.el6_2.9.x86_64 libgcc-4.4.6-3.el6.x86_64 libstdc++-4.4.6-3.el6.x86_64 zlib-1.2.3-27.el6.x86_64
(gdb) where
#0 0x0000003456832885 in raise () from /lib64/libc.so.6
#1 0x0000003456834065 in abort () from /lib64/libc.so.6
#2 0x000000345686f977 in __libc_message () from /lib64/libc.so.6
#3 0x0000003456875296 in malloc_printerr () from /lib64/libc.so.6
#4 0x0000003456865a1d in fclose@@GLIBC_2.2.5 () from /lib64/libc.so.6
#5 0x00000034584047fa in _nss_files_gethostbyname2_r () from /lib64/libnss_files.so.2
#6 0x0000003456900a39 in gethostbyname2_r@@GLIBC_2.2.5 () from /lib64/libc.so.6
#7 0x00000034568ce9f6 in gaih_inet () from /lib64/libc.so.6
#8 0x00000034568d0170 in getaddrinfo () from /lib64/libc.so.6
#9 0x00007f288c761d8c in rd_getaddrinfo (nodesvc=, defsvc=0x7f287c0008e0 "5757", flags=32, family=,

socktype=<value optimized out>, protocol=<value optimized out>, errstr=0x7f2877ffeda8) at rdaddr.c:161

#10 0x00007f288c757d46 in rd_kafka_broker_resolve (rkb=0x1d15fe0) at rdkafka_broker.c:432
#11 0x00007f288c75e03b in rd_kafka_broker_connect (arg=0x1d15fe0) at rdkafka_broker.c:1292
#12 rd_kafka_broker_thread_main (arg=0x1d15fe0) at rdkafka_broker.c:3041
#13 0x00000034570077f1 in start_thread () from /lib64/libpthread.so.0
#14 0x00000034568e5ccd in clone () from /lib64/libc.so.6

Valgrind shows:

==24258== Thread 3:^M
==24258== Invalid read of size 8^M
==24258== at 0x345687112E: _IO_file_underflow@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)^M
==24258== by 0x3456872C2D: _IO_default_uflow (in /lib64/libc-2.12.so)^M
==24258== by 0x3456867419: _IO_getline_info (in /lib64/libc-2.12.so)^M
==24258== by 0x3456870140: fgets_unlocked (in /lib64/libc-2.12.so)^M
==24258== by 0x34584046CE: _nss_files_gethostbyname2_r (in /lib64/libnss_files-2.12.so)^M
==24258== by 0x3456900A38: gethostbyname2_r@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)^M
==24258== by 0x34568CE9F5: gaih_inet (in /lib64/libc-2.12.so)^M
==24258== by 0x34568D016F: getaddrinfo (in /lib64/libc-2.12.so)^M
==24258== by 0x529DD8B: rd_getaddrinfo (rdaddr.c:161)^M
==24258== by 0x5293D45: rd_kafka_broker_resolve (rdkafka_broker.c:432)^M
==24258== by 0x529A03A: rd_kafka_broker_thread_main (rdkafka_broker.c:1292)^M
==24258== by 0x34570077F0: start_thread (in /lib64/libpthread-2.12.so)^M
==24258== Address 0x4ef3d40 is 144 bytes inside a block of size 568 free'd^M
==24258== at 0x4A06300: free (vg_replace_malloc.c:446)^M
==24258== by 0x3456865A1C: fclose@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)^M
==24258== by 0x34584047F9: _nss_files_gethostbyname2_r (in /lib64/libnss_files-2.12.so)^M
==24258== by 0x3456900A38: gethostbyname2_r@@GLIBC_2.2.5 (in /lib64/libc-2.12.so)^M
==24258== by 0x34568CE9F5: gaih_inet (in /lib64/libc-2.12.so)^M
==24258== by 0x34568D016F: getaddrinfo (in /lib64/libc-2.12.so)^M
==24258== by 0x529DD8B: rd_getaddrinfo (rdaddr.c:161)^M
==24258== by 0x5293D45: rd_kafka_broker_resolve (rdkafka_broker.c:432)^M
==24258== by 0x529A03A: rd_kafka_broker_thread_main (rdkafka_broker.c:1292)^M
==24258== by 0x34570077F0: start_thread (in /lib64/libpthread-2.12.so)^M
==24258== by 0x34568E5CCC: clone (in /lib64/libc-2.12.so)^M

With debug turned on, this is what I see:
1388099207.266 RDKAFKA-7-BROKER: LARA#producer-0: kafkadevcluster1-1.maskedout.com:5757/bootstrap: Added new broker with NodeId -1
1388099207.266 RDKAFKA-7-BRKMAIN: LARA#producer-0: kafkadevcluster1-1.maskedout.com:5757/bootstrap: Enter main broker thread
1388099207.266 RDKAFKA-7-CONNECT: LARA#producer-0: kafkadevcluster1-1.maskedout.com:5757/bootstrap: broker in state DOWN connecting
1388099207.266 RDKAFKA-7-BROKER: LARA#producer-0: kafkadevcluster1-2.maskedout.com:5757/bootstrap: Added new broker with NodeId -1
1388099207.266 RDKAFKA-7-BRKMAIN: LARA#producer-0: kafkadevcluster1-2.maskedout.com:5757/bootstrap: Enter main broker thread
1388099207.266 RDKAFKA-7-CONNECT: LARA#producer-0: kafkadevcluster1-2.maskedout.com:5757/bootstrap: broker in state DOWN connecting
1388099207.267 RDKAFKA-7-BROKER: LARA#producer-0: kafkadevcluster1-3.maskedout.com:5757/bootstrap: Added new broker with NodeId -1
AFTER TRYING BROKERS:kafkadevcluster1-1.maskedout.com:5757,kafkadevcluster1-2.maskedout.com:5757,kafkadevcluster1-3.maskedout.com:5757
1388099207.267 RDKAFKA-7-BRKMAIN: LARA#producer-0: kafkadevcluster1-3.maskedout.com:5757/bootstrap: Enter main broker thread
1388099207.267 RDKAFKA-7-CONNECT: LARA#producer-0: kafkadevcluster1-3.maskedout.com:5757/bootstrap: broker in state DOWN connecting
1388099207.267 RDKAFKA-7-TOPIC: LARA#producer-0: New local topic: Lara
*** glibc detected *** ./replicator: double free or corruption (out): 0x00007fd1f0001200 ***

Build Fails on Centos6.4

Is librdkafka ready for use with Kafka 0.8.x?

I'm unable to build from master which is currently at: a29956d

[david@ops-1 librdkafka{master}]$ make clean ;  make all 
rm -f rdkafka.o rdkafka_broker.o rdkafka_msg.o rdkafka_topic.o rdkafka_defaultconf.o rdcrc32.o rdgz.o rdaddr.o rdrand.o rdthread.o rdqueue.o rdlog.o rdkafka.d rdkafka_broker.d rdkafka_msg.d rdkafka_topic.d rdkafka_defaultconf.d rdcrc32.d rdgz.d rdaddr.d rdrand.d rdthread.d rdqueue.d rdlog.d \
        librdkafka*.a librdkafka*.so librdkafka*.so.1
cc -MD -MP -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -fPIC -I. -g -rdynamic -c rdkafka.c
In file included from rdkafka.c:37:
rdkafka_int.h:288: error: redefinition of typedef ‘rd_kafka_topic_t’
rdkafka.h:60: note: previous declaration of ‘rd_kafka_topic_t’ was here
rdkafka_int.h:383: error: redefinition of typedef ‘rd_kafka_t’
rdkafka.h:59: note: previous declaration of ‘rd_kafka_t’ was here
make: *** [rdkafka.o] Error 1

When I check out the 0.7 branch, I can build without errors.

Minor fix to sample program

The error should say 'failed to create new consumer' rather than 'producer'.

/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                        errstr, sizeof(errstr)))) {
        fprintf(stderr,
                "%% Failed to create new producer: %s\n",
                errstr);
        exit(1);
}

Message timeout scans

The unlikely() condition on last_timeout_scan in rd_kafka_broker_serve uses * 1000000, which will make timeout scans trigger only rarely. Is this intended, or should it be + 1000000 (1 second) to trigger scans a bit more frequently?

Does the library support broker recover back

Hi,
In my case, when the Kafka cluster is shut down and a producer using librdkafka keeps trying to send logs to Kafka, sending fails, as expected.
But when I start the Kafka cluster again, the producer still can't send logs to Kafka, and I found that the producer's sockets are in CLOSE_WAIT status:

[[email protected] logs]$ netstat -an|grep 9093
tcp        1      0 10.96.250.211:43934         10.99.116.53:9093           CLOSE_WAIT  
tcp        1      0 10.96.250.211:50729         10.99.116.54:9093           CLOSE_WAIT  
tcp        1      0 10.96.250.211:45626         10.99.116.55:9093           CLOSE_WAIT  
tcp        1      0 10.96.250.211:45625         10.99.116.55:9093           CLOSE_WAIT  
tcp        1      0 10.96.250.211:43951         10.99.116.53:9093           CLOSE_WAIT  

Does the library have a feature to check whether brokers are alive again? Thanks!

High concurrency usage

Hi,

I'm writing an nginx module which parses a message and sends it to Kafka using librdkafka. But under a high level of concurrency, the API locks up and says it cannot create more threads to send the message. Is there a way to overcome this, for example by using it in a synchronous way?

thanks in advance,

andre

Can you explain a bit about the 'key' passed by the producer?

I know it can be used by the partitioner, but I'm also wondering what other value it has - I read that it is also passed to the consumer. Could this be considered like a 'header field'? (thinking like a JMS Header field). Is there only a single key that can be passed? If so, I guess I would have to make it somewhat of a compound/concatenated/delimited one if I want to have a bunch of properties passed?
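For illustration, the kind of key-to-partition mapping a partitioner performs can be sketched in plain, self-contained C. The hash below (djb2) is illustrative only and is not the hash librdkafka itself uses; the function names are made up for this example.

```c
#include <stdint.h>
#include <stddef.h>

/* djb2: a simple, deterministic string hash (illustrative choice). */
uint32_t djb2_hash(const void *key, size_t len) {
        const unsigned char *p = key;
        uint32_t h = 5381;
        for (size_t i = 0; i < len; i++)
                h = ((h << 5) + h) + p[i];   /* h * 33 + byte */
        return h;
}

/* Messages with the same key always map to the same partition,
 * which preserves per-key ordering on the consumer side. */
int32_t partition_for_key(const void *key, size_t len,
                          int32_t partition_cnt) {
        return (int32_t)(djb2_hash(key, len) % (uint32_t)partition_cnt);
}
```

As for the question itself: only a single key can be attached per message at this level, so a compound/delimited key is indeed the usual workaround for carrying several properties.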

Program received signal SIGPIPE, Broken pipe.

I'm getting the following when trying the example on Ubuntu 12.10 with
Kafka 0.7.2 (installed from https://github.com/downloads/wmf-analytics/kafka-debian/kafka_0.7.2_all.deb):

Program received signal SIGPIPE, Broken pipe.
[Switching to Thread 0xb7dc8b40 (LWP 8168)]
0xb7fdd424 in __kernel_vsyscall ()
(gdb) bt
#0  0xb7fdd424 in __kernel_vsyscall ()
#1  0xb7fabf08 in sendmsg () from /lib/i386-linux-gnu/libpthread.so.0
#2  0x0804a7ca in rd_kafka_send (rk=0x80518f8, msg=<optimized out>)
    at rdkafka.c:413
#3  0x0804a95a in rd_kafka_send_request (rk=rk@entry=0x80518f8, 
    msgtype=msgtype@entry=0, topicpart=0xb7dc7124) at rdkafka.c:477
#4  0x0804aca6 in rd_kafka_produce_send (rko=0x8051d70, rk=0x80518f8)
    at rdkafka.c:862
#5  rd_kafka_wait_op (rk=<optimized out>) at rdkafka.c:909
#6  rd_kafka_thread_main (arg=0x80518f8) at rdkafka.c:984
#7  0xb7fa4d4c in start_thread () from /lib/i386-linux-gnu/libpthread.so.0
#8  0xb7ec0d3e in clone () from /lib/i386-linux-gnu/libc.so.6

Support sync interface

Hi,
currently librdkafka only supports an async interface, but in my case we would prefer a sync interface.
Do you have plans to support this? Thanks!

Do you have to specify a callback function for the rd_kafka_poll() to work?

I indicated request.required.acks=1 and am polling in a loop until I get a return value of 1. I have not specified a callback function (I may do this later), but rd_kafka_poll() never seems to return 1. Is a callback function required?

In a different program I do have a callback function and it seems to work as expected.
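For reference, a delivery-report callback with the current librdkafka API (which postdates this report) might be sketched as below. rd_kafka_poll() returns the number of events served, so it can appear to "never return 1" when no callback is registered or nothing is in flight.

```c
#include <librdkafka/rdkafka.h>
#include <stdio.h>

/* Delivery-report callback: invoked from rd_kafka_poll() once per
 * produced message, with success or the final error. */
static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage, void *opaque) {
        (void)rk; (void)opaque;
        if (rkmessage->err)
                fprintf(stderr, "delivery failed: %s\n",
                        rd_kafka_err2str(rkmessage->err));
        else
                fprintf(stderr, "delivered to partition %d\n",
                        (int)rkmessage->partition);
}

/* Register on the conf object before rd_kafka_new():
 *     rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
 * Then serve callbacks from the application thread:
 *     while (rd_kafka_outq_len(rk) > 0)
 *             rd_kafka_poll(rk, 100);
 */
```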

excessive memory allocation == poor performance?

Testing with code based on the example to stream data from rsyslog to Kafka. I'm hit by poor performance: where I would expect 5000 messages/s, I'm only getting ~300 messages/s.

Reading the code, I'm concerned that there are at least two mallocs per message on the producer path:

First, in the client:

        char *opbuf = malloc(len + 1);
        strncpy(opbuf, buf, len + 1);

...then the actual producer:

void rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
               int msgflags,
               char *payload, size_t len) {
    rd_kafka_op_t *rko;

    rko = calloc(1, sizeof(*rko));

It seems it would be best for librdkafka to preallocate, track and reuse rd_kafka_op_t and rd_kafka_op_t->rko_payload for optimal performance.
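The reuse being suggested is essentially a free list. A minimal, self-contained sketch of the idea (the names op_t, op_get and op_put are illustrative, not librdkafka's internals):

```c
#include <stdlib.h>
#include <stddef.h>

/* A per-message operation struct with an intrusive free-list link. */
typedef struct op {
        struct op *next;         /* free-list link */
        char payload[256];
} op_t;

static op_t *free_list = NULL;

/* Fast path: pop a recycled struct; slow path: allocate a new one. */
op_t *op_get(void) {
        if (free_list) {
                op_t *o = free_list;
                free_list = o->next;
                return o;
        }
        return calloc(1, sizeof(op_t));
}

/* Return a struct to the free list instead of calling free(). */
void op_put(op_t *o) {
        o->next = free_list;
        free_list = o;
}
```

With such a pool, the steady-state produce path performs no heap allocation at all, which is the optimization the report is asking for.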

Topic+partition specific produce errors from the broker not properly handled

This may be a Kafka bug rather than a librdkafka bug, but if I send messages larger than the broker's configured maximum, I get no indication of failure. I get successful delivery callbacks, my consumers simply receive no messages, and the only way I knew was that the JMX console showed the failedproduce counter increasing. Any chance the API could provide an error? Even if the broker doesn't, perhaps the API knows the maximum and can complain locally?

0.8.1 master branch makefile can't generate librdkafka.so.1

edenhill,
I use CentOS 6.3 (final), and the 0.8.1 master branch of librdkafka doesn't generate librdkafka.so.1. The bug is described below:

There is an issue in the Makefile at line 50:
50 @(if [ $(UNAME_S) = "Linux" -o $(CYGWIN) = CYGWIN ]; then \

which should be
@(if [ $(UNAME_S) = "Linux" ]; then \

or else no .so.1 is generated. I think it was brought in by your recent support for Cygwin.

metadata req flood if no leader

If there is no leader for a wanted partition, a new metadata request is sent immediately, which will again detect that there is no leader for the wanted partition and send a new metadata request... goto 10.

This causes a steady flood of metadata requests to the broker.

Error during make examples

During a "make" or "make examples", I get the following compiler error:

#
# More usage options:
./rdkafka_example --help
cc1: warnings being treated as errors
rdkafka_performance.c: In function ‘main’:
rdkafka_performance.c:640: error: ‘rkmessages’ may be used uninitialized in this function
make[1]: *** [rdkafka_performance] Error 1
make[1]: Leaving directory `/tmp/librdkafka-master/examples'
make: *** [examples] Error 2

Changing line 640 of rdkafka_performance.c to the following fixes my problem:

                rd_kafka_message_t **rkmessages=NULL;

Information about my system if it helps you:

$ cat /etc/redhat-release 
CentOS release 6.4 (Final)
$ uname -a
Linux XXXXXXXX 2.6.32-279.11.1.el6.x86_64 #1 SMP Tue Oct 16 15:57:10 UTC 2012 x86_64 x86_64 x86_64 GNU/Linux
$ cc --version
cc (GCC) 4.4.7 20120313 (Red Hat 4.4.7-3)
$ make --version
GNU Make 3.81

rd_kafka_destroy doesn't seem to close the socket properly

I'm using this library in the end-of-message callback of a milter application. I have a JSON object of the email message, and I need to hand it off to Kafka before I delete it from the Postfix queue. Here is the code:

  rk=rd_kafka_new(RD_KAFKA_PRODUCER, kafka_url, NULL);
  if(rk != NULL) {
    rd_kafka_produce(rk,"Trout-test",0,RD_KAFKA_OP_F_FREE,scratch,strlen(scratch));
    fprintf(logfile,"TID: %ld :: %ld :: scmfi_eom - Sent %d bytes to Trout-test:0\n",(long int)syscall(SYS_gettid),time(NULL),strlen(scratch));
    while (rd_kafka_outq_len(rk) > 0) {                                    /* Pulled this from the rdkafka_example */
            usleep(50000);
    }
    fprintf(logfile,"TID: %ld :: %ld :: scmfi_eom - kafka outq is now 0\n",(long int)syscall(SYS_gettid),time(NULL));
    //free(scratch);                                                         /* the rd_kafka_produce call is freeing scratch (RD_KAFKA_OP_F_FREE) */
    usleep(500000);
    rd_kafka_destroy(rk);                                                  /* Destroy the Kafka handle */
  }

When I run this code, everything works fine until I've sent about 1000 messages through the MTA. At that point, rd_kafka_new started to fail with this message: Failed to create inet socket: Too many open files

So I upped my open-files limit with ulimit to a number greater than 200000 (I was sending in batches of 100000 messages), and then it started failing at around 30000 messages because there were no more ephemeral ports available to make connections to the broker.

When I look at the source, I see the close call on the socket, but when I follow the execution with lsof or netstat, the sockets are all still established. Am I using rd_kafka_new, rd_kafka_produce and rd_kafka_destroy improperly (once for each message), or is this an actual problem?

Thank you,
Paul
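The intended pattern is one handle for the process, not one per message: each rd_kafka_t owns broker sockets and threads, so a handle-per-message design accumulates connections until the file-descriptor limit is hit, as described above. A sketch of the create-once shape, written against the current librdkafka API rather than the 0.7-era one in the report ("Trout-test" is the topic name from the report; error handling is elided):

```c
#include <librdkafka/rdkafka.h>
#include <string.h>

static rd_kafka_t *rk;          /* created once, shared */
static rd_kafka_topic_t *rkt;

/* Call once at milter startup. */
static void kafka_init(const char *brokers) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                          errstr, sizeof(errstr));
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        rkt = rd_kafka_topic_new(rk, "Trout-test", NULL);
}

/* Call once per message: cheap, opens no new sockets.
 * json must be heap-allocated; F_FREE hands ownership to librdkafka. */
static void kafka_send(char *json, size_t len) {
        rd_kafka_produce(rkt, 0, RD_KAFKA_MSG_F_FREE,
                         json, len, NULL, 0, NULL);
        rd_kafka_poll(rk, 0);   /* serve delivery reports */
}

/* Call once at shutdown. */
static void kafka_shutdown(void) {
        rd_kafka_flush(rk, 10 * 1000);
        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
}
```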

Error when compiling against librdkafka with g++ (as opposed to gcc), even with extern "C"

Hi,

I had to make a change to get a C++ program to compile against librdkafka, even with an extern "C" wrapper around the header file. I think the error is less about C vs. C++, though, and more about stricter error checking.

rd_kafka_message_errstr returns a const char *; however, it returns rkmessage->payload, which is a void *. I had to cast it to const char * in the return statement to get it to compile.
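A reduced reproduction of the described fix, with hypothetical names (struct msg and msg_errstr stand in for the real types):

```c
#include <stddef.h>

/* C allows the implicit void* -> const char* conversion in a return
 * statement; C++ does not, so the explicit cast is needed to compile
 * under both languages. */
struct msg {
        void *payload;
};

const char *msg_errstr(const struct msg *m) {
        return (const char *)m->payload;   /* explicit cast */
}
```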

Compile on Mac

Can you give any insight into overcoming the errors below when trying to run make on a Mac?

cc -MD -MP -O2 -Wall -Werror -Wfloat-equal -Wpointer-arith -fPIC -I. -g -DSG -c rdkafka.c
rdkafka.c:1065:29: error: use of GNU old-style field designator extension [-Werror,-Wgnu-designator]
struct consume_ctx ctx = { consume_cb: consume_cb, opaque: opaque };
^~~~~~~~~~~
.consume_cb =
rdkafka.c:1065:53: error: use of GNU old-style field designator extension [-Werror,-Wgnu-designator]
struct consume_ctx ctx = { consume_cb: consume_cb, opaque: opaque };
^~~~~~~
.opaque =
2 errors generated.
make: *** [rdkafka.o] Error 1

librdkafka 0.8 logger callback error

Hi, Magnus.
librdkafka 0.8 provides a callback for the logger. The comment on the function "rd_kafka_set_logger" mentions that "Alternatively the application may provide its own logger callback".
However, when I provide my own logger callback, some compiler errors occur:

error: invalid use of incomplete type ‘const rd_kafka_t {aka const struct rd_kafka_s}’
/usr/local/include/librdkafka/rdkafka.h:59:16: error: forward declaration of ‘const rd_kafka_t {aka const struct rd_kafka_s}’

I wonder whether I am using the logger callback correctly.

Thanks.

Librdkafka does not compile on OSX 10.7.5

Hey Magnus,

I am trying to compile Librdkafka on OSX with the following platform env:

Apple LLVM version 4.2 (clang-425.0.28) (based on LLVM 3.2svn)
Target: x86_64-apple-darwin11.4.2
Thread model: posix

A couple of issues showed up:

  1. The -rdynamic switch is not supported on OSX.
  2. byteswap.h is not available on OSX; that can be fixed using the following:

#if defined(__APPLE__)
#include <libkern/OSByteOrder.h>
#define le32toh OSSwapLittleToHostInt32
#define htole32 OSSwapHostToLittleInt32
#define bswap_32 OSSwapInt32
#else
#include <byteswap.h>
#endif

  3. The TIMEVAL_TO_TIMESPEC macro from rdtime.h can be removed.
  4. OSX has an incompatible sys/queue.h.
  5. Replace strndupa with strndup.

Best,
D

No error callback and hanging when publishing to non existent partition

My topic is defined as having 10 partitions (0-9). When I attempt to send to partition 10, I expected to get an error callback. However, the poll (whose timeout I set to -1) just hangs. Shouldn't I get an error callback? I know the API is aware this is a bad partition, because my log callback is being called and includes this detail:
27-Dec-2013 11:44:21:918 [T07][3828] kafka/kafka.cc:23 fac:'PART' name:'LARA#producer-0' log:'LaraReplicator_kafkacluster2 partition [10] not currently available'

RPM packaging

Proper RPM packaging for librdkafka would be good.

Failed to decompress gzip

I'm setting gzip in my publisher and am receiving this in rdkafka_example:

1389132516.572 RDKAFKA-7-GZIP: rdkafka#consumer-0: d145931-002.masked.com:5757/0: Failed to decompress Gzip message at offset 3145 of 216 bytes: ignoring message

I get this whether -z gzip is set or not. Any idea?

nginx worker processes exit with signal 11 when publishing messages in nginx

Hi edenhill,
I wrote C code, called from Lua, that produces messages to a Kafka server. The Lua code is invoked inside an nginx server.
I create one Kafka handle and add a list of brokers once during nginx initialisation. Then, when the nginx server receives requests, the Lua code is called to publish messages.
But the nginx worker processes exit with signal 11 as soon as requests are sent.
I also wonder whether it is right to hold one Kafka handle for all topic writes.

As I don't know how to attach the code, I have pasted the main code snippet below:
// Create one Kafka handle and add brokers (called once)
int mercury_add_kafkabroker(mercury_t *me, const char *kafka_brokers)
{
    if (!me || !kafka_brokers) return -1;

    init_error();

    char errstr[100];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set_error_cb(conf, err_cb);
    rd_kafka_conf_set_dr_cb(conf, msg_delivered);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    me->kafka_handler = rk;

    if (rk == NULL) {
        set_error("rd_kafka_new fail");
        return -1;
    }

    /* Add brokers */
    if (rd_kafka_brokers_add(me->kafka_handler, kafka_brokers) == 0) {
        fprintf(stderr, "%% No valid brokers specified\n");
        return -1;
    }
    return 0;
}

// Called repeatedly, once per request received by the nginx server
int mercury_sendmsg_kafka(mercury_t *me, const char *topic, const char *msg)
{
    if (!me || !msg) return -1;

    init_error();

    size_t msg_len = strlen(msg);

    char *opbuf = malloc(msg_len + 1);
    strcpy(opbuf, msg);

    if (NULL == me->kafka_topic_handler) {
        rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(me->kafka_handler, topic, topic_conf);
        me->kafka_topic_handler = rkt;
    }

    while (rd_kafka_produce(me->kafka_topic_handler, RD_KAFKA_PARTITION_UA,
                            RD_KAFKA_MSG_F_FREE, opbuf, msg_len,
                            NULL, 0, NULL) == -1) {
        fprintf(stderr, "produce error; resend!");
        /* Poll to handle delivery reports */
        rd_kafka_poll(me->kafka_handler, 10);
    }
    rd_kafka_poll(me->kafka_handler, 0);

    if (NULL != me->kafka_handler) {
        rd_kafka_destroy(me->kafka_handler);
    }

    return 0;
}

Thanks
Aaron

No errors when produce to non-existent external broker

Hi,

here is an example where I try to produce a message to a non-existent broker on localhost:

examples/rdkafka_example -P -b localhost:9092 -t test00
% Type stuff and hit enter to send
1389364025.904 RDKAFKA-3-FAIL: rdkafka#producer-0: localhost:9092/bootstrap:
localhost:9092/bootstrap: Failed to connect to broker at localhost:9092: Connection refused

And that is fine, since I'm not running a Kafka broker on my localhost. But if I use an external host, it doesn't show any error:

examples/rdkafka_example -P -b 2.2.2.2:9092 -t test00
% Type stuff and hit enter to send

% Sent 1 bytes to topic test00 partition -1

In this case, I'm expecting some "Failed to connect" message or similar.

So, how can I check whether the library can make a connection and produce messages?

Regards

Question about error callbacks

Hi,

I've registered an error callback (and have done so as early in the program as possible), but I find that if I give an intentionally wrong broker hostname, I don't get called back until I attempt to publish. However, I can see with debug turned on that the library knows the host is wrong much earlier (and reports this in the log). Is there a way to have this passed to my error handler so that I can know earlier and exit/complain?

Thanks.

Is it possible to compile the library under Cygwin?

After removing the -Werror option from the Makefiles to avoid some warnings being treated as errors (see below), I obtain the following errors when trying to compile the library under Cygwin:

$ make all
cc -MD -MP -O2 -Wall -Wfloat-equal -Wpointer-arith -fPIC -I. -g -Wno-gnu-designator -DSG -c rdkafka.c
rdkafka.c:1:0: warning: -fPIC ignored for target (all code is position independent) [enabled by default]
rdkafka.c: In function 'rd_kafka_q_serve':
rdkafka.c:356:2: error: 'rd_kafka_op_t' has no member named 'tqh_first'
rdkafka.c:356:2: error: 'rko_link' undeclared (first use in this function)
rdkafka.c:356:2: note: each undeclared identifier is reported only once for each function it appears in
rdkafka.c:356:2: error: expected identifier before '&' token
rdkafka.c:356:2: warning: left-hand operand of comma expression has no effect [-Wunused-value]
rdkafka.c: In function 'rd_kafka_destroy':
rdkafka.c:518:2: error: 'rd_kafka_topic_t' has no member named 'tqh_first'
rdkafka.c:518:2: error: 'rkt_link' undeclared (first use in this function)
rdkafka.c:518:2: error: expected identifier before '&' token
rdkafka.c:518:2: warning: left-hand operand of comma expression has no effect [-Wunused-value]
rdkafka.c: At top level:
cc1: warning: unrecognized command line option "-Wno-gnu-designator" [enabled by default]
Makefile:39: recipe for target `rdkafka.o' failed
make: *** [rdkafka.o] Error 1

Tool versions are:

  • CYGWIN_NT-6.1 1.7.25(0.270/5/3) i686 Cygwin
  • gcc (GCC) 4.7.3

Is compression in fact supported?

The introduction seems to indicate yes, but rdkafka_int.h indicates no/not supported in the enum definition of rd_kafka_compression_t. Hoping that it's simply outdated?

issue with rdkafka_example when auto.create.topics.enable=true in kafka8

Kafka 0.8 offers a feature where new topics are automatically created by the broker if a producer attempts to send messages to a non-existent topic. When auto.create.topics.enable=true is set in Kafka 0.8, messages sent by the rdkafka_example producer get black-holed by the broker. FWIW: messages sent to existing topics with auto.create.topics.enable=true work just fine.

To reproduce this condition, add auto.create.topics.enable=true to the server.properties file and start the broker. You should find that kafka-console-producer.sh triggers the broker to create the new topics and works as advertised. But something goes wrong with rdkafka_example.

Thanks for continuing to maintain this project and looking into this issue.

-- Adam
