Comments (3)
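Wrote an extension to support it:
// Add ValueTransformerWithKeySupplier
import com.lightbend.kafka.scala.streams.KStreamS
import org.apache.kafka.streams.kstream.{ValueTransformerWithKey, ValueTransformerWithKeySupplier}

// Note: an implicit class must be nested inside an object, class, or trait.
implicit class KStreamSWithValueTransformerKey[K, V](inner: KStreamS[K, V]) extends KStreamS[K, V](inner.inner) {
  def transformValuesWithKey[VR](valueTransformerSupplierWithKey: () => ValueTransformerWithKey[K, V, VR],
                                 stateStoreNames: String*): KStreamS[K, VR] = {
    // SAM-convert the Scala function into the Java supplier (Scala 2.12, or 2.11 with -Xexperimental)
    val valueTransformerWithKeySupplierJ: ValueTransformerWithKeySupplier[K, V, VR] = () => valueTransformerSupplierWithKey()
    // inner.inner is the underlying Java KStream; wrap its result back into a KStreamS
    new KStreamS(inner.inner.transformValues[VR](valueTransformerWithKeySupplierJ, stateStoreNames: _*))
  }
}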
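For illustration, a minimal usage sketch of the extension above. It assumes Kafka 1.1.0 on the classpath and that the implicit class `KStreamSWithValueTransformerKey` is imported from wherever it has been defined. The names `KeyPrefixer`, `Example`, and `prefixWithKey` are hypothetical and not part of the library.

```scala
import com.lightbend.kafka.scala.streams.KStreamS
import org.apache.kafka.streams.kstream.ValueTransformerWithKey
import org.apache.kafka.streams.processor.ProcessorContext

// Hypothetical transformer: prefixes each value with its read-only key.
class KeyPrefixer extends ValueTransformerWithKey[String, String, String] {
  override def init(context: ProcessorContext): Unit = ()
  override def transform(readOnlyKey: String, value: String): String =
    s"$readOnlyKey:$value"
  override def close(): Unit = ()
}

object Example {
  // Assumption: the implicit class KStreamSWithValueTransformerKey is in scope here,
  // e.g. via an import of the object that encloses it.
  def prefixWithKey(stream: KStreamS[String, String]): KStreamS[String, String] =
    // A fresh transformer is created on every supplier call; no state stores are attached.
    stream.transformValuesWithKey[String](() => new KeyPrefixer)
}
```

The supplier is passed as a plain Scala function, so callers do not need to build a `ValueTransformerWithKeySupplier` themselves.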
from kafka-streams-scala.
Not submitting a pull request since this only applies to Kafka 1.1.0 and the current branches are for Kafka 1.0.0.
Let me know if a branch for Kafka 1.1.0 is opened.
Inside KStreamS itself, the following method would do:
def transformValuesWithKey[VR](valueTransformerWithKeySupplier: () => ValueTransformerWithKey[K, V, VR],
                               stateStoreNames: String*): KStreamS[K, VR] = {
  val valueTransformerWithKeySupplierJ: ValueTransformerWithKeySupplier[K, V, VR] = () => valueTransformerWithKeySupplier()
  new KStreamS(inner.transformValues[VR](valueTransformerWithKeySupplierJ, stateStoreNames: _*))
}
from kafka-streams-scala.
Please submit a PR to apache/kafka. streams-scala is on master now (https://github.com/apache/kafka/tree/trunk/streams/streams-scala) and will be released with 2.0.
from kafka-streams-scala.