Giter VIP home page Giter VIP logo

fs2-kafka's People

Contributors

adamchlupacek avatar coltfred avatar pchlupacek avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

fs2-kafka's Issues

unable to publish without specific partitionId

for example, in "publishN" function, i must mention a specific partition id.

is there a way to get around it ?

def publishN(
topicId: String @@ TopicName
, partition: Int @@ PartitionId
, requireQuorum: Boolean
, serverAckTimeout: FiniteDuration
, compress: Option[Compression.Value]
)(messages: Chunk[(ByteVector, ByteVector)]): F[Long]

Broker failure recovery

Hi. I am getting very strange behaviour. Suppose we have one broker and we have subscribed to a topic there. We are happily consuming from this topic and then the broker goes down. After some time it is going up again and Kafka client is able to reconnect back, but it won't get any messages anymore, and there is no notification about this event that is exposed to fs2 code, so we can, for example, resubscribe to this topic, restart the consumer/stream or something like this.

I think this problem is common and someone can point me where to look for a solution (and maybe we can extend the documentation to cover this issue).

Symbol 'type fs2.util.Async' is missing from the classpath

Hi
I am trying to use fs2-kafka as following:

import fs2._
import spinoco.fs2.kafka
import spinoco.fs2.kafka._
import spinoco.protocol.kafka._
import scala.concurrent.duration._
import scodec.bits.ByteVector


object Client {

  def main(args: Array[String]): Unit = {

    kafka.client(
      ensemble = Set(broker("localhost", port = 9092))
      , protocol = ProtocolVersion.Kafka_0_10_2
      , clientName = "Khinkali"
    ) flatMap { kafkaClient =>
      kafkaClient
        .publish1(topic("topic-A"), partition(0), ByteVector("foo"), ByteVector("Hello FOO"), requireQuorum = true, serverAckTimeout = 10 seconds)
        .asInstanceOf
    }
  }

}

But I've got following error:

[info] Compiling 1 Scala source to /home/developer/Desktop/scala/fskafka/target/scala-2.12/classes ...
[error] /home/developer/Desktop/scala/fskafka/src/main/scala/Client.scala:16:5: Symbol 'type fs2.util.Async' is missing from the classpath.
[error] This symbol is required by 'value spinoco.fs2.kafka.package.F'.
[error] Make sure that type Async is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'package.class' was compiled against an incompatible version of fs2.util.
[error]     kafka.client(
[error]     ^
[error] /home/developer/Desktop/scala/fskafka/src/main/scala/Client.scala:16:17: could not find implicit value for parameter AG: java.nio.channels.AsynchronousChannelGroup
[error]     kafka.client(
[error]                 ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 1 s, completed Nov 13, 2017 12:17:13 PM
[IJ]sbt:fskafka> 

What am I doing wrong?
Thanks

Connections stuck in TIME_WAIT when getting offsetRangeFor.

When using offsetRangeFor the connections to brokers do not close up properly and are stuck in the "TIME_WAIT" state, this can be seen in netstat. The connections do die out after while.

There are two connections that get stuck like this. One is when we try to find a leader for the given topicId and second one is the actual fetch of the offsetRange.

There may be other instances where the connections do not close up properly. As such we should check every place where we communicate with kafka.

Status of project?

Hi - could you please tell me if this project is still under development?

I see the last commit was 7 months ago.

Thanks!

Kafka 1.0 support?

I'm still fairly new to this library, but Kafka released 1.0 this week. After doing #8 is there anything that needs to be done to support the 1.0 protocol? Is there anything to be gained? Should we at least test against it so we have something to say? Really I'm just looking for guidance here as a newcomer.

How do I commit offsets?

I found that after running a stream to get a nonempty collection of messages, the offsets weren't incremented. Is there a way in this client to commit offsets while the stream is running?

I used code something like the following, which threw NoOffsetForPartitionException. The groupIds on the clients are the same , and there was only one partition.

      val topicPartition = new TopicPartition(topic, 0)

      val receivedMessages = t.getKafkaMessages[IO]().take(messages.size).compile.toVector.unsafeRunSync()

      val consumerAfter =
        new KafkaConsumer(
          Map[String, AnyRef](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
            ConsumerConfig.RETRY_BACKOFF_MS_CONFIG -> "1000",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "none",
            ConsumerConfig.GROUP_ID_CONFIG -> "increments-offsets"
          ).asJava,
          new ByteArrayDeserializer(),
          Generic.kafkaDeserializer
        )
      consumerAfter.assign(List(topicPartition).asJava)
      val endOffset = consumerAfter.position(topicPartition)

fs2 0.10 support

Now that Cats 1.0.0-RC1 is out and fs2 0.10 is near final, it'd be nice to get a version built on top of those versions.

KafkaClient#subscribe scaladoc is wrong about subscribing to tail

KafkaClient#subscribe scaladoc says the following:

-1 specified start from tail (new message arriving to topic)

This is wrong as this will start reading the topic from the beginning rather than tail according to code. The message should say something like following.

TailOffset specified start from tail (new message arriving to topic)

Production usage

Hi
I wanted to ask, if it is save to use it in production?
Are you using in production?
Thanks

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.