Comments (7)
Side remark: Why do you call store = personStoreSupplier.build();
?
Interactive Queries are described here: https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-key-value-stores
from kafka-streams-examples.
Yes ! In the documentation i see the "get" method on a store. I need that method to respond to HTTP GET requests :
@Service
@ConfigurationProperties(prefix = "person")
public class PersonService8 extends BsccServiceKafka<String, Long> implements PersonService {
Logger journal = LoggerFactory.getLogger(PersonService8.class);
Producer<String, PersonAvro> producer;
ReadOnlyKeyValueStore<String, CustomerAvro> store;
@PostConstruct
private void init() {
configurer();
producer = new KafkaProducer<>(props);
journal.info("Open topic {}...", kafkaTopic);
StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
Serde<CustomerAvro> serdeCustomer = new SpecificAvroSerde<>();
StoreBuilder<KeyValueStore<String, CustomerAvro>> personStoreSupplier = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("personProcessStore"), Serdes.String(),
Serdes.serdeFrom(serdeCustomer.serializer(), serdeCustomer.deserializer()));
topology.addSource("person$", kafkaTopic)
.addProcessor("selection", PersonProcessor2::new, "person$")
.addStateStore(personStoreSupplier, "selection");
store = personStoreSupplier.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}
public Mono<CustomerAvro> get(int id) {
journal.debug("Recherche customer by id: {}", id);
return Mono.just(id)
.map(String::valueOf)
.map(store::get)
.onErrorResume(e -> Mono.empty());
}
...
So i build the "store" attribut of the PersonService8 service in his PersonService8::init method in order to have it in the PersonService8::get method.
But there is something wrong because when i execute that code, i got that error :
org.apache.kafka.streams.errors.InvalidStateStoreException: Store personProcessStore is currently closed
Thanks a lot for help.
from kafka-streams-examples.
Instead of building the store, i tried to find it by this way
@PostConstruct
private void init() throws InterruptedException {
configurer();
producer = new KafkaProducer<>(props);
journal.info("Open topic {}...", kafkaTopic);
StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
Serde<CustomerAvro> serdeCustomer = new SpecificAvroSerde<>();
StoreBuilder<KeyValueStore<String, CustomerAvro>> personStoreSupplier = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("personProcessStore"), Serdes.String(),
Serdes.serdeFrom(serdeCustomer.serializer(), serdeCustomer.deserializer()));
topology.addSource("person$", kafkaTopic)
.addProcessor("selection", PersonProcessor2::new, "person$")
.addStateStore(personStoreSupplier, "selection");
// store = personStoreSupplier.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
store = waitUntilStoreIsQueryable("personProcessStore", streams);
}
public ReadOnlyKeyValueStore<K, V> waitUntilStoreIsQueryable(final String storeName, final KafkaStreams streams)
throws InterruptedException {
journal.info("Attente store ...");
while (true) {
try {
return streams.store(storeName, QueryableStoreTypes.<K, V>keyValueStore());
} catch (InvalidStateStoreException ignored) {
// Journalisation
journal.info(String.format("topic(%s) appid(%s): %s", kafkaTopic, kafkaAppid, ignored.getMessage()));
// store not yet ready for querying
Thread.sleep(500);
}
}
}
But KeyValueStore::put method invocation throws that error :
14:19:25.063 DEBUG f.l.b.e.c.person.PersonProcessor2 - traitement objet: 0, 2
14:19:25.063 DEBUG f.l.b.e.c.person.PersonProcessor2 - enregistrement message: key:0, value:{"id": "0", "compName": "island", "add3": "1183 Walnut Court", "add4": "1797 Schwallie Crescent", "add5": "911 Eller Ridge", "add6": "1311 Langan Square", "add7": "1430 Lithopolis Ridge", "countryCode": "handled", "active": "2", "siretCode": "should"}
14:19:26.432 INFO f.l.b.e.EncaissementApplication - Started EncaissementApplication in 6.084 seconds (JVM running for 7.275)
14:20:06.191 ERROR o.a.k.s.p.internals.AssignedTasks - stream-thread [pr_loc_u0_person-740dd4e3-b3e1-41be-9aeb-8d9386331280-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=person$, topic=pr_loc_person_priv_v1, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:235)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:404)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:318)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:68)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
at fr.laposte.bscc.encaissement.coclico.person.PersonProcessor2.lambda$5(PersonProcessor2.java:44)
at java.util.Optional.ifPresent(Optional.java:159)
at fr.laposte.bscc.encaissement.coclico.person.PersonProcessor2.process(PersonProcessor2.java:42)
at fr.laposte.bscc.encaissement.coclico.person.PersonProcessor2.process(PersonProcessor2.java:1)
I don't understand what's wrong :-(
from kafka-streams-examples.
The Avro Serde need to be configured. Because, you create the Serde object in your code (Serde<CustomerAvro> serdeCustomer = new SpecificAvroSerde<>();
), what is absolutely fine, you also need to configure the Serde in your code and call serdeCustomer.configure()
to pass in schema registry information.
from kafka-streams-examples.
@dahuchao Seems this issue is resolved. Closing for now. Feel free to reopen.
from kafka-streams-examples.
I'm using the same technique to create anew StateStore<Strinng, CustomObject> where CustomObject is Avro and still see exception: Error deserializing Avro message for id -1.
Any working example to share?
from kafka-streams-examples.
Can you share a code snippet how you create the store and serdes?
from kafka-streams-examples.
Related Issues (20)
- DeduplicationTransformer example only supports 1 partition HOT 2
- [BUG] microservices-orders: unknown command "READ" for "ccloud kafka acl create" HOT 3
- how to Packaging and running the Application Examples HOT 1
- GlobalKTablesAndStoresExampleDriver hangs with GlobalKTablesExample HOT 5
- How to create Kafka Server with org.apache.kafka:kafka_2.13:test:2.7.1 HOT 1
- Naming when adding global store HOT 4
- Do we have example on Kafka Stream Processor API to handle Smart routing case HOT 1
- ditch awaits in docker compose file
- Gitpodify so people can easily run the examples with 0 setup
- Do we have example to handle processor exception.
- Join operation is not working after update to 1.2.0 HOT 1
- Is The class of DeserializationExceptionHandler support construct with parameters HOT 1
- Consistency between inventory KTable/Topic and state store "reservedStock" for "shipped" event HOT 1
- Regarding Sum example HOT 1
- Can't find the package io.confluent.examples.streams.avro in src folder HOT 1
- How to suppress window using wall clock time instead of event time in Kafka streams? HOT 1
- Can't load library when running AnomalyDetectionLambdaExample HOT 2
- Case: 100+ KafkaStreams threads on 3.5k+ topics/partitions with exactly_once_v2 guarantee HOT 8
- Event driven order
- Kafka Streams ERROR Could not parse Avro schema
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from kafka-streams-examples.