
Latte's Introduction

Latte


Latte is a modern data engineering toolkit. It provides out-of-the-box, config-driven support that reduces the time and cost of many common data engineering tasks, including:

  • Load: Load data into target systems, such as Kafka, for downstream analysis.
  • Aggregate: Aggregate operational data at the source to produce business analytics.
  • Transform: Transform data during processing.
  • Transfer: Transfer data from one system to another while maintaining its original state.
  • Extract: Extract data from a variety of sources, including Mongo, Postgres, and Prometheus.

Latte extracts and loads data from a variety of sources:

[Screenshot: supported sources and sinks]

Project Goals

Latte aims to:

  • Reduce the cost of calculating, storing, and querying analytic data.
  • Increase data warehouse data fidelity.
  • Reduce data lake sprawl.
  • Reduce data transfer into, and the cost of querying, data lakes and data warehouses.
  • Enable product engineers to build self-service analytics on top of their operational data stores.

Latte is configured using simple yaml configuration files:

[Screenshot: example YAML configuration]
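
A minimal sketch of what such a config might look like, inferred from the dev/examples/postgres.kafka.stdout.yaml example used below and from the metric fields in its output; the exact keys are assumptions, not the documented schema:

# Hypothetical collector config; real field names may differ.
name: postgres.users.total.24h
source:
  type: postgres
  sql: |
    SELECT
      account AS customer,
      COUNT(*) AS value
    FROM users
    GROUP BY account
metric:
  name: core.users.total
  type: COUNT
sinks:
  - type: kafka
    topic: latte
  - type: stdout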

Getting Started

Local Go Development

  • Start docker dependencies
docker-compose -f dev/compose.yaml up -d
  • Validate Configuration
go run cmd/main.go config validate --config=$(PWD)/dev/examples/postgres.kafka.stdout.yaml
VALID=true
  • Invoke collector
go run cmd/main.go config invoke --config=$(PWD)/dev/examples/postgres.kafka.stdout.yaml
  • Verify Kafka Output
docker exec -it kafka1 kafka-console-consumer --bootstrap-server=localhost:9092 --topic=latte --from-beginning | jq .
{
  "uuid": "4fdbc492-7e76-4ae7-9e32-82da7463f374",
  "name": "core.users.total",
  "value": 3,
  "type": "COUNT",
  "tags": {
    "customer": "google"
  },
  "timestamp": "2023-12-24T14:12:01.237538Z",
  "grain_datetime": "2023-12-24T00:00:00Z"
}
{
  "uuid": "cdc14916-15a4-4579-aa06-2dc65f442aba",
  "name": "core.users.total",
  "value": 2,
  "type": "COUNT",
  "tags": {
    "customer": "amazon"
  },
  "timestamp": "2023-12-24T14:12:01.237543Z",
  "grain_datetime": "2023-12-24T00:00:00Z"
}

Local Docker

  • Start docker dependencies:
docker-compose -f dev/compose.with-collector.yaml up -d
  • Validate config
docker-compose -f dev/compose.with-collector.yaml run latte config validate --config=/dev/config/postgres.kafka.stdout.yaml

Creating dev_latte_run ... done
/dev/config/postgres.kafka.stdout.yaml
VALID=true
  • Invoke Collector
docker-compose -f dev/compose.with-collector.yaml run latte config invoke --config=/dev/config/postgres.kafka.stdout.yaml

Creating dev_latte_run ... done
{"level":"info","ts":1703725219.4218154,"caller":"collector/collector.go:165","msg":"collector.Invoke","id":"d20161ac-be75-4868-8794-04d7bfa7d9d3","name":"postgres.users.total.24h"}
{"uuid":"a540fb6c-1638-4109-a385-3b0afda6fa12","name":"core.users.total","value":3,"type":"COUNT","tags":{"customer":"google"},"timestamp":"2023-12-28T01:00:19.422545549Z","grain_datetime":"2023-12-28T00:00:00Z"}
{"uuid":"fdac2b3f-a053-4997-b1a3-1ce1c6ca89a4","name":"core.users.total","value":2,"type":"COUNT","tags":{"customer":"amazon"},"timestamp":"2023-12-28T01:00:19.422548216Z","grain_datetime":"2023-12-28T00:00:00Z"}
  • Verify Kafka Output
docker exec -it kafka1 kafka-console-consumer --bootstrap-server=localhost:9092 --topic=latte --from-beginning

{"uuid":"a540fb6c-1638-4109-a385-3b0afda6fa12","name":"core.users.total","value":3,"type":"COUNT","tags":{"customer":"google"},"timestamp":"2023-12-28T01:00:19.422545549Z","grain_datetime":"2023-12-28T00:00:00Z"}
{"uuid":"fdac2b3f-a053-4997-b1a3-1ce1c6ca89a4","name":"core.users.total","value":2,"type":"COUNT","tags":{"customer":"amazon"},"timestamp":"2023-12-28T01:00:19.422548216Z","grain_datetime":"2023-12-28T00:00:00Z"}
  • Run Collector as daemon
docker-compose -f dev/compose.with-collector.yaml run latte run -c=/dev/config

Creating dev_latte_run ... done
{"level":"info","ts":1703726983.41746,"caller":"cmd/run.go:52","msg":"loading configs","path":"/dev/config"}
{"level":"info","ts":1703726983.4632287,"caller":"cmd/run.go:71","msg":"initialized collectors","num_collectors":5}
{"level":"info","ts":1703726983.4632988,"caller":"service/service.go:27","msg":"run"}
{"level":"info","ts":1703726983.4633727,"caller":"collector/collector.go:165","msg":"collector.Invoke","id":"5ba44984-a8a3-42ac-a70d-85e11f808a6c","name":"postgres.users.total.24h"}
  • Tail the audit log; check Kafka and verify Vector output as needed
tail -f dev/audit/latte.audit.log

Examples

Example Configurations

Check out the examples directory for configuration examples.

Additional Documentation

Additional documentation is available in the docs/ directory.


Image by catalyststuff on Freepik


Latte's Issues

Service Mode

Collectors should be scheduled at their desired intervals, with support for cron expressions.

  • Create a new "run" command
  • Accepts a directory of configuration files as CLI args
  • Invokes each collector on initialization to ensure data is collected

The first version should be stateless and config file driven.
The config files will be loaded, scheduled and executed at the intervals specified.

The next version should introduce some form of persistence and a REST interface on top of the scheduler. Persistence will also be necessary for OAuth2 sources, whose access tokens will need to be stored and mapped.
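
A minimal sketch of the stateless run loop, assuming a hypothetical Collector interface (the project's real types may differ); cron expression support would replace the fixed tickers:

package service

import (
  "context"
  "time"
)

// Collector is a stand-in for latte's collector type; the names here
// are assumptions, not the project's actual interface.
type Collector interface {
  Name() string
  Invoke(ctx context.Context) error
}

// Job pairs a collector with its schedule.
type Job struct {
  Collector Collector
  Interval  time.Duration
}

// Run invokes every collector once up front, so data is collected on
// initialization, then re-invokes each collector on its interval.
func Run(ctx context.Context, jobs []Job) {
  for _, j := range jobs {
    j := j
    _ = j.Collector.Invoke(ctx) // collect on initialization

    go func() {
      ticker := time.NewTicker(j.Interval)
      defer ticker.Stop()
      for {
        select {
        case <-ctx.Done():
          return
        case <-ticker.C:
          _ = j.Collector.Invoke(ctx)
        }
      }
    }()
  }
  <-ctx.Done() // block until shutdown
}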

SQLite State Store

Right now an in-memory DuckDB state store is used for the Prometheus POC.

The signals collector should support a persistent state store.
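
A minimal sketch of a persistent store using database/sql with the github.com/mattn/go-sqlite3 driver; the Store interface is a guess at the project's actual shape:

package state

import (
  "database/sql"

  _ "github.com/mattn/go-sqlite3" // registers the "sqlite3" driver
)

// Store is an assumed key/value shape for the state store interface.
type Store interface {
  Get(key string) (string, error)
  Set(key, value string) error
}

// SQLiteStore persists collector state across restarts, unlike the
// in-memory store.
type SQLiteStore struct {
  db *sql.DB
}

func NewSQLiteStore(path string) (*SQLiteStore, error) {
  db, err := sql.Open("sqlite3", path)
  if err != nil {
    return nil, err
  }
  if _, err := db.Exec(
    `CREATE TABLE IF NOT EXISTS state (key TEXT PRIMARY KEY, value TEXT NOT NULL)`,
  ); err != nil {
    return nil, err
  }
  return &SQLiteStore{db: db}, nil
}

func (s *SQLiteStore) Get(key string) (string, error) {
  var value string
  err := s.db.QueryRow(`SELECT value FROM state WHERE key = ?`, key).Scan(&value)
  return value, err
}

func (s *SQLiteStore) Set(key, value string) error {
  _, err := s.db.Exec(
    `INSERT INTO state (key, value) VALUES (?, ?)
     ON CONFLICT(key) DO UPDATE SET value = excluded.value`,
    key, value,
  )
  return err
}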

Generic Transform Pipeline

Transformations will most likely be expanded. Right now the following transforms are supported internally:

  • Adding metric name
  • Adding metric type
  • Adding metric tags
  • Adding grain datetime
// Apply each registered transform to every metric, in order.
for _, m := range ms {
  for _, tFunc := range transforms {
    tFunc(m)
  }
}
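
One way the generic pipeline could be typed, sketched against a subset of the metric fields shown in the output examples above (the struct and constructor names are assumptions):

package transform

import "time"

// Metric mirrors some of the fields seen in the example output; the
// real struct in the project may differ.
type Metric struct {
  Name          string
  Type          string
  Tags          map[string]string
  GrainDatetime time.Time
}

// TransformFunc mutates a metric in place; a pipeline is an ordered
// slice of these.
type TransformFunc func(*Metric)

// WithName implements the "adding metric name" transform.
func WithName(name string) TransformFunc {
  return func(m *Metric) { m.Name = name }
}

// WithTags implements the "adding metric tags" transform by merging
// static tags into the metric.
func WithTags(tags map[string]string) TransformFunc {
  return func(m *Metric) {
    if m.Tags == nil {
      m.Tags = map[string]string{}
    }
    for k, v := range tags {
      m.Tags[k] = v
    }
  }
}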

README

  • Project Goals
  • Approach
  • Getting Started
  • Quickstart
  • Examples

Sinker Interface

Many of the sinks, such as Kafka, need cleanup on shutdown.

Make Close() part of the interface and call it on service shutdown.
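
A sketch of what that interface could look like; every method name besides Close() is an assumption:

package sink

import "context"

// Sinker gains a Close method so the service can release resources
// (e.g. flush and close Kafka producers) on shutdown.
type Sinker interface {
  Write(ctx context.Context, b []byte) error
  Close() error
}

// closeAll is called during service shutdown; it closes every sink and
// reports the first failure.
func closeAll(sinks []Sinker) error {
  var firstErr error
  for _, s := range sinks {
    if err := s.Close(); err != nil && firstErr == nil {
      firstErr = err
    }
  }
  return firstErr
}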

HTTP Sink: Add Retry

The data has already been collected, so the sink should make a configurable number of retry attempts, with backoff and jitter, before giving up.
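
A sketch of exponential backoff with jitter, where the attempt count and base delay would come from the sink's configuration:

package sink

import (
  "bytes"
  "fmt"
  "math/rand"
  "net/http"
  "time"
)

// postWithRetry retries a failed HTTP write, doubling the delay each
// attempt and adding random jitter to avoid thundering herds.
// base must be > 0.
func postWithRetry(url string, body []byte, attempts int, base time.Duration) error {
  var lastErr error
  for i := 0; i < attempts; i++ {
    resp, err := http.Post(url, "application/json", bytes.NewReader(body))
    if err == nil {
      resp.Body.Close()
      if resp.StatusCode < 300 {
        return nil
      }
      err = fmt.Errorf("unexpected status %d", resp.StatusCode)
    }
    lastErr = err

    // Exponential backoff (base * 2^i) plus up to base of random jitter.
    time.Sleep(base<<i + time.Duration(rand.Int63n(int64(base))))
  }
  return fmt.Errorf("all %d attempts failed: %w", attempts, lastErr)
}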

Partition Collector!

Ability to collect partitioned data and sink it to target systems.

Examples include issuing S3 blob storage loads into a system like Hive or ClickHouse, as in the sketch below.
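
For ClickHouse, such a load can run entirely server-side as an INSERT ... SELECT against the s3 table function. A sketch assuming the github.com/ClickHouse/clickhouse-go/v2 driver and hypothetical table and bucket names:

package collector

import (
  "database/sql"
  "fmt"

  _ "github.com/ClickHouse/clickhouse-go/v2" // registers the "clickhouse" driver
)

// loadPartition asks ClickHouse to read one S3 partition directly, so
// the collector never streams the rows itself. Real code should
// validate and escape the interpolated identifiers.
func loadPartition(dsn, table, s3URL string) error {
  db, err := sql.Open("clickhouse", dsn)
  if err != nil {
    return err
  }
  defer db.Close()

  query := fmt.Sprintf(
    "INSERT INTO %s SELECT * FROM s3('%s', 'Parquet')",
    table, s3URL,
  )
  _, err = db.Exec(query)
  return err
}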

Grain-oriented date field

Right now each metric contains a time field recording when the metric was generated. I'm guessing many consumers of the output will be more interested in an interval-aligned time.

Consider the following query that counts users by customer:

      SELECT
        account AS customer,
        COUNT(*) AS value
      FROM
        users
      GROUP BY
        account

Each row will produce a separate metric:

{
  "uuid": "55ef5b32-271f-496a-8cde-27924d7f67aa",
  "name": "core.users.total",
  "value": 3,
  "type": "COUNT",
  "tags": {
    "customer": "google",
    "env": "prod"
  },
  "time": "2023-12-23T20:53:57.150929Z"
}
{
  "uuid": "8fcaf7da-76c2-4152-b097-9e15bc47c8cf",
  "name": "core.users.total",
  "value": 2,
  "type": "COUNT",
  "tags": {
    "customer": "amazon",
    "env": "prod"
  },
  "time": "2023-12-23T20:53:57.150934Z"
}

Most of the time, aggregating on the time field will work. Imagine that this collector runs once a day: while it's likely that all metrics will contain a time in the same day, it's not guaranteed.

An interval-aligned time field introduces a logical time bucket that all metrics in a single collection roll up to.
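
A sketch of how such a field could be derived, truncating the collection time to the start of its interval (the helper name is hypothetical; a daily grain is shown):

package metric

import "time"

// grainDatetime maps a wall-clock timestamp to the start of its
// interval, so every metric from one collection shares a bucket.
func grainDatetime(t time.Time, grain time.Duration) time.Time {
  return t.UTC().Truncate(grain)
}

// Both example timestamps above land in the same bucket:
// grainDatetime(2023-12-23T20:53:57.150929Z, 24*time.Hour)
//   == 2023-12-23T00:00:00Z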

Source Strategy: Normalized offering

  • Historic Tumbling Window - Collects data across all historic, fully complete windows. Provides windowing guarantees, i.e. counting by day, hour, week, etc.
  • Tick - Invokes the collector at a point in time, capturing a reading at that moment. Great for running COUNTs like total users, and for GAUGEs, since it is self-correcting.
  • Incremental - Invokes the collector from the previous invocation until now. Fully captures all available data, but provides no windowing guarantees. A sketch of the window arithmetic follows this list.
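
A sketch of the tumbling-window arithmetic, assuming the previous high-water mark is already window-aligned (all names here are hypothetical):

package source

import "time"

// Window is a half-open collection interval: [Start, End).
type Window struct {
  Start, End time.Time
}

// completeWindows returns every fully complete tumbling window of the
// given size between the last collected window end and now. A tick
// strategy ignores windows entirely; an incremental strategy would use
// [last, now) directly, without alignment.
func completeWindows(last, now time.Time, size time.Duration) []Window {
  var ws []Window
  for start := last; !start.Add(size).After(now); start = start.Add(size) {
    ws = append(ws, Window{Start: start, End: start.Add(size)})
  }
  return ws
}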

Service REST API - StateStore List

The service should support RESTful API operations, one of which should be the ability to query the state store, especially in the case of the in-memory state store.

This should include a List() method on the state store interface, exposed through an HTTP API.
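
A sketch of the List() addition and a handler exposing it over HTTP (the entry shape and route are assumptions):

package api

import (
  "encoding/json"
  "net/http"
)

// Entry is an assumed shape for one state store record.
type Entry struct {
  Key   string `json:"key"`
  Value string `json:"value"`
}

// Lister is the proposed addition to the state store interface.
type Lister interface {
  List() ([]Entry, error)
}

// ListHandler exposes the store's contents, which is especially useful
// for inspecting the in-memory store.
func ListHandler(store Lister) http.HandlerFunc {
  return func(w http.ResponseWriter, r *http.Request) {
    entries, err := store.List()
    if err != nil {
      http.Error(w, err.Error(), http.StatusInternalServerError)
      return
    }
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(entries)
  }
}

It could be wired up with, for example, http.Handle("/statestore", ListHandler(store)).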

Config Templating Engine

Ability to globally template the config file. While users can handle this on their end, supporting Go template rendering should reduce the barrier to entry.
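
A sketch of rendering a config through the standard library's text/template before YAML parsing; the template data (including the LATTE_ENV variable) is hypothetical:

package config

import (
  "bytes"
  "os"
  "text/template"
)

// renderConfig executes a raw config file as a Go template, so configs
// can reference values like {{ .env }} before being parsed as YAML.
func renderConfig(path string) ([]byte, error) {
  raw, err := os.ReadFile(path)
  if err != nil {
    return nil, err
  }
  tmpl, err := template.New(path).Parse(string(raw))
  if err != nil {
    return nil, err
  }
  var buf bytes.Buffer
  data := map[string]any{
    "env": os.Getenv("LATTE_ENV"), // hypothetical template variable
  }
  if err := tmpl.Execute(&buf, data); err != nil {
    return nil, err
  }
  return buf.Bytes(), nil
}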

Entity Collector

Ability to collect entities and not metrics.

Entities are interface{} or map[string]any values.

These have different rules for persistence as well; sometimes a "snapshot" will be created, where all entities are persisted to a single location (such as S3).
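
A sketch of how a snapshot could be typed and serialized (the field names and the newline-delimited JSON choice are assumptions):

package entity

import (
  "encoding/json"
  "time"
)

// Entity is an arbitrary record rather than a numeric metric.
type Entity = map[string]any

// Snapshot groups all entities from one collection so they can be
// persisted to a single location, such as one S3 object.
type Snapshot struct {
  CollectedAt time.Time
  Entities    []Entity
}

// Marshal renders the snapshot as newline-delimited JSON, a common
// format for blob-store loads.
func (s Snapshot) Marshal() ([]byte, error) {
  var out []byte
  for _, e := range s.Entities {
    b, err := json.Marshal(e)
    if err != nil {
      return nil, err
    }
    out = append(out, b...)
    out = append(out, '\n')
  }
  return out, nil
}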
