
kafka-streams-scala's Issues

StateStore API

Any plans to include the state store API in this project?

KStreamS#merge is recursive, causing StackOverflow on initialization

def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream)

This is the definition of KStreamS#merge, which causes the StackOverflowError. I think the cause is that the inner KStream instance is implicitly wrapped in KStreamS again, so the call resolves back to KStreamS#merge recursively. I believe just changing the name of the function should fix that.

From a quick look, a few more functions there have the same issue.
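
For reference, a minimal non-recursive sketch, assuming KStreamS exposes the wrapped Java stream as `inner` and the library's usual KStream-to-KStreamS implicit wrapping is in scope for the return value:

def merge(stream: KStreamS[K, V]): KStreamS[K, V] =
  inner.merge(stream.inner) // delegate on the unwrapped Java streams, so no re-wrapping of the receiver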

java.lang.ClassCastException for reduce API call

The following example is a modification of the StreamToTableJoinScalaIntegrationTestImplicitSerdes test:

server.createTopic(userClicksTopic)

val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]

implicit val serialized: Serialized[String, Long] = Serialized.`with`(stringSerde, longSerde)

val streamsConfiguration: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, s"stream-table-join-scala-integration-test-implicit-serdes-${scala.util.Random.nextInt(100)}")
  p.put(StreamsConfig.CLIENT_ID_CONFIG, "join-scala-integration-test-implicit-serdes-standard-consumer")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
  p.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)
  p
}

val builder = new StreamsBuilderS()

val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic)

userClicksStream
  .groupByKey
  .reduce((_: Long, v2: Long) => v2, "my-ktable-name")
  .toStream
  .through(outputTopic)
  .foreach((k, v) => println(k -> v))

val streams: KafkaStreams = new KafkaStreams(builder.build, streamsConfiguration)

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  override def uncaughtException(t: Thread, e: Throwable): Unit = try {
    println(s"Stream terminated because of uncaught exception .. Shutting down app", e)
    e.printStackTrace
    val closed = streams.close()
    println(s"Exiting application after streams close ($closed)")
  } catch {
    case x: Exception => x.printStackTrace()
  } finally {
    println("Exiting application ..")
    System.exit(-1)
  }
})

streams.start()

val sender = MessageSender[String, Long](brokers, classOf[StringSerializer].getName, classOf[LongSerializer].getName)

userClicks.foreach(r => sender.writeKeyValue(userClicksTopic, r.key, r.value))

val listener = MessageListener(brokers, outputTopic, "join-scala-integration-test-standard-consumer",
  classOf[StringDeserializer].getName,
  classOf[LongDeserializer].getName,
  new RecordProcessor
)

val l = listener.waitUntilMinKeyValueRecordsReceived(3, 30000)

streams.close()

assertEquals(
  l.sortBy(_.key),
  Seq(
    new KeyValue("chao", 25L),
    new KeyValue("bob", 19L),
    new KeyValue("dave", 56L),
    new KeyValue("eve", 78L),
    new KeyValue("alice", 40L),
    new KeyValue("fang", 99L)
  ).sortBy(_.key)
)

This part of the code above:

userClicksStream
   .groupByKey
   .reduce((_: Long, v2: Long) => v2, "my-ktable-name")

consistently fails with the following exception:

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at scala.runtime.java8.JFunction2$mcJJJ$sp.apply(JFunction2$mcJJJ$sp.java:12)
at com.lightbend.kafka.scala.streams.KGroupedStreamS.$anonfun$reduce$3(KGroupedStreamS.scala:49)
at com.lightbend.kafka.scala.streams.FunctionConversions$ReducerFromFunction$.$anonfun$asReducer$1(FunctionConversions.scala:46)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:76)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

Meanwhile, the same code rewritten against the pure Java API works fine.

P.S. I am using the latest release (0.1.2) of the kafka-streams-scala library, with Scala 2.11.
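
Not a confirmed diagnosis, but note that DEFAULT_VALUE_SERDE_CLASS_CONFIG above is the String serde and no Consumed is passed to builder.stream, so the source may be deserializing values as String even though the stream is typed KStreamS[String, Long]; the reducer's unboxing cast would then fail exactly as in the trace. A sketch of pinning the source serdes explicitly (assuming a stream overload taking Kafka 1.0's Consumed exists on StreamsBuilderS, or going through builder.inner if not):

import org.apache.kafka.streams.Consumed

// Read the topic with explicit serdes instead of the configured defaults
val userClicksStream: KStreamS[String, Long] =
  builder.stream(userClicksTopic, Consumed.`with`(stringSerde, longSerde))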

case classes

Hi,

Is there any example of how to use kafka-streams-scala with case classes?
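
For illustration, the kind of thing I have in mind is a hand-rolled Serde marked implicit; a minimal sketch (the Click type and the naive CSV encoding here are made up, a real codebase would use JSON or Avro):

import java.nio.charset.StandardCharsets.UTF_8
import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

case class Click(user: String, count: Long)

// Serdes.serdeFrom glues a Serializer/Deserializer pair into a Serde
implicit val clickSerde: Serde[Click] = Serdes.serdeFrom(
  new Serializer[Click] {
    override def serialize(topic: String, click: Click): Array[Byte] =
      if (click == null) null else s"${click.user},${click.count}".getBytes(UTF_8)
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def close(): Unit = ()
  },
  new Deserializer[Click] {
    override def deserialize(topic: String, bytes: Array[Byte]): Click =
      if (bytes == null) null
      else {
        val Array(user, count) = new String(bytes, UTF_8).split(",", 2)
        Click(user, count.toLong)
      }
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def close(): Unit = ()
  }
)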

Thanks,

StreamsBuilderS.globalTable misses overload with Consumed[K, V] as parameter

First of all, thanks a lot for this library. It makes my code look much cleaner.

I did notice that StreamsBuilderS.globalTable has no overload where I can pass a custom Consumed[K, V] parameter, as in

(new StreamsBuilderS).globalTable("my_table", Consumed.`with`(new MyKeySerde, new MyValueSerde))

My current workaround is to use the inner StreamsBuilder like:

(new StreamsBuilderS).inner.globalTable("my_table", Consumed.`with`(new MyKeySerde, new MyValueSerde))

Is there any specific reason why StreamsBuilderS.globalTable doesn't have this overload?
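
For reference, a sketch of the overload I would expect, delegating to the wrapped builder like the other methods do (assuming globalTable keeps returning the plain Java GlobalKTable, since there is no GlobalKTableS wrapper):

// Delegate straight to the underlying StreamsBuilder's existing overload
def globalTable[K, V](topic: String, consumed: Consumed[K, V]): GlobalKTable[K, V] =
  inner.globalTable(topic, consumed)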

Proposal to add Circe support

Hi,

For the people using Circe with Kafka Streams I have created a little Serde adapter: https://github.com/joan38/kafka-streams-circe

This is way too tiny to live in a separate repo on its own, so I was thinking it might be a good idea to add it here as a separate subproject.

Something like:

  • kafka-streams-core
  • kafka-streams-circe
  • docs (with sbt-microsite for example)
  • (Any other stuff here?)

Just like in https://github.com/sksamuel/elastic4s
This may solve the questions around #42

Let me know what you think. I'm more than happy to do the change.

Cheers

Question: avro plus Schema Registry?

I notice in PR #59 that you have an example of working with Avro, specifically with Avro4s.

I am wondering whether there is any expectation that this library will or won't work when integrating with the Schema Registry, as demonstrated here: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala#L69

I can add the required dependencies that give me AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SpecificAvroSerde, and SpecificRecord.

So the first question is: do either of these matter? Should I be able to ignore them and expect the Schema Registry to just work after adding its URL to the stream config? I am guessing not, but I am asking in case I am overthinking things.

Assuming it will not just work, I think there are two obstacles to this approach with your library: first, how to get an instance of SpecificRecord to pass into SpecificAvroSerde; second, whether SpecificAvroSerde will be accepted as the implicit key and value serde.

Is the schema-registry a use case you guys have talked about at all? Am I missing something straightforward?

Thanks

Edit: in the example I linked, I think they get an instance of SpecificRecord because they use the Avro Maven code-generation plugin to generate WikiFeed, but I am not positive.

Edit 2: hmm, maybe I should be looking at the GenericRecord example instead and using avro4s's RecordFormat to convert from case classes to GenericRecord. That seems to mean I would still need to convert back into the case class I want at each step of the stream, which seems unnecessary. I wonder why avro4s does not offer a SpecificRecordFormat.
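
For reference, the configuration step of the linked example transliterates to Scala roughly as below; a sketch assuming the io.confluent:kafka-streams-avro-serde dependency and the Avro-plugin-generated WikiFeed class from that example:

import java.util.Collections
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

val wikiFeedSerde = new SpecificAvroSerde[WikiFeed]
// Point the serde at the registry; false = configure it as a value serde
wikiFeedSerde.configure(
  Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"),
  false)

Once configured it is just a Serde[WikiFeed], so in principle it could be supplied as the implicit value serde this library expects.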

KStream-KTable inner join

Is there a reason that KStreamS#join is a left join instead of an inner join? Given that Kafka is pretty consistent that join means inner join, I'm a little confused why kafka-streams-scala's join calls leftJoin.
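
A sketch of the delegation I would expect instead, assuming KTableS wraps the Java KTable as `inner` and a function-to-ValueJoiner conversion analogous to the library's existing FunctionConversions (the asValueJoiner name is my assumption):

// Delegate to the Java API's inner join rather than leftJoin
def join[VT, VR](table: KTableS[K, VT], joiner: (V, VT) => VR): KStreamS[K, VR] =
  inner.join[VT, VR](table.inner, joiner.asValueJoiner)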

Issue with KStreams transform returning null

According to the javadoc of KStream's transform function, null return values are allowed when we want to discard a record.
The actual wrapper code pattern-matches the result into a tuple, which raises a MatchError on null.
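
A sketch of a null-tolerant conversion inside the wrapper, assuming the user-supplied transformer returns a (K1, V1) tuple, or null to discard the record:

override def transform(key: K, value: V): KeyValue[K1, V1] =
  transformer.transform(key, value) match {
    case (k1, v1) => KeyValue.pair(k1, v1) // the tuple pattern does not match null, so this is safe
    case null     => null                  // propagate null so Kafka Streams drops the record
  }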

Kafka Streams doesn't rebalance when one broker is down

I am using 0.11.0 streams.

I have 3 nodes running a Kafka cluster of 3 brokers, and I am running 3 Kafka Streams instances with the same application.id; each node hosts one broker and one Kafka Streams application.

Everything works fine during setup.

Then I bring down one node, so one Kafka broker and one streaming app are down.

Now I see exceptions in the other two streaming apps, and the group never rebalances; I waited for hours and it never comes back to normal.

Is there anything I am missing?

I also tried, when one broker is down, calling stream.close, cleaning up, and restarting, but that doesn't help either.

Can anyone help me?
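
Not a confirmed diagnosis, but one common cause of this symptom: Kafka Streams' internal changelog and repartition topics default to replication factor 1, so losing the broker that hosts their partitions stalls the application. Raising the factor to match the cluster size may help:

import org.apache.kafka.streams.StreamsConfig

val p = new java.util.Properties()
p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3") // replicate internal topics across all 3 brokers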

KGroupedStreamS recursion

Hello,
When using the "reduce" functions inside KGroupedStreamS, I get a StackOverflowError.
Looking at the package itself in my IDE (IntelliJ), I can see the recursive calls:
(screenshot, 2018-01-21: recursive call chain in KGroupedStreamS)

Kafka 2.0 support

We would like to use this library in combination with Kafka 2.0, which currently isn't possible.

There should be a way to override the StreamsBuilder instance inside StreamsBuilderS for unit testing

There should be a way to provide the value of inner: StreamsBuilder, so that the library does not have to create its own StreamsBuilder, e.g. when using https://github.com/jpzk/mockedstreams for unit testing.

For now, the workaround is to make a separate class that extends StreamsBuilderS and override inner there:

class StreamSTest(streamBuilder: StreamsBuilder) extends StreamsBuilderS {
  override val inner = streamBuilder
}
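
A sketch of the proposed change itself (the signature is my assumption): let callers inject the underlying builder, defaulting to a fresh one so existing code keeps working:

class StreamsBuilderS(val inner: StreamsBuilder = new StreamsBuilder)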

Is 2.11 supported?

I cannot seem to compile a simple application based on Confluent's example in Scala 2.11. Below is a sample of the code I am using. The compiler cannot make sense of the flatMapValues signature, with or without the implicit conversions. I am guessing that I am hitting an issue with the SAM translations from the Java 8 stream API. Does this library have some additional conversions that are meant to apply here? Is 2.11 supported?

    val statusUpdates: KStream[String, StatusUpdate] =
      builder.stream[String, StatusUpdate](Serdes.String(), CustomSerdes.statusUpdate, "statusUpdate")

    statusUpdates.
      flatMapValues(statusUpdate => statusUpdate.metrics.getOrElse(Seq.empty)).
      to(Serdes.String(), CustomSerdes.metric, "metrics")

    new KafkaStreams(builder, streamingConfig)
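
For what it's worth, spelling out the SAM type explicitly compiles on 2.11 in similar situations; a sketch, assuming statusUpdate.metrics is an Option[Seq[Metric]] and the same CustomSerdes helpers as above:

import org.apache.kafka.streams.kstream.ValueMapper
import scala.collection.JavaConverters._

statusUpdates
  // Explicit ValueMapper instance, since 2.11 has no automatic SAM conversion
  .flatMapValues(new ValueMapper[StatusUpdate, java.lang.Iterable[Metric]] {
    override def apply(statusUpdate: StatusUpdate): java.lang.Iterable[Metric] =
      statusUpdate.metrics.getOrElse(Seq.empty).asJava
  })
  .to(Serdes.String(), CustomSerdes.metric, "metrics")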

Mockedstreams for unit testing!

Hi,
I have been learning about Kafka Streams lately using Scala and came across this repo, which is perfect 👍. I have seen that for (integration) testing you use an embedded Kafka, which is a bit hard to set up for tests. Would it be possible to integrate https://github.com/jpzk/mockedstreams, which is based on Kafka's ProcessorTopologyTestDriver, to avoid needing ZooKeeper and Kafka brokers in each test?
This would enable developers to fiddle with Kafka Streams in a lightweight manner.
Thanks!
