
data-streams-go

Introduction

This product measures end-to-end latency in asynchronous pipelines. It is currently in an alpha phase. Instrumentation is currently supported for pipelines that use Kafka; integrations with other systems will follow.

Glossary

  • Data stream: A set of services connected together via queues
  • Pathway: A single branch of connected services
  • Queue: A connection between two services
  • Edge latency: Latency of a queue between two services
  • Latency from origin: Latency from the first tracked service, down to the current service
  • Checkpoint: A record of the time at which a specific operation on a payload occurred (e.g. the payload was sent to Kafka). The product measures latency between checkpoints.

The product can measure edge latency, and latency from origin, for a set of checkpoints connected together via queues. To do so, we propagate timestamps and a hash of the path that messages took along with the payload.
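As an illustration, a checkpoint is simply recorded against the pathway carried by the current Go Context. A minimal sketch using the SetCheckpoint API from the manual-instrumentation section below (the "type:internal" tags are example values):

// Record a checkpoint on the pathway carried by ctx; latency is then
// measured between this checkpoint and the ones recorded before and after it.
_, ctx = datastreams.SetCheckpoint(ctx, []string{"type:internal", "direction:out"})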

Go instrumentation

Prerequisites

You need to start the pipeline with datastreams.Start() at the start of your application. The default trace agent URL is localhost:8126. If yours is different, use the option:

datastreams.Start(datastreams.WithAgentAddr("notlocalhost:8126"))
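A minimal setup sketch, using the top-level import path from the merge example later in this README (datastreams.Stop() is the shutdown counterpart, as used in the issue example further down this page):

package main

import (
    datastreams "github.com/DataDog/data-streams-go"
)

func main() {
    // Start the pipeline before any checkpoints are created.
    datastreams.Start(datastreams.WithAgentAddr("notlocalhost:8126"))
    // Flush any buffered stats when the application exits.
    defer datastreams.Stop()

    // ... run your service ...
}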

The instrumentation relies on creating checkpoints at various points in your data stream services, recording the pathway that messages take along the way. These pathways are stored within a Go Context, and are passed around via message headers.

M1 support of the Kafka instrumentation

This library relies on the Confluent Kafka Go library. On M1 machines, the following build flag is needed when building or running the instrumented service:

-tags dynamic
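For example (the paths are illustrative):

go build -tags dynamic ./...
go run -tags dynamic ./main.go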

Kafka

To instrument your data stream services that use Kafka queues, you can use the library provided under github.com/DataDog/data-streams-go/integrations/kafka.

On the producer side, before sending out a Kafka message you can call TraceKafkaProduce(), which sets a new checkpoint onto any existing pathway in the provided Go Context (or creates a new pathway if none are found). It then adds the pathway into your Kafka message headers.

import (ddkafka "github.com/DataDog/data-streams-go/integrations/kafka")
...
ctx = ddkafka.TraceKafkaProduce(ctx, &kafkaMsg)
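Putting this together with the Confluent producer, a sketch in which producer is an assumed *kafka.Producer from github.com/confluentinc/confluent-kafka-go/kafka and deliveryChan, topic, and payload are assumptions:

kafkaMsg := kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          payload,
}
// Stamp the pathway into the message headers, then send the message.
ctx = ddkafka.TraceKafkaProduce(ctx, &kafkaMsg)
err = producer.Produce(&kafkaMsg, deliveryChan)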

Similarly, on the consumer side, you can call TraceKafkaConsume(), which extracts the pathway that a particular Kafka message has gone through so far. It also sets a new checkpoint on the pathway to record the successful consumption of a message, and then finally it stores the pathway into the provided Go Context.

import (ddkafka "github.com/DataDog/data-streams-go/integrations/kafka")
...
ctx = ddkafka.TraceKafkaConsume(ctx, &kafkaMsg, consumer_group)
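A minimal consumer-loop sketch, assuming a *kafka.Consumer from github.com/confluentinc/confluent-kafka-go/kafka and a hypothetical process() handler:

for {
    msg, err := consumer.ReadMessage(-1) // block until a message arrives
    if err != nil {
        continue
    }
    // Extract the pathway from the message headers and checkpoint the consume.
    ctx := ddkafka.TraceKafkaConsume(context.Background(), msg, consumer_group)
    process(ctx, msg) // hypothetical downstream handler
}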

Please note that the output ctx from TraceKafkaProduce() and TraceKafkaConsume() both contain information about the updated pathway. For TraceKafkaProduce(), if you are sending multiple Kafka messages in one go (i.e. fan-out situations), do not reuse the output ctx across calls; see the sketch after the example below. For TraceKafkaConsume(), if you are aggregating multiple messages to create a smaller number of payloads (i.e. fan-in situations), merge the resulting Contexts using MergeContexts() from github.com/DataDog/data-streams-go. The merged Context can then be passed into the next TraceKafkaProduce() call.

import (
    "context"

    datastreams "github.com/DataDog/data-streams-go"
    ddkafka "github.com/DataDog/data-streams-go/integrations/kafka"
)

...

contexts := []context.Context{}
for ... {
    contexts = append(contexts, ddkafka.TraceKafkaConsume(ctx, &consumedMsg, consumer_group))
}
mergedContext := datastreams.MergeContexts(contexts...)

...

ddkafka.TraceKafkaProduce(mergedContext, &producedMsg)
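And a sketch of the fan-out rule mentioned above: each produced message gets its checkpoint from the same parent Context, never from the ctx returned by a previous call (msgs is an assumed slice of *kafka.Message):

parent := ctx
for _, msg := range msgs {
    // Each call stamps msg from the unmodified parent pathway.
    _ = ddkafka.TraceKafkaProduce(parent, msg)
    // Produce msg here; do not feed the returned ctx into the next iteration.
}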

Manual instrumentation

The example below is for HTTP, but you can instrument any technology with this manual instrumentation. For HTTP, we propagate the pathway through HTTP headers.

In the http client, to inject the pathway, use:

req, err := http.NewRequest(...)
...
p, ok := datastreams.PathwayFromContext(ctx)
if ok {
    req.Header.Set(datastreams.PropagationKeyBase64, p.EncodeStr())
}

And to extract the pathway from HTTP headers (inside the HTTP server), use:

func extractPathwayToContext(req *http.Request) context.Context {
	ctx := req.Context()
	p, err := datastreams.DecodeStr(req.Header.Get(datastreams.PropagationKeyBase64))
	if err != nil {
		return ctx
	}
	ctx = datastreams.ContextWithPathway(ctx, p)
	_, ctx = datastreams.SetCheckpoint(ctx, []string{"type:http", "direction:in"})
	return ctx
}
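This helper can then be used at the top of any handler so that downstream work continues the pathway (a sketch; handleRequest and doWork are hypothetical):

func handleRequest(w http.ResponseWriter, req *http.Request) {
	ctx := extractPathwayToContext(req)
	// Pass ctx to downstream calls (e.g. TraceKafkaProduce) so the
	// pathway measured from the origin continues through this service.
	doWork(ctx) // hypothetical business logic
}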

Contributors

hokitam, jwbdd, kr-igor, ledor473, nickuraltsev, piochelepiotr, rabisg


Issues

Newly created pathways show incorrect pathway start in DataDog interface

I'm not sure if this is a bug in the Go library or the DD service, but using this basic Go script (it's for testing) to produce Kafka messages, the DD interface shows the pathway starting at test-topic-1 instead of go-step-1.

//usr/bin/env go run "$0" "$@"; exit "$?"

package main

import (
	"context"
	"time"
	"fmt"
	"strconv"
	"os"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/DataDog/data-streams-go/datastreams"
	ddkafka "github.com/DataDog/data-streams-go/integrations/kafka"
)

func main() {
	fmt.Printf("Starting app\n")

	datastreams.Start()
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"client.id": "go-step-1",
		"acks": "all",
	})

	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}

	delivery_chan := make(chan kafka.Event, 10000)
	topic := "test-topic-1"

	for i := 1; i <= 931; i++ {
		ctx := context.Background()
		message := kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value: []byte(strconv.FormatInt(time.Now().Unix(), 10)),
		}

		ctx = ddkafka.TraceKafkaProduce(ctx, &message)
		err = p.Produce(&message, delivery_chan)

		e := <-delivery_chan
		m := e.(*kafka.Message)

		if m.TopicPartition.Error != nil {
				fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
		} else {
				fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
								*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
		}
	}

	close(delivery_chan)
	datastreams.Stop()
}

The code was run with DD_SERVICE=go-step-1 DD_ENV=dev ./main.go.

Showing up in DataDog like so:

[screenshot: Datadog pathway view showing the pathway starting at test-topic-1]

I haven't been able to set up a Java or .NET project to confirm this behavior with the other officially supported languages.

DSM v0.4

Hi there,

Given that Data Streams has gone GA, would you be able to cut a v0.4 release for users that would like to use the new interfaces + Sarama wrapper without depending on the main branch directly?

Thanks 🙂
