Giter VIP home page Giter VIP logo

kafka-junit's Introduction

Kafka JUnit Build Status Maven Central

Kafka Junit provides helpers for starting and tearing down a Kafka broker during tests.

Please note that version 3.x.x drops Java 7 support and contains breaking API changes.

Version support matrix
Version Kafka Version
1.6 0.8.2.1
1.7 0.8.2.2
1.8 0.9.0.0
2.3 0.9.0.1
2.4 0.10.0.0
2.5 0.10.0.1
3.0.0 0.10.0.1
3.0.1 0.10.0.1
3.0.2 0.10.1.1
3.0.3 0.10.2.0
3.0.4 0.10.2.1
3.1.0 0.11.0.0
3.1.1 0.11.0.1
4.0.0 1.0.0
4.1.0 1.0.0 (Adds support for both Junit 4 and 5)
4.1.1 1.1.0
4.1.2 2.0.0
4.1.3 2.1.0
4.1.4 2.1.1
4.1.5 2.2.0
4.1.6 2.3.0
4.1.7 2.4.0
4.1.8 2.4.1
4.1.9 2.5.0
4.1.10 2.6.0
4.1.11 2.6.0 (Scala 2.13)
4.2.0 2.8.0
4.2.1 3.0.0
4.2.2 3.2.0
4.2.3 3.2.1
4.2.4 3.4.0
4.2.5 3.5.0
4.2.6 3.5.0 (Security update)
4.2.7 3.5.0 (Security update)
4.2.10 3.6.0

Installation

Releases are available on Maven Central.

Maven Central

Snapshot versions containing builds from the latest master are available in the Sonatype snapshots repo.

Javadocs

http://charithe.github.io/kafka-junit/

Usage

JUnit 4

Create an instance of the rule in your test class and annotate it with @Rule. This will start and stop the broker between each test invocation.

@Rule
public KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create());

To spin up the broker at the beginning of a test suite and tear it down at the end, use @ClassRule.

@ClassRule
public static KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create());

kafkaRule can be referenced from within your test methods to obtain information about the Kafka broker.

@Test
public void testSomething(){
    // Convenience methods to produce and consume messages
    kafkaRule.helper().produceStrings("my-test-topic", "a", "b", "c", "d", "e");
    List<String> result = kafkaRule.helper().consumeStrings("my-test-topic", 5).get();

    // or use the built-in producers and consumers
    KafkaProducer<String, String> producer = kafkaRule.helper().createStringProducer();

    KafkaConsumer<String, String> consumer = kafkaRule.helper().createStringConsumer();

    // Alternatively, the Zookeeper connection String and the broker port can be retrieved to generate your own config
    String zkConnStr = kafkaRule.helper().zookeeperConnectionString();
    int brokerPort = kafkaRule.helper().kafkaPort();
}

EphemeralKafkaBroker contains the core logic used by the JUnit rule and can be used independently.

KafkaHelper contains a bunch of convenience methods to work with the EphemeralKafkaBroker

JUnit 5

JUnit 5 does not have support for Rules, but instead uses the new JUnit 5 Extension Model.

So if you are using JUnit 5 you can use KafkaJunitExtension which provides a kafka broker that is started and stopped for each test.

The extension is configured using the optional class annotation @KafkaJunitExtensionConfig and provides dependency injection for constructors and methods for the classes KafkaHelper and EphemeralKafkaBroker

@ExtendWith(KafkaJunitExtension.class)
@KafkaJunitExtensionConfig(startupMode = StartupMode.WAIT_FOR_STARTUP)
class MyTestClass {

    @Test
    void testSomething(KafkaHelper kafkaHelper) throws ExecutionException, InterruptedException {
        // Convenience methods to produce and consume messages
        kafkaHelper.produceStrings("my-test-topic", "a", "b", "c", "d", "e");
        List<String> result = kafkaHelper.consumeStrings("my-test-topic", 5).get();
        assertThat(result).containsExactlyInAnyOrder("a", "b", "c", "d", "e");

        // or use the built-in producers and consumers
        KafkaProducer<String, String> producer = kafkaHelper.createStringProducer();

        KafkaConsumer<String, String> consumer = kafkaHelper.createStringConsumer();

        // Alternatively, the Zookeeper connection String and the broker port can be retrieved to generate your own config
        String zkConnStr = kafkaHelper.zookeeperConnectionString();
        int brokerPort = kafkaHelper.kafkaPort();
    }
}

Refer to Javadocs and unit tests for more usage examples.

kafka-junit's People

Contributors

cchacin avatar charithe avatar dariodariodario avatar dependabot[bot] avatar edwardmlyte avatar erik-helleren avatar flowertwig avatar fmthoma avatar frederikp avatar hanleyt avatar hypnoce avatar jeqo avatar jordi-devs avatar makkan avatar mattyb678 avatar mbode avatar mtillu avatar nicobn avatar rassmate avatar softqwewasd avatar teabot avatar theknowles avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-junit's Issues

Transactional producer not working

I have created a kafka producer(v1.0.0) and tried using transactions. The kafka-junit broker(v4.1.0) is running. The code snippet looks like:

    Properties config = new Properties();
    config.put("transactional.id","id");
    config.put("acks","all");
    KafkaProducer<String, String> producer =
                 broker.createProducer(new StringSerializer(), new StringSerializer(), config);
    System.out.println("producer created");
    producer.initTransactions();
    System.out.println("init transaction");
    producer.send(new ProducerRecord<>(TEST_TOPIC, "key1", "value1"));

This code gets stuck in a loop while executing the statement initTransactions().

readStringMessages takes 30+ seconds

Hi,

I'm using readStringMessages to read messages and the method call takes at least 30 seconds to return. The long delay seems to occur when connector.shutdown() is called on line 293 of KafkaJunitRule.java. Any pointers on how to decrease the execution time ?

Use Scala version 2.11

kafka-junit is compiled for org.apache.kafka_2.10:0.8.2.1

I would like a version that is compiled with scala 2.11 instead as the 2.10 version does not work when the rest of my project uses 2.11.

Tried to play around with mvn profiles to get this to work, but mvn isn`t by strongest side to say the least.

Any chance of getting a version compiled for 2.11 or maybe compile 1.6 for 2.11?

Getting error when running unit test

I tried to run below test in my environment and get error. The test keeps on running for sometime then finally gives up.

https://github.com/charithe/kafka-junit/blob/master/src/test/java/com/github/charithe/kafka/KafkaJunitClassRuleTest.java.

Error:>>
Exception in thread "Thread-0" java.lang.NoSuchFieldError: configFileStr
at org.apache.curator.test.QuorumConfigBuilder$1.(QuorumConfigBuilder.java:135)
at org.apache.curator.test.QuorumConfigBuilder.buildConfig(QuorumConfigBuilder.java:130)
at org.apache.curator.test.TestingZooKeeperServer$1.run(TestingZooKeeperServer.java:149)
at java.lang.Thread.run(Thread.java:745)

<< Hangs here for sometime and then quits with below error>>

java.lang.IllegalStateException: Timed out waiting for watch removal

at org.apache.curator.test.TestingZooKeeperMain.blockUntilStarted(TestingZooKeeperMain.java:153)
at org.apache.curator.test.TestingZooKeeperServer.start(TestingZooKeeperServer.java:159)
at org.apache.curator.test.TestingServer.<init>(TestingServer.java:117)
at org.apache.curator.test.TestingServer.<init>(TestingServer.java:100)
at org.apache.curator.test.TestingServer.<init>(TestingServer.java:52)
at com.github.charithe.kafka.EphemeralKafkaBroker.startBroker(EphemeralKafkaBroker.java:124)
at com.github.charithe.kafka.EphemeralKafkaBroker.start(EphemeralKafkaBroker.java:114)
at com.github.charithe.kafka.KafkaJunitRule.before(KafkaJunitRule.java:41)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Reference: >>

Below are the dependency I use.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.github.charithe</groupId>
        <artifactId>kafka-junit</artifactId>
        <version>3.0.1</version>
        <scope>test</scope>
    </dependency>

java.lang.NoSuchFieldError: configFileStr

Any idea how to fix it?

com.geophy.coredb.core.kafka.producer.StardogTxProducerTest
Exception in thread "Thread-1" java.lang.NoSuchFieldError: configFileStr
at org.apache.curator.test.QuorumConfigBuilder$1.(QuorumConfigBuilder.java:142)
at org.apache.curator.test.QuorumConfigBuilder.buildConfig(QuorumConfigBuilder.java:137)
at org.apache.curator.test.TestingZooKeeperServer$1.run(TestingZooKeeperServer.java:149)
at java.lang.Thread.run(Thread.java:748)

java.lang.IllegalStateException: Timed out waiting for watch removal

at org.apache.curator.test.TestingZooKeeperMain.blockUntilStarted(TestingZooKeeperMain.java:153)
at org.apache.curator.test.TestingZooKeeperServer.start(TestingZooKeeperServer.java:159)
at org.apache.curator.test.TestingServer.<init>(TestingServer.java:117)
at org.apache.curator.test.TestingServer.<init>(TestingServer.java:100)
at org.apache.curator.test.TestingServer.<init>(TestingServer.java:52)
at com.github.charithe.kafka.EphemeralKafkaBroker.startBroker(EphemeralKafkaBroker.java:124)
at com.github.charithe.kafka.EphemeralKafkaBroker.start(EphemeralKafkaBroker.java:114)
at com.geophy.coredb.core.kafka.producer.StardogTxProducerTest.setup(StardogTxProducerTest.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
com.github.charithe kafka-junit 3.0.3 test
	<dependency>
		<groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.10.2.0</version>
		<scope>compile</scope>

EphemeralKafkaCluster with more than 2 brokers fails with NOT_ENOUGH_REPLICAS error

When I create cluster with more 2 brokers cluster = EphemeralKafkaCluster.create(3); it starts but on sending messages ends with tons of error messages:
[data-plane-kafka-request-handler-4] ERROR kafka.server.ReplicaManager - [ReplicaManager broker=2] Error processing append operation on partition dlq_topic-0 org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(2) is insufficient to satisfy the min.isr requirement of 2 for partition dlq_topic-0 and [kafka-producer-network-thread | kafka-junit] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=kafka-junit, transactionalId=INTEGRATIONS_TRANSACTION_ID] Got error produce response with correlation id 578 on topic-partition dlq_topic-0, retrying (2147483173 attempts left). Error: NOT_ENOUGH_REPLICAS

How I should configure cluster to support transactions?

Failed to construct Kafka Producer

On a unit test that I am writing, I have the following code below to test out that my Kafka Consumer is obtaining stuff from any relevant Producer. My Kafka Consumer code is wrapped within the KafkaStreamObtainer class. With the code below,

package com.termmerge.nlpcore.obtainer;

import junit.framework.TestCase;
import org.junit.Test;
import org.junit.ClassRule;
import com.github.charithe.kafka.KafkaJunitRule;
import com.github.charithe.kafka.EphemeralKafkaBroker;
import net.jodah.concurrentunit.Waiter;

import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;


public class KafkaStreamObtainerTest extends TestCase
{

  @ClassRule
  public KafkaJunitRule kafkaRule =
          new KafkaJunitRule(EphemeralKafkaBroker.create());

  @Test
  public void testOneMessage() throws Throwable
  {
    Waiter waiter = new Waiter();

    KafkaProducer testProducer = kafkaRule.helper().createStringProducer();
    testProducer.send(
            new ProducerRecord<>("testTopic", "testKey", "testValue")
    );
    testProducer.close();

    Map consumerSettings = new Properties();
    consumerSettings.put(
            "connection_string",
            "localhost:" + Integer.toString(kafkaRule.helper().kafkaPort())
    );
    consumerSettings.put("group_id", "test");
    KafkaStreamObtainer kafkaStream =
            new KafkaStreamObtainer(consumerSettings);
    kafkaStream.addListener((record) -> {
      waiter.assertEquals(record.get("key"), "testKey");
      waiter.assertEquals(record.get("value"), "testValue");
      waiter.resume();
    });
    kafkaStream.listenToStream("testTopic");

    waiter.await(50000);
  }

}

I keep getting the following error:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

I'm completely stumped. What could possibly cause this?

JUnit 5 example test fails with NoSuchFieldError: configFileStr

I created a very simple maven project with junit5 and the example test (which btw is missing a throws declaration to actually compile). Unfortunately, when I run the test (on windows 7 in idea 2017.3) I get this error:

Exception in thread "Thread-1" java.lang.NoSuchFieldError: configFileStr
	at org.apache.curator.test.QuorumConfigBuilder$1.<init>(QuorumConfigBuilder.java:142)
	at org.apache.curator.test.QuorumConfigBuilder.buildConfig(QuorumConfigBuilder.java:137)
	at org.apache.curator.test.TestingZooKeeperServer$1.run(TestingZooKeeperServer.java:157)
	at java.lang.Thread.run(Thread.java:748)

java.lang.IllegalStateException: Timed out waiting for watch removal

	at org.apache.curator.test.TestingZooKeeperMain.blockUntilStarted(TestingZooKeeperMain.java:146)
	at org.apache.curator.test.TestingZooKeeperServer.start(TestingZooKeeperServer.java:167)
	at org.apache.curator.test.TestingServer.<init>(TestingServer.java:117)
	at org.apache.curator.test.TestingServer.<init>(TestingServer.java:100)
	at org.apache.curator.test.TestingServer.<init>(TestingServer.java:52)
	at com.github.charithe.kafka.EphemeralKafkaBroker.startBroker(EphemeralKafkaBroker.java:135)
	at com.github.charithe.kafka.EphemeralKafkaBroker.start(EphemeralKafkaBroker.java:124)
	at com.github.charithe.kafka.KafkaJunitExtension.beforeEach(KafkaJunitExtension.java:62)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$null$0(TestMethodTestDescriptor.java:126)
	at org.junit.jupiter.engine.execution.ThrowableCollector.execute(ThrowableCollector.java:40)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:152)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachCallbacks(TestMethodTestDescriptor.java:125)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:104)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$execute$3(HierarchicalTestExecutor.java:83)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:77)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$null$2(HierarchicalTestExecutor.java:92)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$execute$3(HierarchicalTestExecutor.java:92)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:77)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$null$2(HierarchicalTestExecutor.java:92)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$execute$3(HierarchicalTestExecutor.java:92)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:77)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:51)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:43)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:65)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Rule does not allow configuration of de/serializers

Recently I ran into the need to apply configuration to the StringDeserializer and found two issues:

  1. There is no way to provide additional producer/consumer configuration properties (other than building everything outside the Rule)
  2. The configure(Map,boolean) methods are never called with the configuration during setup.

I ended up coding a simple extension in Groovy for our needs, but this would be a nice addition to the base functionality. Here is my extension:

class ExtendedKafkaJunitRule extends KafkaJunitRule {

    Properties consumerConfig(final Properties addedProps, final boolean autoConfig) {
        Properties config = consumerConfig(autoConfig)
        config.putAll(addedProps)
        config
    }

    public <K, V> KafkaConsumer<K, V> createConsumer(final Properties extraConfig, final Deserializer<K> keyDeser, final Deserializer<V> valDeser, final boolean autoCommit) {
        def config = consumerConfig(extraConfig, autoCommit)

        keyDeser.configure(config, true)
        valDeser.configure(config, false)

        new KafkaConsumer<K, V>(config, keyDeser, valDeser)
    }

    public <K, V> ListenableFuture<List<ConsumerRecord<K, V>>> pollMessages(Properties extraConfig, String topic, int numMessagesToPoll, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        KafkaConsumer consumer = createConsumer(extraConfig, keyDeserializer, valueDeserializer, false)
        consumer.subscribe([topic])

        ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())
        return executor.submit(new KafkaJunitRule.RecordConsumer(numMessagesToPoll, consumer))
    }

    public ListenableFuture<List<ConsumerRecord<String, String>>> pollStringMessages(Properties extraConfig, String topic, int numMessagesToPoll) {
        return pollMessages(extraConfig, topic, numMessagesToPoll, new StringDeserializer(), new StringDeserializer())
    }
}

I would be willing to add this to the base class myself (in Java) and create a pull request, if you are open to contributions.

Does not work with dependency kafka-avro-serializer

When you include

       <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <!-- For CP 3.3.0 -->
            <version>3.3.0</version>
        </dependency>

one gets

Exception in thread "Thread-0" java.lang.NoSuchFieldError: configFileStr
        at org.apache.curator.test.QuorumConfigBuilder$1.<init>(QuorumConfigBuilder.java:142)
        at org.apache.curator.test.QuorumConfigBuilder.buildConfig(QuorumConfigBuilder.java:137)
        at org.apache.curator.test.TestingZooKeeperServer$1.run(TestingZooKeeperServer.java:157)
        at java.lang.Thread.run(Thread.java:748)

Is it possible to get official guidance to make the two libraries co-exist?

Kafka 0.9.0.1 and Kafka Clients API for JDK 7

Hi

It would be nice to have your recent changes you made in version 2.0 for JDK 7 too. I have a couple of projects using the latest Kafka version but still running on Java 7.

If you would be so kind and decide to release such version, please find changes in my repo on branch kafka-junit-1.9.1. I do not want to create PR to your master because of project version and JDK difference.

My branch:
https://github.com/wojtek-szymanski/kafka-junit/tree/kafka-junit-1.9.1

Commits:
wojtek-szymanski@74129bd
wojtek-szymanski@c76baba

consumeStrings() discards buffered messages

When there are multiple messages in topic and consumeStrings() is called with number of messages less than number present in there, the extra messages will be discarded.

Example:

@ClassRule
public static KafkaJunitRule kafka = new KafkaJunitRule(EphemeralKafkaBroker.create()).waitForStartup();

@Test
public void test() throws Exception {
        kafka.helper().produceStrings("test", "a", "b");
        kafka.helper().consumeStrings("test", 1).get();
        kafka.helper().consumeStrings("test", 1).get();
}

Here I send two string messages into topic and then try to read one-by-one using consumeStrings(). The first one will succeed and return "a" while the second one will block forever.

The reason is enable.auto.commit set to true. The consumer will receive both messages in one poll() and commit only one (RecordConsumer.call(), line 311). This is correct but since autocommit is enabled, both message were already committed. The problem is that I'm not able to override properties passed to KafkaConsumer or set autocommit to false in consumerConfig() - as these are both called automatically from consumeStrings().

I think the solution could be to use consumerConfig(false) for creating consumer in consumeStrings(). But I haven't tested it.

Kafka 2.4

We'll soon get pressure to support Kafka 2.4, so will be helpful to have a compatible release here. cheers.

Kafka 2.3 is out

I suspect there will be some work here as there usually is a little thing here or there..

Failing JUnits on Windows

Running JUnit EphemeralKafkaBrokerTest on Windows is failing:

11:08:19.131 [onPool-worker-1] WARN     k.server.BrokerMetadataCheckpoint - No meta.properties file under dir C:\Users\KASSOV~1.MAR\AppData\Local\Temp\kafka_junit2358289997418960258\meta.properties
11:08:19.764 [onPool-worker-1] WARN     k.server.BrokerMetadataCheckpoint - No meta.properties file under dir C:\Users\KASSOV~1.MAR\AppData\Local\Temp\kafka_junit2358289997418960258\meta.properties
11:08:19.961 [d | kafka-junit] WARN  o.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {test-topic=LEADER_NOT_AVAILABLE}
11:08:20.096 [d | kafka-junit] WARN  o.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 3 : {test-topic=LEADER_NOT_AVAILABLE}
11:08:23.019 [main           ] ERROR     c.g.c.kafka.EphemeralKafkaBroker - Failed to clean-up Kafka
java.nio.file.FileSystemException: C:\Users\KASSOV~1.MAR\AppData\Local\Temp\kafka_junit2358289997418960258\test-topic-0\00000000000000000000.timeindex: The process cannot access the file because it is being used by another process.
	at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
	at sun.nio.fs.WindowsFileSystemProvider.implDelete(Unknown Source)
	at sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(Unknown Source)
	at java.nio.file.Files.deleteIfExists(Unknown Source)
	at com.github.charithe.kafka.EphemeralKafkaBroker$1.visitFile(EphemeralKafkaBroker.java:171)
	at com.github.charithe.kafka.EphemeralKafkaBroker$1.visitFile(EphemeralKafkaBroker.java:1)
	at java.nio.file.Files.walkFileTree(Unknown Source)
	at java.nio.file.Files.walkFileTree(Unknown Source)
	at com.github.charithe.kafka.EphemeralKafkaBroker.stopBroker(EphemeralKafkaBroker.java:168)
	at com.github.charithe.kafka.EphemeralKafkaBroker.stop(EphemeralKafkaBroker.java:147)
	at com.github.charithe.kafka.EphemeralKafkaBrokerTest.testReadAndWrite(EphemeralKafkaBrokerTest.java:112)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
11:08:23.563 [main           ] ERROR             kafka.server.KafkaServer - Fatal error during KafkaServer shutdown.
java.lang.IllegalStateException: Kafka server is still starting up, cannot shut down!
	at kafka.server.KafkaServer.shutdown(KafkaServer.scala:575)
	at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:51)
	at com.github.charithe.kafka.EphemeralKafkaBroker.stopBroker(EphemeralKafkaBroker.java:158)
	at com.github.charithe.kafka.EphemeralKafkaBroker.stop(EphemeralKafkaBroker.java:147)
	at com.github.charithe.kafka.EphemeralKafkaBrokerTest.testStartAndStop(EphemeralKafkaBrokerTest.java:68)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

There has to be some issue with statitcs since next run ends differenly

11:12:32.587 [onPool-worker-1] WARN     k.server.BrokerMetadataCheckpoint - No meta.properties file under dir C:\Users\KASSOV~1.MAR\AppData\Local\Temp\kafka_junit3765302414122141202\meta.properties
11:12:33.143 [onPool-worker-1] WARN     k.server.BrokerMetadataCheckpoint - No meta.properties file under dir C:\Users\KASSOV~1.MAR\AppData\Local\Temp\kafka_junit3765302414122141202\meta.properties
11:12:33.413 [d | kafka-junit] WARN  o.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {test-topic=LEADER_NOT_AVAILABLE}
11:12:33.565 [d | kafka-junit] WARN  o.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 3 : {test-topic=LEADER_NOT_AVAILABLE}
11:12:34.751 [onPool-worker-1] WARN     k.server.BrokerMetadataCheckpoint - No meta.properties file under dir C:\Users\KASSOV~1.MAR\AppData\Local\Temp\kafka_junit4699908709488809068\meta.properties
11:12:34.822 [main           ] ERROR             kafka.server.KafkaServer - [Kafka Server 1], Fatal error during KafkaServer shutdown.
java.lang.IllegalStateException: Kafka server is still starting up, cannot shut down!
	at kafka.server.KafkaServer.shutdown(KafkaServer.scala:575)
	at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:51)

This might not be obvious on Linux platform. I suppose that there is some resource leak that keeps directory file handle open.

kafka-junit support for kafka_2.13 (Kafka 2.6.0 recommends using Scala 2.13.x)

Seems that kafka-junit 4.1.0 works until Scala version 2.12.11. When scala-library is updated to 2.12.12 runtime errors occurs
Stacktrace:
java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$7
at kafka.api.ApiVersion$.orderingByVersion(ApiVersion.scala:45) ~[kafka_2.12-2.6.0.jar:?]
at kafka.api.ApiVersion.compare(ApiVersion.scala:141) ~[kafka_2.12-2.6.0.jar:?]
at kafka.api.ApiVersion.compare$(ApiVersion.scala:140) ~[kafka_2.12-2.6.0.jar:?]
at kafka.api.KAFKA_2_6_IV0$.compare(ApiVersion.scala:348) ~[kafka_2.12-2.6.0.jar:?]
at kafka.api.KAFKA_2_6_IV0$.compare(ApiVersion.scala:348) ~[kafka_2.12-2.6.0.jar:?]
at scala.math.Ordered.$greater$eq(Ordered.scala:91) ~[scala-library-2.12.12.jar:?]
at scala.math.Ordered.$greater$eq$(Ordered.scala:91) ~[scala-library-2.12.12.jar:?]
at kafka.api.KAFKA_2_6_IV0$.$greater$eq(ApiVersion.scala:348) ~[kafka_2.12-2.6.0.jar:?]
at kafka.server.KafkaConfig.(KafkaConfig.scala:1564) ~[kafka_2.12-2.6.0.jar:?]
at kafka.server.KafkaConfig.(KafkaConfig.scala:1272) ~[kafka_2.12-2.6.0.jar:?]
at com.github.charithe.kafka.EphemeralKafkaBroker.buildKafkaConfig(EphemeralKafkaBroker.java:229) ~[kafka-junit-4.1.10.jar:?]

KafkaHelper threading issue

Hi there,

When using KafkaHelper consume methods I can see exceptions like this ocasionally:

Suppressed: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1808)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1701)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1680)

I see that consume methods take as input a kafkaconsumer instance and try to consume a given number of records using a different thread. But I think this not OK, because the org.apache.kafka.clients.consumer.kafkaConsumer class is documented as not thread safe. In fact, the kafkaConsumer looks able to detect this bad use of the API and throws the previous exception.

I guess that a possible approach could be to offer methods that take as input consumer configuration and that consumer is created kafka consumer in the same thread that consumption is done. The methods could return Futures.

I am currently using kafka-junit 4.0.0 with kafka-client 1.0., but I have been checking the latest release and I think it has the same problem.

Thanks in advance,

/Evaristo

When using timeout to simulate "no-message" scenario, other regular test cases stop working

I posted a question on stackoverflow about the problem i was having when running my entire test suite - https://stackoverflow.com/q/56076971/1029684

Further investigation showed that my test cases would fail when executed after some test cases that use the timeout function in kafkaRule.helper().consumeStrings().get(). I am using the @rule annotation but still the test case seems to be getting affected by previous runs. I had to do some exclusions in maven to get this lib working. Below is the Maven part and the test cases i have:

POM.xml

 <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.3-beta</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>scala-reflect</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>scala-logging_2.12</artifactId>
                    <groupId>com.typesafe.scala-logging</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>zookeeper</artifactId>
                    <groupId>org.apache.zookeeper</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.github.charithe</groupId>
            <artifactId>kafka-junit</artifactId>
            <version>4.1.3</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <artifactId>kafka_2.12</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>

Jnuit 4 tests

public class KafkaTest {
    private static int kafkaBrokerPort = 9000;

    private final String MY_TOPIC = "my-topic";

    @Rule
    public KafkaJunitRule kafkaRule = new KafkaJunitRule(EphemeralKafkaBroker.create(kafkaBrokerPort));

    @Test
    public void B() throws InterruptedException, TimeoutException, ExecutionException {
        // App logic that sends messages to Kafka
        kafkaRule.helper().produceStrings(MY_TOPIC, "a", "b", "c", "d", "e");

        List<String> result = kafkaRule.helper().consumeStrings(MY_TOPIC, 5).get(30, TimeUnit.SECONDS);
        assertEquals(5, result.size());
    }

    @Test
    public void A() throws InterruptedException, ExecutionException {
        // App logic that doesn't send messages to Kafka

        List<String> result = Collections.emptyList();
        try {
            result = kafkaRule.helper().consumeStrings(MY_TOPIC, 1).get(10000, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Do nothing
        }

        assertEquals(0, result.size());
    }
}

When i run test A or B individually, they are execute successfully all the time. When i run the whole test suite and if B executed before A, then again everything goes fine. It is only when i run the test suite and B gets executed before A, it fails.
It has something to do with the timeout thing i implemented. I could not find out any other way to simulate the condition where i check the message was not sent.

Multipartition topic

Hello, I have not found how can I create a multipartition topic, using KafkaJunitRule.
Probably I have missed something. Any suggestions?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.