kafka-streams-in-action's Introduction

Source Code for Kafka Streams in Action

Welcome to the source code for Kafka Streams in Action. Here you'll find directions for running the example code from the book. If any of the examples fail to produce output, make sure you have created the topics needed. For those running on Mac/*nix, the create-topics.sh file in the bin directory creates all required topics ahead of time.

If you have any issues with the examples you can post a question in the Manning Authors forum at https://forums.manning.com/forums/kafka-streams-in-action

Instructions for Chapter 9 Examples

The examples in Chapter 9 are more involved and require some extra steps to run. The first example we'll go over is the Kafka-Connect and Kafka Streams integration.

Kafka-Connect & Kafka Streams Example

To run the Kafka Connect and Kafka Streams example you'll need to do the following:

  1. Update the plugin.path property in the connect-standalone.properties file to the path where you cloned this repository. The plugin.path property contains the path to the uber-jar file with the Confluent JDBC connector and the H2 database classes. Make sure to update only the base location of where you installed the source code, and leave the rest of the path in place.

  2. Copy both the connector-jdbc.properties and connect-standalone.properties files to the <kafka install dir>/config directory.

  3. Open a terminal window and cd into the base directory of the source code, then run ./gradlew runDatabaseInserts. This will start the H2 database servers and start inserting data into a table that Kafka-Connect monitors.

  4. In another terminal window cd into <kafka install dir>/bin and run ./connect-standalone.sh ../config/connect-standalone.properties ../config/connector-jdbc.properties. This will start Kafka Connect, which will start pulling data from the database table into Kafka.

  5. Open a third terminal window from the base of the source code install and run ./gradlew runStreamsConnectIntegration_Chapter_9. This will start the Kafka Streams application, which will start stream processing data from a database table via Connect!

For this example to work properly you must start the database server/insert process before starting Kafka-Connect.

To clean up or start the example over, remove the Connect offsets (stored in the file /tmp/connect.offsets by default) and remove the H2 database file (findata.mv.db) stored in your home directory.
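
As a rough illustration of the clean-up step, the two files could also be removed from Java. This is only a sketch: the class and method names (ConnectExampleCleanup, cleanUp) are made up for this example and are not part of the book's source, and the paths are the defaults named above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of the repository.
public class ConnectExampleCleanup {

    // Deletes both files if they exist and returns how many were actually removed.
    public static int cleanUp(Path offsetsFile, Path h2File) throws IOException {
        int removed = 0;
        if (Files.deleteIfExists(offsetsFile)) {
            removed++;
        }
        if (Files.deleteIfExists(h2File)) {
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) throws IOException {
        // Default locations as described in the README text.
        Path offsets = Paths.get("/tmp/connect.offsets");
        Path h2 = Paths.get(System.getProperty("user.home"), "findata.mv.db");
        System.out.println("Removed " + cleanUp(offsets, h2) + " file(s)");
    }
}
```

Because Files.deleteIfExists simply returns false for a missing file, running the helper twice is harmless.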

Running the Interactive Query Examples

To prepare for the interactive queries, you'll need to increase the number of partitions on the stock-transactions topic to two with the following command:

  <kafka base dir>/bin/kafka-topics.sh --alter --partitions 2 --zookeeper localhost:2181 --topic stock-transactions

NOTE

The Interactive Query example makes use of the stock-transactions topic, which is used in previous examples. If you go back to any of the earlier examples that use the stock-transactions topic, you'll need to delete the topic and create it again.

Then, to run the interactive query examples, you'll execute 3 commands, each in a separate terminal, from the base directory of the source code install:

  1. ./gradlew runProducerInteractiveQueries runs the producer sending
  2. ./gradlew runInteractiveQueryApplicationOne starts a Streams instance with an embedded web server on localhost:4567
  3. ./gradlew runInteractiveQueryApplicationTwo starts a Streams instance with an embedded web server on localhost:4568

After all three are running you can try out some of the REST calls from your browser. It doesn't matter which port you choose; you'll retrieve the same results. Here are some examples:

  • http://localhost:4567/kv/TransactionsBySector shows aggregated transactions by market sector
  • http://localhost:4567/window/NumberSharesPerPeriod/XXXX shows the number of shares traded over 10 second windows for the last 30 seconds for a given stock symbol. Just replace the XXXX with one of the following: AEBB, VABC, ALBC, EABC, BWBC, BNBC, MASH, BARX, WNBC, WKRP
  • http://localhost:4567/session/CustomerPurchaseSessions/NNNNNNNNN displays the average dollar amount spent per share by customers over one hour sessions. Just replace the NNNNNNNNN with one of the following customer ids: 12345678, 222333444, 33311111, 55556666, 4488990011, 77777799, 111188886, 98765432, 665552228, 660309116

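
The same endpoints can be called from code rather than a browser. The following is a minimal sketch, assuming the applications above are running. InteractiveQueryClient, queryUrl and fetch are hypothetical names invented here, and since this README does not describe the response format, the body is printed as raw text.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical client, not part of the repository.
public class InteractiveQueryClient {

    // Builds http://host:port/<storeType>/<storeName>[/<key>] as in the examples above.
    public static String queryUrl(String host, int port, String storeType,
                                  String storeName, String key) {
        String base = "http://" + host + ":" + port + "/" + storeType + "/" + storeName;
        return (key == null || key.isEmpty()) ? base : base + "/" + key;
    }

    // Issues a GET request and returns the response body as text.
    public static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line).append('\n');
            }
        } finally {
            conn.disconnect();
        }
        return body.toString();
    }

    public static void main(String[] args) throws IOException {
        // Either port should return the same results, per the text above.
        String url = queryUrl("localhost", 4567, "window", "NumberSharesPerPeriod", "AEBB");
        System.out.println(fetch(url));
    }
}
```
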
The Interactive Queries Web Application

However, the best way to watch Interactive Queries in action is to point your browser to localhost:4568/iq or localhost:4567/iq.
This will launch a web application that updates results for all parameters over all three stores (TransactionsBySector, NumberSharesPerPeriod, CustomerPurchaseSessions) every 7 seconds via ajax requests to the REST endpoints and displays them in the page.

Again, it does not matter which port you point to; you'll get the same results.

General Notes on Examples

Here are some notes regarding the source code:

  1. Since chapter 8 is all about testing, all examples run as tests from within your IDE, although if you want you can run tests from the command line via a ./gradlew clean test command.

  2. All unit tests use JUnit 5.0, so it's a good opportunity to learn the changes that have come with the new version.

  3. Chapter 7 examples are concerned with observing performance, so at least one of the examples will continue to run until you explicitly stop the program. Most of the related results are found in log files or viewed via JMX.

  4. The examples in Chapters 5 and 6 rely more on timestamps and potential joins, so sometimes it takes a few seconds for data to show up. Additionally, random data is generated for each run of an example, so some example runs produce better data than others. Please be patient.

  5. If at first you don't get any results, re-run the example. It could be that I missed adding a topic name to the create-topics.sh script and the topic does not exist yet, but Kafka is configured to create topics automatically.

Requirements

This project assumes and requires the following

  1. Java 8
  2. Gradle

If you don't have gradle installed, that's ok: this project uses the gradle wrapper. This means the first time you run the ./gradlew or gradlew command, gradle will be installed for you.

Included Dependencies

  1. kafka_2.12-1.0.0.tgz

Kafka itself (version 1.0.0, built for Scala 2.12) is included as a convenience.

All other dependencies are taken care of via gradle.

IDE setup

The gradle eclipse and intellij plugins are included in the build.gradle file.

  1. To set up for eclipse run ./gradlew eclipse (for windows gradlew eclipse) from the base directory of this repo.
  2. To set up for intellij run ./gradlew idea (for windows gradlew idea) from the base directory of this repo.

Installing the included Kafka

Run tar xvzf kafka_2.12-1.0.0.tgz somewhere on your computer.

Running Kafka

  1. To start kafka go to /kafka_2.12-1.0.0/bin
  2. Run ./zookeeper-server-start.sh ../config/zookeeper.properties
  3. Run ./kafka-server-start.sh ../config/server.properties

If you are on windows, go to the /kafka_2.12-1.0.0/bin/windows directory and run the .bat files with the same name and in the same order.

Stopping Kafka

  1. To stop kafka go to /kafka_2.12-1.0.0/bin
  2. Run kafka-server-stop.sh
  3. Run zookeeper-server-stop.sh

If you are on windows, go to the /kafka_2.12-1.0.0/bin/windows directory and run the .bat files with the same name and in the same order.

Sample Kafka Streams Code

All the code from the book can be found in the directory corresponding to the chapter where the book introduced or demonstrated the concept/code. Code that is not in a directory named "chapter_N" is either common code used across all chapters, or utility code.

Running the Kafka Streams examples

All of the example programs can be run from within an IDE or from the command line. There are gradle tasks for each of the examples we have so far. The provided Kafka will need to be running before you can start any of the examples. Also there is a script in the bin directory (create-topics.sh) that creates all topics required (I think I've added all topics, but may have missed one or two). If you don't run the script that's fine, Kafka auto-creates topics by default. For the purposes of our examples that is fine.

All examples should print to the console by default. Some may write out to topics and print to standard-out, but if you don't see anything in the console you should check the source code to make sure I didn't miss adding a print statement.

To run any of the example programs, I recommend running them through the set gradle tasks. Remember, if you are on Windows use gradlew.bat instead of ./gradlew to run the program. The tasks for all the example programs are defined in the build.gradle file. For your convenience here are the commands to run sample programs for chapters 1-7:

  1. ./gradlew runYellingApp (Kafka Streams version of Hello World)
  2. ./gradlew runZmartFirstAppChapter_3
  3. ./gradlew runZmartAdvancedChapter_3
  4. ./gradlew runAddStateAppChapter_4
  5. ./gradlew runJoinsExampleAppChapter_4
  6. ./gradlew runAggregationsChapter_5
  7. ./gradlew runCountingWindowingChapter_5
  8. ./gradlew runGlobalKtableChapter_5
  9. ./gradlew runKStreamKTableChapter_5
  10. ./gradlew runPopsHopsChapter_6
  11. ./gradlew runStockPerformance_Chapter_6
  12. ./gradlew runStockPerformanceStreamsProcess_Chapter_6
  13. ./gradlew runCoGrouping_Chapter_6
  14. ./gradlew runCoGroupinStateRetoreListener_Chapter_7
  15. ./gradlew runStockPerformanceConsumerInterceptor_Chapter_7
  16. ./gradlew runZmartJmxAndProducerInterecptor_Chapter_7

Example Kafka Streams Program Output

When running the examples, the program will generate data to flow through Kafka and into the sample streams program. The data generation occurs in the background. The Kafka Streams programs will run for approximately one minute each. The sample programs write results to the console as well as topics. While you are free to use the ConsoleConsumer or your own Consumer, it's much easier to view the results flowing to the console.

kafka-streams-in-action's People

Contributors

bbejeck, chang-chao

kafka-streams-in-action's Issues

ZMartKafkaStreamsAddStateApp.java does not process Purchase records

ZMartKafkaStreamsAddStateApp app (in Chapter 4 directory) does not work with latest version of Kafka Streams. I pretty much copied the code in github - https://github.com/bbejeck/kafka-streams-in-action/blob/master/src/main/java/bbejeck/chapter_4/ZMartKafkaStreamsAddStateApp.java . When I run the code as is, no data is flowing through the topology. I verified that mock data is being generated, as I can see the offset in the input topic - transactions go up. However, nothing in the output topics, and nothing is printed to console.

However, when I comment out lines 81-88 (https://github.com/bbejeck/kafka-streams-in-action/blob/master/src/main/java/bbejeck/chapter_4/ZMartKafkaStreamsAddStateApp.java#L81-L88), which basically avoids creating the "through()" processor node, the code works. After commenting them out, I see data being written to the "patterns" topic and output generated in the console.

If there are no plans to address/fix the issue, I would appreciate any pointers so I can get the code working on my setup. Thank you.

Scala question and where do I get JAR for EmbeddedKafkaCluster

Firstly, great book, thanks a million for it.

I hope you don't mind but just got a few questions, happy to do this some other way, if you want just let me know.

However if you are ok to help with 2 small questions.

  1. You use EmbeddedKafkaCluster, but what JAR would I need to get this in an IntelliJ IDEA SBT project? Do you have a link to the artifact in Maven Central? I could not tell from your Maven file which one it was.

  2. I can see that there was a whole bunch of stuff brought into 2.0.0 (which I am using) to make working with Serdes easier from Scala. https://github.com/apache/kafka/tree/2.0/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala

This is based on the https://github.com/lightbend/kafka-streams-scala repo, that has now been folded into official apache/kafka codebase.

My question is, why don't I see these Scala types if I am using the 2.0.0 artifact?

I raised a question here too : https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!topic/confluent-platform/f_umV-B6Spw

It's just that, being the author of the ONLY book on this subject (and, if I am not mistaken, you work for Confluent too, right?), I thought if anyone knows it will be you.

I hope you don't mind a direct question here, I'm really struggling to get this working.

Examples not working if running with a locale different from en

I'm running the examples on a machine with an Italian locale and they don't work. In particular, MockDataProducer is not able to generate data, because numbers are generated with a different decimal separator. This is the stacktrace:

java.lang.NumberFormatException: For input string: "262,00"
	at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
	at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
	at java.lang.Double.parseDouble(Double.java:538)
	at bbejeck.util.datagen.DataGenerator.generatePurchases(DataGenerator.java:96)
	at bbejeck.clients.producer.MockDataProducer.lambda$producePurchaseData$0(MockDataProducer.java:68)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Also, it has been very hard to debug, since that code is wrapped in a task run by an ExecutorService and the exception is not thrown.
The solution, anyway, is to add the following JVM parameters: -Duser.country=US -Duser.language=en
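
The failure in the stack trace can be reproduced with the JDK alone. The sketch below assumes (without having checked DataGenerator itself) that a number formatted under the machine's locale is later passed to Double.parseDouble. LocaleParsing and its methods are names invented for the illustration.

```java
import java.util.Locale;

// Hypothetical demo class, not part of the repository.
public class LocaleParsing {

    // Formats an amount with two decimals using the given locale's decimal separator.
    public static String formatPrice(double amount, Locale locale) {
        return String.format(locale, "%.2f", amount);
    }

    // Returns true if Double.parseDouble accepts the text, false if it throws.
    public static boolean parses(String text) {
        try {
            Double.parseDouble(text);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String italian = formatPrice(262.0, Locale.ITALY); // uses ',' as the decimal separator
        String us = formatPrice(262.0, Locale.US);         // uses '.' as the decimal separator
        System.out.println(italian + " parses: " + parses(italian));
        System.out.println(us + " parses: " + parses(us));
    }
}
```

Double.parseDouble only accepts '.' as the decimal separator regardless of locale, which is why the Italian-formatted string is rejected. Calling Locale.setDefault(Locale.US) early in main would presumably have a similar effect to the JVM parameters above, though that has not been tried against the examples.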

Why KTable is emitting on each update ?

I have tried to simplify the example of KStreamVsKTable down to my problem, which is that KTable is emitting on each update, instead of the latest updates only.

Please see code below (in Scala):

object SimpleTable extends App {
  val topic = "simple-table"
  
  val prodProps = new Properties()
  prodProps.put("bootstrap.servers", "localhost:9092")
  prodProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  prodProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  prodProps.put("acks", "1")
  prodProps.put("retries", "3")

  val producer = new KafkaProducer[String, String](prodProps)

  producer.send(new ProducerRecord[String, String](topic, "key1", "value1"))
  producer.send(new ProducerRecord[String, String](topic, "key2", "value2"))
  producer.send(new ProducerRecord[String, String](topic, "key3", "value3"))
  producer.send(new ProducerRecord[String, String](topic, "key1", "value11"))
  producer.send(new ProducerRecord[String, String](topic, "key2", "value22"))
  producer.send(new ProducerRecord[String, String](topic, "key3", "value33"))

  producer.close()


  val streamProps = new Properties()
  streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-table-app1")
  streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  //streamProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group11")
  //streamProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "client11")
  //streamProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  //streamProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "18000")
  //streamProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "18000")
  //streamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "10485760")
  //streamProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
  //streamProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000")
  //streamProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1)
  //streamProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[WallclockTimestampExtractor])

  import org.apache.kafka.streams.scala.Serdes._
  implicit val consumeSerdes: Consumed[String, String] = Consumed.`with`[String, String]
  val builder = new StreamsBuilder()

  val simpleTable: KTable[String, String] = builder.table[String, String](topic)
  simpleTable.toStream.print(Printed.toSysOut[String, String].withLabel("simple-table"))


  val streams = new KafkaStreams(builder.build(), streamProps)
  streams.start()
  Thread.sleep(10000)
  streams.close()
}

This App is displaying this:

[simple-table]: key1, value1
[simple-table]: key2, value2
[simple-table]: key3, value3
[simple-table]: key1, value11
[simple-table]: key2, value22
[simple-table]: key3, value33

I have been stuck on the book for almost a week now because of this. I expect to see only the last 3 lines. The same thing happens when I implement the KStreamVsKTable example from the book.

Please help.

Another question for you

I noticed you happen to work on a PR for this issue at kafka main branch

apache/kafka#4702

If I call topologyTestDriver

I get all sorts of NIO exceptions about the directory not being empty

If I just don't protect the call to the topologyTestDriver.close in try catch and also delete state directory myself all seems ok

Any idea if this is what I should be doing? Or should that just work in 2.1.0?

I'm using Windows for dev machine

Sorry to ask another question of you
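
The manual clean-up mentioned above could look roughly like this. It's a sketch using only the JDK: StateDirCleanup and deleteRecursively are made-up names, and the directory passed in would be whatever state directory the test was configured with.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of the repository or of Kafka.
public class StateDirCleanup {

    // Deletes a directory tree; does nothing if the root does not exist.
    public static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order puts children before their parent directories.
            ordered = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : ordered) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: StateDirCleanup <state directory>");
            return;
        }
        // Intended to run after the test driver has been closed.
        deleteRecursively(Paths.get(args[0]));
    }
}
```

The walk is collected into a list before deleting so that the directory stream is already closed when the files are removed.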

Shares Aggregation problem (Chapter 5)

First of all - thank you for this amazing book!

I've run into a problem when running the example of transforming "Stock Transactions" into "Share Volume". The example code from Github compiles, but fails with the following exception:

Exception in thread "Kafka-Streams-Tables-client-SHARES-StreamThread-1" java.lang.ClassCastException: com.google.gson.internal.LinkedTreeMap cannot be cast to java.lang.Comparable
	at java.util.TreeMap.compare(TreeMap.java:1294)
	at java.util.TreeMap.put(TreeMap.java:538)
	at java.util.TreeSet.add(TreeSet.java:255)
	at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:83)
	at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:61)
	at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:131)
	at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:222)
	at com.google.gson.Gson.fromJson(Gson.java:927)
	at com.google.gson.Gson.fromJson(Gson.java:892)
	at com.google.gson.Gson.fromJson(Gson.java:841)
	at com.google.gson.Gson.fromJson(Gson.java:813)
	at com.example.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:33)
	at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:159)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:245)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:137)
	at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:78)
	at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

My application is configured with the following properties:

Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "Kafka-Streams-Tables-client-SHARES");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-streams-tables-group-SHARES");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka-Streams-Tables-App-SHARES");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

When I tried running the debugger around the problem, it looked like it struggles to decode FixedSizePriorityQueue (or, rather, its internal TreeSet).

Is there anything you could recommend for me to try solving the issue?

Thank you for your help and great contributions!
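
The top frames of the trace (TreeSet.add called from Gson's collection adapter) suggest that the queue's elements were deserialized as generic maps, which a TreeSet without a comparator can't order. The JDK-only sketch below reproduces that mechanism. A LinkedHashMap stands in for Gson's LinkedTreeMap, and the class name, method names, and "symbol" field are invented for the illustration.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical demo class, not part of the repository.
public class TreeSetComparableDemo {

    // A TreeSet with no comparator casts its elements to Comparable,
    // so adding a plain map fails with ClassCastException.
    public static boolean addFailsWithoutComparator(Map<String, Object> element) {
        TreeSet<Object> set = new TreeSet<>();
        try {
            set.add(element);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // With an explicit comparator the same elements can be stored.
    public static int addWithComparator(Map<String, Object> a, Map<String, Object> b) {
        Comparator<Map<String, Object>> bySymbol =
                Comparator.comparing(m -> String.valueOf(m.get("symbol")));
        TreeSet<Map<String, Object>> set = new TreeSet<>(bySymbol);
        set.add(a);
        set.add(b);
        return set.size();
    }

    public static void main(String[] args) {
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("symbol", "AEBB");
        Map<String, Object> second = new LinkedHashMap<>();
        second.put("symbol", "VABC");
        System.out.println("fails without comparator: " + addFailsWithoutComparator(first));
        System.out.println("size with comparator: " + addWithComparator(first, second));
    }
}
```

If that is indeed what is happening, telling Gson the concrete element type (for instance through a custom TypeAdapter for the queue class) may be a direction worth trying. That part isn't shown here.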

java.lang.ClassCastException

Hello

I tried to run SimpleProducer.java from Chapter 2
and got the exception below:

Caused by: java.lang.ClassCastException: class bbejeck.model.PurchaseKey cannot be cast to class java.lang.String (bbejeck.model.PurchaseKey is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

need to be fixed

Java version 11
Kafka version : kafka_2.13-2.7.0

Ch. 4. Generating a key twice

In 4.4.2 Generating keys containing customer IDs to perform joins you stated we need to generate keys for values. As I can see from the code, you do this twice. The first time is when we mask CC number and the second one when we do the generation itself. This is a bit confusing. Have I missed something?

ReadMe organization

The README.md would be much more informative / easier to follow if it was re-organized a bit. Even just moving everything from Requirements until the end so that all of that content is above Instructions for Chapter 9 Examples would make things a lot nicer. I'd be happy to throw together a quick PR for you to review but I don't have the rights to create a branch.
