Giter VIP home page Giter VIP logo

mrsal's Introduction

MRSAL 20200907_104224

Release Python 3.10 Documentation

Tests Status Coverage Status Flake8 Status

Intro

Mrsal is a simple to use message broker abstraction on top of RabbitMQ and Pika. The goal is to make Mrsal trivial to re-use in all services of a distributed system and to make the use of advanced message queing protocols easy and safe. No more big chunks of repetive code across your services or bespoke solutions to handle dead letters.

Mrsal is Arabic for a small arrow and is used to describe something that performs a task with lightness and speed.

Quick Start guide

0. Install

First things first:

pip install mrsal

We need to install RabbitMQ to use Mrsal. Head over to install RabbitMQ. Make sure to stick to the configuration that you give the installation throughout this guide. You can also use the Dockerfile and the docker-compose that we are using in the full guide.

Next set the default username, password and servername for your RabbitMQ setup. It's advisable to use a .env script or the rc file for persistence.

[RabbitEnvVars]
RABBITMQ_DEFAULT_USER=******
RABBITMQ_DEFAULT_PASS=******
RABBITMQ_DEFAULT_VHOST=******
RABBITMQ_DOMAIN=******
RABBITMQ_DOMAIN_TLS=******

RABBITMQ_GUI_PORT=******
RABBITMQ_PORT=******
RABBITMQ_PORT_TLS=******

# FOR TLS
RABBITMQ_CAFILE=/path/to/file
RABBITMQ_CERT=/path/to/file
RABBITMQ_KEY=/path/to/file

Please read the full guide to understand what Mrsal currently can and can't do.

Mrsal was first developed by NeoMedSys and the research group CRAI at the univeristy hospital of Oslo.

1. Setup and connect

The first thing we need to do is to setup our rabbit server before we can subscribe and publish to it. Lets set up a server on our localhost with the port and credentials we used when spinning up the docker-compose

import json
import pika
from mrsal.mrsal import Mrsal

# If you want to use SSL for external listening then set it to True
SSL = False

# Note RabbitMQ container is listening on:
# 1. When SSL is False the default port 5672 which is exposed to RABBITMQ_PORT in docker-compose
# 2. When SSL is True the default port 5671 which is exposed to RABBITMQ_PORT_TLS in docker-compose
port = RABBITMQ_PORT_TLS if SSL else RABBITMQ_PORT
host = RABBITMQ_DOMAIN_TLS if SSL else RABBITMQ_DOMAIN

# It should match with the env specifications (RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS)
credentials=(RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS)

# It should match with the env specifications (RABBITMQ_DEFAULT_VHOST)
v_host = RABBITMQ_DEFAULT_VHOST

mrsal = Mrsal(
    host=host,
    port=port,
    credentials=credentials,
    virtual_host=v_host,
    ssl=SSL
)

mrsal.connect_to_server()

2 Publish

Now lets publish our message of friendship on the friendship exchange. Note: When fast_setup=True that means Mrsal will create the specified exchange and queue, then bind them together using routing_key.

# BasicProperties is used to set the message properties
prop = pika.BasicProperties(
        app_id='friendship_app',
        message_id='friendship_msg',
        content_type='text/plain',
        content_encoding='utf-8',
        delivery_mode=pika.DeliveryMode.Persistent,
        headers=None)

message_body = 'Hello'

# Publish the message to the exchange to be routed to queue
mrsal.publish_message(exchange='friendship',
                        exchange_type='direct',
                        queue='friendship_queue',
                        routing_key='friendship_key',
                        message=json.dumps(message_body), 
                        prop=prop,
                        fast_setup=True)

3 Consume

Now lets setup a consumer that will listen to our very important messages. If you are using scripts rather than notebooks then it's advisable to run consume and publish separately. We are going to need a callback function which is triggered upon receiving the message from the queue we subscribe to. You can use the callback function to activate something in your system.

Note:

  • If you start a consumer with callback_with_delivery_info=True then your callback function should have at least these params (method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str).
  • If not, then it should have at least (message_param: str)
import json

def consumer_callback_with_delivery_info(host_param: str, queue_param: str, method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str):
    str_message = json.loads(message_param).replace('"', '')
    if 'Hello' in str_message:
        app_id = properties.app_id
        msg_id = properties.message_id
        print(f'app_id={app_id}, msg_id={msg_id}')
        print('Hola habibi')
        return True  # Consumed message processed correctly
    return False

def consumer_callback(host_param: str, queue_param: str, message_param: str):
    str_message = json.loads(message_param).replace('"', '')
    if 'Hello' in str_message:
        print('Hola habibi')
        return True  # Consumed message processed correctly
    return False

mrsal.start_consumer(
        queue='friendship_queue',
        callback=consumer_callback_with_delivery_info,
        callback_args=(test_config.HOST, 'friendship_queue'),
        inactivity_timeout=1,
        requeue=False,
        fast_setup=True,
        callback_with_delivery_info=True
    )

Done! Your first message of friendship has been sent to the friendship queue on the exchange of friendship.

3 Concurrent Consumers

Sometimes we need to start multiple consumers to listen to the same queue and process received messages concurrently. You can do that by calling start_concurrence_consumer which takes total_threads param in addition to the same parameters used in start_consumer. This method will create a thread pool and spawn new Mrsal object and start new consumer for every thread.

import json
import time

import pika
from pika.exchange_type import ExchangeType

import mrsal.config.config as config
import tests.config as test_config
from mrsal.mrsal import Mrsal


mrsal = Mrsal(host=test_config.HOST,
              port=config.RABBITMQ_PORT,
              credentials=config.RABBITMQ_CREDENTIALS,
              virtual_host=config.V_HOST)
mrsal.connect_to_server()

APP_ID="TEST_CONCURRENT_CONSUMERS"
EXCHANGE="GoodFriends"
EXCHANGE_TYPE='direct'
QUEUE_EMERGENCY="alleSindInkludiert"  # place the excluded (but no fundamentalist danke) in an emergency queue  
NUM_THREADS=3
NUM_MESSAGES=3
INACTIVITY_TIMEOUT=30 # time out after 30 seconds
ROUTING_KEY="bleib-cool"
MESSAGE_ID="Bleib cool und alles wird besser"

def test_concurrent_consumer():
    # Start concurrent consumers
    mrsal.start_concurrence_consumer(total_threads=NUM_THREADS, queue=QUEUE_EMERGENCY,
                                     callback=consumer_callback_with_delivery_info,
                                     callback_args=(test_config.HOST, QUEUE_EMERGENCY),
                                     exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE,
                                     routing_key=ROUTING_KEY,
                                     inactivity_timeout=INACTIVITY_TIMEOUT,
                                     fast_setup=True,
                                     callback_with_delivery_info=True)

    mrsal.close_connection()

def consumer_callback_with_delivery_info(host_param: str, queue_param: str, method_frame: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, message_param: str):
    time.sleep(5)
    return True

That simple! You have now setup a full advanced message queueing protocol that you can use to promote friendship or other necessary communication between your services.

Note! Please refer to the >>>FULL GUIDE<<< on how to use customize Mrsal to meet specific needs. There are many parameters and settings that you can use to set up a more sophisticated communication protocol.

References


mrsal's People

Contributors

jonnesvold avatar rafatzahran avatar

Watchers

 avatar

mrsal's Issues

Messy arguments

Can we rewrite this in order to have it more compact. One approach could be to asterix operator to unpack args --> *args * total_threads

executor.map(self._spawn_mrsal_and_start_new_consumer, range(total_threads), [queue]*total_threads, [callback]*total_threads, [callback_args]*total_threads,

manual ack

We need to be able to manually acknowledge a message by creating a method can be called upon to do so. The current situation is not ideal as the ack only happens after the callback returns true. The consumption should NOT handle the ack rather the callback should always handle the ack as it sees itself fit.

CI/CD

After tests are written, then make a CI/CD pipeline for auto release and publish

Proper abstraction

The basic props of Pika should be handled by Mrsal with default values that can be changed by the user.

Make consumption continuous

We need to make the start_consumer continuous, which can be done by a while loop. We have to let the user keyboard interrupt as well.

Clean up messages comming from tests in MRSAL

It appears that numerous unnecessary messages from tests have been published to the Mrsal exchanges. We should take steps to clean up and remove these messages.

Connection info:

host=rabbitmq.neomodels.app,
virtual_host=myMrsalHost,
port=5671,
heartbeat=18000,
ssl=True

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.