Giter VIP home page Giter VIP logo

amqpcpp's People

Contributors

akalend avatar bumbar1 avatar chenyujian avatar dennislee1201 avatar dnszaikin avatar hauke68 avatar ichaelm avatar jerrybao avatar kashyap2690 avatar lufeng1102 avatar maciekgajewski avatar milk avatar napylov avatar reunanen avatar seaneclarke avatar vasiliy-briginets avatar xbojer avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

amqpcpp's Issues

How to reject or nack a message?

A client gets a message from server and starts working.If it finishes the work,it can use AMQPQueue.Ack to notify the server.But how to send a fail message to notify the server to distribute the message to other client?

Reply-to example

Hi,

can you post an example of a Publish with reply-to in "amq.rabbitmq.reply-to" ?

I tried to make it work but with no real success.
First I tried to Publish and then make a Get from amq.rabbitmq.reply-to but I got the error "fast reply consumer does not exist".
Then I tried to make two threads, one is launched first to Consume on the "amq.rabbitmq.reply-to" queue and the other to Publish a message with reply-to set to "amq.rabbitmq.reply-to".
Actually sometimes it works but often the createExchange don't return or throw an Exception.
The error is then : operation none caused a connection exception frame_error: "Malformed UTF-8 in shortstr".
So I'm wondering if I can use the AMPQ instance from two threads.
Here the code I'm using:

#include <vcl.h>
#include <time.h>
#include <windows.h>
#include <process.h>
#include "AMQPcpp.h"

std::string getUUID()
{
    UUID uuid;
    UuidCreate ( &uuid );
    unsigned char * str;
    UuidToStringA ( &uuid, &str );
    std::string uuid_s( ( char* ) str );
    RpcStringFreeA ( &str );
    return uuid_s;
}
std::string gCorrelationId;
int  onMessage( AMQPMessage * aMessage )
{
    uint32_t oLength = 0;
    std::cout << "Result: "<< aMessage->getMessage(&oLength) << "\n";
    std::string oCorrelationId = aMessage->getHeader("correlation_id");
    std::cout<<"    with CorrelationId: "<< oCorrelationId <<"\n";
    if (oCorrelationId == gCorrelationId) 
    {
        std::cout<<"    Reply read\n";
        AMQPQueue *queue = aMessage->getQueue();
        queue->Cancel( aMessage->getConsumerTag() );
    }
    return 0;
}

AMQP amqp("localhost");

unsigned __stdcall consumer(void *params)
{
    AMQPQueue * reply_queue = amqp.createQueue("amq.rabbitmq.reply-to");
    reply_queue->addEvent(AMQP_MESSAGE, onMessage );
    reply_queue->setConsumerTag("reply-to");
    std::cout <<"Waiting for response in "<<reply_queue->getName()<<"\n";
    reply_queue->Consume(AMQP_NOACK);
    std::cout<<"exit consumer\n";
    _endthreadex( 0 );
    return 0;
}

int main(int argc, char* argv[])
{
    try {
        HANDLE hThread;
        unsigned threadID;

        hThread = (HANDLE)_beginthreadex( NULL, 0, &consumer, NULL, 0, &threadID );
        std::cout<<"Thread handle: "<<hThread<<"\n";

        Sleep(1000);

        std::string oMessage = "30";    // ask fibonacci for this number
        std::string correlationId = getUUID();
        gCorrelationId = correlationId;

        // request exchange
        std::cout<<"Create exchange\n";
        AMQPExchange * ex = amqp.createExchange();

        std::cout<<"Publish\n";
        ex->setHeader("Content-type", "text/plain");
        ex->setHeader("Content-encoding", "UTF-8");
        ex->setHeader("Reply-to", "amq.rabbitmq.reply-to");
        ex->setHeader("correlation_id", correlationId);
        ex->Publish( oMessage , "rpc_queue");

        std::cout <<"Publish "<<oMessage<<" with correlation-id: "<<correlationId<<"\n";

        WaitForSingleObject( hThread, INFINITE );
    } 
    catch (AMQPException e) {
        std::cout << e.getMessage() << std::endl;
    }
    std::cout << "Fin Client AMQP" << std::endl;   
    return 0;
}```

Errors while building

While trying to build w/wo Cmake i get an error

/home/vmuser/amqpcpp/src/AMQPQueue.cpp: In member function ‘void AMQPQueue::setConsumerTag(std::string)’:
/home/vmuser/amqpcpp/src/AMQPQueue.cpp:392:21: error: no match for ‘operator=’ (operand types are ‘amqp_bytes_t’ and ‘std::string {aka std::basic_string}’)
this->consumer_tag = consumer_tag;

My compiler is g++ (GCC) 4.8.5 20150623 (Red Hat 4.8.5-28)

The versions of rabbitmq-c i tried are 0.8.0 and 0.9.0

No LICENSE

The project needs a LICENSE file stating the license under which the code is released.

src/AMQPMessage.cpp

There is an error during compilation of the file at line 48. The signature of the function is:
char * AMQPMessage::getMessage(uint32_t* length)

On line 48, the return statement is:
return '\0'

which is a char. And it does not agree with the return type.

I would like to suggest to change it to:
return NULL;
or
return (char *) '\0';

keep alive probe support

Hi,

Is keepalive probe supported by library. Actually i set up connection with RMQServer with 5672 port and my connection is idle for long. I can see in netstat output that connection to RMQ server is lost.

Example close connection AMQP

Hi.
Can you explain how to close the connection and delete classes AMQP and AMQPQueue.
When I remove the objects AMQP,AMQPQueue called functions closeChannel from all objects. From first object function executed normally from second application is down. From example i use example_get.cpp

A bug of parsing connection string.

Bug Description:

An AMQP connection string like this: xunuu:[email protected]:5672/search

and then i call the printConnect() method of AMQP object , the print result like this:

AMQP connection:
port = 5672
host = 192.168.166.62
vhost = /search
user = xunuu
passw = xunuu

May be the vhost string is search , not /search , plus a backslash from of search


this error is happen from this method in AMQP:

void AMQP::parseHostPort(string hostPortString )

modify the following statement like this:
...
vhost.assign(hostPortString, pos2, hostPortString.size()-pos2);
...
to

vhost.assign(hostPortString, pos2+1, hostPortString.size()-pos2);

use pos2+1 to skip the backslash in front of vhost.

Messages not routed correctly

I have two queues with their own unique routing key and consumer tags. I have two threads, one consumes on Q1, the other consumes on Q2.

Messages are published to the right queue with the correct information, but the message gets consumed by either consumer instead of only the consumer who's consuming on that queue.

AMQPQueue::~AMQPQueue() is error

AMQPQueue::~AMQPQueue() {
this->closeChannel();
if (pmessage)
delete pmessage;
}
, but use interface Consume, line 451,456, pmessage is a temporary var, don't manage the life cycle of AMQPMessage, so that ::~AMQPQueue() , delete pmessage is error .
#if __cplusplus > 199711L // C++11 or greater
unique_ptr message ( new AMQPMessage(this) );
#else
auto_ptr message ( new AMQPMessage(this) );
#endif
pmessage = message.get();

packet loss!!!!

very surprising error: use AMQP *p = new AMQP() , publish will loss packet; but use AMQP obj , Publish mesg will not loss packet;

AMQP(con_str) need two '/' if vhost is default

when constructing AMQP using "guest:guesu@localhost:5672/", it would throw a exception.
AMQPException.cpp line 38 39 will get invalid data, cause an uncatched exception

So I think there should have an additional judgment in
void AMQP::parseHostPort(string hostPortString )
{
...
if (!vhost.size()) vhost.append("/");
}

`AMQPException::AMQPException( amqp_rpc_reply_t * res) {
if( res->reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
this->message = res->library_error ? strerror(res->library_error) : "end-of-stream";
}

if( res->reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
	char buf[512];
	memset(buf,0,512);
	this->code = 0;

	if(res->reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
		amqp_connection_close_t *m = (amqp_connection_close_t *) res->reply.decoded;
		this->code = m->reply_code;

		sprintf(buf, "server connection error %d, message: %.*s",
			m->reply_code,
			(int) m->reply_text.len,
			(char *) m->reply_text.bytes
		);
	} else if(res->reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
		amqp_channel_close_t *n = (amqp_channel_close_t *) res->reply.decoded;
		this->code = n->reply_code;

		sprintf(buf, "server channel error %d, message: %.*s class=%d method=%d",
			n->reply_code,
			(int) n->reply_text.len,
			(char *) n->reply_text.bytes,
			(int) n->class_id,
			n->method_id
		);
	} else {
		sprintf(buf, "unknown server error, method id 0x%08X", res->reply.id);
	}
	this->message=buf;
}

}`

Binary message read error

My message use msgPack to serialization and use snappy to compress , when i use amqpcpp to load it, the object AMQPMesage will no return the length of message, i only get the char* using getMessage() in AMQPMessage. So if the message is binary format , using amqp will not get the right message body.

My message is like this ,i use rabbit-c to dump it.

gxl@gxl-desktop:~/programe/Rabbitmq/rabbitmq-c-7cecf64752fd/examples$ ./amqp_listenq 192.168.166.62 5672 xunuu_build
Result 0
Frame type 1, channel 1
Method AMQP_BASIC_DELIVER_METHOD

Delivery 1, exchange search_engine routingkey build

00000000: F1 0D F0 4E 00 A4 55 55 : 69 64 01 00 A5 74 69 74 ...N..UUid...tit
00000010: 6C 65 A7 53 75 6D 6D 65 : 72 79 DA 06 D8 50 6F 73 le.Summery...Pos
00000020: 74 67 72 65 53 51 4C 20 : 70 72 69 64 65 73 20 69 tgreSQL prides i
00000030: 74 73 65 6C 66 20 69 6E : 20 73 74 61 6E 64 61 72 tself in standar
00000040: 64 73 20 63 6F 6D 70 6C : 69 61 6E 63 65 2E 20 49 ds compliance. I
00000050: 74 73 20 01 2F 30 69 6D : 70 6C 65 6D 65 6E 74 61 ts ./0implementa
00000060: 74 69 6F 01 2D 90 72 6F : 6E 67 6C 79 20 63 6F 6E tio.-.rongly con
00000070: 66 6F 72 6D 73 20 74 6F : 20 74 68 65 20 41 4E 53 forms to the ANS

Single TCP connection shared between multiple threads having different channels

Hi Akalend,

I have a query on using single tcp connection for multiple threads with different channels. Is it feasible with underlying amqp_connection_state_t, its socket which are not thread safe?

Basically, creating a manager thread to establish tcp connection with rmq. The amqp object created will be shared across multiple threads. These threads can createQueue, createExchange with different channels, but same amqp connection object.
Could you please share your views on the same.

Thanks,
Sheetal

Can't connect to vhost

guest:guest@localhost:5672 works
guest:guest@localhost:5672/ doesn't (it just hangs)
guest:guest@localhost:5672/testing (hangs too)

Single consumer with multiple queues

Requirement is to read messages from multiple queues binded on same exchange. What is the best way to implement it?
Is it possible to have single consumer (with one connection) and consume from multiple queues? or we need to have one connection per queue?

Make error: cannot convert ‘amqp_table_t’ to ‘amqp_boolean_t’ running on Centos

Hi,

I'm getting this error when trying to make amqpcpp on a 64 bit centos virtual machine.

make
g++ -Wall -I/usr/local/include -L/usr/local/lib -Iinclude/ -c -o src/AMQPExchange.o src/AMQPExchange.cpp
src/AMQPExchange.cpp: In member function ‘void AMQPExchange::sendDeclareCommand()’:
src/AMQPExchange.cpp:73: error: cannot convert ‘amqp_table_t’ to ‘amqp_boolean_t’ for argument ‘7’ to ‘amqp_exchange_declare_ok_t* amqp_exchange_declare(amqp_connection_state_t__, amqp_channel_t, amqp_bytes_t, amqp_bytes_t, amqp_boolean_t, amqp_boolean_t, amqp_boolean_t, amqp_boolean_t, amqp_table_t)’
make: *_* [src/AMQPExchange.o] Error 1

It works perfectly fine on my iMac.

I cannot seem to find the cause of the issue. any ideas?

Issue with sending/receiving binary messages

I have issues with sending/receiving binary messages.

In AMQPExchange::Publish(...) you can only provide message in std::string format. While this is ok for containing binary messages, the length is then calculated in a call to amqp_cstring_bytes(...) where it is taken from strlen(). This does not work for binary messages that contain null bytes.

I propose adding

void AMQPExchange::Publish(unsigned char *messageBytes, const size_t messageLength, const string &key)

to the API, allowing correct message lengths to be provided.

Also, when receiving binary messages, I did not get the correct message delivered.

It turns out that in AMQPMessage::setMessage(...) that is called from AMQPQueue::sendConsumeCommand(...) the incoming message is copied by using strdup(...). This does not work for binary messages that contain null bytes.

This should use malloc and memcpy to correctly copy all the memory.

cored in amqpcpp when reconnect

The core stack is:
#0 0x00000030068328e5 in raise () from /lib64/libc.so.6
#1 0x00000030068340c5 in abort () from /lib64/libc.so.6
#2 0x00000030068707f7 in __libc_message () from /lib64/libc.so.6
#3 0x0000003006876126 in malloc_printerr () from /lib64/libc.so.6
#4 0x00000030068765ad in malloc_consolidate () from /lib64/libc.so.6
#5 0x00000030068793c5 in _int_malloc () from /lib64/libc.so.6
#6 0x000000300687a951 in malloc () from /lib64/libc.so.6
#7 0x0000003702c0329b in amqp_tune_connection () from /lib64/librabbitmq.so.0
#8 0x0000003702c03350 in amqp_new_connection () from /lib64/librabbitmq.so.0
#9 0x00000000004f1816 in AMQP::sockConnect (this=0x7f9cd00008c0)

at .../src/amqpcpp/src/AMQP.cpp:193

#10 0x00000000004f16e0 in AMQP::connect (this=0x7f9cd00008c0)

at .../src/amqpcpp/src/AMQP.cpp:176

#11 0x00000000004f0833 in AMQP::AMQP (this=0x7f9cd00008c0, cnnStr=

"x\243......\230"...) at .../src/amqpcpp/src/AMQP.cpp:26

amqp_error,not_implemented,"immediate=true",'basic.publish'

When sending to a non default topic with routing key the message fails to appear on the broker and the connection/channel closes.

Looking at the logs, I see:

=ERROR REPORT====X-Feb-2017::XX:XX:X ===
Error on AMQP connection <0.4460.0> (xx.xx.xxx.x:47246 -> yy.yy.yyy.yy:5672, vhost: '/', user: 'xyz', state: running), channel 1:
{amqp_error,not_implemented,"immediate=true",'basic.publish'}

Looking online I see this was disabled from version 3.5.

I've looked at the code and can see the immediate flag cab be set/unset in the API (using ths short status parameter), however I can't seem to get it to work.

Any ideas on either a work around or fix?

Wrong parse connection string

parseCnnString("123123:akalend@localhost:5673/private") :

AMQP connection:
port = 5673
host = localhost
vhost = /private
user = akalend
passw = 123123

But i need to "vhost=private".
I think need pos2+1 when assigned to vhost.

Memory leak when running amp samples

I have built amqcpp on visual studio 2005 and 2010. Running the examples, e.g the publish example, reveals a memory leak. I tracked the cause down to amqpbase.cpp - the destructor should be declared as virtual, to ensure that the derived class's destructors are invoked when amqpbase objects are destroyed.

AMQPException will crash in function AMQP::login()

Currently the AMQP::login() is as below.
void AMQP::login() { amqp_rpc_reply_t res = amqp_login(cnn, vhost.c_str(), 0, FRAME_MAX, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), password.c_str()); if ( res.reply_type != AMQP_RESPONSE_NORMAL) { amqp_destroy_connection(cnn); throw AMQPException(&res); } }

I use latest rabbit-c library and it has crash issue if login fails.
The reason is that cache buffer will be cleared in function amqp_destroy_connection(cnn). So the pointer of res.reply.decoded is invalid while execute AMQPException(&res) and causes the following codes to crash.
if(res->reply.id == AMQP_CONNECTION_CLOSE_METHOD) { amqp_connection_close_t *m = (amqp_connection_close_t *) res->reply.decoded; this->code = m->reply_code; sprintf(buf, "server connection error %d, message: %.*s", m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes ); }

I modify the order of the above two lines and it works.
if ( res.reply_type != AMQP_RESPONSE_NORMAL) { auto amqp_exception = AMQPException(&res); amqp_destroy_connection(cnn); throw amqp_exception; }

ConnectionHandler::onError not called in case of invalid login (or "no any error info durring connecting")

When I entering invalid login data I do not have callback call ConnectionHandler::onError. Instead I only have closed socket with error ("The remote host closed the connection"). It's normal when service closes connection in case of error's. But I also need to have ConnectionHandler::onError for know what's exactly error happened during connection process to RabitMQ service. E.g. or this is invalid login, or some protocols error's, protocol versions mismatched etc. Since socket error e.g. "Connection closed by server side" is not enough to know what exactly happened.

But when I writting garbage to socket all works fine:

void MyRpcChannel::onData(AMQP::Connection *m_connection, const char *buffer, size_t size)
{
    m_socket->write("kjhaskjlhdsalkjdhaslkdhas", 10);
}

and I have my error message on ConnectionHandler::onError ("frame size exceeded").

So all is ok when connection established in case of error's (ConnectionHandler::onError is called) but no any error information during making connection (ConnectionHandler::onError is not called).

If you want to have ConnectionHandler::onError only if loggining succeeded and connection already established why you do not have some any callbacks for error processing during making connection process to RabbitMQ?

Is it bug or this feature not implemented? Thank you.

make error

g++ -Wall -I/usr/local/include -L/usr/local/lib -Iinclude/ -c -o src/AMQP.o src/AMQP.cpp
g++ -Wall -I/usr/local/include -L/usr/local/lib -Iinclude/ -c -o src/AMQPBase.o src/AMQPBase.cpp
g++ -Wall -I/usr/local/include -L/usr/local/lib -Iinclude/ -c -o src/AMQPException.o src/AMQPException.cpp
g++ -Wall -I/usr/local/include -L/usr/local/lib -Iinclude/ -c -o src/AMQPMessage.o src/AMQPMessage.cpp
src/AMQPMessage.cpp: In member function ‘void AMQPMessage::addHeader(std::string, uint64_t*)’:
src/AMQPMessage.cpp:109:31: warning: format ‘%llu’ expects argument of type ‘long long unsigned int’, but argument 3 has type ‘uint64_t {aka long unsigned int}’ [-Wformat]
g++ -Wall -I/usr/local/include -L/usr/local/lib -Iinclude/ -c -o src/AMQPExchange.o src/AMQPExchange.cpp
g++ -Wall -I/usr/local/include -L/usr/local/lib -Iinclude/ -c -o src/AMQPQueue.o src/AMQPQueue.cpp
src/AMQPQueue.cpp: In member function ‘void AMQPQueue::sendConsumeCommand()’:
src/AMQPQueue.cpp:386:27: warning: variable ‘consume_ok’ set but not used [-Wunused-but-set-variable]
ar rcs libamqpcpp.a src/AMQP.o src/AMQPBase.o src/AMQPException.o src/AMQPMessage.o src/AMQPExchange.o src/AMQPQueue.o
g++ -Wall -I/usr/local/include -L/usr/local/lib -Iinclude/ -o example_publish examples/example_publish.cpp libamqpcpp.a -lrabbitmq
libamqpcpp.a(AMQPQueue.o): In function AMQPQueue::sendConsumeCommand()': AMQPQueue.cpp:(.text+0x1d8f): undefined reference toamqp_empty_table'
AMQPQueue.cpp:(.text+0x1d9d): undefined reference to `amqp_empty_table'

rabbitmq_server-3.2.3
debian 7.4

amqpcpp lib stops to work after publish message to invalid exchange

amqpcpp lib stop to work after publish to invalid exchange


The code:

AMQP::Channel*    m_channel;

void RpcHandler::send_ack(const AMQP::Message &message, quint64 deliveryTag, const QByteArray& ba)
{
    AMQP::Envelope envelope(ba.data());
    envelope.setCorrelationID(message.correlationID());


    //    if (!m_channel->publish("", message.replyTo(), envelope))
    //        LDEBUG() << "qqqqqqqqqqq";

    // after this string library stop to work
    //     if coment this string and uncomment previous publish
    //    with correct exchange all is work (ack, receiving new messages etc.)
    //
    if (!m_channel->publish("ajhsakjdhasdkjhas", message.replyTo(), envelope))
        LDEBUG() << "qqqqqqqqqqq";

    if (!m_channel->ack(deliveryTag))
        LDEBUG() << "sssssss1";
}

After sending message to invalid exchange all stop to work:

  1. When I sends ack of my previous message, server does not receives it like I did not send ack. After exit from amqpcpp callback, message status on server changes from unacked to ready status, like I did not send ack.
    Also publish with invalid exchange return true, ack also returns true
  2. No new messages does not comes to my all consumed queus. Also if I send messages to other queues. Messages are pushes to RabbitMQ server and does not comes to my service anymore
  3. No any error reporting/conection closing on AMQP::ConnectionHandler::onError
  4. No any errors from ack or invalid publish

I have only error message on rabbitMq server (/var/log/rabbitmq/[email protected])

=ERROR REPORT==== 7-Aug-2015::17:46:25 ===
connection <0.2479.0>, channel 1 - soft error:
{amqp_error,not_found,"no exchange 'asasas' in vhost '/'",'basic.publish'}

No any errors about my ack.

Could you halp me? Is it bug? I do not want to stop working library, I want to have any errors as minimum e.g. ConnectionHandler::onError if everething stopped or return false from publish without stopping working library or any way to detect this situation in code (only from RabitMQ log file in current moment). Just restarting of my service is helping (I did not try to close and reopen connection). Would be great to have false from publish call and all continue to work.

amqpcpp version which I uses I did not find in source code or headers files or readme. So I do not know my current version of library(((

Thank you.

problems with running example_consume

when i running the examples,example_get and example_publish is ok.
but when i run the example_consume ,i found when the program execute ,there may be some problems at
amqp_rpc_reply_t res = amqp_simple_rpc(*cnn, channelNum, AMQP_BASIC_CONSUME_METHOD, replies, &s):AT AMQPQueue::sendConsumeCommand: At AMQPQueue.cpp.
it seems that the program wait here and not execute any more.
minwhile,these examples are too old, MessageQueue:getMessage(uint_32 i) has been updated.

ps: when i run valgrind .

==3720== Invalid read of size 1
==3720== at 0x4A07110: memcpy (mc_replace_strmem.c:406)
==3720== by 0x4C31EEB: amqp_encode_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35A30: amqp_send_frame (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35E8D: amqp_send_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C3609F: amqp_simple_rpc (in /usr/local/lib/librabbitmq.so)
==3720== by 0x40C14B: AMQPQueue::sendConsumeCommand() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x40CC76: AMQPQueue::Consume(short) (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402080: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== Address 0x4EFFA88 is 24 bytes inside a block of size 32 free'd
==3720== at 0x4A05130: operator delete(void_) (vg_replace_malloc.c:244)
==3720== by 0x39A709DB69: std::string::~string() (in /usr/lib64/libstdc++.so.6.0.8)
==3720== by 0x401FCF: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720==
==3720== Invalid read of size 1
==3720== at 0x4A0711B: memcpy (mc_replace_strmem.c:406)
==3720== by 0x4C31EEB: amqp_encode_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35A30: amqp_send_frame (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35E8D: amqp_send_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C3609F: amqp_simple_rpc (in /usr/local/lib/librabbitmq.so)
==3720== by 0x40C14B: AMQPQueue::sendConsumeCommand() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x40CC76: AMQPQueue::Consume(short) (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402080: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== Address 0x4EFFA89 is 25 bytes inside a block of size 32 free'd
==3720== at 0x4A05130: operator delete(void_) (vg_replace_malloc.c:244)
==3720== by 0x39A709DB69: std::string::~string() (in /usr/lib64/libstdc++.so.6.0.8)
==3720== by 0x401FCF: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720==
==3720== Invalid read of size 1
==3720== at 0x4A07124: memcpy (mc_replace_strmem.c:406)
==3720== by 0x4C31EEB: amqp_encode_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35A30: amqp_send_frame (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35E8D: amqp_send_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C3609F: amqp_simple_rpc (in /usr/local/lib/librabbitmq.so)
==3720== by 0x40C14B: AMQPQueue::sendConsumeCommand() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x40CC76: AMQPQueue::Consume(short) (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402080: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== Address 0x4EFFA8A is 26 bytes inside a block of size 32 free'd
==3720== at 0x4A05130: operator delete(void_) (vg_replace_malloc.c:244)
==3720== by 0x39A709DB69: std::string::~string() (in /usr/lib64/libstdc++.so.6.0.8)
==3720== by 0x401FCF: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720==
==3720== Invalid read of size 1
==3720== at 0x4A0712D: memcpy (mc_replace_strmem.c:406)
==3720== by 0x4C31EEB: amqp_encode_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35A30: amqp_send_frame (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35E8D: amqp_send_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C3609F: amqp_simple_rpc (in /usr/local/lib/librabbitmq.so)
==3720== by 0x40C14B: AMQPQueue::sendConsumeCommand() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x40CC76: AMQPQueue::Consume(short) (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402080: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== Address 0x4EFFA8B is 27 bytes inside a block of size 32 free'd
==3720== at 0x4A05130: operator delete(void_) (vg_replace_malloc.c:244)
==3720== by 0x39A709DB69: std::string::~string() (in /usr/lib64/libstdc++.so.6.0.8)
==3720== by 0x401FCF: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720==
==3720== Invalid read of size 1
==3720== at 0x4A07154: memcpy (mc_replace_strmem.c:406)
==3720== by 0x4C31EEB: amqp_encode_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35A30: amqp_send_frame (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35E8D: amqp_send_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C3609F: amqp_simple_rpc (in /usr/local/lib/librabbitmq.so)
==3720== by 0x40C14B: AMQPQueue::sendConsumeCommand() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x40CC76: AMQPQueue::Consume(short) (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402080: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== Address 0x4EFFA8C is 28 bytes inside a block of size 32 free'd
==3720== at 0x4A05130: operator delete(void_) (vg_replace_malloc.c:244)
==3720== by 0x39A709DB69: std::string::~string() (in /usr/lib64/libstdc++.so.6.0.8)
==3720== by 0x401FCF: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720==
==3720== Invalid read of size 1
==3720== at 0x4A07165: memcpy (mc_replace_strmem.c:406)
==3720== by 0x4C31EEB: amqp_encode_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35A30: amqp_send_frame (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35E8D: amqp_send_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C3609F: amqp_simple_rpc (in /usr/local/lib/librabbitmq.so)
==3720== by 0x40C14B: AMQPQueue::sendConsumeCommand() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x40CC76: AMQPQueue::Consume(short) (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402080: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== Address 0x4EFFA8E is 30 bytes inside a block of size 32 free'd
==3720== at 0x4A05130: operator delete(void_) (vg_replace_malloc.c:244)
==3720== by 0x39A709DB69: std::string::~string() (in /usr/lib64/libstdc++.so.6.0.8)
==3720== by 0x401FCF: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720==
==3720== Conditional jump or move depends on uninitialised value(s)
==3720== at 0x4C36B05: amqp_encode_table (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C31D6B: amqp_encode_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35A30: amqp_send_frame (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35E8D: amqp_send_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C3609F: amqp_simple_rpc (in /usr/local/lib/librabbitmq.so)
==3720== by 0x40C14B: AMQPQueue::sendConsumeCommand() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x40CC76: AMQPQueue::Consume(short) (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402080: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720==
==3720== Syscall param socketcall.sendto(msg) points to uninitialised byte(s)
==3720== at 0x39A0CD4875: send (in /lib64/libc-2.5.so)
==3720== by 0x4C35969: amqp_send_frame (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C35E8D: amqp_send_method (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C3609F: amqp_simple_rpc (in /usr/local/lib/librabbitmq.so)
==3720== by 0x40C14B: AMQPQueue::sendConsumeCommand() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x40CC76: AMQPQueue::Consume(short) (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402080: main (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== Address 0x4E9F800 is 24 bytes inside a block of size 131,072 alloc'd
==3720== at 0x4A0590B: realloc (vg_replace_malloc.c:306)
==3720== by 0x4C353B6: amqp_tune_connection (in /usr/local/lib/librabbitmq.so)
==3720== by 0x4C3694C: amqp_login (in /usr/local/lib/librabbitmq.so)
==3720== by 0x402B04: AMQP::login() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402EDB: AMQP::connect() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x402F5C: AMQP::AMQP() (in /root/akalend-amqpcpp-30a2539/example_consume)
==3720== by 0x401D99: main (in /root/akalend-amqpcpp-30a2539/example_consume)

class "amqp_bytes_t_" has no member "c_str"

Hi there, I am posting here cuz whenever I try to compile using make I get this error on the title, from what I see, it happens due to a mismatch between the AMQPQueue class definition and its implementation. In the class definition there is a private struct named "amqp_bytes_t_" which is used in some functions, whereas, in the implementation, they are using std::string. This error is repeated multiple times and it goes back and forth as there are some functions in which this implicit conversion is not possible.

What to do with that?

Memory Leaks

I found that there were some memory leaks associated with creating AMQPExchange and AMQPQueue objects using AMQP::createExchange()/createQueue(). The leak has to do with the way the classes are declared in amqpcpp.h:

When createExchange() or createQueue() is called, the object that's created is cast to AMQPBase* using a reinterpret_cast and put on the AMQPConenction's "channels" vector. Since the destructor for AMQPBase is not virtual, the destructors for AMQPExchange and AMQPQueue are not called, thus all the memory that was allocated for them is not released.

The fix:

  • Change AMQPBase's destructor to be virtual
  • Change AMQPQueue's destructor to be virtual (not necessary, but good practice)
  • Create a virtual destructor for AMQPExchange (again, not necessary, but good practice)
  • Change the "reinterpret_cast" to "dynamic_cast" in AMQP::createExchange()/createQueue()

Memory leak in AMQPExchange::Publish

There is a memory leak in method Publish(). Memory allocated by amqp_bytes_malloc function is never freed.
void AMQPExchange::Publish(const char * data, uint32_t length, string key) {
amqp_bytes_t messageByte = amqp_bytes_malloc(length);
memcpy(messageByte.bytes,data,length);
sendPublishCommand(messageByte, key.c_str());
}

Three possible solutions:

  1. Add dealocation to the and of Publish method
  2. Use memory on stack
  3. Do not allocate memory at all. This is the best way from my point. If someone want make copy of memory, he can wrap this method.

New implementation without allocation. There is one change data is not marked as const now because "amqp_basic_publish" function body parameter is not marked const.

void AMQPExchange::Publish(char * data, uint32_t length, string key) {
amqp_bytes_t messageByte;
messageByte.bytes = data;
messageByte.len = length;
sendPublishCommand(messageByte, key.c_str());
}

Api usage for predefined Queue and Exchange

Hi,

In rabbitmq server i have predefined Exchange and Queue. Do we need to call createExchange() and createQueue() to get exchange and queue pointers? Currently i am using createExchange() and createQueue() for the predefined exchange and queues. Does it overwrite the existing exchange and queue? If my usage is wrong then what is correct way of getting exchange and queue pointer for predefined exchange and queues.[Please note as per requirement queues are durable.]

duplicate call amqp_channel_close, it is not necessary

an AMQP can manager several Exchanges Or QUEUEs.
every of them have their own channel, right?

if I call delete AMQP* directly,
in the Destructor
channels.size() is not zero, but AMQPBase* in the vector is un-reachable.
the delete *i would throw;

So I suggest call AMQP::closeChannel() in its Destructor.

AMQPBase::~AMQPBase() {
this->closeChannel();
}

AMQP::~AMQP() {
if (channels.size()) {
vector<AMQPBase*>::iterator i;
for (i=channels.begin(); i!=channels.end();) {
delete *i;
}
}

amqp_channel_close(cnn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(cnn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(cnn);

};

void AMQPQueue::addEvent( AMQPEvents_e eventType, int (*event)(AMQPMessage*)) only accepts static functions

IS there any possible way to pass a member function as a second argument? I have a class that contains AMQP, AMQP within their fields, as well as other member functions, but I want to benefit the value one of them by using the addEvent funtionality of the AQMPQueue.

Unfortunately, it only accepts static functions so the argument Im required to pass cannot be any of my class members. Any idea to solve that?

30 seconds waiting

Hi iam new in RabbitMQ, iam testing the example but in my case when i call ex->Publish() , it wait 30 sec to exit. It is correct?, How can i eliminate this wait?
Tanks
Dibet

AMQP amqp("localhost:5672");
AMQPExchange *ex = amqp.createExchange("e");
ex->Declare("e", "fanout");
AMQPQueue *qu2 = amqp.createQueue("detectionXY");
//qu2->Declare("/",true);
qu2->Bind("e" , "");
//qu2->setParam(AMQP_DELIVERY_NONPERSISTENT);
ex->setHeader("Delivery-mode", 2);
ex->setHeader("Content-type", "text/text");
ex->setHeader("Content-encoding", "UTF-8");
ex->Publish(ss , "");

loss message

when i init a connection and send a message is success, but wait may be 1h, then send to a message return ok, but rabbitmq server not received that message. retry will return error. how can i check the connection whether ok?

why open a new channel for every created queue

You open a channel for every queue, why have to do that?
It seems not consistent with amqp semantics, and may have low efficiency.
I don't get it, could you give me some detail explanation, thanks.

addEvent callbacks only take a C style function pointers and prevent/hinder a more OO approach

Currently the event callbacks are all assumed to be C style function pointers.

What is the appetite for changing this to be something like std::function<int(AMQPMessage*)> and so allow pointers to C++ member functions etc or a lambda. This will allow a more OO approach and facilitate some established design patterns (MessageListener etc. from Java) to be employed or enable access to member variables for the message processing etc..

AMQPMessage::setMessage error

Hi!
Have a simple error while processed Message body (AMQPMessage::setMessage).
At start & if length this->data > length you have undefined stay \0 in the this->data (corrupt body).

Instead :
line 30:
this->data = (char*)malloc(length);
memcpy(this->data,data,length);

I offer:
this->data = (char*)malloc(length+1);
this->data[length]='\0';
if (!this->data) {
throw AMQPException("cannot alocate memory for data");
}
strncpy(this->data,data,length);

wbr,
buy.

method AMQPQueue::Declare does not take care of params

--- a/src/AMQPQueue.cpp
+++ b/src/AMQPQueue.cpp
@@ -39,7 +39,7 @@ AMQPQueue::~AMQPQueue() {

// Declare command /* 50, 10; 3276810 */
void AMQPQueue::Declare() {

  •   parms=0;
    
  • //parms=0;
    sendDeclareCommand();
    }

Is this a bug? Without commenting the line I'm not able to declare durable queues.

Thanks

addEvent with member function

Hi I'm trying to parse the message within a member function instead a static function in order to operate in the object scope.

I only got erros on my tries.

With member function:
error: conversion from ‘int (Foo::)(AMQPMessage)’ to non-scalar type ‘std::function<int(AMQPMessage*)>’ requested
std::function<int(AMQPMessage* ) > test = &Foo::onMessage;

With lambda capture I had this error:

error: no matching function for call to ‘AMQPQueue::addEvent(AMQPEvents_e, Foo::exec()::<lambda(AMQPMessage*)>&)’
qu2->addEvent( AMQP_MESSAGE, onMessage );

no known conversion for argument 2 from ‘Foo::exec()::<lambda(AMQPMessage*)>’ to ‘int ()(AMQPMessage)’
/usr/local/include/AMQPcpp.h:220:22: note: void AMQPQueue::addEvent(AMQPEvents_e, std::function<int(AMQPMessage*)>&)
void addEvent( AMQPEvents_e eventType, std::function<int(AMQPMessage*)>& event );

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.