carrot's Introduction

carrot - AMQP Messaging Framework for Python

Version: 0.10.7

Status

Carrot is discontinued in favor of the new Kombu framework.

  • Kombu is ready, start to use it now!
  • Kombu comes with a Carrot compatible API, so it's easy to port your software.
  • Carrot will not be actively maintained, only critical bugs will be fixed.

** ORIGINAL CARROT README CONTINUES BELOW **

Introduction

carrot is an AMQP messaging queue framework. AMQP is the Advanced Message Queuing Protocol, an open standard protocol for message orientation, queuing, routing, reliability and security.

The aim of carrot is to make messaging in Python as easy as possible by providing a high-level interface for producing and consuming messages. At the same time it is a goal to re-use what is already available as much as possible.

carrot has pluggable messaging back-ends, so it is possible to support several messaging systems. Currently, there is support for AMQP (py-amqplib, pika) and STOMP (python-stomp). There's also an in-memory backend for testing purposes, using the Python queue module.

Several AMQP message broker implementations exist, including RabbitMQ, ZeroMQ and Apache ActiveMQ. You'll need to have one of these installed; personally, we've been using RabbitMQ.

Before you start playing with carrot, you should probably read up on AMQP, and you could start with the excellent article about using RabbitMQ under Python, Rabbits and warrens. For more detailed information, you can refer to the Wikipedia article about AMQP.

Documentation

Carrot is using Sphinx, and the latest documentation is available at GitHub:

http://github.com/ask/carrot/

Installation

You can install carrot either via the Python Package Index (PyPI) or from source.

To install using pip:

$ pip install carrot

To install using easy_install:

$ easy_install carrot

If you have downloaded a source tarball you can install it by doing the following:

$ python setup.py build
# python setup.py install # as root

Terminology

There are some concepts you should be familiar with before starting:

  • Publishers

    Publishers send messages to an exchange.

  • Exchanges

    Messages are sent to exchanges. Exchanges are named and can be configured to use one of several routing algorithms. The exchange routes the messages to consumers by matching the routing key in the message with the routing key the consumer provides when binding to the exchange.

  • Consumers

    Consumers declare a queue, bind it to an exchange and receive messages from it.

  • Queues

    Queues receive messages sent to exchanges. The queues are declared by consumers.

  • Routing keys

    Every message has a routing key. The interpretation of the routing key depends on the exchange type. There are four default exchange types defined by the AMQP standard, and vendors can define custom types (so see your vendor's manual for details).

    These are the default exchange types defined by AMQP/0.8:

    • Direct exchange

      Matches if the routing key property of the message and the routing_key attribute of the consumer are identical.

    • Fan-out exchange

      Always matches, even if the binding does not have a routing key.

    • Topic exchange

      Matches the routing key property of the message using a primitive pattern matching scheme. The message routing key consists of words separated by dots (".", like domain names), and two special characters are available: star ("*") and hash ("#"). The star matches exactly one word, and the hash matches zero or more words. For example, "*.stock.#" matches the routing keys "usd.stock" and "eur.stock.db" but not "stock.nasdaq".
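The topic matching rules described above can be sketched in a few lines of plain Python. This is only an illustration of the star/hash semantics, not the code a broker actually runs; the function name topic_matches is made up for this example.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic-exchange style pattern matches a routing key.

    Illustrative only: "*" stands for exactly one word and "#" for zero
    or more words, with words separated by dots.
    """
    def match(p, k):
        if not p:
            # Pattern exhausted: match only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # "#" may absorb zero or more of the remaining words.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


for key in ("usd.stock", "eur.stock.db", "stock.nasdaq"):
    print(key, topic_matches("*.stock.#", key))
```

The loop reproduces the example from the text: the first two keys match and "stock.nasdaq" does not, because the star needs a word in front of "stock".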

Examples

Creating a connection

You can set up a connection by creating an instance of carrot.connection.BrokerConnection, with the appropriate options for your broker:

>>> from carrot.connection import BrokerConnection
>>> conn = BrokerConnection(hostname="localhost", port=5672,
...                           userid="guest", password="guest",
...                           virtual_host="/")

If you're using Django you can use the carrot.connection.DjangoBrokerConnection class instead, which loads the connection settings from your settings.py:

BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/"

Then create a connection by doing:

>>> from carrot.connection import DjangoBrokerConnection
>>> conn = DjangoBrokerConnection()
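To make the relationship between the two connection styles concrete, here is a rough sketch of how the BROKER_* settings shown above could be translated into the keyword arguments used by BrokerConnection. It is not carrot's actual implementation: the helper name conninfo_from_settings and the mapping table are assumptions made for this example, and a SimpleNamespace stands in for a Django settings module.

```python
from types import SimpleNamespace

# Assumed mapping from the settings names shown above to the keyword
# arguments used in the BrokerConnection example.
SETTING_TO_KWARG = {
    "BROKER_HOST": "hostname",
    "BROKER_PORT": "port",
    "BROKER_USER": "userid",
    "BROKER_PASSWORD": "password",
    "BROKER_VHOST": "virtual_host",
}


def conninfo_from_settings(settings):
    """Collect connection keyword arguments from a settings-like object.

    Hypothetical helper: settings that are not defined are simply skipped.
    """
    return {
        kwarg: getattr(settings, name)
        for name, kwarg in SETTING_TO_KWARG.items()
        if hasattr(settings, name)
    }


# Stand-in for a Django settings module.
settings = SimpleNamespace(BROKER_HOST="localhost", BROKER_PORT=5672,
                           BROKER_USER="guest", BROKER_PASSWORD="guest",
                           BROKER_VHOST="/")
print(conninfo_from_settings(settings))
```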

Receiving messages using a Consumer

First we open up a Python shell and start a message consumer.

This consumer declares a queue named "feed", receiving messages with the routing key "importer" from the "feed" exchange.

The example then uses the consumer's wait() method to go into consume mode, where it continuously polls the queue for new messages, and when a message is received it passes the message to all registered callbacks.

>>> from carrot.messaging import Consumer
>>> consumer = Consumer(connection=conn, queue="feed",
...                     exchange="feed", routing_key="importer")
>>> def import_feed_callback(message_data, message):
...     feed_url = message_data["import_feed"]
...     print("Got feed import message for: %s" % feed_url)
...     # something importing this feed url
...     # import_feed(feed_url)
...     message.ack()
>>> consumer.register_callback(import_feed_callback)
>>> consumer.wait() # Go into the consumer loop.

Sending messages using a Publisher

Then we open up another Python shell to send some messages to the consumer defined in the last section.

>>> from carrot.messaging import Publisher
>>> publisher = Publisher(connection=conn,
...                       exchange="feed", routing_key="importer")
>>> publisher.send({"import_feed": "http://cnn.com/rss/edition.rss"})
>>> publisher.close()

Now look at the first Python shell again (where consumer.wait() is running). The following text should have been printed to the screen:

Got feed import message for: http://cnn.com/rss/edition.rss

Serialization of Data

By default every message is encoded using JSON, so sending Python data structures like dictionaries and lists works. YAML, msgpack and Python's built-in pickle module are also supported, and if needed you can register any custom serialization scheme you want to use.

Each option has its advantages and disadvantages.

json -- JSON is supported in many programming languages, is now a standard part of Python (since 2.6), and is fairly fast to decode using modern Python libraries such as cjson or simplejson.

The primary disadvantage to JSON is that it limits you to the following data types: strings, unicode, floats, boolean, dictionaries, and lists. Decimals and dates are notably missing.

Also, binary data will be transferred using base64 encoding, which will cause the transferred data to be around 34% larger than an encoding which supports native binary types.

However, if your data fits inside the above constraints and you need cross-language support, the default setting of JSON is probably your best choice.

pickle -- If you have no desire to support any language other than Python, then using the pickle encoding will gain you the support of all built-in Python data types (except class instances), smaller messages when sending binary files, and a slight speedup over JSON processing.

yaml -- YAML has many of the same characteristics as json, except that it natively supports more data types (including dates, recursive references, etc.).

However, the Python libraries for YAML are a good bit slower than the libraries for JSON.

If you need a more expressive set of data types and need to maintain cross-language compatibility, then YAML may be a better fit than the above.
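The size trade-off for binary data mentioned above can be checked with the standard library alone. The sketch below does not use carrot; it compares a base64-wrapped JSON document with a pickle of the same bytes. The payload is made up, and the snippet is written for Python 3.

```python
import base64
import json
import pickle

# Made-up binary payload of 3072 bytes.
payload = bytes(range(256)) * 12

# JSON has no binary type, so the bytes are wrapped in base64 text first.
as_json = json.dumps({"data": base64.b64encode(payload).decode("ascii")})

# pickle can store the bytes natively.
as_pickle = pickle.dumps({"data": payload}, protocol=pickle.HIGHEST_PROTOCOL)

overhead = len(base64.b64encode(payload)) / len(payload) - 1
print("base64 overhead: %.1f%%" % (overhead * 100))
print("json size: %d, pickle size: %d" % (len(as_json), len(as_pickle)))
```

Base64 turns every 3 input bytes into 4 output characters, which is where the roughly one-third growth comes from.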

To instruct carrot to use an alternate serialization method, use one of the following options.

  1. Set the serialization option on a per-Publisher basis:

    >>> from carrot.messaging import Publisher
    >>> publisher = Publisher(connection=conn,
    ...                       exchange="feed", routing_key="importer",
    ...                       serializer="yaml")
  2. Set the serialization option on a per-call basis:

    >>> from carrot.messaging import Publisher
    >>> publisher = Publisher(connection=conn,
    ...                       exchange="feed", routing_key="importer")
    >>> publisher.send({"import_feed": "http://cnn.com/rss/edition.rss"},
    ...                serializer="pickle")
    >>> publisher.close()

Note that Consumers do not need the serialization method specified in their code. They can auto-detect the serialization method since we supply the Content-type header as part of the AMQP message.
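The idea of picking a decoder from the Content-type header can be illustrated with a small registry. This is a sketch rather than carrot's serialization module: the DECODERS table and the decode_body function are names invented here, and the content type strings are assumed values.

```python
import json
import pickle

# Hypothetical registry mapping a content type to a decoding function.
# Only unpickle data from sources you trust.
DECODERS = {
    "application/json": lambda body: json.loads(body.decode("utf-8")),
    "application/x-python-serialize": pickle.loads,
}


def decode_body(body, content_type):
    """Decode a raw message body according to its content type."""
    try:
        decoder = DECODERS[content_type]
    except KeyError:
        raise ValueError("No decoder registered for %r" % (content_type,))
    return decoder(body)


raw = json.dumps({"import_feed": "http://cnn.com/rss/edition.rss"}).encode("utf-8")
print(decode_body(raw, "application/json"))
```

Because the lookup happens on the receiving side, the consumer code stays the same no matter which serializer the publisher chose.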

Sending raw data without Serialization

In some cases, you don't need your message data to be serialized. If you pass in a plain string or unicode object as your message, then carrot will not waste cycles serializing/deserializing the data.

You can optionally specify a content_type and content_encoding for the raw data:

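>>> import os
>>> from carrot.messaging import Publisher
>>> publisher = Publisher(connection=conn,
...                       exchange="feed",
...                       routing_key="import_pictures")
>>> # open() does not expand "~", so expand it explicitly.
>>> picture_path = os.path.expanduser("~/my_picture.jpg")
>>> publisher.send(open(picture_path, "rb").read(),
...                content_type="image/jpeg",
...                content_encoding="binary")
>>> publisher.close()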

The message object returned by the Consumer class will have content_type and content_encoding attributes.

Receiving messages without a callback

You can also poll the queue manually, by using the fetch method. This method returns a Message object, from where you can get the message body, de-serialize the body to get the data, acknowledge, reject or re-queue the message.

>>> consumer = Consumer(connection=conn, queue="feed",
...                     exchange="feed", routing_key="importer")
>>> message = consumer.fetch()
>>> if message:
...     message_data = message.payload
...     message.ack()
... else:
...     pass  # No messages waiting on the queue.
>>> consumer.close()

Sub-classing the messaging classes

The Consumer and Publisher classes can also be subclassed. Thus you can define the above publisher and consumer like so:

>>> from carrot.messaging import Publisher, Consumer
>>> class FeedPublisher(Publisher):
...     exchange = "feed"
...     routing_key = "importer"
...
...     def import_feed(self, feed_url):
...         return self.send({"action": "import_feed",
...                           "feed_url": feed_url})
>>> class FeedConsumer(Consumer):
...     queue = "feed"
...     exchange = "feed"
...     routing_key = "importer"
...
...     def receive(self, message_data, message):
...         action = message_data["action"]
...         if action == "import_feed":
...             # something importing this feed
...             # import_feed(message_data["feed_url"])
...             message.ack()
...         else:
...             raise Exception("Unknown action: %s" % action)
>>> publisher = FeedPublisher(connection=conn)
>>> publisher.import_feed("http://cnn.com/rss/edition.rss")
>>> publisher.close()
>>> consumer = FeedConsumer(connection=conn)
>>> consumer.wait() # Go into the consumer loop.

Getting Help

Mailing list

Join the carrot-users mailing list.

Bug tracker

If you have any suggestions, bug reports or annoyances please report them to our issue tracker at http://github.com/ask/carrot/issues/

Contributing

Development of carrot happens at Github: http://github.com/ask/carrot

You are highly encouraged to participate in the development. If you don't like Github (for some reason) you're welcome to send regular patches.

License

This software is licensed under the New BSD License. See the LICENSE file in the top distribution directory for the full license text.

carrot's People

Contributors

andrewwatts, ask, istruble, jbalogh, legneato, nvie, nyren, p2k, pmac, runeh, stevvooe, stovenator, tswicegood

carrot's Issues

Do you need django to generate the docs?

Hi there.
I don't know sphinx at all and I hope you can help me generate docs for carrot.
Do I need to be in a django project?

When I run: sphinx-build ~/git/carrot/docs /opt/docs/carrot

It complains:

File "/Users/rich/git/carrot/docs/conf.py", line 25, in
from django.conf import settings
ImportError: No module named django.conf

many thanks!!!!!!

qos is not subclassed in amqp backend

The AMQP backend needs to implement the qos method. A workaround is the following:

Consumer.backend.channel.basic_qos(0, 1, False)

Monkey Patching fixes the problem as well ;)

global name 'django_settings' is not defined

Hello, I've started testing celery, carrot and ghettoq using a database backend, and when starting celeryd I receive the error below:

I've also pushed a fix to andrewwatts/carrot@c7ebcf9

Traceback (most recent call last):
  File "./manage.py", line 18, in <module>
    execute_manager(settings)
  File "/path/to/virtualenv/lib/python2.6/site-packages/django/core/management/__init__.py", line 362, in execute_manager
    utility.execute()
  File "/path/to/virtualenv/lib/python2.6/site-packages/django/core/management/__init__.py", line 303, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/path/to/virtualenv/lib/python2.6/site-packages/django/core/management/base.py", line 195, in run_from_argv
    self.execute(*args, **options.__dict__)
  File "/path/to/virtualenv/lib/python2.6/site-packages/django/core/management/base.py", line 222, in execute
    output = self.handle(*args, **options)
  File "/path/to/virtualenv/lib/python2.6/site-packages/celery/management/commands/celeryd.py", line 18, in handle
    run_worker(**options)
  File "/path/to/virtualenv/lib/python2.6/site-packages/celery/bin/celeryd.py", line 153, in run_worker
    "conninfo": info.format_broker_info(),
  File "/path/to/virtualenv/lib/python2.6/site-packages/celery/utils/info.py", line 45, in format_broker_info
    broker_connection = establish_connection()
  File "/path/to/virtualenv/lib/python2.6/site-packages/celery/messaging.py", line 122, in establish_connection
    return DjangoBrokerConnection(connect_timeout=connect_timeout)
  File "/path/to/virtualenv/src/carrot/carrot/connection.py", line 227, in __init__
    kwargs = dict(get_django_conninfo(settings), **kwargs)
  File "/path/to/virtualenv/src/carrot/carrot/connection.py", line 190, in get_django_conninfo
    if hasattr(django_settings, "AMQP_SERVER"):
NameError: global name 'django_settings' is not defined

test_consumerset_iterconsume fails

Running the tests results in:

FAIL: test_consumerset_iterconsume (tests.test_pyamqplib.TestAMQPlibMessaging)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/tmp/buildd/python-carrot-0.10.5/tests/backend.py", line 465, in test_consumerset_iterconsume
    assertDataIs({"rkey": "foo.baz"})
  File "/tmp/buildd/python-carrot-0.10.5/tests/backend.py", line 446, in assertDataIs
    self.assertEquals(scratchpad.get("data"), what)
AssertionError: {'rkey': 'foo.bam'} != {'rkey': 'foo.baz'}
>>  raise self.failureException, \
          (None or '%r != %r' % ({'rkey': 'foo.bam'}, {'rkey': 'foo.baz'}))

-------------------- >> begin captured logging << --------------------
amqplib: DEBUG: Start from server, version: 8.0, properties: {u'platform': 'Erlang/OTP', u'product': 'RabbitMQ', u'version': '1.7.2', u'copyright': 'Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.', u'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: ['en_US']
amqplib: DEBUG: Open OK! known_hosts [xxxxxxx:5672]
amqplib: DEBUG: using channel_id: 1
amqplib: DEBUG: Channel open
amqplib: DEBUG: using channel_id: 2
amqplib: DEBUG: Channel open
amqplib: DEBUG: Closed channel #1
amqplib: DEBUG: Closed channel #2
--------------------- >> end captured logging << ---------------------

New API

The current Consumer/Publisher classes hide the channels. I think we should rethink the API altogether.

Braindump:

from carrot import Connection, Consumer, Producer

connection = Connection(host="localhost", user="guest", password="guest", vhost="/")

channel = connection.Channel()
consumer = Consumer(channel, queue="celery", exchange="celery", routing_key="celery")
consumer.register_callback(process_task)

while True:
    connection.drain_events()
    # or channel.drain_events() will only receive events on that channel.

producer = Producer(channel, exchange="celery")
producer.publish({"task": "celery.ping", "args": [], "kwargs": {}}, serializer="pickle")

A ConsumerSet is now simply using the same channel with more than one consumer:

channel = connection.Channel()
c1 = Consumer(channel, queue="foo")
c2 = Consumer(channel, queue="bar")
c3 = Consumer(channel, queue="baz")

while True:
    connection.drain_events()

0.4.1 breaks easy_install

jason@jason-mac:~$ sudo easy_install carrot
Searching for carrot
Reading http://pypi.python.org/simple/carrot/
Reading http://github.com/ask/carrot/
Best match: carrot 0.4.1
Downloading http://pypi.python.org/packages/source/c/carrot/carrot-0.4.1.tar.gz#md5=f512b78a24868e837adf5afd3cebdc7d
Processing carrot-0.4.1.tar.gz
Running carrot-0.4.1/setup.py -q bdist_egg --dist-dir /tmp/easy_install-xhJDTo/carrot-0.4.1/egg-dist-tmp-fzwd9g
error: README: No such file or directory

Creating AMQPConnection hangs in busy loop

When I try to do this straight from the introduction:

from carrot.connection import AMQPConnection
amqpconn = AMQPConnection(hostname="localhost", port=5672, userid="test", password="test", vhost="test")

the whole thing ends up in a busy loop:

File "", line 1, in
File "/Library/Python/2.5/site-packages/carrot-0.5.1-py2.5.egg/carrot/connection.py", line 92, in init self.connect()
File "/Library/Python/2.5/site-packages/carrot-0.5.1-py2.5.egg/carrot/connection.py", line 110, in connect connect_timeout=self.connect_timeout)
File "build/bdist.macosx-10.5-i386/egg/amqplib/client_0_8/connection.py", line 143, in init
File "build/bdist.macosx-10.5-i386/egg/amqplib/client_0_8/connection.py", line 477, in _x_open
File "build/bdist.macosx-10.5-i386/egg/amqplib/client_0_8/abstract_channel.py", line 64, in wait
File "build/bdist.macosx-10.5-i386/egg/amqplib/client_0_8/connection.py", line 201, in _wait_method
File "build/bdist.macosx-10.5-i386/egg/amqplib/client_0_8/method_framing.py", line 212, in read_method
File "build/bdist.macosx-10.5-i386/egg/amqplib/client_0_8/method_framing.py", line 127, in _next_method
File "build/bdist.macosx-10.5-i386/egg/amqplib/client_0_8/transport.py", line 105, in read_frame
File "build/bdist.macosx-10.5-i386/egg/amqplib/client_0_8/transport.py", line 191, in _read

It won't time out or anything. Am I doing something wrong or missing something obvious?

Tried on OS X 10.5.7, against RabbitMQ 1.6.0 which otherwise seems to be working just fine. This happens both with Python 2.6.2 (from Macports) and Python 2.5.1 (from Apple)

carrot.utils - cannot import _uuid_generate_random - python 2.7.11

Line 1 of carrot/utils.py imports _uuid_generate_random which no longer exists in python 2.7.11. I am running python 2.7.11 on Windows 10 currently.

Possible code to fix:

try:
    from uuid import _uuid_generate_random
except ImportError:
    _uuid_generate_random = None
# ... import other stuff
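Building on the guarded import suggested above, here is one way the calling code might fall back to the public uuid4() when the private helper is missing. This is only a sketch: the function name gen_unique_id is used for illustration and is not checked against carrot/utils.py.

```python
import ctypes
import uuid

try:
    # Private helper; missing on the Python version described in this report.
    from uuid import _uuid_generate_random
except ImportError:
    _uuid_generate_random = None


def gen_unique_id():
    """Return a random UUID string, preferring the private helper if present."""
    if _uuid_generate_random is not None:
        buf = ctypes.create_string_buffer(16)
        _uuid_generate_random(buf)
        return str(uuid.UUID(bytes=buf.raw))
    # Public API fallback, available on every supported Python.
    return str(uuid.uuid4())


print(gen_unique_id())
```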

carrot.messaging.establish_connection requires DjangoBrokerConnection

Along the same lines as the issue I reported earlier in celery: establish_connection uses DjangoBrokerConnection, so if you write a custom celery loader to set your configuration from some source other than django.conf, it is ignored by establish_connection, which assumes all the Django settings machinery has been set up.

not sure what the right fix would be, maybe an optional param for the BrokerConnection defaulted to DjangoBrokerConnection ? just thinking out loud

AttributeError: content_encoding

carrot fails with AttributeError: content_encoding when receiving a message without a Content-Encoding header. Such messages come from RabbitMQ via the 'amq.rabbitmq.log' exchange.
Wireshark shows only the delivery_mode header in the published message.

When using amqplib alone I can consume the message like a charm.

Traceback:

Traceback (most recent call last):
  File "./carrot-log-reader.py", line 33, in <module>
    log_consumer.wait()
  File "/usr/local/lib/python2.6/dist-packages/carrot-0.5.0-py2.6.egg/carrot/messaging.py", line 423, in wait
    it.next()
  File "/usr/local/lib/python2.6/dist-packages/carrot-0.5.0-py2.6.egg/carrot/backends/pyamqplib.py", line 164, in consume
    self.channel.wait()
  File "build/bdist.linux-i686/egg/amqplib/client_0_8/abstract_channel.py", line 82, in wait
  File "build/bdist.linux-i686/egg/amqplib/client_0_8/channel.py", line 1978, in _basic_deliver
  File "/usr/local/lib/python2.6/dist-packages/carrot-0.5.0-py2.6.egg/carrot/messaging.py", line 269, in _receive_callback
    message = self.backend.message_to_python(raw_message)
  File "/usr/local/lib/python2.6/dist-packages/carrot-0.5.0-py2.6.egg/carrot/backends/pyamqplib.py", line 135, in message_to_python
    return Message(backend=self, amqp_message=raw_message)
  File "/usr/local/lib/python2.6/dist-packages/carrot-0.5.0-py2.6.egg/carrot/backends/pyamqplib.py", line 68, in __init__
    "content_encoding": amqp_message.content_encoding})
  File "build/bdist.linux-i686/egg/amqplib/client_0_8/serialization.py", line 454, in __getattr__
AttributeError: content_encoding

Consumer code:

from carrot.connection import AMQPConnection
from carrot.messaging import Consumer
import time
import uuid

def log_handler( message_data, message ):
    print '''%s [%s] %s''' % ( time.ctime(), message_data.get( 'routing_key' ), message )
    message.ack()

if __name__ == '__main__':

    amqpconn = AMQPConnection(
        hostname = 'dendron.fdi',
        userid = 'guest',
        password = 'guest', 
        vhost = '/'
    )

    log_consumer = Consumer(
        connection = amqpconn,
        exclusive = True,
        exchange = 'amq.rabbitmq.log',
        exchange_type = 'topic', 
        routing_key = "#",
        queue = str( uuid.uuid4() )
    )

    log_consumer.register_callback( log_handler )
    log_consumer.wait()
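One defensive way to read optional message properties, shown here only as a sketch, is getattr with a default. FakeAMQPMessage below is a stand-in written for this example to mimic an object that raises AttributeError for unset properties, and extract_headers is a hypothetical helper, not carrot code.

```python
class FakeAMQPMessage(object):
    """Stand-in that raises AttributeError for properties that were not set."""

    def __init__(self, **properties):
        self.properties = properties

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        try:
            return self.__dict__["properties"][name]
        except KeyError:
            raise AttributeError(name)


def extract_headers(amqp_message):
    """Read optional properties, tolerating messages that lack them."""
    return {
        "content_type": getattr(amqp_message, "content_type", None),
        "content_encoding": getattr(amqp_message, "content_encoding", None),
    }


# A message that carries only delivery_mode, as seen in Wireshark.
print(extract_headers(FakeAMQPMessage(delivery_mode=2)))
```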

NameError: global name 'amqp_message' is not defined

File "/Users/gregoire/Documents/Projects/easionet/pip/src/carrot/carrot/backends/pyamqplib.py", line 141, in message_to_python
    return Message(backend=self, amqp_message=amqp_message,

I think amqp_message should be replaced by raw_message

Apache Qpid C++ Broker 0.5 support seems to be broken

Apache Qpid supports the latest version of AMQP 0-10 while py-amqplib only supports AMQP 0-8.

I suggest that the AMQP client library be pluggable, so that one can use any of py-amqplib, python-qpid, txAMQP, etc.

Docs specify "vhost" instead of "virtual_host"

The docs specify using the "vhost" argument to create connections when it should actually be "virtual_host". This messed me about for ages until I finally looked through the code.

btw: using **kwargs in BrokerConnection.__init__() doesn't help as it masks incorrect arguments making it harder to debug problems.

Here's a diff to correct the docs:

diff --git a/README.rst b/README.rst
index 3c48b89..f638176 100644
--- a/README.rst
+++ b/README.rst
@@ -148,7 +148,7 @@ Creating a connection
     >>> from carrot.connection import BrokerConnection
     >>> conn = BrokerConnection(hostname="localhost", port=5672,
     ...                           userid="test", password="test",
-    ...                           vhost="test")
+    ...                           virtual_host="test")


     If you're using Django you can use the
diff --git a/carrot/connection.py b/carrot/connection.py
index 6221fc0..463f08a 100644
--- a/carrot/connection.py
+++ b/carrot/connection.py
@@ -170,7 +170,7 @@ class DjangoBrokerConnection(BrokerConnection):
     :keyword password: The users password. If not provided this is taken
         from ``settings.AMQP_PASSWORD``.

-    :keyword vhost: The name of the virtual host to work with.
+    :keyword virtual_host: The name of the virtual host to work with.
         This virtual host must exist on the server, and the user must
         have access to it. Consult your brokers manual for help with
         creating, and mapping users to virtual hosts. If not provided
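The remark above about **kwargs masking incorrect arguments can be reproduced without carrot. The two classes below are made up for illustration: one swallows unknown keywords, the other rejects them.

```python
class LenientConnection(object):
    """Accepts any keyword, so a misspelt one is silently ignored."""

    def __init__(self, hostname="localhost", virtual_host="/", **kwargs):
        self.hostname = hostname
        self.virtual_host = virtual_host  # "vhost" ends up unused in kwargs


class StrictConnection(object):
    """Declares its keywords explicitly, so typos fail immediately."""

    def __init__(self, hostname="localhost", virtual_host="/"):
        self.hostname = hostname
        self.virtual_host = virtual_host


# The wrong keyword is accepted and the default virtual host is used.
print(LenientConnection(vhost="test").virtual_host)

try:
    StrictConnection(vhost="test")
except TypeError as exc:
    print("rejected:", exc)
```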

Global request timeout

Without having intricate (or cursory) knowledge of sockets, it seems to me that only a very basic failure scenario is handled by the connect_timeout (perhaps aptly named to indicate that it does just that) currently implemented in BrokerConnection. I'm guessing that this means that only if the port is pretending not to be there (it's stealthed, and connection refusal isn't signaled), this timeout applies.

When experimenting with the current timeout mechanism, this timeout did not seem to kick in where I created a "dummy" server that either only listened to a given port or listened and subsequently accepted the connection, disregarding any messages received.

I'd be interested in allowing "more global" timeouts because our server has had some problems with responding under certain loads, and this currently makes our application hang until Apache times out.

I'm interested to hear if you've given any thought to this, and if you think this is a valid feature I'm available to help out implementing and/or testing this.
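To illustrate the difference between a connect timeout and a timeout on later reads, here is a standard-library sketch that talks to a "dummy" server of the kind described above, one that listens but never answers. It does not touch carrot or amqplib; read_with_timeout and start_silent_server are names invented for this example.

```python
import socket


def start_silent_server():
    """Listen on a free local port but never accept or reply."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    return server


def read_with_timeout(host, port, connect_timeout, read_timeout, nbytes=8):
    """Connect, then wait at most read_timeout seconds for data."""
    # The connect timeout only covers establishing the TCP connection.
    sock = socket.create_connection((host, port), timeout=connect_timeout)
    try:
        # Without this, recv() could block forever on a silent peer.
        sock.settimeout(read_timeout)
        return sock.recv(nbytes)
    finally:
        sock.close()


server = start_silent_server()
host, port = server.getsockname()
try:
    read_with_timeout(host, port, connect_timeout=1.0, read_timeout=0.2)
    outcome = "data"
except socket.timeout:
    outcome = "timeout"
finally:
    server.close()
print(outcome)
```

On a typical system the connection itself succeeds because the operating system completes the handshake for a listening socket, so only the read timeout catches the silent peer.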

memory backend not working: 'Backend' object has no attribute 'callback'

I'm still trying to find a way to get rid of the ghettoq DB backend and I tried the memory which sounds very close to what I'm looking for. But...

[2010-07-05 22:14:41,845: WARNING/MainProcess] Traceback (most recent call last):
[2010-07-05 22:14:41,845: WARNING/MainProcess] File "extra/celery/bin/celeryd.py", line 386, in <module>
[2010-07-05 22:14:41,845: WARNING/MainProcess] main()
[2010-07-05 22:14:41,845: WARNING/MainProcess] File "extra/celery/bin/celeryd.py", line 383, in main
[2010-07-05 22:14:41,846: WARNING/MainProcess] return run_worker(**vars(options))
[2010-07-05 22:14:41,846: WARNING/MainProcess] File "extra/celery/bin/celeryd.py", line 378, in run_worker
[2010-07-05 22:14:41,846: WARNING/MainProcess] return Worker(**options).run()
[2010-07-05 22:14:41,846: WARNING/MainProcess] File "extra/celery/bin/celeryd.py", line 212, in run
[2010-07-05 22:14:41,846: WARNING/MainProcess] self.run_worker()
[2010-07-05 22:14:41,847: WARNING/MainProcess] File "extra/celery/bin/celeryd.py", line 305, in run_worker
[2010-07-05 22:14:41,847: WARNING/MainProcess] worker.start()
[2010-07-05 22:14:41,847: WARNING/MainProcess] File "/usr/lib64/python2.6/site-packages/celery-2.0.0-py2.6.egg/celery/worker/__init__.py", line 198, in start
[2010-07-05 22:14:41,847: WARNING/MainProcess] component.start()
[2010-07-05 22:14:41,847: WARNING/MainProcess] File "/usr/lib64/python2.6/site-packages/celery-2.0.0-py2.6.egg/celery/worker/listener.py", line 235, in start
[2010-07-05 22:14:41,847: WARNING/MainProcess] self.consume_messages()
[2010-07-05 22:14:41,848: WARNING/MainProcess] File "/usr/lib64/python2.6/site-packages/celery-2.0.0-py2.6.egg/celery/worker/listener.py", line 249, in consume_messages
[2010-07-05 22:14:41,848: WARNING/MainProcess] wait_for_message()
[2010-07-05 22:14:41,848: WARNING/MainProcess] File "/usr/lib64/python2.6/site-packages/celery-2.0.0-py2.6.egg/celery/worker/listener.py", line 389, in _mainloop
[2010-07-05 22:14:41,848: WARNING/MainProcess] yield self.connection.drain_events()
[2010-07-05 22:14:41,848: WARNING/MainProcess] File "/usr/lib64/python2.6/site-packages/carrot-0.10.5-py2.6.egg/carrot/connection.py", line 158, in drain_events
[2010-07-05 22:14:41,853: WARNING/MainProcess] return self.connection.drain_events(**kwargs)
[2010-07-05 22:14:41,853: WARNING/MainProcess] File "/usr/lib64/python2.6/site-packages/carrot-0.10.5-py2.6.egg/carrot/backends/queue.py", line 54, in drain_events
[2010-07-05 22:14:41,856: WARNING/MainProcess] self.callback(message)
[2010-07-05 22:14:41,856: WARNING/MainProcess] AttributeError
[2010-07-05 22:14:41,856: WARNING/MainProcess] :
[2010-07-05 22:14:41,856: WARNING/MainProcess] 'Backend' object has no attribute 'callback'

No ability to set different auto_delete setting for both exchange and queue.

When declaring a "Consumer", the exchange is also declared. This means the consumer's auto_delete setting will be used for the exchange as well.

I see some solutions here:

  1. Don't declare the exchange in the consumer.
  2. Have the ability to use an Exchange as the exchange argument:
 e = Exchange("foo", "direct", auto_delete=True)
 c = Consumer(queue="foo", exchange=e, auto_delete=False)

Python Queue module based backend doesn't work with iterconsume() or wait()

iterconsume (and by extension, wait()) does:

self.backend.consume(limit=limit)

but backends/queue.py defines consume as:

def consume(self, queue, no_ack, callback, consumer_tag):

so obviously it doesn't work. Not sure the best way to fix it, perhaps implement declare_consumer and stash the callback?
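The "implement declare_consumer and stash the callback" idea could look roughly like the sketch below. It is a toy in-memory backend written for this comment, not the code in backends/queue.py; the class name and the pending list are made up.

```python
class QueueBackendSketch(object):
    """Toy backend: declare_consumer stores the callback, consume uses it."""

    def __init__(self):
        self.pending = []      # messages waiting to be delivered
        self.callback = None

    def declare_consumer(self, queue, no_ack, callback, consumer_tag):
        # Stash the callback so consume() can be called with only a limit.
        self.callback = callback

    def consume(self, limit=None):
        """Deliver pending messages, yielding once per delivered message."""
        delivered = 0
        while self.pending and (limit is None or delivered < limit):
            self.callback(self.pending.pop(0))
            delivered += 1
            yield True


received = []
backend = QueueBackendSketch()
backend.pending.extend(["a", "b", "c"])
backend.declare_consumer(queue="feed", no_ack=False,
                         callback=received.append, consumer_tag="tag-1")
for _ in backend.consume(limit=2):
    pass
print(received)
```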

No exception raised when mandatory=True and message cannot be routed

The documentation for the mandatory parameter of publisher.send states:

"If set, the message has mandatory routing. By default the message is silently dropped by the server if it can’t be routed to a queue. However - If the message is mandatory, an exception will be raised instead."

At least for the pyamqplib backend, no exception appears to be raised when the message cannot be routed to a queue. This is possibly more of an issue with an underlying driver or a documentation bug.

content_encoding and pyamqplib's auto_decode

Line 55 and following of amqplib/client_0_8/channel.py state that there is an "auto_decode" parameter for AMQP channels, which defaults to True. This automatically decodes incoming messages that have the "content_encoding" property set. As a result, the "decode" method of carrot's built-in deserializer (line 141 of carrot/serialization.py) receives an already decoded unicode object and tries to decode it again (line 160), which works fine if there are only ASCII characters but crashes if there are non-ASCII characters in the data (happened to me).

My suggestions:
Either instantiate the Channel object with "auto_decode=False" in carrot/backends/pyamqplib.py line 89, like this:

self.channel = Channel(self.connection.connection, auto_decode=False)

Or check whether "data" is already a unicode object in carrot/serialization.py lines 158-160, like this:

# Don't decode 8-bit strings or unicode objects
if content_encoding not in ('binary', 'ascii-8bit') and not isinstance(data, unicode):
    data = codecs.decode(data, content_encoding)

I would prefer the latter method and leave the decoding to pyamqplib.
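For illustration, the second suggestion might be written as follows in Python 3 terms, where the check becomes "only decode bytes". The snippets above are Python 2, so this is an adaptation rather than a patch; decode_once is a hypothetical helper name.

```python
import codecs


def decode_once(data, content_encoding):
    """Decode a message body at most once.

    Binary encodings are passed through, and text that was already
    decoded upstream is returned unchanged.
    """
    if content_encoding in ("binary", "ascii-8bit"):
        return data
    if isinstance(data, bytes):
        return codecs.decode(data, content_encoding)
    return data  # already text, nothing left to decode


raw = "caf\u00e9".encode("utf-8")
print(decode_once(raw, "utf-8") == decode_once("caf\u00e9", "utf-8"))
```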

carrot cannot be used on Python 2.4

hi ask,

I think we should keep carrot compatible with Python 2.4. It's worth it and it's easy. Currently I find only two places:

1). in setup.py the use of any():

def is_unwanted_file(filename):
    return any([filename.endswith(skip_ext) for skip_ext in SKIP_EXTENSIONS])

2). the conditional expression in carrot/backends/pystomp.py:

File "/home/zhigang/lib/python2.4/site-packages/carrot-0.8.0-py2.4.egg/carrot/backends/pystomp.py", line 114
    ack = "auto" if no_ack else "client"
    ^
    SyntaxError: invalid syntax

please consider.

thanks,

zhigang
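
Both constructs were added in Python 2.5, so a 2.4-compatible version would have to avoid them. The sketch below shows one way to do that; the contents of `SKIP_EXTENSIONS` and the helper name `ack_mode` are assumptions made for illustration, not taken from carrot's source.

```python
SKIP_EXTENSIONS = [".pyc", ".pyo", ".swp", ".swo"]  # assumed values


def is_unwanted_file(filename):
    """Same result as the any() version, using only a plain loop."""
    for skip_ext in SKIP_EXTENSIONS:
        if filename.endswith(skip_ext):
            return True
    return False


def ack_mode(no_ack):
    """Replacement for: ack = "auto" if no_ack else "client"."""
    if no_ack:
        return "auto"
    return "client"
```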

ImportError: No module named backend

When I try the example, I get this error.

consumer.py

from carrot.messaging import Consumer
from carrot.connection import BrokerConnection

conn = BrokerConnection(hostname="localhost", port=5672,
                        userid="test", password="test",
                        virtual_host="test")
consumer = Consumer(connection=conn, queue="feed", exchange="feed", routing_key="importer")

def import_feed_callback(message_data, message):
    feed_url = message_data["import_feed"]
    print("Got feed import message for: %s" % feed_url)
    # something importing this feed url
    # import_feed(feed_url)
    message.ack()

consumer.register_callback(import_feed_callback)
consumer.wait() # Go into the consumer loop.

TypeError: reject() got an unexpected keyword argument 'requeue'

File "/Users/gregoire/Documents/Projects/easionet/pip/src/carrot/carrot/backends/base.py", line 30, in reject
    return self.backend.reject(self.delivery_tag, requeue=False)

carrot.backends.pyamqplib.Message.reject() does not have the same signature as the base carrot.backends.base.Message.reject().
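
The mismatch can be illustrated with stand-in classes. Both `Backend` and `Message` below are hypothetical and are not carrot's real classes; the sketch only shows that the callee has to accept the `requeue` keyword that the caller in the traceback passes.

```python
class Backend(object):
    """Hypothetical stand-in for a carrot backend."""

    def __init__(self):
        self.rejected = []

    def reject(self, delivery_tag, requeue=False):
        # Accepting requeue keeps this compatible with the caller below.
        self.rejected.append((delivery_tag, requeue))


class Message(object):
    """Hypothetical stand-in for the base message class."""

    def __init__(self, backend, delivery_tag):
        self.backend = backend
        self.delivery_tag = delivery_tag

    def reject(self):
        # Mirrors the call shown in the traceback above.
        return self.backend.reject(self.delivery_tag, requeue=False)


backend = Backend()
Message(backend, delivery_tag=1).reject()
```

If `Backend.reject` were declared without the `requeue` parameter, the last line would raise the same TypeError as in the report.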

Consumer.iterconsume should raise StopIteration when backend channel is closed

A colleague of mine noticed that I had these peppered through some consumer daemons:
    for m in self.iterconsume():
        if self.closed:
            break

To handle signals for a graceful daemon close, we close the channel asynchronously. That causes an exception in iterconsume, or when it tries to fetch the next message. I put the flag there to prevent some wild exception from bleeding through. As a test, we modified the pyamqplib backend to raise a StopIteration when the channel is closed (sorry for the lack of a diff):
    def consume(self, limit=None):
        """Returns an iterator that waits for one message at a time."""
        for total_message_count in count():
            if limit and total_message_count >= limit:
                raise StopIteration

            if not self.channel.is_open:
                raise StopIteration

            self.channel.wait()
            yield True

There is also the case where an amqp exception can be raised by the channel.wait call, but I defer the handling of that up to you (catch AMQPException and raise StopIteration or let the Exception bleed through; I am for the latter).
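
For reference, here is a self-contained sketch of the same idea that runs on current Python. Since Python 3.7 (PEP 479), raising StopIteration inside a generator is turned into a RuntimeError, so the sketch uses a plain `return` instead. `FakeChannel` is a hypothetical stand-in for the amqplib channel, and `consume` is written as a free function rather than a backend method.

```python
from itertools import count


class FakeChannel(object):
    """Hypothetical channel that closes itself after a number of waits."""

    def __init__(self, waits_before_close):
        self.remaining = waits_before_close
        self.is_open = True

    def wait(self):
        self.remaining -= 1
        if self.remaining <= 0:
            self.is_open = False


def consume(channel, limit=None):
    """Yield once per received message; stop at the limit or on close."""
    for total_message_count in count():
        if limit and total_message_count >= limit:
            return
        if not channel.is_open:
            return  # ends the generator cleanly, no flag needed in the caller
        channel.wait()
        yield True


until_closed = list(consume(FakeChannel(3)))
until_limit = list(consume(FakeChannel(5), limit=2))
```

A caller can then loop with `for m in consume(channel):` and the loop simply ends when the channel is closed.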

Let me know if you need any other information on this.

No way of passing AMQPConnection instance to publisher/consumer

The Connection argument to pub/con is a class, not an instance. If you're not using Django, you can't pass AMQPConnection as the connection class, as that requires arguments when instantiated. (As a workaround, functools.partial works, by the way.)

Should the connection be an instance, or should kwargs be passed on to the connection constructor or something?
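
The functools.partial workaround mentioned above could look roughly like this. The two classes are hypothetical stand-ins, not carrot's real `AMQPConnection` or `Publisher`; the only assumption is that the publisher calls the connection class with no arguments.

```python
from functools import partial


class AMQPConnection(object):
    """Hypothetical stand-in that requires arguments when instantiated."""

    def __init__(self, hostname, userid, password):
        self.hostname = hostname
        self.userid = userid
        self.password = password


class Publisher(object):
    """Hypothetical stand-in that instantiates the connection class itself."""

    def __init__(self, connection_cls):
        self.connection = connection_cls()  # called with no arguments


# partial pre-binds the arguments, so the result is callable without any.
configured_connection = partial(AMQPConnection, hostname="localhost",
                                userid="guest", password="guest")
publisher = Publisher(configured_connection)
```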

Receive messages with any routing_key

Hello,

I'm trying out carrot (version 0.10.1 on Ubuntu Lucid) with the AMQP backend and I'm having the following problem.
On the same exchange I register one consumer with the routing_key "requests" and two publishers: one with routing_key "requests" and the other with routing_key "responses".

On the consumer I receive the messages from both publishers.
Is this a bug, or am I doing something wrong?

Thanks in advance,
Stefan

==== test_carrot.py

from carrot.serialization import registry
from carrot.connection import BrokerConnection
from carrot.messaging import Consumer, Publisher

import time
import threading

conn = BrokerConnection(hostname="localhost", port=5672,
                        userid="guest", password="guest",
                        virtual_host="/")

publisher = Publisher(connection=conn,
                      exchange="cortinillas",
                      exchange_type="direct",
                      routing_key="responses")

reqpublisher = Publisher(connection=conn,
                         exchange="cortinillas",
                         exchange_type="direct",
                         routing_key="requests")

consumer = Consumer(connection=conn,
                    queue="requests",
                    exchange="cortinillas",
                    exchange_type="direct",
                    routing_key="requests")

def printMessage(message_data, message):
    print "RECEIVED MESSAGE:", message_data, message, message.__dict__
    print "ROUTING KEY:", message.delivery_info["routing_key"]
    message.ack()

class Sender(threading.Thread):
    def __init__(self):
        super(Sender, self).__init__()

    def run(self):
        time.sleep(2)
        print "SENDING: request!", reqpublisher
        reqpublisher.send("request!")

        time.sleep(2)
        print "SENDING: response!", publisher
        publisher.send("response!")

consumer.register_callback(printMessage)

s = Sender()
s.start()

consumer.wait()

=== Console output:
SENDING: request! <carrot.messaging.Publisher object at 0x9fb7f2c>
RECEIVED MESSAGE: request! <carrot.backends.pyamqplib.Message object at 0x9fbea0c> {'body': 'request!', '_amqp_message': <amqplib.client_0_8.basic_message.Message object at 0x9fbe8cc>, 'delivery_info': {'exchange': u'cortinillas', 'consumer_tag': u'carrot.messaging.Consumer-c1213852-27bf-41c2-93eb-8cc8a125abec', 'routing_key': u'requests', 'redelivered': False, 'delivery_tag': 1, 'channel': <amqplib.client_0_8.channel.Channel object at 0x9fbe3cc>}, '_state': 'RECEIVED', 'content_encoding': u'binary', 'content_type': u'application/data', 'delivery_tag': 1, '_decoded_cache': 'request!', 'backend': <carrot.backends.pyamqplib.Backend object at 0x9fbe2ec>}
ROUTING KEY: requests
SENDING: response! <carrot.messaging.Publisher object at 0x9fa864c>
RECEIVED MESSAGE: response! <carrot.backends.pyamqplib.Message object at 0x9fbea4c> {'body': 'response!', '_amqp_message': <amqplib.client_0_8.basic_message.Message object at 0x9fbe94c>, 'delivery_info': {'exchange': u'cortinillas', 'consumer_tag': u'carrot.messaging.Consumer-c1213852-27bf-41c2-93eb-8cc8a125abec', 'routing_key': u'responses', 'redelivered': False, 'delivery_tag': 2, 'channel': <amqplib.client_0_8.channel.Channel object at 0x9fbe3cc>}, '_state': 'RECEIVED', 'content_encoding': u'binary', 'content_type': u'application/data', 'delivery_tag': 2, '_decoded_cache': 'response!', 'backend': <carrot.backends.pyamqplib.Backend object at 0x9fbe2ec>}

ROUTING KEY: responses
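
For comparison, the sketch below models how a direct exchange is expected to route: a message reaches a queue only if the queue has a binding whose key equals the message's routing key. `DirectExchange` is a hypothetical in-memory model, not carrot or broker code. The second half shows one possible explanation, not confirmed by this report: a queue can hold several bindings, so a binding for "responses" left over from an earlier run would deliver both kinds of message to the "requests" queue.

```python
from collections import defaultdict


class DirectExchange(object):
    """Hypothetical in-memory model of direct-exchange routing."""

    def __init__(self):
        self.bindings = defaultdict(set)   # routing key -> queue names
        self.queues = defaultdict(list)    # queue name -> delivered bodies

    def bind(self, queue, routing_key):
        self.bindings[routing_key].add(queue)

    def publish(self, body, routing_key):
        # Deliver only to queues bound with exactly this routing key.
        for queue in self.bindings.get(routing_key, ()):
            self.queues[queue].append(body)


exchange = DirectExchange()
exchange.bind("requests", "requests")
exchange.publish("request!", "requests")
exchange.publish("response!", "responses")   # no matching binding: dropped
only_requests = list(exchange.queues["requests"])

# Assumed scenario: an extra binding exists on the same queue.
exchange.bind("requests", "responses")
exchange.publish("response!", "responses")
with_extra_binding = list(exchange.queues["requests"])
```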
