Comments (22)

debasishg commented on July 26, 2024

We are giving this some thought as we speak. There is already a PR (#40) with implicit serdes.

We are in the process of thinking through the use cases, as the handling of serdes in the top-level APIs has changed a lot in Kafka Streams 1.0. There is now an abstraction, Serialized, used for handling serialization of keys and values, and the APIs have two kinds of signatures: one that takes a Serialized and uses it, and another that uses the default serializers. We are thinking about how to unify them with implicits without losing the ability to use the default serializers.
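For reference, the two signature styles in the Kafka Streams 1.0 Java API look roughly like this (simplified sketch, only groupByKey shown, written as Scala-style signatures):

```scala
// Simplified from org.apache.kafka.streams.kstream.KStream in Kafka Streams 1.0:
// the first overload falls back to the default serdes configured in StreamsConfig,
// the second takes an explicit Serialized carrying the key/value serdes.
def groupByKey(): KGroupedStream[K, V]
def groupByKey(serialized: Serialized[K, V]): KGroupedStream[K, V]
```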

from kafka-streams-scala.

maver1ck commented on July 26, 2024

By default serializers, do you mean the ones from the default.key.serde and default.value.serde options?
The question is whether we need them at all. Probably not.

debasishg commented on July 26, 2024

If you look at the APIs KStream#groupByKey() and KStream#groupByKey(Serialized), the first one uses the default serializers while the second uses the ones explicitly supplied. If we have implicit serializers and force users to use them, then we lose the first capability: the user has to supply the serializers as implicit values every time. This is the topic we debated in the PR I referenced earlier.

One option we are considering is to check for the availability of the implicit serializer values and act accordingly: if serdes are available for both key and value, we use the second API; otherwise we use the first.

WDYT?

maver1ck commented on July 26, 2024

With the last question I just wanted to check that we're on the same page.

I'm wondering whether using the default serializers isn't equivalent to having them as implicit values?
(Even better, because if the serializer type is wrong we'll get an error at compile time rather than at runtime.)

So the current setting:

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

we'll change to:

implicit val stringSerde = Serdes.String()
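To illustrate the compile-time checking: with a wrapper method that resolves serdes implicitly, a missing or mismatched serde is caught by the compiler instead of surfacing as a runtime ClassCastException. A sketch (the stream method below is a hypothetical signature for illustration, not the library's API):

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}

// Hypothetical wrapper method: resolves both serdes from implicit scope.
def stream[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): Unit = ()

implicit val stringSerde: Serde[String] = Serdes.String()

stream[String, String]("words")   // compiles: a Serde[String] is in scope
// stream[String, Long]("counts") // would not compile: no implicit Serde[Long]
```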

debasishg commented on July 26, 2024

Indeed, with implicits we get more type safety. The question is whether this takes away a feature of the underlying library; we debated this in the PR as well.

So you think it's OK to take away the feature of setting the defaults in the config, and to handle it with implicits instead?

maver1ck commented on July 26, 2024

But having the function take an implicit still gives the possibility of passing explicit values at the call site.
The only difference is that we'll be using Serdes instead of Produced.with etc.

debasishg commented on July 26, 2024

Yes, we can pass explicit values with Serdes. We can replace all APIs that take a Serialized with implicit Serdes, but we may have to keep the ones that take Produced or Consumed (in addition to the new ones that take implicit Serdes), as those two allow setting other values besides the serdes.
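For context, Produced and Consumed do carry more than serdes in the 1.0 API, which is why those variants are worth keeping. A rough sketch of the relevant factory methods (check the Javadoc for the exact signatures):

```scala
// Produced can also carry a custom StreamPartitioner:
val produced = Produced.`with`(keySerde, valueSerde, partitioner)

// Consumed can also carry a TimestampExtractor and an offset-reset policy:
val consumed = Consumed.`with`(keySerde, valueSerde, timestampExtractor,
  Topology.AutoOffsetReset.EARLIEST)
```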

maver1ck commented on July 26, 2024

Sounds good.
So for groupBy we will have two definitions:

def groupBy(function)(implicit keySerde: Serde[K], valueSerde: Serde[V])
def groupBy(function, serialized: Serialized[K, V]) // called as groupBy(f, Serialized.`with`(keySerde, valueSerde))

debasishg commented on July 26, 2024

We can debate whether we need the second variant for Serialized if we have the first. But I think we need both variants for the functions that take Produced or Consumed, since you can set other values besides the serdes with them.

dhoepelman commented on July 26, 2024

Maybe providing an opt-in that enables the default-serializer behavior is a solution?

// This could also be Serde instead of Serialized;
// the principle is the same but the implementation is slightly more difficult.
implicit case object ConfigSerialized extends Serialized[Nothing, Nothing] {
  // throw UnsupportedOperationException in methods
}

def through(topic: String)(implicit serialized: Serialized[K, V]): KStreamS[K, V] =
  if (serialized eq ConfigSerialized)
    inner.through(topic)
  else
    inner.through(topic, serialized)

This way the default is the type-safe, Scala-style implicit API, but if the user adds import streams.ConfigSerialized, the default serializers can be used.

If you want the behavior as default you could make the signature

def through(topic: String)(implicit serialized: Serialized[K,V] = ConfigSerialized)

this won't need the import, but you don't get a compile-time error for a missing implicit if you forget to define it, which is not what Scala users expect.

debasishg commented on July 26, 2024

@dhoepelman - this is the idea I was toying with to provide both options .. see my remark from one of the earlier comments:

One option we are considering is to check for the availability of the implicit serializer values and act accordingly: if serdes are available for both key and value, we use the second API; otherwise we use the first.

And you are correct that providing the implicits on Serialized, Produced, or Consumed makes for an easier implementation.

To everybody on this comment thread, how about the following strategy ..

  1. We provide all implicits on Serialized, Produced, or Consumed instead of Serde, because these are the preferred abstractions in Kafka Streams 1.0.0.
  2. We use the strategy mentioned by @dhoepelman to handle the default serializers. The default strategy will be to use the implicits (recommended), but if someone wants to use the config serializers, she can do that with the small extra effort of an additional import. This also follows the principle of making the recommended way the easier one to use.

maver1ck commented on July 26, 2024

Ad 1)
But then we need to provide O(n^2) implicit values instead of O(n), where n = number of types.
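Concretely: with Serde-level implicits, two element types need two implicit values, while Serialized-level implicits need one per key/value combination. A sketch:

```scala
// O(n) Serde implicits: one per type ...
implicit val stringSerde: Serde[String] = Serdes.String()
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]

// ... versus O(n^2) Serialized implicits: one per (key, value) pair.
implicit val ss: Serialized[String, String] = Serialized.`with`(stringSerde, stringSerde)
implicit val sl: Serialized[String, Long]   = Serialized.`with`(stringSerde, longSerde)
implicit val ls: Serialized[Long, String]   = Serialized.`with`(longSerde, stringSerde)
implicit val ll: Serialized[Long, Long]     = Serialized.`with`(longSerde, longSerde)
```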

dhoepelman commented on July 26, 2024

Agree with 1 and 2

@maver1ck
The standard solution, if there is a good default option, is to provide an implicit conversion (as play-json does, for example):

implicit def SerializedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K,V] = Serialized.`with`(keySerde, valueSerde)

debasishg commented on July 26, 2024

@dhoepelman This one may be tricky ..

implicit case object ConfigSerialized extends Serialized[Nothing, Nothing] {
   //  throw UnsupportedOperationException in methods
}

Serialized's constructor takes another Serialized, and the implicit match may not work because the Java type is invariant.

debasishg commented on July 26, 2024

I have started working on an implementation ..

dhoepelman commented on July 26, 2024

@debasishg right. Invariance is correct too, since a Serde (and thus Serialized) is both a serializer and a deserializer :/

Instead of a case object, an implicit def should work:

implicit def configSerialized[K, V]: Serialized[K, V] = new ConfigSerialized[K, V]()

// Named class so we can check isInstanceOf[ConfigSerialized]
private[streams] class ConfigSerialized[K, V]() extends Serialized[K, V](Serialized.`with`(null, null)) {}

But I don't know if this will cause weird things with implicit lookups or other problems...
My colleague @svroonland and I have some time to look into or help with an implementation, but only from February 16th.

debasishg commented on July 26, 2024

How about the technique described in http://missingfaktor.blogspot.in/2013/12/optional-implicit-trick-in-scala.html ? It's a dressed-up variant of the null check on the implicit. The advantage is that we can use this technique generically and do NOT have to write specialized implementations for each of Serialized, Produced and Consumed ..
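For readers who don't follow the link: the Perhaps trick wraps an optional implicit, using implicit-resolution priority so the "present" instance wins whenever the underlying implicit exists. A sketch of the pattern (not the exact code from the post):

```scala
// Perhaps[A] witnesses whether an implicit A is available at the call site.
sealed trait Perhaps[A] {
  def fold[B](ifAbsent: => B)(ifPresent: A => B): B
}

object Perhaps extends LowPriorityPerhaps {
  // Higher priority: picked when an implicit A is in scope.
  implicit def present[A](implicit a: A): Perhaps[A] = new Perhaps[A] {
    def fold[B](ifAbsent: => B)(ifPresent: A => B): B = ifPresent(a)
  }
}

trait LowPriorityPerhaps {
  // Lower-priority fallback: picked when no implicit A exists.
  implicit def absent[A]: Perhaps[A] = new Perhaps[A] {
    def fold[B](ifAbsent: => B)(ifPresent: A => B): B = ifAbsent
  }
}
```

Because present lives in the companion object and absent in a parent trait, the compiler prefers present when both would apply, avoiding ambiguity.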

dhoepelman commented on July 26, 2024

I have difficulty seeing the advantage of Perhaps over providing a null default value for the implicit, but using a default null/Perhaps for the implicit parameter is definitely a solution for supporting both behaviors.

Is it possible to provide an opt-in mechanism in this case?
Doing this by default is a bit dangerous, because it turns compile-time errors into run-time errors and might silently change run-time behavior when things change.

debasishg commented on July 26, 2024

I agree regarding the transformation of compile-time errors into run-time errors. And as I already mentioned, it's a dress-up over the null-check pattern for implicits.

But it is possible to provide an opt-in mechanism with this technique as well .. here's an example I was trying out ..

The API implementation ..

def groupByKey(implicit serialized: Perhaps[Serialized[K, V]]): KGroupedStreamS[K, V] =
    serialized.fold[KGroupedStreamS[K, V]] { inner.groupByKey } { implicit ev => inner.groupByKey(ev) }

an implicit to convert the serdes into a Serialized ..

implicit def SerializedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K,V] = 
    Serialized.`with`(keySerde, valueSerde)

and we can have the API usage with ..

implicit val stringSerde: Serde[String] = Serdes.String()
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]

or ..

implicit val serialized = Serialized.`with`(stringSerde, longSerde)

or, if we don't have the implicits in scope, the config values will be used, since in that case the no-argument inner.groupByKey will be called in the implementation above.

debasishg commented on July 26, 2024

I was also playing around with your suggestion ..

Here's the implicit for Serialized ..

implicit def configSerialized[K,V]: Serialized[K,V] = new ConfigSerialized[K,V]()

private[streams] class ConfigSerialized[K,V]() extends Serialized[K,V](Serialized.`with`(null, null)) {}

Here's the API implementation ..

def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStreamS[K, V] =
  if (serialized.isInstanceOf[ConfigSerialized[K, V]]) inner.groupByKey
  else inner.groupByKey(serialized)

yeah .. that isInstanceOf (sort of) subverts the type system ..

Now the question is how to bring in the implicit conversion from Serde to Serialized .. The moment I include the following in the ImplicitConversions object ..

implicit def SerializedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K,V] = 
  Serialized.`with`(keySerde, valueSerde)

I get ambiguous implicits during lookup ..
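The ambiguity is expected: both configSerialized[K, V] and SerializedFromSerde[K, V] can produce a Serialized[K, V] at the same priority. One conventional way to break the tie (a sketch, not necessarily what the PR ended up doing) is to move the fallback into a lower-priority trait:

```scala
object ImplicitConversions extends LowPriorityImplicitConversions {
  // Higher priority: used whenever both serdes are in implicit scope.
  implicit def SerializedFromSerde[K, V](implicit keySerde: Serde[K],
                                         valueSerde: Serde[V]): Serialized[K, V] =
    Serialized.`with`(keySerde, valueSerde)
}

trait LowPriorityImplicitConversions {
  // Lower-priority fallback: the config-defaults marker.
  implicit def configSerialized[K, V]: Serialized[K, V] = new ConfigSerialized[K, V]()
}
```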

debasishg commented on July 26, 2024

I have a PR on this, up for review ..

debasishg commented on July 26, 2024

Implicit Serdes now in master - closing this issue.
