Demo code for processing ordered events in Apache Beam pipelines.
This repo contains a simulation of order book processing in streaming and batch Apache Beam pipelines. It shows how ordered processing can be done in Apache Beam at scale and provides a fully functional pipeline, a simulator test harness, and a set of scripts to visualize the processing steps and the pipeline output.
The use case is maintaining an order book of security order events (buy, sell or cancellation) and producing the security's market depth on every trade.
The market depth data can be saved to persistent storage for additional analysis, or it can be analyzed in the same pipeline to build a streaming analytics solution.
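To make the use case concrete, here is a minimal sketch of an order book that matches incoming orders and reports market depth. It is an illustration only: the names `OrderBookSketch`, `add`, `cancel`, and `depth` are hypothetical and are not taken from this repo's code, and the sketch assumes that tracking the aggregate open quantity per price level is enough (a real order book would track individual orders).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified order book. Not the implementation used in this repo. */
final class OrderBookSketch {
  enum Side { BUY, SELL }

  // Price -> total open quantity. Bids iterate highest price first, asks lowest price first.
  private final TreeMap<Long, Long> bids = new TreeMap<>(Comparator.reverseOrder());
  private final TreeMap<Long, Long> asks = new TreeMap<>();

  /** Matches a limit order against the opposite side, rests any remainder, returns traded quantity. */
  long add(Side side, long price, long quantity) {
    TreeMap<Long, Long> opposite = side == Side.BUY ? asks : bids;
    long remaining = quantity;
    while (remaining > 0 && !opposite.isEmpty()) {
      Map.Entry<Long, Long> best = opposite.firstEntry();
      boolean crosses = side == Side.BUY ? best.getKey() <= price : best.getKey() >= price;
      if (!crosses) {
        break;
      }
      long fill = Math.min(remaining, best.getValue());
      remaining -= fill;
      if (fill == best.getValue()) {
        opposite.remove(best.getKey());
      } else {
        opposite.put(best.getKey(), best.getValue() - fill);
      }
    }
    if (remaining > 0) {
      (side == Side.BUY ? bids : asks).merge(price, remaining, Long::sum);
    }
    return quantity - remaining;
  }

  /** Cancels up to the given quantity resting at one price level. */
  void cancel(Side side, long price, long quantity) {
    (side == Side.BUY ? bids : asks).computeIfPresent(
        price, (p, open) -> open > quantity ? Long.valueOf(open - quantity) : null);
  }

  /** Market depth: the best `levels` price levels per side, formatted as quantity@price. */
  String depth(int levels) {
    return "bids=" + top(bids, levels) + " asks=" + top(asks, levels);
  }

  private static List<String> top(TreeMap<Long, Long> side, int levels) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<Long, Long> level : side.entrySet()) {
      if (out.size() == levels) {
        break;
      }
      out.add(level.getValue() + "@" + level.getKey());
    }
    return out;
  }

  public static void main(String[] args) {
    OrderBookSketch book = new OrderBookSketch();
    book.add(Side.BUY, 100, 10);
    book.add(Side.SELL, 101, 7);
    long traded = book.add(Side.SELL, 100, 4); // crosses the best bid, so a trade happens
    System.out.println("traded=" + traded + " " + book.depth(5));
  }
}
```

Because each call mutates the book, the result is only correct if events are applied in their original order, which is why the pipeline needs ordered processing.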
The use case is implemented as a standalone Java module (business model), with the core logic residing in the OrderBookBuilder class. The simulator package has utilities that generate order book events simulating financial institution trading sessions.
The pipeline uses Beam's state and timers to process events in order. For a detailed description of the steps needed to implement the pipeline, see this document.
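The buffering idea behind ordered processing can be sketched in plain Java, without any Beam APIs. The sketch below is not the pipeline's code: `SequencerSketch` and `onEvent` are hypothetical names, and it assumes that each key (for example, a contract) has sequence numbers that start at 1 with no gaps. The counter names mirror the columns of the `processing_status` table queried in this README, but their exact meaning in the pipeline is an assumption. Timers are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical per-key sequencer that mimics what a stateful transform could keep in state.
 * Plain Java only; this is not Beam code and not the implementation used in this repo.
 */
final class SequencerSketch {
  // Out-of-order events, keyed by sequence number, waiting for their predecessors.
  private final TreeMap<Long, String> buffer = new TreeMap<>();
  private long nextSequence = 1; // assumption: sequences start at 1 and have no gaps

  long receivedCount;
  long duplicateCount;
  long resultCount;

  /** Accepts one event and returns the payloads that can now be emitted in sequence order. */
  List<String> onEvent(long sequence, String payload) {
    receivedCount++;
    List<String> ready = new ArrayList<>();
    // An event is a duplicate if it was already emitted or is already buffered.
    if (sequence < nextSequence || buffer.containsKey(sequence)) {
      duplicateCount++;
      return ready;
    }
    buffer.put(sequence, payload);
    // Drain the buffer for as long as its head is the next expected sequence.
    while (!buffer.isEmpty() && buffer.firstKey() == nextSequence) {
      ready.add(buffer.pollFirstEntry().getValue());
      nextSequence++;
    }
    resultCount += ready.size();
    return ready;
  }

  int bufferedCount() {
    return buffer.size();
  }

  public static void main(String[] args) {
    SequencerSketch sequencer = new SequencerSketch();
    long[] arrivals = {2, 1, 1, 4, 3}; // out of order, with one duplicate
    for (long sequence : arrivals) {
      System.out.println(sequence + " -> " + sequencer.onEvent(sequence, "event-" + sequence));
    }
  }
}
```

In a real pipeline this per-key state would live in Beam state cells rather than in object fields.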
- Clone this repo and switch to the checked out directory
- Designate or create a project to run the tests and create a terraform/terraform.tfvars file with the following content:
project_id = "<your project id>"
- Create infrastructure to run the demo:
cd terraform
terraform init
terraform apply
cd ..
- Build the project
mvn clean install
The following command starts a simulator that generates synthetic orders and the expected order book events:
./start-pubsub-simulator.sh
Once the pipeline is running, you can use the BigQuery console or the bq utility to see how the pipeline processes the data.
To see the processing state for the latest session:
WITH latest_statuses AS (
  -- Stats for each contract
  SELECT s.received_count,
         s.buffered_count,
         s.result_count,
         s.duplicate_count,
         s.last_event_received
  FROM `ordered_processing_demo.processing_status` s
  WHERE
    -- Find latest session_id
    session_id = (SELECT DISTINCT session_id
                  FROM `ordered_processing_demo.processing_status`
                  ORDER BY session_id DESC
                  LIMIT 1)
  -- Most recent stats by status_id across (session_id, contract_id)
  QUALIFY RANK() OVER (
    PARTITION BY session_id, contract_id
    ORDER BY status_ts DESC, received_count DESC
  ) = 1
)
SELECT COUNT(*) total_contracts,
       COUNTIF(last_event_received AND buffered_count = 0) fully_processed,
       SUM(received_count) total_orders_received,
       SUM(buffered_count) total_orders_buffered,
       SUM(result_count) total_results_produced,
       SUM(duplicate_count) total_duplicates
FROM latest_statuses;
This query shows processing status per contract for the latest session:
SELECT *
FROM `ordered_processing_demo.processing_status`
WHERE session_id = (SELECT DISTINCT session_id
                    FROM `ordered_processing_demo.processing_status`
                    ORDER BY session_id DESC
                    LIMIT 1)
QUALIFY RANK() OVER (
  PARTITION BY session_id, contract_id
  ORDER BY status_ts DESC, received_count DESC
) <= 5
ORDER BY
  session_id,
  contract_id,
  status_ts DESC,
  received_count DESC
LIMIT 300
This query shows the five most recent market depth records for each contract:
SELECT *
FROM `ordered_processing_demo.market_depth`
QUALIFY RANK() OVER (
  PARTITION BY session_id, contract_id
  ORDER BY session_id, contract_sequence_id DESC
) <= 5
ORDER BY
  session_id, contract_id, contract_sequence_id DESC
LIMIT 300
To clean up, stop the pipeline and destroy the infrastructure:
./stop-pipeline.sh
terraform -chdir=terraform destroy
Contributions to this repo are always welcome and highly encouraged.
See CONTRIBUTING for more information on how to get started.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.
Apache 2.0 - See LICENSE for more information.