

Kafka Streams Examples

This project contains code examples that demonstrate how to implement real-time applications and event-driven microservices using the Streams API of Apache Kafka, aka Kafka Streams.

For more information, take a look at the latest Confluent documentation on the Kafka Streams API, notably the Developer Guide.


This repository has several branches to help you find the correct code examples for the version of Apache Kafka and/or Confluent Platform that you are using. See Version Compatibility Matrix below for details.

There are three kinds of examples:

  • Examples under src/main/: These examples are short and concise. Also, you can interactively test-drive these examples, e.g. against a local Kafka cluster. If you want to actually run these examples, then you must first install and run Apache Kafka and friends, which we describe in section Packaging and running the examples. Each example also states its exact requirements and instructions at the very top.
  • Examples under src/test/: These examples test the applications under src/main/. Unit tests with TopologyTestDriver exercise the stream logic without external system dependencies. The integration tests use embedded Kafka clusters, feed input data to them (using the standard Kafka producer client), process the data using Kafka Streams, and finally read and verify the output results (using the standard Kafka consumer client). These examples are also a good starting point for learning how to implement your own end-to-end integration tests.
  • Ready-to-run Docker Examples: These examples are already built and containerized.

Additional examples may be found under src/main/.

Application Name Concepts used Java 8+ Java 7+ Scala
WordCount DSL, aggregation, stateful Java 8+ example Scala Example
MapFunction DSL, stateless transformations, map() Java 8+ example Scala Example
SessionWindows Sessionization of user events, user behavior analysis Java 7+ example
GlobalKTable join() between KStream and GlobalKTable Java 8+ example
GlobalStore "join" between KStream and GlobalStore Java 8+ example
PageViewRegion join() between KStream and KTable Java 8+ example Java 7+ example
PageViewRegionGenericAvro Working with data in Generic Avro format Java 8+ example Java 7+ example
WikipediaFeedSpecificAvro Working with data in Specific Avro format Java 8+ example Java 7+ example
SecureKafkaStreams Secure, encryption, client authentication Java 7+ example
Sum DSL, stateful transformations, reduce() Java 8+ example
WordCountInteractiveQueries Interactive Queries, REST, RPC Java 8+ example
KafkaMusic Interactive Queries, State Stores, REST API Java 8+ example
ApplicationReset Application Reset Tool kafka-streams-application-reset Java 8+ example
Microservice Microservice ecosystem, state stores, dynamic routing, joins, filtering, branching, stateful operations Java 8+ example
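
To give a flavor of what these examples look like, here is a minimal, self-contained sketch in the spirit of the WordCount example listed above (DSL, aggregation, stateful). It is not a copy of the bundled example; the application id is made up for illustration, while the topic names match the ones mentioned later in this README for WordCountLambdaExample.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

public class WordCountSketch {

  public static void main(final String[] args) {
    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch"); // illustrative application id
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> textLines = builder.stream("streams-plaintext-input");

    // Split each line into words, group by word, and keep a running count (stateful aggregation).
    final KTable<String, Long> wordCounts = textLines
        .flatMapValues(line -> Arrays.asList(line.toLowerCase(Locale.ROOT).split("\\W+")))
        .groupBy((keyIgnored, word) -> word)
        .count();

    // Emit the continuously updated counts to the output topic.
    wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

    final KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}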

The stream processing of Kafka Streams can be unit tested with the TopologyTestDriver from the org.apache.kafka:kafka-streams-test-utils artifact. The test driver allows you to write sample input into your processing topology and validate its output.

See the documentation at Testing Streams Code.
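
To make this concrete, here is a minimal sketch of a TopologyTestDriver-based test. It assumes kafka-streams-test-utils 2.4+ (for TestInputTopic and TestOutputTopic); the trivial uppercase topology and the topic names are made up for illustration and are not one of the bundled examples.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Properties;

public class UppercaseTopologyTest {

  public static void main(final String[] args) {
    // A trivial topology: read from "input-topic", uppercase each value, write to "output-topic".
    final StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input-topic")
        .mapValues(value -> value.toUpperCase())
        .to("output-topic");
    final Topology topology = builder.build();

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver-sketch");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // Pipe sample input through the topology and verify the output, without any running Kafka cluster.
    try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
      final TestInputTopic<String, String> input =
          driver.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
      final TestOutputTopic<String, String> output =
          driver.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());

      input.pipeInput("key-1", "hello kafka streams");
      System.out.println(output.readValue()); // expected: "HELLO KAFKA STREAMS"
    }
  }
}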

We also provide several integration tests, which demonstrate end-to-end data pipelines. Here, we spawn embedded Kafka clusters and the Confluent Schema Registry, feed input data to them (using the standard Kafka producer client), process the data using Kafka Streams, and finally read and verify the output results (using the standard Kafka consumer client).

Additional examples may be found under src/test/.

Tip: Run mvn test to launch the tests.

Integration Test Name Concepts used Java 8+ Java 7+ Scala
WordCount DSL, aggregation, stateful Java 8+ Example Scala Example
WordCountInteractiveQueries Interactive Queries, REST, RPC Java 7+ Example
Aggregate DSL, groupBy(), aggregate() Java 8+ Example Scala Example
CustomStreamTableJoin DSL, Processor API, Transformers Java 8+ Example
EventDeduplication DSL, Processor API, Transformers Java 8+ Example
GlobalKTable DSL, global state Java 7+ Example
GlobalStore DSL, global state, Transformers Java 7+ Example
HandlingCorruptedInputRecords DSL, flatMap() Java 8+ Example
KafkaMusic (Interactive Queries) Interactive Queries, State Stores, REST API Java 7+ Example
MapFunction DSL, stateless transformations, map() Java 8+ Example
MixAndMatch DSL + Processor API Integrating DSL and Processor API Java 8+ Example
PassThrough DSL, stream(), to() Java 7+ Example
PoisonPill DSL, flatMap() Java 8+ Example
ProbabilisticCounting*** DSL, Processor API, custom state stores Scala Example
Reduce (Concatenate) DSL, groupByKey(), reduce() Java 8+ Example Scala Example
SessionWindows DSL, windowed aggregation, sessionization Java 7+ Example
StatesStoresDSL DSL, Processor API, Transformers Java 8+ Example
StreamToStreamJoin DSL, join() between KStream and KStream Java 7+ Example
StreamToTableJoin DSL, join() between KStream and KTable Java 7+ Example Scala Example
Sum DSL, aggregation, stateful, reduce() Java 8+ Example
TableToTableJoin DSL, join() between KTable and KTable Java 7+ Example
UserCountsPerRegion DSL, aggregation, stateful, count() Java 8+ Example
ValidateStateWithInteractiveQueries Interactive Queries for validating state Java 8+ Example
GenericAvro Working with data in Generic Avro format Java 7+ Example Scala Example
SpecificAvro Working with data in Specific Avro format Java 7+ Example Scala Example

***demonstrates how to probabilistically count items in an input stream by implementing a custom state store (CMSStore) that is backed by a Count-Min Sketch data structure (with the CMS implementation of Twitter Algebird)

This repository also ships a ready-to-run, containerized example: the Kafka Music demo application.

The Kafka Music application demonstrates how to build a simple music charts application that continuously computes, in real time, the latest charts such as the Top 5 songs per music genre. It exposes its latest processing results -- the latest charts -- via Kafka’s Interactive Queries feature and a REST API. The application's input data is in Avro format, hence the use of Confluent Schema Registry, and comes from two sources: a stream of play events (think: "song X was played") and a stream of song metadata ("song X was written by artist Y").

You can find detailed documentation at https://docs.confluent.io/current/streams/kafka-streams-examples/docs/index.html.
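
As a rough illustration of the Interactive Queries feature the Kafka Music application is built on, here is a minimal sketch that materializes a queryable store and reads from it locally. It assumes Kafka Streams 2.5+ (for StoreQueryParameters); the topic name, store name, and keys are made up, and a full application such as the Kafka Music example additionally exposes these lookups over REST and routes queries to the instance that hosts a given key.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Properties;

public class InteractiveQueriesSketch {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();
    // Count play events per song id and materialize the result in a store named "play-counts".
    builder.stream("play-events", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .count(Materialized.as("play-counts"));

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "interactive-queries-sketch");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();

    // Note: a real application should wait until the instance reaches the RUNNING state before
    // querying; otherwise this call may throw InvalidStateStoreException.
    final ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType("play-counts", QueryableStoreTypes.keyValueStore()));
    System.out.println("Plays of song-1: " + store.get("song-1"));

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}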

For additional examples that showcase Kafka Streams applications within an event streaming platform, please refer to the examples GitHub repository.

The code in this repository requires Apache Kafka 0.10+ because from this point onwards Kafka includes its Kafka Streams library. See Version Compatibility Matrix for further details, as different branches of this repository may have different Kafka requirements.

For the master branch: To build a development version, you typically need the latest trunk version of Apache Kafka (cf. kafka.version in pom.xml for details). The following instructions will build and locally install the latest trunk Kafka version:

$ git clone git@github.com:apache/kafka.git
$ cd kafka
$ git checkout trunk

# Now build and install Kafka locally
$ ./gradlew clean && ./gradlewAll install

The code in this repository requires Confluent Schema Registry. See Version Compatibility Matrix for further details, as different branches of this repository have different Confluent Platform requirements.

For the master branch: To build a development version, you typically need the latest master version of Confluent Platform's Schema Registry (cf. confluent.version in pom.xml, which is set by the upstream Confluent Common project). The following instructions will build and locally install the latest master Schema Registry version, which includes building its dependencies such as Confluent Common and Confluent Rest Utils. Please read the Schema Registry README for details.

$ git clone https://github.com/confluentinc/common.git
$ cd common
$ git checkout master

# Build and install common locally
$ mvn -DskipTests=true clean install

$ git clone https://github.com/confluentinc/rest-utils.git
$ cd rest-utils
$ git checkout master

# Build and install rest-utils locally
$ mvn -DskipTests=true clean install

$ git clone https://github.com/confluentinc/schema-registry.git
$ cd schema-registry
$ git checkout master

# Now build and install schema-registry locally
$ mvn -DskipTests=true clean install

Also, each example states its exact requirements at the very top.

If you are using an IDE and import the project you might end up with a "missing import / class not found" error. Some Avro classes are generated from schema files and IDEs sometimes do not generate these classes automatically. To resolve this error, manually run:

$ mvn -Dskip.tests=true compile

If you are using Eclipse, you can also right-click on pom.xml file and choose Run As > Maven generate-sources.

Some code examples require Java 8+, primarily because of the usage of lambda expressions.

IntelliJ IDEA users:

  • Open File > Project structure
  • Select "Project" on the left.
    • Set "Project SDK" to Java 1.8.
    • Set "Project language level" to "8 - Lambdas, type annotations, etc."

Scala is required only for the Scala examples in this repository. If you are a Java developer you can safely ignore this section.

If you want to experiment with the Scala examples in this repository, you need a version of Scala that supports Java 8 and SAM / Java lambda (e.g. Scala 2.11 with -Xexperimental compiler flag, or 2.12).

If you are compiling with Java 9+, you'll need to have Scala version 2.12+ to be compatible with the Java version.

The instructions in this section are only needed if you want to interactively test-drive the application examples under src/main/.

Tip: If you only want to run the integration tests (mvn test), then you do not need to package or install anything -- just run mvn test. These tests launch embedded Kafka clusters.

The first step is to install and run a Kafka cluster, which must consist of at least one Kafka broker as well as at least one ZooKeeper instance. Some examples may also require a running instance of Confluent Schema Registry. The Confluent Platform Quickstart guide provides the full details.

In a nutshell:

# Ensure you have downloaded and installed Confluent Platform as per the Quickstart instructions above.

# Start ZooKeeper
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

# In a separate terminal, start Kafka broker
$ ./bin/kafka-server-start ./etc/kafka/server.properties

# In a separate terminal, start Confluent Schema Registry
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

# Again, please refer to the Confluent Platform Quickstart for details such as
# how to download Confluent Platform, how to stop the above three services, etc.

The next step is to create a standalone jar ("fat jar") of the application examples:

# Create a standalone jar ("fat jar")
$ mvn clean package

# >>> Creates target/kafka-streams-examples-7.8.0-0-standalone.jar

Tip: If needed, you can disable the test suite during packaging, for example to speed up the packaging or to lower JVM memory usage:

$ mvn -DskipTests=true clean package

You can now run the application examples as follows:

# Run an example application from the standalone jar. Here: `WordCountLambdaExample`
$ java -cp target/kafka-streams-examples-7.8.0-0-standalone.jar \
  io.confluent.examples.streams.WordCountLambdaExample

The application will try to read from the specified input topic (in the above example it is streams-plaintext-input), execute the processing logic, and then try to write back to the specified output topic (in the above example it is streams-wordcount-output). In order to observe the expected output stream, you will need to start a console producer to send messages into the input topic and start a console consumer to continuously read from the output topic. More details on how to run the examples can be found in the Javadocs of each example.
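
If you prefer a programmatic alternative to the console producer, the following is a minimal sketch of a plain Java producer that feeds a few lines into the WordCount input topic. The topic name matches the one mentioned above; the broker address and the record contents are illustrative.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class WordCountInputProducer {

  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // Send a few plain-text lines into the WordCount input topic; the try-with-resources block
    // closes the producer, which also flushes any buffered records.
    try (final KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("streams-plaintext-input", "hello kafka streams"));
      producer.send(new ProducerRecord<>("streams-plaintext-input", "all streams lead to kafka"));
    }
  }
}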

If you want to turn on log4j while running your example application, you can edit the log4j.properties file and then execute as follows:

# Run an example application from the standalone jar. Here: `WordCountLambdaExample`
$ java -cp target/kafka-streams-examples-7.8.0-0-standalone.jar \
  -Dlog4j.configuration=file:src/main/resources/log4j.properties \
  io.confluent.examples.streams.WordCountLambdaExample

Keep in mind that the machine on which you run the command above must have access to the Kafka/ZooKeeper clusters you configured in the code examples. By default, the code examples assume the Kafka cluster is accessible via localhost:9092 (aka Kafka's bootstrap.servers parameter) and the ZooKeeper ensemble via localhost:2181. You can override the default bootstrap.servers parameter through a command line argument.
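
The override pattern is straightforward; most examples follow something along these lines (a hedged sketch, as the exact argument handling varies per example):

import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class BootstrapServersOverrideSketch {

  public static void main(final String[] args) {
    // Use the first command line argument as bootstrap.servers, falling back to the default.
    final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "bootstrap-override-sketch"); // illustrative id
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // ... build the topology and start KafkaStreams with this configuration as usual.
  }
}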

This project uses the standard maven lifecycle and commands such as:

$ mvn compile # This also generates Java classes from the Avro schemas
$ mvn test    # Runs unit and integration tests
$ mvn package # Packages the application examples into a standalone jar

Version Compatibility Matrix

Branch (this repo) Confluent Platform Apache Kafka
5.4.x* 5.4.0-SNAPSHOT 2.4.0-SNAPSHOT
5.3.0-post 5.3.0 2.3.0
5.2.2-post 5.2.2 2.2.1
5.2.1-post 5.2.1 2.2.1
5.1.0-post 5.1.0 2.1.0
5.0.0-post 5.0.0 2.0.0
4.1.0-post 4.1.0 1.1.0
4.0.0-post 4.0.0 1.0.0
3.3.0-post 3.3.0 0.11.0

*You must manually build the 2.4 version of Apache Kafka and the 5.4.x version of Confluent Platform. See instructions above.

The master branch of this repository represents active development, and may require additional steps on your side to make it compile. Check this README as well as pom.xml for any such information.

License

Usage of this image is subject to the license terms of the software contained within. Please refer to Confluent's Docker images documentation reference for further information. The software to extend and build the custom Docker images is available under the Apache 2.0 License.


kafka-streams-examples's Issues

Streams App stuck after broker restart

I have 2 Kafka Streams applications. One application listens to, say, topic1 and produces to topic2, and the other listens to topic2 and produces to topic3. The applications were working fine before the Kafka broker went down. The broker came back up, but the streams applications have stopped.

Following is the exception from the first streams app:

Exception in thread "streams-collection-7cda47bc-a1db-4ad5-a3d4-bd8f8dc85bf4-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=o365_activity_contenturl, partition=0, offset=2151 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232) at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"RecordType":6,"ListId":"affd3b1e-5d16-4e36-b97a-871b755b2b40","Version":1,"SourceFileName":"9617","ClientIP":"94.245.89.59","Workload":"OneDrive","UserType":0} timestamp 1527845926991) to topic o365_user_activity due to org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time. You can increase producer parameter retriesandretry.backoff.ms to avoid this error. at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time

Exception from the second streams app:
Exception in thread "streams-distribution-bf0d8698-f198-4d91-ad66-f0833b4ef398-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"item_type":"File","workload":"OneDrive","current_date":"2018-06-01","client_ip":"94.245.89.59"} timestamp 1527845926986) to topic topic3 due to org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time.
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
	at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time

To reproduce this, I intentionally stopped the Kafka broker while the streams applications were running, and got the same exceptions.

Once I restarted the application, it again started processing records from the topic.

Why does the streams application fail to recover?

Failed to create dir: Invalid argument on Windows

I ran the example file EventDeduplicationLambdaIntegrationTest and got the following exception on Windows 10.

[2018-01-16 00:38:46,485] WARN [main] [Consumer clientId=consumer-1, groupId=deduplication-integration-test-standard-consumer] Synchronous auto-commit of offsets {outputTopic-0=OffsetAndMetadata{offset=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Exception in thread "deduplication-lambda-integration-test-219b1b1a-3754-47b1-abe3-7ecdc75dc0f2-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=inputTopic, partition=0, offset=0
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store eventId-store:1516052100000 at location C:\Users\NKUZNE~1\AppData\Local\Temp\kafka-3711583845759769165\deduplication-lambda-integration-test\0_0\eventId-store\eventId-store:1516052100000
	at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204)
	at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174)
	at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40)
	at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89)
	at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34)
	at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67)
	at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33)
	at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96)
	at io.confluent.examples.streams.EventDeduplicationLambdaIntegrationTest$DeduplicationTransformer.rememberNewEvent(EventDeduplicationLambdaIntegrationTest.java:174)
	at io.confluent.examples.streams.EventDeduplicationLambdaIntegrationTest$DeduplicationTransformer.transform(EventDeduplicationLambdaIntegrationTest.java:152)
	at io.confluent.examples.streams.EventDeduplicationLambdaIntegrationTest$DeduplicationTransformer.transform(EventDeduplicationLambdaIntegrationTest.java:100)
	at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
	... 6 more
Caused by: org.rocksdb.RocksDBException: Failed to create dir: C:\Users\NKUZNE~1\AppData\Local\Temp\kafka-3711583845759769165\deduplication-lambda-integration-test\0_0\eventId-store\eventId-store:1516052100000: Invalid argument
	at org.rocksdb.RocksDB.open(Native Method)
	at org.rocksdb.RocksDB.open(RocksDB.java:231)
	at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:197)
	... 25 more

How to use a store in interactive queries when it has been made by the Processor API

In that post:
https://stackoverflow.com/a/50752095/9234107

you might be able to use a "global store" instead of a GlobalKTable. For this case, you can provide a custom Processor that can implement a filter before you populate the global store.

So I initialize my Spring bean like this:

@PostConstruct
private void init() {
	configurer();
	producer = new KafkaProducer<>(props);
	journal.info("Open topic {}...", kafkaTopic);
	StreamsBuilder builder = new StreamsBuilder();
	Topology topology = builder.build();
	StoreBuilder<KeyValueStore<String, CustomerAvro>> personStoreSupplier = Stores.keyValueStoreBuilder(
			Stores.persistentKeyValueStore("personProcessStore"), Serdes.String(),
			Serdes.serdeFrom(CustomerAvro.class));
	topology.addSource("person$", kafkaTopic)
			.addProcessor("selection", PersonProcessor::new, "person$")
			.addStateStore(personStoreSupplier, "selection");
	store = personStoreSupplier.build();
	KafkaStreams streams = new KafkaStreams(topology, props);
	streams.start();
}

And my processor is like this

public class PersonProcessor extends AbstractProcessor<String, PersonAvro> {
	private KeyValueStore<String, CustomerAvro> stateStore;

	@Override
	public void process(String key, PersonAvro avroPerson) {
		Optional.ofNullable(avroPerson)
				.filter(person -> Optional.ofNullable(person)
						.map(PersonAvro::getActive)
						.filter(activation -> !activation.matches("0"))
					.isPresent())
			.map(person -> {
				return CustomerAvro.newBuilder()
						.setId(person.getId())
						.setCompName(person.getCompName())
						.setSiretCode(person.getSiretCode())
						.setActive(person.getActive())
						.setAdd3(person.getAdd3())
						.setAdd4(person.getAdd4())
						.setAdd5(person.getAdd5())
						.setAdd6(person.getAdd6())
						.setAdd7(person.getAdd7());
			})
			.map(CustomerAvro.Builder::build)
			.ifPresent(person -> {
				stateStore.put(key, person);
				context().forward(key, person);
				context().commit();
			});
}

@Override
public void init(ProcessorContext context) {
	super.init(context);
	stateStore = (KeyValueStore<String, CustomerAvro>) context.getStateStore("personStore");
}
}

But how can I use the store in interactive queries? For example, to get the state of one customer.

And after that I will make a global store, but step by step :-)

Thanks!

Microservices example fails when built against upcoming AK 2.0

I created a test branch to showcase the problem: https://github.com/confluentinc/kafka-streams-examples/tree/master-KIP270-Orders-Example. (The relevant commit is d7a36aa.) This test branch is based upon the PR branch we use for adding Scala API examples (for upcoming AK 2.0) to this repo, see #109.

Build error:

Tests in error:
  EndToEndTest.shouldCreateNewOrderAndGetBackValidatedOrder:61 » NotFound HTTP 4...
  EndToEndTest.shouldProcessManyInvalidOrdersEndToEnd:122 » NotFound HTTP 404 No...
  EndToEndTest.shouldProcessManyValidOrdersEndToEnd:87 » NotFound HTTP 404 Not F...
  OrdersServiceTest.shouldGetOrderByIdWhenOnDifferentHost:182 » NotFound HTTP 40...
  OrdersServiceTest.shouldGetValidatedOrderOnRequest:131 » NotFound HTTP 404 Not...
  OrdersServiceTest.shouldPostOrderAndGetItBack:86 » NotFound HTTP 404 Not Found

@benstopford Could you take a look at the test failures above? I don't have the time unfortunately to finish #109 and to also fix/update the microservices example.

Could not find class io.confluent.connect.avro.ConnectDefault

I am using a JDBC source connector in query mode. It seems that without a specified table name, the schemas registered in the Schema Registry for the record key and record value have empty schema names and are assigned the default name "ConnectDefault", as defined in Confluent's AvroData class https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java

When running a Kafka Streams application using generated avro sources and SpecificAvroSerde, I am getting the error:

Exception in thread "streams-app-6e39ebfd-db14-49bc-834f-afaf108a6d25-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=topic-name, partition=0, offset=0
  at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
  at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
  at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
  at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
  at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class io.confluent.connect.avro.ConnectDefault specified in writer's schema whilst finding reader's schema for a SpecificRecord.

When running a sink connector reading from the topic there are no issues. The exception is only caused in the streams layer with the deserialization.

Am I missing a class named ConnectDefault or can I specify somewhere in the source connector a schema name with a different namespace?

Filter Predicate not getting called

KStreams.zip
My filter predicate is not working.

KStream<String, GenericRecord> table1 = builder.stream("new-input-topic");
KStream<String, GenericRecord> table2=table1.map(new KeyValueMapper<String, GenericRecord, KeyValue<String, GenericRecord>>() {
		@Override
		public KeyValue<String, GenericRecord> apply(final String dummy, final GenericRecord record) {
			return new KeyValue<>(record.get("ip").toString(), record);
		}
	});
// Prints all values, so the stream is reading from the partition
table2.print();
KStream<String, GenericRecord> table3=table2.filter(new Predicate<String, GenericRecord>() {
	@Override
	public boolean test(String arg0, GenericRecord arg1) {
		// TODO Auto-generated method stub
		return arg1.get("switch").equals("77");
	}
});
	
table3.to("new-output-topic", Produced.with(stringSerde, genericAvroSerde));

Nothing is being dumped to table3. The complete code is attached.

How to ensure exactly-once in Kafka Streams?

Is there any way to auto commit in a Kafka Streams processor?

When the application is killed with kill -9 or shuts down abnormally, messages have not been committed.
Restarting the stream application will then poll duplicate messages.

Does Kafka Streams only guarantee at-least-once?

@Override
public void punctuate(long timestamp) {
          System.out.println("Partition = " + context.partition() + " offset = " + context.offset());
          context.commit();
}

Time window 24 hour

I have a problem: I want to aggregate in a 24-hour time window in a Kafka Streams application. It works OK, except that the aggregation interval becomes 02-02 instead of 24-24 (midnight to midnight).

Is there any setting for the timezone in the Kafka Streams API? The server is running with the following datetime settings:

Local time: tor 2018-08-09 08:29:55 CEST
Universal time: tor 2018-08-09 06:29:55 UTC
RTC time: tor 2018-08-09 06:29:55
Time zone: Europe/Copenhagen (CEST, +0200)
Network time on: yes
NTP synchronized: yes
RTC in local TZ: no

Any hints or help will be appreciated

Could KStream/KTable be used consecutively?

For example as the pseudocode:
(1) streamA=builder.stream(topicB)
(2) streamAaggr = streamA.aggregate(...)
(3) streamAaggr->to(topicB)
So far so good. Then, consecutively, I generate a stream based on the previous topic, like
(4) streamB=builder.stream(topicB)

It seems that streamB is empty. I have set the commit interval to 1s, and generate streamB after sleeping 10s.

Any clue for this issue? Thanks

Kafka Map Example does not compile

[ERROR] /home/ubuntu/kafka_streams/pearlai.tests/src/main/java/apps/MapTest.java:[79]
[ERROR] final KStream<byte[], String> uppercasedWithMap = textLines.map((key, value) -> new KeyValue<>(key, value.toUpperCase()));
[ERROR] ^^^^^^^^
[ERROR] KeyValue cannot be resolved to a type
[ERROR] /home/ubuntu/kafka_streams/pearlai.tests/src/main/java/apps/MapTest.java:[82]
[ERROR] final KStream<String, String> originalAndUpper = textLines.map((key, value) -> KeyValue.pair(value, value.toUpperCase()));
[ERROR] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[ERROR] Type mismatch: cannot convert from KStream<Object,Object> to KStream<String,String>
[ERROR] /home/ubuntu/kafka_streams/pearlai.tests/src/main/java/apps/MapTest.java:[82]
[ERROR] final KStream<String, String> originalAndUpper = textLines.map((key, value) -> KeyValue.pair(value, value.toUpperCase()));
[ERROR] ^^^^^^^^
[ERROR] KeyValue cannot be resolved
[ERROR] 3 problems (3 errors)

I'm new to Java and Kafka, and this does not make learning it fun.

Also, the examples need to be updated to show the shutdown thread hook.

APPLICATION_SERVER_CONFIG is always pointing to 0.0.0.0

I am trying to execute a stream processing application with the following configuration,
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "interactive-queries-example");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "interactive-queries-example-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Provide the details of our embedded http service that we'll use to connect to this streams
// instance and discover locations of stores.
streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "127.0.01:7070");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/test");
// standby replicas for shadow copy
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

But it always gives the application host as 0.0.0.0, as you may see in the logs below:

[2018-06-18 18:18:01,025] INFO Started o.e.j.s.ServletContextHandler@faa3fed{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-06-18 18:18:01,031] INFO Started ServerConnector@aa21042{HTTP/1.1}{0.0.0.0:7070} (org.eclipse.jetty.server.ServerConnector:266)
[2018-06-18 18:18:01,031] INFO Started @3495ms (org.eclipse.jetty.server.Server:379)

I have tried updating APPLICATION_SERVER_CONFIG to point to an external IP, but it always keeps pointing to 0.0.0.0. Can anyone please help me understand where the mistake is?

kafka streams - memory issue

Hello, my Java program uses about 700 MB of memory and I don't quite understand why. I don't use KTables, just 2 KStreams, and I deserialize to JsonNode.
These 2 streams have 4k messages each, but after they are processed in 10-20 seconds, nothing happens and the memory still grows. The cache is disabled.
The only processing I do is to merge those two streams into another topic and change their key.
Here is my code:

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(5));
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        final Serde<JsonNode> valSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        final KStreamBuilder builder = new KStreamBuilder();

        builder.stream(Serdes.String(), valSerde, CONTRACT_CREATED)
                .map((key, value) -> {
                    int x = Integer.parseInt(key);
                    int y = value.get("SiteId").asInt();
                    long l = (((long)x) << 32) | (y & 0xffffffffL);
                    return KeyValue.pair(l, value);
                } )
                .to(Serdes.Long(), valSerde, PORTAL_CONTRACTS);

        builder.stream(Serdes.String(), valSerde, PAYMENT_SCHEDULE_UPDATED)
                .map((key, value) -> {
                    int x = Integer.parseInt(key);
                    int y = value.get("ContractSiteId").asInt();
                    long l = (((long)x) << 32) | (y & 0xffffffffL);
                    return KeyValue.pair(l, value);
                } )
                .to(Serdes.Long(), valSerde, PORTAL_CONTRACTS);

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.cleanUp();
        streams.start();

Do I have to do something special to free up memory?
Thanks

Error for Aggregator : [B cannot be cast to java.lang.String]

I tried using a streaming Aggregator to sum up a stream value.
However, I always got the error "Exception in thread "kafkastream_client_id-StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.lang.String" for new Aggregator<String, Trade, TradeStatus>().

A snapshot of the code is:
KTable<String, TradeStatus> positions = tradeKStream.groupByKey()
                .aggregate(
                      new Initializer<TradeStatus>() {
                            @Override
                            public TradeStatus apply() {
                                return new TradeStatus();
                            }
                        },
                        new Aggregator<String, Trade, TradeStatus>() {
                            @Override
                            public TradeStatus apply(String key, Trade value, TradeStatus aggregate) {
                                aggregate.totalsize += value.size();
                                return aggregate;
                            }
                        });

Error trying to compile examples project: scala.reflect.internal.MissingRequirementError

Hi,

I just cloned the kafka-streams-examples project and am trying to compile it but getting this error:

scala.reflect.internal.MissingRequirementError

I noticed that I had some missing files, as described in this issue, so I tried following the steps there, but it made no difference.

Here are the steps I took:

$ git clone https://github.com/confluentinc/kafka-streams-examples.git .

At this point I checked what branch I was on, as mentioned in issue 144:

$ git branch
* 5.0.0-post

Then I ran this:

$ mvn clean install -Dmaven.test.skip=true
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.google.inject.internal.cglib.core.$ReflectUtils$1 (file:/usr/share/maven/lib/guice.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of com.google.inject.internal.cglib.core.$ReflectUtils$1
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Detecting the operating system and CPU architecture
[INFO] ------------------------------------------------------------------------
[INFO] os.detected.name: linux
[INFO] os.detected.arch: x86_64
[INFO] os.detected.version: 4.15
[INFO] os.detected.version.major: 4
[INFO] os.detected.version.minor: 15
[INFO] os.detected.release: ubuntu
[INFO] os.detected.release.version: 18.04
[INFO] os.detected.release.like.ubuntu: true
[INFO] os.detected.release.like.debian: true
[INFO] os.detected.classifier: linux-x86_64
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka-streams-examples 5.0.0
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:3.0.0:clean (default-clean) @ kafka-streams-examples ---
[INFO] 
[INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ kafka-streams-examples ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-versions) @ kafka-streams-examples ---
[INFO] 
[INFO] --- build-helper-maven-plugin:1.10:add-source (add-source) @ kafka-streams-examples ---
[INFO] Source directory: /home/yoni/work/github/confluentinc/kafka-streams-examples-2/src/main/scala added.
[INFO] Source directory: /home/yoni/work/github/confluentinc/kafka-streams-examples-2/target/generated-sources added.
[INFO] 
[INFO] --- avro-maven-plugin:1.8.2:schema (default) @ kafka-streams-examples ---
[INFO] 
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ kafka-streams-examples ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 15 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.3:compile (default-compile) @ kafka-streams-examples ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 68 source files to /home/yoni/work/github/confluentinc/kafka-streams-examples-2/target/classes
[INFO] /home/yoni/work/github/confluentinc/kafka-streams-examples-2/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java: Some input files use or override a deprecated API.
[INFO] /home/yoni/work/github/confluentinc/kafka-streams-examples-2/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java: Recompile with -Xlint:deprecation for details.
[INFO] /home/yoni/work/github/confluentinc/kafka-streams-examples-2/src/main/java/io/confluent/examples/streams/microservices/domain/Schemas.java: Some input files use unchecked or unsafe operations.
[INFO] /home/yoni/work/github/confluentinc/kafka-streams-examples-2/src/main/java/io/confluent/examples/streams/microservices/domain/Schemas.java: Recompile with -Xlint:unchecked for details.
[INFO] 
[INFO] --- scala-maven-plugin:3.2.1:compile (default) @ kafka-streams-examples ---
[INFO] /home/yoni/work/github/confluentinc/kafka-streams-examples-2/src/main/java:-1: info: compiling
[INFO] /home/yoni/work/github/confluentinc/kafka-streams-examples-2/src/main/scala:-1: info: compiling
[INFO] /home/yoni/work/github/confluentinc/kafka-streams-examples-2/target/generated-sources:-1: info: compiling
[INFO] /home/yoni/work/github/confluentinc/kafka-streams-examples-2/target/generated-sources/annotations:-1: info: compiling
[INFO] Compiling 75 source files to /home/yoni/work/github/confluentinc/kafka-streams-examples-2/target/classes at 1536317738525
[ERROR] error: scala.reflect.internal.MissingRequirementError: object java.lang.Object in compiler mirror not found.
[ERROR]         at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:17)
[ERROR]         at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:18)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:53)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getClassByName(Mirrors.scala:102)
[ERROR]         at scala.reflect.internal.Mirrors$RootsBase.getRequiredClass(Mirrors.scala:105)
[ERROR]         at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass$lzycompute(Definitions.scala:257)
[ERROR]         at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass(Definitions.scala:257)
[ERROR]         at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1394)
[ERROR]         at scala.tools.nsc.Global$Run.<init>(Global.scala:1215)
[ERROR]         at scala.tools.nsc.Driver.doCompile(Driver.scala:31)
[ERROR]         at scala.tools.nsc.MainClass.doCompile(Main.scala:23)
[ERROR]         at scala.tools.nsc.Driver.process(Driver.scala:51)
[ERROR]         at scala.tools.nsc.Driver.main(Driver.scala:64)
[ERROR]         at scala.tools.nsc.Main.main(Main.scala)
[ERROR]         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[ERROR]         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[ERROR]         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[ERROR]         at java.base/java.lang.reflect.Method.invoke(Method.java:564)
[ERROR]         at scala_maven_executions.MainHelper.runMain(MainHelper.java:164)
[ERROR]         at scala_maven_executions.MainWithArgsInFile.main(MainWithArgsInFile.java:26)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 6.386 s
[INFO] Finished at: 2018-09-07T11:55:39+01:00
[INFO] Final Memory: 44M/154M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.1:compile (default) on project kafka-streams-examples: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

In case it's of any use, here's two other bits of info:

$ java -version
openjdk version "10.0.2" 2018-07-17
OpenJDK Runtime Environment (build 10.0.2+13-Ubuntu-1ubuntu0.18.04.1)
OpenJDK 64-Bit Server VM (build 10.0.2+13-Ubuntu-1ubuntu0.18.04.1, mixed mode)
$ scala -version
Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL

Thanks.

docker-compose up fails! Missing docker image for 4.0.0

v4.0.0 (FAILS)

I'm trying to follow the guide to running the demo application:
https://docs.confluent.io/current/streams/kafka-streams-examples/docs/index.html

Running the services as in the example:

docker-compose up -d
...
Digest: sha256:164ea53ad8e42c4cc5ab8ae7269d1f4b455e333c2ba2971d8279e3840930b64f
Status: Downloaded newer image for confluentinc/cp-schema-registry:latest
Pulling kafka-music-data-generator (confluentinc/kafka-streams-examples:4.0.0-SNAPSHOT)...
ERROR: manifest for confluentinc/kafka-streams-examples:4.0.0-SNAPSHOT not found

Looking at your docker repository, there is no 4.0.0-SNAPSHOT tag.
https://hub.docker.com/r/confluentinc/cp-kafka-streams-examples/tags/

This means checking out this repository as is will not work.

Also, attempting to build this image myself locally using the Dockerfile in the repo is non-trivial, since it uses a bunch of ARG and ENV statements and I have no idea what I'm supposed to set them to:

LABEL io.confluent.docker=true
ARG COMMIT_ID=unknown
LABEL io.confluent.docker.git.id=$COMMIT_ID
ARG BUILD_NUMBER=-1
LABEL io.confluent.docker.build.number=$BUILD_NUMBER
WORKDIR /build
ENV COMPONENT="${ARTIFACT_ID}"
# We run the Kafka Streams demo application as a non-priviledged user.
ENV STREAMS_USER="streams"
ENV STREAMS_GROUP=$STREAMS_USER
ENV STREAMS_EXAMPLES_BRANCH="${CONFLUENT_MAJOR_VERSION}.${CONFLUENT_MINOR_VERSION}.x"
ENV STREAMS_EXAMPLES_FATJAR="kafka-streams-examples-${STREAMS_VERSION}-standalone.jar"
ENV STREAMS_APP_DIRECTORY="/usr/share/java/kafka-streams-examples"
ENV STREAMS_EXAMPLES_FATJAR_DEPLOYED="$STREAMS_APP_DIRECTORY/$STREAMS_EXAMPLES_FATJAR"
ENV KAFKA_MUSIC_APP_CLASS="io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExample"
ENV KAFKA_MUSIC_APP_REST_HOST=localhost
ENV KAFKA_MUSIC_APP_REST_PORT=7070
EXPOSE $KAFKA_MUSIC_APP_REST_PORT

And I could find no instructions on this.

v3.3.1 (FAILS)

I checked out v3.3.1 and that fails as well:

Pulling kafka-music-data-generator (confluentinc/kafka-streams-examples:3.3.1-SNAPSHOT)...
ERROR: manifest for confluentinc/kafka-streams-examples:3.3.1-SNAPSHOT not found

Older versions

The latest version on Docker Hub is v3.2.2. And these are the tags found in this repo:

$ git tag
v3.3.0-1
v3.3.1
v3.3.1-rc0
v3.3.1-rc2
v3.3.1-rc3
v3.3.1-rc4
v3.3.1-rc5
v4.0.0
v4.0.0-rc0
v4.0.0-rc1
v4.0.0-rc2
v4.0.0-rc3

So there is no way to run this "out of the box"

java.lang.AbstractMethodError: javax.ws.rs.core.UriBuilder.uri(Ljava/lang/String;)Ljavax/ws/rs/core/UriBuilder;

I am using eclipse IDE to run KafkaMusicExample:
https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java

I got the following error at the 5th step:

5) Use your browser to hit the REST endpoint of the app instance you started in step 3 to query the state managed by this application.

example:
List all running instances of this application
http://localhost:7070/kafka-music/instances

[2018-03-21 15:54:39,472] WARN Error for /kafka-music/instances (org.eclipse.jetty.servlet.ServletHandler:667)
java.lang.AbstractMethodError: javax.ws.rs.core.UriBuilder.uri(Ljava/lang/String;)Ljavax/ws/rs/core/UriBuilder;
	at javax.ws.rs.core.UriBuilder.fromUri(UriBuilder.java:119)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:291)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:221)
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:808)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
	at org.eclipse.jetty.server.Server.handle(Server.java:499)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
	at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
	at java.lang.Thread.run(Unknown Source)

Missing container?

I get the following error when trying to run docker-compose up -d

Pulling kafka-music-data-generator (confluentinc/kafka-streams-examples:4.0.0-SNAPSHOT)...
ERROR: manifest for confluentinc/kafka-streams-examples:4.0.0-SNAPSHOT not found

Exception in thread ... java.lang.ClassCastException: [B cannot be cast to java.base/java.lang.Long

Hi, I get this exception and I can't seem to find a solution. This is basically the same as the examples hosted here, except that the names are different.

Code:

var sb = new StreamsBuilder();

var personProfilesStream = sb.stream("trial-person-profile", Consumed.with(new DataSerde<Long>(), new DataSerde<PersonProfile>()));
var personProfilesTable = personProfilesStream
        .groupBy((key, value) -> value.Id, new DataSerde<>(), new DataSerde<>())
        .reduce((x, y) -> (x.Timestamp > y.Timestamp)? x : y, Materialized.as("store-person-profiles-table"));

var kStreams = new KafkaStreams(sb.build(), streamsConfiguration);
kStreams.start();

DataSerde is a custom Serde that deals with JSON.

Exception:

Exception in thread "trial-streamer-app-044e9f28-dad2-4cdf-a34e-e6f2d08af950-StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.base/java.lang.Long
at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:157)
at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:154)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

Kafka version
1.1.0 built with 2.11 Scala

Libraries versions: (screenshot attached in the original issue)

Make a state store with AVRO

In that post :
https://stackoverflow.com/a/50752095/9234107

you might be able to use a "global store" instead of a GlobalKTable. For this case, you can provide a custom Processor that can implement a filter before you populate the global store.

I try to make a store like this :

StoreBuilder<KeyValueStore<String, CustomerAvro>> personStoreSupplier = Stores.keyValueStoreBuilder(
		Stores.persistentKeyValueStore("personProcessStore"), Serdes.String(),
		Serdes.serdeFrom(CustomerAvro.class));

But a got that error :

Caused by: java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes

How can I make a store with Avro objects ?

Inner join by key where key is GenericRecord

Hi everyone,
Is it possible to join records where both records, from topics A and B for instance, have a key of GenericRecord type?
It's clear for primitive types: Kafka does the join and triggers the ValueJoiner, but if my topics have keys of GenericRecord type, nothing happens.
Of course I can apply selectKey to select some concrete field from the GenericRecord, but in my case I need to evaluate some boolean expression before joining the records.
Maybe somehow I can override the equals method, or is that the wrong way?
I still don't understand how Kafka joins 2 records from both topics; I tried to debug and didn't find the place where Kafka does something like keyFromTopic1 == keyFromTopic2, etc.

How to join two Kafka streams and produce the result in a topic with Avro values

I've got two Kafka streams with String keys and Avro values, which I created using KSQL.

Here's the first one:

DESCRIBE EXTENDED STREAM_1; 
Type                 : STREAM
Key field            : IDUSER
Timestamp field      : Not set - using <ROWTIME>
Key format           : STRING
Value format         : AVRO
Kafka output topic   : STREAM_1 (partitions: 4, replication: 1)

 Field                      | Type
--------------------------------------------------------
 ROWTIME                    | BIGINT           (system)
 ROWKEY                     | VARCHAR(STRING)  (system)
 FIRSTNAME                  | VARCHAR(STRING)
 LASTNAME                   | VARCHAR(STRING)
 IDUSER                     | VARCHAR(STRING)

and the second one:

DESCRIBE EXTENDED STREAM_2;
Type                 : STREAM
Key field            : IDUSER
Timestamp field      : Not set - using <ROWTIME>
Key format           : STRING
Value format         : AVRO
Kafka output topic   : STREAM_2 (partitions: 4, replication: 1)

 Field                      | Type
--------------------------------------------------------
 ROWTIME                    | BIGINT           (system)
 ROWKEY                     | VARCHAR(STRING)  (system)
 USERNAME                   | VARCHAR(STRING)
 IDUSER                     | VARCHAR(STRING)

The desired output should include IDUSER, LASTNAME and USERNAME.

I want to left-join these streams (on IDUSER) using the Streams API and write the output into a Kafka topic in Avro format.

To do so, I've tried the following:

public static void main(String[] args) {

    final Properties streamsConfiguration = new Properties();

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-strteams");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final Serde<String> stringSerde = Serdes.String();
    final Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();


    boolean isKeySerde = false;
    genericAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), isKeySerde);

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, GenericRecord> left = builder.stream("STREAM_1");
    KStream<String, GenericRecord> right = builder.stream("STREAM_2");

    // Java 8+ example, using lambda expressions
    KStream<String, GenericRecord> joined = left.leftJoin(right,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
        JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
        Joined.with(
          stringSerde, /* key */
          genericAvroSerde,   /* left value */
          genericAvroSerde)  /* right value */
      );
    joined.to(stringSerde, genericAvroSerde, "streams-output-testing");

    KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

However,

KStream<String, GenericRecord> joined = ...

throws an error on my IDE:

incompatible types: inference variable VR has incompatible bounds

When I try to use a String serde for both keys and values, it works, but the data is not very readable from kafka-console-consumer. What I want to do is produce the data in Avro format so that I can read it with kafka-avro-console-consumer.

Any suggestions?

Thanks!
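The inference error comes from the ValueJoiner: it returns a String ("left=..., right=...") while joined is declared as KStream<String, GenericRecord>, so the type variable VR cannot be bound. To keep the output in Avro, one option is to have the joiner build a GenericRecord against an output schema. A sketch reusing stringSerde, genericAvroSerde and the imports from the code above; the JoinedUser schema and its fields are assumptions:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

// Hypothetical output schema holding the three desired fields.
final Schema joinedSchema = SchemaBuilder.record("JoinedUser").fields()
    .optionalString("IDUSER")
    .optionalString("LASTNAME")
    .optionalString("USERNAME")
    .endRecord();

final KStream<String, GenericRecord> joined = left.leftJoin(right,
    (leftValue, rightValue) -> {
      // Build the joined Avro record; on a left join the right side may be null.
      final GenericRecordBuilder out = new GenericRecordBuilder(joinedSchema);
      out.set("IDUSER", leftValue.get("IDUSER"));
      out.set("LASTNAME", leftValue.get("LASTNAME"));
      out.set("USERNAME", rightValue == null ? null : rightValue.get("USERNAME"));
      return out.build();
    },
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Joined.with(stringSerde, genericAvroSerde, genericAvroSerde));

joined.to(stringSerde, genericAvroSerde, "streams-output-testing");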

java.lang.UnsatisfiedLinkError librocksdbjni8410746186306597530.dll: A dynamic link library (DLL) initialization routine failed

I am using IntelliJ to run WordCountLambdaIntegrationTest; it fails with the error below:

[2018-01-03 00:06:16,540] WARN [wordcount-lambda-integration-test-822ae7d4-3f3c-4f39-b919-37ea5337d8c3-StreamThread-1] stream-client [wordcount-lambda-integration-test-822ae7d4-3f3c-4f39-b919-37ea5337d8c3]All stream threads have died. The instance will be in error state and should be closed. (org.apache.kafka.streams.KafkaStreams)
Exception in thread "wordcount-lambda-integration-test-822ae7d4-3f3c-4f39-b919-37ea5337d8c3-StreamThread-1" java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni8410746186306597530.dll: A dynamic link library (DLL) initialization routine failed

Exception in thread "kafka-music-charts-53af5ee5-082b-419b-93fd-5efbc5c23e29-StreamThread-1" java.lang.ExceptionInInitializerError at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64).

I am using Eclipse to run KafkaMusicExample; the RuntimeException below occurred while running:

Exception in thread "kafka-music-charts-53af5ee5-082b-419b-93fd-5efbc5c23e29-StreamThread-1" java.lang.ExceptionInInitializerError
	at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
	at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
	at org.rocksdb.Options.<clinit>(Options.java:25)
	at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:128)
	at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:185)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:59)
	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.init(InnerMeteredKeyValueStore.java:160)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.init(MeteredKeyValueBytesStore.java:104)
	at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:225)
	at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:163)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:122)
	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:260)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:813)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: java.lang.UnsupportedOperationException: Cannot determine JNI library name for ARCH='x86' OS='windows 10' name='rocksdb'
	at org.rocksdb.util.Environment.getJniLibraryName(Environment.java:66)
	at org.rocksdb.NativeLibraryLoader.<clinit>(NativeLibraryLoader.java:19)
	... 16 more

I am using version 5.9.2 of rocksdbjni on a Windows 10, 64-bit machine.
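The root cause line "Cannot determine JNI library name for ARCH='x86'" indicates that the JVM launching the example is 32-bit, and rocksdbjni ships no 32-bit Windows native library, even if the OS itself is 64-bit. A throwaway check (not part of the examples) to see which JVM the IDE actually launches:

public class JvmArchCheck {
  public static void main(final String[] args) {
    // On HotSpot JVMs this prints e.g. "amd64 / 64" for a 64-bit JVM or "x86 / 32" for a 32-bit one.
    System.out.println(System.getProperty("os.arch") + " / "
        + System.getProperty("sun.arch.data.model"));
  }
}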

OrderDetailsService consumer re-processes the last committed record on restart

Steps to reproduce.

  • start OrderService
    java -cp target/kafka-streams-examples-4.0.0-standalone.jar io.confluent.examples.streams.microservices.OrderService
  • start OrderDetailsService
    java -cp target/kafka-streams-examples-4.0.0-standalone.jar io.confluent.examples.streams.microservices.OrderDetailsService

Post an Order (a valid one with id=17) via the OrderService HTTP endpoint.
You can see in the logs that the OrderDetailsService consumer correctly processes the order and the producer writes the validation record to the order-validated topic (within the same transaction):

[2017-11-30 15:05:04,633] DEBUG [pool-1-thread-1] process order 17 and order state CREATED (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:05:04,636] DEBUG [pool-1-thread-1] produced validation record for order 17, status PASS (io.confluent.examples.streams.microservices.OrderDetailsService)

Kill OrderDetailsService

^C
[2017-11-30 15:05:16,705] INFO [Thread-0] stopping service; running false (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:05:17,709] INFO [Thread-0] OrderDetailsService was stopped (io.confluent.examples.streams.microservices.OrderDetailsService)

and start it again. You will see that order 17 is reprocessed twice (for states CREATED and VALIDATED).

java -cp target/kafka-streams-examples-4.0.0-standalone.jar io.confluent.examples.streams.microservices.OrderDetailsService
[2017-11-30 15:05:19,387] INFO [main] Connecting to Kafka cluster via bootstrap servers localhost:9092 (io.confluent.examples.streams.microservices.util.MicroserviceUtils)
[2017-11-30 15:05:19,387] INFO [main] Connecting to Confluent schema registry at http://localhost:8081 (io.confluent.examples.streams.microservices.util.MicroserviceUtils)
[2017-11-30 15:05:19,895] DEBUG [pool-1-thread-1] Started OrderDetailsService consumer (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:05:19,922] DEBUG [pool-1-thread-1] startService : producer and consumer, running true (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:05:20,442] DEBUG [pool-1-thread-1] process order 17 and order state CREATED (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:05:20,502] DEBUG [pool-1-thread-1] produced validation record for order 17, status PASS (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:05:20,798] DEBUG [pool-1-thread-1] process order 17 and order state VALIDATED (io.confluent.examples.streams.microservices.OrderDetailsService)

Kill OrderDetailsService again

^C[2017-11-30 15:06:01,413] INFO [Thread-0] stopping service; running false (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:06:02,415] INFO [Thread-0] OrderDetailsService was stopped (io.confluent.examples.streams.microservices.OrderDetailsService)

And start it again. You will see that order 17 is reprocessed three times (once for CREATED and twice for VALIDATED).

localhost:order-microservices agaton$ java -cp target/kafka-streams-examples-4.0.0-standalone.jar io.confluent.examples.streams.microservices.OrderDetailsService
[2017-11-30 15:06:03,695] INFO [main] Connecting to Kafka cluster via bootstrap servers localhost:9092 (io.confluent.examples.streams.microservices.util.MicroserviceUtils)
[2017-11-30 15:06:03,696] INFO [main] Connecting to Confluent schema registry at http://localhost:8081 (io.confluent.examples.streams.microservices.util.MicroserviceUtils)
[2017-11-30 15:06:04,202] DEBUG [pool-1-thread-1] Started OrderDetailsService consumer (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:06:04,230] DEBUG [pool-1-thread-1] startService : producer and consumer, running true (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:06:04,743] DEBUG [pool-1-thread-1] process order 17 and order state CREATED (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:06:04,802] DEBUG [pool-1-thread-1] produced validation record for order 17, status PASS (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:06:04,802] DEBUG [pool-1-thread-1] process order 17 and order state VALIDATED (io.confluent.examples.streams.microservices.OrderDetailsService)
[2017-11-30 15:06:05,136] DEBUG [pool-1-thread-1] process order 17 and order state VALIDATED (io.confluent.examples.streams.microservices.OrderDetailsService)

I really don't understand what is going on.
Any idea?
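For context, a transactional consume-process-produce loop is expected to commit the consumed offsets through the producer, and the offset stored must be the offset of the next record to read (record offset + 1); if the last record's own offset is committed instead, the restarted consumer re-reads exactly that record, which matches the symptom above. A sketch of the general pattern, not the repository's actual code; validate(), consumerGroupId, the topic name and the surrounding producer/consumer setup are assumptions:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

producer.beginTransaction();
final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (final ConsumerRecord<String, Order> record : records) {
  producer.send(new ProducerRecord<>("order-validations", record.key(), validate(record.value())));
  // Commit the *next* offset to read, i.e. record.offset() + 1, not the record's own offset.
  offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
      new OffsetAndMetadata(record.offset() + 1));
}
producer.sendOffsetsToTransaction(offsetsToCommit, consumerGroupId);
producer.commitTransaction();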

Enable Travis CI integration

Once this new repo is made public, we need to enable Travis CI integration under https://travis-ci.org (we can't enable it before it is public because we don't use the commercial Travis CI offering).

To do this we need to flip the "enabled yes/no" switch for confluentinc/kafka-streams-examples under https://travis-ci.org/profile/miguno (replace "miguno" with your Travis CI username). I have already enabled the Travis CI integration on the GitHub side under https://github.com/confluentinc/kafka-streams-examples/settings/installations, as far as I can tell, by reusing the settings from the examples repository.

java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String

Please help with this:

Exception in thread "kafka-music-charts-c9fcbacd-94bf-4e58-9a04-33fc0cce6b2c-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [2_0] Failed to flush state store song-play-count
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
	at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
	at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
	at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:307)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
	at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
	at com.data.music.KafkaMusicExample.lambda$5(KafkaMusicExample.java:238)
	at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:84)
	at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:72)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
	at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:35)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:79)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
	at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:112)
	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:153)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
	... 14 more

Thank you.
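The Avro deserializers hand string fields back as org.apache.avro.util.Utf8 (a CharSequence), not java.lang.String, so a direct cast in the mapper at KafkaMusicExample.java:238 fails once the cache is flushed. Converting via toString() (or declaring avro.java.string=String in the schema) avoids the cast. An illustrative fragment, where record and the field name "name" are placeholders:

import org.apache.avro.generic.GenericRecord;

// Throws ClassCastException: the field value is org.apache.avro.util.Utf8, not String.
// final String name = (String) record.get("name");

// Safe: Utf8 implements CharSequence, so convert explicitly.
final String name = record.get("name").toString();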

[AnomalyDetectionLambdaExample] How to ingest a message into a topic when count > 3?

Hi
I need something similar to the result of this code; however, I would need a simple JSON message like {timestamp:"ts", anomaly:"Anomalous user"} written to the topic instead of the result of the KStream produced by the following line:

anomalousUsersForConsole.to("AnomalousUsers", Produced.with(stringSerde, longSerde));

Should I use KStream transform() or is there an easier way to do it?
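transform() is not needed for this; a plain map() (or mapValues()) can turn each <user, count> record into a JSON string, which is then written with String serdes. A sketch, assuming anomalousUsers is the KStream<String, Long> from the example, stringSerde is the example's String serde, and the JSON layout follows the question:

import java.time.Instant;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

final KStream<String, String> anomalyMessages = anomalousUsers.map((user, count) ->
    // Build the JSON payload by hand; a JSON library could be used instead.
    KeyValue.pair(user,
        "{\"timestamp\":\"" + Instant.now() + "\",\"anomaly\":\"" + user + "\"}"));

anomalyMessages.to("AnomalousUsers", Produced.with(stringSerde, stringSerde));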

Race condition in streams/microservices/OrdersService.java

In the function fetchLocal the code checks whether the order is in the store; if not, a callback is put into the outstandingRequests hashmap. That way, when a message arrives, maybeCompleteLongPollGet can invoke the callback and complete the request.

However, after the presence of the order in the store has been tested (line 200), the message related to that order might arrive before the callback is in the hashmap, so the response is never completed.
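One way to close that window is the general check/register/re-check pattern sketched below; this is not the repository's actual fix, and the helper names mirror the issue description but their signatures are assumptions:

Order order = ordersStore().get(id);
if (order != null) {
  asyncResponse.resume(order);                  // fast path: already in the store
} else {
  outstandingRequests.put(id, asyncResponse);   // register the callback first...
  order = ordersStore().get(id);                // ...then look again to cover the race
  if (order != null && outstandingRequests.remove(id) != null) {
    asyncResponse.resume(order);                // the record arrived in between; complete now
  }
}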

RocksDBStore.openDB(..): Error opening store

Hello,

I have tried various examples from branch 3.3 with complete success and created a running PoC application. Unfortunately, after I switched to v4.1 (and v4.0) I am unable to run the aggregation-based examples (SessionWindowsExampleDriver produces input data, but SessionWindowsExample fails with an exception). Running on Windows, in IntelliJ IDEA. Please see the error below. Is this a bug or an environment issue?

"C:\Program Files\Java\jdk1.8.0_144\bin\java" <some args> Connected to the target VM, address: '127.0.0.1:58840', transport: 'socket' SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Exception in thread "session-windows-example-client-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: Error opening store play-events-per-session:1513036800000 at location \tmp\kafka-streams\session-windows-example\0_2\play-events-per-session\play-events-per-session:1513036800000 at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204) at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore$1.restore(RocksDBSegmentedBytesStore.java:113) at org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback.restore(WrappedBatchingStateRestoreCallback.java:44) at org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback.restoreAll(WrappedBatchingStateRestoreCallback.java:37) at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreAll(CompositeRestoreListener.java:89) at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:75) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:277) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:238) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) Caused by: org.rocksdb.RocksDBException: Failed to create dir: C:\tmp\kafka-streams\session-windows-example\0_2\play-events-per-session\play-events-per-session:1513036800000: Invalid argument at org.rocksdb.RocksDB.open(Native Method) at org.rocksdb.RocksDB.open(RocksDB.java:231) at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:197)

Missing imports

The following imports are missing

import io.confluent.examples.streams.avro.microservices.Order;
import io.confluent.examples.streams.avro.microservices.OrderState;

Kafka Connect distributed and Connector failures

I see there are many drawbacks in the way connectors are run within the distributed Connect framework, some of which have already been mentioned in KIP-304. That said, a connector needs additional control to be able to:

  1. exit upon certain conditions (unload itself from the worker) - currently the only workaround is to use REST API
  2. produce connector specific log files
  3. command line interface for workers to launch (similar to how it works for standalone mode) without the need for REST interfaces.

missing import

The following import is missing:

import io.confluent.examples.streams.avro.PlayEvent;

Microservices example: Add jersey-hk2 dependency to pom

In 5.0.x:

Trying to pull the microservices example apart and run OrdersService standalone (outside of io.confluent.examples.streams.microservices.EndToEndTest):

$ mvn exec:java -Dexec.mainClass=io.confluent.examples.streams.microservices.OrdersService

However it fails:

Caused by: java.lang.RuntimeException: javax.servlet.ServletException: org.glassfish.jersey.servlet.ServletContainer-f71f946@e6d289c7==org.glassfish.jersey.servlet.ServletContainer,jsp=null,order=-1,inst=false
    at io.confluent.examples.streams.microservices.util.MicroserviceUtils.startJetty(MicroserviceUtils.java:154)
    at io.confluent.examples.streams.microservices.OrdersService.start(OrdersService.java:314)
    at io.confluent.examples.streams.microservices.OrdersService.main(OrdersService.java:396)
    ... 6 more
Caused by: javax.servlet.ServletException: org.glassfish.jersey.servlet.ServletContainer-f71f946@e6d289c7==org.glassfish.jersey.servlet.ServletContainer,jsp=null,order=-1,inst=false
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:691)
    at org.eclipse.jetty.servlet.ServletHolder.initialize(ServletHolder.java:427)
    at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:760)
    at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:374)
    at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:785)
    at org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:287)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:138)
    at org.eclipse.jetty.server.Server.start(Server.java:419)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:108)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:113)
    at org.eclipse.jetty.server.Server.doStart(Server.java:386)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at io.confluent.examples.streams.microservices.util.MicroserviceUtils.startJetty(MicroserviceUtils.java:152)
    ... 8 more
Caused by: java.lang.IllegalStateException: InjectionManagerFactory not found.
    at org.glassfish.jersey.internal.inject.Injections.lambda$lookupInjectionManagerFactory$0(Injections.java:98)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.glassfish.jersey.internal.inject.Injections.lookupInjectionManagerFactory(Injections.java:98)
    at org.glassfish.jersey.internal.inject.Injections.createInjectionManager(Injections.java:93)
    at org.glassfish.jersey.server.ApplicationHandler.<init>(ApplicationHandler.java:282)
    at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:335)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:178)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:370)
    at javax.servlet.GenericServlet.init(GenericServlet.java:244)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:670)
    ... 21 more

Adding the following dependency to the pom file seems to work:

    <dependency>
        <groupId>org.glassfish.jersey.inject</groupId>
        <artifactId>jersey-hk2</artifactId>
        <version>${jersey.version}</version>
    </dependency>

Microservices example: incorrect parsing of args

In 5.0.x:

Regarding these lines of code: https://github.com/confluentinc/kafka-streams-examples/blob/5.0.x/src/main/java/io/confluent/examples/streams/microservices/OrdersService.java#L388-L391

The parsing is incorrect; the array indices should be shifted down as follows:

    final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
    final String schemaRegistryUrl = args.length > 1 ? args[1] : "http://localhost:8081";
    final String restHostname = args.length > 2 ? args[2] : "localhost";
    final String restPort = args.length > 3 ? args[3] : null;
