
django-eb-sqs's Introduction

PyPI version License: MIT CircleCI

Django EB SQS - Background Tasks for Amazon SQS

django-eb-sqs is a simple task manager for Amazon SQS, built on top of the boto3 library.

Installation

Install the module with pip install git+https://github.com/cuda-networks/django-eb-sqs.git or add it to your requirements.txt.

Don't forget to add the eb_sqs app to your Django INSTALLED_APPS setting:

INSTALLED_APPS = (
    ...,
    'eb_sqs',
)

Usage

Creating Tasks

Adding a task to a queue is simple.

from eb_sqs.decorators import task

@task(queue_name='test')
def echo(message):
    print(message)

echo.delay(message='Hello World!')

NOTE: This assumes that you have your AWS keys in the appropriate environment variables, or are using IAM roles. Consult the boto3 documentation for further info.

If you don't pass a queue name, the EB_SQS_DEFAULT_QUEUE setting is used. If not set, the queue name is eb-sqs-default.

Additionally, the task decorator supports the max_retries (default 0) and use_pickle (default False) attributes for finer control over task execution.
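For example, a task that allows up to three retries and serializes its arguments with pickle could be declared like this (a minimal sketch; the values are illustrative):

@task(queue_name='test', max_retries=3, use_pickle=True)
def process_report(report):
    print(report)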

You can also delay the execution of a task by specifying the delay time in seconds.

echo.delay(message='Hello World!', delay=60)

During development it is sometimes useful to execute a task immediately without using SQS. This is possible with the execute_inline argument.

echo.delay(message='Hello World!', execute_inline=True)

NOTE: delay is not applied when execute_inline is set to True.

Failed tasks can be retried by using the retry method. See the following example:

from eb_sqs.decorators import task

@task(queue_name='test', max_retries=5)
def upload_file(message):
    print('# of retries: {}'.format(upload_file.retry_num))
    try:
        ...  # upload the file here
    except ConnectionException:
        upload_file.retry()

The retry call supports the delay and execute_inline arguments in order to delay the retry or execute it inline. If the retry should not count toward the max retry limit, set count_retries to False. Use retry_num to get the number of retries for the current task.

NOTE: retry() throws a MaxRetriesReachedException exception if the maximum number of retries is reached.
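For example, a retry that waits 30 seconds and does not count toward the retry limit could look like this (a sketch based on the arguments described above):

upload_file.retry(delay=30, count_retries=False)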

Executing Tasks

In order to execute tasks, use the Django command process_queue. This command can work with one or more queues, reading from the queues indefinitely and executing tasks as they come in.

python manage.py process_queue --queues <comma-delimited list of queue names>

You can either use full queue names or a queue prefix using the prefix:my_example_prefix notation.

Examples:

python manage.py process_queue --queues queue1,queue2 # process queue1 and queue2
python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-'

Use the WorkerService signals MESSAGES_RECEIVED, MESSAGES_PROCESSED, and MESSAGES_DELETED to be notified about the current SQS batch being processed by the management command.
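A minimal sketch of hooking into one of these signals, assuming they can be imported from the WorkerService module (the exact import path may differ in your installed version):

from django.dispatch import receiver
# Import path is an assumption; the signals are defined alongside WorkerService.
from eb_sqs.worker.service import MESSAGES_RECEIVED

@receiver(MESSAGES_RECEIVED)
def on_messages_received(sender, **kwargs):
    # Inspect kwargs for the batch payload sent by the worker.
    print('SQS batch received:', kwargs)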

Auto Tasks

This is a helper tool for the case where you wish to define one of your class methods as a task and make that transparent to all callers. It keeps the code simple and lets callers invoke your method directly without considering whether it runs asynchronously or not.

This is how you would define your class:

from eb_sqs.auto_tasks.service import AutoTaskService

class MyService:
    def __init__(self, p1=default1, ..., pN=defaultN, auto_task_service=None):
        self._auto_task_service = auto_task_service or AutoTaskService()

        self._auto_task_service.register_task(self.my_task_method)
    
    def my_task_method(self, *args, **kwargs):
        ...

Notice the following:

  1. Your class needs to have defaults for all constructor parameters
  2. The constructor must have a parameter named auto_task_service
  3. The method shouldn't have any return value (as it's invoked asynchronously)

In case you want your method to retry in certain cases, you need to raise RetryableTaskException. You can provide an optional delay time for the retry, set count_retries=False if the retry should not count toward the retry limit, or use max_retries_func to specify a function that will be invoked when the defined maximum number of retries is exhausted.
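A sketch of a retrying auto task, assuming RetryableTaskException is importable from eb_sqs.auto_tasks.exceptions (check your installed version for the exact path and constructor signature):

from eb_sqs.auto_tasks.service import AutoTaskService
from eb_sqs.auto_tasks.exceptions import RetryableTaskException  # assumed path


def give_up(*args, **kwargs):
    # Hypothetical callback invoked once the retry limit is exhausted.
    print('giving up on sync_data')


class SyncService:
    def __init__(self, auto_task_service=None):
        self._auto_task_service = auto_task_service or AutoTaskService()
        self._auto_task_service.register_task(self.sync_data)

    def sync_data(self, item_id=None):
        try:
            ...  # call the remote system here
        except ConnectionError as ex:
            # Retry in 60 seconds; keyword names mirror the options described
            # above, but the exact signature may differ.
            raise RetryableTaskException(ex, delay=60, max_retries_func=give_up)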

Settings

The following settings can be used to fine-tune django-eb-sqs. Copy the ones you need into your Django settings.py file (see the example after the list).

  • EB_AWS_REGION (us-east-1): The AWS region to use when working with SQS.
  • EB_SQS_MAX_NUMBER_OF_MESSAGES (10): The maximum number of messages to read in a single call from SQS (<= 10).
  • EB_SQS_WAIT_TIME_S (2): The time to wait (seconds) when receiving messages from SQS.
  • NO_QUEUES_WAIT_TIME_S (5): The time (in seconds) a worker waits if there are no SQS queues available to process.
  • EB_SQS_AUTO_ADD_QUEUE (False): If queues should be added automatically to AWS if they don't exist.
  • EB_SQS_QUEUE_MESSAGE_RETENTION (1209600): The value (in seconds) passed to the MessageRetentionPeriod parameter when creating a queue (only relevant if EB_SQS_AUTO_ADD_QUEUE is True).
  • EB_SQS_QUEUE_VISIBILITY_TIMEOUT (300): The value (in seconds) passed to the VisibilityTimeout parameter when creating a queue (only relevant if EB_SQS_AUTO_ADD_QUEUE is True).
  • EB_SQS_DEAD_LETTER_MODE (False): Enable if this worker is handling the SQS dead letter queue. Tasks won't be executed, but the group callback is.
  • EB_SQS_DEFAULT_DELAY (0): Default task delay time in seconds.
  • EB_SQS_DEFAULT_MAX_RETRIES (0): Default retry limit for all tasks.
  • EB_SQS_DEFAULT_COUNT_RETRIES (True): Count retry calls; required if the max retries check should be enforced.
  • EB_SQS_DEFAULT_QUEUE (eb-sqs-default): Default queue name if none is specified when creating a task.
  • EB_SQS_EXECUTE_INLINE (False): Execute tasks immediately without using SQS. Useful during development. Setting this to True globally overrides the per-task setting.
  • EB_SQS_FORCE_SERIALIZATION (False): Forces serialization of tasks when executed inline. This setting is helpful during development to see if all arguments are serialized and deserialized properly.
  • EB_SQS_QUEUE_PREFIX (''): Prefix prepended to every queue name.
  • EB_SQS_USE_PICKLE (False): Enable to use pickle to serialize task parameters. Uses json as default.
  • EB_SQS_AWS_MAX_RETRIES (30): Default retry limit on a boto3 call to AWS SQS.
  • EB_SQS_REFRESH_PREFIX_QUEUES_S (10): Minimum number of seconds to wait between refreshes of the queue list when a prefix is used.
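For example, a settings.py fragment might look like this (the setting names come from the list above; the values are illustrative, not recommendations):

EB_AWS_REGION = 'us-east-1'
EB_SQS_DEFAULT_QUEUE = 'my-app-default'
EB_SQS_QUEUE_PREFIX = 'my-app-'
EB_SQS_AUTO_ADD_QUEUE = True
EB_SQS_DEFAULT_MAX_RETRIES = 3
EB_SQS_WAIT_TIME_S = 20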

Development

Make sure to install the development dependencies from development.txt.

Tests

The built-in tests can be executed with the Django test runner.

python -m django test --settings=eb_sqs.test_settings


django-eb-sqs's Issues

Release on pypi

There is no release on PyPI; it would really be handy to be able to install this via pip :)

Handling SIGTERM in process_queues() command

After going through the source code, I didn't see anything meant to handle system signals (maybe I missed it). Is there a way to halt reading from the queue but continue processing running tasks? If not, would it be worth trying to handle a SIGTERM, for example, in the main worker loop?

Loving the project!
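For reference, the kind of graceful shutdown the issue describes usually amounts to flipping a flag from a SIGTERM handler and letting the polling loop finish its current batch. A minimal, generic sketch (not the project's actual worker loop; poll_and_process is hypothetical):

import signal


class GracefulShutdown:
    def __init__(self):
        self.should_stop = False
        signal.signal(signal.SIGTERM, self._handle)

    def _handle(self, signum, frame):
        # Stop reading new messages; tasks already in flight keep running.
        self.should_stop = True


# shutdown = GracefulShutdown()
# while not shutdown.should_stop:
#     poll_and_process()  # hypothetical: read one SQS batch and execute its tasks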

MessageGroupId is a required parameter when enqueueing to a FIFO queue


In [2]: @task(queue_name="mysqs.fifo") 
   ...: def echo(message): 
   ...:    print("SQS send! messag") 
   ...:                                                                                                                             

In [3]: echo.delay(message='Hello World!',group_id="123",MessageGroupId="123")                                                      
---------------------------------------------------------------------------
ClientError                               Traceback (most recent call last)
/usr/local/lib/python3.6/site-packages/eb_sqs/aws/sqs_queue_client.py in add_message(self, queue_name, msg, delay)
     77                 else:
---> 78                     raise ex
     79         except QueueDoesNotExistException:

/usr/local/lib/python3.6/site-packages/eb_sqs/aws/sqs_queue_client.py in add_message(self, queue_name, msg, delay)
     67                     MessageBody=msg,
---> 68                     DelaySeconds=delay
     69                 )

/usr/local/lib/python3.6/site-packages/boto3/resources/factory.py in do_action(self, *args, **kwargs)
    519             def do_action(self, *args, **kwargs):
--> 520                 response = action(self, *args, **kwargs)
    521 

/usr/local/lib/python3.6/site-packages/boto3/resources/action.py in __call__(self, parent, *args, **kwargs)
     82 
---> 83         response = getattr(parent.meta.client, operation_name)(*args, **params)
     84 

/usr/local/lib/python3.6/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    356             # The "self" in this scope is referring to the BaseClient.
--> 357             return self._make_api_call(operation_name, kwargs)
    358 

/usr/local/lib/python3.6/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    675             error_class = self.exceptions.from_code(error_code)
--> 676             raise error_class(parsed_response, operation_name)
    677         else:

ClientError: An error occurred (MissingParameter) when calling the SendMessage operation: The request must contain the parameter MessageGroupId.

During handling of the above exception, another exception occurred:

QueueClientException                      Traceback (most recent call last)
/usr/local/lib/python3.6/site-packages/eb_sqs/worker/worker.py in _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retries)
    103             else:
--> 104                 self.queue_client.add_message(worker_task.queue, worker_task.serialize(), delay)
    105                 return None

/usr/local/lib/python3.6/site-packages/eb_sqs/aws/sqs_queue_client.py in add_message(self, queue_name, msg, delay)
     81         except Exception as ex:
---> 82             raise QueueClientException(ex)

QueueClientException: An error occurred (MissingParameter) when calling the SendMessage operation: The request must contain the parameter MessageGroupId.

During handling of the above exception, another exception occurred:

QueueException                            Traceback (most recent call last)
<ipython-input-3-17fe155eac47> in <module>
----> 1 echo.delay(message='Hello World!',group_id="123",MessageGroupId="123")

/usr/local/lib/python3.6/site-packages/eb_sqs/decorators.py in wrapper(*args, **kwargs)
     24 
     25         worker = WorkerFactory.default().create()
---> 26         return worker.delay(group_id, queue, func, args, kwargs, max_retries, pickle, delay, execute_inline)
     27 
     28     return wrapper

/usr/local/lib/python3.6/site-packages/eb_sqs/worker/worker.py in delay(self, group_id, queue_name, func, args, kwargs, max_retries, use_pickle, delay, execute_inline)
     73         # type: (unicode, unicode, Any, tuple, dict, int, bool, int, bool) -> Any
     74         worker_task = WorkerTask(str(uuid.uuid4()), group_id, queue_name, func, args, kwargs, max_retries, 0, None, use_pickle)
---> 75         return self._enqueue_task(worker_task, delay, execute_inline, False, True)
     76 
     77     def retry(self, worker_task, delay, execute_inline, count_retries):

/usr/local/lib/python3.6/site-packages/eb_sqs/worker/worker.py in _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retries)
    114                            ex)
    115 
--> 116             raise QueueException()
    117 
    118     @classmethod

QueueException:

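For reference, SQS FIFO queues reject SendMessage calls that omit MessageGroupId, which is exactly what the traceback above shows. A minimal boto3 sketch of the parameters such a call needs (illustrative only; this is not how the library builds the request):

import boto3

sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='mysqs.fifo')
queue.send_message(
    MessageBody='Hello World!',
    MessageGroupId='123',               # required for FIFO queues
    MessageDeduplicationId='unique-1',  # required unless content-based deduplication is enabled
)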

Make it clear that Elastic Beanstalk support was removed

Hello,

I just spent a while trying to figure out why the library was no longer working after updating it. I did not realize that Elastic Beanstalk was no longer supported until I looked through the diffs of the README and saw that references to Elastic Beanstalk were removed. It would be nice if the README made it clear that support was removed.

Improvement candidates

Hi, this project seems super useful!

I'm interested in using it in a production system and have a few minor improvement candidates.
I'd love to hear your feedback and we can have a quick discussion around these points before I jump into implementing anything. If you have other ideas that seem more pressing we can collect them in this thread as well.

Initial ideas:

  1. Use CI to run tests on PRs and commits to master: useful as sanity check for commits and discussions about pull requests. Services like TravisCI are free for open-source projects like this
  2. Upload project to PyPI for easier installations via pip
  3. Allow explicit setting of credentials (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) in Django settings. If they are not given, default to the existing behaviour of boto discovering the credentials

Let me know what you think, thanks!

Making boto3-related config vars available to be set in Django settings

Hey Contributors,
First of all I want to say this project has been a very useful component in a project I worked on recently.

I wanted to bring your attention to the future plans or roadmap for this project, as I have some ideas to discuss and would like to hear what you think of them. If it is something that can be done, I am happy to work on it and share a pull request to be reviewed 🙌

  1. Making boto3 configurations available to be set in django.conf.settings (prefixed: AWS_)

Some of the Django-based libraries already acknowledge these settings variables and let the user define them in their settings, e.g. django-storages.

I came across this error

CredentialRetrievalError
Error when retrieving credentials from iam-role: Credential refresh failed, response did not contain: access_key, secret_key, token, expiry_time

where setting higher values for AWS_METADATA_SERVICE_NUM_ATTEMPTS & AWS_METADATA_SERVICE_TIMEOUT could have helped.
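For reference, those two knobs are standard botocore environment variables rather than django-eb-sqs settings; a minimal illustration of raising them:

import os

# Illustrative only: botocore reads these when fetching credentials from the
# EC2 instance metadata service.
os.environ.setdefault('AWS_METADATA_SERVICE_NUM_ATTEMPTS', '5')
os.environ.setdefault('AWS_METADATA_SERVICE_TIMEOUT', '10')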

  2. A little bit of work on the documentation of this project. I see it as a highly efficient and scalable piece of technology and a complete, effective alternative to Celery (as Celery is not recommended for use with SQS). I still come across many projects and people who want to use AWS SQS for its reliability and scalability, but Celery doesn't work as well with it as it does with other AMQP-based brokers.

For the 2nd part, I want to connect with a core contributor who can help me understand the project in detail so I can start contributing to the documentation improvements as well.

Looking forward to hearing from you guys.

No license?

Project looks awesome from the readme, but I can't use a project without a license.
Can the current contributors agree on a license and license the repo?

I'd like to suggest using one of the MIT, BSD, or Apache 2.0 licenses, but it's obviously up to the current contributors to decide.

Edit:
I'm not a lawyer, but I think, @patrick91 @Schweigi @itaybleier, you will all have to agree on the license.

Drop python 2 support

Python 2 is deprecated, and it would be better to move all the typing and other relevant code to Python 3 format.
There is also no need to run flaky Python 2 tests.
