Equinox

.NET event sourcing library with CosmosDB, DynamoDB, EventStoreDB, message-db, SqlStreamStore and integration test backends. Focused at stream level; see https://github.com/jet/propulsion for cross-stream projections/subscriptions/reactions.

License: Apache License 2.0

Equinox is a set of low-dependency libraries that enable event-sourced processing against stream-based stores.

Not a framework; you compose the libraries into an architecture that fits your apps' evolving needs.

It does not and will not handle projections and subscriptions. See Propulsion for that.


Design Motivation

Equinox's design is informed by discussions, talks and countless hours of hard and thoughtful work invested into many previous systems, frameworks, samples, forks of samples, the outstanding continuous work of the EventStore founders and team and the wider DDD-CQRS-ES community. It would be unfair to single out even a small number of people despite the immense credit that is due. Some aspects of the implementation are distilled from Jet.com systems dating all the way back to 2013.

An event sourcing system usually needs to address the following concerns:

  1. Storing events with good performance and debugging capabilities
  2. Transaction processing
    • Optimistic concurrency (handle loading conflicting events and retrying if another transaction overlaps on the same stream)
    • Folding events into a State, updating as new events are added
  3. Decoding events using codecs and formats
  4. Framework and application integration
  5. Projections and Reactions

Designing something that supports all of these as a single integrated solution results in an inflexible, difficult-to-use framework. Thus, Equinox focuses on two central aspects of event sourcing: items 1 and 2 on the list above.
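
For a feel of what concern 2 looks like in practice, here's a minimal sketch of folding events into a State, using a hypothetical Counter stream (the Event and State types are invented purely for illustration):

    type Event =
        | Incremented
        | Decremented

    type State = { count: int }
    let initial = { count = 0 }

    // evolve: apply a single event to the state
    let evolve state = function
        | Incremented -> { count = state.count + 1 }
        | Decremented -> { count = state.count - 1 }

    // fold: replay a stream of events to reconstruct the current State
    let fold: State -> Event seq -> State = Seq.fold evolve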

Of course, the other concerns can't be ignored; thus, they are supported via other libraries that focus on them:

  • FsCodec supports encoding and decoding (concern 3)
  • Propulsion supports projections and reactions (concern 5)

Integration with other frameworks (e.g., Equinox wiring into ASP.NET Core) is something that is intentionally avoided; as you build your application, the nature of how you integrate things will naturally evolve.

We believe the fact that Equinox is a library is critical:

  • It gives you the ability to pick your preferred way of supporting your event sourcing system.
  • There's less coupling to worry about as your application evolves over time.

If you're looking to learn more about and/or discuss Event Sourcing and its myriad benefits, trade-offs and pitfalls as you apply it to your Domain, look no further than the thriving 4000+ member community on the DDD-CQRS-ES Discord; you'll get patient and impartial world class advice 24x7 (there are #equinox, #eventstore and #sql-stream-store channels for questions or feedback). (invite link)

Features

  • Designed not to invade application code; your domain tests can be written directly against your models.
  • Core ideas and features of the library are extracted from ideas and lessons learned from existing production software.
  • Test coverage for its core features. In addition, there are baseline and specific tests for each supported storage system, and a comprehensive test and benchmarking story.
  • Pluggable event serialization: all encoding is specified in terms of the FsCodec.IEventCodec contract, with FsCodec providing implementations for Newtonsoft.Json and System.Text.Json (see the aggregate module sketch following this list).
  • Caching using the .NET MemoryCache to:
    • Minimize round trips; consistent implementation across stores 🙏 @DSilence
    • Minimize latency and bandwidth / Request Charges by maintaining the folded state, without needing the Domain Model folded state to be serializable
    • Enable read through caching, coalescing concurrent reads via opt-in LoadOption.AllowStale
  • Mature and comprehensive logging (using Serilog internally), with optimal performance and pluggable integration with your apps hosting context (we ourselves typically feed log info to Splunk and the metrics embedded in the Serilog.Events.LogEvent Properties to Prometheus; see relevant tests for examples)
  • OpenTelemetry Integration (presently only implemented in Equinox.Core and Equinox.MessageDb ... #help-wanted)
  • Equinox.EventStore, Equinox.SqlStreamStore: In-stream Rolling Snapshots:
    • No additional round trips to the store needed at either the Load or Sync points in the flow
    • Support for multiple co-existing compaction schemas for a given stream (a 'compaction' event/snapshot is just an Event, routed via the FsCodec.IEventCodec)
      • Compaction events typically do not get deleted (consistent with how EventStoreDB works), although it is safe to do so in concept (there are no assumptions that the events must be contiguous and/or that the number of events implies a specific version etc)
    • While snapshotting can deliver excellent performance especially when allied with the Cache, it's not a panacea, as noted in this EventStore article on the topic
  • Equinox.MessageDb: Adjacent Snapshots:
    • Maintains snapshot events in an adjacent, separated {Category}:snapshot-{StreamId} stream (in contrast to the EventStoreDb and SqlStreamStore RollingState strategy, which embeds the snapshots directly within the stream in question)
    • Generating & storing the snapshot takes place subsequent to the normal appending of events, once every batchSize events. This means the state of the stream can be reconstructed with exactly 2 round-trips to the database (caching can of course remove the snapshot reads on subsequent calls)
    • Note there's no logic in the system (or in message-db as a whole) to prune snapshots (although it's safe to remove them at any time, including for the 'current' one - a fresh one will get rewritten upon the next successful event append)
  • Equinox.CosmosStore 'Tip with Unfolds' schema:
    • In contrast to Equinox.EventStore's AccessStrategy.RollingSnapshots, when using Equinox.CosmosStore, optimized command processing is managed via the Tip - a document per stream with an identity enabling syncing the read/write position via a single point-read. The Tip maintains the following:
      • It records the current write position for the stream which is used for optimistic concurrency control - i.e. the index at which the next events will be appended for a given stream (events and the Tip share a common logical partition key)
      • It maintains the current unfolds / snapshot data which is deflate+base64 compressed.
      • It can maintain events in a buffer until the tip accumulation limit (a specified event count or JSON.stringify length) is reached; when the limit is met, events are shifted out to an immutable Batch.
    • Has the benefits of the in-stream Rolling Snapshots approach while reducing latency and RU provisioning requirements due to meticulously tuned Request Charge costs:
      • When the stream is empty, the initial Load operation involves a single point read that yields a 404 NotFound response, costing 1.0 RU
      • When coupled with the cache, a typical read is a point read [with IfNoneMatch on an etag], costing 1.0 RU if in-date [to get the 304 Not Modified response] (when the stream is empty, a 404 NotFound response, also costing 1.0 RU)
      • Writes are a single invocation of the Sync stored procedure which:
        • Does a point read
        • Performs a concurrency check
        • Uses that check to apply the write OR returns the conflicting events and unfolds
      • No additional round trips to the store needed at either the Load or Sync points in the flow
    • It should be noted that from a querying perspective, the Tip shares the same structure as Batch documents (a potential future extension would be to carry some events in the Tip, as some interim versions of the implementation once did; see also #109).
  • Equinox.CosmosStore RollingState and Custom 'non-event-sourced' modes:
    • Uses the 'Tip with Unfolds' encoding to avoid having to write event documents at all. This option benefits from the caching and consistency management mechanisms, because the cost of writing and storing an infinitely growing series of events is removed. Search for transmute or RollingState in the samples and/or see the Checkpoint Aggregate in Propulsion. One chief use of this mechanism is for tracking Summary Event feeds in the dotnet-templates summaryConsumer template.
  • Equinox.DynamoStore:
    • Most features and behaviors are as per Equinox.CosmosStore, with the following key differences:
      • Instead of using a Stored Procedure as CosmosStore does, the implementation involves:
        • Conditional PutItem and UpdateItem requests are used to accumulate events in the Tip (where there is space available).
        • All event writes are guaranteed to first be inserted or appended to the Tip (to guarantee the DynamoDB Streams output is correctly ordered) see #401 and Propulsion #222 🙏 @epNickColeman
        • At the point where the Tip exceeds any of the configured and/or implicit limits, a TransactWriteItems request is used (see implementation in FSharp.AWS.DynamoDB):
          • maximum event count (not limited by default)
          • maximum accumulated event size (default 32KiB)
          • DynamoDB Item Size Limit (hard limit of 400KiB)
      • DynamoDB does not support an etag-checked Read API, which means a cache hit is not as efficient as it is on CosmosDB (and the data hence travels and is deserialized unnecessarily)
      • Concurrency conflicts necessitate an additional roundtrip to resync as the DynamoDB Service does not yield the item in the event of a ConditionalCheckFailedException
        • NOTE: As of 30 June 2023, DDB now supports returning the conflicting events; TODO implement resync without an extra roundtrip via fsprojects/FSharp.AWS.DynamoDB#68
      • Equinox.Cosmos.Core.Events.appendAtEnd/NonIdempotentAppend has not been ported (there's no obvious clean and efficient way to do a conditional insert/update/split as the CosmosDB stored proc can, and this is a low usage feature)
      • The implementation uses the excellent FSharp.AWS.DynamoDB library (which wraps the standard AWS AWSSDK.DynamoDBv2 SDK package), and leans on significant preparatory research 🙏 @pierregoudjo
      • CosmosStore dictates (as of V4) that event bodies be supplied as System.Text.Json.JsonElements (in order that events can be included in the Documents/Items as JSON directly). This also underscores the fact that the only reasonable format to use is valid JSON; binary data would need to be base64 encoded. DynamoStore accepts and yields event bodies as arbitrary ReadOnlyMemory<byte> BLOBs (the AWS SDK round-trips such blobs as a MemoryStream and does not impose any restrictions on the blobs in terms of required format).
      • CosmosStore defaults to compressing (with System.IO.Compression.DeflateStream) event bodies for Unfolds; DynamoStore round-trips an encoding: int value, which enables the IEventCodec to manage that concern. Regardless, minimizing Request Charges is imperative when request size directly maps to financial charges, 429s, reduced throughput and a lowered scaling ceiling.
    • Azure CosmosDB's ChangeFeed API intrinsically supports replays of all the events in a Store, whereas the DynamoDB Streams facility only retains 24h of actions. As a result, there are ancillary components that provide equivalent functionality composed of:
      • Propulsion.DynamoStore.Lambda: an AWS Lambda that is configured via a DynamoDB Streams Trigger to Index the Events (represented as Equinox Streams, typically in a separated <tableName>-index Table) as they are appended
      • Propulsion.DynamoStore.DynamoStoreSource: consumes the Index Streams akin to how Propulsion.CosmosStore.CosmosStoreSource consumes the CosmosDB Change Feed
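
As referenced under the serialization bullet above, the following sketches the aggregate module conventions that the pluggable serialization and snapshot/unfold features rely on. The Counter domain is hypothetical; the Events/Fold layout and the isOrigin/toSnapshot pair follow the conventions used in the samples and dotnet-templates:

    module Events =
        type Event =
            | Incremented
            | Decremented
            | Snapshotted of {| count: int |}
            interface TypeShape.UnionContract.IUnionContract
        // pluggable serialization, expressed via the FsCodec.IEventCodec contract
        let codec = FsCodec.SystemTextJson.Codec.Create<Event>()

    module Fold =
        type State = { count: int }
        let initial = { count = 0 }
        let evolve state = function
            | Events.Incremented -> { count = state.count + 1 }
            | Events.Decremented -> { count = state.count - 1 }
            | Events.Snapshotted s -> { count = s.count }
        let fold: State -> Events.Event seq -> State = Seq.fold evolve
        // a snapshot is an origin event: loading can stop as soon as one is seen
        let isOrigin = function Events.Snapshotted _ -> true | _ -> false
        let toSnapshot state = Events.Snapshotted {| count = state.count |}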

Currently Supported Data Stores

  • MemoryStore: In-memory store (volatile, for unit or integration test purposes). Fulfils the full contract Equinox imposes on a store, but without I/O costs (it's ~100 LOC wrapping a ConcurrentDictionary). Also enables taking serialization/deserialization out of the picture in tests. See also the Propulsion.MemoryStore Change Feed Simulator for integration testing of Reactors.

  • Amazon Dynamo DB: Shares most features with Equinox.CosmosStore (from which it was ported in #321). See above for detailed comparison.

  • Azure Cosmos DB: contains some fragments of code dating back to 2016; however, the storage model was arrived at based on intensive benchmarking (squash-merged in #42). The V2 and V3 release lines are being used in production systems. (The V3 release provides support for significantly more efficient packing of events (storing events in the 'Tip').)

  • EventStoreDB: this codebase itself has been in production since 2017 (see commit history), with key elements dating back to approx 2016. Current versions require EventStoreDB Server editions 21.10 or later, and communicate over the modern gRPC interface.

  • MessageDB: bindings for the message-db Postgres event storage system. See MessageDB docs. 🙏 @nordfjord

  • SqlStreamStore: bindings for the powerful and widely used (but presently unmaintained) SQL-backed Event Storage system, derived from the EventStoreDB adapter. See SqlStreamStore docs. 🙏 @rajivhost

    NOTE: The underlying SqlStreamStore project is presently unmaintained; as such it's hard to recommend using it for production scenarios

    • For SQL Server, the implementation works, people are believed to be using it to varying degrees, and there are no obviously better replacements. However, it should be pointed out that something as simple as fixing a bug in the SqlStreamStore.SqlServer library is not a paved path - there is no CI system for it, and nobody to call

    • For MySql, it's pretty much the same as for SQL Server, with the proviso that it's likely that it's had (and has) significantly lower adoption and/or proper scrutiny.

    • For Postgres, Equinox.MessageDb is a better choice as the underlying MessageDB project is actively maintained and has significantly better documentation than SqlStreamStore. The other thing to point out is that Equinox.SqlStreamStore.Postgres does not depend on the final version of SqlStreamStore.Postgres as there are no equivalent releases of the .Mysql and .SqlServer variants.

Components

The components within this repository are delivered as multi-targeted NuGet packages supporting net6.0 (F# >= 6) profiles; each of the constituent elements is designed to be easily swappable as dictated by the task at hand, and each can be inlined or customized as needed:

Core library

  • Equinox NuGet: Store-agnostic decision flow runner that manages the optimistic concurrency protocol and application-level API surface, together with the default [System.Runtime.Caching.MemoryCache-based] Cache implementation. (depends only on FSharp.Core v 6.0.7, FsCodec v 3.0.0, System.Runtime.Caching, and Serilog (but not specific Serilog sinks, i.e. you configure it to emit to NLog etc.))
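
To give a feel for the decision-flow surface this package exposes, here's a sketch building on the hypothetical Counter module from the Features section (the exact Decider overloads are documented in DOCUMENTATION.md):

    // interpret: given the folded State, decide which events (if any) to append
    let interpret target (state: Fold.State) =
        if state.count = target then []
        elif state.count < target then List.replicate (target - state.count) Events.Incremented
        else List.replicate (state.count - target) Events.Decremented

    type Service internal (resolve: string -> Equinox.Decider<Events.Event, Fold.State>) =
        // Transact loads and folds the stream, runs interpret, appends any resulting
        // events, and retries per the optimistic concurrency protocol on conflict
        member _.SetTo(counterId, target) =
            let decider = resolve counterId
            decider.Transact(interpret target)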

Serialization support

  • FsCodec Codec NuGet: Defines minimal IEventData, ITimelineEvent and IEventCodec contracts, which are the sole aspects the Stores bind to. No dependencies.
    • FsCodec.IEventCodec: defines a base interface for a serializer/deserializer.
    • FsCodec.Codec: enables plugging in a serializer and/or Union Encoder of your choice (typically this is used to supply a pair of functions: encode and tryDecode). No dependencies.
  • FsCodec.NewtonsoftJson Newtonsoft.Json Codec NuGet
  • FsCodec.SystemTextJson SystemTextJson Codec NuGet: Drop in replacement that allows one to target the .NET System.Text.Json serializer solely by changing the referenced namespace.

Data Store libraries

  • Equinox.Core NuGet: Hosts generic utility types frequently useful alongside Equinox: TaskCell, Batcher, BatcherCache, BatcherDictionary. (depends on System.Runtime.Caching)
  • Equinox.MemoryStore MemoryStore NuGet: In-memory store for integration testing/performance base-lining/providing out-of-the-box zero dependency storage for examples. (depends on Equinox)
  • Equinox.CosmosStore CosmosStore NuGet: Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. (depends on Equinox, Microsoft.Azure.Cosmos >= 3.35.4, System.Text.Json, FSharp.Control.TaskSeq)
  • Equinox.CosmosStore.Prometheus CosmosStore.Prometheus NuGet: Integration package providing a Serilog.Core.ILogEventSink that extracts detailed metrics information attached to the LogEvents and feeds them to the prometheus-net's Prometheus.Metrics static instance. (depends on Equinox.CosmosStore, prometheus-net >= 3.6.0)
  • Equinox.DynamoStore DynamoStore NuGet: Amazon DynamoDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RC costs, patterned after Equinox.CosmosStore. (depends on Equinox, FSharp.AWS.DynamoDB >= 0.12.0-beta, FSharp.Control.TaskSeq)
  • Equinox.DynamoStore.Prometheus DynamoStore.Prometheus NuGet: Integration package providing a Serilog.Core.ILogEventSink that extracts detailed metrics information attached to the LogEvents and feeds them to the prometheus-net's Prometheus.Metrics static instance. (depends on Equinox.DynamoStore, prometheus-net >= 3.6.0)
  • Equinox.EventStore EventStore NuGet: EventStoreDB Adapter designed to meet Jet's production monitoring requirements. (depends on Equinox, EventStore.Client >= 22.0.0, FSharp.Control.TaskSeq; requires EventStore Server version 21.10 or later). DO NOT use for new projects - the TCP interface to EventStoreDB has long been deprecated; this package is only provided to ease migration scenarios and will be removed in due course
  • Equinox.EventStoreDb EventStoreDb NuGet: Production-strength EventStoreDB Adapter. (depends on Equinox, EventStore.Client.Grpc.Streams >= 22.0.0, FSharp.Control.TaskSeq, EventStore Server version 21.10 or later)
  • Equinox.MessageDb MessageDb NuGet: MessageDb Adapter. (depends on Equinox, Npgsql >= 7.0.0, FSharp.Control.TaskSeq)
  • Equinox.SqlStreamStore SqlStreamStore NuGet: SqlStreamStore Adapter derived from Equinox.EventStore - provides core facilities (but does not connect to a specific database; see sibling SqlStreamStore.* packages). (depends on Equinox, SqlStreamStore >= 1.2.0-beta.8, FSharp.Control.TaskSeq)
  • Equinox.SqlStreamStore.MsSql MsSql NuGet: SqlStreamStore.MsSql SQL Server connector implementation for the Equinox.SqlStreamStore package. (depends on Equinox.SqlStreamStore, SqlStreamStore.MsSql >= 1.2.0-beta.8)
  • Equinox.SqlStreamStore.MySql MySql NuGet: SqlStreamStore.MySql MySQL connector implementation for the Equinox.SqlStreamStore package. (depends on Equinox.SqlStreamStore, SqlStreamStore.MySql >= 1.2.0-beta.8)
  • Equinox.SqlStreamStore.Postgres Postgres NuGet: SqlStreamStore.Postgres PostgreSQL connector implementation for the Equinox.SqlStreamStore package. (depends on Equinox.SqlStreamStore, SqlStreamStore.Postgres >= 1.2.0-beta.8)

Projection libraries

Equinox does not focus on projection logic - each store brings its own strengths, needs, opportunities and idiosyncrasies. Here's a list of relevant libraries from sibling projects that are commonly used in that capacity:

  • FsKafka FsKafka NuGet: Wraps Confluent.Kafka to provide efficient batched Kafka Producer and Consumer configurations, with basic logging instrumentation. Used in the propulsion sync kafka tool command; see dotnet new proProjector -k; dotnet new proConsumer to generate a sample app using it (see the BatchedAsync and BatchedSync modules in Examples.fs).
  • Propulsion Propulsion NuGet: A library that provides an easy way to implement projection logic. It defines Propulsion.Streams.StreamEvent used to interop with Propulsion.* in processing pipelines for the proProjector and proSync templates in the templates repo, together with the Ingestion, Streams, Progress and Parallel modules that get composed into those processing pipelines. (depends on Serilog)
  • Propulsion.Cosmos Propulsion.Cosmos NuGet: Wraps the Microsoft .NET ChangeFeedProcessor library providing a processor loop that maintains a continuous query loop per CosmosDB Physical Partition (Range) yielding new or updated documents (optionally unrolling events written by Equinox.CosmosStore for processing or forwarding). (depends on Equinox.Cosmos, Microsoft.Azure.DocumentDb.ChangeFeedProcessor >= 2.2.5)
  • Propulsion.CosmosStore Propulsion.CosmosStore NuGet: Wraps the CosmosDB V3 SDK's Change Feed API, providing a processor loop that maintains a continuous query loop per CosmosDB Physical Partition (Range) yielding new or updated documents (optionally unrolling events written by Equinox.CosmosStore for processing or forwarding). Used in the propulsion sync stats from cosmos tool command; see dotnet new proProjector to generate a sample app using it. (depends on Equinox.CosmosStore)
  • Propulsion.DynamoStore Propulsion.DynamoStore NuGet: Provides a DynamoStoreSource that provides equivalent functionality to Propulsion.CosmosStore in concert with Propulsion.DynamoStore.Lambda; (depends on Equinox.DynamoStore)
  • Propulsion.DynamoStore.Lambda Propulsion.DynamoStore.Lambda NuGet: Indexes events written by Equinox.DynamoStore via a DynamoDB Streams-triggered Lambda. (Depends on Propulsion.DynamoStore)
  • Propulsion.EventStore Propulsion.EventStore NuGet: Used in the propulsion sync stats from es tool command; see dotnet new proSync to generate a sample app using it. (depends on Equinox.EventStore)
  • Propulsion.EventStoreDb Propulsion.EventStoreDb NuGet: Consumes from EventStoreDB v 21.10 or later using the gRPC interface. (depends on Equinox.EventStoreDb)
  • Propulsion.MessageDb Propulsion.MessageDb NuGet: Consumes from a MessageDB store (in Postgres) using Npgsql. (depends on Npgsql, Propulsion.Feed)
  • Propulsion.Kafka Propulsion.Kafka NuGet: Provides a canonical RenderedSpan that can be used as a default format when projecting events via e.g. the Producer/Consumer pair in dotnet new proProjector -k; dotnet new proConsumer. (depends on Newtonsoft.Json >= 11.0.2, Propulsion, FsKafka)

dotnet tool provisioning / benchmarking tool

  • Equinox.Tool Tool NuGet

    • can render events from any of the stores via eqx dump.
    • incorporates a benchmark scenario runner, running load tests composed of transactions in samples/Store and samples/TodoBackend against any supported store; this allows perf tuning and measurement in terms of both latency and transaction charge aspects. (Install via: dotnet tool install Equinox.Tool -g)
    • can configure indices in Azure CosmosDB for an Equinox.CosmosStore Container via eqx init. See here.
    • can search for streams and/or perform a JSON export based on the stream name (p) or Uncompressed unfold values for an Equinox.CosmosStore Container in Azure CosmosDB. See here.
    • can create tables in Amazon DynamoDB for Equinox.DynamoStore via eqx initaws.
    • can initialize databases for SqlStreamStore via eqx initsql

Starter Project Templates and Sample Applications

Overview

The Propulsion Perspective

Equinox and Propulsion have a yin and yang relationship; the use cases for both naturally interlock and overlap. It can be relevant to peruse the Propulsion Documentation's Overview Diagrams for the complementary perspective (TL;DR it's largely the same topology, with elements that are central here de-emphasized over there, and vice versa)

C4 Context diagram

Equinox focuses on the Consistent Processing element of building an event-sourced system, offering tailored components that interact with a specific Consistent Event Store, as laid out here in this C4 System Context Diagram:

Equinox c4model.com Context Diagram

☝️ Propulsion elements (which we consider External to Equinox) support the building of complementary facilities as part of an overall Application:

  • Ingesters: read stuff from outside the Bounded Context of the System. This kind of service covers aspects such as feeding reference data into Read Models and ingesting changes into a consistent model via Consistent Processing. These services are not acting in reaction to events emanating from the Consistent Event Store, as opposed to...
  • Publishers: react to events as they arrive from the Consistent Event Store by filtering, rendering and producing to feeds for downstreams (here we label that Publish Simple Notifications). While these services may in some cases rely on synchronous queries via Consistent Processing, they never transact or drive follow-on work; which brings us to...
  • Reactors: drive reactive actions triggered by either upstream feeds, or events observed in the Consistent Event Store. These services handle anything beyond the duties of Ingesters or Publishers, and will often drive follow-on processing via Process Managers and/or transacting via Consistent Processing. In some cases, a reactor app's function may be to progressively compose a notification for a Publisher to eventually publish.

C4 Container diagram

The relevant pieces of the above break down as follows, when we emphasize the Containers aspects relevant to Equinox:

Equinox c4model.com Container Diagram

See Overview section in DOCUMENTATION.md for further drill down

TEMPLATES

The best place to start, sample-wise, is the QuickStart, which walks you through sample code, tuned for approachability, from dotnet new templates stored in a dedicated repo.

SAMPLES

The samples/ folder contains various further examples (some of the templates are derived from these), with the complementary goals of:

  • being a starting point to see how one might consume the libraries.
  • acting as Consumer Driven Contracts to validate new and pin existing API designs.
  • providing outline (not official and complete) guidance as to things that are valid to do in an application consuming Equinox components.
  • validating that each specific Storage implementation can fulfill the needs of each of the example Services/Aggregates/Applications. (unfortunately this concern makes a lot of the DI wiring more complex than a real application should be; it's definitely a non-goal for every Equinox app to be able to switch between backends, even though that's very much possible to achieve.)
  • providing the sample scripts referenced in the Tutorial

The repo contains a vanilla ASP.NET Core implementation of the well-known TodoBackend Spec. NB the implementation is largely dictated by spec; no architectural guidance expressed or implied ;). It can be run via:

& dotnet run --project samples/Web -S es # run against eventstore, omit `es` to use in-memory store, or see PROVISIONING EVENTSTORE
start https://www.todobackend.com/specs/index.html?https://localhost:5001/todos # for low-level debugging / validation of hosting arrangements
start https://www.todobackend.com/client/index.html?https://localhost:5001/todos # standard JavaScript UI
start http://localhost:5341/#/events # see logs triggered by `-S` above in https://getseq.net        

The core sample in this repo is the Store sample, which contains code and tests extracted from real implementations (with minor simplifications in some cases).

These facts mean that:

  • some of the code may be less than approachable for a beginner (e.g. some of the code is in its present form for reasons of efficiency)
  • some of the code may not represent official best practice guidance that the authors would necessarily stand over (e.g., the CQRS pattern is not strictly adhered to in all circumstances; some command designs are not completely correct from an idempotency perspective)

While these things can of course be perfected through PRs, this is definitely not top of the work list for the purposes of this repo. (We'd be delighted to place links to other samples, including cleanups / rewrites of these samples written with different testing platforms, web platforms, or DDD/CQRS/ES design flavors right here).

For fun, there's a direct translation of the InventoryItem Aggregate and Command Handler from Greg Young's m-r demo project as one could write it in F# using Equinox. NB any typical presentation of this example includes copious provisos and caveats about it being a toy example written almost a decade ago.

samples/Tutorial (in this repo): Annotated .fsx files with sample aggregate implementations

@ameier38's Tutorial

Andrew Meier has written a very complete tutorial modeling a business domain using Equinox and EventStoreDB; it includes a Dockerized Suave API, a test suite using Expecto, build automation using FAKE, and CI using Codefresh; see the repo and its overview blog post.

QuickStart

  1. Make a scratch area

    mkdir ExampleApp
    cd ExampleApp 
  2. Use a dotnet new template to get fresh code in your repo

    dotnet new -i Equinox.Templates # see source in https://github.com/jet/dotnet-templates
    dotnet new eqxweb -t # -t for todos, defaults to memory store (-m) # use --help to see options regarding storage subsystem configuration etc
  3. Run the TodoBackend:

    dotnet run --project Web
  4. Run the standard TodoMvc frontend against your locally-hosted, fresh backend (See generated README.md for more details)

Spin up a TodoBackend .csproj ... with C# code

While Equinox is implemented in F#, and F# is a great fit for writing event-sourced domain models, the APIs are not F#-specific; there's a C# edition of the template. The instructions are identical to the rest, but you need to use the eqxwebcs template instead of eqxweb.

Store data in EventStore

  1. install EventStore locally (requires admin privilege)

    • For Windows, install with Chocolatey:

      cinst eventstore-oss -y # where cinst is an invocation of the Chocolatey Package Installer on Windows
    • For OSX, install with brew cask install eventstore

  2. start the local EventStore instance on any OS:

    • Check out the github.com/jet/equinox repo
    • docker compose up

    For more complete instructions, follow https://developers.eventstore.com/server/v21.10/installation.html#use-docker-compose

  3. generate sample app with EventStore wiring from template and start

    dotnet new eqxweb -t -e # -t for todos, -e for eventstore
    dotnet run --project Web
  4. browse writes at http://localhost:2113/web/index.html#/streams

Store data in Azure CosmosDB

  1. export 3x env vars (see provisioning instructions)

    $env:EQUINOX_COSMOS_CONNECTION="AccountEndpoint=https://....;AccountKey=....=;"
    $env:EQUINOX_COSMOS_DATABASE="equinox-test"
    $env:EQUINOX_COSMOS_CONTAINER="equinox-test"
  2. use the eqx tool to initialize the database and/or container (using preceding env vars)

    dotnet tool uninstall Equinox.Tool -g
    dotnet tool install Equinox.Tool -g --prerelease
    eqx init -ru 400 cosmos # generates a database+container, adds optimized indexes
  3. generate sample app from template, with CosmosDB wiring

    dotnet new eqxweb -t -c # -t for todos, -c for cosmos
    dotnet run --project Web
  4. Use the eqx tool to dump stats relating to the contents of the CosmosDB store

    # run queries to determine how many streams, docs, events there are in the container
    eqx -VC stats -SDEP cosmos # -P to run in parallel # -V -C to show underlying query being used
  5. Use the eqx tool to query streams and/or snapshots in a CosmosDB store

    # Add indexing of the `u`nfolds borne by Tip Items: 1) `c` for the case name 2) `d` for fields of uncompressed unfolds 
    eqx init -m serverless --indexunfolds cosmos -d db -c $EQUINOX_COSMOS_VIEWS
    
    # query all streams LIKE "$User-%" with `Snapshotted2` unfolds. Batches of up to 100,000 events
    eqx query -cn '$User' -un Snapshotted2 cosmos -d db -c $EQUINOX_COSMOS_VIEWS -b 100000
    
    # use a wild card (LIKE) for the stream name 
    eqx query -cl '$Us%' -un Snapshotted cosmos -d db -c $EQUINOX_COSMOS_VIEWS -b 100000
    # > Querying Default: SELECT c.u, c.p, c._etag FROM c WHERE c.p LIKE "$Us%" AND EXISTS (SELECT VALUE u FROM u IN c.u WHERE u.c = "Snapshotted") {}
    # > Page 7166s, 7166u, 0e 320.58RU 3.9s {}
    # > Page 1608s, 1608u, 0e 68.59RU 0.9s {}
    # > TOTALS 1c, 8774s, 389.17RU 4.7s {}   
    
    # Skip loading the _etag to simulate a query where you will only render the result (not `Transact` against it)
    eqx query -cn '$User' -m readOnly -un Snapshotted cosmos -d db -c $EQUINOX_COSMOS_VIEWS -b 100000
    # > Querying ReadOnly: SELECT c.u FROM c WHERE c.p LIKE "$User-%" AND EXISTS (SELECT VALUE u FROM u IN c.u WHERE u.c = "Snapshotted") {}
    # > Page 8774s, 8774u, 0e 342.33RU 3.8s {}
    # > TOTALS 0c, 8774s, 342.33RU 3.8s {} # 👈 cheaper and only one batch as no .p or ._etag 
    
    # add criteria filtering based on an Uncompressed Unfold
    eqx query -cn '$User' -un EmailIndex -uc 'u.d.email = "[email protected]"' cosmos -d db -c $EQUINOX_COSMOS_VIEWS -b 100000
    # > Querying Default: SELECT c.u, c.p, c._etag FROM c WHERE c.p LIKE "$User-%" AND EXISTS (SELECT VALUE u FROM u IN c.u WHERE u.c = "EmailIndex" AND u.d.email = "[email protected]") {}
    # > Page 0s, 0u, 0e 2.8RU 0.7s {}
    # > TOTALS 0c, 0s, 2.80RU 0.7s {} # 👈 only 2.8RU if nothing is returned
    
    # DUMP ONE STREAM TO A FILE (equivalent to queries performed by CosmosStore.AccessStrategy.Unoptimized)
    # Can be imported into another store via `propulsion sync cosmos from json`
    eqx query -sn 'user-f28fb6feea00550e93ca77b6f29899cd' -o dump-user.json cosmos -d db -c $EQUINOX_COSMOS_CONTAINER -b 9999
    # > Dumping Raw content to ./dump-user.json {}
    # > Querying Raw: SELECT * FROM c WHERE c.p = "user-f28fb6feea00550e93ca77b6f29899cd" AND 1=1 {}
    # > Page 9s, 1u, 10e 3.23RU 0.5s 0.0MiB age 0002.10:04:13 {} # 👈 2.80 if no results, adds per KiB charge if there are results 
    # > TOTALS 1c, 9s, 3.23RU R/W 0.0/0.0MiB 3.9s {}
    
    # DUMP FULL CONTENT OF THE CONTAINER TO A FILE
    # Can be imported into another store via `propulsion sync cosmos from json`
    eqx query -o ../dump-240216.json cosmos -d db -c $EQUINOX_COSMOS_CONTAINER -b 9999                             
    # > Dumping Raw content to ~/dumps/dump-240216.json {}
    # > No StreamName or CategoryName/CategoryLike specified - Unfold Criteria better be unambiguous {}
    # > Querying Raw: SELECT * FROM c WHERE 1=1 AND 1=1 {}
    # > Page 2972s, 748u, 3112e 108.9RU 3.8s 4.0MiB age 0212.18:00:45 {}
    # > Page 3211s, 777u, 3161e 112.29RU 3.3s 4.0MiB age 0212.09:06:02 {}
    # > Page 3003s, 663u, 3172e 110.33RU 3.4s 4.0MiB age 0211.04:09:12 {}
    # <chop>
    # > Page 2768s, 498u, 3153e 107.46RU 3.0s 4.0MiB age 0016.13:09:02 {}
    # > Page 2806s, 505u, 3198e 107.17RU 3.0s 4.0MiB age 0010.18:52:45 {}
    # > Page 2903s, 601u, 3188e 107.53RU 3.1s 4.0MiB age 0004.05:24:51 {}
    # > Page 2638s, 316u, 3019e 93.09RU 2.5s 3.4MiB age 0000.05:08:38 {}
    # > TOTALS 11c, 206,356s, 7,886.75RU R/W 290.4/290.4MiB 225.3s {}
  6. Use propulsion sync tool to run a CosmosDB ChangeFeedProcessor

    dotnet tool uninstall Propulsion.Tool -g
    dotnet tool install Propulsion.Tool -g --prerelease
    
    propulsion init -ru 400 cosmos # generates a -aux container for the ChangeFeedProcessor to maintain consumer group progress within
    # -V for verbose ChangeFeedProcessor logging
    # `-g projector1` represents the consumer group - >=1 are allowed, allowing multiple independent projections to run concurrently
    # stats specifies one only wants stats regarding items (other options include `kafka` to project to Kafka)
    # cosmos specifies source overrides (using defaults in step 1 in this instance)
    propulsion -V sync -g projector1 stats from cosmos
  7. Generate a CosmosDB ChangeFeedProcessor sample .fsproj (without Kafka producer/consumer), using Propulsion.CosmosStore

    dotnet new -i Equinox.Templates
    
    # note the absence of -k means the projector code will be a skeleton that does no processing besides counting the events
    dotnet new proProjector
    
    # start one or more Projectors
    # `-g projector2` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
    # cosmos specifies source overrides (using defaults in step 1 in this instance)
    dotnet run -- -g projector2 cosmos
  8. Use propulsion tool to Run a CosmosDB ChangeFeedProcessor, emitting to a Kafka topic

    $env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b	
    # `-V` for verbose logging	
    # `projector3` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently	
    # `-l 5` to report ChangeFeed lags every 5 minutes	
    # `kafka` specifies one wants to emit to Kafka	
    # `temp-topic` is the topic to emit to	
    # `cosmos` specifies source overrides (using defaults in step 1 in this instance)	
    propulsion -V sync -g projector3 -l 5 kafka temp-topic from cosmos	
  9. Generate CosmosDB Kafka Projector and Consumer .fsprojects (using Propulsion.Kafka)

    cat readme.md # more complete instructions regarding the code
    
    # -k requests inclusion of Apache Kafka support
    md projector | cd
    dotnet new proProjector -k
    
    # start one or more Projectors (see above for more examples/info re the Projector.fsproj)
    
    $env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b
    $env:PROPULSION_KAFKA_TOPIC="topic0" # or use -t
    dotnet run -- -g projector4 -t topic0 cosmos
    
    # generate a consumer app
    md consumer | cd
    dotnet new proConsumer
    
    # start one or more Consumers
    $env:PROPULSION_KAFKA_GROUP="consumer1" # or use -g
    dotnet run -- -t topic0 -g consumer1
  10. Generate an Archive container; Generate a ChangeFeedProcessor App to mirror desired streams from the Primary to it

    # once
    eqx init -ru 400 cosmos -c equinox-test-archive
    
    md archiver | cd
    
    # Generate a template app that'll sync from the Primary (i.e. equinox-test)
    # to the Archive (i.e. equinox-test-archive)
    dotnet new proArchiver
    
    # TODO edit Handler.fs to add criteria for what to Archive
    # - Normally you won't want to Archive stuff like e.g. `Sync-` checkpoint streams
    # - Any other ephemeral application streams can be excluded too
    
    # -w 4 # constrain parallel writers in order to leave headroom for readers; Archive container should be cheaper to run
    # -S -t 40 # emit log messages for Sync calls costing > 40 RU
    # -md 20 (or lower) is recommended to be nice to the writers - the archiver can afford to lag
    dotnet run -c Release -- -w 4 -S -t 40 -g ArchiverConsumer `
      cosmos -md 20 -c equinox-test -a equinox-test-aux `
      cosmos -c equinox-test-archive 
  11. Use a ChangeFeedProcessor driven from the Archive Container to Prune the Primary

    md pruner | cd
    
    # Generate a template app that'll read from the Archive (i.e. equinox-test-archive)
    # and prune expired events from the Primary (i.e. equinox-test)
    dotnet new proPruner
    
    # TODO edit Handler.fs to add criteria for what to Prune
    # - While it's possible to prune the minute it's archived, normally you'll want to allow a time lag before doing so
    
    # -w 2 # constrain parallel pruners in order to not consume RUs excessively on Primary
    # -md 10 (or lower) is recommended to constrain consumption on the Archive - Pruners lagging is rarely critical
    dotnet run -c Release -- -w 2 -g PrunerConsumer `
      cosmos -md 10 -c equinox-test-archive -a equinox-test-aux `
      cosmos -c equinox-test

SqlStreamStore support is provided in the samples and the eqx tool:

  • being able to supply ms, my, pg flag to eqx loadtest, e.g. eqx loadtest -t cart -f 50 -d 5 -C -U ms -c "sqlserverconnectionstring" -s schema
  • being able to supply ms, my, pg flag to eqx dump, e.g. eqx dump -CU "Favorites-ab25cc9f24464d39939000aeb37ea11a" ms -c "sqlserverconnectionstring" -s schema
  • being able to supply ms, my, pg flag to Web sample, e.g. dotnet run --project samples/Web/ -- my -c "mysqlconnectionstring"
  • being able to supply ms, my, pg flag to new eqx initsql command e.g. eqx initsql pg -c "postgresconnectionstring" -u p "usercredentialsNotToBeLogged" -s schema
cd ~/code/equinox

# set up the DB/schema
dotnet run --project tools/Equinox.Tool -- initsql pg -c "connectionstring" -p "u=un;p=password" -s "schema"

# run a benchmark
dotnet run -c Release --project tools/Equinox.Tool -- loadtest -t saveforlater -f 50 -d 5 -C -U pg -c "connectionstring" -p "u=un;p=password" -s "schema"

# run the webserver, -A to autocreate schema on connection
dotnet run --project samples/Web/ -- my -c "mysqlconnectionstring" -A

# set up the DB/schema
eqx initsql pg -c "connectionstring" -p "u=un;p=password" -s "schema"

# run a benchmark
eqx loadtest -t saveforlater -f 50 -d 5 -C -U pg -c "connectionstring" -p "u=un;p=password" -s "schema" 
eqx dump "SavedForLater-ab25cc9f24464d39939000aeb37ea11a" pg -c "connectionstring" -p "u=un;p=password" -s "schema" # show stored JSON (Guid shown in eqx loadtest output) 

MessageDb support is provided in the samples and the eqx tool:

  • being able to supply mdb flag to eqx loadtest, e.g. eqx loadtest -f 50 -d 5 -C -U mdb -c "pgconnectionstring"
  • being able to supply mdb flag to eqx dump, e.g. eqx dump -CU "Favorites-ab25cc9f24464d39939000aeb37ea11a" mdb -c "pgconnectionstring"
  • being able to supply mdb flag to Web sample, e.g. dotnet run --project samples/Web/ -- mdb -c "pgconnectionstring"

Equinox does not provide utilities for configuring or installing MessageDB. See MessageDB's installation documentation.

In addition to the default access strategy of reading the whole stream forwards in batches, the following access strategies are supported in MessageDb:

AccessStrategy.LatestKnownEvent

  • Uses message-db's get_last_stream_message API to only ever fetch the last event in a stream
  • This is useful for aggregates whose entire state can be constructed from the latest event (e.g. a stream that stores checkpoints)
  • NOTE: The last event should be decodable by the supplied codec; otherwise you'll receive the initial state (see the sketch below).
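
For example, a checkpoint-style aggregate whose entire State is derivable from the last event might fold as follows (hypothetical types, for illustration only):

    module Events =
        type Event =
            | Checkpointed of {| position: int64 |}
            interface TypeShape.UnionContract.IUnionContract

    module Fold =
        // the latest event wins; no earlier history is needed to build the State
        type State = int64 option
        let initial: State = None
        let evolve _state = function
            | Events.Checkpointed e -> Some e.position
        let fold: State -> Events.Event seq -> State = Seq.fold evolve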

AccessStrategy.AdjacentSnapshots

  • Generates and stores a snapshot event in an adjacent {Category}:snapshot-{StreamId} stream
  • The generation happens every batchSize events. This means the state of the stream can be reconstructed with exactly 2 round-trips to the database.
    • The first round-trip fetches the most recent event of type snapshotEventCaseName from the snapshot stream.
    • The second round-trip fetches batchSize events from the position of the snapshot (a wiring sketch follows)
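
In wiring terms, that looks roughly like the following. This is a sketch only: Fold.toSnapshot follows the conventions sketched under Features, the snapshot event case name supplied must match what the codec emits for the snapshot event, and the MessageDB docs linked above remain the authoritative reference for the signatures:

    // hypothetical wiring, reusing the Counter Fold module sketched under Features
    let access = Equinox.MessageDb.AccessStrategy.AdjacentSnapshots ("Snapshotted", Fold.toSnapshot)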

DynamoDB support in the samples and the eqx tool is equivalent to the CosmosDB support described above:

  • being able to supply dynamo source to eqx loadtest wherever cosmos works, e.g. eqx loadtest -t cart -f 50 -d 5 -CU dynamo -s http://localhost:8000 -t TableName
  • being able to supply dynamo flag to eqx dump, e.g. eqx dump -CU "Favorites-ab25cc9f24464d39939000aeb37ea11a" dynamo
  • being able to supply dynamo flag to Web sample, e.g. dotnet run --project samples/Web/ -- dynamo -s http://localhost:8000
  • being able to supply dynamo flag to eqx initaws command e.g. eqx initaws -r 10 -w 10 -s new dynamo -t TableName
  1. The tooling and samples in this repo default to using the following environment variables (see AWS CLI UserGuide for more detailed guidance as to specific configuration)

    $env:EQUINOX_DYNAMO_SERVICE_URL="https://dynamodb.us-west-2.amazonaws.com" # Simulator: "http://localhost:8000"
    $env:EQUINOX_DYNAMO_ACCESS_KEY_ID="AKIAIOSFODNN7EXAMPLE"
    $env:EQUINOX_DYNAMO_SECRET_ACCESS_KEY="AwJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    $env:EQUINOX_DYNAMO_TABLE="equinox-test"
    $env:EQUINOX_DYNAMO_TABLE_ARCHIVE="equinox-test-archive"
  2. Tour of the tools/samples:

    cd ~/code/equinox
    
    # start the simulator at http://localhost:8000 and an admin console at http://localhost:8001/
    docker compose up dynamodb-local dynamodb-admin -d
    
    # Establish the table in us-east-1 - keys come from $EQUINOX_DYNAMO_ACCESS_KEY_ID and $EQUINOX_DYNAMO_SECRET_ACCESS_KEY
    dotnet run --project tools/Equinox.Tool -- initaws -r 10 -w 10 -s new dynamo -t TableName -su https://dynamodb.us-east-1.amazonaws.com
    
    # Check the status and get the streams ARN - keys come from AWS SDK config for us-east-1 region
    dotnet run --project tools/Equinox.Tool -- stats dynamo -t TableName -sr us-east-1
    
    # run a benchmark
    dotnet run -c Release --project tools/Equinox.Tool -- loadtest -t saveforlater -f 50 -d 5 -CU dynamo
    
    # run the webserver
    dotnet run --project samples/Web/ -- dynamo -t TableName
    
    # run a benchmark connecting to the webserver
    eqx loadtest -t saveforlater -f 50 -d 5 -CU web
    eqx dump "SavedForLater-ab25cc9f24464d39939000aeb37ea11a" dynamo # show stored JSON (Guid shown in eqx loadtest output) 
  3. Useful articles

BENCHMARKS

A key facility of this repo is being able to run load tests, either in process against a nominated store, or via HTTP to a nominated instance of samples/Web ASP.NET Core host app. The following test suites are implemented at present:

  • Favorite - Simulate a very enthusiastic user that favorites something once per second
    • the test generates an ever-growing state that can only be managed efficiently if you apply either caching, snapshotting or both
    • NB due to being unbounded, Snapshot and MultiSnapshot etc. (even RollingState or Custom) will eventually hit the Store's limits (4MB/event for EventStore, 3MB/Item (document) for CosmosDB)
  • SaveForLater - Simulate a happy shopper that saves 3 items per second, and empties the Save For Later list whenever it is full (when it hits 50 items)
    • Snapshotting helps a lot
    • Caching is not as essential as it is for the Favorite test (as long as you have either caching or snapshotting, that is)
  • Todo - Keeps a) getting the list b) adding an item c) clearing the list when it hits 1000 items.
    • the Cleared event acts as a natural event to use in the isOrigin check. This makes snapshotting less crucial than it is, for example, in the case of the Favorite test
    • the -s parameter can be used to adjust the maximum item text length from the default (100, implying average length of 50)

BUILDING

Please note the QuickStart is probably the best way to gain an overview - these instructions are intended to illustrate various facilities of the build script for people making changes.

build and run

Run, including running the tests that assume you've got a local EventStore and pointers to a CosmosDB database and container prepared (see PROVISIONING):

./build.ps1

build, skipping tests that require a Store instance

./build -s

build, skipping all tests

dotnet pack build.proj

build, skip EventStore tests

./build -se

build, skip EventStore tests, skip auto-provisioning / de-provisioning CosmosDB

./build -se -scp

Run EventStore benchmark on .NET Core (when provisioned)

At present, .NET Core seems to show comparable perf under normal load, but becomes very unpredictable under heavy load. The following benchmark should produce pretty consistent levels of reads and writes, and can be used as a baseline for investigation:

& dotnet run -c Release --project tools/Equinox.Tool -- loadtest -t saveforlater -f 1000 -d 5 -C -U es

run Web benchmark

The CLI can drive the Store and TodoBackend samples in the samples/Web ASP.NET Core app. Doing so requires starting a web process with an appropriate store (EventStore in this example, but can be memory / omitted etc. as in the other examples)

in Window 1

& dotnet run -c Release --project samples/Web -- -C -U es

in Window 2

dotnet tool install -g Equinox.Tool --prerelease # only once
eqx loadtest -t saveforlater -f 200 web

run CosmosDB benchmark (when provisioned)

dotnet run --project tools/Equinox.Tool -- loadtest `
  cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER

PROVISIONING

Provisioning EventStore (when not using -s or -se)

There's a docker-compose.yml file in the root, so installing docker-compose and then running docker-compose up rigs a local 3-node cluster, which is assumed to be configured for Equinox.EventStore.Integration and Equinox.EventStoreDb.Integration

For more complete instructions, follow https://developers.eventstore.com/server/v21.10/installation.html#use-docker-compose

Provisioning CosmosDB (when not using build.ps1 -sc to skip verification)

Using Azure Cosmos DB Service

dotnet run --project tools/Equinox.Tool -- init -ru 400 `
    cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER
# Same for an Archive Container for integration testing of the archive store fallback mechanism
$env:EQUINOX_COSMOS_CONTAINER_ARCHIVE="equinox-test-archive"
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
    cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE

Using Cosmos Emulator on an Intel Mac

NOTE There's no Apple Silicon emulator available as yet.

NOTE Have not tested with the Windows Emulator, but it should work with analogous steps.

docker compose up equinox-cosmos -d
bash docker-compose-cosmos.sh

Provisioning SqlStreamStore

There's a docker-compose.yml file in the root, so installing docker-compose and then running docker-compose up rigs local equinox-mssql, equinox-mysql and equinox-postgres servers and databases at known ports. NOTE The Equinox.SqlStreamStore.*.Integration suites currently assume this is in place and will otherwise fail.

DEPROVISIONING

Deprovisioning (aka nuking) EventStore data resulting from tests to reset baseline

While EventStore rarely shows any negative effects from repeated load test runs, it can be useful for various reasons to drop all the data generated by the load tests by casting it to the winds:

# requires admin privilege
rm $env:ProgramData\chocolatey\lib\eventstore-oss\tools\data

Deprovisioning CosmosDB

The provisioning step spins up RUs in CosmosDB for the Container, which will keep draining your account until you reach a spending limit (if you're lucky!). When finished running any test, it's critical to drop the RU allocations back down again via some mechanism (either delete the container or reset the RU provision down to the lowest possible value).

  • Kill the container and/or database
  • Use the portal to change the allocation

RELEASING

*The perfect is the enemy of the good; all this should of course be automated, but the elephant will be consumed in small bites rather than waiting till someone does it perfectly. This documents the actual release checklist as it stands right now. Any small helping bites much appreciated 🙏*

Tagging releases

This repo uses MinVer; see here for more information on how it works.

All non-alpha releases derive from tagged commits on master or vX branch. The tag defines the nuget package id etc. that the release will bear (dotnet pack uses the MinVer package to grab the value from the commit)

Checklist

  • 😢 the Azure Pipelines script does not run the integration tests, so these need to be run manually via the following steps:

    • Provision:
      • Set environment variables x 4 for a CosmosDB database and container (you might need to eqx init)
      • Add an EQUINOX_COSMOS_CONTAINER_ARCHIVE environment variable referencing a separate (eqx init initialized) CosmosDB Container that will be used to house fallback events in the Fallback mechanism's tests
      • docker-compose up to start
        • 3 servers for the SqlStreamStore.*.Integration test suites (NOTE: manual step for MS SQL)
        • 3 EventStoreDB cluster nodes
        • DynamoDB local and admin images (dynamodb-local, dynamodb-admin)
    • Run ./build.ps1 in PowerShell (or PowerShell Core on MacOS via brew install cask pwsh)
  • CHANGELOG should be up to date

  • commit should be tagged (remember to do git push --tags when pushing)

  • after the push has resulted in a successful build, click through from the commit on GitHub to the Azure Pipelines build state and verify all artifacts bear the correct version suffix (if the tags were not pushed alongside the commit, they can be wrong). Then, and only then, do the Release (which will upload to nuget.org using a NuGet API key that has upload permissions for the packages)

  • When adding new packages: For safety, the NuGet API Key used by the Azure DevOps Releases step can only upload new versions of existing packages. As a result, the first version of any new package needs to be manually uploaded out of band. (then invite jet.com to become owner so subsequent releases can do an automated upload [after the request has been (manually) accepted])

FAQ

What is Equinox?

OK, I've read the README and the tagline. I still don't know what it does! Really, what's the TL;DR ?

  • supports storing events in EventStore, including working with existing data you may have (that's where it got its start)
  • includes a proprietary optimized Store implementation that only needs an empty Azure CosmosDB Account or Amazon DynamoDB Table to get going
  • provides all the necessary infrastructure to build idempotent synchronous command processing against all of the stores; your Domain code intentionally doesn't need to reference any Equinox modules whatsoever (although for smaller systems, you'll often group Events+Fold+interpret/decide+Service in a single module, which implies a reference to the core Equinox package).
  • following on from the previous point: you just write the unit tests without any Equinox-specific hoops to jump through (see the test sketch after this list); this really works very well indeed, assuming you're writing the domain code and the tests in F#. If you're working in a more verbose language, you may end up building some test helpers. We don't envisage Equinox mandating a specific pattern on the unit testing side (consistent naming such as Events.Event+evolve+fold+Command+interpret/decide can help though).
  • it helps with integration testing decision processes by
    • staying out of your way as much as possible
    • providing an in-memory store that implements the same interface as the concrete stores (CosmosDB, EventStore, etc.) do
  • There is a projection story, but it's not baked in - any 3 proper architects can come up with at least 3 wrong and 3 right ways of running those:
    • For EventStore, you can use its projections facilities directly. There's also Propulsion.EventStore, which serves the needs of dotnet new proSync.
    • for CosmosDB, you use the Propulsion.CosmosStore libraries to consume the CosmosDB ChangeFeed using the Microsoft.Azure.Cosmos library's change feed support (and, optionally, project to/consume from Kafka) using the sample app templates (dotnet new proProjector).
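
To make the unit-testing point above concrete: a test against the hypothetical Counter module sketched earlier needs no Equinox reference at all (xUnit shown; any test framework works):

    open Xunit

    [<Fact>]
    let ``folding an increment then a decrement round-trips the state`` () =
        let state = Fold.fold Fold.initial [ Events.Incremented; Events.Decremented ]
        Assert.Equal(Fold.initial, state)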

Should I use Equinox to learn event sourcing ?

You could. However, the Equinox codebase itself is not designed to be a tutorial; it's extracted from production systems and optimized; there is no pedagogical mission. FsUno.Prod on the other hand has this specific intention, and walking through that is highly recommended. Also, EventStore, being a widely implemented and well-respected open source system, has some excellent learning materials and documentation with a wide usage community (search for DDD-CQRS-ES Discord).

Having said that, we'd love to see a set of tutorials written by people looking from different angles, and over time will likely do one too ... there's no reason why the answer to this question can't become "of course!"

Can I use it for really big projects?

You can. Folks in Jet do; we also have systems where we have no plans to use it, or anything like it. That's OK; there are systems where having precise control over one's data access is critical. And (shush, don't tell anyone!) some find writing this sort of infrastructure to be a very fun design challenge that beats doing domain modelling any day...

Can I use it for really small projects and tiny microservices?

You can. Folks in Jet do; but we also have systems where we have no plans to use it, or anything like it, as it would be overkill even for people familiar with Equinox.

OK, but should I use Equinox for a small project ?

You'll learn a lot from building your own equivalent wrapping layer. Given the array of concerns Equinox is trying to address, there's no doubt that a simpler solution is always possible if you constrain the requirements to the specifics of your context with regard to a) scale, b) complexity of the domain, and c) the degree to which you use, or are likely to use, more than one data store. You can and should feel free to grab slabs of Equinox's implementation and whack them into an Infrastructure.fs in your project too (note you should adhere to the rules of the Apache 2 license). If you find there's a particular piece you'd really like isolated or callable as a component, and it's causing you pain as you're using it over and over in three or more projects, please raise an Issue though!

Having said that, getting good logging, some integration tests and getting lots of off-by-one errors off your plate is nice; the point of DDD-CQRS-ES is to get beyond toy examples to the good stuff - Domain Modelling on your actual domain.

What client languages are supported ?

The main language in mind for consumption is of course F# - many would say that F# and event sourcing are a dream pairing. Little direct effort has been expended on polishing it to be comfortable to consume from other .NET languages; the dotnet new eqxwebcs template represents the current state. In Equinox V4, DeciderCore offers an interface that uses C#-friendly Task and Func types (compared to Decider, which uses async and curried function signatures to provide an idiomatic F# experience - possible, but cumbersome, to use from C#).

You say I can use volatile memory for integration tests, could this also be used for learning how to get started building event sourcing programs with equinox?

The MemoryStore is intended to implement the complete semantics of a durable store (aside from caching). The main benefit of using it is that tests using it have zero environment dependencies. In some cases this can be very useful for demo apps or generators: rather than assuming a specific store at a specific endpoint with specific credentials, there is something to point at which requires no configuration or assumptions. The one problem, of course, is that it's all in-process: the minute you stop the host, the items on your list will disappear. In general, EventStore is also an attractive option for prototyping; the open source edition is trivial to install and has a Web UI that lets you navigate the events being produced, etc.
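
By way of illustration, here's a rough sketch of wiring a toy domain module to the MemoryStore (the API shapes shown - MemoryStoreCategory's argument list, Decider.forStream, FsCodec.Box - are per V4-era packages and have shifted between versions, so treat the exact signatures as assumptions):

  type Event = Incremented | Decremented
  let initial = 0
  let evolve state = function Incremented -> state + 1 | Decremented -> state - 1
  let fold: int -> Event[] -> int = Array.fold evolve

  // no endpoint, credentials or config required; everything lives in this process
  let store = Equinox.MemoryStore.VolatileStore()
  let codec = FsCodec.Box.Codec.Create<Event>() // box-based codec; adequate for tests and demos
  let category = Equinox.MemoryStore.MemoryStoreCategory(store, "Counter", codec, fold, initial)
  let decider = Equinox.Decider.forStream Serilog.Log.Logger category (FsCodec.StreamId.gen id "client-1")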

OK, so it supports CosmosDB, DynamoDB, EventStoreDB, MessageDB and SqlStreamStore and might even support more in the future. I really don't intend to shift datastores. Period. Why would I take on this complexity only to get the lowest common denominator ?

Yes, you have decisions to make; Equinox is not a panacea - there is no one size fits all. The philosophy of Equinox is to a) provide an opinionated, store-neutral Programming Model with a strong pull toward a pit of success, while b) not closing the door on using store-specific features where relevant; that said, a dedicated store-specific integration is always going to afford you more power and control.

Is there a guide to building the simplest possible hello world "counter" sample, that simply counts with an add and a subtract event?

Yes; Counter.fsx in the Tutorial project in this repo. It may also be worth starting with the API Guide in DOCUMENTATION.md. An alternative is to look at the Todo.fs files emitted by dotnet new equinoxweb in the QuickStart.
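
To give a flavor, here's a hand-written sketch of the same shape as the counter sample (this is not the actual Counter.fsx; names are illustrative):

  module Counter =
      type Event = Added of int | Subtracted of int
      type State = int
      let initial: State = 0
      let evolve state = function Added x -> state + x | Subtracted x -> state - x
      let fold: State -> Event list -> State = List.fold evolve
      type Command = Add of int | Subtract of int
      let decide command (state: State) =
          match command with
          | Add x -> [ Added x ]
          | Subtract x when state >= x -> [ Subtracted x ]
          | Subtract _ -> [] // illustrative rule: refuse to go below zero

  // the domain logic is unit-testable without any store or Equinox reference:
  let events = Counter.decide (Counter.Add 2) Counter.initial
  let state' = Counter.fold Counter.initial events // 2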

Why do the snapshots go in the same stream in Equinox.EventStore and Equinox.SqlStreamStore ? 🙏 @chrisjhoare

I've been looking through the snapshotting code recently and can see the snapshot events go in the same stream as regular events. Presumably this is to save on operations to read/write the streams, and a bit less overhead maintaining two serializers? Are there any other advantages? I quite like it this way, but I think I saw that the GetEventStore advice was separate streams, so I'm just interested in any other reasoning behind it

The reason GES recommends against it is that the entire db is built on writing things once, in an append-only manner (which is a great design from most aspects). This means your choices are:

  • embed snapshots in the same stream, but do that sparingly, as you can't delete them (implemented in Equinox as AccessStrategy.RollingSnapshots)
  • keep snapshots elsewhere (typically in a sister stream with the max items set to 1)
    • which the EventStoreDB background scavenging process will tidy up (but it ain't free)
    • which is a separate roundtrip (not the end of the world in GES, but still another thing to go wrong)
    • which can't be written as a transaction, i.e. you'd need to write the snapshot after (and only after) a successful write (and worry about inconsistency)

That general advice (and those trade-offs) on snapshotting applies to most systems.

The answer as to why that strategy is available in Equinox.EventStore is based on use cases (the second strategy was actually implemented in a bespoke manner initially by @eiriktsarpalis):

  • streams like Favorites where every event is small (add sku, drop sku), and the snapshot is pretty compact (list of skus) (but note it is ever growing)
  • streams like SavedForLater items, where the state rolls over regularly - even after 5 years and 1000s of items moving in and out, there's a constraint of max 50 items, which makes a snapshot pretty light. (The other trick is that a Cleared event counts as a valid starting state for the fold - and we don't write a snapshot if we have one of those.)

The big win is latency in querying contexts - given that access strategy, you're guaranteed to be able to produce the full state of the aggregate with a single roundtrip (if the max batch size is 200, snapshots are written every 200 events, so reading backward 200 guarantees a snapshot will be included).

The secondary benefit is, of course, that you have an absolute guarantee there will always be a snapshot: if a given write succeeds, there will definitely be a snapshot within the maxBatchSize window (but it still copes if there isn't - i.e. you can add snapshotting after the fact).

Equinox.SqlStreamStore implements this scheme too - in that instance, it's easier to do things like replacing the bodies of snapshot events with nulls as a maintenance task.

Initially, Equinox.CosmosStore implemented the same strategy as Equinox.EventStore (it started as a cut and paste of it). However, the present implementation takes advantage of the fact that, in a Document Store, you can ... update documents - thus, snapshots (termed unfolds) are saved in a custom field (it's an array) in the Tip document: every update includes an updated snapshot (zipped to save read and write costs) that overwrites the unfolds entirely. You're currently always guaranteed that the snapshots are in sync with the latest event, by virtue of how the stored proc writes. The DynamoDB implementation follows the same strategy.

I expand (too much!) on some more of the considerations in https://github.com/jet/equinox/blob/master/DOCUMENTATION.md

The other thing that should be pointed out is that caching can typically cover a lot of the perf concerns, as long as stream lengths stay sane - snapshotting (especially polluting the stream with snapshot events) should definitely be toward the bottom of your list of tactics for managing a stream efficiently, given that long streams are typically a design smell.

NOTE The newer Equinox.MessageDb store binding implements snapshotting as separated events in a separate category.

Changing Access / Representation strategies in Equinox.CosmosStore - what happens?

Does Equinox adapt the stream if we start writing with Equinox.CosmosStore.AccessStrategy.RollingState and change to Snapshotted, for instance? Could it take the last RollingState write and make it the first snapshot?

And what about the opposite? Does it delete all the events and start writing RollingState?

TL;DR yes and no respectively

Some context

Firstly, it's recommended to read the documentation section on Access Strategies

General rules:

  • Events are the atoms from which state is built, they live forever in immutable Batch documents.
  • There is a special Batch with id = "-1", entitled the Tip.
  • Snapshots/unfolds live in the .u array in the Tip doc.
  • regardless of what happens, Events are never destroyed, updated or touched in any way, ever. Having said that, if your Event DU does not match them, they're as good as not there from the point of view of how State is established.
  • Reads always get the Tip first (one exception: Unoptimized mode skips reading the Tip as, by definition, you're not using snapshots/unfolds/any tricks), Writes always touch the Tip (yes, even in Unoptimized mode; there's no such thing as a stream that has ever been written to that does not have a Tip).
  • In the current implementation, the calling code in the server figures out everything that's going to go in the snapshots/unfolds list if this sync is successful.

The high level skeleton of the loading in a given access strategy is: a) load and decode unfolds from tip (followed by events, if and only if necessary) b) offer the events to an isOrigin function to allow us to stop when we've got a start point (a Reset Event, a relevant snapshot, or, failing that, the start of the stream)

It may be helpful to look at how an AccessStrategy is mapped to isOrigin, toSnapshot and transmute lambdas internally
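
As a hand-rolled illustration of what those lambdas boil down to (the event shapes here are hypothetical; the real mapping lives inside the store packages):

  type Event =
      | Added of sku: string
      | Removed of sku: string
      | Snapshotted of skus: string list // the unfold / in-stream snapshot case
  let initial: string list = []
  let evolve state = function
      | Added sku -> sku :: state
      | Removed sku -> state |> List.filter ((<>) sku)
      | Snapshotted skus -> skus // a snapshot is a complete replacement starting point
  // isOrigin: tells the loader it can stop reading backwards at this event
  let isOrigin = function Snapshotted _ -> true | _ -> false
  // toSnapshot: derives the unfold/snapshot event from the current state
  let toSnapshot (state: string list) = Snapshotted state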

Aaand answering the question

Whenever a State is being built, it always loads the Tip first and sees any events/snapshots/unfolds in there...

If isOrigin says no to those, and/or the EventTypes of those unfolds are not in the union / event type to which the codec is mapping, the next thing is a query backwards over the Batches of events, in order.

All those get pushed onto a stack until we either hit the start, or isOrigin says - yes, we can start from here (at which point all the decoded events are then passed (in forward order) to the fold to make the 'state).

So, if you are doing RollingState or any other mode, there are still events and unfolds; and they all have EventTypes - there are just some standard combos of steps that happen.

If the EventType of the Event or Unfold matches, the fold/evolve will see them and build 'state from that.

Then, whenever you emit events from a decide or interpret, the AccessStrategy will define what happens next; a mix of:

  • write actual events (not if RollingState)
  • write updated unfolds/snapshots
  • remove or adjust events before they get passed down to the sync stored procedure (Custom, RollingState, LatestKnownEvent modes)

Ouch, not looking forward to reading all that logic? 😟 Have a read; it's really not that 😱.

Help me understand how the expectedVersion is used with EventStoreDB - it seems very confusing 🙏 @dharmaturtle

I'm having some trouble understanding how Equinox+ESDB handles the "expected version". Most of the examples use Equinox.Decider.Transact, which is storage agnostic and doesn't offer any obvious concurrency checking. In Equinox.EventStore.Context, there's a Sync that takes a Token which holds a streamVersion. Should I be using that instead of Transact?

The bulk of the implementation is in Equinox/Stream.fs; see the let run function.

There are sequence diagrams in DOCUMENTATION.md, but I'll summarize here:

  • As you suggest, Transact is definitely the API you want to be using
  • The assumption in Equinox is that you always want to do a version check - if you don't, you can't process idempotently; why incur the cost of an ordered, append-only store? (There is a lower-level Sync operation in Equinox.CosmosStore which does a blind write to the store, allowing a non-version-checked write in that context; it's implemented and exposed because the stored procedure needs to handle the concept. For EventStoreDB, if you have such a special case, you can use its APIs directly.)
  • The inner API with the Sync is the 'store interface' which represents the actual processing needed to do a version-checked write (The Sync one does not handle retries and is only used for the last attempt, when there are no subsequent retries)
  • The main reason for the separation is that no ephemeral state is held by Equinox in anything like e.g. a Unit of Work during the course of a decide function being invoked - the (token, state) tuple represents all the things known at the point of loading, and the Sync can use anything stashed in there when it has proposed events passed to it, as the contract involves the caller resupplying that context.
  • Another consideration is that it's easy to introduce off-by-one errors when there's an expectedVersion in play, so encapsulating this is no bad thing (in addition to it being something that you don't want to be passing around in your domain logic)

But why, you might ask? The API is designed such that the token can store any kind of state relevant to the Sync operation:

a. for SqlStreamStore and EventStore, when writing rolling snapshots, we need to retain the index of the last Rolling Snapshot that was written, if we encountered it during loading (e.g. if we read V198-100 and there was a snapshot at V101, then we need to write a new one iff the events we are writing would make event 101 be > batchSize events away; i.e. we need to always include a RollingSnapshot to maintain the "if you read the last page, it will include a rolling snapshot" guarantee)

b. for CosmosDB, the expectedVersion can actually be an expectedEtag - this is how AccessStrategy.RollingState works - this allows one to update Unfolds without having to add an event every time just to trigger a change in the version

(The second usage did not necessitate an interface change - i.e. the Token mechanism was introduced to handle the first case, and just happened to fit the second case)

Alternatively, I'm seeing in proReactor that there's a decide that does version checking. Is this recommended? code

If you need to know the version in your actual handler, QueryEx and other such APIs alongside Transact expose it (e.g. if you want to include a version to accompany a directly rendered piece of data). (Note that doing this - including a version in a rendering of something - should not be a go-to strategy; i.e. having APIs that pass around expectedVersion is not a good idea in general.)

The typical case for using the version in the output is to be able to publish a versioned summary on a feed, so someone else can build a version-checking idempotent Ingester... which brings us to:

For that particular reactor, a different thing is going on though: the input value is versioned, and we don't write if the value is in date e.g. if you reset the checkpoint on the projector, it can re-run all the work idempotently:

a. version 3 of something is never temporarily overwritten with V2 and then V3

b. no redundant writes take place (and no expensive RU costs are incurred in Cosmos)

What kind of values does ISyncContext.Version return; i.e. what numeric value is yielded for an empty stream? 🙏 @ragiano215

Independent of the backing store being used, Equinox uses 0-based versioning, i.e. the version value is equal to the number of events in the stream. Each event's Index is 0-based, akin to how a .NET array is numbered:

  • 1 when the first event is written
  • 0 when there are no events

Side note: for contrast, EventStoreDB employs a different (-1-based) scheme, in order to have -1/-2 etc. represent various expectedVersion conditions:

  • 0 when the first event is written to the stream
  • -1 when the stream exists and is empty but has metadata defined
  • -2 when the stream doesn't exist

Note that for Equinox.CosmosStore with a pruner-archiver pair configured, the primary store may have been stripped of events by the operation of the pruner. In that case, it will however retain the version of the stream in the Tip document; if that's non-0, Equinox will attempt to load the archived events from the Archive store.

What is Equinox's behavior if one does a Query on a 'non-existent' stream? 🙏 @ragiano215

Example: I have an app serving a GET endpoint for a customer order, but the id supplied within the URL is for an order that hasn't yet been created.

Note firstly that Equinox treats a non-existent stream as an empty stream. For the use case stated, it's first recommended that the state be defined to represent this non-existent / uninitialized phase, e.g. by defining a DU with a case named Initial, or in some way following the Null Object Pattern. This value would then be used as the Fold.initial for the Category. The app will use .Query/.QueryEx on the relevant Decider, and Equinox will supply the initial value for the project function to render from (as a pattern match).
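
A sketch of that shape (names are illustrative, not lifted from the samples):

  type OrderState =
      | Initial // no events: the order does not exist (yet)
      | Active of items: string list
  let initial = Initial
  // the render function handed to Query; the GET handler can map None to a 404
  let render = function
      | Initial -> None
      | Active items -> Some items
  // usage, given a wired-up decider: let! res = decider.Query render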

Side note: the original question is about a read operation, but there's an interesting consideration if we are doing a Transact. Say, for instance, that there's a PUT API endpoint where the code would register a fresh customer order in the customer's order list via the Decider's Transact operation. As an optimization, one can supply the AssumeEmpty hint as the Equinox.LoadOption, to indicate that it's worth operating on the assumption that the stream is empty. When the internal sync operation attempts to perform the write, that assumption will be tested; every write is always version checked. In the scenario where we are dealing with a rerun of an attempt to create an order (let's say the call timed out, but the processing actually concluded successfully on another node of the API server cluster just prior to the caller giving up), the version check will determine that the expected version is not 0 (as it would be for an empty stream), but instead 1 (as the preceding invocation wrote one event). In this case, the loop will then use the fold function from the initial state, folding in the single event (via the evolve function), and pass that state to the decide function, which, assuming it's implemented in an idempotent manner, will indicate that there are no events to be written.
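
Continuing the sketch above to cover that PUT flow (the Transact overload and Equinox.LoadOption.AssumeEmpty are per V4-era APIs; verify the exact shapes against the version you're using):

  type OrderEvent = Created of orderId: string
  let interpret orderId = function
      | Initial -> [ Created orderId ] // genuinely new: record the creation
      | Active _ -> [] // rerun of a previously-successful attempt: nothing to write
  let create (decider: Equinox.Decider<OrderEvent, OrderState>) orderId =
      // AssumeEmpty skips the load roundtrip; if the stream turns out to be non-empty,
      // the version check on write fails, the true state is established, and interpret reruns
      decider.Transact(interpret orderId, load = Equinox.LoadOption.AssumeEmpty)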

What is a Decider? How does the Equinox type Decider relate to Jérémie's concept of one? 🙏 @rmaziarka

The best treatments of the concept of a Decider are:

  1. Jérémie's intro post - it's not short, but it's required reading for anyone considering event sourcing, regardless of whether you're ever going to use a functional programming language to do so.
  2. There's a very thorough treatment with code walk-through and discussion in this 2h45m video on Event Driven Information Systems with Jérémie Chassaing, @thinkb4coding

As teased in both, there will hopefully eventually (but hopefully not inevitably) be a book at some point too 🤞

In Equinox

The Equinox type Decider exposes an API that covers the needs of making Consistent Decisions against a State derived from Events on a Stream. At a high level, we have:

  • Transact* functions - these run a decision function that may result in a change to the State, including management of the retry cycle when a consistency violation occurs during the syncing of the state with the backing store (see Optimistic Concurrency Control). Some variants can also yield an outcome to the caller after the syncing to the store has taken place.
  • Query* functions - these run a render function projecting from the State that the Decider manages (but can't mutate it or trigger changes). The concept of CQRS is a consideration here - using the Decider to read state should not be a default approach (but equally should not be considered off limits).

NOTE the Decider itself in Equinox does not directly touch all three of the ingredients - while you pass it a decide function, the initial and fold functions are supplied to the specific store library (e.g. Equinox.CosmosStore.CosmosStoreCategory), as that manages the loading, snapshotting, syncing and caching of the state and events.

In general

While the concept of a Decider plays well with Event Sourcing and many different types of Stores, it's important to note that neither storage nor event sourcing is a prerequisite. A lot of the value of the concept is that you can and should be able to talk about and implement one without reference to any specific store implementation (or even thinking about it ever being stored - it can also be used to manage in-memory structures such as UI trees etc.). By the same token, you can decorate/proxy a Decider with loading or saving behavior (not limited to just 'copying the commands'), e.g. you might be syncing saves of changes to a backend in near-real time while the front end is reflecting changes instantaneously.

Consistency

In any system, any decision (or even query) processed by a Decider should be concurrency controlled.

NOTE: the situation might be different if working in an environment where a particular concurrency model is emphasized. E.g., if you're running in an Actor-based system, one may map a decider to an actor; with this, any impetus to change state would be forwarded to that one actor and handled in a serial fashion, with potential conflicts managed by a supervisor.

Another example where the situation could be different is if you're building an in-memory decision system to support a game etc., as Jérémie does in the talk. There's only one, so that concern is side-stepped.

When applying the concept of a Decider to event sourcing, the consistency requirement means there's more to the exercise than emitting events into a thing whose marketing centers on Events. There needs to be a way, in the overall processing of a decision, to manage a concurrency conflict by taking the state that superseded the one you based the original decision on (the origin state), and re-running the decision based on the reality of that conflicting actual state. The resync operation that needs to take place in that instance can be managed by reloading from events, reloading from a snapshot, or by taking the events written since your local state was established and folding those Events on top of it.

The ingredients

With Deciders in general, and Equinox in particular, the following elements are involved:

  • a State type, on which decisions can be based. This can be updated as a consequence of a decision, e.g. the item identifiers in a cart and the associated quantities
  • a decide function, which is presented with a State and returns a decision, which we use to update the State if, and only if, there is no concurrency conflict when applying it; e.g. the decide function might validate that it's acceptable to start a process at the present time, returning the identifier of the process; if there is already one in flight, it can return the identifier of that already-started process (covered under the term idempotency further on)
  • an initial State, from which we start if there's nothing in the store, e.g. an empty list of product codes representing nothing in the cart
  • an Event type (think a Discriminated union). This might have cases like Cleared, QuantityChanged, ItemAdded (in some cases only a snapshot of the state is persisted, but in Equinox, changes are always represented in terms of the relevant Event type for a given Decider)
  • the Events are logically represented as an ordered list (this could be an in-memory array, a stream in ESDB, documents with a Tip as with Equinox.CosmosStore, etc.). In some cases, e.g. RollingState mode, they may get folded and snapshotted when stored.
  • the State is established by folding the sequence of Events, starting from an initial state. This value should not be mutated by the fold function.

With the Equinox type Decider, the typical decide function used with the Transact API has the following signature:

  context -> inputsAndOrCommand -> 'State -> Event list

NOTE: There are more advanced forms that allow the decide function to be Async, inspect the State's Version and/or to also return a 'result, which will be yielded to the caller driving the Decision as the return value of the Transact function.
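
For instance, a decide that yields the id of the (possibly already-started) process alongside any events, in the idempotent style discussed elsewhere in this document (names invented for illustration):

  type ProcessEvent = ProcessStarted of processId: string
  type ProcessState = { active: string option }
  // returns a result *and* the events; Transact yields the result to the caller
  let decide requestedId (state: ProcessState): string * ProcessEvent list =
      match state.active with
      | Some existingId -> existingId, [] // already in flight: report its id, write nothing
      | None -> requestedId, [ ProcessStarted requestedId ]
  // usage: let! processId = decider.Transact(decide requestedId)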

So what kind of a thing is a Decider then?

Is it a marketing term? Jérémie's way of explaining an app?

I'd present the fact that Equinox:

  • was initially generalised and extracted from working code using ESDB in (by most measures) a successful startup written in a mix of F# and C# by the usual mix of devs, experience levels, awareness of event sourcing patterns and language/domain backgrounds
  • for a long time only had its MemoryStore as the thing to force it to be store agnostic
  • did not fundamentally change to add idiomatic support for a Document database (CosmosDB)
  • did not change to add idiomatic support for DynamoDB
  • can and has been used at every stage in an e-commerce pipeline
  • is presently aligning pretty neatly with diverse domains without any changes/extensions, both for me and others

... as evidence for Decider being a pattern language (analogous to how event sourcing and many modern event sourced UI paradigms share a lot of common patterns).

... about the process of making decisions

Finally, I'd say that a key thing the Decider concept brings is a richer way of looking at event sourcing than the typical event sourcing 101 examples you might see:

  • de-emphasizing one way calls into the void that map commands to events without deduplicating and/or yielding a result (not saying that you shouldn't do the simplest thing -- you absolutely should)
  • de-emphasizing the notion that projection handlers never get more exciting than sitting there looking for MyThingCreated and doing an INSERT with a try/catch for duplicate inserts as a way to make it idempotent (and that every stream design must have a Created event because that's the magic recipe)

The missing part beyond that basic anemic stuff is where the value lies:

  • any interesting system makes decisions:
  • a decision can yield a result alongside the events that are needed to manifest the state change
  • any decision process can and should consider idempotency - if you initiate a process/request something, a retry can't be a special case you don't worry about. Taking correct handling of such retry and/or replay scenarios into consideration should not be an afterthought; it should be a concern on your day-to-day checklist when writing a decision function. Of course, idempotency can be handled in many ways:
    • sometimes before processing gets to the Decider - i.e. any outer layer of the processing can have semantics that cover the idempotency requirement
    • sometimes within the Decider itself (e.g. a decision can yield the unique id generated the first time the request was triggered on every subsequent invocation)
    • sometimes it can be handled externally (e.g. one might not maintain the state that would be necessary to fully deduplicate triggerings, and instead rely on the EventStoreDB and SqlStreamStore idempotent write deduplication mechanism to elide the redundant writes just in time)
    • A Decider can also be decorated/proxied to add idempotency. As Jérémie mentioned here, and in his talk, you can naturally layer idempotency onto a decider: you can make a generic function D<Cmd, Event, State> -> D<(Id*Cmd), Event, (State*(Id Set))>, where Id is a command identifier. In the decide function, it checks whether the id is in the set; in the evolve function, it adds the id to the set (see the sketch after this list).
  • it can let you drive a set of reactions in a fault-tolerant and scalable (both perf/system size, and management/separation of complexity) manner
  • a Decider should generally be maintaining one or more invariants associated with the underlying state it represents; if there isn't some element of your system doing that, you might as well be dumping stuff in a log or mutating a CRUD model.
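
Regarding the decoration mentioned in the list above, a plain-F# rendering of the idea (this is not an Equinox API; it assumes each event carries the id of the command that produced it, extracted via idOf):

  type D<'cmd, 'event, 'state> =
      { decide: 'cmd -> 'state -> 'event list
        evolve: 'state -> 'event -> 'state }
  let withIdempotency (idOf: 'event -> 'id) (inner: D<'cmd, 'event, 'state>)
      : D<'id * 'cmd, 'event, 'state * Set<'id>> =
      { decide = fun (id, cmd) (state, seen) ->
          if Set.contains id seen then [] // duplicate trigger: already processed, emit nothing
          else inner.decide cmd state
        evolve = fun (state, seen) event ->
          inner.evolve state event, Set.add (idOf event) seen }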

Quite frequently, a Decider may internally operate as a Process Manager, encapsulating a State Machine. That is to say, there will be a subset of the Deciders in a system that are providing APIs that support some overall protocol that enforces some lifecycle rules.

With Equinox.CosmosStore, it seems it should be possible to handle saving multiple events from multiple streams as an atomic transaction, as long as they share the same partition key in Cosmos DB. However there doesn't seem to be any way to do that with APIs such as Equinox.Decider.Transact? 🙏 @rmaziarka

I'm asking because I had this idea which I was workshopping with a friend, that it could solve typical sync problems in typical availability domains.

Let's assume that our domain is Bike Sharing in different cities. Users can reserve a bike, and then access it and ride.

In our system we would have two subdomains:

  • Inventory - responsible for ensuring that a bike (resource) is ready to be used
  • Orders - responsible for tracking information regarding reservations, orders etc

There could be different subdomains as Orders, that would be using Inventory under the hood - like Repairing.

In the system we would like to a) block a particular bike for the user and b) at the same time create a reservation for them to store all important business information.

So we use and save data to 2 different streams of data - a typical ES problem.

We could use event handlers / sagas, but that brings another level of complexity.

In the case above, we could assume that the data inside a single city will be so small that, even with prolonged usage, it won't fill the whole CosmosDB partition. So we could use it to handle saving 2 events at the same time. (My actual system is not Bike Sharing, and I'm confident of a lack of explosive growth!)

Saving a BikeBlocked event would be handled in a transaction along with the ReservationMade. So we wouldn't end up in the situation where the bike is blocked but the reservation is not made, or vice versa.

What do you think of this idea? Does it sound reasonable?

Why not keep it simple and have it all in one logical partition: a high level perspective

I'd actually attack this problem from an event modeling perspective first (Event Storming and other such things are reasonable too, but I personally found the ramp-up on EM to be reasonable, and it definitely forces you to traverse the right concerns). Good intro article re Event Modeling.

Once you cover enough scenarios of any non-CRUD system, I'd be surprised if you truly end up with a model with just 2 logical streams that you want to combine into 1 for simplicity of event storage, because you are covering all the cases and can reason about the model cleanly.

When you have a reasonable handle from a few passes over that (watch out for analysis paralysis, but also don't assume you can keep it simple via 🙈🙉 and not talking to enough people who understand the whole needs of the system, aka 🙊):

For any set of decisions you control in a single Decider you need to:

  • be able to establish all relevant state to back the decision making efficiently (point reads of single documents, etag-backed caching, small streams, not having to filter out things you don't need)
  • be able to write it with a concurrency check (all writers to all the stuff under control of the decider are contending for the write capacity, i.e. provisioned RUs)
  • be able to see the changes on a feed in a useful way (in order of writes, with related stuff easily accessible and no stuff you don't care about)
  • be able to write it efficiently - if you can't absorb the maximum writes needed, you need to find a way to split it out to multiple logical streams (and hence Deciders)
  • have a set of event contracts, folding logic and decision making logic that a person can read, reason about and test
  • have ways to manage evolution over time of the rules, the event schema and the inevitable need to handle scenarios you didn't envisage, or that are genuinely new
  • minimize speculative, just-in-case, well-intentioned and/or future-proofing complexity - there'll be plenty of challenging complexity fun without you adding to the mix
  • you don't want to have to think about anything outside a given Decider when drawing stuff on a whiteboard, looking at a dashboard, looking at metrics, or writing a test, unless it makes things fundamentally easier

That's a lot of things.

Before we go on, consider this: you want to minimise how much stuff a single Decider does. Adding stuff into a Decider does not add complexity linearly. There is no technical low level silver bullet solution to this problem.

Right, strap yourself in; there's no TL;DR for this one ;)

Preamble

First, I'd refer to some good resources in this space, which describe key functions of an Event Store

Next, I'd like to call out some things that Equinox is focused on delivering, regardless of the backing store:

  • concurrency-controlled updates
    • to a single consistency control unit (stream) at a time (underlying stores in general rarely provide support for more than that, but more importantly, a huge number of use cases in a huge number of systems have natural mappings to this without anyone having to do evil things or write thousands of lines of code)
    • no major focus on blind writes, even if there is low-level support and/or things work if you go direct and do it out of band
    • provide a good story for managing the writing of the first event in a stream in an efficient manner
  • have a story for providing a Change Feed
    • typically via a matching Propulsion library (fully ordered for SSS and ESDB, ordered at stream level for CosmosDB and DynamoDB)
  • have a story for caching and efficient usage of each store to the best degree possible
    • Equinox.SqlStreamStore
      • caching is supported and recommended to minimise reads
      • in-stream snapshots sometimes help but there are tradeoffs
    • Equinox.EventStore
      • caching is supported, but less important/relevant than it is with SSS as ESDB has good caching support and options
      • Equinox in-stream snapshots sometimes help, but there are tradeoffs
    • Equinox.CosmosStore and Equinox.DynamoStore
      • multiple events are packed into each document (critical to avoid per-Item space and indexing overhead - limits are configurable)
      • the etag-checked RollingState access mode allows you to achieve optimal perf and RU cost via the same API, without writing an event every time
    • Equinox.CosmosStore
      • etag-checked read caching (going without it is not recommended in general, though you absolutely will and should turn it off for some streams)

The provision of the changefeed needs to be considered as a primary factor in the overall design if you're trying to build a general store - the nature of what you are seeking to provide (max latency and ordering guarantees etc) will be a major factor in designing the schema for how you manage the encoding and updating of the items in the store

Sagas?

In the system we would like to block a particular bike for the user, but at the same time create a reservation for them to store all important business information. So we use and save data to 2 different streams of data - a typical ES problem. We could use event handlers / sagas, but that brings another level of complexity.

There will always be complexity in any interesting system that should not just be a CRUD layer over a relational DB. Taking particular solution patterns off the table from the off is definitely something you need to be careful to avoid. As an analogy: having lots of classes in a system can make a mess, but collapsing it all into as few as possible can result in ISP and SRP violations and actually make for a hard to navigate and grok system, despite there being fewer files and fewer lines of code (aka complexity). Coupling things can sometimes keep things Simple, but can also sometimes simply couple things.

In my personal experience

  1. Sagas, PMs and related patterns and techniques can be scary, and there are not many good examples out there in an event sourcing context
  2. You can build a significant number of systems without ever intentionally applying any of those patterns

But, also IME: 3) they're pretty fundamental; 4) they are not as hard as you think once you've done two of them; 5) proper stores enable good ways to manage them; 6) they enable you to keep the overall complexity of a system down, by decoupling things one might otherwise artificially couple when working with a toolbox where you've denied yourself a space for PMs and Sagas.

In other words, my YAGNI detector was on high alert for it, as it seems yours is 😉

Transactional writes?

In the case above we could assume that data inside a single city will be so small that, even with long usage, it won't fill the whole CosmosDB partition. So we could use it to handle saving 2 events at the same time.

For avoidance of doubt:

  • being able to write two events to a single stream as an atomic action is a perfectly normal thing to do (Equinox itself, and any credible Event Store supports it).
  • being able to write to two streams atomically is not a commonly supported operation for Event Stores.

You're correct to identify the maximum amount of data being managed in a scope as a critical consideration when coupling stuff within a logical partition in order to be able to achieve consistency when managing updates across two sets of related pieces of information.

Specifically wrt CosmosDB, the following spring to mind without looking anything up:

  • The max amount of data in any logical partition is 20GB. I would not be shocked if it was 50GB some time soon. But it's not going to be a TB any time soon. When that's gone, you need to delete things
  • All updates in a logical partition are hitting the exact same thing on the same node of a physical partition
  • All reads are also hitting that
  • The maximum amount of RUs you can give that physical partition is 5000 RU (or is it 10000 RU?)
  • the more data you have in a single logical partition, the more pages in the indexes, the higher the RU cost for equivalent things (the amount in other logical partitions does not have this effect)
  • hotspots at either logical partition or physical partition levels
  • if you do an event per document, you are hosed
  • if you don't have an e-tag checked write or read scheme, you won't be able to load state efficiently without resorting to queries

In other words, it's looking like you're painted into a corner: you can't shard, you can't scale, and you're asking for hotspotted partition issues. Correct, that doesn't always matter. But even if you have 10 cities, you can't afford for the two busiest ones to be hosted on the same node, as that's going to be the one suffering rate limiting. Trust me, it's not even worth thinking about tricks to manage this fundamental problem (trying to influence the sharding etc. is going to be real complexity you do not want to be explaining to other engineers on a whiteboard or a 5am conf call).

So why do all these things spring to mind ?

  • This desire comes up all the time - I've had this conversation on this topic with tens of engineers with varying years of programming, years as users of document databases, and years of writing event sourced systems. I don't believe many have walked away still believing there's an easy way around this either.
  • I have done lots of benchmarking (measuring latency, RUs, handling of concurrency conflicts) of pretty much every possible approach on Cosmos DB
  • I've done lots of CosmosDB specific reading on this - MS has actually got pretty good docs (most useful ones I'm aware of are linked from DOCUMENTATION.md)
  • I've read and watched lots of stuff on various DBs. Most of that can be replaced with https://www.amazon.com/Designing-Data-Intensive-Applications-Reliable-Maintainable/dp/1449373321 and some in-date equivalent of https://martinfowler.com/books/nosql.html
  • Cosmos DB, when used with Mechanical Sympathy (read: Billing Model Sympathy) at the forefront of one's mind, is absolutely a good product that can be extremely cost effective and provide excellent performance. However, you are pretty much guaranteed not to get that good ☝️ experience without reading, measuring and iterating (I've seen many people with much bigger brains than me prove that over and over - the bad things you hear about Cosmos DB are true too, and are not only caused by the below-average drivers)

TL;DR on Cosmos DB for storing events (but really for all usages)

  • You can't update or the changefeed gets out of order (but there are lots of other reasons not to do updates beyond that)
  • You need to do etag-checked reads and writes or go home
  • Each Item/Document costs you about 500 bytes in headers and space in all the indexes so you need to strongly consider >1 event/document
  • Queries are way more costly than point reads. It's called a Document database because the single thing it does best on this planet is read or update ONE document. Read/write cost rule of thumb is per KB, but has logarithmic properties too, i.e. 50K is not always 50x the cost of 1K
  • Keep documents on the small side. (Max doc/item size for CosmosDB is 2MB, max stored proc arg size is 1MB, 1MB induces roundtrip latency and write contention etc). DynamoDB sets max doc size at 400KB for similar reasons. ESDB allows 4MB but that's less of a problem for it.
  • Keep the combined sizes of items/documents in a logical partition as small as possible but no smaller. Cosmos DB has a hard limit of 20GB per logical partition, but in practice, the latency to read or walk that amount of content and/or documents will hit you long before that.
  • Keep documents uniform in size where possible. Consider taking a write overhead hit to preserve read efficiency or scalability (including considering compression). Wrt points 3+4+5, the Equinox.Cosmos default max doc size is 30KB. On occasions where paying a write overhead is merited, I've used/recommended 500K and 1MB for various reasons; that's the exception more than the rule.
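
For reference, the packing knobs referred to above surface roughly like this when configuring Equinox.CosmosStore (parameter names per V4-era APIs; treat them as assumptions and check the version you're using):

  let createContext (client: Equinox.CosmosStore.CosmosStoreClient) =
      Equinox.CosmosStore.CosmosStoreContext(
          client, databaseId = "db", containerId = "container",
          tipMaxEvents = 256, // cap on events accumulated in the Tip before flushing to a Batch
          tipMaxJsonLength = 30_000) // ~30KB, the default alluded to above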

Why think about it and explain it at such a low level?

Why think about this problem from such a low level perspective? Why not just stick to the high level, given that's equally important to get right, and if done correctly will more often yield a cleanly implementable solution?

Many people have a strong desire to write the least amount of code possible, and that's not unreasonable. The most critical question is going to be, does it work at all? Due to the combination of factors above, the answer is looking pretty clear. You can write the code and run it to be sure. I already have, in spike branches, and will save you the spoiler.

However, the fundamental things that arise when viewing it at a low/storage design level also surface as high level issues in terms of modelling the software, and different people will understand them better from different angles.

I've witnessed people attempt to 'solve' the fundamental low level issues by working around reality, moving it all into a Cosmos DB Stored Procedure (yes, you can guess the outcome). Please don't even think about that, no matter how many tech tricks you'll learn!

Conclusion

You know what's coming next: You don't want to merge two Deciders and 'Just' bring it all under a nice tidy transaction to avoid thinking about Process Managers and/or other techniques.

If you're still serious about making the time investment to write a PoC (or a real) implementation of a Store on a Document DB such as CosmosDB (and/or even writing a SQL-backed one without studying the prior art in that space intently), you can't afford not to invest the time in watching a 2h45m video about Deciders!

OK, but you didn't answer my question, you just talked about stuff you wanted to talk about!

😲Please raise a question-Issue, and we'll be delighted to either answer directly, or incorporate the question and answer here

Acknowledgements

The diagrams in this README.md and the DOCUMENTATION.md would not and could not have happened without the hard work and assistance of at least:

FURTHER READING

See DOCUMENTATION.md and Propulsion's DOCUMENTATION.md

equinox's People

Contributors

adamralph, bartelink, belcher-rok, brihadish, chinwobble, cumpsd, dharmaturtle, dheeraj-chakilam, dsilence, eiriktsarpalis, enricosada, epnickcoleman, erichgoldman, eulerfx, fluxlife, jgardella, jorgef, kelvin4702, kimserey, michaelliao5, nordfjord, omnipotentowl, purkhusid, ragiano215, rajivhost, seclerp, stoft, swrhim, vsliouniaev, xandkar


equinox's Issues

rename Collection -> Container

The Azure portal, MS docs and V3 SDK all use Container in preference to Collection consistently.

Based on this, I feel all Cosmos related APIs should reflect that - this would affect things with Collection in the name.

I'm also thinking that the EQUINOX_COSMOS_COLLECTION env var used by eqx and dotnet-templates should also be changed at this time.

See changes queued in #144

EventStore: Backoffs/handling when encountering WrongExpectedVersionException

#19 improved the correctness of conflict handling wrt ESDB when using a twin connection, by reading from a Leader connection after a conflict has been flagged in the course of a write.

Fixing this in full in Equinox as a whole means:

Add dotnet global tool edition of CLI

While the CLI needs to remain runnable from full FW in order to be able to run benchmarks on the full framework (so we can't just go in and change the CLI to [only] be a dotnet tool), the provisioning and running facilities are environment independent. We should thus wrap the CLI as a global tool (and make that available via nuget).

Root cause or fix local shortcoming leading to FS0988 warnings

For this repo, a central tenet has always been ./build should produce a clean build with no warnings

Atm it says:
warning FS0988: Main module of program is empty: nothing will happen when it is run

I'm OK with it remaining in the short term if there is a tooling fix in the works

Document release process

We use minver, which derives versions from tags
When a build is triggered (based on a refs/branches/X or refs/tags/X), the newest tag encountered in that branch dictates the name
As long as you push the tag first, a random PR or branch CI build will pick up the correct version
Before releasing, sanity check that the artifacts include nupkg files with the release numbers you anticipated
pressing the Release button (if you are in the relevant Pipelines org) then sends those artifacts to nuget.org
After that, it should show on the nuget landing page per artifact
subsequent to this (you'll see a message on the package's nuget page before it's indexed), it'll be indexed in 3-30 mins
when the indexing has completed, nuget update checks will pick up the new version

once a tag has been pushed, from which the build will pick up the version, the github Releases tab is used to add release notes (no automation at present, would be delighted to have a PR with support for some as long as it does not pull in a boatload of dependencies)

Nuget conventions/tricks/hacks esp when adding new packages:

  • packages should be owned by jet.com, but for safety the pipelines api keys for a release pipeline should only allow uploads of new versions of existing packages, thus the process for when a new package is added is:
    0. never try to upload if there's a new package in the artifacts as it'll mean a partial upload with associated mess (and no clean way to remediate aside from doing manual uploads of all packages after the first one in the upload order)
    1. manual upload using your mouse on nuget.org
    2. invite jet.com as owner
    3. when accepted, run the release with a new version (noting step 0)
    4. remove yourself as the owner of the new packages

Add support for UnionConverter catchall handling to supply context

The base implementation can map to a catchall case, but it does not yet support indicating what the unknown state was (for diagnostic purposes).

It should be possible to capture the discriminator field and perhaps trap the entire payload as a JObject or similar to facilitate dynamic handling

Cosmos: Facility to auto-create stored procedure where client is sufficiently privileged

Provisioning of the stored procedure is presently an explicitly separated act, driven by Equinox.Cli init, which fits in with DDL/DML separation concepts etc.

However, in practice
a) keys are often provisioned with both DML (manipulate data) and DDL (create proc) permissions
b) as stored procedures are revised, new editions may need to be added

For the above reasons, the connector should gain a ?provisionStoredProcNow parameter that attempts to add it if not present, when requested

Provide mechanism to signal that stream being manipulated should be empty

When one has just allocated a new Guid-based index value, you can be confident the child stream is empty, and hence it makes sense to attempt writing with an expected version of -1 (in GES terms).

Atm, the implementation does not afford a way to signal "it's empty, no need to read it; when writing, assume it's empty from a consistency control perspective"

Will likely defer addressing this until further generalizations to the interface contracts to facilitate store switching techniques are completed.

Readme and Documentation Questions

Questions

Some of these have likely been answered already in the Readme; if so, consider making these broad points more obvious. I read the Readme a few times, but it was frequently challenging to tease out what was a very specific point versus a broader objective.

  • What kind of project is this for, are there other easier ways to build event sourcing for F#, or is this the easiest? (as in does the compatibility layer make things easier to use)
  • What is the scope of Equinox, or what features have been consciously omitted?
  • Broadly speaking what are the goals of using this instead of using one particular kind of EventStore database directly? Does it just allow me to swap out "event stores" or does it do more?
  • Should I reach for Equinox first if I'm doing DDD + CQRS + ES, even if I may not need to change the event store database?
  • Is this layer constructed for ease of use, or does it make things more challenging toward some end?
  • Do you recommend this for the average person who is building an event sourcing project with F# or should I only use Equinox if I need massive scalability?
  • What are some of the other good options for F# event sourcing, is this the only fleshed out option?
  • How do I get started? (development vs production)
  • How do I use Equinox? (development vs production)
  • Is there an API documentation? Where can I find it?
  • You say I can use volatile memory for integration tests, could this also be used for learning how to get started building event sourcing programs with equinox?
  • Is there a guide to building the simplest possible hello world "counter" sample, that simply counts with an add and a subtract event?

Namespace

Shall root namespace be jet.equinox to avoid potential collisions?

Support SqlStreamStore

SqlStreamStore is a SQL-focused library addressing many of the concerns Equinox does, with support for SQL Server and Postgres. Given the common EventStore (and DDD-CQRS-ES slack!)-influenced designs, there's a good chance there'd be minimal work required on either side to add an Equinox.SqlStreamStore adapter

Doing this work would:

  • improve the breed wrt generalizing the store interfaces in Equinox (SqlStreamStore has a wealth of experience behind it)
  • provide an avenue for folks looking for a hosted solution without having to concern themselves with request charges (likely giving up some perf, but the CLI benchmark facility will tell all)

The initial implementation should probably take a dependency on SQL LocalDB, but there's an obvious need for it to work well with Azure SQL (the CLI would likely manage the provisioning etc.; see #59)

Replace PowerShell-based derivation of VersionSuffix with MinVer

While I elided #71 from master post-merge (it was only really half-working) in the interests of time, MinVer just makes sense given how we'll be managing all Releases based on tags too.

This will involve removing the powershell VersionSuffix logic and replacing it with MSBuild derivation using the Azure env vars as per the MinVer readme.

Cosmos: Harden and merge `tip-isa-Batch` branch

There's a tip-isa-batch branch, which generalizes the storage scheme to provide even greater efficiency in terms of read and write costs.

This work was shelved for prioritisation purposes in order to avoid having to make projectors deal cleanly with seeing an event multiple times.

Given a projector that does some practical de-duplication of events, it's pretty feasible to re-introduce this storage optimization; the performance and RU cost reductions can be inspected by using Equinox.Tool run and/or dotnet new eqxtestbed against said branch.

As noted in #108, this storage generalization also provides benefits wrt competing writer scenarios.

WIP: Equinox.Cosmos Storage + Programming Model description

NB this is long and needs lots of editing.

Storage model (see source)

Batches

Events are stored in immutable batches consisting of:

  • partitionKey: string // stream identifier
  • index: int64 // base index of this batch (0 for first event in a stream)
  • id: string // same as i (CosmosDb forces every doc to have one, and it must be a string)
  • events: Event[] // (see next section) typically there is one item in the array (can be many if events are small, for RU and perf-efficiency reasons)
  • ts // CosmosDb-intrinsic last updated date for this record (changes when replicated etc, hence see t below)

Events

Per Event, we have the following:

  • case - the case of this union in the Discriminated Union of events this stream bears (aka Event Type)
  • data - json data (CosmosDb maintains it as actual json; it can be indexed and queried if desired)
  • metadata - carries ancillary information for an event
  • t - creation timestamp

Tip Batch

The tip is readable via a point-read, as the id has a fixed known value (-1). It uses the same base layout as an Event-Batch, but adds the following:

  • _etag: CosmosDb-managed field updated per-touch (facilitates NotModified result, see below)
  • id: always -1 so one can reference it in a point-read GET request and not pay the cost and latency associated with a full query
  • u: Array of unfolded events based on a point-in-time state (see State, Snapshots, Events and Unfolds, Unfolded Events and unfold in the programming model section)

State, Snapshots, Events and Unfolds

In an Event Sourced system, we typically distinguish between the following basic elements

  • Events - Domain Events representing actual real world events that have occurred, reflecting the domain as understood by domain experts - see Event Storming. The customer favorited the item, the customer saved SKU Y for later, $200 got charged with transaction id X.

  • State - derived representations established from Events. A given set of code in an environment will, in service of some decision making process, interpret those events as implying a particular state in a model. If we change the code slightly or add a field, you wouldn't necessarily expect a version of your code from a year ago to generate you equivalent state that you can simply blast into your object model and go. (But you could easily hold a copy in memory as long as your process runs)

  • Snapshots - A snapshot is an intentionally roundtrippable version of a State, which can be saved and restored. Typically one would do this to save the cost of loading all the Events in a long running sequence of Events to re-establish the State. The EventStore folks have a great walkthrough on Rolling Snapshots.

  • Projections - the term projection is heavily overloaded, meaning anything from the proceeds of a SELECT statement, the result of a map operation, an EventStore projection, an event being propagated via Kafka (no, further examples are not required).

.... and:

  • Unfolds - the term unfold is based on the FP function of that name, bearing the signature 'state -> 'event seq. When using Equinox.Cosmos, the unfold produces projections, represented as events, to snapshot the state at a position in the stream.

Generating and saving unfolded events

Periodically, along with writing the events that a decision function yields to represent the implications of a command given the present state, we also unfold the resulting state' and supply the results to the sync function too. The unfold function takes the state and projects one or more snapshot-events which can be used to re-establish the same state we have thus far derived from watching the events on the stream. Unlike normal events, unfolded events do not get replicated to other systems, and can also be thrown away at will (we also compress them rather than storing them as fully expanded json).

Reading from the Storage Model

Most reads request the tip with an IfNoneMatch precondition citing the etag it bore when we last saw it, which, when combined with a cache, means one of the following happens when a reader is trying to establish the state of a stream prior to processing a Command:

  • NotModified (depending on workload, can be the dominant case) - for 1 RU, minimal latency and close-to-0 network bandwidth, we know the present state
  • NotFound (there's nothing in the stream) - for equivalently low cost, we know the state is initial
  • Found - (if there are multiple writers and/or we don't have a cached version) - for the minimal possible cost (a point read, not a query), we have all we need to establish the state:
    i: a version number
    e: events since that version number
    u: unfolded auxiliary events computed at the same time as the batch of events was sent (aka projections/snapshots) - (these enable us to establish the state without further queries or roundtrips to load and fold all preceding events)
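
A reader can thus be modeled as handling one of three results; a minimal illustrative sketch (the type and names here are hypothetical, not the Equinox API):

type TipReadResult<'event> =
    | NotModified                   // cached state remains valid; ~1 RU
    | NotFound                      // stream is empty; the state is initial
    | Found of etag: string * i: int64 * e: 'event[] * u: 'event[]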

Building a state from the Storage Model and/or the Cache

Given a stream with:

{ id:0, i:0, e: [{c:c1, d:d1}]},
{ id:1, i:1, e: [{c:c2, d:d2}]}, 
{ id:2, i:2, e: [{c:c2, d:d3}]}, 
{ id:3, i:3, e: [{c:c1, d:d4}]}, 
{ id:-1,
  i:4,
  e: [{i:4, c:c3, d:d5}],
  u: [{i:4, c:s1, d:s5Compressed}, {i:3, c:s2, d:s4Compressed}],
  _etag: "etagXYZ"
}  

If we have state4 based on the events up to {i:3, c:c1, d:d4} and the tip document, we can produce the state by folding in a variety of ways:

  • fold initial [ C1 d1; C2 d2; C2 d3; C1 d4; C3 d5 ] (but would need a query to load the first 2 batches, with associated RUs and roundtrips)
  • fold state4 [ C3 d5 ] (only need to pay to transport the tip document as a point read)
  • (if isStart (S1 s5) = true): fold initial [S1 s5] (point read + transport + decompress s5)
  • (if isStart (S2 s4) = true): fold initial [S2 s4; C3 d5] (only need to pay to transport the tip document as a point read and decompress s4 and s5)

If we have state3 based on the events up to {i:2, c:c2, d:d3}, we can produce the state by folding in a variety of ways:

  • fold initial [ C1 d1; C2 d2; C2 d3; C1 d4; C3 d5 ] (but query, roundtrips)
  • fold state3 [ C1 d4; C3 d5 ] (only pay for point read+transport)
  • fold initial [S2 s4; C3 d5] (only pay for point read+transport)
  • (if isStart (S1 s5) = true): fold initial [S1 s5] (point read + transport + decompress s5)
  • (if isStart (S2 s4) = true): fold initial [S2 s4; C3 d5] (only need to pay to transport the tip document as a point read and decompress s4 and s5)

If we have state5 based on the events up to C3 d5, and (being the writer, or a recent reader) have the etag etagXYZ, we can do an HTTP GET with an If-None-Match: etagXYZ precondition, which will return 304 Not Modified with < 1K of data and a charge of 1.00 RU, allowing us to derive the state as:

  • state5

Programming model

In F#, the Equinox programming model involves, per aggregation of events on a given category of stream:

  • 'state: the state required to support the decision or query being supported (not serializable or stored; can be held in a .NET MemoryCache)
  • initial: 'state: the implied state of an empty stream
  • 'event: a discriminated union representing all the possible Events from which a state can be evolved (see e and u in the data model). Typically the mapping of the json to an 'event case is driven by a UnionContractEncoder
  • fold : 'state -> 'event seq -> 'state: function used to fold events (real ones and/or unfolded ones) into the running 'state
  • evolve: 'state -> 'event -> 'state - the folder function from which fold is built, applying the delta that a single 'event implies for the model to the 'state
  • decide: 'state -> 'command -> 'event list: responsible for (in an idempotent manner) interpreting a command in the context of a state as the events that should be written to the stream to record the decision

When using the Equinox.Cosmos adapter, one will typically implement two further functions, in order to avoid every 'event in the stream having to be loaded and processed to build the 'state (versus a single cheap point read from CosmosDb to read the tip):

  • unfold: 'state -> 'event seq: function used to render events representing the state which facilitate quickly re-establishing a state without needing to go back to the first event that occurred on a stream
  • isStart: 'event -> bool: predicate indicating whether a given 'event is sufficient as a starting point, e.g. whether it is a snapshot/compaction event (see isOrigin in the Example below)

High level Command Processing flow

When running a decision process, we thus have the following stages:

  1. establish a known 'state (based on a given position in the stream of Events)
  2. let the decide function look at the request/command and yield a set of events (or none) that represent the effect of that decision in terms of events
  3. update the stream, contingent on the stream still being in the same State/Position it was in step 1
    3a. if there is no conflict (nobody else decided anything since we decided what we'd do), append the events to the stream (record the new position and etag)
    3b. if there is a conflict, take the conflicting events that other writers have produced since step 1, fold them into our state, and go back to 2 (the CosmosDb stored procedure sends them back immediately at zero cost or latency)
  4. if it makes sense for our scenario, hold the state, position and etag in our cache. When a reader comes along, do a point-read of the tip and jump straight to step 2 if nothing has been modified.
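
A minimal sketch of that loop (SyncResult and tryAppend are hypothetical stand-ins for the store's sync operation, not the actual Equinox API; the command is assumed captured in decide):

type SyncResult<'event> =
    | Written of newPosition: int64     // step 3a: events appended at the expected position
    | Conflict of theirs: 'event list   // step 3b: competing events, returned at no extra cost

let rec run (decide: 'state -> 'event list)
            (fold: 'state -> 'event seq -> 'state)
            (tryAppend: int64 -> 'event list -> SyncResult<'event>)
            (position: int64) (state: 'state) =
    let events = decide state                                // step 2: interpret the request
    match tryAppend position events with                     // step 3: contingent append
    | Written position' -> position', fold state events      // step 3a: success; worth caching
    | Conflict theirs ->                                     // step 3b: resync, then retry
        run decide fold tryAppend
            (position + int64 (List.length theirs)) (fold state theirs)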

Sync stored procedure high level flow (see source)

The sync stored procedure takes a document as input which is almost identical to the format of the tip batch (in fact, if the stream is found to be empty, it forms the template for the first document created in the stream). The request includes the following elements:

  • expectedVersion: the position the requestor is basing their proposed batch of events on (no, an etag would not be relevant)
  • e: array of Events (see Event, above) to append if the expectedVersion check is fulfilled
  • u: array of unfolded events which supersede items with equivalent case values (aka snapshots, projections)
  • maxEvents: the maximum number of events to record in an individual batch. For example:
    • if e contains 2 events, the tip document's e holds 2 events and maxEvents is 5, the events get merged into the tip
    • if maxEvents is 1, the tip gets frozen as a Batch, and the new request becomes the tip (as an atomic transaction on the server side)
  • (PROPOSAL/FUTURE) thirdPartyUnfoldRetention: how many events to keep before the base (i) of the batch, if required by lagging unfolds which would otherwise fall out of scope as a result of the appends in this batch (this will default to 0; so, for example, if a writer says maxEvents 10 and there is an unfold based on an event more than 10 back, it will be removed as part of the appending process)
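
For illustration, a request body in the notation of the stream example above (values hypothetical), proposing one event on top of the 5 already present:

{ expectedVersion: 5,
  e: [{c:c4, d:d6}],
  u: [{i:5, c:s1, d:s6Compressed}],
  maxEvents: 10
}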

Example

The following example is a minimal version of the Favorites model, with shortcuts for brevity (yes, and imperfect performance characteristics):

(* Event schemas *)

type Item = { id: int; name: string; added: DateTimeOffset } 
type Event =
    | Added of Item
    | Removed of itemId: int
    | Compacted of items: Item[]

(* State types *)

type State = Item list

let contains state id = state |> List.exists (fun x -> x.id=id)

(* Folding functions to build state from events *)

let evolve state event =
    match event with
    | Compacted items -> List.ofArray items
    | Added item -> item :: state
    | Removed id -> state |> List.filter (fun x -> x.id <> id)
let fold state events = Seq.fold evolve state events 

(* Decision Processing *)

type Command =
    | Add of Item
    | Remove of itemId: int

let decide command state =
    match command with
    | Add item ->
        if contains state item.id then [] else [Added item]
    | Remove id ->
        if contains state id then [Removed id] else []

(* Equinox.Cosmos Unfold Functions to allow loading without queries *)

let unfold state =
    [Event.Compacted (Array.ofList state)]
let isOrigin = function
    | Compacted _ -> true
    | _ -> false
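
For illustration, exercising the above directly (values arbitrary):

let state = fold [] [ Added { id = 1; name = "tea"; added = DateTimeOffset.UtcNow } ]
let events = decide (Remove 1) state    // yields [Removed 1]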

Expose metadata and/or event index to codec

At present, the OOTB codecs only present the data as UTF-8 to encode/tryDecode functions; need to extend to enable:

  • generically wrapping events as an Envelope<T>
  • combining UnionEncoder with grabbing context from the Event Data
  • some degree of uniformity between consumption afforded by Equinox.Cosmos and Equinox.EventStore

Needs to consider #79 to some degree

🤔 Likely will involve introducing an IEvent into the codec module

cc @ameier38

Support .NETStandard 2.0

Easier said than done, as EventStore.ClientAPI.NetCore is very far behind - need multitargeting to have a workable solution

Confront consumers with opt-out from caching in Equinox.Cosmos StreamResolver

As flagged by @jakzale, it's critical that consumers be confronted early in the experience with the decision-making processes around caching and/or unfolds and/or out-of-collection snapshots.

🤔 we could simply make the caching argument not be optional, at least until #61 is fleshed out more

This also could benefit from more docs/articles and/or tutorials and/or samples ref #57

rename DeprecatedRawName ?

The name DeprecatedRawName was chosen in haste; I was seeking to:
a) give a pejorative name to make people think before putting string concats composing a name into their app logic - AggregateId provides a clean way to compose a category name literal with an identifier in code
b) give a strong nudge to use AggregateId as it ensures that the out-of-the-box default separator used by EventStore ("-") gets used which makes $ec- streams etc work
c) provide scope to map AggregateId to something appropriate for any new store

On reflection, while there's nothing wrong with the above opinionated stance, the reality is that plenty of existing codebases already compose names in a variety of ways, and will pick a backend and stick with it.

So, for the final v2 release, I'm open to a name change if we can settle on one. Open to any thoughts...

Add `eqx stats cosmos`

The following queries provide key store metrics which would be very useful to surface in the eqx tool:

  • event count: SELECT VALUE SUM(c.n) FROM c WHERE c.id="-1"
  • stream count: SELECT VALUE COUNT(1) FROM c WHERE c.id="-1" (will need to become a distinct count operation if/when #109 is implemented)
  • document count: SELECT VALUE COUNT(1) FROM c

Feature: etag-contingent writes for .Cosmos

At present, EventStore, MemoryStore and Cosmos all predicate the concurrency check on a stream index aka expectedVersion. This makes sense as it's a well proven methodology for OCC.

Equinox.Cosmos's unfolds (snapshots etc) mechanism allows one to combine the writing of a set of events with associated snapshot updates as an atomic upsert.

Normally, if no event is being written, it doesn't make sense to write a snapshot update either.

For rolling updates where there isn't a useful event one would want to add each time (e.g. maintaining a checkpoint), you want:
a) a way to check you're up to date and/or read the value a competing writer has applied
b) to be able to do an OCC check for the write, to detect and/or react to conflicts
c) to be able to efficiently avoid incurring a write cost in the case of an idempotent write
d) to minimize the number of events written permanently (and hence the documents a ChangeFeedProcessor sees, esp. when running a replay)

How? At present we track the current version and etag per stream in the cache. The etag is used to make a cached read cheap. However, when writing, for consistency with the other stores, the "is a null change" predicate is based on the expected version. The aforementioned can be achieved by allowing a write to be requested that is instead contingent on the etag not having changed.
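
A sketch of the per-stream cache entry implied above, with the proposed etag-contingent option (names hypothetical):

type StreamCacheEntry<'state> =
    { version: int64    // drives the standard expectedVersion-based OCC check
      etag: string      // makes cached reads cheap; proposal: optionally gate writes on it too
      state: 'state }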

An associated mechanism would be a way to map, in a post-processing step, what would ordinarily be an OCC event append to an etag-contingent snapshot update only, while still allowing the logic to be written in terms of events in the normal manner.

Example of a stream that would benefit from this: https://github.com/jet/propulsion/blob/master/src/Propulsion.EventStore/Checkpoint.fs#L40

Cosmos: Support competing writers with inconsistent `unfold`s

At present, if you supply an unfold function, for each write, its output:
a) travels on the wire to the stored procedure (this does not cost extra, so optimizing that is not really a concern)
b) replaces the current unfolds in the Tip document (as usual, you pay for a read with the original size and a write with the new size (not forgetting the cost of the read per active ChangeFeedProcessor) in terms of Request Charges)

Pros and cons:

  • if you stop writing an unfold, it will get removed on the next write, saving storage space, and RUs on the write, subsequent reads, and induced reads by ChangeFeedProcessor(s)
  • if you are doing blue/green deploys, old and new writers can interleave - the loser needs to rebuild their state, unless the competing writer happens to also have written the same unfold (and may potentially remove the competitor's one iff it writes)

A small change to the stored procedure's JS can improve the semantics by
a) removing unfolds that are being updated and replacing with the supplied values
b) retaining any that are not being touched (perhaps subject to some retention criteria, i.e. only keep it if it's <= N events behind Tip)
c) allowing the writer to indicate a set of caseNames that should be removed regardless of retention rules
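
A sketch of those merge semantics (expressed in F# for exposition, though the actual change would live in the stored procedure's JS; names are hypothetical, and the retention criteria of (b) are omitted):

type Unfold = { i: int64; c: string; d: byte[] }

let mergeUnfolds (existing: Unfold list) (supplied: Unfold list) (casesToRemove: Set<string>) =
    let suppliedCases = supplied |> List.map (fun u -> u.c) |> set
    existing
    |> List.filter (fun u ->
        not (suppliedCases.Contains u.c)        // (a) superseded by a supplied value
        && not (casesToRemove.Contains u.c))    // (c) writer asked for this case to go
    |> List.append supplied                     // (b) untouched unfolds ride along with the new set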

Related: there's a tip-isa-batch branch which stores events in the Tip - the competing writers scenario's efficiency would be greatly improved by this (any conflict will yield the competing events cleanly, and any unfolds that are behind will typically see both their unfold and successor events from the single point-read roundtrip)

Improve resync wrt ES WrongExpectedVersionException

Current behavior is to attempt a re-read without a backoff delay; in prod this is regularly failing despite 3 retries (it's a short stream and the typical timespan is <10ms so the conflict never resolves and the app is yielding excessive 500s as a result).

Considerations:

  • Ideally the API will accommodate varying the backoff per transaction (similarly to how the current attempts count is per-transaction not connection-wide)
  • If there is a way to request a read from the master node (perhaps after the first retry?), we should do that (a secondary connection might be overkill)

See EventStore/EventStore#1626

Simplify value object infrastructure.fs in samples

In the sample project we have SkuId as a value object, implemented as a class that inherits a Comparable base class:

/// Endows any type that inherits this class with standard .NET comparison semantics using a supplied token identifier
[<AbstractClass>]
type Comparable<'TComp, 'Token when 'TComp :> Comparable<'TComp, 'Token> and 'Token : comparison>(token : 'Token) =
    member private __.Token = token // I can haz protected?
    override x.Equals y = match y with :? Comparable<'TComp, 'Token> as y -> x.Token = y.Token | _ -> false
    override __.GetHashCode() = hash token
    interface IComparable with
        member x.CompareTo y =
            match y with
            | :? Comparable<'TComp, 'Token> as y -> compare x.Token y.Token
            | _ -> invalidArg "y" "invalid comparand"

/// SkuId strongly typed id
[<Sealed; JsonConverter(typeof<SkuIdJsonConverter>); AutoSerializable(false); StructuredFormatDisplay("{Value}")>]
// (Internally a string for most efficient copying semantics)
type SkuId private (id : string) =
    inherit Comparable<SkuId, string>(id)
    [<IgnoreDataMember>] // Prevent swashbuckle inferring there's a "value" field
    member __.Value = id
    override __.ToString () = id
    new (guid: Guid) = SkuId (guid.ToString("N"))
    // NB tests (specifically, empty) lean on having a ctor of this shape
    new() = SkuId(Guid.NewGuid())
    // NB for validation [and XSS] purposes we prove it translatable to a Guid
    static member Parse(input: string) = SkuId (Guid.Parse input)
/// Represent as a Guid.ToString("N") output externally
and private SkuIdJsonConverter() =
    inherit JsonIsomorphism<SkuId, string>()
    /// Renders as per Guid.ToString("N")
    override __.Pickle value = value.Value
    /// Input must be a Guid.Parseable value
    override __.UnPickle input = SkuId.Parse input

Can this be simplified to a record type like this?

[<JsonConverter(typeof<SkuIdJsonConverter>)>]
type SkuId = private { [<IgnoreDataMember>] Value: Guid } with
    static member Create (value: string): SkuId =
        if String.IsNullOrWhiteSpace value then
            invalidArg "value" "cannot be whitespace"
        { SkuId.Value = Guid.Parse value }
and SkuIdJsonConverter() =
    inherit JsonIsomorphism<SkuId, string>()
    override __.Pickle value = value.Value.ToString("N")
    override __.UnPickle input = SkuId.Create input

Caching should not read forward on miss if window size constrained to 1

Via @sorinoboroceanu
Using the caching layer when each event represents a full set of state (by using a window size of 1 and a tautologous compaction predicate, see https://github.com/jet/foldunk/blob/master/Samples/Store/Domain/ContactPreferences.fs) currently has suboptimal behavior if there has been >1 competing write: uncached, it would read exactly one event (backwards), whereas in this impl we read all the events to catch up, but then only use one.

Port or provide separate Cache implementation for .NETStandard

At present, master targets the full framework. The main blocker on migrating it to netstandard is that the caching impl in the feature/ges-caching branch is implemented against the full framework System.Runtime.Caching. The .NET Core / .NET Standard impl has a different interface AFAICT.

We've yet to put significant load through the current cache impl, so there's no basis on which to make a call as to whether using a different impl is a loss or a gain.

If this can be resolved, a likely fast follow would be to retarget the Foldunk.EventStore in master against EventStore.Client.NetCore (EDIT: EventStore.Client.NetCore is not being maintained; current thinking is to target a multi-targeted version of the ES client)

Cosmos: Provide ability to separate storage of snapshots

The unfolds mechanism, as documented in #50, provides the ability to efficiently stash relevant data facilitating reading of state based on a point read.

However, this can be suboptimal in the case of larger snapshot states:

  • the data gets compressed and travels each time
  • writing/updating occasions read and write costs within the sync stored procedure each time
  • competing writers pay all but the update case in the case of a re-run of the same transaction
  • readers not interested in/unable to process a specific snapshot schema still have to pay the price to transport it
  • each update goes to the changefeed triggering more read costs
  • the size and cost of the snapshots is harder to separate out
  • snapshots contribute to partition size limits

Thus it makes sense to do some or all of the following where there is a projector in play and an aux collection exists (with multiple writers, one might even consider creating an aux collection even without a projector):

  • maintain the snapshots in the aux collection, where one is not paying for changefeed re-reads of the same data
  • reduce the write frequency for snapshots (when the write happens should be deterministic for competing writers)
  • implement ability for stored proc to maintain events since snapshot in tip when not updating snapshot
  • write to a nominated collection (typically aux), with a deterministic guid based on the case / event type tag of the union as a point write
  • refer to the snapshot in the unfolds; automate loading of the referenced snapshot if picked by isOrigin
  • expose the ability to customize the snapshot loading process (but batteries should be included)
    • (perhaps do a sample showing parallel reads of the Snapshots and the Tip?)

Throw on writing zero-length slices in Equinox.Cosmos

While the normal Stream API short circuits on writing zero events, the lower level Equinox.Cosmos.Core.Events APIs do not, which leads to a confusing error message.

This should be addressed by throwing an exception at an appropriate point in proceedings.

Support DynamoDb

In the spirit of #62, this ticket tracks the considerations for implementing a storage adapter for Amazon DynamoDb; it absolutely does not represent a roadmapped task or anything that's an organic need relevant to a current usage scenario.

The scheme used by Equinox.Cosmos seems to map to the new DynamoDb transactional API (https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/transaction-apis.html)

Not 100% sure how best to manage OCC - the client Position tracks the etag in the Cosmos impl, and a detailed read and research of the guarantees provided is obviously warranted. One thing that does seem clear is that the client token idempotency is definitely not relevant. Wild guess: maintaining expectedVersion either inside Tip or as metadata within it, and then doing the writes conditional on that, should work.

One key difference is the 400K document size upper limit, which means:

  • Unfolds can't practically be maintained directly in Tip - e.g. if you ever need to support a transition in a snapshot schema in a blue/green deploy scenario, that halves the 400K. This means that an equivalent of #61 would need to be implemented
  • Likely one should explore re-introducing the Tip-isa-Batch semantics removed in #58, as that is in general an efficient scheme in terms of reducing document counts, contention and Request Capacity consumption. EDIT: should follow the Equinox.CosmosStore v3 impl, which implements Tip-isa-Batch semantics
  • The 400K limit may make #31 more relevant than it has proven for CosmosStore - client-side mechanisms for managing how one is faring wrt the limit will be a big consideration given it's 3MB vs 400K; the former is effectively unlimited for most reasonable sets of events and/or unfolds one might be maintaining.

Cosmos: Provide facility to generate an aux collection

At present, Equinox.Cli init will create a collection and add a stored proc. For the aux collection used by Change Feed Processors, adding the stored procedure should be made optional, as this step is not necessary for the aux collection.

See also #59 - in some cases users may not be interested in adding the stored procedure at the point of provisioning the collection.

See also #61 - snapshots have different (no?) indexing requirements

We'll probably also make ./build provision an aux collection.

Add Handler+Service tutorial to documentation.md

Atm, Todo.fs and Aggregate.fs are the only real examples (aside from the samples/ folder) of what one might do to connect the basic domain logic to a running app.

Documenting the sorts of things one puts into the Handler and/or Service, and how to balance that, is a key piece of documentation that's been pointed out as missing.

Patterns to be covered:

  • Command with no response with straight call in Service (todo does this)
  • Command with decision result from Interpret function feeding out (don't think that's covered either here or in the Equinox /samples folder)
  • Command with projection from final folded result post command application (todo does this)
  • Query (see todo and aggregate)

General advice on what to do that the tutorial should cover:

  • there's definitely more - the tutorial should also mention some general concepts around CQRS and provide advice beyond "here are 11 techniques, you fail if you don't use at least 9 of them"
  • address the key point: one size does not fit all, and it's important that the Handler and Service be well thought through
    • if you end up cutting and pasting exactly the same one as boilerplate per aggregate, it's a bad sign
    • if you end up using more than 2-3 of the techniques above, that's also a bad sign

Deferred:

  • Command calling out to external decision process (e.g. if a decision process needs to examine the folded state, then propose a relevant command and/or have a longer chain of calls which you want to wrap an optimistic retry loop around)
    • There are Async overloads for prominent functions which result from specific cases in an existing codebase. They will not be documented in #96 as they don't constitute best practice worth highlighting (@eiriktsarpalis argues for removal of such temptations from the house entirely; for now they remain)

Customizing cache implementation

I've been researching the library and it looks really neat!

I've been wondering if there are plans to allow customization of the cache implementation for the GES and CosmosDB providers.
At the moment it uses a System.Runtime.Caching.MemoryCache instance.

Microsoft.Extensions.Caching provides multiple abstractions (IMemoryCache and IDistributedCache), and the possibility to customize the implementation seems like a valuable option. E.g. in our system MemoryCache from Microsoft.Extensions.Caching is used heavily, and having multiple caching providers doesn't make much sense.

Provide practical default Event size limit checks at storage level

In order to simplify backend store migrations, we should default to rejecting messages over the limit of any commonly used backend, to avoid e.g. a DocDb-based backend needing to split messages.

The mechanism should likely enforce the rejection via an exception - applications will be required to ensure that e.g. compaction events are guaranteed to fit within the constraints implied by this limit (i.e. shedding or truncating of state should be managed at application level)

(ES and DocumentDb respectively have 4MB and 2MB message size limits; DynamoDB's max item size is 400KB.)
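
A sketch of such a guard (the limit constant and names are illustrative, not a committed design):

// Reject any event whose encoded payload exceeds the tightest common backend limit
let maxEventBytes = 400 * 1024 // DynamoDB's 400KB item cap is the tightest of the three
let validateEventSize (eventType: string) (data: byte[]) =
    if data.Length > maxEventBytes then
        invalidOp (sprintf "Event %s of size %d exceeds limit %d" eventType data.Length maxEventBytes)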
