agent's Introduction

Welcome to Agent

License: MIT

Introduction

Agent is a worker thread library that you can use to process any unit of work that you can package into a chunk of memory. In the source you'll find a class called Worker, which inherits from IWorker and implements a function ProcessMessage that performs some job on the data behind a pointer. You hand that data to the worker by calling AddMessage from the main thread, your main entry point; the call adds the message to the queue of work that the Worker needs to perform. Once you start up the Worker, each call to AddMessage(message, size) adds another job to perform.
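To make the queue idea concrete, here is a minimal sketch of a single-threaded worker queue built only on the standard library. It is not agent's IWorker: the class name QueueWorkerSketch, the Stop method, and the decision to copy each message into the queue are all assumptions made for illustration.

```cpp
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical illustration only; this is not agent's IWorker.
class QueueWorkerSketch
{
public:
    using Handler = std::function<int(const void*, std::uint32_t)>;

    explicit QueueWorkerSketch(Handler handler)
        : _handler(std::move(handler)) {}

    ~QueueWorkerSketch() { Stop(); }

    // Start the consumer thread
    void Run()
    {
        _thread = std::thread([this] { Loop(); });
    }

    // Copy the caller's bytes into the queue and wake the consumer
    void AddMessage(const void* msg, std::uint32_t size)
    {
        const auto* bytes = static_cast<const std::uint8_t*>(msg);
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _queue.emplace(bytes, bytes + size);
        }
        _ready.notify_one();
    }

    // Let the consumer drain the remaining messages, then join it
    void Stop()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopping = true;
        }
        _ready.notify_all();
        if (_thread.joinable()) _thread.join();
    }

private:
    void Loop()
    {
        for (;;)
        {
            std::vector<std::uint8_t> msg;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _ready.wait(lock, [this] { return _stopping || !_queue.empty(); });
                if (_queue.empty()) return; // stopping and nothing left to do
                msg = std::move(_queue.front());
                _queue.pop();
            }
            // Run the job outside the lock so AddMessage is never blocked by it
            _handler(msg.data(), static_cast<std::uint32_t>(msg.size()));
        }
    }

    Handler _handler;
    std::mutex _mutex;
    std::condition_variable _ready;
    std::queue<std::vector<std::uint8_t>> _queue;
    std::thread _thread;
    bool _stopping = false;
};
```

Copying the bytes means the caller may reuse or free its buffer as soon as AddMessage returns; whether agent copies or borrows the memory is not stated here, so check the source before relying on either behavior.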

An example Worker thread

Start with the design of your message. For the purposes of this project, we'll be using Google's FlatBuffers. The example message used to write the Worker class is simply called Message. To see the final result of this setup, take a look at the contents of the examples directory. Our example will be a very simple image processor.

First we create a FlatBuffers definition in a file named Message.fbs. It will define the format for the image and related data we'll be giving to the worker to work on. The contents are as follows.

namespace Messages;

table Message {
    id:uint;
    height:uint;
    width:uint;
    pixels:[byte];
}

root_type Message;

What you've just done is create a message format that will be passed to your worker, containing all the information and data it needs to do its job. In this case the message describes an image: the message ID, the image's height and width, and the image data itself.

At the top, the first line gives the namespace in which you'd like the object Message to be created, so the compiler will produce a header file defining Messages::Message. After this you'll see the definition of Message itself. The first field in your table is an unsigned int called id. The next two are height and width, each also an unsigned int; together they tell us how much data to expect in the next field, pixels, which is an array of byte, denoted [byte]. The final line in Message.fbs declares Message as the root type of the buffer. As a result of this line flatc generates root-level helpers such as GetMessage, which the worker uses later to read a received buffer. Builder functions such as CreateMessage are generated for the table itself and do not depend on this line. Once you've made it through this step you can run the FlatBuffers compiler to produce a header file that can be included in your project directly. FlatBuffers also ships a CMake module which allows you to create these header files as proper targets if your project uses CMake. More on this later.
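As a rough picture of what the table describes, the sketch below mirrors the schema as a plain C++ struct and checks that height and width agree with the amount of pixel data. ImageSketch and both helper functions are hypothetical names; the real accessors come from the header that flatc generates. One byte per pixel is an assumption.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical plain-C++ mirror of the Message table, for illustration only.
struct ImageSketch
{
    std::uint32_t id = 0;             // id:uint
    std::uint32_t height = 0;         // height:uint
    std::uint32_t width = 0;          // width:uint
    std::vector<std::int8_t> pixels;  // pixels:[byte], signed 8-bit values
};

// Number of bytes that height and width say should be in pixels,
// assuming one byte per pixel.
inline std::size_t ExpectedPixelCount(const ImageSketch& image)
{
    return static_cast<std::size_t>(image.height) * image.width;
}

// True when the declared dimensions match the data actually supplied.
inline bool HasConsistentSize(const ImageSketch& image)
{
    return image.pixels.size() == ExpectedPixelCount(image);
}
```

A check like this is worth running before touching the pixel data, since nothing in the schema itself ties the length of pixels to the two dimension fields.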

Now that Message.fbs is complete we can compile it with flatc, the FlatBuffers compiler. You may have flatc installed separately, but it is also included in agent's CMakeLists.txt. If you installed agent into a location on your path, flatc should be available as well.

flatc --cpp Message.fbs

The end result should be a file named Message_generated.h, which you'll want to include in your worker's source.

Now for the meat of the sample code. You need to create your Worker class so that it inherits from IWorker. Worker.hpp declares constructors that pass along the details the abstract class IWorker needs to know.

#pragma once

#include "IWorker.hpp"
#include <string>
#include <cstdint>

class Worker : public IWorker
{
public:
    Worker(unsigned int _id);
    Worker(unsigned int _id, std::string _name);
    ~Worker();

    int ProcessMessage(const void* _msg, std::uint32_t _size) const override;
};

Your source file for Worker only needs a few definitions. You'll see that we've defined two constructors. One matches the IWorker constructor that takes a single number, which becomes the worker's identification number. The other matches the IWorker constructor that also takes a name to identify the worker to the logger. If you call the first constructor, the name is filled in with a default value.

#include "Worker.hpp"
#include "Message_generated.h"

#include <string>
#include <thread>  // std::this_thread::get_id

// We use spdlog embedded into IWorker
#include <spdlog/spdlog.h>
#include <flatbuffers/flatbuffers.h>

Worker::Worker(unsigned int _id)
    : IWorker(_id) {}

Worker::Worker(unsigned int _id, std::string _name)
    : IWorker(_id, _name) {}

Worker::~Worker()
{
    _logger->info("Worker {} finished", GetId());
}

int Worker::ProcessMessage(const void* _msg, std::uint32_t _size) const
{
    // The first step: Deserialize and get your data out
    auto message = Messages::GetMessage(_msg);
    int id = message->id();
    int width = message->width();
    int height = message->height();
    auto pixels = message->pixels()->Data();

    /**
     * Use the built-in logger to announce events
     */
    _logger->info("[{}] Received message: id: {} width: {} height: {}",
        agent::ThreadToString(std::this_thread::get_id()),
        id, width, height);

    /**
     * Now you can do some work; maybe pull in OpenCV?
     */

    // ProcessMessage is expected to return the ID of the message you processed
    return id;
}

The most important part is in the definition of ProcessMessage. Its sole job is to take in a message, deserialize it, and process whatever data it contains. This could be anything you want it to be. You could import any other library and do any additional work you need. For image processing you might just want to import OpenCV. You can imagine limitless use cases.
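As one example of the kind of work that could follow deserialization, the sketch below counts how many pixels meet a threshold. CountAtLeast is a hypothetical helper, not part of agent, and it assumes a row-major image with one signed byte per pixel, matching the [byte] field in the schema.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical helper, not part of agent: count the pixels whose value is
// at least `threshold` in a row-major, one-byte-per-pixel image.
inline std::size_t CountAtLeast(const std::int8_t* pixels,
                                std::uint32_t width, std::uint32_t height,
                                std::int8_t threshold)
{
    std::size_t count = 0;
    for (std::uint32_t row = 0; row < height; ++row)
    {
        for (std::uint32_t col = 0; col < width; ++col)
        {
            // Row-major indexing: each row holds `width` consecutive pixels
            const std::size_t index = static_cast<std::size_t>(row) * width + col;
            if (pixels[index] >= threshold) ++count;
        }
    }
    return count;
}
```

Inside ProcessMessage the pointer, width, and height would come from the deserialized message; the helper itself has no dependency on FlatBuffers, which keeps it easy to test on its own.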

Now that we have a Worker class you'll want to create one and try to do some work on a few threads. To do this create a source file with a main entry point named main.cpp and within it create a Worker, name it, and run it. Once you have it running you'll need to serialize a message and send it off to be processed.

#include "Worker.hpp"
#include "Message_generated.h"

#include <flatbuffers/flatbuffers.h>

#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

int main(int argc, char** argv)
{
    // Create your worker (Worker.hpp declares it at global scope)
    Worker worker(0, "MyWorker");

    // Run the worker with two threads
    worker.Run(2);

    // Give it a moment to get moving
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // Build a 3x3 sample image and serialize it
    flatbuffers::FlatBufferBuilder builder(8096);
    auto pixels = builder.CreateVector(
        std::vector<std::int8_t>{
            0, 0, 1,
            2, 2, 1,
            4, 0, 1
        }
    );
    // Arguments follow the schema order: id, height, width, pixels
    auto message = Messages::CreateMessage(builder, 0, 3, 3, pixels);
    builder.Finish(message);
    auto buffer = builder.GetBufferPointer();
    auto size = builder.GetSize();

    /**
     * Now simulate receiving asynchronous messages; send your worker a chunk of
     * three messages four times separated by about half a second 
     */
    for (int i = 0; i < 4; i++)
    {
        worker.AddMessage(buffer, size);
        worker.AddMessage(buffer, size);
        worker.AddMessage(buffer, size);
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }

    return 0;
}

At this point you're ready to build. If you're accustomed to CMake, you'll notice that the repository already contains a version of the Worker class you just wrote in this introduction. For now, Worker is part of the basic unit tests, which can be found in tests/agent_t.cpp. Setting BUILD_TESTS=ON during your CMake build ensures that it is compiled into the bin/tests binary. We're in the process of rearranging the code; eventually this will be placed into a samples or examples directory.

Prerequisites

Building Agent requires the following software installed:

  • A C++17-compliant compiler
  • CMake >= 3.9
  • Doxygen (optional, documentation building is skipped if missing)
  • Python >= 3.6 for building Python bindings

Building Agent

The following sequence of commands builds Agent. It assumes that your current working directory is the top-level directory of the freshly cloned repository:

mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=Release ..
cmake --build .

The build process can be customized with the following CMake variables, which can be set by adding -D<var>={ON, OFF} to the cmake call:

  • BUILD_TESTING: Enable building of the test suite (default: ON)
  • BUILD_DOCS: Enable building the documentation (default: ON)
  • BUILD_PYTHON: Enable building the Python bindings (default: ON)

Documentation

Agent provides Sphinx-based documentation, which can be browsed online at readthedocs.org.

agent's People

Contributors

mathemaphysics

agent's Issues

Add an `AMQPWorker` example to `README.md`

The most useful aspect of the library is its ease of integration with AMQP via AMQP-CPP. There should be an example showing how to use it in this way. It might be best to finish adding IConnectionHandler::AddMessage overloaded for AMQP submission first.

Modify `ProcessMessage` to return further information about the results

The IWorker and FWorker base classes return nothing more than the ID of the message processed. We need an extended prototype for IWorker::ProcessMessage, and the equivalent in FWorker, which returns a general Result class containing an easily customized data format. The user can modify it to return results that may be needed for logging purposes, since code within ProcessMessage (e.g. in FWorker) does not a priori have access to FWorker::_logger.
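One possible shape for such a result type is sketched below. Every name here (ResultSketch, its fields, ProcessImageSketch) is hypothetical and none of it exists in agent today; it only illustrates carrying more than the message ID back to the caller.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical sketch of the proposed Result type.
struct ResultSketch
{
    std::uint32_t messageId = 0;                // what ProcessMessage returns today
    bool ok = true;                             // whether processing succeeded
    std::string summary;                        // short text suitable for a log line
    std::map<std::string, std::string> fields;  // user-defined extras
};

// Example producer: a ProcessMessage-like function that reports extra details
inline ResultSketch ProcessImageSketch(std::uint32_t id,
                                       std::uint32_t width,
                                       std::uint32_t height)
{
    ResultSketch result;
    result.messageId = id;
    result.ok = (width > 0 && height > 0);
    result.summary = result.ok ? "processed" : "empty image";
    result.fields["pixels"] =
        std::to_string(static_cast<std::uint64_t>(width) * height);
    return result;
}
```

With a return value like this, the loop that calls ProcessMessage could do the logging itself, so the processing function would not need access to the logger.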

`IAMQPWorker` sends `ack` to broker immediately while `IWorker` still running

Ideally an ack would be sent only when the message has been processed and the IWorker is finished. This currently isn't the case. Since the IWorker runs on another thread which proceeds on its own, the callback set by onComplete is executed as soon as the IAMQPWorker finishes its call to IWorker::AddMessage.

Configure IAMQPWorker and IWorker with either automatic ack by IAMQPWorker or allow IWorker to send the ack when it has finished.

Move install target headers into the `include/agent`

Install target header files aren't getting installed at the moment. We're only succeeding in using agent via FetchContent because $<BUILD_INTERFACE:include> is in the build path. Move all headers needed after install into the include/agent path at the top level.

Standardize `ProcessMessage` to allow custom handling of resulting output

The virtual ProcessMessage function in IWorker takes four arguments, the final two with default nullptr values to allow it to be called with only the first two, the inputs only. Currently the only place to handle processed data returned from ProcessMessage is in the operator() loop function. For a user to make use of this it would require overloading the loop itself.

Rewrite the loop function in IWorker to check for those additional output arguments and process them according to user specifications, possibly via addition of another virtual function such as ProcessResult.

There may be an easier way to create this functionality using multiple workers/queues. Part of ProcessMessage could obviously simply be to submit the result to another queue or IWorker. Choose whichever is best and implement it.

Create a `WorkerChain` class

At the Worker class level we can create a series of workers that process data in steps, just like a program. This is a pipeline with an adjustable width. Its workers run just like a program, with the exception that their data can be relocated anywhere and consumed by another Worker or series of workers. The output of the previous Worker is the input to the next Worker in the chain.

The natural generalization of this WorkerChain class is a WorkerGraph. A single Worker can process data and send the results to any one of multiple other kinds of workers, depending on what was discovered in the message. This is the eventual goal of the concept. Such graphs of Workers are natural granular generalizations of a computer program with the exception that operations are always transferable to another worker without shared memory as a requirement.
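The chaining idea can be sketched in a few lines. In this hypothetical WorkerChainSketch the stages run synchronously on the calling thread; in the proposal each stage would be a Worker with its own queue and threads. The class and its Then and Process methods are illustrative names only.

```cpp
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical sketch of the WorkerChain idea, synchronous for simplicity.
class WorkerChainSketch
{
public:
    using Bytes = std::vector<std::uint8_t>;
    using Stage = std::function<Bytes(const Bytes&)>;

    // Append a stage; returns *this so calls can be chained
    WorkerChainSketch& Then(Stage stage)
    {
        _stages.push_back(std::move(stage));
        return *this;
    }

    // Feed the input through every stage in order: the output of one stage
    // becomes the input of the next
    Bytes Process(Bytes data) const
    {
        for (const auto& stage : _stages)
        {
            data = stage(data);
        }
        return data;
    }

private:
    std::vector<Stage> _stages;
};
```

Because each stage only sees bytes in and bytes out, a stage could later be replaced by one that forwards the data to a queue on another machine without changing its neighbors.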

Convert ConnectionHandler into an AMQP worker which contains its own connection handler

Currently AMQPWorker contains AMQP::Connection and everything else required to connect to an AMQP host. It creates a ConnectionHandler and runs its operator() to process messages. However, it should be possible for ConnectionHandler to pass itself into an AMQP::Connection constructor and store AMQP::Channel inside of it, itself. This would remove the need for an additional class to contain the connection information.

Find out if this is possible.

Develop a use case and implement it

Agent is a fairly simple and customizable library for building multiprocessing systems with shared memory. It would do well to have an examples folder to demonstrate its use cases.

Add option to run an `IAMQPWorker` without an `IWorker`

Sometimes we don't really care about heartbeats in the main operator() loop in IAMQPWorker and we just want to run the job and ack() right inside it, without a separate thread running the job. As mentioned, the downside is that while you're running the job in the main operator() loop you can't do anything else, including respond to a heartbeat or process any other messages. Those are blocked. The upside is that ack() isn't called until the job is finished.

Refashion the IAMQPWorker constructor to run without an external worker if the pointer passed in is nullptr.

Create a `WorkerGraph` class to enable "worker programming"

As discussed in #13 we would also like to create a WorkerGraph class which allows more than simply linear processes to be constructed. The end result is a kind of topological program flow. The ideal is that each Worker will have code running in its ProcessMessage function which is capable of discovering something about the message data and sending the resulting message to any number of other workers accepting that type of message.

Because of the unidirectional flow of messages, the resulting graph will be a directed graph, although it is obviously possible for messages to flow from one Worker's output to another Worker's input and vice versa.
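A rough sketch of content-based routing follows. WorkerGraphSketch, its node signature, and the maxHops guard are all hypothetical; the guard exists because, as noted above, messages can flow back and forth between two workers, so a route is not guaranteed to end.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch of the WorkerGraph idea, synchronous for simplicity.
class WorkerGraphSketch
{
public:
    using Bytes = std::vector<std::uint8_t>;
    // A node processes the message in place and names the next node;
    // an empty name ends the flow.
    using Node = std::function<std::string(Bytes&)>;

    void AddNode(const std::string& name, Node node)
    {
        _nodes[name] = std::move(node);
    }

    // Follow the route chosen by each node and return the names visited.
    // maxHops bounds the walk in case the graph contains a cycle.
    std::vector<std::string> Route(const std::string& start, Bytes& data,
                                   std::size_t maxHops = 16) const
    {
        std::vector<std::string> visited;
        std::string current = start;
        while (!current.empty() && visited.size() < maxHops)
        {
            const auto found = _nodes.find(current);
            if (found == _nodes.end()) break; // unknown destination
            visited.push_back(current);
            current = found->second(data);
        }
        return visited;
    }

private:
    std::map<std::string, Node> _nodes;
};
```

Each node decides the next destination from what it finds in the data, which is the "discovering something about the message" step described in the issue.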

Add a `this_thread::sleep_for` call into event loops to prevent CPU pinning

While the IConnectionHandler and its children have sleep statements within them to prevent CPU pinning, the IWorker objects used to do the actual work for IAMQPWorker instances do not. This results in CPU pinning, causing the CPU usage to read very high when nothing is actually being done. Even worse, it wastes energy.

Insert a std::this_thread::sleep_for(std::chrono::milliseconds(10)) into the root IWorker and FWorker classes to make them realistic to use for anything.
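The effect of the proposed sleep can be seen in a small polling loop. This is a hypothetical stand-in, not agent's actual event loop: PollUntilStopped, its parameters, and the use of a queue of int as the job type are assumptions for illustration.

```cpp
#include <atomic>
#include <chrono>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical polling loop. It processes every queued job and sleeps
// briefly whenever the queue is empty, so an idle worker does not spin.
// Returns the number of jobs processed.
inline int PollUntilStopped(
    std::queue<int>& jobs, std::mutex& mutex, const std::atomic<bool>& stop,
    std::chrono::milliseconds idleSleep = std::chrono::milliseconds(10))
{
    int processed = 0;
    for (;;)
    {
        bool hadWork = false;
        {
            std::lock_guard<std::mutex> lock(mutex);
            if (!jobs.empty())
            {
                jobs.pop();
                hadWork = true;
            }
        }
        if (hadWork)
        {
            ++processed; // stand-in for the real job
            continue;    // check for more work right away
        }
        if (stop.load()) break;                 // idle and asked to stop
        std::this_thread::sleep_for(idleSleep); // the proposed fix
    }
    return processed;
}
```

The sleep trades a little latency (up to idleSleep per idle check) for a large drop in idle CPU usage. Waiting on a std::condition_variable that is notified when a message is added would avoid both the spinning and the added latency, at the cost of a slightly larger change.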

Devcontainer.json needs updated to remove runArgs

We no longer want to use network=agent because it will require that it already exist. This is currently not a docker-compose project so it won't generate its own network and volumes if they don't already exist. This may be better suited to this style for testing purposes.

Allow `IAMQPWorker` to be configured via JSON map file input

The constructor for IAMQPWorker has become very unwieldy to use because of the number of configuration arguments like prefetch, durability of queue, exchange settings, etc. Create an overloaded constructor which takes either a Json::Value, a map-like class, or a std::map. This can be used in combination with std::map variables which map e.g. durable to the symbol AMQP::durable from the amqpcpp.h header.
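The name-to-symbol mapping could look something like the sketch below. To keep it self-contained it uses stand-in flag values in a hypothetical FlagSketch namespace; a real build would map the same strings to the constants from amqpcpp.h instead. ParseQueueFlags is likewise a hypothetical name.

```cpp
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Stand-in flag values for illustration only; a real build would use the
// constants from amqpcpp.h (for example AMQP::durable) instead.
namespace FlagSketch
{
    constexpr int durable    = 1 << 0;
    constexpr int autodelete = 1 << 1;
    constexpr int exclusive  = 1 << 2;
}

// Translate flag names read from a config file into one combined bitmask
inline int ParseQueueFlags(const std::vector<std::string>& names)
{
    static const std::map<std::string, int> table = {
        { "durable",    FlagSketch::durable },
        { "autodelete", FlagSketch::autodelete },
        { "exclusive",  FlagSketch::exclusive },
    };

    int flags = 0;
    for (const auto& name : names)
    {
        const auto found = table.find(name);
        if (found == table.end())
        {
            // Fail loudly on typos instead of silently ignoring a setting
            throw std::invalid_argument("unknown queue flag: " + name);
        }
        flags |= found->second;
    }
    return flags;
}
```

The list of names could come from any source, such as an array in a JSON file, which keeps the constructor signature short however many settings are added later.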

Use CMakePresets.json settings when building CI

The project settings in CMakePresets.json configure the project to build only Net and Foundation modules in POCO, dramatically speeding up the build process. Make sure that these settings are used in CI and any deployment in the future.

Templatize IWorker and its subclasses by message type

The code for IWorker and its subclasses is general enough at this point that it should be possible to generalize into a template. Update IWorker and its corresponding child classes to take different message formats and corresponding downstream variable types.

IWorker and ConnectionHandler need rearranged to be more consistent

IWorker should contain a non-pure virtual operator() loop function defined
to be overridden by any inheriting class. This would mean that Worker would
only need to define ProcessMessage (a pure virtual function), and
ConnectionHandler would only need to ignore ProcessMessage and override
operator().

Overload `Worker` constructor to allow specification of `ProcessMessage` as a `std::function`

Originally, the end user could make use of the agent library only by inheriting from IWorker. However, there is a more succinct means of accomplishing the same effect for those users who just want to build something quickly. ProcessMessage can simply be supplied as a callable using std::function (or something simpler such as a plain function pointer, but ideally via std::function).

This will add another avenue for others to make use of it for either education or practical purposes.
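A minimal sketch of what such an overload might look like is shown below. FunctionWorkerSketch is a hypothetical stand-alone class, not agent's Worker, and returning -1 when no callable was supplied is an assumption made for the example.

```cpp
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical sketch of the proposed overload; agent's Worker does not
// necessarily look like this.
class FunctionWorkerSketch
{
public:
    using Handler = std::function<int(const void*, std::uint32_t)>;

    FunctionWorkerSketch(unsigned int id, Handler handler)
        : _id(id), _handler(std::move(handler)) {}

    // Same shape as ProcessMessage, but forwards to the stored callable.
    // Returns -1 if no callable was supplied (an assumption of this sketch).
    int ProcessMessage(const void* msg, std::uint32_t size) const
    {
        return _handler ? _handler(msg, size) : -1;
    }

    unsigned int GetId() const { return _id; }

private:
    unsigned int _id;
    Handler _handler;
};
```

Since std::function accepts lambdas, function pointers, and bound member functions alike, a user could pass whichever is most convenient without writing a subclass.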
