php-amqplib / php-amqplib
The most widely used PHP client for RabbitMQ
Home Page: http://www.rabbitmq.com/getstarted.html
License: GNU Lesser General Public License v2.1
[Just installed old rabbitmq-server version]
I've tried to connect to an ipv6 rabbitmq host and I get:
PhpAmqpLib\Exception\AMQPRuntimeException: Error Connecting to server(0): php_network_getaddresses: getaddrinfo failed: Name or service not known (uncaught exception)
Hi Alvaro,
do you plan to implement Publisher confirms (acknowledgements)?
This feature is a big deal for enterprise deployments, but I haven't found any information about it in your library...
http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
Thanks,
This option seems to be useless now, because a blocking read is actually performed. With this option enabled, wait() returns after anything is received, not only when one of the allowed methods is received. But if there is no data on the socket, wait() never returns, even with this option enabled. IMO this is not the expected behaviour.
This option was introduced here: https://github.com/videlalvaro/php-amqplib/pull/11/files, but it is not even used in the new non_blocking example. It seems to be a leftover from an unsuccessful experiment.
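For illustration, a wait that gives up after a timeout can be built on PHP's stream_select(). The sketch below is not the library's code: has_pending_data() is a hypothetical helper, and a local socket pair stands in for the broker connection.

```php
<?php
// Hypothetical helper (not part of php-amqplib): wait up to $timeoutSec for
// data on $stream and report whether a read would return without blocking.
function has_pending_data($stream, $timeoutSec)
{
    $read = array($stream);
    $write = null;
    $except = null;
    $seconds = (int) floor($timeoutSec);
    $micro = (int) (($timeoutSec - $seconds) * 1000000);

    $ready = stream_select($read, $write, $except, $seconds, $micro);
    if ($ready === false) {
        throw new RuntimeException('stream_select() failed');
    }
    return $ready > 0;
}

// Demonstration on a local socket pair instead of a real broker connection.
$domain = (DIRECTORY_SEPARATOR === '\\') ? STREAM_PF_INET : STREAM_PF_UNIX;
$pair = stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$idle = has_pending_data($pair[0], 0.1); // nothing has been written yet
fwrite($pair[1], "x");
$busy = has_pending_data($pair[0], 0.1); // one byte is now waiting
```

With a check like this in front of the read, a caller could return control to its own loop when nothing arrives within the timeout instead of blocking forever.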
Hi, when I try to run
videlalvaro/php-amqplib/demo/amqp_consumer.php
I get this error after the process completes:
PHP Fatal error: Uncaught exception 'PhpAmqpLib\Exception\AMQPRuntimeException' with message 'Error reading data. Received 0 instead of expected 1 bytes' in /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:50
Stack trace:
#0 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(106): PhpAmqpLib\Wire\IO\StreamIO->read(1)
#1 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(145): PhpAmqpLib\Wire\AMQPReader->rawread(1)
#2 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(236): PhpAmqpLib\Wire\AMQPReader->read_octet()
#3 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(264): PhpAmqpLib\Connection\AbstractConnection->wait_frame(0)
#4 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(111): PhpAmqpLib\Connection\AbstractConnection->wait_channel(1, 0)
#5 /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(207): PhpAmqpLib\Channel\AbstractChannel->next_fram in /application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php on line 50
What's the problem? Help me please.
oren@oren-laptop:~/vendor/videlalvaro/php-amqplib/demo$ php amqp_consumer.php
< [hex]:
0000 41 4D 51 50 01 01 09 01 AMQP....
waiting for 10,10
waiting for a new frame
10,10: Connection.start
Start from server, version: 8.0, properties: capabilities=(), copyright=Copyright (C) 2007-2012 VMware, Inc., information=Licensed under the MPL. See http://www.rabbitmq.com/, platform=Erlang/OTP, product=RabbitMQ, version=2.8.7, mechanisms: PLAIN, AMQPLAIN, locales: en_US
< [hex]:
0000 01 00 00 00 00 00 A2 00 0A 00 0B 00 00 00 38 07 ........ ......8.
0010 6C 69 62 72 61 72 79 53 00 00 00 13 50 48 50 20 libraryS ....PHP
0020 53 69 6D 70 6C 65 20 41 4D 51 50 20 6C 69 62 0F Simple A MQP lib.
0030 6C 69 62 72 61 72 79 5F 76 65 72 73 69 6F 6E 53 library_ versionS
0040 00 00 00 03 30 2E 31 08 41 4D 51 50 4C 41 49 4E ....0.1. AMQPLAIN
0050 00 00 00 4F 05 4C 4F 47 49 4E 53 00 00 00 16 61 ...O.LOG INS....a
0060 70 70 31 30 35 35 30 30 35 31 5F 68 65 72 6F 6B pp105500 51_herok
0070 75 2E 63 6F 6D 08 50 41 53 53 57 4F 52 44 53 00 u.com.PA SSWORDS.
0080 00 00 20 74 66 38 47 41 4E 51 46 76 36 4D 6B 39 .. tf8GA NQFv6Mk9
0090 63 33 32 72 4D 5A 56 6A 6E 75 35 35 5F 48 55 36 c32rMZVj nu55_HU6
00A0 33 42 56 05 65 6E 5F 55 53 CE 3BV.en_U S
< 10,11: Connection.start_ok
waiting for 10,20, 10,30
waiting for a new frame
10,30: Connection.tune
< [hex]:
0000 01 00 00 00 00 00 0C 00 0A 00 1F FF FF 00 02 00 ........ ........
0010 00 00 00 CE ...
< 10,31: Connection.tune_ok
< [hex]:
0000 01 00 00 00 00 00 1E 00 0A 00 28 17 2F 61 70 70 ........ ..(./app
0010 31 30 35 35 30 30 35 31 5F 68 65 72 6F 6B 75 2E 10550051 _heroku.
0020 63 6F 6D 00 00 CE com..
< 10,40: Connection.open
waiting for 10,41, 10,50
waiting for a new frame
PHP Fatal error: Uncaught exception 'Exception' with message 'Error reading data. Received 0 instead of expected 1 bytes' in /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:61
Stack trace:
#0 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(94): PhpAmqpLib\Wire\AMQPReader->rawread(1)
#1 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(268): PhpAmqpLib\Wire\AMQPReader->read_octet()
#2 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(288): PhpAmqpLib\Connection\AMQPConnection->wait_frame()
#3 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(143): PhpAmqpLib\Connection\AMQPConnection->wait_channel(0)
#4 /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(240): PhpAmqpLib\Channel\AbstractChannel->next_frame()
#5 /home/oren/workspac in /vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php on line 61
When I call $channel->wait() it calls consumer callback in such call chain:
return AbstractChannel->dispatch
return AMQPChannel->basic_deliver
'basic_deliver' just calls 'call_user_func', but it doesn't return the result of that call.
It would be nice to add a 'return' statement. Then it would be possible to write:
$callback_result = $channel->wait();
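Until the return value is passed through, one way to get a result out of a callback is to capture a variable by reference in a closure. A minimal sketch, assuming a dispatcher that discards the return value the way basic_deliver is described above; dispatch_discarding_result() is a stand-in written for this example, not library code.

```php
<?php
// Stand-in for a dispatcher that, like basic_deliver as described above,
// calls the callback but throws away whatever it returns.
function dispatch_discarding_result($callback, $payload)
{
    call_user_func($callback, $payload);
}

// The closure writes its result into $result, captured by reference.
$result = null;
$callback = function ($payload) use (&$result) {
    $result = strtoupper($payload);
};

dispatch_discarding_result($callback, 'hello');
// $result now holds the value the callback produced.
```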
Hello,
Whenever a queue_declare fails, the connection is lost.
For example, in the code below, the part inside the catch always fails because the connection is closed:
<?php
require __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Exception\AMQPException;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
try {
    // Declare a queue with the passive bit.
    // It must return a 404 code if the queue does not exist, or a 406 if it exists with different settings.
    $channel->queue_declare('pretty.queue', true, true);
} catch (AMQPException $e) {
    if ($e->getCode() == 406) {
        // Precondition failed: delete the queue and recreate it with the correct settings.
        $channel->queue_delete('pretty.queue');
        $channel->queue_declare('pretty.queue', false, true);
    } elseif ($e->getCode() == 404) {
        // The queue does not exist: create it.
        $channel->queue_declare('pretty.queue', false, true);
    }
    $channel->queue_declare('pretty.queue', true, true);
}
This results in:
PHP Fatal error: Call to a member function send_channel_method_frame() on a non-object in /tmp/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php on line 148
PHP Stack trace:
PHP 1. {main}() /tmp/test.php:0
PHP 2. PhpAmqpLib\Channel\AMQPChannel->queue_declare() /tmp/test.php:18
PHP 3. PhpAmqpLib\Channel\AbstractChannel->send_method_frame() /tmp/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:373
Fatal error: Call to a member function send_channel_method_frame() on a non-object in /tmp/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php on line 148
Call Stack:
0.0002 246392 1. {main}() /tmp/test.php:0
0.0125 1122240 2. PhpAmqpLib\Channel\AMQPChannel->queue_declare() /tmp/test.php:18
0.0126 1125536 3. PhpAmqpLib\Channel\AbstractChannel->send_method_frame() /tmp/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:373
The only fix I found is to create a new channel before using it, like this:
<?php
require __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Exception\AMQPException;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
try {
    // Declare a queue with the passive bit.
    // It must return a 404 code if the queue does not exist, or a 406 if it exists with different settings.
    $channel->queue_declare('pretty.queue', true, true);
} catch (AMQPException $e) {
    if ($e->getCode() == 406) {
        // Precondition failed: open a new channel, delete the queue and recreate it with the correct settings.
        $channel = $connection->channel();
        $channel->queue_delete('pretty.queue');
        $channel->queue_declare('pretty.queue', false, true);
    } elseif ($e->getCode() == 404) {
        // The queue does not exist: open a new channel and create it.
        $channel = $connection->channel();
        $channel->queue_declare('pretty.queue', false, true);
    }
    $channel->queue_declare('pretty.queue', true, true);
}
The vendor/autoload.php file is missing on GitHub, but it's used in the demo and tests.
Could someone add it or tell me where I can get it?
Hey, I was trying to use nack, and I've got an error:
PHP Notice: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in /.../vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php on line 164
The sample of my php code:
public function process(AMQPMessage $message)
{
    // ...
    if ($condition) {
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } else {
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, false);
    }
}
Am I missing something?
When I use reject, everything works.
I apologize if this is not really an amqplib-specific question, but I guess this problem is quite common among users of this library, so I thought it might be an appropriate place to ask.
Some of our AMQP consumers create persistent database connections, in our case to a MySQL database. However, when the consumer is idle for a long time, the database closes the connection, and the next message received by the consumer fails.
Are there any best practices for tackling this problem when using PHP consumers?
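One possible pattern is to check the connection right before each message is processed and reopen it when the check fails. The sketch below is only an illustration under assumptions: ReconnectingHandle is a hypothetical class, the two callables would wrap whatever database API is in use, and a plain object stands in for the database connection.

```php
<?php
// Hypothetical wrapper: $factory opens a connection, $isAlive checks one.
final class ReconnectingHandle
{
    private $factory;
    private $isAlive;
    private $handle = null;

    public function __construct(callable $factory, callable $isAlive)
    {
        $this->factory = $factory;
        $this->isAlive = $isAlive;
    }

    // Return a usable handle, reopening it if the old one has gone away.
    public function get()
    {
        if ($this->handle === null || !call_user_func($this->isAlive, $this->handle)) {
            $this->handle = call_user_func($this->factory);
        }
        return $this->handle;
    }
}

// Demonstration with a fake connection object that "times out" once.
$opened = 0;
$db = new ReconnectingHandle(
    function () use (&$opened) {
        $opened++;
        $conn = new stdClass();
        $conn->alive = true;
        return $conn;
    },
    function ($conn) {
        return $conn->alive;
    }
);

$first = $db->get();
$first->alive = false; // simulate the server closing the idle connection
$second = $db->get();  // a fresh handle is opened transparently
```

In a consumer callback, calling get() at the start of each message would mean an idle timeout costs one reconnect instead of a failed message.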
I don't think there's a mailing list for the library, so I'm misusing the issue tracker for a question...
As far as I understand, RabbitMQ will redeliver a message to another available worker when a worker dies without sending back an acknowledgment that the message has been processed.
Are there any strategies to avoid deadlocks which could freeze a whole queue, for example if a worker transcodes a video file and the underlying ffmpeg process fails for a specific file?
RabbitMQ would then send the message to the next worker, which would also fail, and so on.
Any best practices for this problem?
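One common approach is to cap the number of attempts per message and set the message aside once the cap is reached. The sketch below shows only the decision logic; the 'x-retries' header name, the limit of 3 and the next_action() helper are assumptions for illustration, not php-amqplib features. The consumer would still have to republish the message with an incremented counter, or route it to a separate queue, itself.

```php
<?php
// Hypothetical policy function: decide what to do with a message after a
// processing attempt. The 'x-retries' header and the limit are assumptions.
function next_action(array $headers, $processed, $maxRetries = 3)
{
    if ($processed) {
        return 'ack';
    }
    $retries = isset($headers['x-retries']) ? (int) $headers['x-retries'] : 0;

    // Below the cap: try again later. At the cap: stop redelivering.
    return $retries < $maxRetries ? 'retry' : 'dead-letter';
}

$ok      = next_action(array(), true);
$again   = next_action(array('x-retries' => 1), false);
$give_up = next_action(array('x-retries' => 3), false);
```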
Can you please change $debug in AbstractChannel to be public, or provide a setter so that debug can be enabled and disabled by callers instead of defining a constant?
Is there a way to check the list of available consumers from the broker? something like this, but available from within php:
rabbitmqctl list_consumers
My php-amqplib version is v2.1.0.
I was running a command that sends a message to a queue, but I got this error:
PHP Notice: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in /var/www/www.test.com/shared/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php on line 61
We have found a problem which causes the client library not to work under some circumstances.
If the PHP INI settings below are set to the indicated values, the client library refuses to finish the handshake.
mbstring.internal_encoding => UTF-8
mbstring.func_overload => 2
This is most likely because the library relies heavily on string functions to implement the binary protocol.
We got around the problem by calling ini_set each time we access the library, something like this:
$original_internal_encoding = ini_get( 'mbstring.internal_encoding' );
ini_set( 'mbstring.internal_encoding', null );
// Call the php-amqp
ini_set( 'mbstring.internal_encoding', $original_internal_encoding );
...but this is quite inconvenient. Do you think this problem could be addressed inside the library?
Thank you!
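With mbstring.func_overload = 2, strlen() and substr() count characters in the internal encoding instead of bytes, which breaks length calculations for binary frames. One way a library can stay independent of that setting is to ask mbstring for the '8bit' encoding explicitly. The helpers below (byte_length() and byte_substr()) are hypothetical names used only for this sketch, not the library's API. Note that func_overload was deprecated in PHP 7.2 and removed in PHP 8.0.

```php
<?php
// Hypothetical byte-safe helpers: always measure and slice in bytes,
// regardless of mbstring.func_overload or the internal encoding.
function byte_length($data)
{
    return function_exists('mb_strlen') ? mb_strlen($data, '8bit') : strlen($data);
}

function byte_substr($data, $start, $length)
{
    return function_exists('mb_substr')
        ? mb_substr($data, $start, $length, '8bit')
        : substr($data, $start, $length);
}

// Three header bytes followed by a 6-byte UTF-8 payload ("h", 2-byte "é", "llo").
$frame = "\x01\x00\x01" . "h\xC3\xA9llo";

$len = byte_length($frame);            // counts bytes, not characters
$payload = byte_substr($frame, 3, 6);  // slices by byte offsets
```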
Is there a way to modify message properties like application_headers when rejecting a message using basic_reject? I'm trying to add a stack trace to the message before it's put into a dead letter queue for easier debugging.
Right now I've tried something along these lines, but there are no additional properties when retrieving the message in the RabbitMQ backend:
<?php
$message->set('application_headers', array('exception' => 'some exception info'));
$message->delivery_info['channel']->basic_reject($message->delivery_info['delivery_tag'], false);
I have 2 queues, q1 and q2. q1 already has some messages in it, q2 is empty. I register a consumer for each of them via basic_consume(), in the same script, on the same channel. In the RabbitMQ management console, both messages in q1 move to unacked state. I then call wait() on the channel. Nothing happens, the registered callbacks don't get called unless a new message is inserted into either q1 or q2. Also, if there already is a message waiting in q2 when the script starts, the callbacks get called for all the messages from q1 and q2.
It seems that the messages get delivered to the consumers as soon as a consumer is registered, but if something other than wait() is called next, the consumer callbacks don't get called for the previously delivered messages.
This issue can be worked around by calling flow(false) before and flow(true) after registering the consumers to prevent delivery of messages.
I don't know which is the intended behavior here, I couldn't find anything about this case on the web. If calling flow() is the proper way, it should be mentioned somewhere (example scripts, documentation, readme, ...). If it is not, then this is a bug in php-amqplib, so please fix it :)
The library is currently unable to simultaneously process messages for different channels on the same connection. E.g. say you want a worker that listens on a job message queue (consumer1) as well as a management queue (to receive messages that trigger it to reconfigure itself, shutdown, etc.) (consumer2) and you want to have each consumer use their own channel.
Such a setup is currently impossible due to the nature of AbstractConnection::wait_channel().
Ironically, it seems this can be easily mitigated by modifying that method to always call wait() on any messages for other channels:
protected function wait_channel($channel_id, $timeout = 0)
{
    while (true) {
        list($frame_type, $frame_channel, $payload) = $this->wait_frame($timeout);
        if ($frame_channel == $channel_id) {
            return array($frame_type, $payload);
        }
        // we introduce a check if the channel still exists since we now have
        // potential for other code to interfere
        if (isset($this->channels[$frame_channel])) {
            // Not the channel we were looking for. Queue this frame
            // for later, when the other channel is looking for frames.
            array_push($this->channels[$frame_channel]->frame_queue,
                array($frame_type, $payload));
            // Always invoke a 'sub-wait()' causing the according channel to process
            // the just queued frame and call us again
            $this->channels[$frame_channel]->wait();
        }
    }
}
This potentially results in a somewhat ugly call chain, but should be safe due to the frame- and method queueing mechanisms in AbstractChannel.
Of course in a larger refactoring effort this could be done a lot cleaner, but this solution should work without modifying AbstractChannel and hence doesn't require people to change their code.
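To make the underlying behaviour concrete, here is a toy model of the queueing step alone, without the proposed sub-wait(). It is a simplified sketch, not the library's code: frames are reduced to array(channel_id, payload) pairs, and wait_channel_demo() is a hypothetical stand-in for wait_channel().

```php
<?php
// Toy model: read frames until one for $channelId arrives; frames for other
// known channels are parked in that channel's queue and not processed.
function wait_channel_demo(array &$incoming, array &$queues, $channelId)
{
    while ($incoming) {
        list($frameChannel, $payload) = array_shift($incoming);
        if ($frameChannel === $channelId) {
            return $payload;
        }
        if (isset($queues[$frameChannel])) {
            $queues[$frameChannel][] = $payload; // parked for the other channel
        }
    }
    return null;
}

// Channel 1 carries jobs, channel 2 carries management messages.
$incoming = array(
    array(2, 'reconfigure'),
    array(1, 'job-a'),
    array(2, 'shutdown'),
);
$queues = array(1 => array(), 2 => array());

$got = wait_channel_demo($incoming, $queues, 1);
```

After the call, the job has been returned to channel 1, while 'reconfigure' sits unprocessed in channel 2's queue until something waits on that channel. That gap is what the extra wait() call in the proposal is meant to close.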
I am using php-amqplib with OldSoundRabbitMqBundle, and when starting a consumer via the CLI (./app/console rabbitmq:consumer -m 50 primary)
I get the error "Expecting AMQP method, received frame type: 3".
I'm not sure why it happens, but it looks like it happens when the queue is not empty. Is it a bundle-specific problem or a php-amqplib one?
Here is the debug output:
< [hex]:
0000 41 4D 51 50 01 01 09 01 AMQP....
waiting for 10,10
waiting for a new frame
> 10,10: Connection.start
Start from server, version: 8.0, properties: capabilities=(), copyright=Copyright (C) 2007-2012 VMware, Inc., information=Licensed under the MPL. See http://www.rabbitmq.com/, platform=Erlang/OTP, product=RabbitMQ, version=2.8.4, mechanisms: PLAIN, AMQPLAIN, locales: en_US
< [hex]:
0000 01 00 00 00 00 00 76 00 0A 00 0B 00 00 00 38 07 ......v. ......8.
0010 6C 69 62 72 61 72 79 53 00 00 00 13 50 48 50 20 libraryS ....PHP
0020 53 69 6D 70 6C 65 20 41 4D 51 50 20 6C 69 62 0F Simple A MQP lib.
0030 6C 69 62 72 61 72 79 5F 76 65 72 73 69 6F 6E 53 library_ versionS
0040 00 00 00 03 30 2E 31 08 41 4D 51 50 4C 41 49 4E ....0.1. AMQPLAIN
0050 00 00 00 23 05 4C 4F 47 49 4E 53 00 00 00 05 67 ...#.LOG INS....g
0060 75 65 73 74 08 50 41 53 53 57 4F 52 44 53 00 00 uest.PAS SWORDS..
0070 00 05 67 75 65 73 74 05 65 6E 5F 55 53 CE ..guest. en_US?
< 10,11: Connection.start_ok
waiting for 10,20, 10,30
waiting for a new frame
> 10,30: Connection.tune
< [hex]:
0000 01 00 00 00 00 00 0C 00 0A 00 1F FF FF 00 02 00 ........ ...??...
0010 00 00 00 CE ...?
< 10,31: Connection.tune_ok
< [hex]:
0000 01 00 00 00 00 00 08 00 0A 00 28 01 2F 00 00 CE ........ ..(./..?
< 10,40: Connection.open
waiting for 10,41, 10,50
waiting for a new frame
> 10,41: Connection.open_ok
Open OK! known_hosts:
using channel_id: 1
< [hex]:
0000 01 00 01 00 00 00 05 00 14 00 0A 00 CE ........ ....?
< 20,10: Channel.open
waiting for 20,11
waiting for a new frame
> 20,11: Channel.open_ok
Channel open
< [hex]:
0000 01 00 01 00 00 00 1A 00 28 00 0A 00 00 07 70 72 ........ (.....pr
0010 69 6D 61 72 79 06 64 69 72 65 63 74 02 00 00 00 imary.di rect....
0020 00 CE .?
< 40,10: Channel.exchange_declare
waiting for 40,11
waiting for a new frame
> 40,11: Channel.exchange_declare_ok
< [hex]:
0000 01 00 01 00 00 00 13 00 32 00 0A 00 00 07 70 72 ........ 2.....pr
0010 69 6D 61 72 79 02 00 00 00 00 CE imary... ..?
< 50,10: Channel.queue_declare
waiting for 50,11
waiting for a new frame
> 50,11: Channel.queue_declare_ok
< [hex]:
0000 01 00 01 00 00 00 1C 00 32 00 14 00 00 07 70 72 ........ 2.....pr
0010 69 6D 61 72 79 07 70 72 69 6D 61 72 79 00 00 00 imary.pr imary...
0020 00 00 00 CE ...?
< 50,20: Channel.queue_bind
waiting for 50,21
waiting for a new frame
> 50,21: Channel.queue_bind_ok
< [hex]:
0000 01 00 01 00 00 00 2A 00 3C 00 14 00 00 07 70 72 ......*. <.....pr
0010 69 6D 61 72 79 1A 50 48 50 50 52 4F 43 45 53 53 imary.PH PPROCESS
0020 5F 42 6F 62 2E 6C 6F 63 61 6C 5F 32 33 33 38 37 _Bob.loc al_23387
0030 00 CE .?
< 60,20: Channel.basic_consume
waiting for 60,21
waiting for a new frame
> 60,21: Channel.basic_consume_ok
waiting for any method
waiting for a new frame
> 60,60: Channel.basic_deliver
waiting for a new frame
waiting for a new frame
using channel_id: 2
< [hex]:
0000 01 00 02 00 00 00 05 00 14 00 0A 00 CE ........ ....?
< 20,10: Channel.open
waiting for 20,11
waiting for a new frame
> 20,11: Channel.open_ok
Channel open
< [hex]:
0000 01 00 02 00 00 00 1A 00 28 00 0A 00 00 07 74 77 ........ (.....tw
0010 69 74 74 65 72 06 64 69 72 65 63 74 02 00 00 00 itter.di rect....
0020 00 CE .?
< 40,10: Channel.exchange_declare
waiting for 40,11
waiting for a new frame
> 40,11: Channel.exchange_declare_ok
< [hex]:
0000 01 00 02 00 00 00 10 00 3C 00 28 00 00 07 74 77 ........ <.(...tw
0010 69 74 74 65 72 00 00 CE itter..?
< 60,40: Channel.basic_publish
< [hex]:
0000 02 00 02 00 00 00 1A 00 3C 00 00 00 00 00 00 00 ........ <.......
0010 00 00 32 90 00 0A 74 65 78 74 2F 70 6C 61 69 6E ..2?..te xt/plain
0020 02 CE .?
< [hex]:
0000 03 00 02 00 00 00 32 7B 22 69 6D 61 67 65 5F 69 ......2{ "image_i
0010 64 22 3A 34 33 35 31 36 2C 22 74 61 73 6B 22 3A d":43516 ,"task":
0020 22 66 69 6E 64 5F 74 61 67 73 5F 61 6E 64 5F 6D "find_ta gs_and_m
0030 65 6E 74 69 6F 6E 73 22 7D CE entions" }?
using channel_id: 3
< [hex]:
0000 01 00 03 00 00 00 05 00 14 00 0A 00 CE ........ ....?
< 20,10: Channel.open
waiting for 20,11
waiting for a new frame
> 20,11: Channel.open_ok
Channel open
< [hex]:
0000 01 00 03 00 00 00 1C 00 28 00 0A 00 00 09 64 65 ........ (.....de
0010 6C 69 63 69 6F 75 73 06 64 69 72 65 63 74 02 00 licious. direct..
0020 00 00 00 CE ...?
< 40,10: Channel.exchange_declare
waiting for 40,11
waiting for a new frame
> 40,11: Channel.exchange_declare_ok
< [hex]:
0000 01 00 03 00 00 00 12 00 3C 00 28 00 00 09 64 65 ........ <.(...de
0010 6C 69 63 69 6F 75 73 00 00 CE licious. .?
< 60,40: Channel.basic_publish
< [hex]:
0000 02 00 03 00 00 00 1A 00 3C 00 00 00 00 00 00 00 ........ <.......
0010 00 00 32 90 00 0A 74 65 78 74 2F 70 6C 61 69 6E ..2?..te xt/plain
0020 02 CE .?
< [hex]:
0000 03 00 03 00 00 00 32 7B 22 69 6D 61 67 65 5F 69 ......2{ "image_i
0010 64 22 3A 34 33 35 31 36 2C 22 74 61 73 6B 22 3A d":43516 ,"task":
0020 22 66 69 6E 64 5F 74 61 67 73 5F 61 6E 64 5F 6D "find_ta gs_and_m
0030 65 6E 74 69 6F 6E 73 22 7D CE entions" }?
< [hex]:
0000 01 00 01 00 00 00 0D 00 3C 00 50 00 00 00 00 00 ........ <.P.....
0010 00 00 01 00 CE ....?
< 60,80: Channel.basic_ack
waiting for any method
waiting for a new frame
[Exception]
Expecting AMQP method, received frame type: 3
Hi!
The situation: there is an exchange that the script connects to in passive mode.
If the exchange does not exist, $exchange->declare() throws AMQPExchangeException, but if you try a second time (in the same script session) it hangs at $exchange->declare().
Say we need to send a message to queue1, then send another message to queue2 within one script run. But when I try to do this (with another queue_declare and queue_bind call), the message is delivered to ALL consumers, even though the exchange type is DIRECT.
What am I doing wrong?
How can I set a heartbeat value for my workers using php-amqplib?
Hi,
I am playing with the code provided in demo/amqp_publisher.php. I have added "$ch->tx_select();" to the channel, and I am sending a lot of messages followed by "$ch->tx_commit();" to make sure the messages are persisted on disk.
Then I use amqp_consumer.php to read all the messages from the queue. After the queue is empty and I restart the RabbitMQ server, the messages are back in the queue.
What is the proper way to acknowledge that these messages have been consumed?
P.S. If I call $ch->tx_commit() in the consumer, the queue never even reaches a message count of 0.
Hi there,
When I try to deploy my app using php-amqplib I have this error :
** [out :: prod] [Symfony\Component\DependencyInjection\Exception\ParameterNotFoundException]
** [out :: prod] You have requested a non-existent parameter "amqp_host".
I double-checked "app/config/parameters.ini" and the parameter amqp_host is spelled correctly (amqp_host="fully.qualified.domain.name").
Any idea?
PS: I'm using the latest version of both php-amqplib and RabbitMqBundle.
AMQPChannel::basic_reject line 620
Inside basic_reject, it calls basicRecover on frameBuilder instead of basicReject, causing an exception.
I've seen some processes hanging in AMQPConnection->__destruct after a "Broken pipe or closed connection" exception was thrown by AMQPConnection->write.
The backtrace looks like this (from gdb, using zbacktrace from php's .gdbinit):
// hanging here:
[0x7fa4f4e4e618] fread(resource(#865), 1) vendor/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:55
[0x7fa4f4e4e480] PhpAmqpLib\Wire\AMQPReader->rawread(1) vendor/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:94
[0x7fa4f4e4e058] PhpAmqpLib\Wire\AMQPReader->read_octet() vendor/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php:268
[0x7fa4f4e4dd00] PhpAmqpLib\Connection\AMQPConnection->wait_frame() vendor/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php:288
[0x7fa4f4e4db20] PhpAmqpLib\Connection\AMQPConnection->wait_channel(0) vendor/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:143
[0x7fa4f4e4cc18] PhpAmqpLib\Channel\AbstractChannel->next_frame() vendor/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:240
[0x7fa4f4e4c8b8] PhpAmqpLib\Channel\AbstractChannel->wait(array(1)[0x2eec498]) vendor/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php:338
[0x7fa4f4e4c770] PhpAmqpLib\Connection\AMQPConnection->close() vendor/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php:129
[0x7fff9a628f20] PhpAmqpLib\Connection\AMQPConnection->__destruct()
The resource is in an open state, and not at EOF. netstat shows that the underlying socket is in the ESTABLISHED state on both sides.
A quick fix is to disable close_on_destruct; however, I'm not sure of the implications of that.
If I understand things correctly channels in Rabbit are a way to multiplex connections. It's my understanding that a channel without a connection doesn't really make sense.
Here https://github.com/videlalvaro/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php#L49 there is definitely thought given to cleaning up channels.
I think the reason why the code never got written is that the Connection itself should iterate over its channel array and close them off, since it's the aggregate and has sufficient knowledge of lifetimes.
It's a pretty useful feature, and my initial dig-through the codebase doesn't show me how I can implement this. The basics would be as follows (I think):
<?php
// Add the following to `AMQPChannel`

/**
 * Set this channel to use publisher confirmations.
 */
public function confirm_select($nowait = true, $callback = null)
{
    $nowait = $nowait && $this->allow_nowait() && !$callback;
    $this->_msg_id = 0;
    $this->_last_ack_id = 0;
    list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);
    $this->send_method_frame(array($class_id, $method_id), $args);
    if (!$nowait) {
        $this->select_callbacks[] = $callback;
        $this->wait(
            $this->waitHelper->get_wait('confirm.select_ok')
        );
    }
}

public function confirm_select_ok($method_frame)
{
    $callback = array_pop($this->select_callbacks);
    // The callback is optional, so only invoke it when one was given.
    if (is_callable($callback)) {
        $callback();
    }
}
I think this is sort of what is needed, though I am not quite sure whether the callback stuff is correct. I translated the code from here, if it matters.
I'd be happy to work through this with anyone who has any idea as to how this works!
I'm curious on the differences between this library and the PECL extension. It seems like there's a significant API overlap between the two.
It seems like a common pattern for pure PHP libraries to act as a fallback for the compiled versions if they exist. For instance, Zend_Json provides its own interface, but if a native JSON extension exists, it uses that native library under the hood to increase performance. (I know that Zend Framework is old news, and that JSON is probably a much simpler API.)
I guess I have these questions:
I'm in a position where I can install the PECL version (and I assume it's faster) but I'll end up having to use the pure PHP version because that is what the Symfony Bundle uses. I'm happy to be told I missed the documentation that addresses this.
This is probably not a problem with php-amqplib directly, but I would appreciate it if someone else could also test it, so we can pinpoint the problem, inform people and maybe even help to find a solution.
It might be an issue with VirtualBox, PHP itself or RabbitMQ; I have no idea yet...
While doing some development and performance testing, I noticed a strange phenomenon.
The throughput of messages in an RPC queue on a Windows machine was about 650 msg/s, while on the two tested Linux virtual machines it wouldn't go higher than 25 msg/s with the same scripts, the same PHP and the same RabbitMQ server (3.1.4 on Windows and CentOS, 3.1.5 on Fedora). Erlang is R16B01 on Windows and CentOS and R16B02 on Fedora.
That said, I must confess that the Linux machines run in VirtualBox on my Windows development box.
However, I/O performance on both Linux machines is very good.
The host CPU is an i2600K with 16GB DDR3 1866MHz RAM, and the CPU doesn't even reach 10% during the highest load of the tests (full throughput on Windows).
The Linux VMs get 4GB each, but don't actually use more than 1.5GB.
The first Linux where I discovered it was a fully updated CentOS 6.4.
After that I quickly set up a Fedora 19 with a 3.10 kernel, just to be sure there isn't some problem with the CentOS kernel (polling, etc.).
So this should rule out any Linux kernel related problem. Network buffers are big enough and file descriptors are virtually unused in this case, so I don't think we're yet at the point where we should be doing micro-optimizations. These are normal installations and should provide decent throughput, even without heavy optimization of the OS.
Unfortunately, at this time I don't have a real Linux machine to test on, in order to exclude the possibility of VirtualBox being the problem.
Does any of you have a real box with Linux, RabbitMQ and php-amqplib to do a few tests?
I have added some profiling to AbstractConnection.php and run a loop of 20000 messages in the producer. Also, in order to get a clean profile, I have commented out the fib calculations and just return whatever the producer sent.
I have put the modified AbstractConnection.php, rpc-consumer.php and rpc-producer.php in this gist: https://gist.github.com/frankmayer/6264596, so there's not a lot to prepare for the tests. You might need to change the autoloader path in the producer/consumer though.
So, here's the profiling output of the producer on Windows and, after that, on Linux.
Notice that, on both Windows and Linux, the first wait_frame always takes longer, which is because the first fread() takes longer than the subsequent ones.
But the big difference is that it takes only about 1.5 to 2 ms on Windows while it takes around 40 ms on Linux. This delay in every round trip is what turns 650 msg/s into 25 msg/s.
Producer on Windows:
[.] AbstractConnection::wait_frame() Got response in: 0.0020 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0015 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0005 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0015 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0020 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0015 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0015 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
Producer on Linux:
[.] AbstractConnection::wait_frame() Got response in: 0.0411 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0404 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0416 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0411 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0402 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0404 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0412 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0414 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0403 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0405 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0402 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0403 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
[.] AbstractConnection::wait_frame() Got response in: 0.0412 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] AbstractConnection::wait_frame() Got response in: 0.0000 seconds
[.] Got 30
One more note: with this profiling output echoed, the throughput on Windows went down to 450 msg/s, while on Linux it stayed at 25 msg/s.
When profiling this kind of script, it's important not to have the XDebug extension loaded at all, as it hugely impacts performance, even in its disabled state. Read my blog post on this exact matter here: http://bit.ly/14SaWpp
Thanks for helping and please let me know if you need any additional info.
Reading the Python tutorials, we can see that both the queue and the message have to be configured as durable if we don't want to lose messages when RabbitMQ crashes.
http://www.rabbitmq.com/tutorials/tutorial-two-python.html
Python basic_publish has a delivery_mode property
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))
which does not exist in php-amqplib:
public function basic_publish($msg, $exchange="", $routing_key="",
$mandatory=false, $immediate=false,
$ticket=NULL)
Is there a way to publish a durable message with php-amqplib ?
When I call the basic_qos method, I get the following error:
...->basic_qos(1, 1, false);
[PhpAmqpLib\Exception\AMQPProtocolConnectionException]
NOT_IMPLEMENTED - prefetch_size!=0 (1)
How would one go about using this without Composer? I am not using it in a project of mine and don't feel like adding it this close to the end of the project.
Currently, the connection reaches the RabbitMQ server as soon as you instantiate it (this is done in the constructor).
But this is a bad idea for people using dependency injection: their message-producing code will then connect to RabbitMQ every time, even when the logic that publishes is actually conditional.
To be friendly with DI, you should not put logic in the constructor, so that instantiating the object does not become a heavy operation.
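One way to get this behaviour without changing the library, sketched here under assumptions, is to inject a small holder that builds the connection only on first use. `LazyConnection` and its factory argument are hypothetical names for illustration and are not part of php-amqplib.

```php
<?php
// Hypothetical wrapper, not part of php-amqplib: defers building the
// connection until something actually asks for it.
class LazyConnection
{
    private $factory;
    private $connection = null;

    public function __construct($factory)
    {
        // Only store the recipe; no network activity happens here.
        $this->factory = $factory;
    }

    public function get()
    {
        if ($this->connection === null) {
            // First use: build the real connection exactly once.
            $this->connection = call_user_func($this->factory);
        }
        return $this->connection;
    }

    public function isConnected()
    {
        return $this->connection !== null;
    }
}
```

In a real container the factory would be a closure that returns the library's connection object, so the socket is opened only when the producing code path actually calls `get()`.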
I am a newbie when it comes to messaging. Also, my PHP skills are a bit rusty. So, this might be a no-brainer question!
PHP Parse error: syntax error, unexpected T_STRING, expecting T_CONSTANT_ENCAPSED_STRING or '(' in /opt/rabbitmq_server-2.8.6/php-amqplib/demo/amqp_consumer.php on line 4
This is what I have on my Linux box:
PHP Version : PHP 5.1.6
Rabbitmq : v2.8.6
I am not sure how to troubleshoot this. Any input on this, please?
Thanks!
Hey,
my PHP error log mentioned some notices in a PHP 5.5.1 environment on Ubuntu 12.04:
[15-Aug-2013 09:13:49 Europe/Berlin] PHP Notice: fwrite(): send of 12 bytes failed with errno=104 Connection reset by peer in /var/application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php on line 61
[15-Aug-2013 09:13:49 Europe/Berlin] PHP Notice: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in /var/application/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php on line 61
These are only notices and everything seems to be working normally, but I think this should be fixed. At the moment I do not know of any fix for this.
Maybe you have an idea.
We're experiencing a strange behavior which is only happening on our staging server:
When a producer publishes a message to RabbitMQ, that message is never received by RabbitMQ. The RabbitMQ log shows that a connection is being opened, but the connection is never closed.
When this is done from the CLI, the PHP process hangs forever and you need to kill it manually.
The weird thing is that consumers can connect to RabbitMQ successfully.
Any hints how to debug this problem?
The following will send a message of 127 bytes:
php php-amqplib/demo/amqp_publisher.php "$(printf "%127s")"
But when you try to send a message of 128 bytes or more:
php php-amqplib/demo/amqp_publisher.php "$(printf "%128s")"
...it fails.
Hi,
The RabbitMQ server puts a connection into a blocking state if the memory watermark alarm is raised. In this state RabbitMQ blocks publishing and the basic_publish() method returns false. But if I then try to close the channel, it hangs while waiting for the 'channel.close_ok' method. Passing a timeout value to the wait method called in AMQPChannel.close() solves this problem, but I am not sure whether it is a good idea. The timeout that is set in the AMQPStreamConnection has no effect.
When I send a mandatory message and the exchange exists, but there is no queue bound to the exchange, I receive an
'Exception' with message 'Expecting AMQP method, received frame type: 2'
when closing the channel.
Shouldn't the exception be thrown at the point of sending the message?
According to the Java API guide (http://www.rabbitmq.com/api-guide.html#returning), there can be a ReturnListener, which will catch the error.
See also: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.return
$ php emit_return_error.php
[x] Sent non-mandatory ... done.
[x] Sent mandatory ... done.
PHP Fatal error: Uncaught exception 'Exception' with message 'Expecting AMQP method, received frame type: 2' in ./lib/PhpAmqpLib/Channel/AbstractChannel.php:225
Stack trace:
#0 ./lib/PhpAmqpLib/Channel/AMQPChannel.php(116): PhpAmqpLib\Channel\AbstractChannel->wait(Array)
#1 ./emit_return_error.php(30): PhpAmqpLib\Channel\AMQPChannel->close()
#2 {main}
thrown in ./lib/PhpAmqpLib/Channel/AbstractChannel.php on line 225
Code to reproduce (emit_return_error.php):
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPConnection;
require_once('autoload.php');
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// declare the exchange but don't bind any queue
$channel->exchange_declare('hidden_exchange', 'topic');
$msg = new AMQPMessage("Hello World!");
echo " [x] Sent non-mandatory ...";
$channel->basic_publish($msg,
                        'hidden_exchange',
                        'rkey');
echo " done.\n";
echo " [x] Sent mandatory ... ";
$channel->basic_publish($msg,
                        'hidden_exchange',
                        'rkey',
                        true);
echo " done.\n";
$channel->close();
$connection->close();
Since today I've been getting a very strange error. We are using this lib together with Symfony 2.
This happens while opening the connection to the RabbitMQ server. This is the part of my controller:
$rabbitmqHost = $this->container->getParameter("rabbitmq_host");
$rabbitmqPort = $this->container->getParameter("rabbitmq_port");
$rabbitmqUser = $this->container->getParameter("rabbitmq_user");
$rabbitmqPassword = $this->container->getParameter("rabbitmq_password");
$rabbitmqVHost = $this->container->getParameter("rabbitmq_vhost");
$rabbitmqExchange = $this->container->getParameter("rabbitmq_exchange");
$this->conn = new AMQPConnection($rabbitmqHost, $rabbitmqPort, $rabbitmqUser, $rabbitmqPassword);
$this->vVhost = $rabbitmqVHost;
$this->exchange = $rabbitmqExchange;
The messaging works without problems. I can send and receive messages from the server, but this message appears in the webserver's error log:
PHP Fatal error: Uncaught exception 'Symfony\Component\Debug\Exception\ContextErrorException' with message 'Warning: fwrite() expects parameter 1 to be resource, null given in PhpAmqpLib/Wire/IO/StreamIO.php line 61' in PhpAmqpLib/Wire/IO/StreamIO.php:61
Stack trace:
#0 [internal function]: Symfony\Component\Debug\ErrorHandler->handle(2, 'fwrite() expect...', '', 61, Array)
#1 PhpAmqpLib/Wire/IO/StreamIO.php(61): fwrite(NULL, '??????????2????...')
#2 PhpAmqpLib/Connection/AbstractConnection.php(173): PhpAmqpLib\Wire\IO\StreamIO->write('??????????2????...')
#3 PhpAmqpLib/Connection/AbstractConnection.php(258): PhpAmqpLib\Connection\AbstractConnection- in PhpAmqpLib/Wire/IO/StreamIO.php on line 61
Hi,
When I try to run the consumer demo, I'm getting 'Uncaught exception 'PhpAmqpLib\Exception\AMQPRuntimeException' with message 'Error reading data. Received 0 instead of expected 7 bytes' in ...'
Details in here => https://gist.github.com/mberberoglu/7212114
I have already checked:
- Username and password
- Virtual host
- AMQP library
My AMQP library:
https://www.dropbox.com/s/xygk215s3pj6fh9/Screenshot%202013-10-29%2012.29.34.png
Could you please help me?
Where am I going wrong?
It seems to me that the AMQPSSLConnection class overwrites the passed $vhost variable when constructing an object of this class. With the current code, even when I pass a specific VHost (e.g. '/some_host'), php-amqplib will try to connect to the standard '/' VHost. As I don't allow any users (except for admin) to use the standard '/' VHost, the following error is written to the RabbitMQ log:
exception on TCP connection <0.18196.2> from "ip-adress:port"
{channel0_error,opening,
{amqp_error,access_refused,
"access to vhost '/' refused for user 'username'",
'connection.open'}}
The problem lies in code line 15 of the AMQPSSLConnection class:
parent::__construct($host, $port, $user, $password, $vhost="/",
{...}
By replacing $vhost="/" with simply $vhost, the error can be fixed. Code:
parent::__construct($host, $port, $user, $password, $vhost,
{...}
This does not remove the default behavior, as $vhost already defaults to '/' in the __construct parameters (line 9):
public function __construct($host, $port,
$user, $password,
$vhost="/", $ssl_options = array(), $options = array())
{...}
As mentioned above, this is a problem of SSL connections only.
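The mechanism can be reproduced without the library. The sketch below uses plain functions as stand-ins for the two constructors; `parent_ctor`, `buggy_ctor` and `fixed_ctor` are hypothetical names for illustration only. It shows that writing `$vhost = "/"` inside a call is an assignment expression, so the caller's value is replaced before it is passed on.

```php
<?php
// Stand-in for the parent constructor: just reports which vhost it received.
function parent_ctor($host, $vhost)
{
    return $vhost;
}

// Mirrors the reported call: `$vhost = "/"` is an assignment expression,
// so the caller's value is overwritten before being passed on.
function buggy_ctor($host, $vhost = "/")
{
    return parent_ctor($host, $vhost = "/");
}

// Mirrors the proposed fix: the argument is passed through unchanged.
function fixed_ctor($host, $vhost = "/")
{
    return parent_ctor($host, $vhost);
}

echo buggy_ctor('localhost', '/some_host'), PHP_EOL; // prints "/"
echo fixed_ctor('localhost', '/some_host'), PHP_EOL; // prints "/some_host"
echo fixed_ctor('localhost'), PHP_EOL;               // prints "/" (default kept)
```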
The phpdoc is not explicit, and although I've scrolled through everything I could find on the web, I can't be sure:
Once I know, I'll submit a PR with improved phpdoc.
Thanks
I am confused: looking at AMQPReader::read_value(), why does it write to the error log instead of throwing an exception? Throwing an exception helps developers catch the error immediately, while writing to the error log requires developers to keep an eye on the log. Is there any particular reason?
public function read_value($fieldType)
{
    $this->bitcount = $this->bits = 0;
    $val = NULL;
    switch($fieldType) {
        case 'S': // Long string
            $val = $this->read_longstr();
            break;
        case 'I': // Signed 32-bit
            $val = $this->read_signed_long();
            break;
        case 'D': // Decimal
            $e = $this->read_octet();
            $n = $this->read_signed_long();
            $val = new AMQPDecimal($n, $e);
            break;
        case 'T': // Timestamp
            $val = $this->read_timestamp();
            break;
        case 'F': // Table
            $val = $this->read_table();
            break;
        case 'A': // Array
            $val = $this->read_array();
            break;
        default:
            // UNKNOWN TYPE
            error_log("Usupported table field type $fieldType");
            break;
    }
    return $val;
}
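For comparison, here is a minimal sketch of the exception-throwing variant the question has in mind. It is not the library's code: `read_value_strict`, `UnsupportedFieldTypeException` and the `$decoders` map are hypothetical names, and the map simply stands in for the `read_*` methods used in the switch above.

```php
<?php
// Hypothetical exception class for this sketch.
class UnsupportedFieldTypeException extends RuntimeException
{
}

// $decoders maps a field-type letter to a callable that produces the value.
function read_value_strict($fieldType, array $decoders)
{
    if (!isset($decoders[$fieldType])) {
        // Fail loudly instead of logging and returning NULL.
        throw new UnsupportedFieldTypeException(
            "Unsupported table field type $fieldType"
        );
    }
    return call_user_func($decoders[$fieldType]);
}
```

With this shape, a caller that meets an unknown type gets an exception it can catch at the point of failure, rather than a silent NULL plus a line in the error log.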
Are there plans to implement additional data types? In AMQPReader it supports 'S', 'I', 'D', 'T', 'F' and outputs an error for any other type. I am experimenting with RabbitMQ and am finding that the reader is unable to parse arrays (type='A'), which are present in messages that are sent to dead-letter exchanges.
I see that some other client libraries implement this, e.g. bodgit/bunny@49f5943
I'm wondering whether you have thought about adding some logic for handling re-connects?
Especially for longer-running processes, there is always the possibility that the RabbitMQ server is temporarily unavailable, which would be good to handle a bit more gracefully than just throwing an exception.
Do you think such logic belongs in this library, or should it rather live somewhere else?
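Until the library offers something like this, the logic can live in application code. The following is a minimal sketch of a retry helper with exponential backoff; `retry_with_backoff` and its parameters are hypothetical and not part of php-amqplib. The callable is assumed to open the connection and do the work, throwing an exception when the broker is unreachable.

```php
<?php
// Hypothetical helper, not part of php-amqplib: retries $operation until it
// succeeds or $maxAttempts is reached, doubling the delay after each failure.
function retry_with_backoff($operation, $maxAttempts = 5, $initialDelayMs = 100)
{
    $delayMs = $initialDelayMs;
    for ($attempt = 1; ; $attempt++) {
        try {
            // The attempt number is passed in so the callable can log it.
            return call_user_func($operation, $attempt);
        } catch (Exception $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Give up and surface the last failure.
            }
            usleep($delayMs * 1000); // usleep() takes microseconds.
            $delayMs *= 2;
        }
    }
}
```

Catching the base `Exception` is a simplification here; a real consumer would probably retry only on connection-level failures and let other errors propagate immediately.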
We ran into a problem with RabbitMQ: due to a bug, the broker occasionally wouldn't answer tx.commit, and amqpChannel->tx_commit() hangs indefinitely as the tx.commit-ok never arrives.
So we need a timeout for tx_commit and all other methods that currently do not provide this parameter.
Every library user has to implement their own RPC client. We could provide one with the lib.