Giter VIP home page Giter VIP logo

kafka-streams-scala's Introduction

Note: Scala API for Kafka Streams have been accepted for inclusion in Apache Kafka. We have been working with the Kafka team since the last couple of months working towards meeting the standards and guidelines for this activity. Lightbend and Alexis Seigneurin have contributed this library (with some changes) to the Kafka community. This is already available on Apache Kafka trunk and will be included in the upcoming release of Kafka. Hence it does not make much sense to update this project on a regular basis. For some time however, we will continue to provide support for fixing bugs only.

A Thin Scala Wrapper Around the Kafka Streams Java API

Build Status

The library wraps Java APIs in Scala thereby providing:

  1. much better type inference in Scala
  2. less boilerplate in application code
  3. the usual builder-style composition that developers get with the original Java API
  4. complete compile time type safety

The design of the library was inspired by the work started by Alexis Seigneurin in this repository.

Quick Start

kafka-streams-scala is published and cross-built for Scala 2.11, and 2.12, so you can just add the following to your build:

val kafka_streams_scala_version = "0.2.1"

libraryDependencies ++= Seq("com.lightbend" %%
  "kafka-streams-scala" % kafka_streams_scala_version)

Note: kafka-streams-scala supports onwards Kafka Streams 1.0.0.

The API docs for kafka-streams-scala is available here for Scala 2.12 and here for Scala 2.11.

Running the Tests

The library comes with an embedded Kafka server. To run the tests, simply run sbt testOnly and all tests will run on the local embedded server.

The embedded server is started and stopped for every test and takes quite a bit of resources. Hence it's recommended that you allocate more heap space to sbt when running the tests. e.g. sbt -mem 2000.

$ sbt -mem 2000
> +clean
> +test

Type Inference and Composition

Here's a sample code fragment using the Scala wrapper library. Compare this with the Scala code from the same example in Confluent's repository.

// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableS[String, Long] = userClicksStream

  // Join the stream against the table.
  .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

  // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
  .map((_, regionWithClicks) => regionWithClicks)

  // Compute the total per region by summing the individual click counts per region.
  .groupByKey
  .reduce(_ + _)

Implicit Serdes

One of the areas where the Java APIs' verbosity can be reduced is through a succinct way to pass serializers and de-serializers to the various functions. The library uses the power of Scala implicits towards this end. The library makes some decisions that help implement more succinct serdes in a type safe manner:

  1. No use of configuration based default serdes. Java APIs allow the user to define default key and value serdes as part of the configuration. This configuration, being implemented as java.util.Properties is type-unsafe and hence can result in runtime errors in case the user misses any of the serdes to be specified or plugs in an incorrect serde. kafka-streams-scala makes this completely type-safe by allowing all serdes to be specified through Scala implicits.
  2. The library offers implicit conversions from serdes to Serialized, Produced, Consumed or Joined. Hence as a user you just have to pass in the implicit serde and all conversions to Serialized, Produced, Consumed or Joined will be taken care of automatically.

Default Serdes

The library offers a module that contains all the default serdes for the primitives. Importing the object will bring in scope all such primitives and helps reduce implicit hell.

object DefaultSerdes {
  implicit val stringSerde: Serde[String] = Serdes.String()
  implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
  implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
  implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
  implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
  implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
  implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
}

Compile time typesafe

Not only the serdes, but DefaultSerdes also brings into scope implicit Serialized, Produced, Consumed and Joined instances. So all APIs that accept Serialized, Produced, Consumed or Joined will get these instances automatically with an import DefaultSerdes._.

Just one import of DefaultSerdes._ and the following code does not need a bit of Serialized, Produced, Consumed or Joined to be specified explicitly or through the default config. And the best part is that for any missing instances of these you get a compilation error. ..

import DefaultSerdes._

val clicksPerRegion: KTableS[String, Long] =
  userClicksStream

  // Join the stream against the table.
  .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

  // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
  .map((_, regionWithClicks) => regionWithClicks)

  // Compute the total per region by summing the individual click counts per region.
  .groupByKey
  .reduce(_ + _)

  // Write the (continuously updating) results to the output topic.
  clicksPerRegion.toStream.to(outputTopic)

kafka-streams-scala's People

Contributors

abdheshkumar avatar debasishg avatar joan38 avatar justinpihony avatar maasg avatar mtebourbi avatar shiv4nsh avatar sv3ndk avatar xuwei-k avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-streams-scala's Issues

StreamsBuilderS.globalTable misses overload with Consumed[K, V] as parameter

First of all, thanks a lot for this library. It makes my code look much cleaner.

I did notice that StreamsBuilderS.globalTable has no overload where I can pass a customer Consumed[K, V] parameter as in

(new StreamsBuilderS).globalTable("my_table", Consumed.`with`(new MyKeySerde, new MyValueSerde))

My current workaround is to use the inner StreamsBuilder like:

(new StreamsBuilderS).inner.globalTable("my_table", Consumed.`with`(new MyKeySerde, new MyValueSerde))

Is there any specific reason why StreamsBuilderS.globalTable doesn't have this overload?

java.lang.ClassCastException for reduce api call

The following example is the modification of StreamToTableJoinScalaIntegrationTestImplicitSerdes test:

server.createTopic(userClicksTopic)

val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]

implicit val serialized: Serialized[String, Long] = Serialized.`with`(stringSerde, longSerde)

val streamsConfiguration: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, s"stream-table-join-scala-integration-test-implicit-serdes-${scala.util.Random.nextInt(100)}")
  p.put(StreamsConfig.CLIENT_ID_CONFIG, "join-scala-integration-test-implicit-serdes-standard-consumer")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
  p.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)
  p
}

val builder = new StreamsBuilderS()

val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic)

userClicksStream
  .groupByKey
  .reduce((_: Long, v2: Long) => v2, "my-ktable-name")
  .toStream
  .through(outputTopic)
  .foreach((k, v) => println(k -> v))

val streams: KafkaStreams = new KafkaStreams(builder.build, streamsConfiguration)

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  override def uncaughtException(t: Thread, e: Throwable): Unit = try {
    println(s"Stream terminated because of uncaught exception .. Shutting down app", e)
    e.printStackTrace
    val closed = streams.close()
    println(s"Exiting application after streams close ($closed)")
  } catch {
    case x: Exception => x.printStackTrace
  } finally {
    println("Exiting application ..")
    System.exit(-1)
  }
})

streams.start()

val sender = MessageSender[String, Long](brokers, classOf[StringSerializer].getName, classOf[LongSerializer].getName)

userClicks.foreach(r => sender.writeKeyValue(userClicksTopic, r.key, r.value))

val listener = MessageListener(brokers, outputTopic, "join-scala-integration-test-standard-consumer",
  classOf[StringDeserializer].getName,
  classOf[LongDeserializer].getName,
  new RecordProcessor
)

val l = listener.waitUntilMinKeyValueRecordsReceived(3, 30000)

streams.close()

assertEquals(
  l.sortBy(_.key),
  Seq(
    new KeyValue("chao", 25L),
    new KeyValue("bob", 19L),
    new KeyValue("dave", 56L),
    new KeyValue("eve", 78L),
    new KeyValue("alice", 40L),
    new KeyValue("fang", 99L)
  ).sortBy(_.key)
)

This part of the code above:

userClicksStream
   .groupByKey
   .reduce((_: Long, v2: Long) => v2, "my-ktable-name")

constantly fails with the following exception:

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at scala.runtime.java8.JFunction2$mcJJJ$sp.apply(JFunction2$mcJJJ$sp.java:12)
at com.lightbend.kafka.scala.streams.KGroupedStreamS.$anonfun$reduce$3(KGroupedStreamS.scala:49)
at com.lightbend.kafka.scala.streams.FunctionConversions$ReducerFromFunction$.$anonfun$asReducer$1(FunctionConversions.scala:46)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:76)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

At the same time the same code rewritten on a pure Java API works fine.

P.S. I use the latest Release 0.1.2 of the kafka-streams-scala library. Scala 2.11.

kafka streams doesn't rebalance when one broker is down

I am using 0.11.0 streams

I have 3 nodes running kafka cluster of 3 broker

and i am running the 3 kafka stream with same application.id

each node has one broker one kafka stream application

everything works fine during setup

i bringdown one node, so one kafka broker and one streaming app is down

now i see exceptions in other two streaming apps and it never gets re balanced waited for hours and never comes back to norma

is there anything am missing?

i also tried looking into when one broker is down call stream.close, cleanup and restart this also doesn't help

can anyone help me?

Question: avro plus Schema Registry?

I notice with this pr that you have an example on working with avro specifically with Avro4s, #59 .

I am wondering if there is any expectation that this library will or won't work with trying integrate with the schema-registry as demonstrated here https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala#L69

I can add the required dependencies that give me AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG and SpecificAvroSerde and SpecificAvroRecord.

So first question is, do either of things matter? should I be able to ignore them and expect the schemaregistry to just work after adding its url to the stream config? I am guessing not but asking just in case I am overthinking things.

Assuming that it will not just work, I think there might be two obstacles to this approach with your library. The first is how to get an instance of SpecificAvroRecord that you can pass into SpecificAvroSerde and the second problem is will the SpecificAvroSerde be acceptable as the implicit key and value serde

Is the schema-registry a use case you guys have talked about at all? Am I missing something straightforward?

Thanks

Edit: in the example I linked I think they get an instance of SpecificAvroRecord from the fact they are using the Avro maven code generation plugin to generate WikiFeed but I am not positive

Edit 2: hmmm maybe I should be looking at the GenericAvroRecord example instead and using avro4s RecordFormat object to convert from case classes to GenericAvroRecord. that seems to mean I would still then need to use avro to convert it into the case class I want in each step of the stream which seems unneccesary. I wonder why avro4s does not offer a SpecificRecordFormat

Proposal to add Circe support

Hi,

For the people using Circe with Kafka Streams I have created a little Serde adapter: https://github.com/joan38/kafka-streams-circe

This is way too tiny to be in a separate repo on it's own so I was thinking maybe it's a good idea to add this here in a separate project.

Something like:

  • kafka-streams-core
  • kafka-streams-circe
  • docs (with sbt-microsite for example)
  • (Any other stuff here?)

Just like in https://github.com/sksamuel/elastic4s
This may solve the questions around #42

Let me know what you think. I'm more than happy to do the change.

Cheers

Issue with KStreams transform returning null

Regarding the javadoc of transform function of KStream class null return values are possible when we want to discard values.
The actual wrapper code pattern match the result to a tuple which raise a MatchError.

Is 2.11 supported?

I cannot seem to compile a simple application based on Confluent's example in Scala 2.11. Below is a sample of the code I am using. The compiler cannot make sense of the flatMapValues signature, with or without the implicit conversions. I am guessing that I am hitting an issue with the SAM translations from the Java 8 stream API. Does this library have some additional conversions that are meant to apply here? Is 2.11 supported?

    val statusUpdates: KStream[String, StatusUpdate] =
      builder.stream[String, StatusUpdate](Serdes.String(), CustomSerdes.statusUpdate, "statusUpdate")

    statusUpdates.
      flatMapValues(statusUpdate => statusUpdate.metrics.getOrElse(Seq.empty)).
      to(Serdes.String(), CustomSerdes.metric, "metrics")

    new KafkaStreams(builder, streamingConfig)

case classes

Hi,

is there any example how to use kafka-streams-scala with case classes ?

Thanks,

There should be a way to override the StreamBuilder instance present inside the StreamBuilderS for unit testing.

There should be a way to provide the value of inner :StreamsBuilder So that the library must not create its own version of StreamBuilder like in case of using https://github.com/jpzk/mockedstreams library for unit Testing.

For now The workaround is making a separate class that Extends StreamsBuilderS and then overriding inner there:

class StreamSTest(streamBuilder: StreamsBuilder) extends StreamsBuilderS {
  override val inner = streamBuilder
}

KStream-KTable inner join

Is there a reason that KSteamS#join is a left join instead of an inner join? Given that Kafka is pretty consistent that join is an inner join, I'm a little confused why kafka-streams-scala's join calls leftJoin.

KStreamS#merge is recursive, causing StackOverflow on initialization

def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream)

this is the definition of KStreamS#merge which causes stackoverflow. I think the cause is that the inner KStream instance is getting wrapped implicitly with KStreamS and calling its merge recursively. I believe just changing the name of the function should fix that.

from a quick look, a few more functions there has the same issue.

Kafka 2.0 support

We would to use this library in combination with Kafka 2.0 which currently isn't possible.

KGroupedStreamS recursion

Hello,
When using the "reduce" functions inside KGroupedStreamS I get StackOverflow exception.
Looking at the package itself in my IDE (inteliji) i can see the recursion calls:
screen shot 2018-01-21 at 18 02 18

Mockedstreams for unit testing!

Hi,
I have been learning about Kafka streams lately using Scala, and came across this repo which is perfect ๐Ÿ‘, i have seen that for testing (integration) you use an embedded Kafka which is a bit hard to set up for tests, would it be possible to integrate https://github.com/jpzk/mockedstreams which is based on Kafka's ProcessorTopologyTestDriver to avoid having zookeeper and Kafka brokers on each test?
this will enable developers to fiddle with Kafka streams in a lightweight manner.
Thanks!

StateStore API

Any plans to include the state store API in this project?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.