php-rdkafka's Introduction

PHP Kafka client - php-rdkafka

Join the chat at https://gitter.im/arnaud-lb/php-rdkafka

  • Supported librdkafka versions: >= 0.11
  • Supported Kafka versions: >= 0.8
  • Supported PHP versions: 7.x .. 8.x

PHP-rdkafka is a stable, production-ready, long term support, and fast Kafka client for PHP based on librdkafka.

It supports PHP 7, PHP 8, PHP 5 (in older versions), all librdkafka versions since 0.11, and all Kafka versions since 0.8. This makes it easy to deploy the extension in production.

The goal of the extension is to be a low-level un-opinionated librdkafka binding focused on production and long term support.

The high level and low level consumers, producer, and metadata APIs are supported.

Documentation is available at https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html.

Sponsors

Upstash

Upstash: Serverless Kafka

  • True Serverless Kafka with per-request-pricing
  • Managed Apache Kafka, works with all Kafka clients
  • Built-in REST API designed for serverless and edge functions

Start for free in 30 seconds!

php-rdkafka supports Ukraine. Proceeds from our generous sponsors are currently donated to the Support Ukraine collective.

Table of Contents

  1. Installation
  2. Examples
  3. Usage
  4. Documentation
  5. Credits
  6. License

Installation

https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html

Examples

https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html

Usage

The configuration parameters used below are described in the librdkafka configuration reference.

Producing

Creating a producer

For producing, we first need to create a producer, and to add brokers (Kafka servers) to it:

<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("10.0.0.1:9092,10.0.0.2:9092");

Producing messages

Warning: Make sure that your producer is shut down properly (see below) so that no messages are lost.

Next, we create a topic instance from the producer:

<?php

$topic = $rk->newTopic("test");

From there, we can produce as many messages as we want, using the produce method:

<?php

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
The second argument is the message flags; it should be either 0
or RD_KAFKA_MSG_F_BLOCK, which makes produce block when the queue is full. The message payload can be anything.
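
Because the payload is passed as a string, structured data has to be serialized before it is produced. The following is a minimal sketch, assuming JSON as the wire format; encodePayload() and produceEvent() are hypothetical helper names and not part of php-rdkafka. The optional fourth argument of produce() is the message key.

```php
<?php

// Hypothetical helper: serialize an event into the string payload produce() expects.
function encodePayload(array $event): string
{
    $json = json_encode($event);
    if ($json === false) {
        throw new InvalidArgumentException(json_last_error_msg());
    }
    return $json;
}

// Hypothetical helper: $topic is expected to be the object returned by $rk->newTopic().
function produceEvent($topic, array $event, ?string $key = null): void
{
    // Unassigned partition, no flags, JSON payload, optional key.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, encodePayload($event), $key);
}
```

A call could look like `produceEvent($topic, ['id' => 1, 'type' => 'signup'], 'user-1');`.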

Proper shutdown

Call flush() before destroying a producer instance
to make sure all queued and in-flight produce requests are completed
before terminating. Use a reasonable value for $timeout_ms.

Warning: Not calling flush can lead to message loss!

$rk->flush($timeout_ms);

In case you don't care about sending messages that haven't been sent yet, you can use purge() before calling flush():

// Forget messages that are not fully sent yet
$rk->purge(RD_KAFKA_PURGE_F_QUEUE);

$rk->flush($timeout_ms);
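
A single flush() call can time out while messages are still queued. The sketch below retries a few times, assuming flush() returns RD_KAFKA_RESP_ERR_NO_ERROR once everything has been delivered; flushWithRetries() is a hypothetical helper, and the timeout and attempt count are illustrative defaults.

```php
<?php

// Hypothetical helper: retry flush() until the queue is drained or attempts run out.
// $producer is expected to be an RdKafka\Producer.
function flushWithRetries($producer, int $timeoutMs = 10000, int $maxAttempts = 3): bool
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        // flush() returns a librdkafka error code; NO_ERROR means nothing is left to send.
        if ($producer->flush($timeoutMs) === RD_KAFKA_RESP_ERR_NO_ERROR) {
            return true;
        }
    }
    return false;
}
```

If the helper returns false, the caller can decide whether to log, throw, or purge the remaining messages.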

High-level consuming

The RdKafka\KafkaConsumer class supports automatic partition assignment/revocation. See the examples page (https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html).
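
As a rough illustration of the high-level API, the sketch below subscribes to a list of topics and hands each message to a callback. consumerSettings() and runConsumer() are hypothetical helper names, and the broker list and group ID passed to them are placeholders chosen by the caller.

```php
<?php

// Hypothetical helper: the settings a minimal KafkaConsumer needs, as strings.
function consumerSettings(string $brokers, string $groupId): array
{
    return [
        'metadata.broker.list' => $brokers,
        'group.id' => $groupId,
    ];
}

// Hypothetical helper: subscribe and pass each message to $handle until an error occurs.
function runConsumer(array $settings, array $topics, callable $handle): void
{
    $conf = new RdKafka\Conf();
    foreach ($settings as $name => $value) {
        $conf->set($name, $value);
    }

    $consumer = new RdKafka\KafkaConsumer($conf);
    $consumer->subscribe($topics); // partitions are assigned automatically

    while (true) {
        $message = $consumer->consume(10 * 1000); // timeout in milliseconds
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                $handle($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                break; // nothing to read right now, keep polling
            default:
                throw new RuntimeException($message->errstr(), $message->err);
        }
    }
}
```

For example: `runConsumer(consumerSettings('10.0.0.1:9092', 'myConsumerGroup'), ['test'], function ($m) { echo $m->payload, "\n"; });`.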

Low-level consuming (legacy)

Note: The low-level consumer is a legacy API; please prefer the high-level consumer.

We first need to create a low level consumer, and to add brokers (Kafka servers) to it:

<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("10.0.0.1,10.0.0.2");

Next, create a topic instance by calling the newTopic() method, and start consuming on partition 0:

<?php

$topic = $rk->newTopic("test");

// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

Next, retrieve the consumed messages:

<?php

while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout.
    $msg = $topic->consume(0, 1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

Low-level consuming from multiple topics / partitions (legacy)

Note: The low-level consumer is a legacy API; please prefer the high-level consumer.

Consuming from multiple topics and/or partitions can be done by telling librdkafka to forward all messages from these topics/partitions to an internal queue, and then consuming from this queue:

Creating the queue:

<?php
$queue = $rk->newQueue();

Adding topic partitions to the queue:

<?php

$topic1 = $rk->newTopic("topic1");
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

$topic2 = $rk->newTopic("topic2");
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);

Next, retrieve the consumed messages from the queue:

<?php

while (true) {
    // The only argument is the timeout.
    $msg = $queue->consume(1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

Using stored offsets

Broker (default)

By default, librdkafka stores offsets on the broker.

File offsets (deprecated)

If you're using a local file for offset storage, then by default the file is created in the current directory, with a name based on the topic and the partition. The directory can be changed by setting the offset.store.path configuration property.

Consumer settings

Low-level consumer: auto commit settings

To manually control the offset, set enable.auto.offset.store to false.
The settings auto.commit.interval.ms and auto.commit.enable control
whether the stored offsets are automatically committed to the broker, and at what interval.
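
With enable.auto.offset.store set to false, the application decides when an offset becomes eligible for commit. A minimal sketch, assuming the low-level consumer's topic object exposes offsetStore(partition, offset) and that auto commit stays enabled so stored offsets still reach the broker; handleAndStore() is a hypothetical helper name.

```php
<?php

// Hypothetical helper: process a message, then mark its offset as eligible for commit.
// $topic is expected to be an RdKafka\ConsumerTopic, $message an RdKafka\Message.
function handleAndStore($topic, $message, callable $handle): void
{
    $handle($message); // if this throws, the offset is not stored
    $topic->offsetStore($message->partition, $message->offset);
}
```

Storing the offset only after the handler returns means a crash during processing leads to the message being read again rather than skipped.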

High-level consumer: auto commit settings

To manually control the offset, set enable.auto.commit to false.
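
With enable.auto.commit set to 'false', offsets are committed explicitly, typically after a message has been processed. A minimal sketch, assuming a KafkaConsumer configured that way; consumeOnce() is a hypothetical helper name.

```php
<?php

// Hypothetical helper: consume one message, process it, then commit its offset.
// $consumer is expected to be an RdKafka\KafkaConsumer.
function consumeOnce($consumer, callable $handle, int $timeoutMs = 1000): bool
{
    $message = $consumer->consume($timeoutMs);
    if ($message === null || $message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        return false; // timeout, end of partition, or an error: nothing to commit
    }

    $handle($message);           // if this throws, the commit below is skipped
    $consumer->commit($message); // synchronous commit of this message's offset
    return true;
}
```

Committing after processing gives at-least-once behaviour: a message whose handler failed is delivered again after a restart or rebalance.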

High-level consumer: max.poll.interval.ms

Maximum allowed time between calls to consume messages for high-level consumers.
If this interval is exceeded the consumer is considered failed and the group will
rebalance in order to reassign the partitions to another consumer group member.

Consumer group id (general)

group.id sets your consumer group ID; it should be unique per application and should not change. Kafka uses it to recognize applications and store offsets for them.

<?php

$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.interval.ms", "1000"); // configuration values are strings

$topic = $rk->newTopic("test", $topicConf);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

Interesting configuration parameters

Librdkafka Configuration reference

queued.max.messages.kbytes

librdkafka will buffer up to 1GB of messages for each consumed partition by default. You can lower memory usage by reducing the value of the queued.max.messages.kbytes parameter on your consumers.
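
To get a feel for the numbers, the sketch below computes a rough upper bound on buffered bytes for one consumer, assuming the limit applies per consumed partition as described above and a 64-bit PHP build. worstCaseBufferBytes() is a hypothetical helper, and the 64 MB cap is only an illustrative value.

```php
<?php

// Hypothetical helper: worst-case bytes buffered by one consumer.
function worstCaseBufferBytes(int $partitions, int $queuedMaxKbytes): int
{
    return $partitions * $queuedMaxKbytes * 1024;
}

// 8 partitions at the 1 GB figure quoted above (1048576 KB) versus a 64 MB cap.
$default = worstCaseBufferBytes(8, 1048576); // 8589934592 bytes (8 GiB)
$capped = worstCaseBufferBytes(8, 65536);    // 536870912 bytes (512 MiB)

// Applying the cap on a consumer configuration (values are strings):
// $conf->set('queued.max.messages.kbytes', '65536');
```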

topic.metadata.refresh.sparse and topic.metadata.refresh.interval.ms

Each consumer and producer instance will fetch topics metadata at an interval defined by the topic.metadata.refresh.interval.ms parameter. Depending on your librdkafka version, the parameter defaults to 10 seconds, or 600 seconds.

librdkafka fetches the metadata for all topics of the cluster by default. Setting topic.metadata.refresh.sparse to the string "true" makes sure that librdkafka fetches only the topics it uses.

Setting topic.metadata.refresh.sparse to "true", and topic.metadata.refresh.interval.ms to 600 seconds (plus some jitter) can reduce the bandwidth a lot, depending on the number of consumers and topics.
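
Since configuration values are passed as strings, booleans and numbers need to be converted before they are set. The sketch below shows one way to do that for the two metadata settings; toConfString() and applySettings() are hypothetical helpers, not part of php-rdkafka.

```php
<?php

// Hypothetical helper: normalize booleans and numbers to the strings Conf::set() takes.
function toConfString($value): string
{
    if (is_bool($value)) {
        return $value ? 'true' : 'false';
    }
    return (string) $value;
}

// Hypothetical helper: $conf is expected to be an RdKafka\Conf.
function applySettings($conf, array $settings): void
{
    foreach ($settings as $name => $value) {
        $conf->set($name, toConfString($value));
    }
}

$metadataSettings = [
    'topic.metadata.refresh.sparse' => true,
    'topic.metadata.refresh.interval.ms' => 600 * 1000, // 600 seconds
];
```

The settings would then be applied with `applySettings($conf, $metadataSettings);` before the consumer or producer is created.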

internal.termination.signal

This setting allows librdkafka threads to terminate as soon as librdkafka is done with them. This effectively allows your PHP processes / requests to terminate quickly.

When enabling this, you have to mask the signal like this:

<?php
// once
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// any time
$conf->set('internal.termination.signal', (string) SIGIO);

socket.blocking.max.ms (librdkafka < 1.0.0)

Maximum time a broker socket operation may block. A lower value improves responsiveness at the expense of slightly higher CPU usage.

Reducing the value of this setting improves shutdown speed. The value defines the maximum time librdkafka will block in one iteration of a read loop. This also defines how often the main librdkafka thread will check for termination.

queue.buffering.max.ms

This defines the maximum and default time librdkafka will wait before sending a batch of messages. Reducing this setting to e.g. 1ms ensures that messages are sent ASAP, instead of being batched.

This has been seen to reduce the shutdown time of the rdkafka instance, and of the PHP process / request.

Performance / Low-latency settings

Here is a configuration optimized for low latency. This allows a PHP process / request to send messages ASAP and to terminate quickly.

<?php

$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', '50'); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    $conf->set('internal.termination.signal', (string) SIGIO);
} else {
    $conf->set('queue.buffering.max.ms', '1');
}

$producer = new \RdKafka\Producer($conf);
$consumer = new \RdKafka\Consumer($conf);

It is advised to call poll at regular intervals to serve callbacks. In php-rdkafka:3.x
poll was also called during shutdown, so not calling it in regular intervals might
lead to a slightly longer shutdown. The example below polls until there are no more events in the queue:

$topic->produce(...); // $topic comes from $producer->newTopic()
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}
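
A loop that waits for an empty queue never ends if the broker stays unreachable. The sketch below is a bounded variant that gives up after a deadline; drainQueue() is a hypothetical helper, and the 50 ms poll interval is an arbitrary choice.

```php
<?php

// Hypothetical helper: poll until the outgoing queue is empty or $maxWaitMs has elapsed.
// $producer is expected to be an RdKafka\Producer.
function drainQueue($producer, int $maxWaitMs, int $pollMs = 50): bool
{
    $deadline = microtime(true) + $maxWaitMs / 1000;
    while ($producer->getOutQLen() > 0) {
        if (microtime(true) >= $deadline) {
            return false; // gave up with messages still queued
        }
        $producer->poll($pollMs); // serves callbacks such as delivery reports
    }
    return true;
}
```

A false return value tells the caller that some messages were still queued when the deadline passed.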

Documentation

https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
The source of the documentation can be found here

Asking for Help

If the documentation is not enough, feel free to ask questions on the php-rdkafka channels on Gitter or Google Groups.

Stubs

Because your IDE cannot auto-discover the php-rdkafka API, consider using an external package that provides stubs for the php-rdkafka classes, functions and constants: kwn/php-rdkafka-stubs

Contributing

If you would like to contribute, thank you :)

Before you start, please take a look at the CONTRIBUTING document to see how to get your changes merged in.

Credits

Documentation copied from librdkafka.

Authors: see contributors.

License

php-rdkafka is released under the MIT license.

php-rdkafka's Issues

How to build php_rdkafka extension on Windows

I placed sources downloaded from pecl and changed config.w32 to

ARG_ENABLE('rdkafka', 'enable rdkafka support', 'no');

if (PHP_RDKAFKA != 'no')
{
    EXTENSION('rdkafka', 'rdkafka.c');
}

Then I run configure --disable-all --enable-cli --enable-rdkafka and all is good, but when I run nmake, a message appears in the console saying that librdkafka/rdkafka.h is needed. After I downloaded it and placed it in the folder with the php-rdkafka extension, the console shows an error saying that the error count reached 100 and compilation stopped (fatal error C1003).

How can I build them together? I found that librdkafka includes a Visual Studio project. Should I use it somehow?

Write HHVM extension glue

I'm looking at using this extension to replace aging ActiveMQ infrastructure in our production application. We're planning to migrate our servers to HHVM in the next year, so I'd be very interested in helping write HHVM glue for your module. There's a decent overview of the process here; it includes some slightly annoying details like renaming all of the C files ".cpp", and repeating the function declarations in a new .php file.

Please let me know if this is a direction you'd like to pursue, and I'll start looking at writing the glue.

Nice work on this extension, thank you!

time_wait

Why are there so many connections in TIME_WAIT status? Thanks in advance.

Memory issue

The memory of the php-fpm worker process continues to grow. Does anyone else have the same problem? Thanks.

My producer executed once,but it produced 3 records.

  • PHP version: 5.5.35
  • Kafka version: 0.8.2
  • librdkafka version: master
  • php-rdkafka version: master

PHP Code

<?php
    $rk = new \RdKafka\Producer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers(env("KAFKA_HOST"));
    $topic = $rk->newTopic($topic);
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $content);

My Issues

# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic AppAccess
123 ua12555354  1464766008000
123 ua12555354  1464766008000
123 ua12555354  1464766008000

Question

My producer executed once, but it produced 3 records.
I don't know where the problem occurs.

Generate php files to include

Hello !

I need to include PHP files in my PHP script for a daemon.

How can I do this without getting the following error:

"RKafka\Consumer not found in directory"

Thanks a lot!

configure --includedir not work?

configure:

./configure --with-php-config=/home/work/odp/bin/php-config --includedir=/home/work/libkafka/include --libdir=/home/work/libkafka/lib

but:

checking for librdkafka/rdkafka.h" in default path... not found
configure: error: Please reinstall the rdkafka distribution

rdkafka.h in this dir:

# ls -l /home/work/libkafka/include/librdkafka/
-rwxr-xr-x  1 work root 53740 Jul  6 15:37 rdkafkacpp.h
-rwxr-xr-x  1 work root 77704 Jul  6 15:37 rdkafka.h

Segmentation fault on PHP shutdown when security.protocol is ssl

This issue is occurring both on my Mac (with Homebrew or by-hand installs), and on an Ubuntu 14.04 system (with PHP etc. compiled by hand).

  • PHP version: 7.0.7 on Mac OS, 5.6.22 on Ubuntu
  • librdkafka version: 0.9.1
  • php-rdkafka version: both with 0.9.1 and master/php7 branches

Simple-ish test script:

foreach(['KAFKA_CLIENT_CERT', 'KAFKA_CLIENT_CERT_KEY', 'KAFKA_TRUSTED_CERT'] as $name) {
    $$name = tempnam('/tmp', $name);
    file_put_contents($$name, $_ENV[$name]);
}

$conf = new RdKafka\Conf();
$conf->set('api.version.request', 'false');
$conf->set('debug', 'all');
$conf->set('security.protocol', 'ssl');
$conf->set('ssl.ca.location', $KAFKA_TRUSTED_CERT);
$conf->set('ssl.certificate.location', $KAFKA_CLIENT_CERT);
$conf->set('ssl.key.location', $KAFKA_CLIENT_CERT_KEY);

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers(str_replace('kafka+ssl://', '', $_ENV['KAFKA_URL']));

$topic = $rk->newTopic('test');

$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message payload');

The message makes it through to the broker just fine. It appears that the segfault occurs on PHP shutdown; possibly related to the use of SSL. GDB:

(gdb) run kafka.php
Starting program: /usr/local/bin/php kafka.php

Program received signal SIGSEGV, Segmentation fault.
0x0000000102893090 in ?? ()
(gdb) bt
#0  0x0000000102893090 in ?? ()
#1  0x0000000100c9458a in int_err_del () from /usr/local/opt/openssl/lib/libcrypto.1.0.0.dylib
#2  0x000000010003a673 in zm_shutdown_openssl ()
#3  0x0000000100429e19 in module_destructor ()
#4  0x0000000100421a80 in module_destructor_zval ()
#5  0x00000001004312ec in _zend_hash_del_el_ex ()
#6  0x0000000100432701 in zend_hash_graceful_reverse_destroy ()
#7  0x0000000100421c09 in zend_shutdown ()
#8  0x00000001003c52f3 in php_module_shutdown ()
#9  0x0000000000000001 in ?? ()
#10 0x0000000000000001 in ?? ()
#11 0x00007fff5fbffb20 in ?? ()
#12 0x00000001004b0767 in main ()
Backtrace stopped: frame did not save the PC

If you need a Kafka server with SSL and client certs for auth to test/reproduce, let me know.

does it support the group feature?

For example, there are two consumers in one group consuming one topic, so the two consumers can get different messages from the topic.
Does this project support this group feature?
I use it as follows, but it does not seem to work. The two consumers (two processes) both get all the messages from the topic.

$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.interval.ms", 1e3);
$topicConf->set("offset.store.sync.interval.ms", 60e3);
$topicConf->set("group.id", 'group1');

$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("10.110.16.46:9092,10.110.16.47:9092,10.110.16.48:9092");

$topic = $rk->newTopic("test1", $topicConf);
$topic->consumeStart(1, RD_KAFKA_OFFSET_BEGINNING);

$queue = $rk->newQueue();

$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

$i = 0;
while (true) {
    $msg = $queue->consume(1000);
    if ($msg->err) {
        echo "----------------" . $msg->errstr(), "\n";
    } else {
        echo $msg->payload, "\n";
    }
    echo " ======= " . $i++ . " =============== " . "\n";
    usleep(1000);
}

Merge PHP5/7 branches, or version separately

The current practice of having two different tags of the same version for PHP 5 and PHP 7 has the obvious fault of not allowing a PECL release for both.

Either the codebases should be merged into one (like some extensions do), or e.g. version 1.x should be for PHP 5, and 2.x for PHP 7 (like other extensions do). Option #1 is the better choice IMO :)

KafkaConsumer->commit() crash

Whenever commit() is called php-rdkafka crashes with segmentation fault.
Calling commit(null) says that null is not a valid operation.
Calling commit($message) works.

So basically the two cases that are most interesting are not working. I'm using php-rdkafka commit 6ad5fea and rdkafka 0.8.6-1 compiled from librdkafka-confluent-debian-0.9.0.99 release, php 5.6.19+dfsg-0+deb8u1.

Last frame of the crash:

#0  consumer_commit (async=0, ht=<optimized out>, this_ptr=<optimized out>, 
    return_value_used=<optimized out>, return_value_ptr=<optimized out>, 
    return_value=<optimized out>) at php-rdkafka/kafka_consumer.c:439
439         if (Z_TYPE_P(zarg) == IS_OBJECT && instanceof_function(Z_OBJCE_P(zarg), ce_kafka_message TSRMLS_CC)) {
(gdb) l
434     if (!intern) {
435         return;
436     }
437 
438     if (zarg) {
439         if (Z_TYPE_P(zarg) == IS_OBJECT && instanceof_function(Z_OBJCE_P(zarg), ce_kafka_message TSRMLS_CC)) {
440             zval *zerr;
441             zval *ztopic;
442             zval *zpartition;
443             zval *zoffset;

Support of 0.9.x versions of Kafka

When I try to build the extension with a librdkafka version greater than 0.9.0.0, this error occurs:
configure: error: Unsupported librdkafka version. Supported versions: 0.8.x

RD_KAFKA_OFFSET_STORED not work

I ran /examples/producer.php and /examples/consumer.php and they worked well. But if I change consumer.php:9 as follows:

// $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); // Before
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); // After

It always outputs

Broker: No more messages

What's the possible reason?

I run it on CentOS 6.7/PHP 7.0.1.

Expose rd_kafka_position and rd_kafka_committed

librdkafka has two methods that are very useful for committing offsets manually when consuming.

rd_kafka_position: https://github.com/edenhill/librdkafka/blob/dca8b2ee91076f2ca72f00abb4d18102a4b7e19a/src/rdkafka.h#L2035

and

rd_kafka_committed:
https://github.com/edenhill/librdkafka/blob/dca8b2ee91076f2ca72f00abb4d18102a4b7e19a/src/rdkafka.h#L2016

That is, instead of relying on commit() without arguments which automatically commits the fetched messages, it's possible to manually provide the offsets to be committed.

The use case is when fetched messages are only partially consumed. That is, if 10 messages are fetched but only the first 5 have been processed for one reason or another, only those 5 should be committed, which requires passing custom offsets to commit().
However, there is currently no way to know such offsets in php-rdkafka, if I'm not wrong.

rd_kafka_position is certainly the most interesting for such a use case, but given rd_kafka_committed has a similar API, it shouldn't be hard to also expose it.

Segfault on debian

Hello,
I have a bug with PHP 5.5.30-1~dotdeb+7.1, kafka_2.10-0.8.2.2.
When I use the library I have a segfault when I use this function :

"$topic->produce(RD_KAFKA_PARTITION_UA, 0, serialize($argument));"

Here is my code:
$topic_name = 'task_open';
$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("localhost");
$topic = $rk->newTopic($topic_name);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, serialize($argument));

Thank you,

php7 process dont die while failing to connect to broker

Thanks for the extension!

Testing against PHP 7.0.1, I am facing this error:

$ php test.php
%3|1452077507.698|FAIL|rdkafka#producer-0| 192.168.50.14:9092/bootstrap: Failed to connect to broker at kafka-node1:9092: Connection refused
%3|1452077507.698|ERROR|rdkafka#producer-0| 192.168.50.14:9092/bootstrap: Failed to connect to broker at kafka-node1:9092: Connection refused
%3|1452077507.698|ERROR|rdkafka#producer-0| 1/1 brokers are down
^C^C^C^C^C^C^C

and the script becomes unkillable. Am I missing something about timeouts?

My test is pretty straightforward:

<?php

$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("192.168.50.14");

$topic = $rk->newTopic("test");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

exit(0);

Bump version to 0.0.3

Some time has passed since last tag. How about bumping a version to 0.0.3 in PECL repository?

producer can not produce message when the loop count > 25

Hi arnaud-lb,

My producer cannot produce messages to the Kafka broker when the loop count is >= 25.

  • PHP version:5.5.35 on CentOS
  • librdkafka version:0.9.1
  • php-rdkafka version: master branch

this is my test script:

<?php

$conf = new RdKafka\Conf();
$conf->set('queue.buffering.max.ms','600000');
$kafka = new RdKafka\Producer($conf);
$kafka->setLogLevel(LOG_DEBUG);
$kafka->addBrokers("172.16.10.236:9092");

$topicConf = new RdKafka\TopicConf();
$topic = $kafka->newTopic("topic1");

$i = 1;
while ($i <= 25) {
    $topic->produce(0, 0, "Message $i");
    $i++;
}

?>

Then I run the console consumer, but I cannot see any messages from the topic "topic1":

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic1 --from-beginning

But if I adjust the loop count to 20, it works.

I wonder why a different loop count leads to different behavior. Is it related to the produce queue?

Segfault on consuming messages via Queue

Hi,

I've just spotted an issue with consuming messages using the Queue object. My environment:

php version: 5.6.20
librdkafka: 0.8.6.0
php-rdkafka: the recent one (commit 5ff585986c0cbf6f01753f79b5c2291d437a60eb)

My Kafka broker runs with the default configuration. There are three topics, and every topic has ~50 messages stored, so there are no uncommon settings or edge cases here. When I run a simple unit test I get a segfault. The code for the unit test:

<?php

namespace RdKafka;

class QueueTest extends \PHPUnit_Framework_TestCase
{
    const PARTITION = 0;

    /**
     * @var ConsumerTopic
     */
    private $consumerTopic;

    /**
     * @var Queue
     */
    private $queue;

    public function setUp()
    {
        $consumer = new Consumer();
        $consumer->addBrokers('localhost:9092');

        $this->consumerTopic = $consumer->newTopic('test');
        $this->queue = $consumer->newQueue();
    }

    public function testConsumeIntoQueue()
    {
        $this->consumerTopic->consumeQueueStart(self::PARTITION, -5, $this->queue);
        $this->consumerTopic->consume(self::PARTITION, 100);
        $this->consumerTopic->consumeStop(self::PARTITION);

        $message = $this->queue->consume(200);
    }
}

When I run the tests, I get the output below:

*** rdkafka_msg.c:40:rd_kafka_msg_destroy: assert: rk->rk_producer.msg_cnt > 0 ***
rd_kafka_t 0x2e77bd0: rdkafka#consumer-28
 refcnt 5, producer.msg_cnt 0
 rk_rep reply queue: 0 ops
 brokers:
 rd_kafka_broker_t 0x2ebae10: localhost:9092/bootstrap NodeId -1 in state UP (for 0.013s)
  refcnt 1
  outbuf_cnt: 0 waitresp_cnt: 0
  1 messages sent, 25 bytes, 0 errors, 0 timeouts
  1 messages received, 150 bytes, 0 errors
  0 messageset transmissions were retried
  0 toppars:
 rd_kafka_broker_t 0x7ff16c001fc0: thinkpad:9092/0 NodeId 0 in state UP (for 0.012s)
  refcnt 2
  outbuf_cnt: 0 waitresp_cnt: 0
  1 messages sent, 25 bytes, 0 errors, 0 timeouts
  1 messages received, 150 bytes, 0 errors
  0 messageset transmissions were retried
  1 toppars:
Segmentation fault (core dumped)

How to reuse rd_kafka_t?

Hi guys,

I was talking with @arnaud-lb about reusing rd_kafka_t to avoid creating a new connection for every message published. How can I help you guys do this?

Thanks for the project

'bootstrap: Receive failed: Disconnected' when long time no producer input data

I create a topic:

bin/kafka-topics.sh --create --zookeeper 'xxx' --replication-factor 2 --partitions 2 --topic test

Then I run consumer.php in background:

<?php
$conf = new RdKafka\Conf();
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'myConsumerGroup');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '10.81.8.xxx:xxx,10.81.7.xxx:xxx');
$consumer = new RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'test'
$consumer->subscribe(['test']);
while (true) {
    $message = $consumer->consume(10e3);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timedout\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

But I don't produce any data.
PHP script output:

...
Timedout
Timedout
%3|1458051022.959|FAIL|rdkafka#consumer-1| 10.81.7.xxx:xxx/bootstrap: Receive failed: Disconnected
%3|1458051022.959|ERROR|rdkafka#consumer-1| 10.81.7.xxx:xxx/bootstrap: Receive failed: Disconnected
...
%3|1458051923.417|FAIL|rdkafka#consumer-1| 10.81.8.xxx:xxx/bootstrap: Receive failed: Disconnected
%3|1458051923.417|ERROR|rdkafka#consumer-1| 10.81.8.xxx:xxx/bootstrap: Receive failed: Disconnected
Timedout
...

But when producer input data, consumer script cant get message. Not like disconnect.
Is this normal?

I use PHP7 + kafka_2.11-0.9.0.1 + librdkafka-master + php-rdkafka-0.9.1-php7.

Conceptual question using the extension

Hi Guys,

I am very sorry if my question seems irrelevant, but I thought I would ask it here first, as perhaps you have experience with this or it might be useful for other users too.

I would like to send log data to a Kafka cluster from PHP although I have a question on a potential problem.

Suppose events are generated regularly on the server running PHP. What happens if the box goes offline (no internet, or it cannot reach Kafka) but keeps generating tons of events, which are sent to librdkafka?

  • Does it have a buffer to hold messages until the connection to the Kafka cluster is back online?
  • Is this buffer in memory only? Does the buffer have any failover?

Do I need to come up with a persistence solution for such a failover scenario to make sure that my events will be sent at least once?

Thanks for any suggestions!

Call to undefined method RdKafka\KafkaConsumer::newTopic()

php version is 5.6.16
rdkafka

rdkafka support => enabled
version => 0.9.1
build date => Mar  4 2016 17:43:12
librdkafka version => 0.9.1.0

code:

<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'test');
$conf->set('metadata.broker.list', '127.0.0.1');
$consumer = new RdKafka\KafkaConsumer($conf);
$topic = $consumer->newTopic('test');
var_dump($topic);

Consumer not working

Consumer on start gives an error:
%4|1461081921.391|PROTOERR|rdkafka#consumer-1| xx.xxx.xx.xx:9092/0: Protocol parse failure at rd_kafka_fetch_reply_handle:3042
%4|1461081921.391|PROTOERR|rdkafka#consumer-1| xx.xxx.xx.xx:9092/0: expected 7298525 bytes > 1048600 remaining bytes.
Unable to figure out the reason for such an error. Tried different configuration for consumer setting still the same issue.

Persistant connection would be nice to have

Each time a PHP script gets executed, a new Kafka instance is created. It would be nice if there was some way of keeping the connection persistent and not having to fetch metadata for each request being made. This is especially helpful for web services running on php-fpm and nginx, to avoid latencies due to metadata fetches.

Instead of creating a new topic object each time, it would reuse the same topic object across script invocations.

Concurrent consuming timeout error

When I try to consume one topic and partition with two workers in one consumer group, only one worker works. The other one can't consume and raises a timeout error. Why?

how to make

I use a CentOS 6.7 64-bit system.
When I run the configure and make commands, I get this:

creating libtool
appending configuration tag "CXX" to libtool
configure: creating ./config.status
config.status: creating config.h
config.status: config.h is unchanged

/bin/sh /root/kafka/php-rdkafka-master/libtool --mode=compile cc -I. -I/root/kafka/php-rdkafka-master -DPHP_ATOM_INC -I/root/kafka/php-rdkafka-master/include -I/root/kafka/php-rdkafka-master/main -I/root/kafka/php-rdkafka-master -I/data/bin/php/include/php -I/data/bin/php/include/php/main -I/data/bin/php/include/php/TSRM -I/data/bin/php/include/php/Zend -I/data/bin/php/include/php/ext -I/data/bin/php/include/php/ext/date/lib -I/usr/local/include -DHAVE_CONFIG_H -g -O0 -c /root/kafka/php-rdkafka-master/rdkafka.c -o rdkafka.lo
mkdir .libs
cc -I. -I/root/kafka/php-rdkafka-master -DPHP_ATOM_INC -I/root/kafka/php-rdkafka-master/include -I/root/kafka/php-rdkafka-master/main -I/root/kafka/php-rdkafka-master -I/data/bin/php/include/php -I/data/bin/php/include/php/main -I/data/bin/php/include/php/TSRM -I/data/bin/php/include/php/Zend -I/data/bin/php/include/php/ext -I/data/bin/php/include/php/ext/date/lib -I/usr/local/include -DHAVE_CONFIG_H -g -O0 -c /root/kafka/php-rdkafka-master/rdkafka.c -fPIC -DPIC -o .libs/rdkafka.o
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘kafka_init’:
/root/kafka/php-rdkafka-master/rdkafka.c:142: warning: cast to pointer from integer of different size
/root/kafka/php-rdkafka-master/rdkafka.c: At top level:
/root/kafka/php-rdkafka-master/rdkafka.c:148: error: expected ‘=’, ‘,’, ‘;’, ‘asm’ or ‘__attribute__’ before ‘kafka_new’
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘get_kafka_object’:
/root/kafka/php-rdkafka-master/rdkafka.c:166: warning: cast to pointer from integer of different size
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘kafka_conf_free’:
/root/kafka/php-rdkafka-master/rdkafka.c:185: error: used struct type value where scalar is required
/root/kafka/php-rdkafka-master/rdkafka.c: At top level:
/root/kafka/php-rdkafka-master/rdkafka.c:202: error: expected ‘=’, ‘,’, ‘;’, ‘asm’ or ‘__attribute__’ before ‘kafka_conf_new’
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘get_kafka_conf_object’:
/root/kafka/php-rdkafka-master/rdkafka.c:220: warning: cast to pointer from integer of different size
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘kafka_conf_error_cb’:
/root/kafka/php-rdkafka-master/rdkafka.c:239: error: wrong type argument to unary exclamation mark
/root/kafka/php-rdkafka-master/rdkafka.c:247:35: error: macro "ZVAL_STRING" passed 3 arguments, but takes just 2
/root/kafka/php-rdkafka-master/rdkafka.c:247: error: ‘ZVAL_STRING’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:247: error: (Each undeclared identifier is reported only once
/root/kafka/php-rdkafka-master/rdkafka.c:247: error: for each function it appears in.)
/root/kafka/php-rdkafka-master/rdkafka.c:252: error: ‘zend_fcall_info’ has no member named ‘retval_ptr_ptr’
/root/kafka/php-rdkafka-master/rdkafka.c:253: warning: assignment from incompatible pointer type
/root/kafka/php-rdkafka-master/rdkafka.c:259: warning: passing argument 1 of ‘_zval_ptr_dtor’ from incompatible pointer type
/data/bin/php/include/php/Zend/zend_variables.h:112: note: expected ‘struct zval ’ but argument is of type ‘struct zval *
/root/kafka/php-rdkafka-master/rdkafka.c:261: warning: passing argument 1 of ‘_zval_ptr_dtor’ from incompatible pointer type
/data/bin/php/include/php/Zend/zend_variables.h:112: note: expected ‘struct zval ’ but argument is of type ‘struct zval *
/root/kafka/php-rdkafka-master/rdkafka.c:262: warning: passing argument 1 of ‘_zval_ptr_dtor’ from incompatible pointer type
/data/bin/php/include/php/Zend/zend_variables.h:112: note: expected ‘struct zval ’ but argument is of type ‘struct zval *
/root/kafka/php-rdkafka-master/rdkafka.c: At top level:
/root/kafka/php-rdkafka-master/rdkafka.c:284: error: expected ‘=’, ‘,’, ‘;’, ‘asm’ or ‘__attribute__’ before ‘kafka_queue_new’
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘get_kafka_queue_object’:
/root/kafka/php-rdkafka-master/rdkafka.c:302: warning: cast to pointer from integer of different size
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘kafka_topic_free’:
/root/kafka/php-rdkafka-master/rdkafka.c:320: warning: passing argument 1 of ‘_zval_ptr_dtor’ from incompatible pointer type
/data/bin/php/include/php/Zend/zend_variables.h:112: note: expected ‘struct zval ’ but argument is of type ‘struct zval *
/root/kafka/php-rdkafka-master/rdkafka.c: At top level:
/root/kafka/php-rdkafka-master/rdkafka.c:329: error: expected ‘=’, ‘,’, ‘;’, ‘asm’ or ‘__attribute__’ before ‘kafka_topic_new’
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘get_kafka_topic_object’:
/root/kafka/php-rdkafka-master/rdkafka.c:347: warning: cast to pointer from integer of different size
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Conf___construct’:
/root/kafka/php-rdkafka-master/rdkafka.c:401: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Conf_dump’:
/root/kafka/php-rdkafka-master/rdkafka.c:428: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:449:67: error: macro "add_assoc_string" passed 4 arguments, but takes just 3
/root/kafka/php-rdkafka-master/rdkafka.c:449: error: ‘add_assoc_string’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Conf_set’:
/root/kafka/php-rdkafka-master/rdkafka.c:479: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Conf_setErrorCb’:
/root/kafka/php-rdkafka-master/rdkafka.c:525: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:530: error: incompatible type for argument 1 of ‘zval_addref_p’
/data/bin/php/include/php/Zend/zend_types.h:820: note: expected ‘struct zval *’ but argument is of type ‘zval’
/root/kafka/php-rdkafka-master/rdkafka.c:532: error: used struct type value where scalar is required
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__ConsumerTopic_consumeQueueStart’:
/root/kafka/php-rdkafka-master/rdkafka.c:579: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__ConsumerTopic_consumeStart’:
/root/kafka/php-rdkafka-master/rdkafka.c:624: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__ConsumerTopic_consumeStop’:
/root/kafka/php-rdkafka-master/rdkafka.c:662: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__ConsumerTopic_consume’:
/root/kafka/php-rdkafka-master/rdkafka.c:702: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__ConsumerTopic_offsetStore’:
/root/kafka/php-rdkafka-master/rdkafka.c:742: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Consumer___construct’:
/root/kafka/php-rdkafka-master/rdkafka.c:785: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Kafka_addBrokers’:
/root/kafka/php-rdkafka-master/rdkafka.c:813: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Kafka_metadata’:
/root/kafka/php-rdkafka-master/rdkafka.c:844: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Kafka_setLogLevel’:
/root/kafka/php-rdkafka-master/rdkafka.c:883: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Kafka_newQueue’:
/root/kafka/php-rdkafka-master/rdkafka.c:908: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:923: warning: cast to pointer from integer of different size
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Kafka_newTopic’:
/root/kafka/php-rdkafka-master/rdkafka.c:956: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:989: warning: cast to pointer from integer of different size
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Kafka_outqLen’:
/root/kafka/php-rdkafka-master/rdkafka.c:1014: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Kafka_poll’:
/root/kafka/php-rdkafka-master/rdkafka.c:1039: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Kafka_setLogger’:
/root/kafka/php-rdkafka-master/rdkafka.c:1065: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:1085: warning: ‘rd_kafka_set_logger’ is deprecated (declared at /usr/local/include/librdkafka/rdkafka.h:2103)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Message_errstr’:
/root/kafka/php-rdkafka-master/rdkafka.c:1118: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:1118: error: too few arguments to function ‘zend_read_property’
/root/kafka/php-rdkafka-master/rdkafka.c:1124: error: too few arguments to function ‘zend_read_property’
/root/kafka/php-rdkafka-master/rdkafka.c:1133:32: error: macro "RETURN_STRING" passed 2 arguments, but takes just 1
/root/kafka/php-rdkafka-master/rdkafka.c:1133: error: ‘RETURN_STRING’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__ProducerTopic_produce’:
/root/kafka/php-rdkafka-master/rdkafka.c:1179: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Producer___construct’:
/root/kafka/php-rdkafka-master/rdkafka.c:1216: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Queue_consume’:
/root/kafka/php-rdkafka-master/rdkafka.c:1245: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__TopicConf___construct’:
/root/kafka/php-rdkafka-master/rdkafka.c:1286: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__TopicConf_setPartitioner’:
/root/kafka/php-rdkafka-master/rdkafka.c:1310: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zim_RdKafka__Topic_getName’:
/root/kafka/php-rdkafka-master/rdkafka.c:1355: error: ‘this_ptr’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:1360:54: error: macro "RETURN_STRING" passed 2 arguments, but takes just 1
/root/kafka/php-rdkafka-master/rdkafka.c:1360: error: ‘RETURN_STRING’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:1384:32: error: macro "RETURN_STRING" passed 2 arguments, but takes just 1
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zif_rd_kafka_err2str’:
/root/kafka/php-rdkafka-master/rdkafka.c:1384: error: ‘RETURN_STRING’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c: In function ‘zm_startup_rdkafka’:
/root/kafka/php-rdkafka-master/rdkafka.c:1515: error: ‘kafka_new’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:1518: error: too many arguments to function ‘zend_register_internal_class_ex’
/root/kafka/php-rdkafka-master/rdkafka.c:1521: error: too many arguments to function ‘zend_register_internal_class_ex’
/root/kafka/php-rdkafka-master/rdkafka.c:1525: error: ‘kafka_conf_new’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:1534: error: ‘kafka_topic_new’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:1537: error: too many arguments to function ‘zend_register_internal_class_ex’
/root/kafka/php-rdkafka-master/rdkafka.c:1540: error: too many arguments to function ‘zend_register_internal_class_ex’
/root/kafka/php-rdkafka-master/rdkafka.c:1554: error: ‘kafka_queue_new’ undeclared (first use in this function)
/root/kafka/php-rdkafka-master/rdkafka.c:1557: error: too many arguments to function ‘zend_register_internal_class_ex’

Producer waiting for something after `$topic->produce()`

public function addToKafka($queue, $params = array())
{
        echo "started\n";
        $job = new \RdKafka\Producer();
        $job->setLogLevel(LOG_DEBUG);
        $job->addBrokers(KafkaJob::getBrokers());

        echo "get topic\n";
        $topic = $job->newTopic($queue);

        $message = new KafkaJob();
        $message->queue = $queue;
        $message->parameters = urlencode(serialize($params));

        echo "producing...\n";
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message->toArray()));
        echo "produced\n";
        $message = null;
        echo "unsetting...\n";
        unset($message);
        echo "unset\n";
        return true;
}

This is my function for producing messages to Kafka.
The function waits for something during execution. I tried debugging and found that $topic->produce() is asynchronous: the function runs properly up to echo "unset\n"; but return true; does not execute for ~30 seconds.

I tried producing messages with
./bin/kafka-console-producer.sh --broker-list host:port --topic test-produce
This did not wait for anything.

Am I missing something?
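
One possible explanation, not confirmed in this report, is that produce() only queues the message and the producer then waits for the queue to drain when it is destroyed at the end of the function. The README warns that the producer should follow a proper shutdown so messages are not lost. A minimal sketch of an explicit, bounded flush follows; flushOrFail is a hypothetical helper name, and it assumes the producer object exposes flush(int $timeoutMs), as RdKafka\Producer does in recent php-rdkafka releases.

```php
<?php
// Sketch only: $producer is assumed to expose flush(int $timeoutMs): int,
// as RdKafka\Producer does in recent php-rdkafka releases.
function flushOrFail(object $producer, int $timeoutMs = 10000, int $maxRetries = 10): void
{
    // RD_KAFKA_RESP_ERR_NO_ERROR is 0; fall back to 0 when the extension is not loaded.
    $noError = defined('RD_KAFKA_RESP_ERR_NO_ERROR') ? RD_KAFKA_RESP_ERR_NO_ERROR : 0;

    // Retry a bounded number of times instead of blocking indefinitely.
    for ($attempt = 0; $attempt < $maxRetries; $attempt++) {
        if ($producer->flush($timeoutMs) === $noError) {
            return;
        }
    }

    throw new RuntimeException('Unable to flush; messages might be lost.');
}
```

Calling flushOrFail($job) right after $topic->produce(...) would make the wait explicit and give it an upper bound, rather than leaving it to happen implicitly when the function returns.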

Windows php client?

I usually develop and debug in a Windows environment. Is there a Windows version of the extension?

Problem of consuming messages

Hi, I have some problems consuming messages.

 * PHP version:  5.4.45
 * librdkafka version: 0.9.1.0
 * php-rdkafka version: 0.9.1

Here is my code:

<?php

$conf = new RdKafka\Conf();

$conf->set('group.id', 'testgroup');

$rk = new RdKafka\Consumer();
$rk->addBrokers("10.1.14.15");

$queue = $rk->newQueue();

$topicConf = new RdKafka\TopicConf();

$topic = $rk->newTopic("my-test3", $topicConf);

$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $queue);

while (true) {

    $msg = $queue->consume(1000);

    if ($msg->err) {
        echo $msg->errstr(), "<br>";
        return;

    } else {

        echo $msg->payload, "<br>";
    }

}

And it gets an error like this:

PHP Fatal error:  Uncaught exception 'RdKafka\Exception' with message 'Local: Invalid argument or configuration' in /home/www/site/consumer_2.php:16
Stack trace:
#0 /home/www/site/consumer_2.php(16): RdKafka\ConsumerTopic->consumeQueueStart(0, -1000, Object(RdKafka\Queue))
#1 {main}
  thrown in /home/www/site/consumer_2.php on line 16

It seems that RD_KAFKA_OFFSET_STORED is treated as an invalid argument.
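
One thing that stands out in the script above is that group.id is set on $conf, but the consumer is created with new RdKafka\Consumer() and never receives that configuration. Offsets stored on the broker are kept per consumer group, so a missing group id is a plausible cause of the error; this is a guess, not a confirmed diagnosis. A minimal sketch of passing the configuration through follows, where createGroupedConsumer is a hypothetical helper name and the group id and broker address are placeholders.

```php
<?php
// Sketch only: requires the rdkafka extension; the group id and broker
// address passed in are placeholders chosen by the caller.
function createGroupedConsumer(string $groupId, string $brokers)
{
    $conf = new RdKafka\Conf();
    // Broker-stored offsets (RD_KAFKA_OFFSET_STORED) are looked up per group.
    $conf->set('group.id', $groupId);

    // Pass the configuration to the constructor so the group id takes effect.
    $consumer = new RdKafka\Consumer($conf);
    $consumer->addBrokers($brokers);

    return $consumer;
}
```

With this helper, the script would use $rk = createGroupedConsumer('testgroup', '10.1.14.15'); and keep the rest of the queue setup unchanged.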

errors when install this package

I tried librdkafka 0.8.6 and installed this package for PHP 5.6 using "pecl install channel://pecl.php.net/rdkafka-0.0.2", and got the errors below. I have also tried version 0.8.5 and master, and got the same errors.

/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘kafka_init’:
/tmp/pear/temp/rdkafka/rdkafka.c:133: error: ‘tsrm_ls’ undeclared (first use in this function)
/tmp/pear/temp/rdkafka/rdkafka.c:133: error: (Each undeclared identifier is reported only once
/tmp/pear/temp/rdkafka/rdkafka.c:133: error: for each function it appears in.)
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘kafka_new’:
/tmp/pear/temp/rdkafka/rdkafka.c:145: error: too few arguments to function ‘zend_object_std_init’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘get_kafka_object’:
/tmp/pear/temp/rdkafka/rdkafka.c:157: error: too few arguments to function ‘zend_object_store_get_object’
/tmp/pear/temp/rdkafka/rdkafka.c:160: error: ‘tsrm_ls’ undeclared (first use in this function)
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘kafka_conf_new’:
/tmp/pear/temp/rdkafka/rdkafka.c:196: error: too few arguments to function ‘zend_object_std_init’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘get_kafka_conf_object’:
/tmp/pear/temp/rdkafka/rdkafka.c:208: error: too few arguments to function ‘zend_object_store_get_object’
/tmp/pear/temp/rdkafka/rdkafka.c:211: error: ‘tsrm_ls’ undeclared (first use in this function)
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘kafka_queue_new’:
/tmp/pear/temp/rdkafka/rdkafka.c:238: error: too few arguments to function ‘zend_object_std_init’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘get_kafka_queue_object’:
/tmp/pear/temp/rdkafka/rdkafka.c:250: error: too few arguments to function ‘zend_object_store_get_object’
/tmp/pear/temp/rdkafka/rdkafka.c:253: error: ‘tsrm_ls’ undeclared (first use in this function)
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘kafka_topic_new’:
/tmp/pear/temp/rdkafka/rdkafka.c:283: error: too few arguments to function ‘zend_object_std_init’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘get_kafka_topic_object’:
/tmp/pear/temp/rdkafka/rdkafka.c:295: error: too few arguments to function ‘zend_object_store_get_object’
/tmp/pear/temp/rdkafka/rdkafka.c:298: error: ‘tsrm_ls’ undeclared (first use in this function)
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘new_message’:
/tmp/pear/temp/rdkafka/rdkafka.c:307: error: ‘tsrm_ls’ undeclared (first use in this function)
/tmp/pear/temp/rdkafka/rdkafka.c:309: error: too few arguments to function ‘zend_update_property_long’
/tmp/pear/temp/rdkafka/rdkafka.c:310: error: too few arguments to function ‘zend_update_property_string’
/tmp/pear/temp/rdkafka/rdkafka.c:311: error: too few arguments to function ‘zend_update_property_long’
/tmp/pear/temp/rdkafka/rdkafka.c:313: error: too few arguments to function ‘zend_update_property_stringl’
/tmp/pear/temp/rdkafka/rdkafka.c:316: error: too few arguments to function ‘zend_update_property_stringl’
/tmp/pear/temp/rdkafka/rdkafka.c:318: error: too few arguments to function ‘zend_update_property_long’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zim_RdKafka___construct’:
/tmp/pear/temp/rdkafka/rdkafka.c:327: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zim_RdKafka__Conf_set’:
/tmp/pear/temp/rdkafka/rdkafka.c:452: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c:455: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zim_RdKafka__ConsumerTopic_consumeQueueStart’:
/tmp/pear/temp/rdkafka/rdkafka.c:512: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zim_RdKafka__ConsumerTopic_consumeStart’:
/tmp/pear/temp/rdkafka/rdkafka.c:551: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zim_RdKafka__ConsumerTopic_consumeStop’:
/tmp/pear/temp/rdkafka/rdkafka.c:588: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zim_RdKafka__ConsumerTopic_consume’:
/tmp/pear/temp/rdkafka/rdkafka.c:630: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zim_RdKafka__Kafka_metadata’:
/tmp/pear/temp/rdkafka/rdkafka.c:743: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zim_RdKafka__ProducerTopic_produce’:
/tmp/pear/temp/rdkafka/rdkafka.c:1024: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zim_RdKafka__Queue_consume’:
/tmp/pear/temp/rdkafka/rdkafka.c:1096: error: too few arguments to function ‘zend_throw_exception’
/tmp/pear/temp/rdkafka/rdkafka.c: In function ‘zm_startup_rdkafka’:
/tmp/pear/temp/rdkafka/rdkafka.c:1308: error: too few arguments to function ‘zend_declare_property_null’
/tmp/pear/temp/rdkafka/rdkafka.c:1309: error: too few arguments to function ‘zend_declare_property_null’
/tmp/pear/temp/rdkafka/rdkafka.c:1310: error: too few arguments to function ‘zend_declare_property_null’
/tmp/pear/temp/rdkafka/rdkafka.c:1311: error: too few arguments to function ‘zend_declare_property_null’
/tmp/pear/temp/rdkafka/rdkafka.c:1312: error: too few arguments to function ‘zend_declare_property_null’
/tmp/pear/temp/rdkafka/rdkafka.c:1313: error: too few arguments to function ‘zend_declare_property_null’
make: *** [rdkafka.lo] Error 1
ERROR: `make' failed

Typo in README

I'm not sure, but it seems there is a typo in the README for the queue-based consumer.

It should probably be $topic1->consumeQueueStart(...) rather than $queue->..., right?

<?php

$topic1 = $rk->newTopic("topic1");
$queue->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$queue->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

$topic2 = $rk->newTopic("topic2");
$queue->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
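
Following the suggestion in this report, the snippet would read as below. This is a sketch of the proposed correction, not the README's confirmed text. It assumes $rk is an already configured RdKafka\Consumer and wraps the calls in a hypothetical helper, startQueueConsumption, so that the queue creation is visible.

```php
<?php
// Sketch only: $rk is assumed to be an RdKafka\Consumer that is already configured.
function startQueueConsumption(RdKafka\Consumer $rk): RdKafka\Queue
{
    $queue = $rk->newQueue();

    // consumeQueueStart() is a ConsumerTopic method, so it is called on the topics.
    $topic1 = $rk->newTopic("topic1");
    $topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
    $topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

    $topic2 = $rk->newTopic("topic2");
    $topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);

    // Messages from all three partitions are then read from this one queue.
    return $queue;
}
```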

configure error

/data/software/php-rdkafka/rdkafka.c: In function ‘zim_RdKafka__TopicConf_setPartitioner’:
/data/software/php-rdkafka/rdkafka.c:1207: error: ‘rd_kafka_msg_partitioner_consistent’ undeclared (first use in this function)
/data/software/php-rdkafka/rdkafka.c:1207: error: (Each undeclared identifier is reported only once
/data/software/php-rdkafka/rdkafka.c:1207: error: for each function it appears in.)
make: *** [rdkafka.lo] Error 1

cat /proc/version

Linux version 2.6.32-279.el6.x86_64 ([email protected]) (gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC) ) #1 SMP Fri Jun 22 12:19:21 UTC 2012

undefined symbol: object_properties_init

CentOS 6

phpize

Configuring for:
PHP Api Version: 20090626
Zend Module Api No: 20090626
Zend Extension Api No: 220090626

I ran make and added the extension to php.ini, and make test says:
PHP Warning: PHP Startup: Unable to load dynamic library '/home/gspiegel/src/kafka/php-rdkafka/modules/rdkafka.so' - /home/gspiegel/src/kafka/php-rdkafka/modules/rdkafka.so: undefined symbol: object_properties_init in Unknown on line 0

TIA.

TopicConf is invalid

I am using a topic configuration for a consumer, and get the error below:

PHP Fatal error: Uncaught exception 'RdKafka\Exception' with message 'Local: Invalid argument or configuration' in /home/clyde/c.php:13
Stack trace:
#0 /home/clyde/c.php(13): RdKafka\ConsumerTopic->consumeStart(0, -1000)
#1 {main}

thrown in /home/clyde/c.php on line 13

And I dumped the configuration as follows:
array(12) {
["request.required.acks"]=>
string(1) "1"
["request.timeout.ms"]=>
string(4) "5000"
["message.timeout.ms"]=>
string(6) "300000"
["produce.offset.report"]=>
string(5) "false"
["compression.codec"]=>
string(7) "inherit"
["auto.commit.enable"]=>
string(4) "true"
["auto.commit.interval.ms"]=>
string(4) "1000"
["auto.offset.reset"]=>
string(7) "largest"
["offset.store.path"]=>
string(1) "."
["offset.store.sync.interval.ms"]=>
string(2) "-1"
["offset.store.method"]=>
string(6) "broker"
["consume.callback.max.messages"]=>
string(1) "0"
}

I also went through https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md, but found nothing about this error.

Error with PHP7

I compiled the extension with PHP 7, but I get this error when executing a script:
PHP Warning: PHP Startup: Unable to load dynamic library '/pathto/rdkafka.so' - /pathto/rdkafka.so: undefined symbol: zval_used_for_init in Unknown on line 0
Any idea how to make it work?
Thanks

Segmentation fault for producer

When I used the producer as in your examples, I got the error below:
Segmentation fault
What happened? Please help me and give more details about it.

configure: error: Please reinstall the rdkafka distribution

First, thanks @arnaud-lb for creating this extension. But I have a problem building it.

$ ./configure --with-php-config=/home/work/local/app/php7/bin/php-config 

checking for grep that handles long lines and -e... /bin/grep
checking for egrep... /bin/grep -E
checking for a sed that does not truncate output... /bin/sed
checking for cc... cc
checking for C compiler default output file name... a.out
checking whether the C compiler works... yes
checking whether we are cross compiling... no
checking for suffix of executables...
checking for suffix of object files... o
checking whether we are using the GNU C compiler... yes
checking whether cc accepts -g... yes
checking for cc option to accept ISO C89... none needed
checking how to run the C preprocessor... cc -E
checking for icc... no
checking for suncc... no
checking whether cc understands -c and -o together... yes
checking for system library directory... lib
checking if compiler supports -R... no
checking if compiler supports -Wl,-rpath,... yes
checking build system type... x86_64-unknown-linux-gnu
checking host system type... x86_64-unknown-linux-gnu
checking target system type... x86_64-unknown-linux-gnu
checking for PHP prefix... /home/work/local/app/php7
checking for PHP includes... ...
checking for PHP extension directory... ...
checking for PHP installed headers prefix... /home/work/local/app/php7/include/php
checking if debug is enabled... no
checking if zts is enabled... no
checking for re2c... re2c
checking for re2c version... 0.16 (ok)
checking for gawk... gawk
checking for rdkafka support... yes, shared
checking for librdkafka/rdkafka.h" in default path... not found
configure: error: Please reinstall the rdkafka distribution
$ php7 -v

PHP 7.0.4 (cli) (built: Mar 11 2016 17:01:16) ( NTS )
Copyright (c) 1997-2016 The PHP Group
Zend Engine v3.0.0, Copyright (c) 1998-2016 Zend Technologies
    with Zend OPcache v7.0.6-dev, Copyright (c) 1999-2016, by Zend Technologies

My system is CentOS release 6.3 (Final).

I downloaded php-rdkafka-php7.zip via git clone and from https://github.com/arnaud-lb/php-rdkafka/archive/0.9.1-php7.zip, but both fail to build.

Can someone help me? Thanks.
