bunny's Introduction

BunnyPHP

Performant pure-PHP AMQP (RabbitMQ) sync/async (ReactPHP) library

Requirements

BunnyPHP requires PHP 7.1 or newer.

Installation

Add as Composer dependency:

$ composer require bunny/bunny:@dev

Comparison

You might ask whether a library or extension for connecting to an AMQP broker (e.g. RabbitMQ) already exists. Yes, there are several, including php-amqplib and the PECL AMQP extension.

Why should you want to choose BunnyPHP instead?

  • You want a nice, idiomatic PHP API to work with (I'm looking at you, php-amqplib). BunnyPHP's interface follows PHP's common coding standards and naming conventions. See tutorial.

  • You can't (or don't want to) install a PECL extension whose latest stable version dates from 2014. BunnyPHP itself isn't marked as stable yet, but it is already being used in production.

  • You have both classic CLI/FPM and ReactPHP applications that need to connect to RabbitMQ. BunnyPHP comes with both synchronous and asynchronous clients sharing the same PHP-idiomatic interface. The async client uses react/promise.

Apart from that, BunnyPHP is more performant than the main competing library, php-amqplib. See the benchmark/ directory and php-amqplib's benchmark/.

Benchmarks were run as:

$ php benchmark/producer.php N & php benchmark/consumer.php
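
| Library      | N (# messages) | Produce sec | Produce msg/sec | Consume sec | Consume msg/sec |
|--------------|----------------|-------------|-----------------|-------------|-----------------|
| php-amqplib  | 100            | 0.0131      | 7633            | 0.0446      | 2242            |
| bunnyphp     | 100            | 0.0128      | 7812            | 0.0488      | 2049            |
| bunnyphp +/- |                |             | +2.3%           |             | -8.6%           |
| php-amqplib  | 1000           | 0.1218      | 8210            | 0.4801      | 2082            |
| bunnyphp     | 1000           | 0.1042      | 9596            | 0.2919      | 3425            |
| bunnyphp +/- |                |             | +17%            |             | +64%            |
| php-amqplib  | 10000          | 1.1075      | 9029            | 5.1824      | 1929            |
| bunnyphp     | 10000          | 0.9078      | 11015           | 2.9058      | 3441            |
| bunnyphp +/- |                |             | +22%            |             | +78%            |
| php-amqplib  | 100000         | 20.7005     | 4830            | 69.0360     | 1448            |
| bunnyphp     | 100000         | 9.7891      | 10215           | 35.7305     | 2789            |
| bunnyphp +/- |                |             | +111%           |             | +92%            |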
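
The +/- rows express BunnyPHP's msg/sec relative to php-amqplib for the same N. As a rough sketch of that arithmetic (the helper name relativeChange is ours, not part of the benchmark scripts, and the published figures may be rounded slightly differently), here is the calculation for the N = 10000 rows:

```php
<?php
// Hypothetical helper: percentage change of $candidate relative to $baseline.
function relativeChange(float $baseline, float $candidate): float
{
    return ($candidate - $baseline) / $baseline * 100.0;
}

// msg/sec figures copied from the N = 10000 rows of the benchmark results.
$produce = relativeChange(9029, 11015);
$consume = relativeChange(1929, 3441);

printf("produce: %+.1f%%\n", $produce); // prints "produce: +22.0%"
printf("consume: %+.1f%%\n", $consume); // prints "consume: +78.4%"
```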

Tutorial

Connecting

When instantiated, the BunnyPHP Client accepts an array of connection options:

use Bunny\Client;

$connection = [
    'host'      => 'HOSTNAME',
    'vhost'     => 'VHOST',    // The default vhost is /
    'user'      => 'USERNAME', // The default user is guest
    'password'  => 'PASSWORD', // The default password is guest
];

$bunny = new Client($connection);
$bunny->connect();

Connecting with SSL/TLS

Specify the options for SSL connections in the ssl array:

$connection = [
    'host'      => 'HOSTNAME',
    'vhost'     => 'VHOST',    // The default vhost is /
    'user'      => 'USERNAME', // The default user is guest
    'password'  => 'PASSWORD', // The default password is guest
    'ssl'       => [
        'cafile'      => 'ca.pem',
        'local_cert'  => 'client.cert',
        'local_pk'    => 'client.key',
    ],
];

$bunny = new Client($connection);
$bunny->connect();

For a description of the options, see SSL context options.

Note: invalid SSL configuration will cause connection failure.

See also common configuration variants.

Providing client properties

Client connections can advertise their capabilities to the server by supplying an optional client_properties table when the connection is established.

For example, a connection name may be provided by setting the connection_name property:

$connection = [
    'host'              => 'HOSTNAME',
    'vhost'             => 'VHOST',    // The default vhost is /
    'user'              => 'USERNAME', // The default user is guest
    'password'          => 'PASSWORD', // The default password is guest
    'client_properties' => [
        'connection_name' => 'My connection',
    ],
];

$bunny = new Client($connection);
$bunny->connect();

Publish a message

Now that we have a connection with the server we need to create a channel and declare a queue to communicate over before we can publish a message, or subscribe to a queue for that matter.

$channel = $bunny->channel();
$channel->queueDeclare('queue_name'); // Queue name

Publishing a message on a virtual host with quorum queues as a default

Starting with RabbitMQ 4, queues will be declared as quorum queues by default. Quorum queues are durable by default, so to connect to them you should declare the queue as durable, as shown below. RabbitMQ 3.11.15 already supports this if the virtual host is configured with quorum as its default queue type.

$channel = $bunny->channel();
$channel->queueDeclare('queue_name', false, true); // Queue name, passive = false, durable = true

With a communication channel set up, we can now publish a message to the queue:

$channel->publish(
    $message,    // The message you're publishing as a string
    [],          // Any headers you want to add to the message
    '',          // Exchange name
    'queue_name' // Routing key, in this example the queue's name
);
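
Because publish() takes the message body as a string, structured payloads have to be serialized first. A minimal sketch, assuming JSON is the desired encoding: the helper name publishJson and the content-type header are our own choices, not part of BunnyPHP. The $channel parameter is left untyped so the sketch runs without the library; in real code it would be the channel returned by $bunny->channel().

```php
<?php
// Hypothetical helper (not part of BunnyPHP): JSON-encode a payload and
// publish it to a queue through the default exchange.
function publishJson($channel, string $queueName, array $payload)
{
    // JSON_THROW_ON_ERROR needs PHP 7.3+; it turns encoding failures into exceptions.
    $body = json_encode($payload, JSON_THROW_ON_ERROR);

    return $channel->publish(
        $body,                                  // Message body as a string
        ['content-type' => 'application/json'], // Header chosen for illustration
        '',                                     // Exchange name (default exchange)
        $queueName                              // Routing key = queue name
    );
}
```

Usage would look like publishJson($channel, 'queue_name', ['id' => 1]); after the queue has been declared.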

Subscribing to a queue

Subscribing to a queue can be done in two ways. The first way will run indefinitely:

$channel->run(
    function (Message $message, Channel $channel, Client $bunny) {
        $success = handleMessage($message); // Handle your message here

        if ($success) {
            $channel->ack($message); // Acknowledge message
            return;
        }

        $channel->nack($message); // Mark message fail, message will be redelivered
    },
    'queue_name'
);

The other way lets you run the client for a specific amount of time consuming the queue before it stops:

$channel->consume(
    function (Message $message, Channel $channel, Client $client){
        $channel->ack($message); // Acknowledge message
    },
    'queue_name'
);
$bunny->run(12); // Client runs for 12 seconds and then stops

Pop a single message from a queue

$message = $channel->get('queue_name');

if ($message !== null) { // get() returns null when the queue is empty
    // Handle message

    $channel->ack($message); // Acknowledge message
}

Prefetch count

You can control how many messages BunnyPHP prefetches when consuming a queue with the channel's qos method. In the example below, at most 5 messages will be prefetched. Combined with message acknowledgement, this gives you effective flow control, especially in asynchronous applications: no new message is fetched until a previous one has been acknowledged.

$channel->qos(
    0, // Prefetch size
    5  // Prefetch count
);
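
To show how the prefetch limit and acknowledgements fit together, here is a small sketch that sets the limit and then consumes with an explicit ack or nack per message. The helper name consumeWithPrefetch and the boolean-returning $handler are assumptions made for illustration; only qos(), consume(), ack() and nack() come from the examples in this tutorial. The channel is untyped so the sketch runs without the library.

```php
<?php
// Hypothetical helper: cap unacknowledged deliveries, then start consuming.
function consumeWithPrefetch($channel, string $queueName, int $prefetchCount, callable $handler): void
{
    // Prefetch size 0 as in the example above; at most $prefetchCount
    // messages may be outstanding (delivered but not yet acknowledged).
    $channel->qos(0, $prefetchCount);

    $channel->consume(
        function ($message, $channel) use ($handler) {
            if ($handler($message)) {
                $channel->ack($message);  // Frees one slot in the prefetch window
                return;
            }
            $channel->nack($message);     // Mark as failed; message will be redelivered
        },
        $queueName
    );
}
```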

Asynchronous usage

Note: Up to version v0.5.x, Bunny had two different clients, one sync and one async. As of v0.6, both clients have been folded into one: an async client with a sync API.

AMQP interop

An amqp interop compatible wrapper is available for the Bunny library.

Testing

Create client/server SSL certificates by running:

$ cd test/ssl && make all && cd -

You need access to a RabbitMQ instance in order to run the test suite. The easiest way is to use the provided Docker Compose setup to create an isolated environment, including a RabbitMQ container, to run the test suite in.

Docker Compose

  • Use Docker Compose to create a network with a RabbitMQ container and a PHP container to run the tests in. The project directory will be mounted into the PHP container.

    $ docker-compose up -d
    

    To test against different SSL configurations (as in CI builds), you can set environment variable CONFIG_NAME=rabbitmq.ssl.verify_none before running docker-compose up.

  • Optionally use docker ps to display the running containers.

    $ docker ps --filter name=bunny
    [...] bunny_rabbit_node_1_1
    [...] bunny_bunny_1
    
  • Enter the PHP container.

    $ docker exec -it bunny_bunny_1 bash
    
  • Within the container, run:

    $ vendor/bin/phpunit
    

Contributing

  • A large part of the PHP code (almost everything in the Bunny\Protocol namespace) is generated from the spec in spec/amqp-rabbitmq-0.9.1.json. Look for DO NOT EDIT! in doc comments.

    To change generated files change spec/generate.php and run:

    $ php ./spec/generate.php

Broker compatibility

Works well with RabbitMQ.

Does not work with ActiveMQ, because it requires AMQP 1.0, which is a completely different protocol (Bunny implements AMQP 0.9.1).

License

BunnyPHP is licensed under MIT license. See LICENSE file.

bunny's People

Contributors

allenjb, besir, ceeram, donatello-za, edudobay, enumag, fritz-gerneth, jakubkulhan, jeroenvdgulik, jeromegamez, jsor, kuai6, kubasimon, kukulich, makasim, mbonneau, mente, mermshaus, mtijn, ondrejmirtes, readmecritic, realflowcontrol, roman-huliak, rtm-ctrlz, samnela, simpod, standa, tatikoma, vincentlanglet, wyrihaximus

bunny's Issues

Calling `Channel::txSelect` twice throws an error

If you call Channel::txSelect more than once it throws an error, because the channel is not in regular mode.

The problem is, there is no way to check what mode the channel is in (the property is protected), so I can't actually check if I need to call txSelect again.

It seems like there should be a Channel::getMode method so you can do:

if ($channel->getMode() !== ChannelModeEnum::TRANSACTIONAL) {
   $channel->txSelect();
}

Alternatively txSelect could be idempotent but I don't think that will work because the method returns a MethodTxSelectOkFrame.
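
Until something like getMode() exists, one possible workaround is to track the mode on the caller's side. A minimal sketch, assuming every txSelect() call goes through this wrapper: the class name TxGuard is hypothetical and not part of Bunny, and the channel is left untyped so the sketch runs without the library.

```php
<?php
// Hypothetical wrapper (not part of Bunny): remembers whether txSelect()
// has already been issued on the wrapped channel.
final class TxGuard
{
    private $channel;
    private $transactional = false;

    public function __construct($channel)
    {
        $this->channel = $channel; // In real code: a Bunny channel
    }

    // Sends txSelect() on the first call only; returns true if it was sent.
    public function ensureTransactional(): bool
    {
        if ($this->transactional) {
            return false;
        }

        $this->channel->txSelect();
        $this->transactional = true;

        return true;
    }
}
```

This only helps if nothing else switches the channel's mode behind the wrapper's back.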

Message priority

Hello,
Could you please tell me how to use the message priority feature added in RabbitMQ 3.5?

Images from rabbitmq.com

I'm not sure you are aware of this but the images you've taken from rabbitmq.com are not open source (at least yet) and the footer of the site says

Copyright © 2015 Pivotal Software, Inc. All rights reserved

So unless and until the RabbitMQ team gets permission from legal to open source the site, it may be a good idea to replace them with your own images. We certainly don't want to cause anybody any trouble, but you haven't even asked if it's OK to use them for your project.

Hangs on fail-connect

When the async client tries to connect to a RabbitMQ server that is down (or doesn't exist), it just hangs.
No errors, no exceptions... no reaction.

The ReactPHP loop still works, but Bunny hangs.

Is this a problem only for me, or a general bug?

As a temporary workaround I added a timer to the loop:

$loop->addTimer(10, function () {
    if (!$this->isReady()) {
        throw new Exception('Client is not ready yet');
    }
});

Question

What is the difference between MethodQueueDeclareOkFrame and MethodQueueDeclareFrame?

[question] broken pipe or closed connection with large messages

Heey Jakub,

I really like your bunny client but when trying to send large json files I always get an exception: "broken pipe or closed connection"

This always happens at: https://github.com/jakubkulhan/bunny/blob/master/src/Bunny/AbstractClient.php
At the write method.
(sorry I cant see linenumbers on my iPhone).

Do you know if this is a known issue or do you have an idea what might trigger this exception?

Already thanks in advance!

Kind regards Leon

isConnected reports true while it is not

If this is not a bug, then maybe you can show me the right way to handle it. I have a long-lived consumer that parses incoming messages (text documents), and then a publisher sends a reply to another queue. Now, if heartbeat is set to 10s, the producer connection is killed after that time, but the isConnected() method always reports true.

public function reconnect(): void
    {
        var_dump($this->client->isConnected()); // always true even if connection was closed
        $this->client->disconnect();
        var_dump($this->client->isConnected()); // this is also true :)
        if (!$this->client->isConnected()) { // never executed
            $this->client->connect();
        }
    }

After that time I get Broken pipe or closed connection exception.

As a workaround, I set heartbeat option to 5 minutes, then changed $channel->run to $bunny->run(5 * 60) and the supervisord daemon starts the process again.

Unhandled method frame Bunny\Protocol\MethodConnectionCloseFrame

I sometimes get this exception:

'Bunny\Exception\ClientException' with message 'Unhandled method frame Bunny\Protocol\MethodConnectionCloseFrame.' in ~/.composer/vendor/bunny/bunny/src/Bunny/AbstractClient.php:410

What does it mean and how can I avoid it?

How consumer can publish message to another queue?

When a consumer publishes a message to another queue, I get this exception:

Exception 'Bunny\Exception\ChannelException' with message 'Unhandled method frame Bunny\Protocol\MethodChannelCloseFrame.' in /var/www/payments/vendor/bunny/bunny/src/Bunny/Channel.php:613

Throw uncaught Exception

I'm reopening this issue since you closed issue #36.

I know it is only a test; I just want to create an error case.

I want to know why, in this error case, I cannot throw an uncaught exception.
I have an uncaught exception handler that logs any error in my system and does other jobs.

My question is: why can't I throw an uncaught exception?

[question] How to handle reconnecting on RabbitMQ server restart

I have a consumer and I'm wondering how best to recover when RabbitMQ goes away.

Currently I get a Bunny\Exception\ClientException thrown which kills my React event loop:

Fatal error: Uncaught Bunny\Exception\ClientException: Broken pipe or closed connection. in .../vendor/bunny/bunny/src/Bunny/AbstractClient.php on line 280

Bunny\Exception\ClientException: Broken pipe or closed connection. in .../vendor/bunny/bunny/src/Bunny/AbstractClient.php on line 280

Call Stack:
    0.0001     354160   1. ...
    0.0015     587600   2. ...
    0.0090    2108960   3. React\EventLoop\StreamSelectLoop->run() ...
   41.4237    2972512   4. React\EventLoop\StreamSelectLoop->waitForStreamActivity() .../vendor/react/event-loop/src/StreamSelectLoop.php:201
   41.4698    2972992   5. call_user_func:{.../vendor/react/event-loop/src/StreamSelectLoop.php:232}() .../vendor/react/event-loop/src/StreamSelectLoop.php:232
   41.4698    2972992   6. Bunny\Async\Client->onDataAvailable() .../vendor/react/event-loop/src/StreamSelectLoop.php:232
   41.4698    2972992   7. Bunny\AbstractClient->read() .../vendor/bunny/bunny/src/Bunny/Async/Client.php:292

I can handle the ClientException at the top-level of my application with:

while (1) {
    try {
        $loop->run();
    } catch (ClientException $ex) {
        // reconnect logic goes here
    }
}

That seems a little heavy-handed to me. Is this the right thing to do or is there a better way?
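
One way to make the loop above less heavy-handed is to bound the number of retries and wait between attempts. This is a generic sketch, not a Bunny feature: the function name runWithReconnect and its parameters are hypothetical, $run stands for whatever (re)connects and runs the event loop, and in real code you would probably catch ClientException rather than every Exception.

```php
<?php
// Hypothetical helper: call $run, retrying with exponential backoff on failure.
function runWithReconnect(callable $run, int $maxAttempts = 5, int $baseDelayMs = 100, ?callable $sleep = null)
{
    // The sleep function is injectable so the delay can be observed or skipped.
    $sleep = $sleep ?? function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $run($attempt);
        } catch (\Exception $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Give up and let the caller decide what to do
            }

            // Exponential backoff: 100 ms, 200 ms, 400 ms, ... with the defaults
            $sleep($baseDelayMs * (2 ** ($attempt - 1)));
        }
    }
}
```

The callable passed as $run would contain the connect-and-run logic that currently sits inside the try block.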

Throw uncaught Exception

I asked this question in reactphp/promise#63.
The last reply says:

The code is probably wrapped into another promise which catches the exception. You must ensure that you call done() on any promise which you don't return for consumption, eg. from a function. See done() vs. then().

I want to know where the wrapping promise is.

// $request is my request object
$this->_channel->queueDeclare($request->getQueueName(), false, $request::PERSISTENT)->then(function () use ($request) {
                $this->publish($request);
            }, function (ClientException $e) {
                throw new \Exception();

            })->done();

Why does react/promise/src/FulfilledPromise.php:26 catch my exception?

`$channel->queueDeclare($queueName);` never resolves

This may be a bug or simply misuse by me (or misunderstanding).

I've taken the example code in consumer_async.php and built the following script:

$clientConfig  = [
    \React\EventLoop\Factory::create(),
    ['host' => 'localhost', 'user' => 'guest', 'password' => 'guest'],
];

(new BunnyClient(...$clientConfig))
    ->connect() // works
    ->then(function (BunnyClient $client) {
        return $client->channel(); // works
    })
    ->then(function (Channel $channel) {
        return \React\Promise\all([
            $channel,
            $channel->queueDeclare('queuename') // hangs
        ]);
    })
    ->then(function (array $channels) use ($app) {
        // never reached
        $channels[0]->consume(function (Message $msg) use ($app) {
            echo 'Message received!' . PHP_EOL;
        }, 'queuename');
    });

The call $channel->queueDeclare('queuename') hangs, and the returned promise is never fulfilled nor rejected.

A few notes on (in my opinion) expected behavior:

  • removing the queueDeclare code makes the code work, but only if the channel was already declared by another process
  • if the channel doesn't exist, the app silently hangs without receiving any messages at all

Mandatory not indicated

I'm setting the mandatory flag on a message that is not getting routed, but I'm not receiving any indication of whether the message was delivered to a queue. Here is an example where the message certainly fails to be delivered (the result was the same with the async client):

<?php
    $c = (new \Bunny\Client)->connect();
    $ch = $c->channel();
    $result = $ch->publish('An un-routed message', [], 'amq.topic', 'nopeKey', true);

    var_dump($result);

bool(true);

[Screenshot]

Connect using SSL protocol

Hello. Is it possible to connect to RabbitMQ using SSL connection? I'm getting error message "RabbitMQ: Broken pipe or closed connection.".

Mixture of properties and headers

Note: I have omitted some output fields for the sake of readability.

It seems that when Bunny consumes messages with custom headers that were not published by Bunny the properties and headers fields are flattened.

Given the screenshot:
[Screenshot]
Bunny's output is:

Bunny\Message Object
(
    [routingKey] => props
    [headers] => Array
        (
            [user_id] => 96ce7157f70de72e08b0a8c78300bf1e
            [delivery-mode] => 1
            [user-id] => admin
        )

    [content] => Wat?
)

But if Bunny publishes the same message like so:

$ch->publish('Hello World!', [
    'user-id' => 'admin',
    'headers' => [
      'user_id' => '96ce7157f70de72e08b0a8c78300bf1e'
    ]
  ], '', 'props');

The output is as expected:

Bunny\Message Object
(
    [routingKey] => props
    [headers] => Array
        (
            [headers] => Array
                (
                    [user_id] => 96ce7157f70de72e08b0a8c78300bf1e
                )

            [user-id] => admin
        )
    [content] => Hello World!
)

Here is the output from a Node AMQP lib which looks the same when consuming messages published by Bunny, node.amqp, and RabbitMQ web admin:

 { fields: 
   { routingKey: 'props',
     messageCount: 0 },
  properties: 
   { headers: { user_id: '96ce7157f70de72e08b0a8c78300bf1e' },
     userId: 'admin' },
  content: <Buffer 57 61 74 3f> }

And the output from RabbitMQ web admin
[Screenshot: RabbitMQ web admin]

What's wrong with the worker-async example?

I used this example https://github.com/jakubkulhan/bunny/blob/master/examples/worker-async.php to test async worker. I have these results:

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 1
WU 0
WU 1
WU 2
 [x] Done 1
ACK :: 1
 [x] Received 2
WU 0
WU 1
WU 2
 [x] Done 2
ACK :: 2
 [x] Received 3
WU 0
WU 1
WU 2
 [x] Done 3
ACK :: 3 

Why does it process the messages one after another? Am I wrong to expect something like this:

 [x] Received 1
WU 0
[x] Received 2
WU 1
WU 0
WU 2
WU 1
 [x] Done 1
WU 2
 [x] Done 2
ACK :: 1
ACK :: 2
...

?

Please rename your project

I hate to be that guy, but there has been a RabbitMQ client named Bunny since March 2009. Please rename your project, as it will mess up search results for both the users of the existing client and your own future users.

Connection failure

Hi,
we are running RabbitMQ on a shared domain but on a unique port (vyv.domain.cz:15672).
Bunny couldn't reach RabbitMQ there, but if we put RabbitMQ on its own domain (rabbit.vyv.domain.cz) there is no problem with the connection.
Is it possible that Bunny has a problem connecting on a shared domain?
Thanks

get() should return a Message instance or NULL

Right now get() returns a MethodBasicGetOkFrame instance (or MethodBasicGetEmptyFrame instance after #14). From the developers perspective that is way too lowlevel. When I call get(), I expect to get a Message instance (or NULL when there is no message in the queue). Just like when I register a consumer, I expect the callback to get a Message instance and not a Frame instance.

Moreover the Frame returned by get() does not contain the payload which is probably caused by this.

Being very new to Bunny I have no idea why the payload is not being set. And even if it was, I still don't know how to process it to get the message contents. I can provide a PR but I need some advice :)

.git folder

I don't want to ignore the vendor folder in my project.

When I install Bunny, it comes with a .git folder, so no Bunny files show up in my repository when I want to commit.
Is it possible to remove the .git folder when installing with Composer?

Automatic reconnect

  • Client option to allow reconnect (+ reconnect timeout)
  • open all previously opened channels
  • start all previously started consumers
    • what about anonymous queues?
  • return callbacks
  • confirm callbacks
  • MAYBE: declare all previously declared queues
    • only anonymous queues?
  • MAYBE: declare all previously declared exchanges

Bunny isn't handling unexpected disconnections properly

When bunny is disconnected from RabbitMQ an exception is being thrown when attempting to flush the write buffer.

Bunny\Exception\ClientException: Broken pipe or closed connection.

The issue is that that exception is being thrown via the __destruct method, which always creates a fatal error in PHP.

Attempting to throw an exception from a destructor (called in the time of script 
termination) causes a fatal error.

http://php.net/manual/en/language.oop5.decon.php#language.oop5.decon.destructor

Reference #26
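
Given the PHP behaviour quoted above, one defensive pattern on the application side is to disconnect from your own wrapper and keep any exception from leaving its destructor. This is only a sketch of that pattern, not a fix for Bunny's own destructor: the class name SafeConnection is hypothetical, and the client is left untyped so the sketch runs without the library.

```php
<?php
// Hypothetical wrapper: disconnects on destruction without letting an
// exception escape from __destruct().
final class SafeConnection
{
    private $client;

    public function __construct($client)
    {
        $this->client = $client; // In real code: a connected Bunny client
    }

    public function __destruct()
    {
        try {
            $this->client->disconnect();
        } catch (\Throwable $e) {
            // The connection is most likely already gone; log instead of rethrowing.
            error_log('Disconnect failed: ' . $e->getMessage());
        }
    }
}
```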

Stop consuming and return result

Hello.
I have two queues: one created dynamically, data-1, and a second, persistent one, dataQueue.

dataQueue is created and managed by gameeapp/nette-rabbitmq.
When I start consuming messages from dataQueue, it creates a new queue via $queueService->queueDeclare(...) (code below) and publishes messages to the data-1 queue.
After that it consumes messages from data-1 with $queueService->consumeDataSourceQueue(...). At this point everything works.

The problem is that I need to stop consuming data-1 when the queue is empty (at the last message I know it's empty), delete it (the delete can be handled another way), and return some result to dataQueue.

The method consumeDataSourceQueue is called in the dataQueue consumer.
With the method call $queueService->consumeDataSourceQueue(...) below, I get this error (after the var_dump('finished'); line):

In Channel.php line 623:
                                                                  
  Unhandled method frame Bunny\Protocol\MethodChannelCloseFrame.  

Thank you!

<?php


namespace ReportModule\Service\QueueService;

use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
use ReportModule\RabbitMq\Queue\DataQueueProcessQueue;
use ReportModule\Service\DataSourceService\DataSourceService;

/**
 * Class QueueService
 * @package ReportModule\Service\QueueService
 */
class QueueService
{

    /**
     * @var array
     */
    protected $connection;

    /**
     * @var Client
     */
    protected $bunny;

    /**
     * @var Channel
     */
    protected $channel;

    /**
     * @var DataSourceService
     */
    private $dataSourceService;

    /**
     * QueueService constructor.
     * @param array             $connection
     * @param DataSourceService $dataSourceService
     * @throws \Exception
     */
    public function __construct(array $connection, DataSourceService $dataSourceService)
    {
        $this->dataSourceService = $dataSourceService;

        $this->connection = $connection;
        $this->bunny = (new Client(
            [
                "host" => $this->connection['host'],
                "port" => $this->connection['port'],
                "vhost" => empty($this->connection['vhost']) ? "/" : $this->connection['vhost'],
                "user" => $this->connection['user'],
                "password" => $this->connection['password'],
            ]
        ))->connect();
        $this->channel = $this->bunny->channel();
        $this->channel->qos(0,1);
    }

    /**
     *
     */
    public function __destruct()
    {
        $this->bunny->disconnect();
    }

    /**
     * @param string $queue_name
     * @return bool|\Bunny\Protocol\MethodQueueDeclareOkFrame|\React\Promise\PromiseInterface
     */
    public function queueDeclare(string $queue_name)
    {
        $this->channel->exchangeDeclare('app.data', 'direct', false, true, false);
        $queue = $this->channel->queueDeclare($queue_name, false, true, false, false);
        $this->channel->queueBind($queue_name, 'app.data', $queue_name);

        return $queue;
    }

    /**
     * @param string $queue_name
     * @param array  $message
     * @throws \Exception
     */
    public function publish(string $queue_name, array $message): void
    {
        $this->channel->publish(
            json_encode($message),
            [
                'delivery-mode' => 2
            ],
            'app.data',
            $queue_name,
            $queue_name
        );
    }

    /**
     * @return Channel|\React\Promise\PromiseInterface
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * @param string $queue_name
     */
    public function consumeDataSourceQueue(string $queue_name)
    {
        $queue = $this->queueDeclare($queue_name);
        $this->channel->run(
            function (Message $message, Channel $channel, Client $client) use ($queue_name) {

                $message_data = json_decode($message->content, true);
                $id_log_action_detail = $message_data[DataQueueProcessQueue::LOG_ACTION_DETAIL];
                $id_data_source = $message_data[DataQueueProcessQueue::DATA_SOURCE];
                $id_report_entity = $message_data[DataQueueProcessQueue::REPORT_ENTITY];
                $id_report_part = $message_data[DataQueueProcessQueue::REPORT_PART];
                $id_log_action = $message_data[DataQueueProcessQueue::LOG_ACTION];

                $result = $this->dataSourceService->saveDataFromDataSource(
                    $id_report_entity, $id_report_part, $id_data_source, $id_log_action, $id_log_action_detail
                );

                if ($result['detail_success'] === true) {
                    $channel->ack($message);
                } else {
                    $channel->nack($message, false, false);
                }

                if ($result['batch_finished'] === true) {
                    var_dump('finished');
                    /* TODO close connection and accept message (delete it from queue) */
                }
            },
            $queue->queue,
            '',
            false,
            true
        );
    }
}

Invalid Exception when payload empty

Putting aside why the message payload is empty, this occurred, and though I'm not sure whether it's even Bunny's mistake, here goes:
If a message with an empty payload arrives, it's treated as a MethodBasicDeliverFrame, e.g.

object(Bunny\Protocol\MethodBasicDeliverFrame)#555 (11) {
  ["consumerTag"]=>
  string(31) "amq.ctag-0wiQ0mvNiMhKkINcA079XQ"
  ["deliveryTag"]=>
  int(2)
  ["redelivered"]=>
  bool(false)
  ["exchange"]=>
  string(6) "action"
  ["routingKey"]=>
  string(12) "action.#####"
  ["classId"]=>
  int(60)
  ["methodId"]=>
  int(60)
  ["type"]=>
  int(1)
  ["channel"]=>
  int(1)
  ["payloadSize"]=>
  int(65)
  ["payload"]=>
  NULL
}

Since after receiving Header frame, channel is in awaiting body state, the frame is treated as unexpected and exception is thrown at https://github.com/jakubkulhan/bunny/blob/master/src/Bunny/Channel.php#L573

Any ideas why this might occur and how this should be resolved?

Env:

  • Debian 8.3 Jessie
  • PHP 5.6.17
  • RabbitMQ 3.3.5

Tutorial plagiarism

Your README has the tutorials from rabbitmq.com with slight edits. The footer on the site says

Copyright © 2015 Pivotal Software, Inc. All rights reserved

Would you mind writing your own tutorials? You haven't even asked the team if it's OK to use their work for your project.

Builder Objects

I was wondering if you would be open to PRs that add some object-oriented sugar to make Bunny easier to read.

Here's an example:

$channel = $bunny->channel();

$queue = (new QueueBuilder)
            ->setName('my_queue')
            ->setDurable(true)
            ->setExchange( 
                (new ExchangeBuilder())
                    ->setName('dlx')
                    ->setType('direct')
                    ->setAutoDelete(true)
                    ->build($channel)
            )
            ->build($channel);

$queue->publish('I am a teapot'); //Just send it to my_queue using the dlx Exchange

Channel does not handle server-initiated closes.

Channel::onFrameReceived() has no path that handles an inbound MethodChannelCloseFrame.

This means that if the server closes a channel, we get the generic exception message Message: Unhandled method frame Bunny\Protocol\MethodChannelCloseFrame., thrown from https://github.com/jakubkulhan/bunny/blob/master/src/Bunny/Channel.php#L623.

It would be great if the exception thrown here included the AMQP error information from the close frame, such as the error code constant and the error message. I'm not sure if it's appropriate to add this to ChannelException, or whether a new exception type should be added.

Aside from providing more information to the developer, I'd like to add that there are programmatic use cases for having access to the error code. For example, being able to check if a resource-locked code is returned when attempting to start an exclusive consumer.

I'm not very familiar with Bunny yet, so please forgive me if I've missed something here. I can put together a PR if you could provide some direction as to what you think should be changed :)

/cc @act28

What is Async\Client?

Hi, I'd like to ask what is Async\Client in README.md (new Async\Client($connection)->connect()->then(function (Client $client) {

Thanks!

TCP_NODELAY landed in php 7.1

Being curious, I've written a benchmark comparing php-amqplib vs. the amqp extension vs. Bunny.

The winner was the amqp extension, with the socket connection a bit behind, then Bunny, then the stream connection. Apparently the socket connection in php-amqplib did so well because it uses TCP_NODELAY. Digging further into the stream connection, it looks like the same option has landed for stream connections in PHP 7.1.

It would be nice if you could add TCP_NODELAY to greatly improve the performance of the first message send/consume.

Async best practice question

Hi!

We have an async socket server written in ReactPHP. In some cases we would like to forward the processed packages to an AMQP server. Can you tell me what is the best practice to publish a message in this case? Currently we build up an async connection when the server starts, and in the onConnection event handler we connect to a channel, publish a message and then we close the channel. I don't think this is the proper way...

  $loop = React\EventLoop\Factory::create();
  $socket = new React\Socket\Server('127.0.0.1:9999', $loop);
  
  $amqp_options = [
    'host' => 'localhost',
    'vhost' => 'x',
    'user' => 'x',
    'password' => 'xx',
  ];
  
  $amqp_async_client = new Bunny\Async\Client($loop, $amqp_options);
  $amqp_async_client->connect()->then(function (Bunny\Async\Client $client) use (&$socket) {
    
    $socket->on('connection', function (React\Socket\ConnectionInterface $connection) use ($client) {
      $connection->write("Hello " . $connection->getRemoteAddress() . "!\n");
      $connection->on('data', function ($data) use ($connection, $client) {
        $connection->write("You said: " . strtoupper($data) . "\n");
        $client->channel()->then(function (Bunny\Channel $channel) {
          // exchangeDeclare() resolves with a frame, so pass the channel along explicitly
          return $channel->exchangeDeclare('test', 'direct')->then(function () use ($channel) {
            return $channel;
          });
        }
        )->then(function (Bunny\Channel $channel) use ($data) {
          return $channel->publish($data, [], 'test')->then(function () use ($channel) {
            return $channel->close();
          });
        }
        );
      });
      $connection->on('end', function () use ($connection) {
        $connection->close();
      });
      $connection->on('error', function (Exception $e) {
        echo 'conn error: ' . $e->getMessage() . PHP_EOL;
      });
    })->on('error', function (Exception $e) {
      echo 'server error: ' . $e->getMessage() . PHP_EOL;
    })->on('end', function () {
      echo 'server quit';
    });
    
    
  });
  
  $loop->run();
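One possible direction, sketched under assumptions rather than as the library's recommended approach: AMQP channels are intended to be long-lived, so the channel can be opened once and reused for every publish instead of being opened and closed per message. The `SharedChannel` helper below is hypothetical and not part of Bunny. It uses a plain object as a stand-in so that it runs without any dependencies.

```php
<?php
declare(strict_types=1);

// Hypothetical helper, not part of Bunny: opens a resource once and returns the same one afterwards.
final class SharedChannel
{
    private $factory;
    private $channel = null;

    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    public function get()
    {
        // Call the factory only on first use; later calls reuse the stored value.
        if ($this->channel === null) {
            $this->channel = ($this->factory)();
        }
        return $this->channel;
    }
}

// Stand-in factory that counts how often a "channel" is opened.
$opened = 0;
$shared = new SharedChannel(function () use (&$opened) {
    $opened++;
    return new \stdClass(); // with Bunny this would be the promise returned by $client->channel()
});

$first = $shared->get();
$second = $shared->get();
```

In the server above, the factory would return `$client->channel()`, the exchange would be declared once at startup, and the 'data' handler would call `$shared->get()->then(...)` and publish without closing the channel. A promise can have `then()` called on it any number of times, so storing the promise itself is enough.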

Missing changelog / upgrade guide

@jakubkulhan I can't find any changelog or upgrade guide for this repository. There is the git history, of course, but I don't have time to study every commit to see if there were any BC breaks. There are no release notes to look at either.

I'd like to use newer bunny in enqueue/amqp-bunny but when asked if bunny 0.3 or 0.4 have any BC breaks I have no way to answer that question.

Bug in onFrameReceived() function in Channel.php, $client property unset

When disconnecting a client, e.g. with $client->disconnect() or $channel->close(), the client property is unset in the onFrameReceived() function in Channel.php, see here.

This causes the notice below when subsequent calls are made to $channel->getClient() after a client has been disconnected:

PHP Notice:  Undefined property: Bunny\Channel::$client in /home/vagrant/projects/gateways/vendor/bunny/bunny/src/Bunny/Channel.php on line 118

I think that instead of unsetting the property, i.e.

unset($this->client);

it should simply be set to null:

$this->client = null;
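The difference can be reproduced in isolation. The sketch below uses a simplified stand-in class, `ChannelLike`, and a made-up helper, `collectDiagnostics()`; neither is Bunny code. It only models how PHP treats a declared property after `unset()` versus after assigning `null`.

```php
<?php
declare(strict_types=1);

// Simplified stand-in for Bunny\Channel; only the property handling is modelled.
class ChannelLike
{
    protected $client;

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function closeWithUnset(): void
    {
        unset($this->client); // removes the property from the object entirely
    }

    public function closeWithNull(): void
    {
        $this->client = null; // keeps the property, clears the reference
    }

    public function getClient()
    {
        return $this->client;
    }
}

// Run a callable and collect any notices or warnings PHP raises while it executes.
function collectDiagnostics(callable $fn): array
{
    $messages = [];
    set_error_handler(function (int $errno, string $errstr) use (&$messages) {
        $messages[] = $errstr;
        return true;
    });
    try {
        $fn();
    } finally {
        restore_error_handler();
    }
    return $messages;
}
```

Calling `getClient()` after `closeWithUnset()` raises an "Undefined property" diagnostic, while after `closeWithNull()` it quietly returns null.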

Failing connections cause exceptions thrown outside of Promise

I'm getting this error when connecting to RabbitMQ:

PHP Fatal error: Uncaught Bunny\Exception\ClientException: Broken pipe or closed connection. in /.../vendor/bunny/bunny/src/Bunny/AbstractClient.php:289

This happens because when I connect, I'm connecting with specific credentials and RabbitMQ is denying the connection. When this happens the stream is closed so we get an end of file:

if (@feof($this->stream)) {
    throw new ClientException("Broken pipe or closed connection.");
}

I have multiple connections opened by different users, so I need to handle this gracefully. This is the extension I've written to handle the problem.

I'm using https://packagist.org/packages/evenement/evenement as well.

Hope this helps.

<?php
declare(strict_types=1);

namespace MyClient;

use Bunny\Async\Client;
use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait;
use React\Promise;

class BunnyAsyncClient extends Client implements EventEmitterInterface
{
    use EventEmitterTrait;

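    /**
     * Catches anything thrown while reading from the stream and re-emits it
     * as an 'error' event on the next tick instead of letting it escape.
     */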
    public function onDataAvailable()
    {
        try {
            parent::onDataAvailable();
        } catch (\Throwable $e) {
            $this->eventLoop->removeReadStream($this->getStream());
            $this->eventLoop->futureTick(function () use ($e) {
                $this->emit('error', [$e, $this]);
            });
        }
    }

    /**
     * @return Promise\PromiseInterface
     */
    public function connect()
    {
        $deferred = new Promise\Deferred();

        $errBack = function (\Throwable $e) use ($deferred, &$errBack) {
            $this->removeListener('error', $errBack);
            $deferred->reject($e);
        };

        $this->on('error', $errBack);

        parent::connect()->then(
            function () use ($deferred) {
                return $deferred->resolve($this);
            },
            function (\Throwable $e) use ($deferred) {
                // needed when the rejection does not come through $errBack
                $deferred->reject($e);
            }
        )->always(function () use ($errBack) {
            $this->removeListener('error', $errBack);
        });

        return $deferred->promise();
    }
}
