
akka / alpakka-kafka


Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.

Home Page: https://doc.akka.io/docs/alpakka-kafka/current/home.html

License: Other

Scala 86.44% Java 13.53% Shell 0.04%
kafka java scala reactive-streams akka akka-streams reactive kafka-client kafka-connector

alpakka-kafka's Introduction

Alpakka Kafka

Systems don't come alone. In the modern world of microservices and cloud deployment, new components must interact with legacy systems, making integration an important key to success. Reactive Streams give us a technology-independent tool to let these heterogeneous systems communicate without overwhelming each other.

The Alpakka project is an open source initiative to implement stream-aware, reactive, integration pipelines for Java and Scala. It is built on top of Akka Streams, and has been designed from the ground up to understand streaming natively and provide a DSL for reactive and stream-oriented programming, with built-in support for backpressure. Akka Streams is a Reactive Streams and JDK 9+ java.util.concurrent.Flow-compliant implementation and therefore fully interoperable with other implementations.

This repository contains the sources for the Alpakka Kafka connector, which lets you connect Apache Kafka to Akka Streams. It was formerly known as Akka Streams Kafka and, before that, as Reactive Kafka.

Akka Stream connectors to other technologies are listed in the Alpakka repository.

Documentation

To keep up with the latest Alpakka releases check out Alpakka releases and Alpakka Kafka releases.

Community

You can join these groups and chats to discuss and ask Akka and Alpakka related questions:

In addition to that, you may enjoy following:

The Kafka connector was originally created as Reactive Kafka by SoftwareMill.

Contributing

The Akka family of projects is managed by teams at Lightbend with help from the community.

Contributions are very welcome! Lightbend appreciates community contributions by both those new to Alpakka and those more experienced.

Alpakka depends on the community to keep up with the ever-growing number of technologies with which to integrate. Please step up and share the successful Akka Stream integrations you implement with the Alpakka community.

If you find an issue that you'd like to see fixed, the quickest way to make that happen is to implement the fix and submit a pull request.

Refer to the CONTRIBUTING.md file for more details about the workflow, and general hints on how to prepare your pull request.

You can also ask for clarifications or guidance in GitHub issues directly.

Caveat Emptor

Alpakka components are not always binary compatible between releases. API changes that are not backward compatible might be introduced as we refine and simplify based on your feedback. A module may be dropped in any release without prior deprecation.

License

Akka is licensed under the Business Source License 1.1, please see the Akka License FAQ.

Tests and documentation are under a separate license, see the LICENSE file in each documentation and test root directory for details.

alpakka-kafka's People

Contributors

13h3r, 2m, amorfis, charlibot, chbatey, e8kor, ennru, gabrielreid, ignasi35, jamesmorgan, jeqo, jiminhsieh, johanandren, jyates, kazuhiro, kciesielski, ktoso, leviramsey, manub, nafshartous, octonato, patriknw, philippus, raboof, rgcase, rtimush, scala-steward, seglo, sullis, ygree


alpakka-kafka's Issues

Add support for user-written partitionizers to the Java API

Right now this capability is available only in the Scala API. This is because the ProducerProperties constructor takes a partitionizer parameter typed as a scala.Function1, which does not satisfy the Java 8 functional interface requirement that there be only one unimplemented method.
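A hypothetical sketch of one way the Java side could be bridged (the object and method names here are made up for illustration and are not part of the actual API): accept a java.util.function.Function and adapt it to the Scala function type the existing constructor expects.

import java.util.function.{Function => JFunction}

// Hypothetical adapter, not the library's API: lets Java 8 callers pass a
// lambda and converts it into the scala.Function1-based partitionizer that
// ProducerProperties currently expects.
object ProducerPropertiesJava {
  def partitionizer[T](f: JFunction[T, Array[Byte]]): T => Option[Array[Byte]] =
    element => Option(f.apply(element))
}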

Support Topic Filters

I need to be able to set up a stream using topic filters.

The idea is to be able to set up a consumer group given a topic filter. In turn, we want to be able to consume messages from each topic that matches the filter.

I am going to be using Reactive Streams for processing the messages. If there is something already in the works, that would be great. Otherwise, perhaps I can create a PR. I wasn't sure what the status was with the Simple Consumer work.
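For reference, the current Alpakka Kafka API does support this via topic patterns. A minimal sketch (bootstrap servers, group id and the pattern are placeholders): every topic matching the regex is consumed by the same consumer group.

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object TopicFilterExample extends App {
  implicit val system: ActorSystem = ActorSystem("topic-filter")

  val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("filter-group")

  // Subscribe to every topic matching the pattern within one consumer group.
  Consumer
    .plainSource(settings, Subscriptions.topicPattern("events-.*"))
    .map(record => s"${record.topic()}: ${record.value()}")
    .runWith(Sink.foreach(println))
}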

support error handling for KafkaActorSubscriber

  private def processElement(element: T) = {
    producer.send(props.encoder.toBytes(element), props.partitionizer(element))
  }

Here the encoder may cause an error, and I don't want the actor subscriber to fail. I can't find any way to handle it.

The Akka docs say:

ActorPublisher source and ActorSubscriber sink components do not honour the supervision strategy attribute yet.
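Given the constraint quoted above, one workaround is to guard the encoding step itself. A minimal sketch reusing the issue's own props/producer names, and assuming the surrounding actor mixes in ActorLogging so a log is in scope:

import scala.util.{Failure, Success, Try}

// Sketch of a workaround: wrap the encoder call in Try so a failing element is
// logged and dropped instead of crashing the subscriber actor.
// Assumes `props`, `producer` and an ActorLogging `log` are in scope.
private def processElement(element: T): Unit =
  Try(props.encoder.toBytes(element)) match {
    case Success(bytes) => producer.send(bytes, props.partitionizer(element))
    case Failure(e)     => log.warning("Dropping element that could not be encoded: {}", e.getMessage)
  }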

Should recreate KafkaProducer when the actor restarts

The current code has a bug: once an exception occurs, the KafkaActorSubscriber is restarted, but the producer that was passed in has already been closed. Subsequent sends then fail with a "producer already closed" exception, trapping the subscriber.
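A simplified sketch of the proposed fix (not the actual patch; it uses the plain Java client rather than this repository's wrapper): the actor creates its producer in preStart, which also runs after a restart, so a restarted subscriber never reuses an already-closed producer.

import java.util.Properties
import akka.actor.Actor
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class ProducingActor(topic: String, bootstrapServers: String) extends Actor {
  private var producer: KafkaProducer[String, String] = _

  private def createProducer(): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    new KafkaProducer(props, new StringSerializer, new StringSerializer)
  }

  // preStart runs on the initial start and again after every restart,
  // so each incarnation of the actor owns a fresh, open producer.
  override def preStart(): Unit = producer = createProducer()
  override def postStop(): Unit = if (producer != null) producer.close()

  def receive: Receive = {
    case msg: String => producer.send(new ProducerRecord(topic, msg))
  }
}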

Consumption of messages pauses after 10 minutes

In my application I create 3 consumers each consuming from 8 partitions (total 24).
The application runs smoothly, consuming messages for around 10 minutes from startup.

Then I get 1 or 2 INFO messages like the following in my logs:

INFO [2016-01-26 01:40:07,221] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Attempt to heart beat failed since the group is rebalancing, try to re-join group.

This results in increased consumption lag. After a while a portion of the consumers/partitions begin to consume messages again, while some other consumers/partitions never seem to start again.

I am on the default parameters (for dispatchers). Is this a bug, or is it related to my configuration?

Please share any ideas/comments and let me know if there is anything else you need so I can elaborate.

Thanks in advance

Kafka consumer

I have a project that deals with a high volume of Twitter data. Kafka is used for queuing tweets for later processing. I have two metrics of interest: one that shows how much time it takes for a tweet to come into the system before being put to Kafka, and a second that shows the total time it takes from creating a tweet to storing it in a db after it is fetched from Kafka. The first one is pretty much constant. The second, however, is slowly rising.

I'm investigating what the cause could be and I ended up with the growing number of tweets in Kafka as the probable reason. It kind of makes sense, since the time displayed by the second metric is proportional to the increasing number of tweets. It could be both the way Kafka is set up and the way I'm reading from the queue.

Something that may be of interest is that I have implemented a simple partitionizer for the Producer, because I need the tweets in the order they are created with respect to a particular conversation.

private def createSupervisedSubscriberActor() = {
    val kafka = new ReactiveKafka()

    val subscriberProperties = ProducerProperties(
      brokerList = config.getStringList("kafka.brokers").toList.mkString(","),
      topic = config.getString("kafka.topic"),
      clientId = config.getString("kafka.clientId"),
      encoder = new StringEncoder(),
      partitionizer
    )

    val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties)
    context.actorOf(subscriberActorProps, subscriber)
}

private def partitionizer: String => Option[Array[Byte]] = (s: String) => Option(s.getBytes)

On the consumer side I am simply reading from the queue. I expect that I'm just getting the latest message every time something is pushed to the end of the queue. Am I right? Can you suggest something that I could do to optimize my reading from Kafka? How does the size of the queue affect the way the consumer reads from it?

Please let me know if you are missing any info.
Thanks for the help!

How to use it as a flow stage instead of a sink

Hi

Maybe this isn't the right place to ask, but I can't figure out how to use the subscriber anywhere other than as a Sink. I want to use the producer as part of a flow, not as a sink.
So this works: flow.to(Sink(subscriber))
But how can I use it like flow.via(subscriber).via ...?
I also need to be able to propagate exceptions.
Thanks
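For reference, the current Alpakka Kafka API supports exactly this: Producer.flexiFlow is a Flow stage, so producing can sit in the middle of a stream and failures propagate downstream like any other stage failure. A minimal sketch with placeholder topic and bootstrap servers:

import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object ProduceAsFlow extends App {
  implicit val system: ActorSystem = ActorSystem("produce-as-flow")

  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  Source(1 to 10)
    .map(n => ProducerMessage.single(new ProducerRecord[String, String]("demo-topic", n.toString), n))
    .via(Producer.flexiFlow(producerSettings))   // produce, then keep streaming
    .map(result => result.passThrough)           // continue with the pass-through value
    .runWith(Sink.foreach(n => println(s"wrote element $n")))
}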

Remove deprecated API

The ReactiveKafka class should have an empty constructor. We should also remove old, deprecated functions.

Akka Streams 2.0-M1?

Looks like Akka Streams 1.0 is deprecated and the site documentation recommends migrating to version 2.0. Do you have a branch that is tracking this?

Change from Decoder to Deserializer

I see that in moving from version 0.8 to 0.9, ConsumerProperties changed to take an org.apache.kafka.common.serialization.Deserializer instead of a kafka.serializer.Decoder.

It seems that the Deserializer API is a little more general, but I would like to understand the rationale behind the change. Is it due to an upstream change - that is, Kafka 0.9 now takes Deserializers instead of Decoders? I see no deprecation in the Kafka Scala API.

Moreover, if I understand correctly, the namespace kafka holds the Scala API, while the namespace org.apache.kafka holds the Java API. Does this mean that Kafka is now trying to unify the two APIs?

Using one or the other is no practical problem for me, but I was curious about the reason for this change and whether it has significant consequences.
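For illustration, a Deserializer is a small interface from the Java client (org.apache.kafka), independent of the old Scala kafka.serializer.Decoder hierarchy. A minimal sketch of implementing one (with recent client versions, configure and close have default implementations, so only deserialize is needed):

import java.nio.charset.StandardCharsets
import org.apache.kafka.common.serialization.Deserializer

// Decode the raw bytes of each record as a UTF-8 string.
class Utf8Deserializer extends Deserializer[String] {
  override def deserialize(topic: String, data: Array[Byte]): String =
    if (data == null) null else new String(data, StandardCharsets.UTF_8)
}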

How to scale out reading from partitions via akka cluster?

Hi,

First of all, great project! I'm planning to kick off a POC for a component that augments a Spark Streaming app with exactly-once semantics for sending HTTP calls. Instead of sending from Spark I'm planning to publish the messages to Kafka and send them from an Akka app, in order to avoid duplicate messages in case of Spark executor failures (which trigger reprocessing of side effects).

The idea is to use reactive-kafka + Hbase or Cassandra (similar to #51) in order to simulate transactions regarding whether a call has already been sent or not (to avoid duplicates).

Anyway - the first question that came to mind is how can I distribute the load across an akka-cluster when reading from a large number of partitions (high tens) that may contain tens of thousands of messages per second?
Even if one machine could cope with the load (although I doubt it) I'd like to use akka-cluster for HA and failover capability.

So the question is - what is your recommended design for sharding out partitions across a cluster similar to how the spark direct consumer does it?

Thanks!

Native committer fails to work after broken connection

A problem has been reported by @cleliofs:

Hi, we are having a problem with the offset committer (com.softwaremill.react.kafka.commit.native.NativeCommitFailedException: Failed to commit after 5 retires). Since last week we have lost the commits for a specific topic (other topics seem to be committed fine) and the current offset commit lag is on the order of ~ 100k. This error message is thrown every 5 secs (based on the commit interval as explained below).
The underlying exception is java.io.IOException: Broken pipe; the last code before this exception is thrown is at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56).
The pain is that once it gets into the NativeCommitFailedException state it never recovers; all offset commits keep failing, which is a bit strange if this is a temporary communication issue that should resolve after some time.

Any plans for Producer/Consumer using org.apache.kafka.clients?

Firstly, thank you for your work with reactive-kafka!

Secondly, I ran into limitations integrating Confluent Kafka Avro Serialization and the Confluent Schema Registry with the old Producer/Consumer (which is written in Scala). According to the Kafka documentation:

As of 0.8.2 Kafka includes a newly rewritten Java producer. The next release will include an equivalent Java consumer. These new clients are meant to supplant the existing Scala clients, but for compatibility they will co-exist for some time. These clients are available in a separate jar with minimal dependencies, while the old Scala clients remain packaged with the server.

Integrating the new Producer/Consumer would require changes to how reactive-kafka is configured, as well as to the internal implementation of KafkaActorPublisher and KafkaActorSubscriber. I don't see a way to do this without breaking the user-facing API.

Any thoughts?

Graceful shutdown in KafkaActorPublisher

As it's mentioned in Handling errors, it's possible to have a publisher or consumer actor supervised by its parent. I'm doing that as follows:

val publisherActorProps = kafka.consumerActorProps(consumerProps)
  .withDispatcher(ReactiveKafka.ConsumerDefaultDispatcher)
val publisherActor = context.actorOf(publisherActorProps)
val kafkaPublisher: Publisher[KafkaMessage[Payload]] = ActorPublisher(publisherActor)
Source(kafkaPublisher).runForeach(m => context.self ! m.message())

Unfortunately, if the parent actor is stopped then its child actors are also stopped, so the publisher actor is not shut down gracefully and the Kafka consumer is not closed either. So right now, my only way to shut down gracefully is to send a cancellation message beforehand, as follows:

publisherActor ! ActorPublisherMessage.Cancel

But what happens if the parent actor is stopped unexpectedly? I think the Kafka consumer won't be properly stopped and will keep trying to read from the Kafka topic.

As far as I understand, a way to solve this would be overriding postStop() in KafkaActorPublisher and sending the cancellation message there.

WDYT?

Thanks in advance.
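A simplified sketch of the suggestion above (not the actual library code, and the hook name is hypothetical): doing the cleanup in postStop covers both graceful cancellation and an unexpected stop of the parent, because postStop always runs.

import akka.stream.actor.ActorPublisher

abstract class CleanShutdownPublisher[T] extends ActorPublisher[T] {
  protected def closeConsumer(): Unit   // hypothetical hook that closes the underlying Kafka consumer

  // postStop runs whether the stream cancelled normally or the parent actor
  // was stopped unexpectedly, so the consumer is always closed.
  override def postStop(): Unit = {
    closeConsumer()
    super.postStop()
  }
}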

Off-by-one issue in offset committer

Hi,

I believe there is an issue in the offset committer.
As per the Kafka API, the offset is the position the consumer shall read from to process its next message. So when flushing offsets (which goes through NativeCommitter.commitPhaseTrial), the offset of the last processed message is incremented by 1 (OffsetMap.toCommitRequestInfo) before being sent, and the result (NativeCommitter.getOffsetsTrial) is kept.
However this result is used as the base of comparison for the next flush trigger (ConsumerCommitter.commitGatheredOffsets): the issue is that the code compares values with different meanings, since a registered commit request contains the last processed offset, while the flush response contains the first unprocessed offset.

For example if you enable debug logging, you get the following result:

...
[c.s.r.kafka.commit.ConsumerCommitter] Received commit request for partition 0 and offset 34
[c.s.r.kafka.commit.ConsumerCommitter] Registering commit for partition 0 and offset 34, last registered = 33
[c.s.r.kafka.commit.ConsumerCommitter] Flushing offsets to commit
[c.s.r.kafka.commit.ConsumerCommitter] committed offsets: OffsetMap(Map([topic-name,0] -> 35))
[c.s.r.kafka.commit.ConsumerCommitter] Received commit request for partition 0 and offset 35
[c.s.r.kafka.commit.ConsumerCommitter] Registering commit for partition 0 and offset 35, last registered = 34
...

As you can see, offset 34 is registered, but the committer will send 34+1=35 (as can be seen in the committed offsets).

This triggers 2 issues if no more than one commit request (for the same topic) is received after a flush and before the next one:

  1. The last commit request is not committed, because ConsumerCommitter.commitGatheredOffsets compares the received value to the previous flush result (which are the same in this case) and does nothing if there is no difference (the diff result stored in offsetMapToFlush is empty).
  2. Even if new commit requests are received later, they are no longer processed: because the code believed there was nothing to commit (see 1.), the performFlush function was not called and the scheduledFlush variable was not reset to None, which prevents a registered commit request from scheduling a new flush (since it believes one is already pending).
    This issue was introduced with e2d3e47.
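For illustration, the +1 convention the committer has to follow looks roughly like this with the plain Kafka consumer API (topic, partition and variable names are placeholders):

import java.util.Collections
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Commit the offset of the *next* message to read, i.e. last processed + 1.
// `consumer` is assumed to be an already-configured KafkaConsumer.
def commitProcessed(consumer: KafkaConsumer[String, String],
                    topic: String,
                    partition: Int,
                    lastProcessedOffset: Long): Unit = {
  val tp = new TopicPartition(topic, partition)
  consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessedOffset + 1)))
}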

Make the GraphStage API parts reusable

The issue has been reported by @13h3r:
The idea of graphs and graph stages is that they are completely reusable. You construct your graph from shapes and nothing happens. Then you run your graph and get access to "runtime" resources via the materialization process.

The GraphStage API in reactive-kafka right now is a mix of graph and runtime concerns. For example, sourceWithOffsetSink: creating the ReactiveKafkaConsumer up front makes the graph non-reusable and breaks the contract of graphs. All runtime resources should be created at the materialization step.
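A minimal sketch of the requested shape (not the library's actual code): the stateful resource is created inside createLogic, so the stage description stays reusable and every materialization gets its own instance.

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

// The stage itself holds only a factory; the resource is created per materialization.
class RecordSource[T](createResource: () => Iterator[T]) extends GraphStage[SourceShape[T]] {
  val out: Outlet[T] = Outlet("RecordSource.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Created here, so it is never shared between runs of the graph.
      private var records: Iterator[T] = _

      override def preStart(): Unit = records = createResource()

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          if (records.hasNext) push(out, records.next()) else completeStage()
      })
    }
}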

Change group name

Currently it's the generic com.softwaremill; it would probably be better to have a Kafka-specific one?

onNext is not allowed when the stream has not requested elements

We're getting the following occasional exception in our logs:

java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0
        at akka.stream.actor.ActorPublisher$class.onNext(ActorPublisher.scala:190) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0-RC3.jar:na]
        at com.softwaremill.react.kafka.KafkaActorPublisher.onNext(KafkaActorPublisher.scala:12) ~[com.softwaremill.reactive-kafka_2.11-0.6.0.jar:0.6.0]
        at com.softwaremill.react.kafka.KafkaActorPublisher$$anonfun$com$softwaremill$react$kafka$KafkaActorPublisher$$readDemandedItems$1.apply(KafkaActorPublisher.scala:35) ~[com.softwaremill.reactive-kafka_2.11-0.6.0.jar:0.6.0]
        at com.softwaremill.react.kafka.KafkaActorPublisher$$anonfun$com$softwaremill$react$kafka$KafkaActorPublisher$$readDemandedItems$1.apply(KafkaActorPublisher.scala:35) ~[com.softwaremill.reactive-kafka_2.11-0.6.0.jar:0.6.0]
        at scala.Option.foreach(Option.scala:257) ~[org.scala-lang.scala-library-2.11.6.jar:na]
        at com.softwaremill.react.kafka.KafkaActorPublisher.com$softwaremill$react$kafka$KafkaActorPublisher$$readDemandedItems(KafkaActorPublisher.scala:35) ~[com.softwaremill.reactive-kafka_2.11-0.6.0.jar:0.6.0]
        at com.softwaremill.react.kafka.KafkaActorPublisher$$anonfun$receive$1.applyOrElse(KafkaActorPublisher.scala:17) ~[com.softwaremill.reactive-kafka_2.11-0.6.0.jar:0.6.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467) ~[com.typesafe.akka.akka-actor_2.11-2.3.11.jar:na]
        at com.softwaremill.react.kafka.KafkaActorPublisher.akka$stream$actor$ActorPublisher$$super$aroundReceive(KafkaActorPublisher.scala:12) ~[com.softwaremill.reactive-kafka_2.11-0.6.0.jar:0.6.0]
        at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:317) ~[com.typesafe.akka.akka-stream-experimental_2.11-1.0-RC3.jar:na]
        at com.softwaremill.react.kafka.KafkaActorPublisher.aroundReceive(KafkaActorPublisher.scala:12) ~[com.softwaremill.reactive-kafka_2.11-0.6.0.jar:0.6.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) ~[com.typesafe.akka.akka-actor_2.11-2.3.11.jar:na]
        at akka.actor.ActorCell.invoke(ActorCell.scala:487) ~[com.typesafe.akka.akka-actor_2.11-2.3.11.jar:na]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) ~[com.typesafe.akka.akka-actor_2.11-2.3.11.jar:na]
        at akka.dispatch.Mailbox.run(Mailbox.scala:220) ~[com.typesafe.akka.akka-actor_2.11-2.3.11.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_40]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_40]
        at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_40]

The stream doesn't seem to suffer from this, probably due to the one-for-one actor recovery strategy.
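One common way to avoid this with ActorPublisher is to buffer elements and call onNext only while totalDemand is positive. A minimal sketch (not the library's code; message handling is simplified and the element type is fixed to String for brevity):

import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}

class GuardedPublisher extends ActorPublisher[String] {
  private var buffer = Vector.empty[String]

  def receive: Receive = {
    case ActorPublisherMessage.Request(_) => deliver()
    case ActorPublisherMessage.Cancel     => context.stop(self)
    case element: String =>
      buffer :+= element
      deliver()
  }

  // Emit only while the stream has outstanding demand, which prevents
  // "onNext is not allowed when the stream has not requested elements".
  private def deliver(): Unit =
    while (totalDemand > 0 && buffer.nonEmpty) {
      onNext(buffer.head)
      buffer = buffer.tail
    }
}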

Java PropertiesBuilder.java throws NPE

https://github.com/softwaremill/reactive-kafka/blob/master/core/src/main/java/com/softwaremill/react/kafka/PropertiesBuilder.java#L131

When this line is triggered, partitionizer is null, which causes an NPE to be thrown at https://github.com/softwaremill/reactive-kafka/blob/ebf7aa417d7e481d4dd1526194d0d15193e11cc4/core/src/main/scala/com/softwaremill/react/kafka/KafkaActorSubscriber.scala#L27


[ERROR] [11/08/2015 10:55:08.843] [ReactiveKafka-akka.actor.default-dispatcher-4] [akka://ReactiveKafka/user/$b] null
java.lang.NullPointerException
at com.softwaremill.react.kafka.KafkaActorSubscriber.com$softwaremill$react$kafka$KafkaActorSubscriber$$processElement(KafkaActorSubscriber.scala:27)
at com.softwaremill.react.kafka.KafkaActorSubscriber$$anonfun$receive$1.applyOrElse(KafkaActorSubscriber.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at com.softwaremill.react.kafka.KafkaActorSubscriber.akka$stream$actor$ActorSubscriber$$super$aroundReceive(KafkaActorSubscriber.scala:7)
at akka.stream.actor.ActorSubscriber$class.aroundReceive(ActorSubscriber.scala:180)
at com.softwaremill.react.kafka.KafkaActorSubscriber.aroundReceive(KafkaActorSubscriber.scala:7)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Error handling

We are streaming Twitter data from a service called GNIP, and suddenly I get:

ERROR [lt-dispatcher-2] c.s.r.k.KafkaActorSubscriber Stopping Kafka subscriber due to fatal error. WARNING arguments left: 1

As I read it, in order to understand what is happening I need to implement error handling with actors.
I would like some help with an example. Can someone show me how I can create a sink from the actor that supervises the KafkaActorSubscriber child?
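A minimal sketch of the requested setup (assumed names, not the library's code): a parent actor creates the subscriber as a child so it controls the supervision strategy, and turns the child into a Sink via ActorSubscriber and Sink.fromSubscriber.

import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.Sink
import org.reactivestreams.Subscriber

class SupervisingParent(subscriberProps: Props) extends Actor {

  // The parent decides what happens when the child subscriber fails.
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: Exception => SupervisorStrategy.Restart // choose per failure type
  }

  private val child: ActorRef = context.actorOf(subscriberProps, "kafka-subscriber")
  private val subscriber: Subscriber[Any] = ActorSubscriber[Any](child)

  // In practice this sink would be handed to whatever builds the stream,
  // e.g. sent back in a message to the requester.
  val sink: Sink[Any, _] = Sink.fromSubscriber(subscriber)

  def receive: Receive = Actor.emptyBehavior
}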

Java API connection problems

There's an issue with the Java API; I get this error:

Caused by: org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
    at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:171)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:126)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:143)
    at kafka.consumer.Consumer$.create(ConsumerConnector.scala:94)
    at kafka.consumer.KafkaConsumer.<init>(KafkaConsumer.scala:12)
    at com.softwaremill.react.kafka.ReactiveKafka.consumerActorProps(ReactiveKafka.scala:174)
    at com.softwaremill.react.kafka.ReactiveKafka.consumerActor(ReactiveKafka.scala:170)
    at com.softwaremill.react.kafka.ReactiveKafka.consumerActor(ReactiveKafka.scala:163)
    at com.softwaremill.react.kafka.ReactiveKafka.consume(ReactiveKafka.scala:152)

This is the code I'm using to create the consumer

ConsumerProperties<Byte[], String> consumerProps =
        new PropertiesBuilder.Consumer(zookeeperHost, brokerList, topic, groupName, new StringDecoder(null))
                .build();

Publisher<KeyValueKafkaMessage<Byte[], String>> publisher = this.reactiveKafka.consume(consumerProps, actorSystem);

return Source.from(publisher).map(KeyValueKafkaMessage::msg);

I'm still trying to figure out why, will let you know if I make progress - might be in the morning though as I've got to head off soon.

FYI, this code seems to fare a little better (it connects), although it seems to fail to shut down:

public Source<String, BoxedUnit> createConsumerSource(final String topic, final String groupName, ActorSystem actorSystem) {

        Map<String, String> consumerParams = new HashMap<>();
        consumerParams.put("zookeeper.connect", zookeeperHost);
        consumerParams.put("group.id", groupName);
        ConsumerProperties<String, String> consumerProps = new ConsumerProperties<>(
                toScalaMap(consumerParams),
                topic,
                groupName,
                new StringDecoder(null),
                new StringDecoder(null)
        );

        Publisher<KeyValueKafkaMessage<String, String>> publisher = this.reactiveKafka.consume(consumerProps, actorSystem);

        return Source.from(publisher).map(KeyValueKafkaMessage::msg);
    }

    private static scala.collection.immutable.Map toScalaMap(Map<String, String> javaMap) {
        return new scala.collection.immutable.HashMap<String, String>().$plus$plus(
                JavaConverters.mapAsScalaMapConverter(javaMap).asScala()
        );
    }

Simple Kafka Consumer

This request somewhat relates to #3. It would be very useful if one could materialize a partition source with a given offset - leaving the offset management completely to the application.

possible api:

class ReactiveKafka(val host: String, val zooKeeperHost: String) {
  /** Consumes from a given topic/partition at a given offset and provides a stream of (offset, message) tuples. */
  def consume(topic: String, partitionId: Int, startOffset: Long)(implicit actorSystem: ActorSystem): Publisher[(Long, String)] = { ... }
}

Though it might be a bit of work to create a robust simple consumer ([example from the Kafka wiki](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example)).
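For reference, the current Alpakka Kafka API covers this request: a single partition can be assigned at an explicit offset, leaving offset management entirely to the application. A minimal sketch (topic, partition, offset and servers are placeholders):

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object ManualOffsetConsumer extends App {
  implicit val system: ActorSystem = ActorSystem("manual-offset")

  val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")

  // Assign partition 0 of the topic and start reading at offset 42.
  Consumer
    .plainSource(settings, Subscriptions.assignmentWithOffset(new TopicPartition("demo-topic", 0), 42L))
    .map(record => (record.offset(), record.value()))   // stream of (offset, message) pairs
    .runWith(Sink.foreach(println))
}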

Kafka 0.8.2.2

I understand that it is a small patch update, but for the sake of continuity are you looking to upgrade the Kafka dependency to 0.8.2.2? If not, is there a reason why you are holding off?

The New API ProducerProvider sample does not compile

The code samples in https://github.com/softwaremill/reactive-kafka/blob/master/docs/NewAPI.md are not included in the project (at core/test/scala/examples), and the ProducerProvider sample does not seem to compile.

I was expecting that code + docs would be committed to master at the same time - if documents are referring to some sample, it should compile, right? Otherwise following the project becomes really hard. So I presume this is a bug?

...Will provide more information soon...

commit 2086da0

Close resources when done

Hello, I am trying to understand how to properly close the Kafka connection when done. According to the documentation, this is done by sending the message com.softwaremill.react.kafka.KafkaActorPublisher.Stop to the actor in charge of reading data from Kafka.

I see two issues with this, one minor and the other major.

The minor one is that there seems to be no way to do cleanup when using the higher-level API; the only way seems to be to manually create an actor, even if it is used just for this purpose.

The major one is that the message seems to be private to your codebase. Namely

import com.softwaremill.react.kafka.KafkaActorPublisher.Stop

results in object KafkaActorPublisher in package kafka cannot be accessed in package com.softwaremill.react.kafka

I think there is a need to publish a version that exports this message (and possibly others that I did not find). Even better would be to add a function that sends this message internally, only requiring clients to interact with the KafkaPublisher, so that one needs to work with actors only when that is desired.

Offset management

Currently the streaming always starts from the start of the topic, as far as I understand the code. It would be good to have some sort of offset management:

  • as a starter, making it possible to stream from the end of a topic could be useful
  • Kafka can store the last-consumed offset to make the topic behave as a point-to-point messaging system (see https://kafka.apache.org/documentation.html#semantics). Storing the streamed offsets could be a first step
  • finally, an at-least-once-delivery option could be realised by accepting a stream of consumed offsets and writing only the highest consumed offset. This could be very useful, but probably also hard to implement :) (see the sketch below)
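A sketch of the at-least-once idea with the current Alpakka Kafka API (topic, group and servers are placeholders): offsets are committed only after the message has been processed, so a crash can cause redelivery but not loss.

import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import org.apache.kafka.common.serialization.StringDeserializer

object AtLeastOnce extends App {
  implicit val system: ActorSystem = ActorSystem("at-least-once")

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("at-least-once-group")

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("demo-topic"))
    .map { msg =>
      println(s"processing ${msg.record.value()}")   // processing happens before the commit
      msg.committableOffset
    }
    .runWith(Committer.sink(CommitterSettings(system)))
}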

how to shut down a stream cleanly

I would like to see an example of how to shut down a Kafka source cleanly and how to detect errors (for example if the Kafka server goes down or the connection fails).
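With the current Alpakka Kafka API this is handled by the materialized Consumer.Control. A minimal sketch (placeholder names) that drains in-flight messages, closes the underlying consumer, and surfaces connection failures on the stream's completion future:

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.common.serialization.StringDeserializer

object CleanShutdown extends App {
  implicit val system: ActorSystem = ActorSystem("clean-shutdown")
  import system.dispatcher

  val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("shutdown-group")

  val control =
    Consumer
      .plainSource(settings, Subscriptions.topics("demo-topic"))
      .map(record => println(record.value()))
      .toMat(Sink.ignore)(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()

  // Later, e.g. when the service is asked to stop:
  control.drainAndShutdown().onComplete { result =>
    println(s"stream finished: $result")   // a Failure here indicates a stream error
    system.terminate()
  }
}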

How to store processed log to Kafka or Cassandra with Manual Commit?

Sorry, I can't find a proper place to ask, so I'm just opening an issue here (close it if you think it's improper).

I'm currently doing a PoC based on this awesome reactive-kafka module with the manual commit feature,
and I'm struggling to add a Sink that stores the log in some permanent storage system such as Kafka or Cassandra.
In your sample code messages are processed on the fly in the processMessage function. If I need to store data in Kafka, then I need to replace offsetCommitSink with another Kafka Sink, but then I can no longer use offsetCommitSink to stream back for commit.
Another approach is to use a saveToKafka function to store the processed log in Kafka (shown below), which is the current implementation of my PoC.

Source(consumerWithOffsetSink.publisher)
  .map(processMessage(_))                       // your message processing
  .map(saveToKafka(_))                          // store the processed record in Kafka
  .to(consumerWithOffsetSink.offsetCommitSink)  // stream back for commit
  .run()

Do you think this is the best practice to achieve my goal?
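For comparison, the current Alpakka Kafka API handles this case directly: Producer.committableSink writes the processed record and commits the consumer offset in one sink, so no separate offsetCommitSink is needed. A minimal sketch (placeholder topics, group and servers; processMessage stands in for the real processing):

import akka.actor.ActorSystem
import akka.kafka._
import akka.kafka.scaladsl.{Consumer, Producer}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object ConsumeTransformProduce extends App {
  implicit val system: ActorSystem = ActorSystem("consume-transform-produce")

  def processMessage(value: String): String = value.toUpperCase  // stand-in for real processing

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("poc-group")

  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord[String, String]("target-topic", processMessage(msg.record.value())),
        msg.committableOffset)   // the offset rides along and is committed after the write
    }
    .runWith(Producer.committableSink(producerSettings, CommitterSettings(system)))
}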
