
outbox-event-bus's Introduction


Outbox event bus

A Kotlin-based event bus that implements the outbox pattern for publishing domain-level events in the context of database transactions. This makes it possible to update the database and publish events atomically. Check this for an in-depth explanation of the pattern.

This solution uses a polling publisher to push messages to the event queue.
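To make the idea concrete, here is a minimal in-memory sketch of the outbox pattern with a polling publisher. It is not the library's implementation: `InMemoryDb`, `OutboxRow`, `placeOrder` and `pollOnce` are hypothetical names invented for illustration, and the "transaction" is simulated by restoring a snapshot on failure.

```kotlin
import java.util.UUID

// Hypothetical in-memory stand-ins; none of these names come from the library.
data class OutboxRow(
    val uuid: String,
    val topic: String,
    val payload: String,
    var delivered: Boolean = false
)

class InMemoryDb {
    val orders = mutableListOf<String>()
    val outbox = mutableListOf<OutboxRow>()

    // Simulates a transaction: either every write in the block is kept or none is.
    fun <T> transaction(block: InMemoryDb.() -> T): T {
        val ordersBefore = orders.toList()
        val outboxBefore = outbox.toList()
        return try {
            block()
        } catch (e: Exception) {
            orders.clear(); orders.addAll(ordersBefore)
            outbox.clear(); outbox.addAll(outboxBefore)
            throw e
        }
    }
}

// The business write and the event write share one transaction.
fun placeOrder(db: InMemoryDb, orderId: String) = db.transaction {
    orders.add(orderId)
    outbox.add(OutboxRow(UUID.randomUUID().toString(), "orders", "created:$orderId"))
}

// One pass of the polling publisher: push undelivered rows to the queue, then flag them.
fun pollOnce(db: InMemoryDb, publish: (OutboxRow) -> Unit): Int {
    val pending = db.outbox.filter { !it.delivered }
    pending.forEach { row ->
        publish(row)
        row.delivered = true
    }
    return pending.size
}
```

Because the event row is written in the same transaction as the business data, an event exists in the outbox if and only if the business change was committed; the publisher only has to drain the table.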

What you're getting

This library targets scenarios where we want to reliably persist data and send events to a message bus.

Concepts

  • EventBusProvider: Main entry point used by producers and consumers; it is where users set up Kafka;
  • EventProducer: Stores events in the database, from where they are eventually sent to the message queue;
  • EventConsumer: Interface that provides a reactive stream of events that the user can subscribe to;
  • Engine: The underlying messaging technology used to propagate the messages;
  • LocalEventStore: Local database table that stores the events.

Event structure

This library has an opinionated view about the internal structure of an event. This may change in the future, but for now events have the following structure (see the EventOutput class):

Field       Data type
uuid        String
timestamp   Instant
topic       String
msgType     String
mimeType    String
payload     ByteArray

The payload is the user-specific content of the message. For now messages are encoded as JSON, but this may also change in the future.
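As an illustration, the field table above could be mirrored by a class like the one below. This is a sketch only: `EventSketch` and `jsonEvent` are hypothetical names, and the library's actual EventOutput class may differ in shape and behavior.

```kotlin
import java.time.Instant
import java.util.UUID

// Sketch mirroring the field table; the library's real EventOutput may differ.
class EventSketch(
    val uuid: String,
    val timestamp: Instant,
    val topic: String,
    val msgType: String,
    val mimeType: String,
    val payload: ByteArray
)

// Hypothetical helper: wraps a JSON string as the user-specific payload.
fun jsonEvent(topic: String, msgType: String, json: String): EventSketch =
    EventSketch(
        uuid = UUID.randomUUID().toString(),
        timestamp = Instant.now(),
        topic = topic,
        msgType = msgType,
        mimeType = "application/json",
        payload = json.toByteArray(Charsets.UTF_8)
    )
```

A consumer would read the payload back with `String(event.payload, Charsets.UTF_8)` before handing it to a JSON parser.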

Database table structure

Column             Data type
topic              varchar(255)
delivered          boolean
uuid               char(36)
stored_timestamp   datetime
send_timestamp     datetime
msg_type           varchar(255)
mime_type          varchar(255)
payload            blob

Configuration options

  • consumer.streamMode: Defines how message reads are acknowledged. Possible options are:
    • AutoCommit: acknowledges reads automatically every auto.commit.interval.ms milliseconds;
    • EndOfBatchCommit: acknowledges reads once every message in the retrieved batch has been dealt with;
    • MessageCommit: acknowledges every message individually.
  • consumer.syncInterval: Number of milliseconds between message queue polls
  • consumer.backpressureStrategy: How the reactive stream deals with backpressure. See: io.reactivex.BackpressureStrategy
  • producer.event.cleanup.intervalInSeconds: How many seconds an event is retained after it has been published. Defaults to 7 days
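The options above are passed as plain properties. The sketch below sets each of them explicitly; `eventBusProps` is a hypothetical helper, and the assumption that consumer.backpressureStrategy takes the enum constant name as a string is not confirmed by this document.

```kotlin
import java.util.Properties

// Builds settings using only the keys listed above.
fun eventBusProps(): Properties {
    val props = Properties()
    // Acknowledge once the whole retrieved batch has been handled.
    props["consumer.streamMode"] = "EndOfBatchCommit"
    // Poll the message queue once per second.
    props["consumer.syncInterval"] = "1000"
    // Assumption: the value is the name of an io.reactivex.BackpressureStrategy constant.
    props["consumer.backpressureStrategy"] = "BUFFER"
    // Keep published events for 7 days, expressed in seconds.
    props["producer.event.cleanup.intervalInSeconds"] = (7 * 24 * 60 * 60).toString()
    return props
}
```

Writing the retention period as an explicit product keeps the unit visible; the resulting value is 604800 seconds.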

Failure model

The event relay keeps retrying events that fail delivery, and writes a log message with level ERROR each time a failure occurs.
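The retry behavior can be pictured with the following sketch. It is a simplified model, not the library's relay: `PendingEvent` and `relayPass` are hypothetical names, and the real relay works against the local event store rather than an in-memory list.

```kotlin
// Hypothetical type for illustration; not the library's API.
data class PendingEvent(val uuid: String, var delivered: Boolean = false)

// One relay pass: failed sends are logged at ERROR level and left undelivered,
// so the next pass picks them up again.
fun relayPass(
    events: List<PendingEvent>,
    send: (PendingEvent) -> Unit,
    logError: (String) -> Unit
) {
    for (event in events.filter { !it.delivered }) {
        try {
            send(event)
            event.delivered = true
        } catch (e: Exception) {
            logError("ERROR event ${event.uuid} not delivered: ${e.message}")
        }
    }
}
```

An event is only flagged as delivered after `send` returns normally, so a failure never loses an event; it just postpones it to a later pass.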

Caveats

Currently, we use Exposed as our ORM. This forces users of this library to also use Exposed in order to get the implied transactional properties.

Getting started

  • Check out the project
  • Run gradlew clean build
  • Hack away :)

Usage in your project

  • Add the dependency: com.github.waterdog-oss: outbox-event-bus:

Setting up a producer:

  1. Set up the dependencies (database and local event store)
  2. Set up Kafka
  3. Use the event bus provider to get a producer
  4. Send a message

Example:

// imports are omitted. Check the examples section - producer

// Step 1
val dataSource:DataSource = HikariDataSource(HikariConfig().apply {
    driverClassName = "org.h2.Driver"
    jdbcUrl = "jdbc:h2:mem:test"
    maximumPoolSize = 5
    isAutoCommit = false
    transactionIsolation = "TRANSACTION_REPEATABLE_READ"
    validate()
})

// See the producer example (mobi.waterdog.eventbus.example.sql.LocalEventStoreSql)
val localEventStore: LocalEventStore =  MyLocalEventStore()

// Step 2
val props = Properties()
//General cluster settings and config
props["bootstrap.servers"] = kafkaServer
//Kafka serialization config
props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"

// Step 3
val ebp = EventBusProvider()
ebp.setupProducer(EventBackend.Kafka, localEventStore)
val producer = ebp.getProducer(props)

// Step 4
producer.send(EventInput("test", "OK", "text/plain", "sent at: ${Instant.now()}".toByteArray()))

Setting up a consumer:

  1. Set up the EventBusProvider
  2. Set up the Kafka consumer
  3. Set up a subscription to the stream

Example:

// imports are omitted. Check the examples section - consumer

// Step 1
val ebp = EventBusProvider()
ebp.setupConsumer(EventBackend.Kafka)

// Step 2
val props = Properties()
//General cluster settings and config
props["bootstrap.servers"] = kafkaServer
props["enable.auto.commit"] = "true"
props["auto.commit.interval.ms"] = "1000"
props["heartbeat.interval.ms"] = "3000"
props["session.timeout.ms"] = "10000"
props["auto.offset.reset"] = "latest"
//Kafka serialization config
props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
//Event bus property that controls the sync loop and the auto-commit mode
props["consumer.syncInterval"] = "1000"
props["consumer.streamMode"] = "AutoCommit"
val consumer = ebp.getConsumer(props)

// Step 3
val topic="my-topic"
val consumerId = "consumer-group-id"
consumer.stream(topic, consumerId)
        .doOnError { it.printStackTrace() }
        .onErrorReturnItem(EventOutput.buildError())
        .subscribe {
           log.info("EVENT: ${it.uuid} @ ${it.timestamp}")
        }

Metrics

The following metrics are exported (using micrometer):

  • events.store.timer - Measures the time it takes to store an event in the event store
  • events.store.error - Counts the number of errors storing events in the event store
  • events.send.timer - Measures the time it takes to send an event to the message backend
  • events.send.error - Counts the number of errors sending events to the message backend
  • events.cleanup.timer - Measures the time it takes to clear an event from the store
  • events.cleanup.error - Counts the number of errors clearing events from the store

Providing

// Given: A meter registry
val meterRegistry = SimpleMeterRegistry()
meterRegistry.config().commonTags("service-name", "my-beautiful-service")

// See the producer example (mobi.waterdog.eventbus.example.sql.LocalEventStoreSql)
val localEventStore: LocalEventStore =  MyLocalEventStore()

val ebf = EventBusProvider(EventBackend.Kafka, meterRegistry)
ebf.setupProducer(localEventStore)

// You are then able to access your metrics
val timer = meterRegistry.get("events.store.timer").timer()

Roadmap

  • Separate event relay thread from producer
  • Use toxiproxy to assert the correct behavior when errors happen
  • Add support for sending messages with keys!
  • We are using JSON as a serialization mechanism. Allow the operators to provide their own serialization mechanism so that other formats like protobufs can be used;
  • Cleanup events that have been processed and are older than a given threshold;


outbox-event-bus's Issues

Implement JMX metrics

Implement JMX metrics to allow inspection of:

  • No. of consumers active
  • Error rates on kafka message receive
  • Error rates on kafka message send
  • avg msg receive time
  • avg msg send time

Two versions of jackson are referenced.

Stupid error on build.gradle:

 "jackson"     : [
                    "core"    : "com.fasterxml.jackson.core:jackson-core:${versions.jackson}",
                    "databind": "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}",
                    "kotlin"  : "com.fasterxml.jackson.module:jackson-module-kotlin:${versions.jackson}",
                    "jsr310"  : "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.9.2"
            ],

jsr310 should use the same jackson version...

Entity not found, Event Sync failed Error

With a producer sending an event every 5 seconds, an error sometimes occurs with the stack trace below.

The consumer and producer are as follows:

fun start() {
        thread {
            val consumer: EventConsumer by inject()

            consumer.stream("telemetry", "telemetry-consumer")
                .filter { it.msgType == "DecodedFrame" }
                .subscribe { parseMessage(it.payload) }
        }
    }

fun testProducer() {
        val producer: EventProducer by inject()

        timer("producer", false, 5000, 5000) {
            producer.sendAsync(
                EventInput(
                    "telemetry",
                    "DecodedFrame",
                    "application/json",
                    JsonFrames.gpsFrame.toByteArray()
                )
            )
        }
    }

with the following configurations:

private fun initBusEventBus(): EventConsumer {
        val ebf: EventBusProvider = get()

        val props = Properties()
        props["bootstrap.servers"] = "kafka-service:9092"
        props["enable.auto.commit"] = "true"
        props["group.id"] = "test-consumer-group"
        props["auto.commit.interval.ms"] = "1000"
        props["auto.offset.reset"] = "earliest"
        props["session.timeout.ms"] = "30000"
        props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
        props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
        props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
        props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
        props["sync.delayMillis"] = 1000

        ebf.setup(EventBackend.Kafka, props)

        return ebf.getConsumer()
    }

The errors occur only occasionally, with some time between them, and not always for the same event:

org.jetbrains.exposed.exceptions.EntityNotFoundException: Entity EventDAO, id=4 not found in database
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:495)
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:497)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:20)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:13)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:16)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:9)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.inTopLevelTransaction(ThreadLocalTransactionManager.kt:103)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:74)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:57)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2.doResume(DatabaseConnection.kt:15)
        at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:42)
        at kotlinx.coroutines.experimental.DispatchedTask$DefaultImpls.run(Dispatched.kt:168)
        at kotlinx.coroutines.experimental.DispatchedContinuation.run(Dispatched.kt:13)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
23:00:27.855 [Thread-5] ERROR m.w.e.p.PersistentEventWriter - Event sync failedEntity EventDAO, id=5 not found in database
org.jetbrains.exposed.exceptions.EntityNotFoundException: Entity EventDAO, id=5 not found in database
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:495)
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:497)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:20)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:13)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:16)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:9)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.inTopLevelTransaction(ThreadLocalTransactionManager.kt:103)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:74)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:57)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2.doResume(DatabaseConnection.kt:15)
        at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:42)
        at kotlinx.coroutines.experimental.DispatchedTask$DefaultImpls.run(Dispatched.kt:168)
        at kotlinx.coroutines.experimental.DispatchedContinuation.run(Dispatched.kt:13)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
23:00:32.857 [Thread-5] ERROR m.w.e.p.PersistentEventWriter - Event sync failedEntity EventDAO, id=6 not found in database
org.jetbrains.exposed.exceptions.EntityNotFoundException: Entity EventDAO, id=6 not found in database
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:495)
        at org.jetbrains.exposed.dao.EntityClass.get(Entity.kt:497)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:20)
        at mobi.waterdog.eventbus.persistence.sql.LocalEventCacheSql$markAsDelivered$1$1.invoke(LocalEventCacheSql.kt:13)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:16)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2$1.invoke(DatabaseConnection.kt:9)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.inTopLevelTransaction(ThreadLocalTransactionManager.kt:103)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:74)
        at org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManagerKt.transaction(ThreadLocalTransactionManager.kt:57)
        at mobi.waterdog.eventbus.persistence.sql.DatabaseConnection$query$2.doResume(DatabaseConnection.kt:15)
        at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:42)
        at kotlinx.coroutines.experimental.DispatchedTask$DefaultImpls.run(Dispatched.kt:168)
        at kotlinx.coroutines.experimental.DispatchedContinuation.run(Dispatched.kt:13)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
