fs2-kafka's People

Contributors

aartigao, agustafson, atnoya, backuitist, bastewart, benfradet, biochimia, bplommer, chenharryhua, cremboc, cwholmes, daenyth, danxmoran, dmedser, geirolz, jacktreble, janstenpickle, jbwheatley, joceron, jopecko, keirlawson, l7r7, lmnet, mergify[bot], mvalle, nasadorian, peter-hazell, scala-steward, sergio-margale, vlovgr

fs2-kafka's Issues

Better document the backpressure and `pollInterval`

The documentation says that the (consumer) stream is backpressured. But after a quick look at the code base, I cannot easily find evidence of this, which makes it hard to choose a value for the pollInterval setting.

As far as I understand there are 3 pieces at work in the consumer stream:

  1. the poll loop -- responsible for throttling the poll requests (the poll queue is bounded with 1 element)
  2. the consumer loop (KafkaConsumerActor) -- handles both fetch requests issued by the stream as well as poll requests by the poll loop, using a State variable that accumulates fetch requests along with fetched kafka records.
  3. the stream -- emits fetch requests on-demand (pull based)

At first it seems that the consumer loop simply accumulates records endlessly in State.records. But then I came across the poll function in the consumer loop:

val resume = (requested intersect assigned) diff available
val pause = assigned diff resume

consumer.pause(pause.asJava)
consumer.resume(resume.asJava)

This suggests that unless a fetch request is present, no polling will be performed, in effect providing backpressure?
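To make the set arithmetic in the quoted lines concrete, here is a minimal, self-contained sketch. It is not the library's code: `TopicPartition` is a hypothetical stand-in for the Kafka class, and the reading of `requested` (partitions with a pending fetch request) and `available` (partitions with buffered records) is an assumption based on the description above.

```scala
object PauseResumeSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Same arithmetic as the two quoted lines:
  //   resume = (requested intersect assigned) diff available
  //   pause  = assigned diff resume
  def plan(
    requested: Set[TopicPartition], // assumed: partitions with a pending fetch request
    assigned: Set[TopicPartition],  // partitions currently assigned to this consumer
    available: Set[TopicPartition]  // assumed: partitions that already have buffered records
  ): (Set[TopicPartition], Set[TopicPartition]) = {
    val resume = (requested intersect assigned) diff available
    val pause  = assigned diff resume
    (resume, pause)
  }
}
```

Under these assumptions, a partition is resumed only while the stream is asking for it and nothing is buffered for it yet. With no fetch requests at all, `resume` is empty and every assigned partition is paused.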

Perhaps some high-level documentation of the code-base would be useful?

My understanding is that pollInterval is only there to reduce the pressure on Kafka, if the record consumption rate (the logic that consumes the stream) exceeds the production rate (the kafka producer, up-stream)?

Provide functions for creating KafkaProducer test instances

Similarly to #234, it would be useful to provide functions for creating test KafkaProducers.

A good first function would be one which yields somewhat sensible default RecordMetadata.

object KafkaProducer {
  def unit[F[_], K, V](implicit F: Sync[F]): F[KafkaProducer[F, K, V]] = ???
}

Likely, this would require some internal state, hence F[KafkaProducer[F, K, V]].
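As a rough illustration of why internal state is needed, here is a simplified sketch that avoids the effect type entirely. `Record`, `Metadata` and `Producer` are hypothetical stand-ins, not the fs2-kafka or Kafka client types, and putting everything on partition 0 is an arbitrary choice for the example.

```scala
import java.util.concurrent.atomic.AtomicLong

object TestProducerSketch {
  // Hypothetical, simplified stand-ins for ProducerRecord and RecordMetadata.
  final case class Record[K, V](topic: String, key: K, value: V)
  final case class Metadata(topic: String, partition: Int, offset: Long)

  trait Producer[K, V] {
    def produce(record: Record[K, V]): Metadata
  }

  // Creating the producer allocates a counter, which is the kind of state
  // that would push a real version towards returning F[KafkaProducer[F, K, V]].
  def unit[K, V](): Producer[K, V] = {
    val nextOffset = new AtomicLong(0L)
    new Producer[K, V] {
      def produce(record: Record[K, V]): Metadata =
        Metadata(record.topic, partition = 0, offset = nextOffset.getAndIncrement())
    }
  }
}
```

Each producer created this way hands out increasing offsets starting at 0, so two test producers do not share state.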

Question: equivalent implementation of Synchronized class?

I am studying the implementation of Synchronized class in fs2-kafka: https://github.com/ovotech/fs2-kafka/blob/master/modules/core/src/main/scala/fs2/kafka/internal/Synchronized.scala#L43

I came up with an alternative, but I am not sure whether it is equivalent to the fs2-kafka implementation:

def of[F[_], A](a: A)(implicit F: Concurrent[F]): F[Synchronized[F, A]] =
    MVar[F].of[A](a).map(mv =>
        new Synchronized[F, A] {
          override def use[B](f: A => F[B]): F[B] = F.bracket(mv.take)(f)(v => mv.put(v))
        })

If it is equivalent, what's the benefit of using Deferred + Ref?
Thanks a lot.

Example in readme doesn't seem to work

Hi, I'm trying to move over to this kafka client since fs2-kafka-client is deprecated.

I tried it in one of my apps and found that the actor seems to be pausing the subscription to the topic, so I tried a cut-down version of the readme snippet with embedded Kafka:

package test

import java.util.concurrent.ForkJoinPool

import cats.data.NonEmptyList
import cats.effect.{ContextShift, IO, Timer}
import fs2.kafka._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.{BytesDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Bytes
import org.scalatest.FunSuite

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

class KafkaSubscriberSuite extends FunSuite with EmbeddedKafka {
  val topic = "test"

  implicit val deser = new BytesDeserializer()
  implicit val ser = new StringSerializer()

  implicit val ec: ExecutionContextExecutor =
    ExecutionContext.fromExecutor(new ForkJoinPool(4))

  implicit val contextShift: ContextShift[IO] = IO.contextShift(ec)
  implicit val timer: Timer[IO] = IO.timer(ec)

  implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig()

  test("simple") {
    withRunningKafka {
      publishToKafka(topic, "test")

      val consumerSettings = (executionContext: ExecutionContext) =>
        ConsumerSettings[Bytes, Bytes](
          keyDeserializer = implicitly,
          valueDeserializer = implicitly,
          executionContext = executionContext
        ).withAutoOffsetReset(AutoOffsetReset.Earliest)
          .withBootstrapServers("localhost:6001")
          .withGroupId("group")

      val stream =
        for {
          executionContext <- consumerExecutionContextStream[IO]
          consumer <- consumerStream[IO].using(consumerSettings(executionContext))
          _ <- consumer.subscribe(NonEmptyList.of(topic))
          _ <- consumer.stream.map(_.committableOffset).to(commitBatchWithin[IO](500, 15.seconds))
        } yield ()

      stream.compile.drain.unsafeRunSync()

      assert(true)
    }
  }
}

I'm using version 0.17.1 of this lib and embedded kafka 2.0.0.

StackOverflowError in KafkaConsumerActor

I have a very simple consumer which runs into a StackOverflowError after some time (fs2-kafka 0.16.2). After increasing -XX:MaxJavaStackTraceDepth I located it in KafkaConsumerActor.scala:223.

Exception in thread "fs2-kafka-consumer-20" java.lang.StackOverflowError
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:505)
	[...]
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:507)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:507)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:507)
	at scala.collection.MapLike$$anon$1.hasNext(MapLike.scala:186)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:177)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at scala.collection.TraversableOnce.$div$colon(TraversableOnce.scala:150)
	at scala.collection.TraversableOnce.$div$colon$(TraversableOnce.scala:150)
	at scala.collection.AbstractTraversable.$div$colon(Traversable.scala:104)
	at scala.collection.generic.Subtractable.$minus$minus(Subtractable.scala:59)
	at scala.collection.generic.Subtractable.$minus$minus$(Subtractable.scala:59)
	at scala.collection.AbstractSet.$minus$minus(Set.scala:47)
	at scala.collection.SetLike.diff(SetLike.scala:179)
	at scala.collection.SetLike.diff$(SetLike.scala:179)
	at scala.collection.AbstractSet.diff(Set.scala:47)
	at fs2.kafka.KafkaConsumerActor.$anonfun$poll$2(KafkaConsumerActor.scala:223)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
	at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
	at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:86)
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93)
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44)
	at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:72)
	at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:52)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:136)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:345)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:366)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:312)
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Consumer:

val consumerSettings = (executionContext: ExecutionContext) =>
    ConsumerSettings(
      keyDeserializer = new StringDeserializer,
      valueDeserializer = circeJsonDeserializer[Measurement],
      executionContext = executionContext
    )
      .withAutoOffsetReset(AutoOffsetReset.Latest)
      .withAutoCommitInterval(10.seconds)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group")

val topics = NonEmptyList.one("test")

val stream =
        for {
          executionContext <- consumerExecutionContextStream[IO]
          consumer <- consumerStream[IO].using(consumerSettings(executionContext))
          _ <- consumer.subscribe(topics)
          _ <- consumer.stream
            .evalTap[IO](message =>
              processRecord(List(message.record), service)
            )
        } yield ()

stream.compile.drain.as(ExitCode.Success)

Possibility of blocking/greedy rebalance handlers

#117 was a nice improvement over the previous behavior in the sense that commit commands aren't lost anymore, but there is another issue that can't be controlled in any way right now: for consumers that want exactly-once [ish] processing with batching (e.g. groupWithin), it's possible to consume the same record more than once due to rebalances.
Consider a 2-consumer group with members A and B, where only A is active right now. Say there is a record R, which gets consumed by A at the beginning of a 5-second window, and then a rebalance happens within this window. If R belonged to a partition that was reassigned to B, and A hadn't committed R yet, A's commit will be deferred until after the rebalance completes. A will then commit R, and B will also consume R and commit it, which means double consumption with no way to control it.
One solution I can see is an option to wait in the revocation handler until all currently consumed records have been processed, i.e. don't revoke until we have completely handled everything we've read up to "now", and then proceed normally.
WDYT?

Scala 2.13 build

Cats and cats-effect have started to support Scala 2.13. Is it possible to provide a preview version for Scala 2.13?

Document safe usage conditions of `commitBatchWithin`

CommittableOffsetBatch#commit clearly explains some requirements for safe usage in its scaladoc:

/**
  * Commits the [[offsets]] to Kafka in a single commit.
  * For the batch to be valid and for commit to succeed,
  * the following conditions must hold:<br>
  * - [[consumerGroupIdsMissing]] must be false, and<br>
  * - [[consumerGroupIds]] must have exactly one ID.<br>
  * <br>
  * If one of the conditions above do not hold, there will
  * be a [[ConsumerGroupException]] exception raised and a
  * commit will not be attempted. If [[offsets]] is empty
  * then these conditions do not need to hold, as there
  * is nothing to commit.
  */
def commit: F[Unit]

It would be good to document these also in fs2.kafka.commitBatchWithin, which calls CommittableOffsetBatch#commit but does not document the failure conditions:

/**
  * Commits offsets in batches of every `n` offsets or time window
  * of length `d`, whichever happens first. If there are no offsets
  * to commit within a time window, no attempt will be made to commit
  * offsets for that time window.
  */
def commitBatchWithin[F[_]](n: Int, d: FiniteDuration)(
  implicit F: Concurrent[F],
  timer: Timer[F]
): Pipe[F, CommittableOffset[F], Unit] =
  _.groupWithin(n, d).evalMap(CommittableOffsetBatch.fromFoldable(_).commit)

I experienced an app failing with error fs2.kafka.ConsumerGroupException: multiple or missing consumer group ids [topic_foo_id, topic_bar_id]. After some investigation, I narrowed the issue down: the events consumed from two topics were being merged into a single stream, processed, and offsets committed with a call to fs2.kafka.commitBatchWithin. The issue is slightly vicious because the code was able to run for a while before multiple events from different topics happened to end up in the same batch and blew up at runtime. The solution settled on was to process the consuming streams separately using .parJoinUnbounded instead.

(There is also a "Rolls-Royce" solution of making commitBatchWithin able to separate and batch commits by topic, but I think warnings in the docs are a good start 😄)

If it sounds OK I could raise a small PR to append the commit scaladoc (starting from "For the batch to be valid...") onto the end of commitBatchWithin.
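For illustration, the two conditions from the commit scaladoc, and the idea of splitting a mixed batch, can be sketched on a simplified model. `Offset` is a hypothetical stand-in for CommittableOffset, and splitting by consumer group ID (rather than by topic) is an assumption based on the IDs shown in the error message.

```scala
object GroupedCommitSketch {
  // Hypothetical, simplified stand-in for fs2.kafka.CommittableOffset.
  final case class Offset(groupId: Option[String], topic: String, partition: Int, offset: Long)

  // The conditions from the scaladoc: no offset is missing a group ID and
  // exactly one distinct group ID is present. An empty batch is always fine.
  def isCommittable(batch: List[Offset]): Boolean =
    batch.isEmpty ||
      (batch.forall(_.groupId.isDefined) && batch.flatMap(_.groupId).distinct.size == 1)

  // Split a mixed batch into one sub-batch per consumer group ID.
  def splitByGroup(batch: List[Offset]): Map[Option[String], List[Offset]] =
    batch.groupBy(_.groupId)
}
```

A batch that mixes offsets from two groups fails the check as a whole, while each sub-batch returned by `splitByGroup` under a defined group ID passes it.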

Unsubscribe from specific topic

I have multiple nodes dedicated to producing to topics that follow the form region-{n}-{m}. I'm subscribing to specific regions based on my application's state, but I want it to be able to unsubscribe from specific regions as well.

I'm entirely new to Kafka, so if I should take a different approach to this than topics I'm all ears.

Dependency Issue

Using Scala 2.12.8, I'm unable to build the latest milestone release. It looks like kafka-avro-serializer 5.2.2 may no longer be available and needs to be bumped to 5.3.0?

[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: io.confluent#kafka-avro-serializer;5.2.2: not found

[warn] Note: Unresolved dependencies path:
[warn] io.confluent:kafka-avro-serializer:5.2.2
[warn] +- com.ovoenergy:fs2-kafka-vulcan_2.12:0.20.0-M2

Duplicated messages when executing more than once

Hey, I observed some weird behaviour: when a topic has more than 500 messages and we read until a particular offset is reached, we get a different number of messages in the second and subsequent runs. Here is a code sample that demonstrates the behaviour:

import cats.effect.{ExitCode, IO, IOApp}
import fs2.kafka._
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.concurrent.duration._
object Main {
  def main(args: Array[String]): Unit = {

    val ec = scala.concurrent.ExecutionContext.Implicits.global
    implicit val x = cats.effect.IO.contextShift(ec)
    implicit val y = cats.effect.IO.timer(ec)
    val consumerSettings =
      ConsumerSettings[String, String]
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers("localhost:9092")
        .withGroupId("group")

    val stream =
      consumerStream[IO]
        .using(consumerSettings)
        .evalTap(_.subscribeTo("topic"))
        .flatMap(_.stream)
        .takeWhile(_.committableOffset.offsetAndMetadata.offset() <= 520)
        .compile.toList.map(x => println(x.size))

    stream.unsafeRunSync()
    stream.unsafeRunSync()
    stream.unsafeRunSync()
  }
}

On my machine it prints

520
1020
1020

So the first run behaves correctly, but later runs do not.

KafkaAdminClient "should support all defined functionality" test failure

I hit this while running tests for (hopefully unrelated) changes locally. The full failure message was:

should support all defined functionality *** FAILED *** (8 seconds, 80 milliseconds)
[info]   Set("topic-93585a45-70e2-44da-a341-e849f27b3b65", "topic-d17286b2-9fb6-4918-b5cf-422716f58a75", "topic-83120f48-462c-48a2-aaf9-cf938f0d6370", "topic-ff115c87-4e60-4d4c-b4cf-0060acaf898a") had size 4 instead of expected size 1 (KafkaAdminClientSpec.scala:60)

I can reproduce it consistently on my laptop. Will try to track it down.

Should commit batch functions sort offsets?

This is more a question than an issue, as it might work as intended. We are experiencing some difficulties when grouping per key and then trying to commit the offsets: when the offsets are collected they are potentially out of order, and the commit functions will commit the last offset seen for each TopicPartition rather than the highest one.

We could sort them again, but that would be quite costly, as the commit batch functions fold over the CommittableOffsets.

Is this behaviour a design decision? If it is, what would be a good pattern for working this way? (I was experimenting with a container holding the entities being transformed together with the batch of committable offsets, but it was getting a bit unwieldy when doing mapAsync.)

I would be happy to work on a fix if it is the best course of action.

This is the code I was working with:

import cats.effect._
import cats.syntax.all._
import cats.instances.list._
import fs2.kafka._
import fs2._

import scala.concurrent.duration._
import scala.collection.immutable.List

object GroupedTopicReaderApp extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val consumerSettings =
      ConsumerSettings[String, String]
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers("localhost:9092")
        .withGroupId("grouped-reader")
          .withAutoCommitInterval(Integer.MAX_VALUE.nanos)

    consumerStream[IO].using(consumerSettings)
      .evalTap(_.subscribeTo("test"))
      .flatMap(processAndCommit)
      .compile.drain *> IO.pure(ExitCode.Success)
  }

  private val processGroupedMsgs: ((String, List[CommittableMessage[IO, String, String]])) => IO[List[CommittableMessage[IO, String, String]]] =
  { case (_, messages) =>
    IO.delay(println(s"Message count: ${messages.size}")) *>
        messages.map(msg => IO(println(s"${msg.record.key} -> ${msg.record.value}"))).sequence  *>
          IO.pure(messages)
  }

  private def commitMsgs: Chunk[CommittableMessage[IO, String, String]] => Stream[IO, Unit] = { chunk =>
    Stream.chunk(chunk.map(_.committableOffset)).covary[IO].map(IO.pure).through(commitBatchF)
  }

  private val processAndCommit: KafkaConsumer[IO, String, String] => Stream[IO, Unit] = { consumer =>
    consumer.stream.chunks
      .flatMap(c => Stream(c.toList.groupBy(_.record.key).to[List]: _*))
      .mapAsync(1)(processGroupedMsgs)
      .chunks.map(_.flatMap(l => Chunk(l: _*))).flatMap(commitMsgs)
  }
}
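On the cost concern raised above: keeping the highest offset per partition does not strictly require a sort, since a single fold can track the maximum. The sketch below shows this on hypothetical, simplified types (`TopicPartition` and `Offset` are stand-ins, not the library's classes). Whether the commit batch functions should behave this way is exactly the open question.

```scala
object MaxOffsetSketch {
  // Hypothetical stand-ins for TopicPartition and CommittableOffset.
  final case class TopicPartition(topic: String, partition: Int)
  final case class Offset(topicPartition: TopicPartition, offset: Long)

  // One pass over the offsets, keeping the highest offset seen per partition
  // regardless of the order in which the offsets arrive.
  def highestPerPartition(offsets: Iterable[Offset]): Map[TopicPartition, Long] =
    offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) =>
      if (acc.get(o.topicPartition).forall(_ < o.offset)) acc.updated(o.topicPartition, o.offset)
      else acc
    }
}
```

With offsets 5, 3, 7, 6 arriving for the same partition, the fold keeps 7, whereas a "last one wins" fold would keep 6.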

Possibly leaking org.apache.kafka.clients.consumer.KafkaConsumer instance

When checking fs2-kafka stream behavior during errors, I noticed the following warning in the logs:

2019-10-08 23:25:58,472 | WARN | o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client_id at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)

I managed to reproduce the error in the following test case (I ran it from KafkaConsumerSpec):

it("should restart properly on error") {
      withKafka { (config, topic) =>
        createCustomTopic(topic, partitions = 3)
        val produced = (0 until 1).map(n => s"key-$n" -> s"value->$n")
        publishToKafka(topic, produced)

        val conf = consumerSettings[IO](config)

        consumerStream[IO]
          .using(conf.withClientId("client_id"))
          .evalTap(_.subscribeTo(topic))
          .flatMap(_.stream)
          .flatMap(Stream.emit(_) >> Stream.raiseError[IO](new RuntimeException))
          .attempts(Stream.emit(1.second))
          .compile
          .toVector
          .unsafeRunSync
      }
    }

Enable logging and check the logs. Note that I changed the client.id property to a specific value, and that I checked out the code tagged "v0.20.1".

Setup:
Java: 11.0.4
Scala: 2.12.8
sbt: 1.3.0
fs2-kafka: 0.20.1
cats: 2.0.0
cats-effect: 2.0.0

Let me know if you need more info.

Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

I am trying to produce a key-value pair of A, List[B] to Kafka using Avro, vulcan and a schema registry.

I am creating a serializer for List[B] like so:

avroSerializer[List[B]](Codec.List[B](bCodec)).using(SchemaRegistrySettings(conf))

but at runtime I get:

Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

When I set a breakpoint for that exception, it breaks in io.confluent.kafka.serializers.AvroSchemaUtils.getSchema.

The method looks like this, and it does not seem to support arrays:

public static Schema getSchema(Object object) {
        if (object == null) {
            return (Schema)primitiveSchemas.get("Null");
        } else if (object instanceof Boolean) {
            return (Schema)primitiveSchemas.get("Boolean");
        } else if (object instanceof Integer) {
            return (Schema)primitiveSchemas.get("Integer");
        } else if (object instanceof Long) {
            return (Schema)primitiveSchemas.get("Long");
        } else if (object instanceof Float) {
            return (Schema)primitiveSchemas.get("Float");
        } else if (object instanceof Double) {
            return (Schema)primitiveSchemas.get("Double");
        } else if (object instanceof CharSequence) {
            return (Schema)primitiveSchemas.get("String");
        } else if (object instanceof byte[]) {
            return (Schema)primitiveSchemas.get("Bytes");
        } else if (object instanceof GenericContainer) {
            return ((GenericContainer)object).getSchema();
        } else {
            throw new IllegalArgumentException("Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord");
        }
    }

Am I doing something wrong? I wonder whether I have a dependency conflict and something is getting evicted, but when I look at the evictions in sbt, everything is at least being evicted in favour of newer versions.

Potentially misleading `maxCacheSize`

Afaik the cache size parameter that is passed in to the constructor of CachedSchemaRegistryClient is not actually a maximum size for the cache itself. Rather it is the max number of schemas that can be registered for each subject within the cache (see here for more). So if it is set to 1000, then the actual max cache size is the number of subjects multiplied by 1000.
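As a worked example of that multiplication (the subject count of 50 is made up for illustration, and the per-subject reading of the parameter is the assumption stated above):

```scala
object CacheBoundSketch {
  // Upper bound on cached schemas if the limit applies per subject.
  def effectiveMaxEntries(subjects: Int, perSubjectLimit: Int): Long =
    subjects.toLong * perSubjectLimit.toLong
}
```

With 50 subjects and a limit of 1000, the bound is 50,000 cached schemas rather than 1000.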

If this is the case, would a different name for this option make sense? And is 1000 too large of a default?

This is definitely not a big issue, just wanted to see what others thought 👍🏼

KafkaConsumer#stream "should handle rebalance" test failure

I hit this while running tests for (hopefully unrelated) changes locally. The full failure message was:

should handle rebalance *** FAILED *** (5 minutes, 11 seconds)
[info]   200 equaled 200, but Map("key-198" -> 1, "key-78" -> 3, "key-89" -> 2, "key-105" -> 1, "key-123" -> 1, "key-93" -> 2, "key-112" -> 2, "key-8" -> 2, "key-44" -> 2, "key-74" -> 2, "key-0" -> 2, "key-116" -> 2, "key-85" -> 3, "key-169" -> 1, "key-187" -> 1, "key-152" -> 1, "key-119" -> 1, "key-108" -> 1, "key-27" -> 3, "key-22" -> 2, "key-176" -> 1, "key-11" -> 2, "key-59" -> 2, "key-49" -> 3, "key-163" -> 1, "key-43" -> 2, "key-166" -> 2, "key-38" -> 3, "key-100" -> 1, "key-71" -> 3, "key-170" -> 1, "key-127" -> 1, "key-138" -> 1, "key-155" -> 1, "key-96" -> 2, "key-82" -> 2, "key-181" -> 1, "key-32" -> 2, "key-16" -> 2, "key-193" -> 1, "key-192" -> 2, "key-159" -> 2, "key-134" -> 2, "key-148" -> 2, "key-144" -> 1, "key-62" -> 2, "key-5" -> 3, "key-67" -> 3, "key-51" -> 2, "key-111" -> 1, "key-188" -> 2, "key-177" -> 2, "key-196" -> 1, "key-115" -> 1, "key-79" -> 3, "key-42" -> 2, "key-104" -> 1, "key-31" -> 2, "key-122" -> 1, "key-56" -> 2, "key-28" -> 2, "key-58" -> 3, "key-10" -> 3, "key-75" -> 2, "key-109" -> 1, "key-69" -> 2, "key-97" -> 2, "key-185" -> 1, "key-21" -> 2, "key-17" -> 2, "key-174" -> 1, "key-92" -> 3, "key-199" -> 2, "key-86" -> 2, "key-110" -> 1, "key-81" -> 2, "key-70" -> 3, "key-158" -> 1, "key-143" -> 1, "key-126" -> 1, "key-39" -> 3, "key-9" -> 2, "key-180" -> 1, "key-147" -> 1, "key-63" -> 3, "key-137" -> 1, "key-121" -> 2, "key-68" -> 2, "key-133" -> 2, "key-165" -> 1, "key-191" -> 1, "key-46" -> 3, "key-57" -> 2, "key-35" -> 2, "key-2" -> 3, "key-154" -> 1, "key-52" -> 2, "key-4" -> 2, "key-146" -> 1, "key-106" -> 1, "key-157" -> 1, "key-94" -> 2, "key-25" -> 3, "key-178" -> 2, "key-24" -> 3, "key-140" -> 2, "key-30" -> 3, "key-14" -> 2, "key-195" -> 1, "key-72" -> 2, "key-87" -> 2, "key-161" -> 1, "key-168" -> 1, "key-189" -> 2, "key-150" -> 2, "key-41" -> 3, "key-117" -> 1, "key-153" -> 1, "key-91" -> 2, "key-13" -> 2, "key-173" -> 1, "key-76" -> 3, "key-114" -> 1, "key-131" -> 1, "key-164" -> 1, "key-184" -> 2, "key-103" -> 1, 
"key-120" -> 1, "key-64" -> 2, "key-125" -> 2, "key-65" -> 2, "key-54" -> 2, "key-80" -> 2, "key-47" -> 2, "key-53" -> 2, "key-190" -> 2, "key-160" -> 1, "key-132" -> 1, "key-139" -> 1, "key-142" -> 2, "key-136" -> 1, "key-1" -> 2, "key-3" -> 3, "key-29" -> 2, "key-98" -> 3, "key-18" -> 3, "key-34" -> 3, "key-20" -> 2, "key-36" -> 2, "key-83" -> 2, "key-45" -> 2, "key-128" -> 2, "key-95" -> 3, "key-40" -> 2, "key-183" -> 1, "key-7" -> 2, "key-15" -> 2, "key-130" -> 1, "key-88" -> 2, "key-26" -> 2, "key-118" -> 2, "key-141" -> 2, "key-77" -> 3, "key-107" -> 1, "key-102" -> 1, "key-182" -> 2, "key-23" -> 3, "key-162" -> 2, "key-90" -> 2, "key-60" -> 2, "key-151" -> 1, "key-12" -> 3, "key-171" -> 1, "key-113" -> 2, "key-194" -> 1, "key-33" -> 3, "key-55" -> 2, "key-48" -> 3, "key-135" -> 1, "key-156" -> 1, "key-197" -> 1, "key-66" -> 3, "key-101" -> 2, "key-175" -> 2, "key-37" -> 2, "key-124" -> 1, "key-167" -> 1, "key-99" -> 2, "key-50" -> 3, "key-129" -> 2, "key-172" -> 1, "key-149" -> 1, "key-19" -> 2, "key-61" -> 2, "key-6" -> 3, "key-84" -> 2, "key-186" -> 2, "key-179" -> 1, "key-145" -> 1, "key-73" -> 3) did not equal Map("key-198" -> 1, "key-78" -> 2, "key-89" -> 2, "key-105" -> 1, "key-123" -> 1, "key-93" -> 2, "key-112" -> 1, "key-8" -> 2, "key-44" -> 2, "key-74" -> 2, "key-0" -> 2, "key-116" -> 1, "key-85" -> 2, "key-169" -> 1, "key-187" -> 1, "key-152" -> 1, "key-119" -> 1, "key-108" -> 1, "key-27" -> 2, "key-22" -> 2, "key-176" -> 1, "key-11" -> 2, "key-59" -> 2, "key-49" -> 2, "key-163" -> 1, "key-43" -> 2, "key-166" -> 1, "key-38" -> 2, "key-100" -> 1, "key-71" -> 2, "key-170" -> 1, "key-127" -> 1, "key-138" -> 1, "key-155" -> 1, "key-96" -> 2, "key-82" -> 2, "key-181" -> 1, "key-32" -> 2, "key-16" -> 2, "key-193" -> 1, "key-192" -> 1, "key-159" -> 1, "key-134" -> 1, "key-148" -> 1, "key-144" -> 1, "key-62" -> 2, "key-5" -> 2, "key-67" -> 2, "key-51" -> 2, "key-111" -> 1, "key-188" -> 1, "key-177" -> 1, "key-196" -> 1, "key-115" -> 1, "key-79" -> 2, 
"key-42" -> 2, "key-104" -> 1, "key-31" -> 2, "key-122" -> 1, "key-56" -> 2, "key-28" -> 2, "key-58" -> 2, "key-10" -> 2, "key-75" -> 2, "key-109" -> 1, "key-69" -> 2, "key-97" -> 2, "key-185" -> 1, "key-21" -> 2, "key-17" -> 2, "key-174" -> 1, "key-92" -> 2, "key-199" -> 1, "key-86" -> 2, "key-110" -> 1, "key-81" -> 2, "key-70" -> 2, "key-158" -> 1, "key-143" -> 1, "key-126" -> 1, "key-39" -> 2, "key-9" -> 2, "key-180" -> 1, "key-147" -> 1, "key-63" -> 2, "key-137" -> 1, "key-121" -> 1, "key-68" -> 2, "key-133" -> 1, "key-165" -> 1, "key-191" -> 1, "key-46" -> 2, "key-57" -> 2, "key-35" -> 2, "key-2" -> 2, "key-154" -> 1, "key-52" -> 2, "key-4" -> 2, "key-146" -> 1, "key-106" -> 1, "key-157" -> 1, "key-94" -> 2, "key-25" -> 2, "key-178" -> 1, "key-24" -> 2, "key-140" -> 1, "key-30" -> 2, "key-14" -> 2, "key-195" -> 1, "key-72" -> 2, "key-87" -> 2, "key-161" -> 1, "key-168" -> 1, "key-189" -> 1, "key-150" -> 1, "key-41" -> 2, "key-117" -> 1, "key-153" -> 1, "key-91" -> 2, "key-13" -> 2, "key-173" -> 1, "key-76" -> 2, "key-114" -> 1, "key-131" -> 1, "key-164" -> 1, "key-184" -> 1, "key-103" -> 1, "key-120" -> 1, "key-64" -> 2, "key-125" -> 1, "key-65" -> 2, "key-54" -> 2, "key-80" -> 2, "key-47" -> 2, "key-53" -> 2, "key-190" -> 1, "key-160" -> 1, "key-132" -> 1, "key-139" -> 1, "key-142" -> 1, "key-136" -> 1, "key-1" -> 2, "key-3" -> 2, "key-29" -> 2, "key-98" -> 2, "key-18" -> 2, "key-34" -> 2, "key-20" -> 2, "key-36" -> 2, "key-83" -> 2, "key-45" -> 2, "key-128" -> 1, "key-95" -> 2, "key-40" -> 2, "key-183" -> 1, "key-7" -> 2, "key-15" -> 2, "key-130" -> 1, "key-88" -> 2, "key-26" -> 2, "key-118" -> 1, "key-141" -> 1, "key-77" -> 2, "key-107" -> 1, "key-102" -> 1, "key-182" -> 1, "key-23" -> 2, "key-162" -> 1, "key-90" -> 2, "key-60" -> 2, "key-151" -> 1, "key-12" -> 2, "key-171" -> 1, "key-113" -> 1, "key-194" -> 1, "key-33" -> 2, "key-55" -> 2, "key-48" -> 2, "key-135" -> 1, "key-156" -> 1, "key-197" -> 1, "key-66" -> 2, "key-101" -> 1, "key-175" -> 1, "key-37" 
-> 2, "key-124" -> 1, "key-167" -> 1, "key-99" -> 2, "key-50" -> 2, "key-129" -> 1, "key-172" -> 1, "key-149" -> 1, "key-19" -> 2, "key-61" -> 2, "key-6" -> 2, "key-84" -> 2, "key-186" -> 1, "key-179" -> 1, "key-145" -> 1, "key-73" -> 2) (KafkaConsumerSpec.scala:81)

Luckily the keys were ordered, so a diff was useful:

3c3
<   "key-78" -> 3,
---
>   "key-78" -> 2,
8c8
<   "key-112" -> 2,
---
>   "key-112" -> 1,
13,14c13,14
<   "key-116" -> 2,
<   "key-85" -> 3,
---
>   "key-116" -> 1,
>   "key-85" -> 2,
20c20
<   "key-27" -> 3,
---
>   "key-27" -> 2,
25c25
<   "key-49" -> 3,
---
>   "key-49" -> 2,
28,29c28,29
<   "key-166" -> 2,
<   "key-38" -> 3,
---
>   "key-166" -> 1,
>   "key-38" -> 2,
31c31
<   "key-71" -> 3,
---
>   "key-71" -> 2,
42,45c42,45
<   "key-192" -> 2,
<   "key-159" -> 2,
<   "key-134" -> 2,
<   "key-148" -> 2,
---
>   "key-192" -> 1,
>   "key-159" -> 1,
>   "key-134" -> 1,
>   "key-148" -> 1,
48,49c48,49
<   "key-5" -> 3,
<   "key-67" -> 3,
---
>   "key-5" -> 2,
>   "key-67" -> 2,
52,53c52,53
<   "key-188" -> 2,
<   "key-177" -> 2,
---
>   "key-188" -> 1,
>   "key-177" -> 1,
56c56
<   "key-79" -> 3,
---
>   "key-79" -> 2,
63,64c63,64
<   "key-58" -> 3,
<   "key-10" -> 3,
---
>   "key-58" -> 2,
>   "key-10" -> 2,
73,74c73,74
<   "key-92" -> 3,
<   "key-199" -> 2,
---
>   "key-92" -> 2,
>   "key-199" -> 1,
78c78
<   "key-70" -> 3,
---
>   "key-70" -> 2,
82c82
<   "key-39" -> 3,
---
>   "key-39" -> 2,
86c86
<   "key-63" -> 3,
---
>   "key-63" -> 2,
88c88
<   "key-121" -> 2,
---
>   "key-121" -> 1,
90c90
<   "key-133" -> 2,
---
>   "key-133" -> 1,
93c93
<   "key-46" -> 3,
---
>   "key-46" -> 2,
96c96
<   "key-2" -> 3,
---
>   "key-2" -> 2,
104,108c104,108
<   "key-25" -> 3,
<   "key-178" -> 2,
<   "key-24" -> 3,
<   "key-140" -> 2,
<   "key-30" -> 3,
---
>   "key-25" -> 2,
>   "key-178" -> 1,
>   "key-24" -> 2,
>   "key-140" -> 1,
>   "key-30" -> 2,
115,117c115,117
<   "key-189" -> 2,
<   "key-150" -> 2,
<   "key-41" -> 3,
---
>   "key-189" -> 1,
>   "key-150" -> 1,
>   "key-41" -> 2,
123c123
<   "key-76" -> 3,
---
>   "key-76" -> 2,
127c127
<   "key-184" -> 2,
---
>   "key-184" -> 1,
131c131
<   "key-125" -> 2,
---
>   "key-125" -> 1,
137c137
<   "key-190" -> 2,
---
>   "key-190" -> 1,
141c141
<   "key-142" -> 2,
---
>   "key-142" -> 1,
144c144
<   "key-3" -> 3,
---
>   "key-3" -> 2,
146,148c146,148
<   "key-98" -> 3,
<   "key-18" -> 3,
<   "key-34" -> 3,
---
>   "key-98" -> 2,
>   "key-18" -> 2,
>   "key-34" -> 2,
153,154c153,154
<   "key-128" -> 2,
<   "key-95" -> 3,
---
>   "key-128" -> 1,
>   "key-95" -> 2,
162,164c162,164
<   "key-118" -> 2,
<   "key-141" -> 2,
<   "key-77" -> 3,
---
>   "key-118" -> 1,
>   "key-141" -> 1,
>   "key-77" -> 2,
167,169c167,169
<   "key-182" -> 2,
<   "key-23" -> 3,
<   "key-162" -> 2,
---
>   "key-182" -> 1,
>   "key-23" -> 2,
>   "key-162" -> 1,
173c173
<   "key-12" -> 3,
---
>   "key-12" -> 2,
175c175
<   "key-113" -> 2,
---
>   "key-113" -> 1,
177c177
<   "key-33" -> 3,
---
>   "key-33" -> 2,
179c179
<   "key-48" -> 3,
---
>   "key-48" -> 2,
183,185c183,185
<   "key-66" -> 3,
<   "key-101" -> 2,
<   "key-175" -> 2,
---
>   "key-66" -> 2,
>   "key-101" -> 1,
>   "key-175" -> 1,
190,191c190,191
<   "key-50" -> 3,
<   "key-129" -> 2,
---
>   "key-50" -> 2,
>   "key-129" -> 1,
196c196
<   "key-6" -> 3,
---
>   "key-6" -> 2,
198c198
<   "key-186" -> 2,
---
>   "key-186" -> 1,
201c201
<   "key-73" -> 3
---
>   "key-73" -> 2

It's not consistently reproducible, so there must be a race condition somewhere?
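
For unordered output, the same comparison can be done without a textual diff. A minimal sketch in plain Scala, assuming both sides are available as Map[String, Int]; the names CountDiff, left and right are hypothetical and not part of the test suite:

```scala
object CountDiff {
  // For every key whose count differs, returns the pair (left, right).
  // A key missing from one side is reported as None on that side.
  def diff(
      left: Map[String, Int],
      right: Map[String, Int]
  ): Map[String, (Option[Int], Option[Int])] =
    (left.keySet ++ right.keySet).iterator
      .map(key => (key, (left.get(key), right.get(key))))
      .filter { case (_, (l, r)) => l != r }
      .toMap
}
```

Because it compares by key, this does not depend on the order in which the entries are printed.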

Support for transactions

Kafka has special APIs to support transactional read-process-write message flows. The gist seems to be:

  1. Producers get an extra piece of config transactional.id, and call initTransactions() on startup
  2. Committing offsets runs through the producer, which surrounds each batch with begin-/commit-transaction calls

What would be the best way to add this functionality to fs2-kafka? I was looking through Alpakka for reference and it looks like they separate "normal" and transactional types for both producers and consumed messages. I like the idea of using types to force the separation of use-cases, but I'm not sure how much of a maintenance burden it'd create for this project.

My team has a need for this functionality, so I'd be happy to help implement the new features.
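
To make the call order concrete, here is a rough sketch of the read-process-write flow described above. TxProducer is a hypothetical stand-in whose method names mirror the transactional methods of the Java producer; it is not the real client and not a proposed fs2-kafka API, and RecordingProducer only records calls so the ordering can be inspected.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

object TransactionalFlowSketch {
  // Hypothetical stand-in for the transactional subset of the Java producer.
  trait TxProducer {
    def initTransactions(): Unit
    def beginTransaction(): Unit
    def send(topic: String, value: String): Unit
    def sendOffsetsToTransaction(offsets: Map[(String, Int), Long], groupId: String): Unit
    def commitTransaction(): Unit
    def abortTransaction(): Unit
  }

  // In-memory implementation that records every call in order.
  final class RecordingProducer extends TxProducer {
    val calls: ListBuffer[String] = ListBuffer.empty[String]
    private def record(call: String): Unit = { calls += call; () }
    def initTransactions(): Unit = record("init")
    def beginTransaction(): Unit = record("begin")
    def send(topic: String, value: String): Unit = record(s"send $topic")
    def sendOffsetsToTransaction(offsets: Map[(String, Int), Long], groupId: String): Unit =
      record(s"offsets $groupId")
    def commitTransaction(): Unit = record("commit")
    def abortTransaction(): Unit = record("abort")
  }

  // One batch: produce the outputs and commit the consumed offsets atomically.
  // On a non-fatal error the transaction is aborted and the error rethrown.
  def processBatch(
      producer: TxProducer,
      groupId: String,
      outputs: List[(String, String)],
      offsets: Map[(String, Int), Long]
  ): Unit = {
    producer.beginTransaction()
    try {
      outputs.foreach { case (topic, value) => producer.send(topic, value) }
      producer.sendOffsetsToTransaction(offsets, groupId)
      producer.commitTransaction()
    } catch {
      case NonFatal(e) =>
        producer.abortTransaction()
        throw e
    }
  }
}
```

initTransactions() is called once at startup, before the first processBatch. A real implementation would also need to treat errors such as producer fencing differently from ordinary failures, which this sketch ignores.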

Expose additional consumer functionality

The following functions could be added to KafkaConsumer to expose additional functionality provided by the Java Kafka consumer. Note that functions already implemented at the time of writing are not included. Functions for committing offsets and pausing/resuming partitions are not included either.

  • assign(Set[TopicPartition]): F[Unit]
  • committed(TopicPartition): F[OffsetAndMetadata]
  • committed(TopicPartition, FiniteDuration): F[OffsetAndMetadata]
  • listTopics: F[Map[String, List[PartitionInfo]]]
  • listTopics(FiniteDuration): F[Map[String, List[PartitionInfo]]]
  • metrics: F[Map[MetricName, Metric]]
  • offsetsForTimes(Map[TopicPartition, Long]): F[Map[TopicPartition, OffsetAndTimestamp]]
  • offsetsForTimes(Map[TopicPartition, Long], FiniteDuration): F[Map[...]]
  • partitionsFor(String): F[List[PartitionInfo]]
  • partitionsFor(String, FiniteDuration): F[List[PartitionInfo]]
  • paused: F[Set[TopicPartition]]
  • subscribe(Regex, OnRebalance): F[Unit]
  • subscribe[G[_]: Reducible](G[String], OnRebalance): F[Unit]
  • subscription: F[Set[String]]
  • unsubscribe: F[Unit]

Consumer key deserializer throws an exception when key is null

When I start consuming from a Kafka topic, the consumer throws an exception if the messages don't have a key.
The Kafka consumer (Java API) doesn't mind if messages have no key. Is it possible to consume these messages in fs2-kafka?

import cats.effect._
import cats.implicits._
import fs2.kafka._

object ConsumerExample extends IOApp {

  val settings =
    ConsumerSettings[IO, String, String](
      keyDeserializer = Deserializer[IO, String],
      valueDeserializer = Deserializer[IO, String]
    ).withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withBootstrapServers("kafka:9092")
      .withGroupId("group")

  def run(args: List[String]): IO[ExitCode] = {
    // Subscribe to the topic and expose the records as a stream.
    val stream: fs2.Stream[IO, CommittableConsumerRecord[IO, String, String]] =
      consumerStream(settings)
        .evalTap(_.subscribeTo("queue1"))
        .flatMap(_.stream)

    // Print every record; this fails as soon as a record without a key arrives.
    stream.evalMap(s => IO(println(s))).compile.drain.as(ExitCode.Success)
  }
}

java.lang.NullPointerException
	at java.lang.String.<init>(String.java:515)
	at fs2.kafka.Deserializer$.$anonfun$string$1(Deserializer.scala:218)
	at fs2.kafka.Deserializer$.$anonfun$lift$1(Deserializer.scala:194)
	at fs2.kafka.Deserializer$$anon$1.deserialize(Deserializer.scala:143)
	at fs2.kafka.ConsumerRecord$.deserializeFromBytes(ConsumerRecord.scala:176)
	at fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:187)
	at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:325)
	at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:295)
	at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:243)
	at cats.Traverse$Ops.traverse(Traverse.scala:19)
	at cats.Traverse$Ops.traverse$(Traverse.scala:19)
	at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
	at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:323)
	at cats.instances.VectorInstances$$anon$1.$anonfun$traverse$2(vector.scala:78)
	at cats.instances.VectorInstances$$anon$1.loop$2(vector.scala:41)
	at cats.instances.VectorInstances$$anon$1.$anonfun$foldRight$2(vector.scala:42)
	at cats.Eval$.advance(Eval.scala:271)
	at cats.Eval$.loop$1(Eval.scala:350)
	at cats.Eval$.cats$Eval$$evaluate(Eval.scala:368)
	at cats.Eval$Defer.value(Eval.scala:257)
	at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:77)
	at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:13)
	at cats.Traverse$Ops.traverse(Traverse.scala:19)
	at cats.Traverse$Ops.traverse$(Traverse.scala:19)
	at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
	at fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:320)
	at fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$2(KafkaConsumerActor.scala:352)
	at cats.effect.internals.IORunLoop$.liftedTree1$1(IORunLoop.scala:95)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:95)
	at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
	at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:86)
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93)
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44)
	at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:72)
	at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:52)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:136)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
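
The trace points at a String being constructed from a null byte array, which is what a record without a key would hand to the key deserializer. A minimal sketch of the null-safe idea in plain Scala, independent of fs2-kafka's Deserializer API (NullSafeKey and decodeKey are hypothetical names):

```scala
import java.nio.charset.StandardCharsets.UTF_8

object NullSafeKey {
  // A missing key arrives as `null`; Option(...) turns it into None
  // instead of letting `new String(null, ...)` throw.
  def decodeKey(bytes: Array[Byte]): Option[String] =
    Option(bytes).map(b => new String(b, UTF_8))
}
```

With this shape the key type becomes Option[String], so downstream code has to handle the missing-key case explicitly.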

isomorphism between fs2-ConsumerRecord and kafka raw ConsumerRecord

I try to establish an isomorphism between fs2's ConsumerRecord and the kafka raw ConsumerRecord: https://github.com/chenharryhua/nanjin/blob/master/codec/src/main/scala/com/github/chenharryhua/nanjin/codec/BitraverseFs2Message.scala#L53

The problem here is that fs2's ConsumerRecord does not carry the timestamp value when the timestamp type is none.

My understanding is that the timestamp type is not tied to the timestamp, so they could be set independently.
Am I missing something?

Add logging

Currently the lib does not issue any debug level logs. It would be nice to log actor state changes.

Add automatic management of transactional.id to TransactionalKafkaProducer

Follow-up from #128 / #130.

Quoting the PR:

I found that Kafka Streams uses a unique transactional ID per "task" (topic/partition). Other projects like spring-kafka have followed the same pattern. From what I've read, it's effectively the only way to ensure messages don't get double-processed when partitions get rebalanced.

If I understand correctly, TransactionalKafkaProducer in its current form is only really useful if you have a single instance in the consumer group. With multiple instances, you need a stable transactional.id across instances per topic-partition, which is not trivial to implement yourself, and would fit better in the library. I definitely agree this is something we should handle in the library.

The mentioned topic-partition strategy also means one producer per topic-partition, right? That might be a bit much in the single instance scenario, but for multiple instances it sounds acceptable. Perhaps this is even behaviour we can toggle in ProducerSettings, while the remaining API stays as is? If you agree, then we can finish this up and merge it, and then do the remaining work in a separate pull request.

(I think the most tricky part of the producer-per-topic-partition is managing creating and closing producers as partitions are assigned and revoked. Maybe we could do something clever with rebalance listeners to get this working nicely.)

Consumer-only example

Hi, I'm really interested in using this library (thank you for it!), but it appears to be designed around a consumer-producer mode only.

If I wanted a composable stream - (i.e.)

def subscribe(topic: String): Stream[F, MyModel] =
    for {
      ec <- consumerExecutionContextStream[F]
      record <- consumerStream[F]
                 .using(consumerSettings("myGroup", AutoOffsetReset.Latest)(ec))
                 .evalTap(_.subscribeTo(topic))
                 .flatMap(_.stream)
                 .mapAsync(maxConcurrent = 25) { event =>
                   ConcurrentEffect[F].pure(event.record)
                 }
    } yield MyModel(record.value())

This appears to work, but doesn't perform very well (Caveat, I'm an FP novice). Could the lack of commitBatchWithinF(...) interfere with backpressure in this use case? I can't see any calls that let me invoke commit() periodically in the above example that don't break the composability of the function.

In short - I'd love an example that makes no assumption about a producer (even the scaladoc for commitBatchWithinF hints that it was designed around the producer use case), where I can just return a Stream[F, EventModel] to the caller.

Expose more of the underlying apache consumer methods

Currently the KafkaConsumer exposes only a tiny subset of the Apache Kafka consumer methods.
Wouldn't it be easier to provide an (albeit dangerous) withConsumer (similar to the one on KafkaConsumerActor) instead of actor-style communication? Speaking of which, what's the purpose of the actor, given that calls to the underlying Apache Kafka consumer are synchronized anyway?

Kafka.Id is private

I failed to annotate the return type because Kafka.Id is private.

val k: ProducerMessage[kafka.Id, K, V, Unit] //compiler error: Symbol Id is inaccessible from this place
Would it be better to use cats.Id instead?

Kafka log messages duplicated

More of a question than an issue. Sorry, not sure where else to ask.

I have a program using fs2-kafka that works fine but all of the Kafka driver messages are duplicated. Is this normal?

12:03:10.606 [scala-execution-context-global-31] INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in.
12:03:10.606 [scala-execution-context-global-31] INFO o.a.k.c.s.a.AbstractLogin - Successfully logged in.
12:03:10.644 [scala-execution-context-global-31] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
12:03:10.644 [scala-execution-context-global-31] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
12:03:10.644 [scala-execution-context-global-31] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
12:03:10.644 [scala-execution-context-global-31] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
12:03:11.033 [fs2-kafka-consumer-37] INFO org.apache.kafka.clients.Metadata - Cluster ID: eZRQO2AEQK-BYCMvtH1GCA
12:03:11.033 [fs2-kafka-consumer-37] INFO org.apache.kafka.clients.Metadata - Cluster ID: eZRQO2AEQK-BYCMvtH1GCA
12:03:11.034 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] Discovered group coordinator kafka:9093 (id: 2147482646 rack: null)
12:03:11.034 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] Discovered group coordinator kafka:9093 (id: 2147482646 rack: null)
12:03:11.036 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] Revoking previously assigned partitions []
12:03:11.036 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] Revoking previously assigned partitions []
12:03:11.036 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] (Re-)joining group
12:03:11.036 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] (Re-)joining group
12:03:11.079 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] Successfully joined group with generation 11
12:03:11.079 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] Successfully joined group with generation 11
12:03:11.080 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] Setting newly assigned partitions [ebx-templates-0, ebx-users-0, ebx-contracts-0]
12:03:11.080 [fs2-kafka-consumer-37] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=ebx-plugin-test-1.0, groupId=ebx-plugin-test-1.0] Setting newly assigned partitions [ebx-templates-0, ebx-users-0, ebx-contracts-0]

Compilation error after upgrading to v0.20.0-RC1

I get the following error after upgrading from 0.20.0-M2 to 0.20.0-RC1:

[error] /foo/server/src/main/scala/com/abc/def/Server.scala:73:38: Symbol 'type cats.Parallel.Aux' is missing from the classpath.
[error] This symbol is required by 'method cats.effect.IOInstances.ioParallel'.
[error] Make sure that type Aux is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'IOInstances.class' was compiled against an incompatible version of cats.Parallel.
[error]       serverBuilder <- ServerBuilder[IO](httpPort)
[error]                                      ^
[warn] two warnings found
[error] one error found
[error] (server / Compile / compileIncremental) Compilation failed

Possibility to use circeJsonDeserializer

I'm trying to use a circeJsonDeserializer from kafka-serialization-circe instead of a StringDeserializer. It works fine, but when a message cannot be deserialised the stream halts. I also cannot recover by using Stream.handleErrorWith.

As a workaround, I'm using a StringDeserializer and doing the JSON decoding in a Stream.evalMap, where I catch potential exceptions thrown by circeJsonDeserializer.deserialize.

Is there a way to use circeJsonDeserializer instead of StringDeserializer and be able to recover from deserialisation exceptions?
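
One way to picture the workaround is to turn a throwing deserializer into one that returns failures as values, so that a bad message becomes a Left rather than an exception in the stream. This is only a sketch in plain Scala: SafeDecode, attempt and parseInt are hypothetical names, and parseInt stands in for a JSON decoder that throws on bad input.

```scala
import scala.util.Try

object SafeDecode {
  // Wraps a throwing deserializer so that failures are returned as Left.
  def attempt[A](deserialize: Array[Byte] => A): Array[Byte] => Either[Throwable, A] =
    bytes => Try(deserialize(bytes)).toEither

  // Stand-in for a decoder that throws on malformed input.
  val parseInt: Array[Byte] => Int =
    bytes => new String(bytes, "UTF-8").trim.toInt
}
```

Each record then carries an Either, and the consumer of the stream can decide whether to skip, log, or fail on a Left.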

Support ACL management in KafkaAdminClient

Add support of Java AdminClient methods such as

  • describeAcls
  • createAcls
  • deleteAcls

@vlovgr I'd appreciate your input on this issue. As far as I can see, KafkaAdminClient.scala just needs to be extended with the above operations.

KafkaProducer does not produce record

Hi all,
I have a problem: the KafkaProducer does not send the record to Kafka.
The code is hosted at https://gitlab.com/sweetsoft/connector/blob/master/consumer/src/test/scala/com/sweetsoft/consumer/MaintainerSpec.scala and the problematic part is:

private def sendRecord[F[_]](msg: Envelop, settings: ProducerSettings[F, String, String])
                              (implicit F: ConcurrentEffect[F], context: ContextShift[F])
  : Stream[F, Unit] =
    Stream.eval(Applicative[F].pure(ProducerRecords.one[String, String](ProducerRecord(msg.topic, msg.event, msg.data))))
      .through(produce[F, String, String, Unit](settings))
      .map { e =>
        println(e)
        ()
      }

It does not send the record to Kafka.

You can download the code and test it.
Thanks for your help.

Change to use a single CommittableOffsets construct

There are currently two constructs for committable offsets:

The idea is to unify these two into a single construct, CommittableOffsets. It would work similarly to CommittableOffsetBatch, except that when combining two offsets for the same topic-partition, it chooses the highest offset (CommittableOffsetBatch simply chooses the right-hand side). This should allow us to form a Monoid[CommittableOffsets], so that we can write:

offsets.combineAll.commit

given offsets is F[CommittableOffsets] with Foldable[F] (most of the time F will be Chunk).
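
The combining rule can be sketched without any library types. In the sketch below, Offsets is a hypothetical stand-in (a plain map from (topic, partition) to offset) rather than the proposed CommittableOffsets, and combine keeps the highest offset per topic-partition:

```scala
object HighestOffset {
  // (topic, partition) -> offset; a stand-in for the proposed construct.
  type Offsets = Map[(String, Int), Long]

  // Identity element of the monoid.
  val empty: Offsets = Map.empty

  // Merges two offset maps, keeping the highest offset per topic-partition.
  def combine(x: Offsets, y: Offsets): Offsets =
    y.foldLeft(x) { case (acc, (topicPartition, offset)) =>
      acc.updated(topicPartition, math.max(acc.getOrElse(topicPartition, offset), offset))
    }

  // Equivalent of combineAll for any collection of offset maps.
  def combineAll(all: Iterable[Offsets]): Offsets =
    all.foldLeft(empty)(combine)
}
```

Since max is associative and commutative, the result does not depend on the order in which the offsets are combined, which is what makes a lawful Monoid possible here.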

java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy

Hi,

I've been seeing this error intermittently when using fs2-kafka in conjunction with Vulcan. It seems to only happen when used in tests and Fabio has also seen this problem.

I've created a simple project where I was able to reproduce the problem: https://github.com/ra-ovo/fs2-kafka-vulcan-issue. There's a screenshot showing the error and the exception stack trace. To reproduce it, I had to run SBT in REPL mode and run test a couple of times until I finally got the error. I've been trying to explore the problem, without success, on another branch of that repo.

Note that, because I need a schema registry for that test app, I've used the Docker configuration from the docker-compose.yml file in the project. All of the configurations in the code point to the default ports exported by that docker-compose file.

Using sbt-dependency-graph and IntelliJ, I saw that there are a couple of kafka-clients versions showing up: 5.4.0-ccs and 2.4.0 (by using sbt-dependency-graph) and 5.4.0 (when using IntelliJ to open the AbstractKafkaAvroSerDeConfig class, one of the classes that show up in the stack trace).

I'm not entirely sure whether this is even a problem in fs2-kafka (in the Vulcan module) or whether anything can be done here, but any hint on how to work around this would be very much appreciated.

Please let me know if I can help with this issue.

Thanks!

produceRecord implementation

produceRecord requires ConcurrentEffect. I am wondering whether Concurrent is enough for the job?

def doSend(data: ProducerRecord[Array[Byte], Array[Byte]]): F[F[RecordMetadata]] = {
  // Completes with the record metadata, or fails with the send exception.
  val run = Concurrent[F].async[RecordMetadata] { cb =>
    producer.send(
      data,
      new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          Option(exception).fold(cb(Right(metadata)))(ex => cb(Left(ex)))
      }
    )
  }
  // Start the send in the background and hand back an effect that awaits it.
  Concurrent[F].start(run).map(_.join)
}

How to make infinite retry

Maybe I'm lacking some fs2 knowledge, but I'm interested in a scenario where I could keep polling from Kafka indefinitely and repeat the message processing. I can't find this scenario in the docs, and I bet it's dead simple, but apparently I'm missing something.

Race condition between `subscribe` and `stream` leading to duplicated records

When subscribing to a topic the KafkaConsumerActor adds a rebalancing hook that registers FetchRequests.
Then, when calling stream, the KafkaConsumer will request partition assignments and - that's the race condition - depending on the timing might get some partition ids for which it will also register FetchRequests.
This is aggravated by the fact that the KafkaConsumerActor.State does not restrict the number of FetchRequests per (partition, consumer), leading to duplicated records sent downstream.
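
One way to picture the restriction mentioned above is to key pending fetches by (partition, stream id), so that a second registration replaces the first instead of adding to it. This is a sketch in plain Scala with hypothetical names (FetchState, Partition, State, withFetch); it does not reflect the actual KafkaConsumerActor.State.

```scala
object FetchState {
  final case class Partition(topic: String, number: Int)

  // At most one pending fetch per (partition, stream id): registering again
  // overwrites the previous entry rather than queuing a duplicate.
  final case class State(fetches: Map[(Partition, Int), String]) {
    def withFetch(partition: Partition, streamId: Int, request: String): State =
      copy(fetches = fetches.updated((partition, streamId), request))
  }

  object State {
    val empty: State = State(Map.empty)
  }
}
```

With such a state, it would not matter whether the rebalance hook or the stream registers first, since both end up with a single entry for the same key.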

Add missing extractors for most data types

Most provided data types which provide an apply function should also provide an unapply function.

For example, unapply for ProducerResult would look like:

object ProducerResult {
  def unapply[K, V, P](result: ProducerResult[K, V, P]): Some[(Chunk[(ProducerRecord[K, V], RecordMetadata)], P)] =
    Some((result.records, result.passthrough))
}

which would allow us to match on ProducerResults (note the Some rather than Option in the return type, which is required for exhaustivity checking).
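
The same pattern can be tried out in isolation. The sketch below uses a hypothetical stand-in type, Result, instead of the library's ProducerResult, to show a hand-written unapply returning Some and a match that uses it:

```scala
object ExtractorSketch {
  // Stand-in for a library type that is not a case class.
  final class Result[A, P] private (val records: List[A], val passthrough: P)

  object Result {
    def apply[A, P](records: List[A], passthrough: P): Result[A, P] =
      new Result(records, passthrough)

    // Returning Some (not Option) marks the extractor as irrefutable.
    def unapply[A, P](result: Result[A, P]): Some[(List[A], P)] =
      Some((result.records, result.passthrough))
  }

  // Destructures a Result through the extractor.
  def describe(result: Result[String, Int]): String =
    result match {
      case Result(records, passthrough) =>
        s"${records.size} record(s), passthrough $passthrough"
    }
}
```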

Transactions over multiple topics with messages of different types

Hey guys,

Imagine a use case where a service consumes a message with data a: A from topic aTopic and is supposed to process it to obtain data b: B and c: C, open a transaction, write b to bTopic and c to cTopic, commit consumption of a's offset, and commit and close the transaction.

A TransactionalKafkaProducer[F, K, V] can .produce TransactionalProducerRecords[F, G, K, V, P] and optionally commit the messages from which these records originate, which is very convenient. But this does not seem to solve the problem at hand.

Is there a different means of doing that in FS2 Kafka?
If there isn't, do you plan to add this functionality?
If you don't, would you be interested in a pull request on the subject?

Thanks!

Simplify the definition of CommitRecovery

It would be nice to explore if we can change the definition of CommitRecovery to something along the following lines. ConsumerSettings could then accept an F[CommitRecovery[F]], and the recovery functions are then less constrained in their implementation.

abstract class CommitRecovery[F[_]] {
  def recoverCommitWith(
    offsets: Map[TopicPartition, OffsetAndMetadata],
    commit: F[Unit]
  ): Throwable => F[Unit]
}

Release v1.0.0

FS2 Kafka is already being used in production across several companies. We should therefore release a v1.0.0 to more explicitly communicate binary compatibility. Before committing to v1.0.0, it might be a good idea to squeeze in a few changes.

Some proposed changes as follows.

Provide functions for creating KafkaConsumer test instances

It would be useful to provide functions for creating test versions of KafkaConsumer.

As a first step, the following function would be a useful addition.

object KafkaConsumer {
  def fromRecords[F[_], K, V](records: F[ConsumerRecord[K, V]])(
    implicit F: Foldable[F]
  ): KafkaConsumer[F, K, V] =
    ???
}

Following that, it would be useful to be able to track which offsets have been committed.
