
kinesis-producer's Introduction

Amazon Kinesis Producer

A KPL-like batch producer for Amazon Kinesis, built on top of the official Go AWS SDK
and using the same aggregation format that the KPL uses.

Example

package main

import (
	"time"

	"github.com/sirupsen/logrus"
	"github.com/a8m/kinesis-producer"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	client := kinesis.New(session.New(aws.NewConfig()))
	pr := producer.New(&producer.Config{
		StreamName:   "test",
		BacklogCount: 2000,
		Client:       client,
	})

	pr.Start()

	// Handle failures
	go func() {
		for r := range pr.NotifyFailures() {
			// r contains `Data`, `PartitionKey` and `Error()`
			log.Error(r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			err := pr.Put([]byte("foo"), "bar")
			if err != nil {
				log.WithError(err).Fatal("error producing")
			}
		}
	}()

	time.Sleep(3 * time.Second)
	pr.Stop()
}

Specifying logger implementation

producer.Config takes an optional logging.Logger implementation.

Using a custom logger
customLogger := &CustomLogger{}

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       customLogger,
}
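
For reference, a minimal sketch of what such a CustomLogger might look like. The Info/Error method set shown here is an assumption about the shape of the logging.Logger interface, not its actual definition; check the package's logging source for the real signatures.

import "log"

// CustomLogger is a hypothetical adapter that forwards producer log
// events to the standard library logger. Its method set is an assumed
// shape for logging.Logger, not the interface's real definition.
type CustomLogger struct{}

func (l *CustomLogger) Info(msg string, values ...interface{}) {
	log.Println(append([]interface{}{"INFO:", msg}, values...)...)
}

func (l *CustomLogger) Error(msg string, err error, values ...interface{}) {
	log.Println(append([]interface{}{"ERROR:", msg, err}, values...)...)
}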

Using logrus

import (
	"github.com/sirupsen/logrus"
	producer "github.com/a8m/kinesis-producer"
	"github.com/a8m/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       loggers.Logrus(log),
}

kinesis-producer ships with three logger implementations.

  • producer.Standard uses the standard library logger
  • loggers.Logrus uses logrus logger
  • loggers.Zap uses zap logger

License

MIT

kinesis-producer's People

Contributors

a8m, aelse-atlassian, dharmeshptl, glaslos, jhop310, mwillfox, owais, pietro, wmorgan6796

kinesis-producer's Issues

Logrus Entry vs Logger

To support logging already defined fields using logrus, is there any concern with switching to Entry rather than Logger?

From logrus docs:

These objects can be reused and passed around as much as you wish to avoid field duplication.

Often it's helpful to have fields always attached to log statements in an application or parts of one. For example, you may want to always log the request_id and user_ip in the context of a request. Instead of writing log.WithFields(log.Fields{"request_id": request_id, "user_ip": user_ip}) on every line, you can create a logrus.Entry to pass around instead:

Add README

Race in Producer.Put causes panic

In producer.go, after deciding to drain the aggregator we release the read lock and grab the write lock. Another goroutine can drain the aggregator during this window, and calling Drain() on an empty aggregator results in a panic.
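
A minimal sketch of the check-then-act pattern at fault, and the re-check under the write lock that fixes it; the type and field names here are illustrative, not the library's actual code:

package producer

import "sync"

// Simplified, hypothetical types for illustration only.
type aggregator struct{ count int }

func (a *aggregator) Size() int { return a.count }
func (a *aggregator) Drain()    { a.count = 0 } // the real Drain panics when empty

type Producer struct {
	mu        sync.RWMutex
	agg       aggregator
	batchSize int
}

func (p *Producer) drainIfNeeded() {
	p.mu.RLock()
	needsDrain := p.agg.Size() >= p.batchSize
	p.mu.RUnlock()

	// Another goroutine can drain the aggregator in the window between
	// RUnlock and Lock, so the decision above is already stale.
	if needsDrain {
		p.mu.Lock()
		// Fix: re-check the condition under the write lock before draining.
		if p.agg.Size() >= p.batchSize {
			p.agg.Drain()
		}
		p.mu.Unlock()
	}
}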

Use In AWS Lambda

I want to use this library with AWS Lambda, but the producer cannot be reused after Stop().
The example code below fails with "Unable to Put record. Producer is already stopped" when the handler runs a second time.

package main

import (
	"os"

	producer "github.com/a8m/kinesis-producer"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"golang.org/x/sync/errgroup"
)

var pr = producer.New(&producer.Config{
	StreamName: os.Getenv("KINESIS_STREAM"),
	Client:     kinesis.New(session.Must(session.NewSession())),
})

func handle(e events.KinesisEvent) error {
	eg := errgroup.Group{}

	pr.Start()
	eg.Go(func() error {
		for r := range pr.NotifyFailures() {
			return r
		}
		return nil
	})

	for _, r := range e.Records {
		// Any logic for each records
		if err := pr.Put(r.Kinesis.Data, r.Kinesis.PartitionKey); err != nil {
			return err
		}
	}
	pr.Stop()
	return eg.Wait()
}

func main() {
	lambda.Start(handle)
}

Of course, it works well if I create a new Producer on every invocation, but I want to reuse the producer as much as possible.

var kc = kinesis.New(session.Must(session.NewSession()))

func handle(e events.KinesisEvent) error {
	var pr = producer.New(&producer.Config{
		StreamName: os.Getenv("KINESIS_STREAM"),
		Client:     kc,
	})

	eg := errgroup.Group{}

	pr.Start()
	eg.Go(func() error {
		for r := range pr.NotifyFailures() {
			return r
		}
		return nil
	})

	for _, r := range e.Records {
		if err := pr.Put(r.Kinesis.Data, r.Kinesis.PartitionKey); err != nil {
			return err
		}
	}
	pr.Stop() 
	return eg.Wait()
}

Is it possible to flush the Producer on demand, either by making its flush() method public or by supporting Stop() followed by Start() again?
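
Until restarting a stopped producer is supported, one workaround sketch is to factor the second pattern into a helper that reuses the heavyweight Kinesis client and rebuilds only the producer per invocation. newProducer is a hypothetical helper, the imports are the same as in the snippets above, and this assumes producer.New returns a *producer.Producer and that Stop() flushes buffered records on shutdown:

var kc = kinesis.New(session.Must(session.NewSession()))

// newProducer is a hypothetical helper: the Kinesis client is reused
// across invocations, only the producer state is rebuilt each time.
func newProducer() *producer.Producer {
	return producer.New(&producer.Config{
		StreamName: os.Getenv("KINESIS_STREAM"),
		Client:     kc,
	})
}

func handle(e events.KinesisEvent) error {
	pr := newProducer()
	pr.Start()
	defer pr.Stop() // assumed to flush buffered records before returning

	for _, r := range e.Records {
		if err := pr.Put(r.Kinesis.Data, r.Kinesis.PartitionKey); err != nil {
			return err
		}
	}
	return nil
}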

Usage of ExplicitHashKey

Hi there,

I've tried to add a 3rd parameter to Put to make use of the ExplicitHashKey. Since I wanted a quick solution, I left the normal PartitionKey untouched and called it with "0". My assumption was that the ExplicitHashKey would be used if present. Unfortunately it doesn't seem to work, since all messages go to the same shard even though I'm using the "StartingHashKey" of each of my shards.

I've forked it here: https://github.com/konstantinj/kinesis-producer

Maybe you have an idea why it doesn't work.

I want to use round robin for my partition keys because it never really distributes well using my actual data as keys.
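
Independent of whether this library's aggregation path honors ExplicitHashKey, round-robin distribution usually means cycling one explicit hash key per shard. A sketch that computes evenly spaced keys across the 128-bit Kinesis hash keyspace (in practice you would instead take each shard's HashKeyRange.StartingHashKey from DescribeStream):

package main

import (
	"fmt"
	"math/big"
	"sync/atomic"
)

// evenHashKeys returns n evenly spaced explicit hash keys across the
// 128-bit Kinesis hash keyspace, one per shard.
func evenHashKeys(n int) []string {
	max := new(big.Int).Lsh(big.NewInt(1), 128) // 2^128
	step := new(big.Int).Div(max, big.NewInt(int64(n)))
	keys := make([]string, n)
	for i := range keys {
		keys[i] = new(big.Int).Mul(step, big.NewInt(int64(i))).String()
	}
	return keys
}

// roundRobin hands out the keys in rotation; safe for concurrent use.
type roundRobin struct {
	keys []string
	next uint64
}

func (r *roundRobin) Next() string {
	i := atomic.AddUint64(&r.next, 1) - 1
	return r.keys[i%uint64(len(r.keys))]
}

func main() {
	rr := &roundRobin{keys: evenHashKeys(4)}
	for i := 0; i < 8; i++ {
		fmt.Println(rr.Next()) // pass as the ExplicitHashKey of each Put
	}
}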

extra junk added to data when read from kinesis

Hi, it appears that as I'm writing JSON using this package, extra data is being added, and I can't trace the source.

for example
payload := []byte("{\"test\": \"me\"}")
Pr.Put(payload, "11111111-1111-1111-1111-111111111122")

If I consume the stream with the AWS CLI and base64-decode the data field, I get the following string:

$11111111-1111-1111-1111-111111111122{"test": "me"}iTDu Z�
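
That framing is expected rather than corruption: the producer wraps records in the KPL aggregation format, i.e. a 4-byte magic prefix, a protobuf body carrying the partition-key table alongside the payloads, and a trailing MD5 checksum, so a raw consumer sees those envelope bytes around the JSON. Consumers need to deaggregate (the KCL does this automatically; awslabs/kinesis-aggregation provides standalone deaggregators). A sketch for detecting the envelope:

package main

import (
	"bytes"
	"fmt"
)

// kplMagic is the published 4-byte prefix of KPL-aggregated records.
var kplMagic = []byte{0xF3, 0x89, 0x9A, 0xC2}

// isAggregated reports whether a record payload carries the KPL
// aggregation envelope and therefore needs deaggregation.
func isAggregated(data []byte) bool {
	return bytes.HasPrefix(data, kplMagic)
}

func main() {
	fmt.Println(isAggregated([]byte{0xF3, 0x89, 0x9A, 0xC2, 0x0A})) // true
	fmt.Println(isAggregated([]byte(`{"test": "me"}`)))             // false
}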

Can we cut a new release?

Hey @a8m. Thanks for the PR review and merge. Can we cut 0.2.0 so package managers can figure out how to get the new code?

Thanks

Aggregation Issues

The aggregator uses the same partition key for all records it batches together. This creates problems in the downstream flow - e.g. we map the key to a Kafka partition, and our mapping breaks when we try to feed Kinesis records into Kafka. This is mentioned as a problem in the comments. Any chance of getting this addressed in the near future?

Also, there doesn't seem to be a mapping of key to shard. Such a mapping would be a great help...

Replace logger with standard lib logger or an interface

Projects that use kinesis-producer might use a different logging library than logrus. It would be great if kinesis-producer could expect an interface instead of a concrete type (logrus logger). It would allow us to use the same logging library in a project. Has this been discussed before? If you are open to the idea, I can try to contribute it.

Thanks

Accept an interface instead of []byte in producer's Put method

Put expects records to be written to Kinesis as []byte. This works fine for simple cases, but often a more complex layer wraps kinesis-producer and needs more control and flexibility over how different cases (such as failures) are handled. For example, a program might want to handle failures in a special way, but right now the FailureResult struct only contains the partition key and the raw data as bytes; there is no other information that might help identify the records.

Accepting an interface instead of raw bytes would make the library a lot more flexible. For example,

type Record struct {
	id           string
	data         []byte
	failureCount int
}

func (r Record) Bytes() []byte {
     return r.data
}

func process(r Record) {
      producer.Put(r, "partition-key")
}

func processErrors() {
	for f := range producer.NotifyFailures() {
		r := f.Item.(Record)
		if r.failureCount > 3 {
			// log the failure and give up on this record
			return
		}
		r.failureCount++
		process(r)
	}
}

Or a record could marshal itself to bytes that kinesis-producer uses internally by calling the Read() method; the client then wouldn't have to unmarshal the bytes on every failure in order to inspect the record (for specialized logging, retries, failure handling, etc.).

The kinesis-producer API would add an interface definition:

type Record interface {
	Read() []byte
}

producer.Put(r Record, k string)

type FailureRecord struct {
	Record       Record
	PartitionKey string
}

Or partition key could be rolled into the record as well like:

type Record interface {
	Read() []byte
	PartitionKey() string
}

producer.Put(r Record)

In this case, the failure channel would just return failed Records, and the client-side implementation could look like:

type Record struct {
    ID string
    UserID string
    Timestamp time.Time
    // any other number of fields
}

func (r Record) Read() []byte {
	// marshal r and return the bytes (sketch)
	return nil
}

func (r Record) PartitionKey() string {
	return r.ID
}

The library could also ship with a simple implementation to cover simple cases. Usage would look like:

pr.Put(producer.Record{myData, myPartitionKey})

How to use logrus

Hi there,

I cannot get logrus to work, unfortunately. I'm following the example from the readme:

import (
	"github.com/sirupsen/logrus"
	producer "github.com/a8m/kinesis-producer"
	"github.com/a8m/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       loggers.Logrus(log),
}

but the package "loggers" no longer exists, only the folder, and "Logrus" doesn't exist there anymore. It looks like the readme and the code have drifted apart. What's the correct way here?
