arnaud-lb commented on August 23, 2024

I would gladly accept help on this.

Basically, we would create an API like this:

$instanceName = "somename";
if (!$rk = \RdKafka\Producer::getInstance($instanceName)) {
    $rk = new \RdKafka\Producer($conf, $instanceName);
}

\RdKafka\Producer::getInstance() would search for an existing persistent Producer instance with the given name and return it (or a falsy value if there is none).

The \RdKafka\Producer($conf, $name) constructor would register the instance if a name is passed as the second argument.

Internally, only the rdkafka handle would be persistent, not the PHP object itself. getInstance() would look up the persistent rdkafka handle and return a new Producer PHP object wrapping it.

The Producer destructor would not destroy the rdkafka handle if it is a persistent handle.
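A minimal C sketch of the lookup-or-create flow described above, assuming a fixed-size, process-lifetime registry. `handle_t` stands in for the native rdkafka handle, and `registry_get()`, `registry_create()` and `get_or_create()` are hypothetical names, not php-rdkafka's API. Only the native handle lives in the registry; the PHP object wrapping it would still be created per request.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MAX_INSTANCES 16
#define MAX_NAME 64

/* Hypothetical stand-in for the native rd_kafka_t handle. */
typedef struct {
    int id;
} handle_t;

/* Process-lifetime registry: it outlives individual requests, unlike the
 * PHP objects that wrap the handles. */
static struct {
    char name[MAX_NAME];
    handle_t *handle;
} registry[MAX_INSTANCES];
static int registry_len = 0;
static int next_id = 1;

/* getInstance(): return the handle registered under name, or NULL. */
handle_t *registry_get(const char *name) {
    assert(name != NULL);
    for (int i = 0; i < registry_len; i++) {
        if (strcmp(registry[i].name, name) == 0) {
            return registry[i].handle;
        }
    }
    return NULL;
}

/* Named constructor: create a handle and register it. Returns NULL if the
 * registry is full, the name is too long, or allocation fails. */
handle_t *registry_create(const char *name) {
    assert(name != NULL);
    if (registry_len >= MAX_INSTANCES || strlen(name) >= MAX_NAME) {
        return NULL;
    }
    handle_t *h = malloc(sizeof *h);
    if (h == NULL) {
        return NULL;
    }
    h->id = next_id++;
    strcpy(registry[registry_len].name, name);
    registry[registry_len].handle = h;
    registry_len++;
    return h;
}

/* Same shape as the PHP snippet: look up first, create on a miss. */
handle_t *get_or_create(const char *name) {
    handle_t *h = registry_get(name);
    return h != NULL ? h : registry_create(name);
}
```

Calling `get_or_create("somename")` twice in the same process returns the same handle, while a different name gets its own. A linear scan is fine for a handful of named producers; an extension would more likely keep the handles in a persistent HashTable.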

from php-rdkafka.

awons commented on August 23, 2024

Hi @arnaud-lb!
Since we need a persistent producer, we decided to implement it ourselves.
I already have something started (still WIP, only tested on PHP 7.1) here: https://github.com/awons/php-rdkafka/tree/issue-42/persistent-producer

I am not a C developer and this is my first time writing a PHP extension, so hopefully this is not too bad ;)
I managed to implement a persistent Kafka instance. It works quite well (tested with our own integration tests that use the extension), but there is still one main issue: callbacks. When the request ends, all callbacks are freed, and librdkafka then segfaults when it tries to call into what is now an arbitrary memory location.

I wanted to ask what you think about the following solution. For persistent producers, we create a hash table with all possible callbacks, each doing nothing, and register those callbacks with librdkafka. Then getInstance() also takes the new callbacks, like getInstance('name', function(){}).
This way librdkafka always sees a pointer to the same global callback, and we only replace the pointers used inside this callback.
After the request, those temporary pointers are nullified, and if we forget to re-register them, librdkafka will still call the global function, which does nothing.

I am still not sure how to handle the situation where different requests need to register different callbacks for the same instance, but I will try to figure something out.

What do you think about this idea?
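To make the trampoline idea concrete, here is a minimal C sketch assuming a single, simplified delivery callback and one thread: librdkafka would only ever be given the address of one process-lifetime function, which forwards to a replaceable per-request pointer and does nothing when that pointer is NULL. `delivery_trampoline()`, `set_request_callback()`, `request_shutdown()` and the `void (*)(int)` signature are hypothetical simplifications, not librdkafka's real callback type, and the sketch does not address the open question of different requests needing different callbacks.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified callback type; librdkafka's real delivery-report callback has a
 * different signature (it receives the handle, the message and the opaque). */
typedef void (*delivery_cb)(int status);

/* Request-scoped pointer: set by each request, cleared when it ends. */
static delivery_cb current_cb = NULL;

/* The only address handed to librdkafka. It lives as long as the process,
 * so a late event can never jump into memory freed at request end. */
void delivery_trampoline(int status) {
    if (current_cb != NULL) {
        current_cb(status);
    }
    /* No callback for the current request: the event is ignored. */
}

/* Would be called from something like getInstance('name', function () {...}). */
void set_request_callback(delivery_cb cb) {
    current_cb = cb;
}

/* Would be called at request shutdown, before request memory is released. */
void request_shutdown(void) {
    current_cb = NULL;
}

/* Example request callback for the usage below: remembers the last status. */
static int last_status = -1;
void record_status(int status) {
    last_status = status;
}
```

After `set_request_callback(record_status)`, a call to `delivery_trampoline(7)` reaches `record_status`; once `request_shutdown()` has run, further events are silently dropped instead of segfaulting.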

nick-zh commented on August 23, 2024

Hey guys

Is there an update, status or ETA? I am very interested in this as well.

nick-zh commented on August 23, 2024

@Steveb-p @MrMoronIV I love the input, don't get me wrong, but I think we are getting sidetracked here. If you want to continue discussing this, I would appreciate it if you moved it to another issue. Thanks, guys 👍

Steveb-p commented on August 23, 2024

> @Steveb-p @MrMoronIV I love the input, don't get me wrong, but I think we are getting sidetracked here. If you want to continue discussing this, I would appreciate it if you moved it to another issue. Thanks, guys

Sure, we will move to the phprdkafka Gitter. Do you mind, @MrMoronIV?

> Why do you say this? Did you have issues yourself or do you just want to be sure I'm not fixated on the first thing mentioned and should consider all available options first? You kinda make it sound like I should avoid this package altogether!

I don't want to advertise it, and I know some people have run into issues with it, so I don't want you to waste too much time if you hit some yourself. Give it a shot and see if it works for you. Timesplinter (who is a contributor here) uses it, I think?

> On a side note: I do like the redis approach but yeah, it means another package installed on my servers yet again.

You can use the filesystem to store messages. php-enqueue actually has a transport that does exactly that. However, it should be fairly simple to just call file_put_contents($filename, json_encode($message)), then use DirectoryIterator to iterate over the directory contents (to avoid the memory issues of scandir) and send each message to Kafka.
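A C sketch of that filesystem approach, assuming POSIX `opendir()`/`readdir()` in place of PHP's DirectoryIterator: the request writes each message to its own file and returns, and a separate worker walks the directory entry by entry, hands each payload to a send function, and deletes the file only after the send succeeded. `spool_write()`, `spool_drain()`, `send_fn` and `count_send()` are hypothetical names; a real worker would call the Kafka producer inside the send function.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <dirent.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define SPOOL_PATH_MAX 512

/* Returns 0 once the payload has been accepted (e.g. delivered to Kafka). */
typedef int (*send_fn)(const char *payload);

/* Request side: write one message to its own file and return immediately.
 * Writing to a dot-prefixed temp name and renaming keeps half-written files
 * invisible to the worker (rename() is atomic within one file system). */
int spool_write(const char *dir, long seq, const char *payload) {
    char tmp[SPOOL_PATH_MAX], path[SPOOL_PATH_MAX];
    snprintf(tmp, sizeof tmp, "%s/.%ld.tmp", dir, seq);
    snprintf(path, sizeof path, "%s/%ld.msg", dir, seq);
    FILE *f = fopen(tmp, "w");
    if (f == NULL) {
        return -1;
    }
    fputs(payload, f);
    if (fclose(f) != 0) {
        return -1;
    }
    return rename(tmp, path);
}

/* Worker side: read directory entries one at a time (no full listing held in
 * memory), send each message, and delete its file only after a successful
 * send so failures are retried on the next run. Messages are assumed to fit
 * in 4 KiB. Returns the number of messages sent, or -1. */
int spool_drain(const char *dir, send_fn send_msg) {
    DIR *d = opendir(dir);
    if (d == NULL) {
        return -1;
    }
    int sent = 0;
    struct dirent *e;
    while ((e = readdir(d)) != NULL) {
        if (e->d_name[0] == '.') {
            continue;   /* skips ".", ".." and in-progress temp files */
        }
        char path[SPOOL_PATH_MAX];
        snprintf(path, sizeof path, "%s/%s", dir, e->d_name);
        FILE *f = fopen(path, "r");
        if (f == NULL) {
            continue;
        }
        char buf[4096];
        size_t n = fread(buf, 1, sizeof buf - 1, f);
        fclose(f);
        buf[n] = '\0';
        if (send_msg(buf) == 0) {
            unlink(path);
            sent++;
        }
    }
    closedir(d);
    return sent;
}

/* Example sender for the usage below; a real worker would produce to Kafka. */
static int delivered = 0;
int count_send(const char *payload) {
    (void)payload;
    delivered++;
    return 0;
}
```

Note that `readdir()` returns entries in no particular order, so if delivery order matters, the worker would have to sort by sequence number before sending.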

// Offtopic off
As for persistent Kafka connections: don't the mysqli and PostgreSQL drivers allow something similar with persistent database connections? So it should at least be possible?

Steveb-p commented on August 23, 2024

@dtmp just keep in mind that REST will be inherently slower than talking to a Kafka instance directly, without an added layer.

bennfocus commented on August 23, 2024

Is there any plan to implement this?
I think it is the most important feature for PHP running in an FPM environment. Otherwise, each request has to trigger a bunch of requests to the Kafka brokers (to connect and fetch metadata), which really makes it slow.

djibomar commented on August 23, 2024

@arnaud-lb Can we have this? How can I help?

arnaud-lb commented on August 23, 2024

@awons: I'll take a look this week!

arnaud-lb commented on August 23, 2024

Awesome!

I like the general idea.

About callbacks: you are right, we could do it that way. Maybe we could use the opaque setting to store this data (see rd_kafka_conf_set_opaque() / rd_kafka_opaque()). The C callbacks would just forward calls to the user callbacks referenced in the opaque struct. We could also add methods to change these callbacks after getInstance().

About the instance table: Zend hash tables can be created persistent (see zend_hash_init()).
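A self-contained C sketch of the opaque-pointer variant, assuming a toy `fake_kafka_t` in place of rd_kafka_t: a per-handle struct of user callbacks is stored as the opaque value, one fixed C function reads it back and forwards the call, and a setter swaps a callback after getInstance(). In real code the store and retrieve steps would be rd_kafka_conf_set_opaque() (set on the conf before rd_kafka_new()) and rd_kafka_opaque(), and librdkafka also passes the opaque directly to its callbacks; every other name here is hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef void (*error_cb)(int code);

/* Per-handle table of user callbacks: this is what would be stored as the opaque. */
typedef struct {
    error_cb on_error;
} callbacks_t;

/* Toy stand-in for rd_kafka_t: it only keeps the opaque pointer. */
typedef struct {
    void *opaque;
} fake_kafka_t;

/* Stand-ins for rd_kafka_conf_set_opaque() and rd_kafka_opaque(). */
void fake_set_opaque(fake_kafka_t *rk, void *opaque) { rk->opaque = opaque; }
void *fake_opaque(const fake_kafka_t *rk) { return rk->opaque; }

/* Fixed C callback registered once; forwards to whatever the table holds. */
void error_forwarder(fake_kafka_t *rk, int code) {
    callbacks_t *cbs = fake_opaque(rk);
    if (cbs != NULL && cbs->on_error != NULL) {
        cbs->on_error(code);
    }
}

/* Would back a hypothetical PHP method for replacing a callback after getInstance(). */
void set_error_callback(fake_kafka_t *rk, error_cb cb) {
    callbacks_t *cbs = fake_opaque(rk);
    if (cbs != NULL) {
        cbs->on_error = cb;
    }
}

/* Example user callbacks for the usage below. */
static int seen_a = 0, seen_b = 0;
void user_cb_a(int code) { seen_a = code; }
void user_cb_b(int code) { seen_b = code; }
```

Compared with a single global slot, keeping the table behind the opaque lets each persistent handle carry its own set of callbacks.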

awons commented on August 23, 2024

I totally agree with the approach to callbacks. There is just one more thing I cannot grasp: we need a way to figure out exactly which callback needs to be called. One instance can only register one callback. Hence, one event can, in theory, be propagated to n different requests simultaneously. We would then need a way to register per-request callbacks somehow (maybe keyed by request ID or thread ID?). I am not sure yet how to approach this.

As for zend_hash_init: I know it can be used for persistent allocation, but the issue I found is that every element in that table needs to be a zval. PHP 5 cannot store arbitrary pointers in zvals. PHP 7 can, but then we either have to keep two different implementations or drop support for PHP < 7.
At least, this is what I figured.
Maybe we could treat pointers as integers and store them like that... It is kind of a hack, but maybe it could work.

arnaud-lb commented on August 23, 2024

> Hence one event can, in theory, be propagated to n different requests simultaneously

Are you sharing the rdkafka instance between multiple threads? I believe that could become very difficult to manage. From the user's point of view, we would receive events that are not related to the current request. From the extension's point of view, a lot of work would be required to make sure that callbacks are called in the right thread.

It would be simpler not to share rdkafka instances between multiple threads. Sharing the instance between subsequent requests on the same thread would be enough, I think (this is also how it would behave in non-threaded environments like php-fpm).

PHP internals have a notion of per-thread globals that you can use to achieve that. This is not used in php-rdkafka currently, but you can add support for per-thread globals by applying something like this: https://gist.github.com/arnaud-lb/ed0a6501f17eee05e3f814b6ae2dccd6. Basically, ZEND_BEGIN_MODULE_GLOBALS() declares a struct that will be thread-specific (each thread has its own instance). You can access fields with the RDKAFKA_G() macro, e.g. RDKAFKA_G(some_value) = 1.
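The sketch below illustrates the intended behaviour in plain C11 rather than with the Zend macros: a globals struct declared `_Thread_local` plays the role of ZEND_BEGIN_MODULE_GLOBALS() in a ZTS build, accessed through a `MY_G()` macro modelled on RDKAFKA_G(). State set by one "request" is still there for the next request on the same thread, while another thread sees its own copy. `MY_G`, `my_globals_t`, `handle_request()` and `worker()` are hypothetical; in a real extension the Zend macros take care of the thread-local storage.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Per-thread "module globals", in the spirit of ZEND_BEGIN_MODULE_GLOBALS(). */
typedef struct {
    int requests_served;      /* survives across requests on the same thread */
    void *persistent_handle;  /* e.g. the persistent rdkafka handle */
} my_globals_t;

/* Each thread gets its own zero-initialised copy. */
static _Thread_local my_globals_t my_globals;

/* Accessor modelled on RDKAFKA_G(); in a non-ZTS build this would simply
 * read a plain global struct. */
#define MY_G(v) (my_globals.v)

static int fake_connection;   /* placeholder target for the handle pointer */

/* One request: create the thread's handle on first use, then reuse it. */
int handle_request(void) {
    if (MY_G(persistent_handle) == NULL) {
        MY_G(persistent_handle) = &fake_connection;   /* "connect" once */
    }
    return ++MY_G(requests_served);
}

/* Thread body: serve *arg sequential requests, store the final count in *arg. */
void *worker(void *arg) {
    int *n = arg;
    int served = 0;
    for (int i = 0, total = *n; i < total; i++) {
        served = handle_request();
    }
    *n = served;
    return NULL;
}
```

Two requests on the main thread leave its counter at 2; a new thread serving three requests starts from zero and ends at 3, and the main thread then continues from where it left off.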

> As for zend_hash_init: I know it can be used for persistent allocation, but the issue I found is that every element in that table needs to be a zval

Actually, you can store pointers, even though it's not super obvious in PHP 5. See add_consuming_toppar() in rdkafka.c.

awons commented on August 23, 2024

Sorry for the late response. I was too busy with another project.

> Are you sharing the rdkafka instance between multiple threads?

That was my initial idea: to have this feature available in ZTS as well. But now I see it is probably way too complicated and could lead to too many issues. It is probably just not worth it.
We could simply throw a RuntimeException if someone tries to use this method in ZTS mode, or not even compile it there; I am not sure which one would be better.

> Actually, you can store pointers, even though it's not super obvious in PHP 5. See add_consuming_toppar() in rdkafka.c.

That is a really nifty workaround :) I will refactor the instance handling to use this trick.

awons commented on August 23, 2024

I was just thinking about another thing. You mentioned module globals. Correct me if I am wrong, but if we are not going to implement this feature for ZTS, then it makes no sense to use module globals, because in NTS mode they always resolve to a simple global variable.

arnaud-lb commented on August 23, 2024

> We could simply throw a RuntimeException if someone tries to use this method in ZTS mode, or not even compile it there; not sure which one would be better.

Actually, there is no problem with using this feature in ZTS mode, as long as we use module globals. Module globals are the way to go when we need variables whose values persist across requests but should not be shared across threads.

awons commented on August 23, 2024

> Actually, there is no problem with using this feature in ZTS mode, as long as we use module globals. Module globals are the way to go when we need variables whose values persist across requests but should not be shared across threads.

I did my homework. I got confused by an explanation of the threaded model I found somewhere on the internet, but you are 100% right. We have either one PHP interpreter per process or one per thread. Each interpreter handles multiple requests sequentially (either within one process or within one thread), so it makes perfect sense to have this feature work in ZTS mode.
Getting back to work then :)

awons commented on August 23, 2024

@arnaud-lb I think I am missing something again. You mentioned the add_consuming_toppar() function as an example of how to use raw pointers with hash tables in PHP 5. The problem I see is that functions like zend_hash_str_add_ptr and zend_hash_str_find_ptr are only available in PHP 7. PHP 5 only has zend_hash_add and zend_hash_find.

Now, I see three options here:

  1. I am missing something obvious and this should work with PHP 5 (I highly doubt it);
  2. This is a bug and should be fixed for the extension to work with PHP 5;
  3. This is not a bug and the extension is just not meant to work with PHP 5 (but judging by all the if/else checks for different PHP versions, I don't really think so).

Which is it then?

arnaud-lb commented on August 23, 2024

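These functions are defined in php_rdkafka_priv.h if the version of PHP doesn't have them already:

/* PHP 5 fallback: zend_hash_add() copies sizeof(pData) bytes starting at
   &pData, so the table stores the pointer value itself. pDest receives the
   address of that stored copy. */
static inline void *zend_hash_str_add_ptr(HashTable *ht, const char *str, size_t len, void *pData)
{
    void *pDest;
    zend_hash_add(ht, str, len, &pData, sizeof(pData), &pDest);
    return pDest;
}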
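The trick relies on PHP 5's zend_hash_add() copying a fixed number of bytes from the address it is given: passing `&pData` with `sizeof(pData)` stores a copy of the pointer value, and reading it back means dereferencing the stored slot once more. A self-contained sketch of the same idea, using a toy table that, like PHP 5's HashTable, only keeps private fixed-size copies. `toy_table_t`, `toy_add()`, `toy_find()`, `toy_add_ptr()` and `toy_find_ptr()` are hypothetical names.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define SLOTS 8
#define KEYLEN 32

/* Toy table that stores a private heap copy of `size` bytes per entry,
 * never the caller's pointer (freeing omitted for brevity). */
typedef struct {
    char key[KEYLEN];
    void *data;
    int used;
} toy_slot_t;

typedef struct {
    toy_slot_t slots[SLOTS];
} toy_table_t;

/* Copy size bytes from src into the table; returns the stored copy or NULL. */
void *toy_add(toy_table_t *t, const char *key, const void *src, size_t size) {
    if (strlen(key) >= KEYLEN) {
        return NULL;
    }
    for (int i = 0; i < SLOTS; i++) {
        if (!t->slots[i].used) {
            t->slots[i].data = malloc(size);
            if (t->slots[i].data == NULL) {
                return NULL;
            }
            memcpy(t->slots[i].data, src, size);
            strcpy(t->slots[i].key, key);
            t->slots[i].used = 1;
            return t->slots[i].data;
        }
    }
    return NULL;
}

/* Return the stored copy for key, or NULL. */
void *toy_find(toy_table_t *t, const char *key) {
    for (int i = 0; i < SLOTS; i++) {
        if (t->slots[i].used && strcmp(t->slots[i].key, key) == 0) {
            return t->slots[i].data;
        }
    }
    return NULL;
}

/* Store a raw pointer by copying the pointer's own bytes (cf. &pData, sizeof(pData)). */
void *toy_add_ptr(toy_table_t *t, const char *key, void *ptr) {
    return toy_add(t, key, &ptr, sizeof ptr);
}

/* Retrieve it: the stored copy is a void *, so dereference once more. */
void *toy_find_ptr(toy_table_t *t, const char *key) {
    void **slot = toy_find(t, key);
    return slot != NULL ? *slot : NULL;
}
```

The value returned by `toy_add_ptr()`, like `pDest` in the shim, is the address of the stored copy, not the original pointer; the lookup side has to undo that extra level of indirection.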

Bibob7 commented on August 23, 2024

+1

awons commented on August 23, 2024

Another country, another job, and too little time for after-hours projects. And I don't use Kafka in my current job anymore.
But the problem here is that the Kafka object we hold is not the same as the one librdkafka uses later: librdkafka makes a copy of it, and I couldn't figure out how to get the correct reference. Additionally, this would cause lots of changes in the extension itself. My C knowledge is unfortunately very limited :(

nick-zh commented on August 23, 2024

Thanks for the update @awons 🙇‍♂️

dawei101 commented on August 23, 2024

@awons I have tested it under php-fpm, and it does not work.

Below is the code:

$conn = $this->config->getConn();
$this->produce = \RdKafka\Producer::getInstance($conn);
$pid = getmypid();
if (!$this->produce) {
    Log::info("<kafka($conn)>php-process($pid):create single instance of producer");
    $this->produce = new \RdKafka\Producer($this->config->getProduceConf());
} else {
    Log::debug("<kafka($conn)>php-process($pid):no need to create producer for exists instance");
}

Below is the log:

[2019-09-20 16:55:59.110786] test.INFO: #[RL][b3167be39f61e38a72a0][3][0.001/0.08] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:01.560371] test.INFO: #[RL][cfe51416e3e3e73953e9][3][0.031/0.10] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:02.310157] test.INFO: #[RL][d11170019b80e9a13ee6][3][0.001/0.11] <kafka(device_status)>php-process(382):create single instance of producer
[2019-09-20 16:56:02.911125] test.INFO: #[RL][098a32f35726af04f142][3][0.001/0.09] <kafka(device_status)>php-process(383):create single instance of producer
[2019-09-20 16:56:03.602770] test.INFO: #[RL][8521dbb62aed83ea2d46][3][0.001/0.14] <kafka(device_status)>php-process(384):create single instance of producer
[2019-09-20 16:56:04.204491] test.INFO: #[RL][eaf5cdbdd848addf1185][3][0.001/0.22] <kafka(device_status)>php-process(397):create single instance of producer
[2019-09-20 16:56:04.611187] test.INFO: #[RL][ba0b84f5d65a283a9181][3][0.001/0.13] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:04.920222] test.INFO: #[RL][cfc30d377a7d1e404711][3][0.001/0.07] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:05.409298] test.INFO: #[RL][5f58979bf1646cf45273][3][0.001/0.11] <kafka(device_status)>php-process(382):create single instance of producer
[2019-09-20 16:56:05.917587] test.INFO: #[RL][0d5603654ec3dde23c0d][3][0.001/0.09] <kafka(device_status)>php-process(383):create single instance of producer
[2019-09-20 16:56:06.511175] test.INFO: #[RL][c8141141725e66436a66][3][0.001/0.19] <kafka(device_status)>php-process(384):create single instance of producer
[2019-09-20 16:56:06.920342] test.INFO: #[RL][2a8f4ef4a096490bf70b][3][0.001/0.15] <kafka(device_status)>php-process(397):create single instance of producer
[2019-09-20 16:56:07.306934] test.INFO: #[RL][875b552fe5b2ec7e4491][3][0.001/0.13] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:07.605764] test.INFO: #[RL][6149c1deb2d07e4dbc2a][3][0.001/0.08] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:10.114968] test.INFO: #[RL][62347f12aa59ee944183][3][0.001/0.11] <kafka(device_status)>php-process(382):create single instance of producer
[2019-09-20 16:56:10.804955] test.INFO: #[RL][9cda48bb918466d49bf8][3][0.001/0.22] <kafka(device_status)>php-process(383):create single instance of producer
[2019-09-20 16:56:11.321044] test.INFO: #[RL][ac6473b208cc20518860][3][0.001/0.11] <kafka(device_status)>php-process(384):create single instance of producer
[2019-09-20 16:56:11.811838] test.INFO: #[RL][3b6acd3425719aeddad4][3][0.001/0.10] <kafka(device_status)>php-process(397):create single instance of producer
[2019-09-20 16:56:12.305731] test.INFO: #[RL][073a3af1b0596a5158ce][3][0.001/0.09] <kafka(device_status)>php-process(380):create single instance of producer
[2019-09-20 16:56:13.587820] test.INFO: #[RL][e1883b9d64def0a0bc95][3][0.001/0.20] <kafka(device_status)>php-process(381):create single instance of producer
[2019-09-20 16:56:14.628406] test.INFO: #[RL][a84e795007746f9137d8][3][0.001/0.07] <kafka(device_status)>php-process(382):create single instance of producer

nick-zh commented on August 23, 2024

@dawei101 As @awons stated before, he is not working on this anymore, and there is no current progress on this. I would say this is on hold for now, and I am not sure it will move forward in the next few months unless somebody picks up where @awons left off.

dawei101 commented on August 23, 2024

> @dawei101 As @awons stated before, he is not working on this anymore, and there is no current progress on this. I would say this is on hold for now, and I am not sure it will move forward in the next few months unless somebody picks up where @awons left off.

OK, thanks.

nick-zh commented on August 23, 2024

I can have a look at this next, but I will first take care of introducing the Admin API. As a rough estimate, maybe I can find time in November, just FYI.

MrMoronIV commented on August 23, 2024

Excuse me if I'm wrong, but what about a PHP background script running as a daemon that accepts new messages (maybe via a socket or an API, or by checking a MySQL table for queued messages) and forwards them to Kafka?

This script would then just open the connection to Kafka once and produce messages when needed.

It's probably not a good idea to shoot messages to Kafka during a page load, right?

Steveb-p commented on August 23, 2024

@MrMoronIV you're partially right: currently it's not 100% safe to send messages to Kafka from a PHP process that handles web requests (PHP-FPM, Apache mod_php or another PHP SAPI), but the reason is not that it's slow.

Roughly, this is what php-rdkafka does when you produce a message:

  1. The message is handed to a background thread (separate from the one handling the web request, similar in concept to your "listening PHP process"). If necessary, this thread is started first.
  2. PHP then returns to normal execution until you poll (to check the result of producing a message; this basically means the thread reports which messages were delivered successfully, and any registered callbacks are called). Note that the message is actually being sent during this time.
  3. When the PHP process reaches shutdown, the thread keeps retrying to send messages until it hits the (configurable) timeouts. Because of how threads work, this thread keeps the process alive if there are any issues with sending.

Point 3 is what will cause issues if you're not careful: a dead Kafka broker or connectivity problems will prevent the thread from sending messages and will cause PHP processes to live longer than expected. Since your web server is usually configured to allow only a fixed number of PHP processes to handle incoming requests, these lingering processes eventually starve the web server of workers.

php-rdkafka 4.0 will no longer contain the code handling point 3: the thread will terminate as soon as the PHP process reaches shutdown. The user (programmer) will be responsible for handling delivery checks on their own.
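To make point 3 and the 4.0 change concrete, here is a pthread sketch of a producer whose background thread drains a queue, plus a shutdown `flush_with_timeout()` that waits at most a bounded time and then reports how many messages are still undelivered, instead of keeping the process alive. It mirrors the idea behind librdkafka's rd_kafka_flush(rk, timeout_ms), which php-rdkafka 4.x exposes as Producer::flush(); the counter-based queue, the simulated broker flag and all function names are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Shared producer state; a counter stands in for the outgoing message queue. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t drained = PTHREAD_COND_INITIALIZER;
static int pending = 0;
static bool broker_up = true;   /* simulated broker reachability */
static bool stopping = false;

/* Step 1: producing only enqueues and returns to the request immediately. */
void produce(void) {
    pthread_mutex_lock(&lock);
    pending++;
    pthread_mutex_unlock(&lock);
}

/* Simulate a dead or recovered broker. */
void set_broker_up(bool up) {
    pthread_mutex_lock(&lock);
    broker_up = up;
    pthread_mutex_unlock(&lock);
}

/* Background thread: "delivers" one message per millisecond while the
 * broker is reachable, and wakes flushers when the queue empties. */
void *sender(void *arg) {
    (void)arg;
    struct timespec tick = {0, 1000000L};   /* 1 ms */
    for (;;) {
        pthread_mutex_lock(&lock);
        if (stopping) {
            pthread_mutex_unlock(&lock);
            return NULL;
        }
        if (broker_up && pending > 0) {
            pending--;
            if (pending == 0) {
                pthread_cond_broadcast(&drained);
            }
        }
        pthread_mutex_unlock(&lock);
        nanosleep(&tick, NULL);
    }
}

/* Shutdown step made explicit: wait at most timeout_ms for the queue to
 * drain, then return the number of undelivered messages. */
int flush_with_timeout(int timeout_ms) {
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += timeout_ms / 1000;
    deadline.tv_nsec += (long)(timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }
    pthread_mutex_lock(&lock);
    while (pending > 0) {
        /* The loop absorbs spurious wakeups; non-zero means timed out (or failed). */
        if (pthread_cond_timedwait(&drained, &lock, &deadline) != 0) {
            break;
        }
    }
    int left = pending;
    pthread_mutex_unlock(&lock);
    return left;
}

/* Ask the background thread to exit. */
void stop_sender(void) {
    pthread_mutex_lock(&lock);
    stopping = true;
    pthread_mutex_unlock(&lock);
}
```

With a reachable broker, the flush returns 0 well before its timeout; with the broker "down", it gives up after the timeout and returns the number of stuck messages, leaving the application to decide what to do with them (log, spool to disk, retry later).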

In case of my apps I'm doing exactly what you described - I'm keeping a background process that all messages are delegated into.

MrMoronIV commented on August 23, 2024

> In case of my apps I'm doing exactly what you described - I'm keeping a background process that all messages are delegated into.

Are you willing to share this code? I'm so confused as to why there is no popular standalone daemon that takes messages via a socket and delivers them in the order they were received (and takes care of retries in case of an outage).

Steveb-p commented on August 23, 2024

> Are you willing to share this code? I'm so confused as to why there is no popular standalone daemon that takes messages via a socket and delivers them in the order they were received (and takes care of retries in case of an outage).

@MrMoronIV It's nothing too complicated really.

What I meant by delegating is that I save them to "temporary" storage (in my case Redis, but it can be anything). A background process simply reads from that storage and pushes the messages into Kafka.

If you think about it, this alleviates a number of issues. Using a permanently running process that listens on a socket would be fine, but in case of any failure it would mean potentially losing messages (or storing them somewhere, but then... what's the point? :P). Introducing temporary storage prevents that and does not introduce any complicated concepts.

I'm using php-enqueue with two transports: Redis and Kafka. However, I'm reluctant to recommend it, because you can run into issues with it. You can give it a go and see if it works for you. I should also mention that I help maintain the Kafka package there, so I may be biased. I'm only mentioning this because you asked what I'm using :)

MrMoronIV commented on August 23, 2024

> you can run into issues with it. You can give it a go and see if it works for you.

Why do you say this? Did you have issues yourself or do you just want to be sure I'm not fixated on the first thing mentioned and should consider all available options first? You kinda make it sound like I should avoid this package altogether!

On a side note: I do like the redis approach but yeah, it means another package installed on my servers yet again.

nick-zh commented on August 23, 2024

@Steveb-p it is indeed possible, but from what I gathered, it is still not that easy. I don't have a lot of time right now, and since we use Swoole in our project, I don't have an urgent need for this feature anymore, but I will still try to give it a go after I have resolved issue 215.

nikolaposa commented on August 23, 2024

Sorry for reviving this issue, but I have encountered similar problems with Confluent Cloud (a cloud-native service for Kafka) and am hitting its 'Open Connection Attempts' limit.

I have two questions:

  1. What do people use nowadays as an alternative to overcome this problem, since persistent connections will obviously never be implemented? I read that some suggest an intermediate step of storing messages in Redis or the filesystem. I also saw the Swoole suggestion, but I don't know the details.
  2. Is there an option/configuration to cache metadata information, either at the php-rdkafka or the librdkafka level, so that this request doesn't need to be issued on each connect?

Thanks!

Steveb-p commented on August 23, 2024

> 1. What do people use nowadays as an alternative to overcome this problem, since persistent connections will obviously never be implemented? I read that some suggest an intermediate step of storing messages in Redis or the filesystem. I also saw the Swoole suggestion, but I don't know the details.

It's a simple concept, really.

Instead of pushing messages to Kafka in the original process, you store them somewhere else (a database, Redis, the filesystem) for a separate background process to pick up.

This has the added benefit of relieving the original process of the need to wait for any confirmation from Kafka: a confirmation from the storage is usually a good enough guarantee that the message will eventually be sent (bearing in mind that it's not actually Kafka responding).

After all, it makes sense: you're not supposed to consume messages in short-lived processes anyway, and much the same applies to producers.

Swoole is different in that it's an event-loop-based approach. Any event-loop-based solution can reuse the Kafka connection, since you're operating within a single process that handles many connections rather than just one, as usually happens with PHP-FPM (simplified, but generally true).

nikolaposa commented on August 23, 2024

Thanks for the answer, @Steveb-p, very useful. 👍 Yeah, I'm familiar with the first concept; I also use Enqueue on top of this extension, so it should be easy to switch the transport. What concerns me about that idea is the increased message latency due to the intermediate step.

dtmp commented on August 23, 2024

Look at https://github.com/confluentinc/kafka-rest
