lightbend / kafka-streams-scala
Thin Scala wrapper around Kafka Streams Java API
License: Apache License 2.0
Please add an option to use implicit Serdes in this library,
similar to:
https://github.com/ogirardot/typesafe-kafka-streams
Any plans to include the state store API in this project?
def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream)
This is the definition of KStreamS#merge, which causes a StackOverflowError. I think the cause is that the inner KStream instance is implicitly wrapped in a KStreamS, so the call resolves to KStreamS#merge again and recurses. I believe just changing the name of the function should fix that.
From a quick look, a few more functions there have the same issue.
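A minimal sketch of the suspected mechanism, using hypothetical stand-in classes (JStream, StreamS, wrap) rather than the real Kafka or library types. It assumes an implicit conversion from the Java type to the wrapper is in scope inside the wrapper class, and it shows one alternative to renaming the method: passing the unwrapped argument.

```scala
import scala.language.implicitConversions

object MergeDemo {
  // Stand-in for the Java KStream: merge only accepts the Java type.
  class JStream(val items: List[Int]) {
    def merge(other: JStream): JStream = new JStream(items ++ other.items)
  }

  // Stand-in for the implicit wrapping conversion (an assumption of this sketch).
  implicit def wrap(inner: JStream): StreamS = new StreamS(inner)

  // Stand-in for the KStreamS wrapper.
  class StreamS(val inner: JStream) {
    // Broken shape (compiles, then recurses until the stack overflows):
    //   def merge(stream: StreamS): StreamS = inner.merge(stream)
    // JStream#merge does not accept a StreamS, so the compiler rewrites the
    // call to wrap(inner).merge(stream), which is this very method.

    // Passing the unwrapped argument selects JStream#merge directly;
    // only the result is wrapped by the implicit conversion.
    def merge(stream: StreamS): StreamS = inner.merge(stream.inner)
  }

  def main(args: Array[String]): Unit = {
    val left = new StreamS(new JStream(List(1, 2)))
    val right = new StreamS(new JStream(List(3)))
    println(left.merge(right).inner.items) // List(1, 2, 3)
  }
}
```

Unwrapping the argument keeps the public method name unchanged, which may matter if callers already depend on it.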
The following example is a modification of the StreamToTableJoinScalaIntegrationTestImplicitSerdes test:
server.createTopic(userClicksTopic)
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
implicit val serialized: Serialized[String, Long] = Serialized.`with`(stringSerde, longSerde)
val streamsConfiguration: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, s"stream-table-join-scala-integration-test-implicit-serdes-${scala.util.Random.nextInt(100)}")
  p.put(StreamsConfig.CLIENT_ID_CONFIG, "join-scala-integration-test-implicit-serdes-standard-consumer")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
  p.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)
  p
}
val builder = new StreamsBuilderS()
val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic)
userClicksStream
  .groupByKey
  .reduce((_: Long, v2: Long) => v2, "my-ktable-name")
  .toStream
  .through(outputTopic)
  .foreach((k, v) => println(k -> v))
val streams: KafkaStreams = new KafkaStreams(builder.build, streamsConfiguration)
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  override def uncaughtException(t: Thread, e: Throwable): Unit = try {
    println(s"Stream terminated because of uncaught exception .. Shutting down app", e)
    e.printStackTrace
    val closed = streams.close()
    println(s"Exiting application after streams close ($closed)")
  } catch {
    case x: Exception => x.printStackTrace
  } finally {
    println("Exiting application ..")
    System.exit(-1)
  }
})
streams.start()
val sender = MessageSender[String, Long](brokers, classOf[StringSerializer].getName, classOf[LongSerializer].getName)
userClicks.foreach(r => sender.writeKeyValue(userClicksTopic, r.key, r.value))
val listener = MessageListener(brokers, outputTopic, "join-scala-integration-test-standard-consumer",
  classOf[StringDeserializer].getName,
  classOf[LongDeserializer].getName,
  new RecordProcessor
)
val l = listener.waitUntilMinKeyValueRecordsReceived(3, 30000)
streams.close()
assertEquals(
  l.sortBy(_.key),
  Seq(
    new KeyValue("chao", 25L),
    new KeyValue("bob", 19L),
    new KeyValue("dave", 56L),
    new KeyValue("eve", 78L),
    new KeyValue("alice", 40L),
    new KeyValue("fang", 99L)
  ).sortBy(_.key)
)
This part of the code above:
userClicksStream
  .groupByKey
  .reduce((_: Long, v2: Long) => v2, "my-ktable-name")
constantly fails with the following exception:
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at scala.runtime.java8.JFunction2$mcJJJ$sp.apply(JFunction2$mcJJJ$sp.java:12)
at com.lightbend.kafka.scala.streams.KGroupedStreamS.$anonfun$reduce$3(KGroupedStreamS.scala:49)
at com.lightbend.kafka.scala.streams.FunctionConversions$ReducerFromFunction$.$anonfun$asReducer$1(FunctionConversions.scala:46)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:76)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
The same code rewritten against the pure Java API works fine.
P.S. I am using the latest release, 0.1.2, of the kafka-streams-scala library with Scala 2.11.
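One possible reading of the trace, not confirmed in the report: a value that was deserialized as a String reaches a reducer that is statically typed for Long, and because generics are erased the mismatch only surfaces when the reducer unboxes its arguments. The sketch below reproduces that failure shape in plain Scala; ErasureDemo, reduceErased, and failsInReducer are hypothetical names, and no Kafka classes are involved.

```scala
object ErasureDemo {
  // Same shape as the reducer in the report: keep the newest Long value.
  val keepLatest: (Long, Long) => Long = (_, v2) => v2

  // Generic call site, comparable to a Java processor that only sees Object values.
  def reduceErased(reducer: (Any, Any) => Any, aggregate: Any, value: Any): Any =
    reducer(aggregate, value)

  // Returns true when the reducer rejects the runtime type of the values.
  def failsInReducer(aggregate: Any, value: Any): Boolean =
    try {
      reduceErased(keepLatest.asInstanceOf[(Any, Any) => Any], aggregate, value)
      false
    } catch {
      case _: ClassCastException => true
    }

  def main(args: Array[String]): Unit = {
    println(failsInReducer(13L, 4L))   // false: boxed Long values unbox cleanly
    println(failsInReducer("13", "4")) // true: String values fail only when unboxed
  }
}
```

If this reading applies, checking which serdes are actually used for the source topic and the state store would be a reasonable next step.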
Hi,
Is there an example of how to use kafka-streams-scala with case classes?
Thanks,
First of all, thanks a lot for this library. It makes my code look much cleaner.
I did notice that StreamsBuilderS.globalTable has no overload where I can pass a custom Consumed[K, V] parameter, as in
(new StreamsBuilderS).globalTable("my_table", Consumed.`with`(new MyKeySerde, new MyValueSerde))
My current workaround is to use the inner StreamsBuilder like:
(new StreamsBuilderS).inner.globalTable("my_table", Consumed.`with`(new MyKeySerde, new MyValueSerde))
Is there any specific reason why StreamsBuilderS.globalTable doesn't have this overload?
Hi,
For the people using Circe with Kafka Streams I have created a little Serde adapter: https://github.com/joan38/kafka-streams-circe
This is way too tiny to be in a separate repo on its own, so I was thinking it might be a good idea to add it here as a separate project.
Something like:
Just like in https://github.com/sksamuel/elastic4s
This may solve the questions around #42
Let me know what you think. I'm more than happy to do the change.
Cheers
minitest doesn't allow data setup for an entire suite, only for each test case. Running the KafkaLocalServer once per test case is a huge memory hog, especially when running all tests at once from sbt.
I noticed from PR #59 that you have an example of working with Avro, specifically with avro4s.
I am wondering whether this library is expected to work when integrating with the schema registry, as demonstrated here: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala#L69
I can add the required dependencies that give me AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SpecificAvroSerde, and SpecificAvroRecord.
So the first question is: do either of these things matter? Should I be able to ignore them and expect the schema registry to just work after adding its URL to the stream config? I am guessing not, but I am asking in case I am overthinking things.
Assuming that it will not just work, I think there might be two obstacles to this approach with your library. The first is how to get an instance of SpecificAvroRecord that you can pass into SpecificAvroSerde, and the second is whether SpecificAvroSerde will be acceptable as the implicit key and value serde.
Is the schema registry a use case you have talked about at all? Am I missing something straightforward?
Thanks
Edit: in the example I linked, I think they get an instance of SpecificAvroRecord because they use the Avro Maven code generation plugin to generate WikiFeed, but I am not positive.
Edit 2: Hmm, maybe I should be looking at the GenericAvroRecord example instead and using the avro4s RecordFormat object to convert from case classes to GenericAvroRecord. That seems to mean I would still need to use Avro to convert it into the case class I want in each step of the stream, which seems unnecessary. I wonder why avro4s does not offer a SpecificRecordFormat.
Is there a reason that KStreamS#join is a left join instead of an inner join? Given that Kafka is pretty consistent that join is an inner join, I'm a little confused why kafka-streams-scala's join calls leftJoin.
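To make the difference concrete, here is a small sketch of the two semantics on plain Scala maps. JoinSemantics and the sample data are hypothetical, and Option stands in for the null right-hand value that a left join can produce; this is not the library's implementation.

```scala
object JoinSemantics {
  // Inner join: keep only keys present on both sides.
  def innerJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, B)] =
    left.collect { case (k, a) if right.contains(k) => (k, (a, right(k))) }

  // Left join: keep every left key; the right side may be missing.
  def leftJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, Option[B])] =
    left.map { case (k, a) => (k, (a, right.get(k))) }

  def main(args: Array[String]): Unit = {
    val clicks = Map("alice" -> 13L, "bob" -> 4L)
    val regions = Map("alice" -> "asia")
    println(innerJoin(clicks, regions)) // only alice
    println(leftJoin(clicks, regions))  // alice and bob, bob without a region
  }
}
```

A join that silently delegates to leftJoin therefore emits records that callers expecting inner-join behaviour would not anticipate.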
According to the Javadoc of the transform function of the KStream class, a null return value is allowed when we want to discard a record.
The wrapper code currently pattern matches the result against a tuple, which raises a MatchError when the result is null.
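The failure and one possible null-safe alternative can be sketched without Kafka. The names below (NullSafeTransform, transform, forwardUnsafe, forward) are hypothetical stand-ins and not the wrapper's actual code.

```scala
object NullSafeTransform {
  // Hypothetical transformer: returns null to drop negative values.
  def transform(key: String, value: Int): (String, Int) =
    if (value < 0) null else (key, value * 2)

  // Failing shape: a tuple pattern does not match null, so this throws scala.MatchError.
  def forwardUnsafe(key: String, value: Int): String =
    transform(key, value) match { case (k, v) => s"$k=$v" }

  // Null-safe shape: Option(null) is None, so a dropped record is simply skipped.
  def forward(key: String, value: Int): Option[String] =
    Option(transform(key, value)).map { case (k, v) => s"$k=$v" }

  def main(args: Array[String]): Unit = {
    println(forward("a", 1))  // Some(a=2)
    println(forward("a", -1)) // None
  }
}
```

An explicit `case null` branch in the match would work equally well; Option is used here only because it keeps the dropped case visible in the return type.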
I am using Kafka Streams 0.11.0.
I have a Kafka cluster of 3 brokers running on 3 nodes,
and I am running 3 Kafka Streams instances with the same application.id.
Each node has one broker and one Kafka Streams application.
Everything works fine during setup.
When I bring down one node, one Kafka broker and one streaming app go down.
I then see exceptions in the other two streaming apps, and they never rebalance. I waited for hours and they never came back to normal.
Is there anything I am missing?
I also tried detecting when a broker is down and then calling stream.close, cleanup, and restart, but this doesn't help either.
Can anyone help me?
API documentation is now available at https://developer.lightbend.com/docs/api/kafka-streams-scala/0.2.1/com/lightbend/kafka/scala/streams/ , but that URL is tied to version 0.2.1. It would be nice to be able to point to https://developer.lightbend.com/docs/api/kafka-streams-scala/current/com/lightbend/kafka/scala/streams/
Calling filter(K,V => Boolean, Materialized) causes a stack overflow. It looks like this is due to a strange implicit conversion of the inner property.
Probably similar to: #41
I am using Scala 2.12.4.
Simple Gist here to demonstrate: https://gist.github.com/mtranter/3fc613d63ce584adba28bc81e1622331
Hi guys, the README says you distribute the test utils in src/test with the library, particularly the local Kafka server utils. However, they do not appear to be on my path, and when I use the archive browser in Sonatype I don't see the server package in any of the releases. Am I missing something, or would this line https://github.com/lightbend/kafka-streams-scala/blob/develop/build.sbt#L57 prevent the test resources from being shipped?
We would like to use this library in combination with Kafka 2.0, which currently isn't possible.
Since it has been included upstream
The Consumed class can be provided with a TimestampExtractor.
I suggest adding this to the wrapper API.
There should be a way to provide the value of inner: StreamsBuilder, so that the library does not have to create its own StreamsBuilder, for example when using the https://github.com/jpzk/mockedstreams library for unit testing.
For now, the workaround is to create a separate class that extends StreamsBuilderS and overrides inner there:
class StreamSTest(streamBuilder: StreamsBuilder) extends StreamsBuilderS {
  override val inner = streamBuilder
}
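One way the request could be met, sketched with hypothetical stand-in classes (Builder for StreamsBuilder, BuilderS for StreamsBuilderS): a constructor parameter with a default value lets callers inject a builder without subclassing. This is an illustration of the idea, not the library's API.

```scala
object InjectableBuilder {
  // Stand-in for org.apache.kafka.streams.StreamsBuilder.
  class Builder

  // Hypothetical wrapper: callers may pass their own builder, otherwise one is created.
  class BuilderS(val inner: Builder = new Builder)

  def main(args: Array[String]): Unit = {
    val external = new Builder
    println(new BuilderS(external).inner eq external) // true: the supplied builder is reused
    println(new BuilderS().inner ne external)         // true: the default is a fresh instance
  }
}
```

Existing call sites that use the no-argument constructor would keep working under this shape.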
I cannot seem to compile a simple application based on Confluent's example in Scala 2.11. Below is a sample of the code I am using. The compiler cannot make sense of the flatMapValues signature, with or without the implicit conversions. I am guessing that I am hitting an issue with the SAM translations from the Java 8 stream API. Does this library have some additional conversions that are meant to apply here? Is 2.11 supported?
val statusUpdates: KStream[String, StatusUpdate] =
  builder.stream[String, StatusUpdate](Serdes.String(), CustomSerdes.statusUpdate, "statusUpdate")
statusUpdates.
  flatMapValues(statusUpdate => statusUpdate.metrics.getOrElse(Seq.empty)).
  to(Serdes.String(), CustomSerdes.metric, "metrics")
new KafkaStreams(builder, streamingConfig)
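If the guess about SAM conversion is right, two things would be needed on 2.11 by default: an explicit instance of the Java functional interface, and a java.lang.Iterable result instead of a Scala Seq. The sketch below shows such an adapter with hypothetical stand-ins (Mapper for a Java SAM interface like ValueMapper, and a local flatMapValues); it does not use the Kafka API itself.

```scala
import java.lang.{Iterable => JIterable}
import java.util.{ArrayList => JArrayList}

object SamDemo {
  // Hypothetical stand-in for a Java single-abstract-method interface.
  trait Mapper[V, VR] { def apply(value: V): VR }

  // Stand-in for a Java-style flatMapValues that expects a java.lang.Iterable result.
  def flatMapValues[V, VR](values: List[V], mapper: Mapper[V, JIterable[VR]]): List[VR] = {
    val out = List.newBuilder[VR]
    values.foreach { v =>
      val it = mapper.apply(v).iterator()
      while (it.hasNext) out += it.next()
    }
    out.result()
  }

  // Explicit adapter: builds the interface instance by hand and copies the
  // Scala collection into a Java list, so no SAM conversion is required.
  def asMapper[V, VR](f: V => Iterable[VR]): Mapper[V, JIterable[VR]] =
    new Mapper[V, JIterable[VR]] {
      def apply(value: V): JIterable[VR] = {
        val list = new JArrayList[VR]()
        f(value).foreach(x => list.add(x))
        list
      }
    }

  def main(args: Array[String]): Unit = {
    val metrics: List[Option[Seq[Int]]] = List(Some(Seq(1, 2)), None)
    println(flatMapValues(metrics, asMapper[Option[Seq[Int]], Int](_.getOrElse(Seq.empty)))) // List(1, 2)
  }
}
```

A wrapper library can hide this adapter behind a method that accepts a plain Scala function, which is the kind of conversion the question asks about.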
It could be used for unit test and release.
as reported by @deanwampler in his code review
Hi,
I have been learning about Kafka Streams lately using Scala, and came across this repo, which is perfect. I have seen that for integration testing you use an embedded Kafka, which is a bit hard to set up for tests. Would it be possible to integrate mockedstreams (https://github.com/jpzk/mockedstreams), which is based on Kafka's ProcessorTopologyTestDriver, to avoid having ZooKeeper and Kafka brokers in each test?
This would enable developers to fiddle with Kafka Streams in a lightweight manner.
Thanks!
Currently, KStreamS#transformValues only has support for ValueTransformer.
ValueTransformerWithKey support is needed.