kafka-streams-dotnet's Introduction

Hi there 👋

I'm a Customer Success Technical Architect at Confluent, the company founded by the original creators of Apache Kafka.

🗣️ A couple of years ago I created Streamiz, an open source .NET stream processing library for Apache Kafka 🚀

The aim of this project is to bring the stream processing mindset of Apache Kafka to the .NET world.

Twitter: @LGouellec

kafka-streams-dotnet's People

Contributors

attributeerror, cacodev, dependabot[bot], dmitrygladky, drinehimer, fab60, fglaeser, gladskih, icher-g, joaovitorpina, lgouellec, lollo25, lukasswm, masteryee, mladenovic, mmoron, nardu92, ppilev, rogersep, terry-yip, tsuz

kafka-streams-dotnet's Issues

Use Parallel.ForEach(...) to forward records in child processor inside one topology

Description

Currently, forwarding records to child processors inside a topology is synchronous:

        public virtual void Forward<K1, V1>(K1 key, V1 value)
        {
            log.Debug($"{logPrefix}Forward<{typeof(K1).Name},{typeof(V1).Name}> message with key {key} and value {value} to each next processor");
            foreach (var n in Next)
                if (n is IProcessor<K1, V1>)
                    (n as IProcessor<K1, V1>).Process(key, value);
        }

This ticket proposes executing the child processors in parallel, like this:

        public virtual void Forward<K1, V1>(K1 key, V1 value)
        {
            log.Debug($"{logPrefix}Forward<{typeof(K1).Name},{typeof(V1).Name}> message with key {key} and value {value} to each next processor");
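            // Keep only the children that accept <K1, V1>, as the sequential version does (needs System.Linq and System.Threading.Tasks).
            Parallel.ForEach(Next.OfType<IProcessor<K1, V1>>(), n => n.Process(key, value));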
        }

Tests to run:

  • No regression in stateless processors
  • No regression in stateful processors
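To compare the two forwarding strategies, here is a small self-contained sketch. It uses the hypothetical types `IChildProcessor` and `CountingProcessor` instead of the internal Streamiz processor classes, and it illustrates only the fan-out. It assumes each child can safely be called concurrently. That is not guaranteed for stateful processors that touch a state store, and the parallel version gives no ordering guarantee between children.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a child processor; not the Streamiz IProcessor interface.
public interface IChildProcessor<K, V>
{
    void Process(K key, V value);
}

// Hypothetical child that only counts the records it receives (thread-safe via Interlocked).
public class CountingProcessor<K, V> : IChildProcessor<K, V>
{
    private int count;
    public int Count => Volatile.Read(ref count);
    public void Process(K key, V value) => Interlocked.Increment(ref count);
}

public static class Forwarding
{
    // Current behaviour: children are called one after another on the caller's thread.
    public static void ForwardSequential<K, V>(IEnumerable<object> next, K key, V value)
    {
        foreach (var n in next)
            if (n is IChildProcessor<K, V> processor)
                processor.Process(key, value);
    }

    // Proposed behaviour: children run in parallel. The call still blocks until every
    // child has returned, but the order in which children see the record is not fixed.
    public static void ForwardParallel<K, V>(IEnumerable<object> next, K key, V value)
    {
        Parallel.ForEach(next.OfType<IChildProcessor<K, V>>(), processor => processor.Process(key, value));
    }
}
```

Both variants skip children that do not accept `<K, V>`. The regression tests listed above would have to cover the thread-safety assumption for stateful processors.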

Dispose method for KafkaStream object throws 'System.ObjectDisposedException: handle is destroyed'

Description

Hello, I have created a simple console application to test a GlobalTable with an InMemory state store, and everything works fine until I close the application. As recommended, I call KafkaStream.Dispose(), and it throws an error:

[screenshot of the error]

Can you please help me with that? What could be the cause of this issue?

How to reproduce

Create a .NET 5.0 console application with Streamiz.Kafka.Net v1.1.5.
I removed the reads from the state store because they don't affect this issue; the minimal code to reproduce it is below.

Configure and start:

            var config = new StreamConfig<StringSerDes, StringSerDes>();
            config.ApplicationId = "test-app-123";
            config.BootstrapServers = "localhost:9092";
            config.AutoOffsetReset = AutoOffsetReset.Earliest;

            StreamBuilder builder = new StreamBuilder();
            builder.GlobalTable("dima-test", InMemory<string, string>.As("dima-test-store"));

            Topology t = builder.Build();
            KafkaStream stream = new KafkaStream(t, config);
            await stream.StartAsync();

            await Task.Delay(2000);

            stream.Dispose();

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • A code snippet with your topology builder (e.g. builder.Stream<string, string>("topic").To("another-topic");)
  • Streamiz.Kafka.Net NuGet version: v1.1.5
  • Apache Kafka version: v1.6.2 (the Confluent.Kafka version referenced by the Streamiz library)
  • Client configuration.
  • Operating system: Windows 10
  • Provide logs (in debug mode, using log4net and StreamConfig.Debug, as necessary in the configuration).
  • Critical issue: no

Propagating correlation id context.

Hi, do you have any advice on how to propagate the correlation id into the logging context? Have you done it in your applications?

I would like my applications to log a header I'm sending called X-Correlation-Id. This is common practice for HTTP servers and for HTTP calls to other services, but I want my Kafka consumers to do it too. Is there a mechanism in this library that I can use for it?

An overview of what I'm after is:

  • Get the correlation id from the message headers
  • Propagate this into the logging context
  • Calls to web services should send it in the request headers too
  • Produced messages should carry the correlation id header too

For example:
builder.Stream<string, string>("topic")
    .Peek((k, v) => Log.Debug($"Got a message! ({k}, {v})")) // Should log the correlation id
    .MapValues(v => makehttpcall(v)) // Should send the request header with the same correlation id
    .To("topic2"); // Should produce the message with the correlation id header.

Of course, not all of these need to be solved by the library, but it will be used in applications where observability plays a big role. I could perhaps work on a pull request to support this feature. Let me know!

Regards.
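One common .NET technique for the "propagate into the logging context" step works independently of any library support. You keep an ambient `AsyncLocal<T>` value, set it when a record is picked up, and read it from logging and HTTP helpers. The sketch below assumes the header value has already been extracted from the record; how to read headers inside a Streamiz topology is not covered here. `CorrelationContext` and `FormatLog` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical ambient holder for the correlation id of the record being processed.
public static class CorrelationContext
{
    private static readonly AsyncLocal<string> current = new AsyncLocal<string>();

    public static string Current => current.Value;

    // Sets the id for the current async flow and returns a scope that restores the previous value.
    public static IDisposable Begin(string correlationId)
    {
        var previous = current.Value;
        current.Value = correlationId;
        return new Scope(() => current.Value = previous);
    }

    private sealed class Scope : IDisposable
    {
        private readonly Action onDispose;
        public Scope(Action onDispose) => this.onDispose = onDispose;
        public void Dispose() => onDispose();
    }

    // Hypothetical log formatter that prefixes every message with the ambient id.
    public static string FormatLog(string message) =>
        $"[X-Correlation-Id={Current ?? "none"}] {message}";
}
```

`AsyncLocal` values flow into `Task.Run` and `await` continuations started inside the scope. This only helps if all the processing of a record (logging, HTTP calls, producing) happens within that execution context.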

Make the Start/Stop public API more .NET compliant

  • Use async/await in start processing
  • Implement IDisposable
KafkaStream stream1 = new KafkaStream(t, config);

Console.CancelKeyPress += (o, e) => {
    stream1.Dispose();
};

await stream1.StartAsync();
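For the `IDisposable` part, a common pattern is to make `Dispose` idempotent and thread-safe. A second call then does nothing, for example when both a `CancelKeyPress` handler and a `using` block dispose the stream. This sketch uses a hypothetical `StreamLifecycle` class, not the actual `KafkaStream` implementation, and counts shutdowns instead of stopping real threads.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical lifecycle wrapper illustrating an idempotent, thread-safe Dispose.
public sealed class StreamLifecycle : IDisposable
{
    private int disposed; // 0 = alive, 1 = disposed
    public int ShutdownCount { get; private set; }

    public Task StartAsync(CancellationToken token = default)
    {
        if (Volatile.Read(ref disposed) == 1)
            throw new ObjectDisposedException(nameof(StreamLifecycle));
        // A real implementation would start its stream threads here.
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        // Only the first caller performs the shutdown; later calls are no-ops.
        if (Interlocked.Exchange(ref disposed, 1) == 1)
            return;
        ShutdownCount++; // stands in for stopping threads and closing Kafka clients
    }
}
```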

Improve SonarCloud KPIs

The Sonar KPIs are really bad. This issue tracks the work needed to improve them.

Benchmarking

Hi there,
I know you're still in the early days, but do you have any perspective on performance?
I mean, any benchmarks of the basic functions built so far?
Or any rough comparison with ksqlDB?
Thanks
Ehsan

Refactor topology node processor builder

Processor node builders must implement the Cloneable interface to guarantee one processor instance per stream task.
However, state store instances are shared between stream tasks, which is not correct.
This issue tracks refactoring the processor creator into a factory that guarantees one processor instance and one state store instance per stream task.
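A minimal sketch of the factory idea follows. It assumes a supplier is invoked once per stream task and builds both a fresh processor and a fresh store, rather than cloning the processor while sharing one store. `CountStore`, `CountProcessor` and `ProcessorSupplier` are hypothetical names, not Streamiz types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-task store; not a Streamiz state store.
public class CountStore
{
    private readonly Dictionary<string, int> counts = new Dictionary<string, int>();

    public int Increment(string key)
    {
        counts.TryGetValue(key, out var current); // 0 when the key is absent
        counts[key] = current + 1;
        return counts[key];
    }
}

// Hypothetical stateful processor that owns its store.
public class CountProcessor
{
    public CountStore Store { get; }
    public CountProcessor(CountStore store) => Store = store;
    public int Process(string key) => Store.Increment(key);
}

// Builds a new processor *and* a new store for every task that asks for one.
public class ProcessorSupplier
{
    private readonly Func<CountStore> storeFactory;
    public ProcessorSupplier(Func<CountStore> storeFactory) => this.storeFactory = storeFactory;
    public CountProcessor Get() => new CountProcessor(storeFactory());
}
```

Each task calls `Get()` once, so two tasks never see each other's counts.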

Poor performance with a large number of messages

Description

With a simple topology:

builder.Stream<string, MyDTO>("input-topic", null, JsonSerDes<MyDTO>.Instance)
.To<StringSerDes, JsonSerDes<MyDTO>>("output-topic");

The client can only process 30 messages per second.
Source topic: 3 partitions
Destination topic: 1 partition

Streamiz configuration:

ApplicationId = settings.ApplicationId,
BootstrapServers = settings.BootstrapServers,
SaslMechanism = SaslMechanism.Gssapi,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
AutoOffsetReset = AutoOffsetReset.Earliest,
NumStreamThreads = 3

Add a log processing summary every X minutes (configurable)

Description

Log a processing summary (at INFO level) every X minutes.

// default value: 1 minute
config.LogProcessingSummary = TimeSpan.FromMinutes(5);

Example of execution

Processed {XX} total records and committed {YY} total tasks since the last update

Implementation

At the end of StreamThread.Run(..), around L215, add logic that checks the time elapsed since the last processing summary and, once the interval has passed, logs all KPIs at INFO level.
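The check described above can be sketched independently of StreamThread. In this sketch a hypothetical `ProcessingSummary` helper accumulates counters and is called at the end of each loop iteration. Once the configured interval has elapsed, it returns the summary line and resets its counters. The current time is passed in so the logic is testable; in the real loop the returned string would go to the INFO logger.

```csharp
using System;

// Hypothetical helper for "log a processing summary every X minutes".
public class ProcessingSummary
{
    private readonly TimeSpan interval;
    private DateTime lastSummary;
    private long processedRecords;
    private long committedTasks;

    public ProcessingSummary(TimeSpan interval, DateTime now)
    {
        this.interval = interval;
        lastSummary = now;
    }

    public void RecordProcessed(long records) => processedRecords += records;
    public void RecordCommitted(long tasks) => committedTasks += tasks;

    // Call at the end of each loop iteration; returns null until the interval has elapsed.
    public string TryGetSummary(DateTime now)
    {
        if (now - lastSummary < interval)
            return null;

        var message = $"Processed {processedRecords} total records and committed {committedTasks} total tasks since the last update";
        processedRecords = 0;
        committedTasks = 0;
        lastSummary = now;
        return message;
    }
}
```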

Question: how to provide a serializer to the WrappedWindowStore

Description

When creating tumbling windows, the result object's SerDes is not passed to the inner window store.

How to reproduce

Given the following stream:

StreamBuilder builder = new StreamBuilder();

builder.Stream<string, ObjectA, StringSerDes, SchemaAvroSerDes<ObjectA>>(_config.InputTopicName)
    .Map((key, value) => new KeyValuePair<string, ObjectA>(value.symbol, value))
    .GroupByKey()
    .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(5)))
    .Aggregate<ObjectB, SchemaAvroSerDes<ObjectB>>(
        () => new ObjectB(),
        (key, ObjectA, ObjectB) => _ObjectBHelper.CreateObjectB(key, ObjectA, ObjectB))
    .ToStream()
    .Map((key, ObjectB) => new KeyValuePair<string, ObjectB>(key.Key, ObjectB))
    .To<StringSerDes, SchemaAvroSerDes<ObjectB>>(_config.OutputTopicName);

This throws the following exception:

Streamiz.Kafka.Net.Errors.StreamsException: SchemaSerDes<ObjectB> is not initialized !
   at Streamiz.Kafka.Net.SchemaRegistry.SerDes.SchemaSerDes`1.Serialize(T data, SerializationContext context) in /Users/oxid/code/kafka-streams-dotnet/serdes/Streamiz.Kafka.Net.SchemaRegistry.SerDes/SchemaSerDes.cs:line 68
   at Streamiz.Kafka.Net.SerDes.ValueAndTimestampSerDes`1.Serialize(ValueAndTimestamp`1 data, SerializationContext context) in /Users/oxid/code/kafka-streams-dotnet/core/SerDes/ValueAndTimestampSerDes.cs:line 56
   at Streamiz.Kafka.Net.State.Internal.WrappedWindowStore`2.GetValueBytes(V value) in /Users/oxid/code/kafka-streams-dotnet/core/State/Internal/WrappedWindowStore.cs:line 48
   at Streamiz.Kafka.Net.State.Internal.WrappedWindowStore`2.Put(K key, V value, Int64 windowStartTimestamp) in /Users/oxid/code/kafka-streams-dotnet/core/State/Internal/WrappedWindowStore.cs:line 77
   at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor`4.Process(K key, V value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/KStreamWindowAggregateProcessor.cs:line 74
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/AbstractProcessor.cs:line 83
   at Streamiz.Kafka.Net.Processors.KStreamMapProcessor`4.Process(K key, V value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/KStreamMapProcessor.cs:line 20
   at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/SourceProcessor.cs:line 57
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(Object key, Object value) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/AbstractProcessor.cs:line 200
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/AbstractProcessor.cs:line 194
   at Streamiz.Kafka.Net.Processors.StreamTask.Process() in /Users/oxid/code/kafka-streams-dotnet/core/Processors/StreamTask.cs:line 311
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now) in /Users/oxid/code/kafka-streams-dotnet/core/Processors/Internal/TaskManager.cs:line 217

If I remove the windowing steps and keep only the input/output Map steps, ObjectB records are written correctly to the output topic, so the SerDes on its own works fine. But when the windowed aggregation step is added, the SerDes for the output object gets lost.

Modifying the ValueAndTimestampSerDes class to initialize the inner SerDes seems to work, but not completely: the SerializationContext is still that of the input topic, not the output topic.

InnerSerdes.Initialize(serDesContext);

How can I properly pass the output schema SerDes and topic name to the following method?
public override byte[] Serialize(ValueAndTimestamp<V> data, SerializationContext context)
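The workaround above relies on a general principle: a wrapping SerDes has to forward initialization to the SerDes it wraps. The sketch below illustrates that principle only; it is not Streamiz's actual classes or a fix for this issue. `IInitializableSerDes`, `TopicTaggingSerDes`, `TimestampedSerDes` and `SerContext` are hypothetical. The sketch also shows that the inner SerDes sees whatever context the caller passes, which is why the topic it reports depends on where serialization is invoked.

```csharp
using System;
using System.Text;

// Hypothetical serialization context carrying the target topic.
public sealed class SerContext
{
    public SerContext(string topic) => Topic = topic;
    public string Topic { get; }
}

// Hypothetical SerDes contract with an explicit initialization step.
public interface IInitializableSerDes<T>
{
    bool Initialized { get; }
    void Initialize(string schemaRegistryUrl);
    byte[] Serialize(T data, SerContext context);
}

// Inner SerDes that, like a schema-based SerDes, refuses to work before Initialize.
public class TopicTaggingSerDes : IInitializableSerDes<string>
{
    public bool Initialized { get; private set; }
    public void Initialize(string schemaRegistryUrl) => Initialized = true;

    public byte[] Serialize(string data, SerContext context)
    {
        if (!Initialized) throw new InvalidOperationException("SerDes is not initialized !");
        return Encoding.UTF8.GetBytes($"{context.Topic}:{data}");
    }
}

// Wrapper that prepends a timestamp; it forwards Initialize to the inner SerDes.
public class TimestampedSerDes<T> : IInitializableSerDes<(T Value, long Timestamp)>
{
    private readonly IInitializableSerDes<T> inner;
    public TimestampedSerDes(IInitializableSerDes<T> inner) => this.inner = inner;

    public bool Initialized => inner.Initialized;
    public void Initialize(string schemaRegistryUrl) => inner.Initialize(schemaRegistryUrl);

    public byte[] Serialize((T Value, long Timestamp) data, SerContext context)
    {
        var valueBytes = inner.Serialize(data.Value, context); // the caller's context is passed through
        var result = new byte[8 + valueBytes.Length];
        BitConverter.GetBytes(data.Timestamp).CopyTo(result, 0);
        valueBytes.CopyTo(result, 8);
        return result;
    }
}
```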

Checklist

"Streamiz.Kafka.Net" Version="1.1.5"
"Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro" Version="1.1.5"
Kafka docker image: confluentinc/cp-kafka:5.3.0
Schema registry: confluentinc/cp-schema-registry:6.0.0
.NET 5.0 on macOS

Suppression functionality

I was looking for the table suppression functionality available in the Java version of Kafka Streams. Is it planned for a future release?

Restoration infinite loop when changelog topic is empty

Description

When you start a Kafka Streams app with a stateful topology for the first time, any state store that uses logging (enabled by default since 1.2.0) restores its state indefinitely, so records for that subtopology are never processed.
See

How to reproduce

Use this topology with a real cluster (it also seems to fail with TopologyTestDriver, but that is a different problem):

StreamBuilder builder = new StreamBuilder();

builder
    .Stream<string, string>("myTopic")
    .Filter((key, value) =>
    {
        return key == "1";
    })
    .To("tempTopic");

builder.Stream<string, string>("tempTopic")
    .GroupByKey()
    .Reduce(new MediumEventMessageReducer())
    .ToStream()
    .To("finalTopic");

var topology = builder.Build();

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • A code snippet with your topology builder (e.g. builder.Stream<string, string>("topic").To("another-topic");)
  • Streamiz.Kafka.Net NuGet version: 1.2.0
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (in debug mode, using log4net and StreamConfig.Debug, as necessary in the configuration).
  • Critical issue.

Examples for state store usage

Description

Hi,

Kafka Streams noob here. I couldn't find examples in your documentation of how to persist messages into the state store and read them back out. I would appreciate some help!

Essentially I am looking for a C# equivalent of (from Confluent's website):

[screenshot: Java Processor example from Confluent's website]

Most Processor classes in your library are internal, so I can't create derivatives like the one shown in the screenshot above, hence my question.

Cheers
Aman

Join KTable Processing

KTable-KTable implementation

  • Join KTable-KTable
  • LeftJoin KTable-KTable
  • OuterJoin KTable-KTable
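The three joins have different expected semantics. To make them concrete, they can be sketched over two materialized tables represented as dictionaries keyed by record key:

- An inner join emits only keys present in both tables.
- A left join keeps every key of the left table.
- An outer join keeps every key of either table, passing `null` for the missing side.

`TableJoins` is a hypothetical helper, not the Streamiz implementation. The real operators work incrementally on each update rather than over whole tables.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TableJoins
{
    // Inner join: only keys present in both tables.
    public static Dictionary<K, R> Join<K, V1, V2, R>(
        IDictionary<K, V1> left, IDictionary<K, V2> right, Func<V1, V2, R> joiner)
        where V1 : class where V2 : class
        => left.Where(kv => right.ContainsKey(kv.Key))
               .ToDictionary(kv => kv.Key, kv => joiner(kv.Value, right[kv.Key]));

    // Left join: every key of the left table; the right value is null when missing.
    public static Dictionary<K, R> LeftJoin<K, V1, V2, R>(
        IDictionary<K, V1> left, IDictionary<K, V2> right, Func<V1, V2, R> joiner)
        where V1 : class where V2 : class
        => left.ToDictionary(
               kv => kv.Key,
               kv => joiner(kv.Value, right.TryGetValue(kv.Key, out var r) ? r : null));

    // Outer join: every key of either table; the missing side is null.
    public static Dictionary<K, R> OuterJoin<K, V1, V2, R>(
        IDictionary<K, V1> left, IDictionary<K, V2> right, Func<V1, V2, R> joiner)
        where V1 : class where V2 : class
        => left.Keys.Union(right.Keys).ToDictionary(
               k => k,
               k => joiner(left.TryGetValue(k, out var l) ? l : null,
                           right.TryGetValue(k, out var r) ? r : null));
}
```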

Getting involved

I would like to get involved with this project. What is the best way to connect?

Operation not valid in state Ready

Description

I created an application to try your package, and from time to time I get this exception: Streamiz.Kafka.Net.Errors.StreamsException: 'Operation not valid in state Ready'
Here is the log, which might be helpful:

170579 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-2 since 100,0099ms has elapsed (commit interval is 100ms)
170583 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,0081ms has elapsed (commit interval is 100ms)
170586 [data-ingestion-appkstreamtestin-stream-thread-4] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0041ms has elapsed (commit interval is 100ms)
170586 [data-ingestion-appkstreamtestin-stream-thread-3] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0632ms has elapsed (commit interval is 100ms)
170588 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0215ms has elapsed (commit interval is 100ms)
170609 [data-ingestion-appkstreamtestin-stream-thread-7] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0051ms has elapsed (commit interval is 100ms)
170615 [data-ingestion-appkstreamtestin-stream-thread-4] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-4] State transition from RUNNING to PARTITIONS_REVOKED
170617 [data-ingestion-appkstreamtestin-stream-thread-3] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-3] State transition from RUNNING to PARTITIONS_REVOKED
170617 [data-ingestion-appkstreamtestin-stream-thread-6] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 101,8294ms has elapsed (commit interval is 100ms)
170617 [data-ingestion-appkstreamtestin-stream-thread-7] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-7] State transition from RUNNING to PARTITIONS_REVOKED
170618 [data-ingestion-appkstreamtestin-stream-thread-4] INFO Streamiz.Kafka.Net.KafkaStream - stream-application[data-ingestion-appKStreamTestIn] State transition from RUNNING to REBALANCING
170620 [data-ingestion-appkstreamtestin-stream-thread-4] INFO Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener - Partition revocation took 00:00:00.0052987 ms
        Current suspended active tasks:

170620 [data-ingestion-appkstreamtestin-stream-thread-3] INFO Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener - Partition revocation took 00:00:00.0053174 ms
        Current suspended active tasks:

170620 [data-ingestion-appkstreamtestin-stream-thread-7] INFO Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener - Partition revocation took 00:00:00.0025206 ms
        Current suspended active tasks:

170640 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-0 since 100,0111ms has elapsed (commit interval is 100ms)
170686 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-2 since 100,0122ms has elapsed (commit interval is 100ms)
170687 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,007ms has elapsed (commit interval is 100ms)
170690 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,004ms has elapsed (commit interval is 100ms)
170719 [data-ingestion-appkstreamtestin-stream-thread-6] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0039ms has elapsed (commit interval is 100ms)
170754 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-0 since 100,0124ms has elapsed (commit interval is 100ms)
170787 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,0117ms has elapsed (commit interval is 100ms)
170787 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-2 since 100,3638ms has elapsed (commit interval is 100ms)
170795 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0073ms has elapsed (commit interval is 100ms)
170808 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Suspending
170814 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Comitting
170820 [data-ingestion-appkstreamtestin-stream-thread-6] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,005ms has elapsed (commit interval is 100ms)
170822 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager - stream-task[0|2] Flushing all stores registered in the state manager
170835 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Kafka.Internal.RecordCollector - stream-task[0|2] Flusing producer
170841 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.Internal.RecordQueue - stream-task[0|2] - recordQueue [record-queue-KStreamTestIn-0-2]  cleared !
170841 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Kafka.Internal.RecordCollector - stream-task[0|2] Closing producer
170861 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-0 since 100,0118ms has elapsed (commit interval is 100ms)
170891 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,0116ms has elapsed (commit interval is 100ms)
170904 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,005ms has elapsed (commit interval is 100ms)
170918 [data-ingestion-appkstreamtestin-stream-thread-2] ERROR Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-2] Encountered the following unexpected Kafka exception during processing, tis usually indicate Streams internal errors:
Confluent.Kafka.KafkaException: Operation not valid in state Ready
   at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
   at Streamiz.Kafka.Net.Crosscutting.KafkaExtensions.ConsumeRecords[K,V](IConsumer`2 consumer, TimeSpan timeout)
   at Streamiz.Kafka.Net.Processors.StreamThread.Run()
170968 [data-ingestion-appkstreamtestin-stream-thread-0] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-0 since 100,0092ms has elapsed (commit interval is 100ms)
170925 [data-ingestion-appkstreamtestin-stream-thread-6] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,7812ms has elapsed (commit interval is 100ms)
170996 [data-ingestion-appkstreamtestin-stream-thread-1] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks 0-1 since 100,0136ms has elapsed (commit interval is 100ms)
171005 [data-ingestion-appkstreamtestin-stream-thread-5] DEBUG Streamiz.Kafka.Net.Processors.StreamThread - Committing all active tasks  since 100,0077ms has elapsed (commit interval is 100ms)
171010 [data-ingestion-appkstreamtestin-stream-thread-2] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-2] Shutting down
171010 [data-ingestion-appkstreamtestin-stream-thread-2] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-2] State transition from RUNNING to PENDING_SHUTDOWN
171012 [data-ingestion-appkstreamtestin-stream-thread-2] INFO Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Closing
171012 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Suspending
171013 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.StreamTask - stream-task[0|2] Comitting
171013 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager - stream-task[0|2] Flushing all stores registered in the state manager
171013 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Kafka.Internal.RecordCollector - stream-task[0|2] Flusing producer
171038 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Processors.Internal.RecordQueue - stream-task[0|2] - recordQueue [record-queue-KStreamTestIn-0-2]  cleared !
171042 [data-ingestion-appkstreamtestin-stream-thread-2] DEBUG Streamiz.Kafka.Net.Kafka.Internal.RecordCollector - stream-task[0|2] Closing producer
171042 [data-ingestion-appkstreamtestin-stream-thread-2] ERROR Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[data-ingestion-appkstreamtestin-stream-thread-2] Failed to close stream thread due to the following error:
System.NullReferenceException: Object reference not set to an instance of an object.
   at Streamiz.Kafka.Net.Processors.StreamTask.Commit(Boolean startNewTransaction)
   at Streamiz.Kafka.Net.Processors.StreamTask.Suspend()
   at Streamiz.Kafka.Net.Processors.StreamTask.Close()
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Close()
   at Streamiz.Kafka.Net.Processors.StreamThread.Close(Boolean cleanUp)

How to reproduce

I don't know how to reproduce it, but it occurs fairly regularly.
My config:

var config = new StreamConfig<StringSerDes, StringSerDes>
            {
                ApplicationId = "my-app" ,
                BootstrapServers = _bootstrapServers,
                NumStreamThreads = 2,
                Guarantee = ProcessingGuarantee.EXACTLY_ONCE,
                MessageSendMaxRetries = 100,
            };

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • A code snippet with your topology builder (e.g. builder.Stream<string, string>("topic").To("another-topic");)
  • Streamiz.Kafka.Net NuGet version: 1.0.0
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (in debug mode, using log4net and StreamConfig.Debug, as necessary in the configuration).
  • Critical issue.

High CPU consumption if Kafka is down

Description

I noticed that the stream app consumes a lot of CPU when Kafka is not accessible. It looks like the Confluent consumer ignores the "timeout" parameter after throwing the first "KafkaException" and returns 0 records immediately. That causes an infinite loop without any pauses.

I experimented a bit, and upgrading the "confluent-kafka-dotnet" package to the latest version seems to change the situation, but not significantly: instead of CPU consumption rising immediately, the same effect appears after 3 to 10 minutes.
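One generic way to avoid a busy loop when the consumer returns immediately is to back off whenever a poll yields no records or throws. The sketch below is a hypothetical `PollBackoff` helper, not part of Streamiz or Confluent.Kafka. It doubles the delay up to a cap and resets it as soon as records arrive. A polling loop would then sleep for the returned delay before polling again, for example with `Thread.Sleep`.

```csharp
using System;

// Hypothetical exponential backoff for a poll loop.
public class PollBackoff
{
    private readonly TimeSpan initial;
    private readonly TimeSpan max;
    public TimeSpan NextDelay { get; private set; } = TimeSpan.Zero;

    public PollBackoff(TimeSpan initial, TimeSpan max)
    {
        this.initial = initial;
        this.max = max;
    }

    // Call after each poll with the number of records returned (use 0 after an exception).
    public TimeSpan OnPollResult(int recordCount)
    {
        if (recordCount > 0)
            NextDelay = TimeSpan.Zero;              // healthy: poll again immediately
        else if (NextDelay == TimeSpan.Zero)
            NextDelay = initial;                    // first empty poll: start backing off
        else
            NextDelay = TimeSpan.FromTicks(Math.Min(NextDelay.Ticks * 2, max.Ticks)); // double, capped
        return NextDelay;
    }
}
```

This trades a little latency when the broker comes back for bounded CPU usage while it is unreachable.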

NuGet version: 1.1.5.
Kafka version: latest confluent kafka

How to reproduce

  1. Start kafka
  2. Start application
  3. Stop kafka
  4. Wait a bit

I believe the behavior is the same for any topology, for example:

builder.Stream("test-input", new StringSerDes(), new StringSerDes())
                .To<StringSerDes, StringSerDes>("test-output");

Configuration:

            var streamConfig = new StreamConfig {
                ApplicationId = "someid",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                SchemaRegistryUrl = "http://localhost:8081",
                AutoRegisterSchemas = true
            };

            streamConfig.AddConsumerConfig("allow.auto.create.topics", "true");

            streamConfig.InnerExceptionHandler = (ex) => {
                logger.Error("Exception inside Kafka Streams", ex);
                return ExceptionHandlerResponse.CONTINUE;
            };

            streamConfig.DeserializationExceptionHandler = (context, consumed, ex) => {
                logger.Error("Exception at deserialization inside Kafka Streams", ex);
                return ExceptionHandlerResponse.FAIL;
            };

            streamConfig.ProductionExceptionHandler = (delivery) => {
                logger.Error("Exception at producing inside Kafka Streams");
                return ExceptionHandlerResponse.FAIL;
            };

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • A code snippet with your topology builder (e.g. builder.Stream<string, string>("topic").To("another-topic");)
  • Streamiz.Kafka.Net NuGet version.
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (in debug mode, using log4net and StreamConfig.Debug, as necessary in the configuration).
  • Critical issue.

Async API usage patterns

Currently, StartAsync returns a Task that only represents the startup: once the stream has started, the task completes.
This leads to code like:

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using var stream = new KafkaStream(_topology, _config);
            await stream.StartAsync(stoppingToken);
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }

If the Task returned by StartAsync instead represented the streaming itself as a background activity, the code could be:

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using var stream = new KafkaStream(_topology, _config);
            await stream.StartAsync(stoppingToken);
        }
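Here is a minimal sketch of the second shape, using a hypothetical `BackgroundStream` class rather than `KafkaStream`. Its `StartAsync` returns a task that stays pending while a simulated processing loop runs, and completes normally once the token is cancelled. With that contract, a hosted service's `ExecuteAsync` could simply return `stream.StartAsync(stoppingToken)`. Whether Streamiz should adopt this contract is the open question of this issue.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stream whose StartAsync task represents the whole run, not just the startup.
public sealed class BackgroundStream : IDisposable
{
    public int Iterations { get; private set; }

    public async Task StartAsync(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                Iterations++; // stands in for polling and processing records
                await Task.Delay(10, stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal way to stop, so the task completes successfully.
        }
    }

    public void Dispose()
    {
        // A real implementation would close its Kafka clients here.
    }
}
```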

NullReferenceException when restoring topology with stream-stream join

Description

NullReferenceException when restoring topology with stream-stream join.

  • Code to reproduce the issue: make sure at least one message in the stream has been consumed, then stop the test application and run it again so that the state is restored. The issue then occurs.
using System;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;

var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "reproduce-nullreferenceexception", 
    BootstrapServers = "127.0.0.1:9092",
    AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest,
    Logger = LoggerFactory.Create(builder =>
    {
        builder.SetMinimumLevel(LogLevel.Debug);
        builder.AddConsole();
    })
};

StreamBuilder builder = new StreamBuilder();

var stream = builder.Stream<string, string>("topic1").SelectKey((k, v) => "key");

builder
    .Stream<string, string>("topic2")
    .SelectKey((k, v) => "key")
    .Join(
        stream,
        (s, v) => $"{s}-{v}",
        JoinWindowOptions.Of(TimeSpan.FromSeconds(10)))
    .To("output-join");

Topology t = builder.Build();

var kafkaStream = new KafkaStream(t, config);
await kafkaStream.StartAsync();

Console.ReadLine();
  • Streamiz.Kafka.Net NuGet version
    1.2.1

  • Apache Kafka version
    any version

  • Client configuration
    ApplicationId = "reproduce-nullreferenceexception",
    BootstrapServers = "127.0.0.1:9092",
    AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest,

  • Operating system
    Win10

  • logs

info: Streamiz.Kafka.Net.KafkaStream[0]
      stream-application[test-stream-stream-join] State transition from REBALANCING to RUNNING
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-stream-stream-join-stream-thread-0]  State is RUNNING, initializing and restoring tasks if necessary
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[1|0] Task 1-0 state transition from CREATED to RUNNING
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[0|0] Task 0-0 state transition from CREATED to RUNNING
dbug: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
      Loaded offsets from checkpoint manager:
info: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
      Initializing to the starting offset for changelog test-stream-stream-join-KSTREAM-JOINTHIS-0000000012-store-changelog [[0]] of in-memory state store KSTREAM-JOINTHIS-0000000012-store
info: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
      Initializing to the starting offset for changelog test-stream-stream-join-KSTREAM-JOINOTHER-0000000013-store-changelog [[0]] of in-memory state store KSTREAM-JOINOTHER-0000000013-store
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[2|0] Task 2-0 state transition from CREATED to RESTORING
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[2|0] Restoration will start soon.
dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store test-stream-stream-join-KSTREAM-JOINTHIS-0000000012-store-changelog [[0]] metadata found (begin offset: 0 / end offset : 4)
dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store test-stream-stream-join-KSTREAM-JOINOTHER-0000000013-store-changelog [[0]] metadata found (begin offset: 0 / end offset : 4)
dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      Added partitions with offsets test-stream-stream-join-KSTREAM-JOINTHIS-0000000012-store-changelog-[0]#Beginning [-2],test-stream-stream-join-KSTREAM-JOINOTHER-0000000013-store-changelog-[0]#Beginning [-2] to the restore consumer, current assignment is test-stream-stream-join-KSTREAM-JOINOTHER-0000000013-store-changelog-[0],test-stream-stream-join-KSTREAM-JOINTHIS-0000000012-store-changelog-[0]
fail: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-stream-stream-join-stream-thread-0] Encountered the following error during processing:
      System.NullReferenceException: Object reference not set to an instance of an object.
         at Streamiz.Kafka.Net.ProcessorContext.get_Timestamp()
         at Streamiz.Kafka.Net.State.InMemory.InMemoryWindowStore.<>c__DisplayClass23_0.<Init>b__0(Bytes key, Byte[] value)
         at Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager.Restore(StateStoreMetadata storeMetadata, IEnumerable`1 records)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.RestoreChangelog(ChangelogMetadata changelogMetadata)
         at Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader.Restore()
         at Streamiz.Kafka.Net.Processors.StreamThread.RestorePhase()
         at Streamiz.Kafka.Net.Processors.StreamThread.Run()
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-stream-stream-join-stream-thread-0] Shutting down
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
      stream-thread[test-stream-stream-join-stream-thread-0] State transition from RUNNING to PENDING_SHUTDOWN

How to reproduce

  1. Make sure Kafka is running and messages exist in Topic1 and Topic2.
  2. Run the test application provided above and make sure at least one message from the streams is consumed.
  3. Stop the test application and make sure the join state topics have been created in Kafka.
  4. Re-run the test application. The exception occurs during the restore phase.

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • A code snippet with your topology builder (ex: builder.Stream<string, string>("topic").To("an-another-topic");)
  • Streamiz.Kafka.Net nuget version.
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (in debug mode, with log4net and StreamConfig.Debug enabled in the configuration as necessary).
  • Critical issue.

Tumbling windowing causes Out of range exception

Description

Creating a tumbling window from a timestamp causes a "Value to add was out of range" exception in the Streamiz DateTimeExtensions class.

How to reproduce

Given the following stream:

StreamBuilder builder = new StreamBuilder();

builder.Stream<string, ObjectA, StringSerDes, SchemaAvroSerDes<ObjectA>>(_config.InputTopicName)
    .Map((key, value) => new KeyValuePair<string, ObjectA>(value.symbol, value))
    .GroupByKey()
    .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(5)))
    .Aggregate<ObjectB, SchemaAvroSerDes<ObjectB>>(
        () => new ObjectB(),
        (key, objectA, aggregate) => _ObjectBHelper.CreateObjectB(key, objectA, aggregate))
    .ToStream()
    .Map((windowedKey, objectB) => new KeyValuePair<string, ObjectB>(windowedKey.Key, objectB))
    .To<StringSerDes, SchemaAvroSerDes<ObjectB>>(_config.OutputTopicName);

This causes the following exception:

Unhandled exception. Streamiz.Kafka.Net.Errors.StreamsException: Value to add was out of range. (Parameter 'value')
---> System.ArgumentOutOfRangeException: Value to add was out of range. (Parameter 'value')
at System.DateTime.Add(Double value, Int32 scale)
at System.DateTime.AddMilliseconds(Double value)
at Streamiz.Kafka.Net.Crosscutting.DateTimeExtensions.FromMilliseconds(Int64 epoch)
at Streamiz.Kafka.Net.Stream.Window..ctor(Int64 startMs, Int64 endMs)
at Streamiz.Kafka.Net.Stream.TimeWindowOptions.WindowsFor(Int64 timestamp)
at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor`4.Process(K key, V value)
at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value)
at Streamiz.Kafka.Net.Processors.KStreamMapProcessor`4.Process(K key, V value)
at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value)
at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(Object key, Object value)
at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record)
at Streamiz.Kafka.Net.Processors.StreamTask.Process()
at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)
at Streamiz.Kafka.Net.Processors.StreamThread.Run()
--- End of inner exception stack trace ---
at Streamiz.Kafka.Net.Processors.StreamThread.TreatException(Exception exception)
at Streamiz.Kafka.Net.Processors.StreamThread.Run()
at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()

I was able to fix the issue by modifying the FromMilliseconds method as follows:

public static DateTime FromMilliseconds(this long epoch)
{
    // return Jan1St1970.AddMilliseconds(epoch);
    return new DateTime(epoch);
}

Basically, the long epoch value is the ticks of the startMs DateTime, not a millisecond offset from 1 January 1970 as the existing implementation expects.
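The explanation above turns on the difference between epoch milliseconds and DateTime ticks. The following C# sketch uses only the BCL; `FromEpochMilliseconds` is a hypothetical local re-implementation of the commented-out `Jan1St1970.AddMilliseconds(epoch)` line, not a call into Streamiz. It shows that a genuine epoch-millisecond value converts cleanly, while a ticks value (100-nanosecond units since 0001-01-01, hundreds of thousands of times larger for a present-day date) overflows with the same ArgumentOutOfRangeException seen in the stack trace.

```csharp
using System;

// Same reference point as the commented-out Jan1St1970 line in the fix above.
DateTime unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

// Hypothetical re-implementation of the original conversion: the argument is
// interpreted as milliseconds elapsed since 1970-01-01.
DateTime FromEpochMilliseconds(long milliseconds) => unixEpoch.AddMilliseconds(milliseconds);

// A genuine epoch-milliseconds timestamp converts cleanly.
long epochMs = new DateTimeOffset(2021, 3, 1, 12, 0, 0, TimeSpan.Zero).ToUnixTimeMilliseconds();
Console.WriteLine(FromEpochMilliseconds(epochMs).ToString("O"));

// DateTime.Ticks counts 100 ns units since 0001-01-01, so for a present-day date it is
// far larger than epoch milliseconds and pushes the result past DateTime.MaxValue.
long ticks = new DateTime(2021, 3, 1, 12, 0, 0, DateTimeKind.Utc).Ticks;
try
{
    FromEpochMilliseconds(ticks);
}
catch (ArgumentOutOfRangeException ex)
{
    Console.WriteLine($"Ticks passed as milliseconds: {ex.Message}");
}

// Converting ticks to epoch milliseconds first gives back the original timestamp.
long fromTicks = (ticks - unixEpoch.Ticks) / TimeSpan.TicksPerMillisecond;
Console.WriteLine(fromTicks == epochMs); // True
```

If the values reaching FromMilliseconds really are ticks, `new DateTime(epoch)` reads them correctly. Conversely, a genuine epoch-millisecond value passed to `new DateTime(long)` would be read as ticks and land in early January of year 0001, so the two interpretations cannot be mixed within one topology.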

Checklist

"Streamiz.Kafka.Net" Version="1.1.5"
"Streamiz.Kafka.Net.SchemaRegistry.SerDes.Avro" Version="1.1.5"
Kafka docker image: confluentinc/cp-kafka:5.3.0
Schema registry: confluentinc/cp-schema-registry:6.0.0
.NET 5.0 on macOS

High CPU usage, seen on Linux

Description

High CPU usage is observed when the stream starts.

How to reproduce

StreamBuilder builder = new StreamBuilder();
var config = GetConfig();
builder.Stream<string, string, StringSerDes, StringSerDes>(inputTopic)
    .MapValues((v) => SaveAndCompare(v))
    .Filter((k, v) => string.IsNullOrEmpty(v) == false)
    .To<StringSerDes, StringSerDes>(outputTopic);

Topology t = builder.Build();
KafkaStream stream = new KafkaStream(t, config);
stream.Start(cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
    await Task.Delay(1000, cancellationToken);
}
stream.Close();
IsClosed = true;
_logger.Info("ByHost compare is closed.");

How to know if the Kafka stream is ready to start consuming

Description

Hi!
We are using the kafka-streams-dotnet library in a .NET application with a pretty simple configuration:

var streamBuilder = new StreamBuilder();
streamBuilder.Stream<string, byte[]>(topic)
    .Foreach((k, v) =>
    {
        // doing some async process with the message consumed (storing to a postgres table)
    });

var topology = streamBuilder.Build();
var streamConfig = new StreamConfig<StringSerDes, ByteArraySerDes>
{
    ApplicationId = "test-app",
    BootstrapServers = bootstrapServer,
    NumStreamThreads = 1
};
var stream = new KafkaStream(topology, streamConfig);

Console.CancelKeyPress += (o, e) => { stream.Dispose(); };

await stream.StartAsync();

We are trying to test this consumer using Testcontainers and xUnit. We can create the Kafka test container successfully, but we are facing issues when consuming messages with the stream. The tests produce some messages and expect to see them consumed, but the message produced by the first test does not appear to be consumed.

While looking into this, I see logs like the following:

1383 [test-app-stream-thread-0] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[test-app-stream-thread-0] State transition from STARTING to PARTITIONS_ASSIGNED

// other logs related to the execution of the test

1436 [test-app-stream-thread-0] INFO Streamiz.Kafka.Net.Processors.StreamThread - stream-thread[test-app-stream-thread-0] State transition from PARTITIONS_ASSIGNED to RUNNING

This leads me to think that the first test may be running before the Kafka stream is actually running and ready to consume. It is worth mentioning that the subsequent messages produced by the tests are consumed without any problem.

My question is: does my assumption sound correct? If so, is there a way to know when the stream is ready to consume messages? I'm also new to .NET and would appreciate any thoughts on how to make the tests wait until the stream is ready.

Thank you in advance!
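One way to make a test wait is to block on a TaskCompletionSource that completes when the stream reports RUNNING. The C# sketch below uses only the BCL; the `onStateChanged` delegate and the simulated transitions are hypothetical stand-ins for the stream's own state notifications, so check which state-change hook (if any) your Streamiz.Kafka.Net version exposes before wiring it up.

```csharp
using System;
using System.Threading.Tasks;

// Completes once the stream reports RUNNING. RunContinuationsAsynchronously keeps
// the awaiting test code off the thread that raises the state change.
var running = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

// Hypothetical stand-in for the stream's state-change notification (oldState, newState).
Action<string, string> onStateChanged = (oldState, newState) =>
{
    Console.WriteLine($"State transition from {oldState} to {newState}");
    if (newState == "RUNNING")
        running.TrySetResult(true);
};

// Simulated stream start-up; a real test would start the KafkaStream here instead.
_ = Task.Run(async () =>
{
    onStateChanged("CREATED", "REBALANCING");
    await Task.Delay(50);
    onStateChanged("REBALANCING", "RUNNING");
});

// Wait for RUNNING, but give up after a timeout so a broken setup cannot hang the run.
Task first = await Task.WhenAny(running.Task, Task.Delay(TimeSpan.FromSeconds(5)));
bool isRunning = first == running.Task;

// Only now would the first test produce its message.
Console.WriteLine(isRunning ? "RUNNING: safe to produce test messages" : "Timed out waiting for RUNNING");
```

In an xUnit fixture, the same wait could run once during fixture initialization so that every test, including the first, starts after the stream has reached RUNNING.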

Not possible to join two streams with complex key type

Description

Hello! It looks like it's impossible to join two streams that use complex key types.

I tried to do so and got the following exception:
[StreamsException] The serializer is not compatible to the actual key (Key type: Program.UserKey). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters(using the DSL)

I debugged through the Streamiz.Kafka.Net library and found this:
https://github.com/LGouellec/kafka-streams-dotnet/blob/develop/core/Stream/StreamJoinProps.cs#L221
It looks like the key and value SerDes are ignored when a StreamJoinProps is built from another StreamJoinProps (an operation performed while building the Join node).

Nuget version: 1.1.3

How to reproduce

class UserKey
{
    public string Id { get; set; }
    public string CustomerId { get; set; }
}

IKStream<UserKey, object> stream1;
IKStream<UserKey, object> stream2;

stream1.Join(
    stream2,
    (entry1, entry) => new { entry1, entry },
    JoinWindowOptions.Of(TimeSpan.FromMinutes(1)),
    StreamJoinProps.With(
        keySerde: new JsonSerializer<UserKey>(),
        valueSerde: new JsonSerializer<object>(),
        otherValueSerde: new JsonSerializer<object>()
    )
);


ArgumentException after multiple produce authorization exceptions.


The service consumed a full CPU core after this exception:

[ArgumentException] An item with the same key has already been added. Key: topic-name [[0]]
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
   at Streamiz.Kafka.Net.Crosscutting.KafkaExtensions.ConsumeRecords[K,V](IConsumer`2 consumer, TimeSpan timeout, Int64 maxRecords)
   at Streamiz.Kafka.Net.Processors.StreamThread.Run()

This followed multiple failed produce attempts:

[StreamsException] Error encountered trying to send record to topic topic-name [stream-task[0|2] ] : Broker: Topic authorization failed
at Streamiz.Kafka.Net.Kafka.Internal.RecordCollector.Send[K,V](String topic, K key, V value, Headers headers, Int64 timestamp, ISerDes`1 keySerializer, ISerDes`1 valueSerializer)
   at Streamiz.Kafka.Net.Processors.SinkProcessor`2.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
   at Streamiz.Kafka.Net.Processors.KStreamPeekProcessor`2.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value)
   at Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor`5.Process(K1 key, V1 value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value)
   at Streamiz.Kafka.Net.Processors.KStreamMapProcessor`4.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value)
   at Streamiz.Kafka.Net.Processors.KStreamMapProcessor`4.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value)
   at Streamiz.Kafka.Net.Processors.Internal.TimestampedTupleForwarder`2.MaybeForward(K key, V newValue, V oldValue, Int64 timestamp)
   at Streamiz.Kafka.Net.Processors.KStreamWindowAggregateProcessor`4.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value)
   at Streamiz.Kafka.Net.Processors.KStreamMapProcessor`4.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.StreamTask.Process()
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)
   at Streamiz.Kafka.Net.Processors.StreamThread.Run()

This is far from a fatal bug; it's more of an FYI.

Add metrics

Adding metrics is on your TODO list, and it is a feature I need.
Could you give me some advice on how to implement it? I will file a pull request once I have something.

SchemaAvroSerDes<T> is not initialized with TopologyTestDriver

Description

I got the message "SchemaAvroSerDes is not initialized !" after piping a new Avro message using the TopologyTestDriver. After checking the source code, I saw that the SchemaAvroSerDes.Initialize method is never called.

How to reproduce

var config = new StreamConfig();
config.ApplicationId = "app-test";
config.AutoOffsetReset = AutoOffsetReset.Earliest;
config.NumStreamThreads = 1;

config.Acks = Acks.All;
config.AddConsumerConfig("allow.auto.create.topics", "false");
config.MaxTaskIdleMs = 50;

StreamBuilder builder = new StreamBuilder();

var ss = builder.Stream<string, Order, StringSerDes, SchemaAvroSerDes<Order>>("test-topic")
    .Peek((k, v) =>
    {
        Console.WriteLine($"Order #  {v.Number}");
    });
Topology t = builder.Build();

using (var driver = new TopologyTestDriver(t, config))
{
    var inputTopic = driver.CreateInputTopic<string, Order, StringSerDes, SchemaAvroSerDes<Order>>("test-topic");
    inputTopic.PipeInput("test", new Order { Number = "sdsdsds" }); // <---- ERROR MESSAGE HERE
}

Streamiz.Kafka.Net: 1.1.4-RC1
Kafka 2.7.0
Win 10.


Upgrade to .net 5

Upgrade to .net 5 (core projects + samples + unit test + cross projects)

Continuously Rising CPU usage by Kafka streams

This issue is related to one raised earlier on this repository:
https://github.com/LGouellec/kafka-streams-dotnet/issues/43

We are using the kafka-streams-dotnet library in a .NET Core microservices architecture. We have noticed peculiar behaviour: our pods running streaming applications always end up consuming 100% of the CPU allocated to them, sometimes even when they are idle and no processing is happening.

Our question is whether this is intended behaviour of the streams implementation, since the stream threads constantly poll the source topics. If not, could you help us with the configuration we should use to optimise CPU usage?

Streamiz.Kafka.Net version: 1.1.3

Operating system: Linux (Kubernetes node)

The streams topology we are using is:

var builder = new StreamBuilder();
builder.Stream<string, string, StringSerDes, StringSerDes>(_streamOptions.Value.SourceTopic, _timestampExtractor)
    .MapValues((key, value) => _messageProcessor.ProcessMessage(value))
    .Filter((key, value) => CheckThis(key, value))
    .To(_streamOptions.Value.SinkTopic);
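Whether constantly polling threads burn CPU depends on whether each poll blocks while no data is available. The C# sketch below is a generic illustration using only the BCL, not Streamiz's StreamThread: a BlockingCollection<string> stands in for a source topic (an assumption for illustration), and the loop waits up to `pollTimeout` in each TryTake call, similar in spirit to Confluent.Kafka's Consume(TimeSpan timeout).

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

// Hypothetical in-memory "topic"; nothing is ever produced, so the loop stays idle.
using var topic = new BlockingCollection<string>();

// Each iteration blocks inside TryTake for up to pollTimeout when no record is available,
// so an idle loop sleeps instead of spinning.
TimeSpan pollTimeout = TimeSpan.FromMilliseconds(100);
var watch = Stopwatch.StartNew();
int iterations = 0;
while (watch.Elapsed < TimeSpan.FromMilliseconds(500))
{
    if (topic.TryTake(out var record, pollTimeout))
    {
        Console.WriteLine($"Processed {record}");
    }
    iterations++;
}

// Roughly elapsed / pollTimeout iterations while idle.
Console.WriteLine($"Idle iterations in 500 ms: {iterations}");
```

A poll that returned immediately instead of blocking would run millions of iterations in the same window and keep a core busy, which is the pattern that matches 100% CPU on an idle pod.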

Broker: Unknown topic or partition

Description

I tried to run a simple sample application against a Confluent Cloud cluster (my first try with kafka-streams-dotnet). On startup, a StreamsException appears: Unknown topic or partition.

[screenshot]

As you can see, the necessary topics do exist:
[screenshot]
[screenshot]

Perhaps there is a simple solution, but I'm out of ideas :-(
