
aries-acapy-plugin-kafka-events's People

Contributors

burdettadam, chumbert, dbluhm, dkulic, luis-ga, mat-work, nemqe, victormartinez-work


aries-acapy-plugin-kafka-events's Issues

Integration test: Webhook event pushed to Kafka

We need an integration test that verifies that an event is successfully emitted on Kafka. Initially, this doesn't need to be anything more sophisticated than validating that at least one message has made it to Kafka.
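The core of such a test could be a polling helper that waits until at least one record can be fetched. This is a sketch with illustrative names; in the real test, the fetch callback would wrap an aiokafka consumer subscribed to the topic the plugin publishes to.

```python
import asyncio


async def wait_for_first_record(fetch_one, timeout=10.0, interval=0.5):
    """Poll `fetch_one` (a coroutine returning a record or None) until a
    record arrives or `timeout` seconds elapse, and return the record.

    In the real integration test, `fetch_one` would wrap something like
    AIOKafkaConsumer.getone() with its own short timeout.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        record = await fetch_one()
        if record is not None:
            return record
        await asyncio.sleep(interval)
    raise AssertionError("no message observed on Kafka within timeout")
```

This keeps the Kafka wiring out of the assertion logic, so the same helper can be reused by later integration tests (e.g. for the outbound queue).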

Kafka outbound message listener

Implement an outbound message listener that pushes outbound messages to Kafka for handling by another service.

This listener will listen for acapy::outbound::message events, perform any required data marshaling, and publish the message to Kafka.

On successful handling, this listener will emit an outbound status event (acapy::outbound::status::<status_string>) indicating that processing has concluded with the emitted status. The valid status strings are defined in what is currently the OutboundSendStatus enum in ACA-Py (this location may change).

The following are valid statuses for a message processed by the queue:

    # Message is sent to external queue. We don't know how it will process the queue
    SENT_TO_EXTERNAL_QUEUE = "sent_to_external_queue"

    # No endpoint available, and no internal queue for messages.
    UNDELIVERABLE = "undeliverable"

We may find that new statuses are required. Adding appropriate statuses is in scope for this task.
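As a sketch, the statuses above and the status event topic format can be tied together as follows. Only the two members quoted in this issue are included; ACA-Py's actual OutboundSendStatus enum may define more.

```python
from enum import Enum


class OutboundSendStatus(Enum):
    """Statuses named in this issue; ACA-Py's enum may define more."""

    # Message is sent to external queue. We don't know how it will
    # process the queue.
    SENT_TO_EXTERNAL_QUEUE = "sent_to_external_queue"

    # No endpoint available, and no internal queue for messages.
    UNDELIVERABLE = "undeliverable"

    @property
    def topic(self) -> str:
        # Status event topic format from this issue:
        #   acapy::outbound::status::<status_string>
        return f"acapy::outbound::status::{self.value}"
```

Deriving the topic from the enum member keeps the listener from hand-assembling topic strings, so adding a new status automatically yields a well-formed event topic.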

Kafka Optimizations for ACA-Py Queues

Our first pass at using Kafka for inbound and outbound messages and events uses a minimal subset of Kafka's features. The following is a summary of the current state of Kafka usage:

  • Events are published to a Kafka topic mapped from the event's topic, with the wallet ID included in the payload.
  • Outbound messages are published as encoded blobs annotated with headers and the endpoint to POST the blob to.
  • Inbound messages are received via Kafka consumers all subscribing to the same group.

We should explore whether:

  • Event topics should include the wallet ID or other metadata to route more efficiently to the relevant consumers
  • Inbound message consumers should be split into groups when scaling
  • Other Kafka features could improve performance at scale
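To make the first summary bullet concrete, here is a hypothetical sketch of the event-topic mapping and payload wrapping; the plugin's actual mapping may differ. Kafka topic names only allow alphanumerics, `.`, `_`, and `-`, so the `::` separators in ACA-Py event topics must be replaced.

```python
import json
import re


def kafka_topic_for_event(event_topic: str) -> str:
    """Map an ACA-Py event topic to a legal Kafka topic name.

    Illustrative mapping: collapse any run of characters outside
    Kafka's allowed set [a-zA-Z0-9._-] into a single '-'.
    """
    return re.sub(r"[^a-zA-Z0-9._-]+", "-", event_topic)


def event_payload(wallet_id: str, body: dict) -> bytes:
    """Wrap the event body with the wallet ID, as described above."""
    return json.dumps({"wallet_id": wallet_id, "payload": body}).encode()
```

One alternative worth exploring under the first bullet above: put the wallet ID in the Kafka record key rather than (or in addition to) the payload, so consumers can filter or partition by wallet without decoding every message, and per-wallet ordering is preserved within a partition.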

startup error

Startup error I get from the acapy-kafka-queue_1 container when spinning it up via docker-compose:

    2021-08-11 10:34:31,214 aries_cloudagent.resolver.did_resolver_registry DEBUG Registering resolver <aries_cloudagent.resolver.default.web.WebDIDResolver object at 0x7fa2fd13aa90>

    Shutting down
    2021-08-11 10:34:31,215 aries_cloudagent.commands.start ERROR Exception during startup:
    Traceback (most recent call last):
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/commands/start.py", line 72, in init
        await startup
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/commands/start.py", line 28, in start_app
        await conductor.setup()
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/core/conductor.py", line 90, in setup
        context = await self.context_builder.build_context()
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/config/default_context.py", line 57, in build_context
        await self.load_plugins(context)
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/config/default_context.py", line 136, in load_plugins
        await plugin_registry.init_context(context)
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/core/plugin_registry.py", line 200, in init_context
        await plugin.setup(context)
      File "/home/indy/kafka_queue/__init__.py", line 34, in setup
        producer = AIOKafkaProducer(**producer_conf)
    TypeError: type object argument after ** must be a mapping, not NoneType
    2021-08-11 10:34:31,217 asyncio ERROR Task exception was never retrieved
    future: <Task finished coro=<run_loop.<locals>.done() done, defined at /home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/commands/start.py:77> exception=AttributeError("'NoneType' object has no attribute 'context'",)>
    Traceback (most recent call last):
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/commands/start.py", line 79, in done
        await shutdown
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/commands/start.py", line 35, in shutdown_app
        await conductor.stop()
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/core/conductor.py", line 361, in stop
        multitenant_mgr = self.context.inject(MultitenantManager, required=False)
      File "/home/indy/.cache/pypoetry/virtualenvs/kafka-queue-f-aLBW4O-py3.6/lib/python3.6/site-packages/aries_cloudagent/core/conductor.py", line 85, in context
        return self.root_profile.context
    AttributeError: 'NoneType' object has no attribute 'context'

Kafka queue processor

Create a Kafka consumer responsible for delivering queued messages to their endpoints via HTTP post.

An implementation of an HTTP poster using Redis is reportedly in the wild (it has been referenced by the BC Gov team a number of times). We should evaluate that implementation and determine what, if anything, we can reuse in the Kafka HTTP poster.
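One small, reusable piece of such a poster is unpacking a queued record into the endpoint, headers, and body to POST. The wire format below (a JSON envelope with `endpoint`, `headers`, and `payload` fields) is an assumption for illustration; the real format is whatever the outbound message listener emits.

```python
import json
from typing import Dict, Tuple


def parse_outbound_record(value: bytes) -> Tuple[str, Dict[str, str], bytes]:
    """Split a queued outbound record into (endpoint, headers, body).

    Assumed envelope (hypothetical): JSON with an 'endpoint' URL,
    optional 'headers' map, and a string 'payload' holding the encoded
    message blob to POST to the endpoint.
    """
    record = json.loads(value)
    return record["endpoint"], record.get("headers", {}), record["payload"].encode()
```

Keeping the parsing separate from the HTTP client makes it easy to unit-test the format handling without a Kafka broker or a live endpoint.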

Refactor the consumer

Ah, this usage of profile will be problematic, unfortunately. The profiles used by our plugin components must correspond to a wallet profile which for normal ACA-Py deployments will just be the "root profile" but in multi-tenanted ACA-Py will be the profile representing a sub-wallet.

  • Refactor the consumer to only use a profile after handling an event, with the profile provided from the appropriate context?

Originally posted by @dbluhm in #15 (comment)

ACA-Py Plugin Teardown

When ACA-Py is stopped, we need a way to clean up the things the plugin created. This is especially important for the Kafka plugin.

Kafka instance

Stand up a Kafka broker instance for testing the Kafka plugin during development.

Create an outbound message poster

What

Create an outbound message poster that consumes messages from the Kafka outbound message queue and posts them to their ultimate destination.

We may be able to reuse pieces of BC Gov's poster created as a proof of concept when using Redis as a queue.

Why

This is the final component of the event-driven agency that is not yet implemented.

Success Criteria

  • The poster should be horizontally scalable
  • An integration test should show the poster taking a message from the outbound message queue and posting it to an endpoint
