
kafka-php's Introduction

Kafka-php

Chinese Documentation (中文文档)


Kafka-php is a pure PHP Kafka client that currently supports Kafka versions 0.8.x and above. The v0.2.x and v0.1.x branches of this project are incompatible: if you are using the original v0.1.x API, refer to the Kafka PHP v0.1.x Document, but switching to v0.2.x is recommended. v0.2.x interacts with the Kafka brokers asynchronously, making it more stable and efficient than v0.1.x, and because it is written in pure PHP it requires no compiled extension, which reduces installation and maintenance costs.

Requirements

  • Minimum PHP version: 7.1
  • Kafka version greater than 0.8
  • The consumer module requires a Kafka broker version of 0.9.0 or greater

Installation

Add the lib directory to the PHP include_path and use an autoloader like the one in the examples directory (the code follows the PEAR/Zend one-class-per-file convention).
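A minimal sketch of such an autoloader, assuming the library directory has been added to include_path (the directory name and the autoloader shipped in the examples directory may differ):

<?php
// PEAR/Zend one-class-per-file convention: Kafka\Producer (or
// Kafka_Producer) maps to Kafka/Producer.php, resolved via include_path.
set_include_path(get_include_path() . PATH_SEPARATOR . __DIR__ . '/lib');
spl_autoload_register(function ($class) {
    $file = str_replace(['\\', '_'], DIRECTORY_SEPARATOR, $class) . '.php';
    @include $file; // include searches include_path for relative paths
});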

Composer Install

If you use Composer to manage your project's dependencies, simply add a dependency on nmred/kafka-php.

$ composer require nmred/kafka-php

Here is a minimal example of a composer.json file:

{
	"require": {
		"nmred/kafka-php": "0.2.*"
	}
}
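After running composer require (or composer install with the composer.json above), bootstrap your script with Composer's autoloader; the path below assumes the script sits in your project root:

<?php
// Loads all Composer-managed dependencies, including nmred/kafka-php.
require __DIR__ . '/vendor/autoload.php';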

Configuration

Configuration properties are documented in Configuration.

Producer

Asynchronous mode

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'test',
                'value' => 'test....message.',
                'key' => 'testkey',
            ],
        ];
    }
);
$producer->setLogger($logger);
$producer->success(function ($result) {
    var_dump($result);
});
$producer->error(function ($errorCode) {
    var_dump($errorCode);
});
$producer->send(true);

Synchronous mode

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
$producer->setLogger($logger);

for ($i = 0; $i < 100; $i++) {
    $producer->send([
        [
            'topic' => 'test1',
            'value' => 'test1....message.',
            'key' => '',
        ],
    ]);
}

Consumer

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['test']);
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
	var_dump($message);
});

Low-Level API

Refer to the example directory.

QQ Group

Group 1: 531522091, Group 2: 657517955

kafka-php's People

Contributors

alexkr, bd808, crim, davidsheldon, dc-dandriyanov, ebernhardson, eknowlton, git-hulk, gromnan, lcobucci, lixiyu, mm-ylei, nmred, noname007, nowgoo, pavemaksim, reedy, simpod, stevenhilder, suchasplus, thecrissaegrim, therifler, tongsq


kafka-php's Issues

set old offset

File: Kafka\Protocol\Fetch\Helper\CommitOffset, line 114

$offsetObject->setOffset($offset); // wrong

$offsetObject->setOffset($offset + 1); // correct: the committed offset should point at the next message to consume

update partition offset for myself

I want to update the offset per partition, but how do I know the offset and how do I calculate the current offset inside the consumer's loop? Here is my code:

$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('testgroup');
$consumer->setPartition('aatest-1', 0);
$consumer->setPartition('aatest-1', 1);
$consumer->setPartition('aatest-1', 2);
//$consumer->setTopic('aatest-1');
//while (1) {
$result = $consumer->fetch();
foreach ($result as $topicName => $partition) {
    foreach ($partition as $partId => $messageSet) {
        // var_dump($partition->getHighOffset())."-";
        foreach ($messageSet as $message) {
                //var_dump((string)$message);
               // here I will do something with the message; on failure, do not update the offset; on success, call commitOffset
        }
        //$offset = $partition->getMessageOffset();
    }
}

// update the offset
function commitOffset($topicName, $partition, $highOffset)
{
    echo $partition . "-" . $highOffset."\n";
//    exit;
    $data = array(
        'group_id' => 'testgroup',
        'data' => array(
            array(
                'topic_name' => 'aatest-1',
                'partitions' => array(
                    array(
                        'partition_id' => $partition,
                        'offset' => $highOffset,
                    ),
                ),
            ),
        ),
    );

    $conn = new \Kafka\Socket('localhost', '9092');
    $conn->connect();
    $encoder = new \Kafka\Protocol\Encoder($conn);
    $encoder->commitOffsetRequest($data);
    $decoder = new \Kafka\Protocol\Decoder($conn);
    $result = $decoder->commitOffsetResponse();
    var_dump($result);
}

"Bus error (core dumped)" when trying to run Produce example

I ran Produce.php in the example directory and it dies with a "Bus error (core dumped)" message. The code I downloaded is unaltered, except that in the ZooKeeper.php constructor the class name was Zookeeper instead of ZooKeeper, causing an autoload error. I corrected that, and now this error occurs.

Recent change broke Offset Protocol

So the following line change: 12d4f2f#diff-6a05497ce6c49eaedc44b9b2b7075c8eL69

broke retrieving offsets from Kafka. The response I get is:
array(1) { ["MyTopic"]=> array(1) { [0]=> array(3) { ["offset"]=> int(-1) ["metadata"]=> string(0) "" ["errCode"]=> int(768) } } }

Reverting that line, I can correctly retrieve offsets for various topics/partitions via the Offset API; the response looks like:
array(1) { ["MyTopic"]=> array(1) { [0]=> array(3) { ["offset"]=> int(-1) ["metadata"]=> string(0) "" ["errCode"]=> int(3) } } }

Produce::getInstance() ignores arguments

After the first initialization, arguments to Produce::getInstance() are silently ignored. This way you cannot use more than one Kafka cluster. Is there a reason for this?

$ProducerOne = new Producer($someHosts, $timeout);
$ProducerTwo = new Producer($otherHosts, $timeout);

This way it is more flexible; users can cache the Producer instance themselves if they want to.

Consumer example failed

Hello,

I tried to execute example/Consumer.php, but the autoloader failed to include ZooKeeper.php:

$ php Consumer.php
PHP Fatal error:  Uncaught exception 'RuntimeException' with message 'ZooKeeper not found' in /tmp/kafka-php/example/autoloader.php:39
Stack trace:
#0 [internal function]: {closure}('ZooKeeper')
#1 /tmp/kafka-php/src/Kafka/ZooKeeper.php(94): spl_autoload_call('ZooKeeper')
#2 /tmp/kafka-php/src/Kafka/Consumer.php(134): Kafka\ZooKeeper->__construct('172.31.63.176:2...', NULL)
#3 /tmp/kafka-php/src/Kafka/Consumer.php(117): Kafka\Consumer->__construct('172.31.63.176:2...', NULL)
#4 /tmp/kafka-php/example/Consumer.php(4): Kafka\Consumer::getInstance('172.31.63.176:2...')
#5 {main}
  thrown in /tmp/kafka-php/example/autoloader.php on line 39

partition not required in kafka 0.8

In Kafka 0.8, when using ZooKeeper, it is not required to specify a partition when consuming. Do you have any plans to allow this functionality?

Ack'ing messages

Hello,

How do I ack messages while consuming, so that the server can remove them from the queue?

Also, is there a way to detect errors while publishing messages, so that we can take action (such as storing the message in a DB) when publishing fails?

Cleanup fully qualified names

This would be a biggish change, so I want to ask before I do it...

Would you be alright if I tidied up the various fully qualified class names to swap them for imports etc?

Not bothered either way, just wanted to ask before making a relatively big change :)

Close connection

I am working with two clusters, and I am having issues closing connections to cluster A in order to use cluster B.

Cut version 1.5?

39 commits to master since 1.4 was released. Any plans to cut a new release?

Multiple instances of consumer processing same messages.

I am using this Kafka client and it works properly with a single consumer instance. If I run two or three instances of the same consumer with the same group name, all instances process the same messages. I would expect that messages already processed (committed) by one consumer are not consumed by another consumer in the same group.

Consumer.php
1 topic: test
3 partitions: 0, 1, 2
Group name: testgroup

Running separate instances of the consumer file while randomly producing messages to each partition:
php consumer.php
php consumer.php
php consumer.php

After a message is consumed I commit it, yet each consumer still consumes all partitions. Am I missing something?
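For reference, partition assignment within a consumer group is what the v0.2.x consumer API (shown in the Consumer section above) is designed to provide, given a broker version of at least 0.9.0 per the requirements. A minimal sketch reusing that example's configuration, with a placeholder broker address:

<?php
require '../vendor/autoload.php';

// v0.2.x group consumer sketch: with the same group id, partitions should
// be divided among the running instances instead of replayed to each one.
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('testgroup');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['test']);

$consumer = new \Kafka\Consumer();
$consumer->start(function ($topic, $part, $message) {
    var_dump($message);
});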

How to handle exceptions when iterating?

I wrote a small consumer program for testing:

$consumer = \Kafka\Consumer::getInstance($zookeeperHosts, $zookeeperTimeout);
$consumer->setGroup('test-group');
$consumer->setPartition($topicName, $partitionId);

$response = $consumer->fetch();

foreach ($response as $tn => $tm) {
    foreach ($tm as $pid => $pm) {
        foreach ($pm as $m) {
            echo (string)$m . "\n";
        }
    }
}

After running for about an hour, it crashed with the following output:

Fatal error: Uncaught exception 'Kafka\Exception\SocketEOF' with message 'Could not read 4 bytes from stream (not readable)' in /xxx/Kafka/Socket.php on line 267

Kafka\Exception\SocketEOF: Could not read 4 bytes from stream (not readable) in /xxx/Kafka/Socket.php on line 267

Call Stack:
    0.0003     652744   1. {main}() /xxx/consumer.php:0
 3531.9644   12658856   2. Kafka\Protocol\Fetch\MessageSet->valid() /xxx/consumer.php:44
 3531.9644   12658856   3. Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof() /xxx/Kafka/Protocol/Fetch/MessageSet.php:168
 3531.9644   12658936   4. Kafka\Protocol\Fetch\Helper\CommitOffset->onPartitionEof() /xxx/Kafka/Protocol/Fetch/Helper/Helper.php:153
 3531.9670   12660800   5. Kafka\Offset->setOffset() /xxx/Kafka/Protocol/Fetch/Helper/CommitOffset.php:114
 3531.9680   12662704   6. Kafka\Protocol\Decoder->commitOffsetResponse() /xxx/Kafka/Offset.php:175
 3531.9680   12663096   7. Kafka\Socket->read() /xxx/Kafka/Protocol/Decoder.php:267

It seems the iterator threw the SocketEOF exception.

So my question is: when the same thing happens again, what should I do to keep my program safe and re-fetch?

(try-catch, then what?)
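One possible shape for this, as a minimal sketch against the v0.1.x API used above (the retry limit and backoff are illustrative choices, not library features):

$consumer = \Kafka\Consumer::getInstance($zookeeperHosts, $zookeeperTimeout);
$consumer->setGroup('test-group');
$consumer->setPartition($topicName, $partitionId);

$maxAttempts = 5;
for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
    try {
        $response = $consumer->fetch();
        foreach ($response as $tn => $tm) {
            foreach ($tm as $pid => $pm) {
                foreach ($pm as $m) {
                    echo (string) $m . "\n";
                }
            }
        }
        break; // fetch and iteration completed without a socket error
    } catch (\Kafka\Exception\SocketEOF $e) {
        // the stream died mid-iteration; back off, then re-fetch
        sleep(1);
    }
}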

error when not specifying the offset

I am receiving the following error when not specifying the offset while consuming.

Unexpected EOF while reading 4 bytes from stream (no data)

this is in the following function:

 public function fetchOffsetResponse()
    {
        $result = array();
        $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
        $dataLen = array_shift($dataLen);

I have also tried specifying an offset, but no data seems to be returned from the topic. Is there a specific offset option to return the latest data?

I believe you can fetch the current offset from Kafka and have it handle the offsets; you should only need to specify whether you want the beginning or the end offset.
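For what it's worth, v0.2.x exposes exactly this through the consumer config's offset-reset setting, which appears commented out in the Consumer example above. A hedged sketch ('earliest' is taken from that example; 'latest' is assumed to be the complementary value for consuming only new data):

$config = \Kafka\ConsumerConfig::getInstance();
$config->setOffsetReset('latest'); // or 'earliest' to start from the beginning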

getting a zookeeper error when producing

I am receiving the following error when producing to Kafka using the ZooKeeper method. Is there a way to close the connection to ZooKeeper? I am making multiple calls to the code below, and it seems to hold the connections open.

        $rand = rand(0, 130);
        $producer = \Kafka\Produce::getInstance('zookeeper-host', 3000);
        $producer->setRequireAck(1);
        $producer->setMessages('topic', $rand, $message);
        $result = $producer->send();

Here is the error I am getting constantly. It eventually causes my code to stop running.

2015-04-22 20:04:11,185:5586(0x7f502b936700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 13ms

Sending partitionId in setMessages

The Produce::setMessages call requires a partitionId indicating where the message will be sent.

Shouldn't this be decided by ZooKeeper rather than specified by the producer?

Is there a way to list all partitions in a given topic?

Can we create topics/partitions?

Unwritable Stream in Producer

I'm executing a long running producer job that is taking data from a database and putting it onto a Kafka topic for later processing. Sporadically, this producer job will error out with the exception Could not write 16353587 bytes to stream. I've looked into the code and tracked it down to the Kafka\Socket object's write($buf) function here:

$writable = stream_select($null, $write, $null, $this->sendTimeoutSec, $this->sendTimeoutUsec);
if ($writable > 0) { ... }
if (false !== $writable) { ... }
throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream');

The only information I can find about this is that it has something to do with the stream no longer responding, or responding with an error. No errors are thrown to the system logs so this is becoming more and more difficult to debug. Any help would be appreciated.

Too many backlogged messages, what should I do?

Kafka\Exception\SocketEOF: Could not read 4294967295 bytes from stream, length too longer.

There are too many backlogged messages. I have checked the message-related maxSize settings on the Kafka broker/consumer and none of the values are large, yet here the client tries to read this much in one go. How can this be solved? (4294967295 is 0xFFFFFFFF, i.e. a length field of -1 read as an unsigned 32-bit integer, which suggests a protocol-parsing problem rather than a genuinely huge read.)

What is the best way to roll back the offset in exception handling?

As shown below, the offset is updated automatically once the $messageSet loop completes. I want to write $rows to a file after the loop finishes, but if the file write fails, what is the best practice for handling the exception and rolling the offset back?

            $consumer = \Kafka\Consumer::getInstance('127.0.0.1:2181');
            $consumer->setGroup($group);
            $consumer->setFromOffset(true);
            //$consumer->setPartition('recom_page', 0);
            $consumer->setTopic($topic);
            $consumer->setMaxBytes(102400);
            $result = $consumer->fetch();

            $rows = [];
            foreach ($result as $topicName => $partition) {
                foreach ($partition as $partId => $messageSet) {
                    foreach ($messageSet as $message) {
                        $rows[] = $this->process($message);
                    }
                }
            }

fwrite randomly fails in Socket

Hi,

I'm trying to send 30 000 messages to Kafka, and sometimes fwrite fails with the message:

PHP Notice:  fwrite(): send of 8192 bytes failed with errno=11 Resource temporarily unavailable in /test/vendor/nmred/kafka-php/src/Kafka/Socket.php on line 296

It's just a notice; the loop continues until $written reaches $buflen and everything is written. But the notice is ugly and should be handled properly. However, I don't understand why it happens only "sometimes"...

composer clear-compiled error

I am getting the error below after doing a composer update and then clear-compiled:

Fatal error: Class 'ZooKeeper' not found in /home/vagrant/laravel/praxis/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php on line 67

This request is for a topic or partition that does not exist on this broker.

Kafka Version: 0.8

<?php

$hosts = '...';

$consumer = \Kafka\Consumer::getInstance($hosts);
$consumer->setGroup('test_consumer');
$consumer->setFromOffset(true);
$consumer->setTopic('test', 0);
$result = $consumer->fetch();
foreach ($result as $topicName => $partition) {
    echo 'Topic: ' . $topicName . PHP_EOL;
    foreach ($partition as $partId => $messageSet) {
        echo 'PartitionId: ' . $partId . PHP_EOL;
        foreach ($messageSet as $message) {
            echo 'Message: ' . $message . PHP_EOL;
        }
    }
}

Output:

Topic: test
PartitionId: 0
Message: {"type":"test","timestamp":1444062734}
PHP Fatal error:  Uncaught exception 'Kafka\Exception' with message 'This request is for a topic or partition that does not exist on this broker.' in /home/acme/kafka/vendor/nmred/kafka-php/src/Kafka/Offset.php:181
Stack trace:
#0 /home/acme/kafka/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php(114): Kafka\Offset->setOffset(0)
#1 /home/acme/kafka/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php(153): Kafka\Protocol\Fetch\Helper\CommitOffset->onPartitionEof(Object(Kafka\Protocol\Fetch\Partition))
#2 /home/acme/kafka/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php(168): Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof(Object(Kafka\Protocol\Fetch\Partition))
#3 /home/acme/kafka/src/kafka_consumer.php(30): Kafka\Protocol\Fetch\MessageSet->valid()
#4 {main}
  thrown in /home/acme/kafka/vendor/nmred/kafka-php/src/Kafka/Offset.php on line 181

kafka_2.11-0.9.0.0 support

Hi, our Kafka-related version information is as follows:

kafka_2.11-0.9.0.0
zookeeper-3.4.6
PHP 5.5.25 (cli) (built: May 20 2015 07:30:18)

Does this library currently support this setup?
Thanks.

Problem installing the ZooKeeper extension on Windows (XAMPP)

Hello,
I am not able to load the ZooKeeper class in PHP. How can I get the ZooKeeper extension?

I got this error message: Uncaught exception 'RuntimeException' with message 'ZooKeeper not found' in C:\xampp\htdocs\kafka\code\autoloader.php:38 Stack trace: #0 [internal function]: {closure}('ZooKeeper') #1 C:\xampp\htdocs\kafka\src\Kafka\ZooKeeper.php(103): spl_autoload_call('ZooKeeper') #2 C:\xampp\htdocs\kafka\src\Kafka\Consumer.php(121): Kafka\ZooKeeper->__construct('localhost:2181', NULL) #3 C:\xampp\htdocs\kafka\src\Kafka\Consumer.php(103): Kafka\Consumer->__construct('localhost:2181', NULL) #4 C:\xampp\htdocs\kafka\code\Consumer_test.php(39):

Please suggest.

Fatal error: Class 'ZooKeeper' not found

Hi,

I have a problem when integrating with a Yii project; can you please check?
Fatal error: Class 'ZooKeeper' not found in:
\protected\vendors\kafka-php-master\src\Kafka\ZooKeeper.php on line 92

Here is the code:
Yii::import('application.vendors.kafka-php-master.example.*');
require_once 'autoloader.php';
$produce = \Kafka\Produce::getInstance('localhost:2181', 3000);
$produce->setRequireAck(-1);
$produce->setMessages('test', 0, array('test1111111'));
$result = $produce->send();
var_dump($result);

Duplicate messages in consumers

I'm seeing duplicate messages in my consumers, despite the log files appearing correctly. I see the 5 messages I send in the log, and then I see something like 400 messages (those same 5 messages repeated multiple times over) in the PHP consumer. A console consumer operates as expected. Any ideas?

$this->consumer = \Kafka\Consumer::getInstance('localhost:2181');
$this->consumer->setGroup($this->group);
$this->consumer->setFromOffset(true);
$this->consumer->setPartition($this->queue, $this->partition);
$this->consumer->setMaxBytes(1024000);

$result = $this->consumer->fetch();
foreach ($result as $topicName => $partition) {
    foreach ($partition as $partId => $messageSet) {
        foreach ($messageSet as $message) {
            $msg = json_decode($message->getMessage(), true);
            $messages[] = $msg;

            if ($msg['message'] === $this->stop) {
                $this->completed = true;
                return $messages;
            }
        }
    }
}

Any ideas?

Server side disconnect can cause library to go into infinite loop when producing

I noticed this after upgrading to Kafka 0.9.0.0, but I believe the problem also exists with 0.8.x brokers if the server closes the client's connection.

Kafka 0.9.0.0 introduces a new configuration property on the broker called connections.max.idle.ms defined as: Idle connections timeout: the server socket processor threads close the connections that idle more than this. By default the timeout is 10 minutes.

If you connect a producer using this library to a 0.9.0.0 broker, and the client exceeds this idle timeout, and then goes to publish, it will get stuck in an infinite loop trying to fwrite the now dead stream. You'll get warnings like this repeated:

PHP Notice: fwrite(): send of 4096 bytes failed with errno=32 Broken pipe in /path/vendor/nmred/kafka-php/src/Kafka/Socket.php on line 358

PHP's fwrite() is great (sarcasm) in that it will return 0 when you attempt to fwrite a stream that has been closed by the server. Additionally stream_select() will continue to say that the stream is available to write. As far as I can tell, there is no reliable way to determine if the stream has been closed by the server. The only 'best practice' I can find online for this is to set a limit on how many write retries to allow when 0 bytes are written.
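A minimal sketch of that retry-limit workaround, written as a write loop like the one in Socket::write() ($stream and $buf as in that method; the limit and the exception thrown are illustrative choices, not library behavior):

// Treat repeated zero-byte writes as a dead peer instead of looping forever.
$buflen = strlen($buf);
$written = 0;
$zeroWrites = 0;
$maxZeroWrites = 10; // illustrative limit
while ($written < $buflen) {
    $n = @fwrite($stream, substr($buf, $written));
    if ($n === false || $n === 0) {
        if (++$zeroWrites >= $maxZeroWrites) {
            throw new \Kafka\Exception\Socket('Peer appears to have closed the connection');
        }
        continue;
    }
    $zeroWrites = 0; // progress was made, reset the counter
    $written += $n;
}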

Cut a new release?

Hey is it possible to cut a new release? 1.5 is from December 2015.

Thanks!
Stephen

Getting Timed out reading socket error for kafka cluster setup

I want to set up a Kafka cluster for three similar applications that share topics, like AppA -> {TopicX, TopicY, TopicZ}, AppB -> {TopicX, TopicZ}, AppC -> {TopicX, TopicY}. Producers and consumers will be app-specific.
I set up a Kafka cluster with three brokers (partitions 1, 2, 3) using three different config files with different ports, then started the Kafka servers (the cluster).

So I used Producer code for App A like

       $producer->setRequireAck(-1);
       $producer->setMessages("TopicX", 0, array(json_encode($this->data)));
       $producer->send();

AND used Producer code for App B like

       $producer->setRequireAck(-1);
       $producer->setMessages("TopicX", 1, array(json_encode($this->data)));
       $producer->send();

And so on.

Then I made my Consumer scripts for three apps like

$queues = array("TopicX", "TopicY", "TopicZ");
while (true) {
    foreach ($queues as $queue) {
        $consumer = \Kafka\Consumer::getInstance('localhost:2181');
        $consumer->setGroup('testgroup');
        $consumer->setPartition($queue, 0);
        $result = $consumer->fetch();
    }
}

But when I try to execute the consumer script for any app, I get an error like:

"Timed out reading socket while reading 750437 bytes with 750323 bytes to go"

I just don't know how to fix this issue. I tried to modify some Kafka config parameters like:

 zookeeper.connection.timeout.ms=24000         # Initially 6000
 replica.socket.timeout.ms=15000                      # Not exists in default file

but that did not work.

Support for KeyedMessages

Hi,
I don't know how to create a KeyedMessage to send to Kafka.
Could you please provide an example, or just say whether it is possible?
Regards.
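In v0.2.x, a per-message key can be supplied via the 'key' field of the message array, as the producer examples earlier on this page show. A minimal sketch, given a v0.2.x \Kafka\Producer configured as in those examples (topic, value, and key below are placeholders):

$producer->send([
    [
        'topic' => 'test',
        'value' => 'payload',
        'key'   => 'user-42', // the message key
    ],
]);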

Cannot connect to any kafka brokers

Hi,

Using the example without zookeeper.

$hostList = '192.168.85.181:2181';

$produce = \Kafka\Produce::getInstance(null, null, $hostList);
// get available partitions
$partitions = $produce->getAvailablePartitions('MyFirstTopic1');
var_dump($partitions);
// send message
$produce->setRequireAck(-1);
$produce->setMessages('test', 0, array('test11111110099090'));
$produce->setMessages('test', 0, array('test22222220099090'));
$result = $produce->send();
var_dump($result);

The PHP Error:

An uncaught Exception was encountered

Type: Kafka\Exception

Message: Could not connect to any kafka brokers

Filename: ../vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php

Line Number: 202

The zookeeper log file:

[2017-03-27 15:59:25,305] INFO Accepted socket connection from /192.168.85.185:56502 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-03-27 15:59:25,309] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 223181126 (org.apache.zookeeper.server.NIOServerCnxn)
[2017-03-27 15:59:25,309] INFO Closed socket connection for client /192.168.85.185:56502 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)

Using Kafka version: 0.10.2.0

Thanks

Huge delay while sending the message from producer

I was profiling producer code and found that $producer->send() was taking more than 30 ms, even on localhost! When I dug in a little, I found that $this->zookeeper->getPartitionState($topicName, $partitionId) in src/Kafka/Client.php (in the getHostByPartition function) was taking almost all of that time. The application I am working on cannot accept a delay of 5-10 ms more. Is there any workaround for this?
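One workaround visible elsewhere in these issues (see "Cannot connect to any kafka brokers" above) is the broker-metadata mode of Produce::getInstance, which skips the ZooKeeper lookup entirely. A hedged sketch with a placeholder broker address:

// v0.1.x: pass null for the ZooKeeper host and timeout, and supply a
// Kafka broker list instead, so metadata comes from the brokers directly.
$produce = \Kafka\Produce::getInstance(null, null, '127.0.0.1:9092');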

How to consume only new message from kafka

Hello,
I am not able to understand how to consume only new messages from a Kafka topic. Please advise.

I am using the code given below:

$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$group = 'updation';
$consumer->setGroup($group);
$consumer->setFromOffset(false);
//$consumer->setPartition('recom_page', 0);
$consumer->setTopic('updation', 0);
$consumer->setMaxBytes(102400);
$result = $consumer->fetch();
$message_array = '';
foreach ($result as $topicName => $partition) {
    foreach ($partition as $partId => $messageSet) {
        foreach ($messageSet as $message) {
            var_dump((string) $message);
        }
    }
}

Cached broker list causes issues when a broker drops in/out of a cluster

When using the ZooKeeper metadata, the topic partition metadata is not cached, but Client.php does cache the broker list.

So now imagine the following scenario:
You have a 3-node cluster with a replication factor of 3. You bring down one node in the cluster, so you now have 2 brokers. You fire up a long-running PHP script that publishes to this cluster. At this point all of the metadata (brokers and partitions) is consistent. Now, while this long-running PHP script continues to run, you bring the 3rd broker back up. What happens is that your cached broker list only shows brokers 1 and 2, but your partition metadata will return broker 3 for the topics/partitions it has assumed leadership for.

Since the Kafka metadata instance caches both the broker list and the topic partition metadata, I believe it also suffers from a similar issue if leaders are shifted around within the cluster during the lifetime of the process.

Inbound PR that should solve this issue.
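For the v0.2.x client, note that metadata is refreshed on a timer via the setting used in the README examples above, which presumably bounds how long a stale broker list can survive:

// Refresh cluster metadata every 10 seconds (the value used in the README
// examples); a shorter interval narrows the stale-broker window.
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);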

Consumer Groups

Hi,

I'm trying to use this library to access the high-level consumer API. I'm doing something like:

$consumer->setGroup('group1');
$consumer->setTopic('topic1');
$iterator = $consumer->fetch();

Then I create a topic with 5 partitions (1 replica), run two processes of the consumer, and then start producing some random messages to different partitions. All messages end up in both consumers. Since they use the same consumer group, I was expecting the messages (via their partitions) to be distributed to the two processes. Is this not the case? Are there additional steps I have to take to make this work?

Producer code looks like:

$producer->setRequireAck(-1);
$producer->setMessages($topic, $partition, $messages);
$producer->send();

Sorry for any misunderstanding on my part of the Kafka protocol or this library, but any help would be greatly appreciated!

Fetching and consuming

Please explain the advantages and disadvantages of consuming versus fetching. The former uses ZooKeeper, the latter connects to Kafka directly. In which cases should I use which?

Two problems

Hi, nmred:
I am using your PHP Kafka library. Here are my questions:
1. How do I fetch only one message while the consumer is running?
2. How do I know whether a message exists in a topic partition?
