Comments (7)

mjsax commented on May 9, 2024

Side remark: Why do you call store = personStoreSupplier.build();?

Interactive Queries are described here: https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-key-value-stores

from kafka-streams-examples.

dahuchao commented on May 9, 2024

Yes! In the documentation I see the "get" method on a store. I need that method to respond to HTTP GET requests:

@Service
@ConfigurationProperties(prefix = "person")
public class PersonService8 extends BsccServiceKafka<String, Long> implements PersonService {
	Logger journal = LoggerFactory.getLogger(PersonService8.class);
	Producer<String, PersonAvro> producer;
	ReadOnlyKeyValueStore<String, CustomerAvro> store;

	@PostConstruct
	private void init() {
		configurer();
		producer = new KafkaProducer<>(props);
		journal.info("Open topic {}...", kafkaTopic);
		StreamsBuilder builder = new StreamsBuilder();
		Topology topology = builder.build();

		Serde<CustomerAvro> serdeCustomer = new SpecificAvroSerde<>();

		StoreBuilder<KeyValueStore<String, CustomerAvro>> personStoreSupplier = Stores.keyValueStoreBuilder(
				Stores.persistentKeyValueStore("personProcessStore"), Serdes.String(),
				Serdes.serdeFrom(serdeCustomer.serializer(), serdeCustomer.deserializer()));
		topology.addSource("person$", kafkaTopic)
				.addProcessor("selection", PersonProcessor2::new, "person$")
				.addStateStore(personStoreSupplier, "selection");
		store = personStoreSupplier.build();
		KafkaStreams streams = new KafkaStreams(topology, props);
		streams.start();
	}

	public Mono<CustomerAvro> get(int id) {
		journal.debug("Looking up customer by id: {}", id);
		return Mono.just(id)
				.map(String::valueOf)
				.map(store::get)
				.onErrorResume(e -> Mono.empty());
	}
...

So I build the "store" attribute of the PersonService8 service in its PersonService8::init method so that it is available in the PersonService8::get method.

But something is wrong: when I execute that code, I get this error:

org.apache.kafka.streams.errors.InvalidStateStoreException: Store personProcessStore is currently closed

Thanks a lot for your help.

dahuchao commented on May 9, 2024

Instead of building the store, I tried to look it up this way:

	@PostConstruct
	private void init() throws InterruptedException {
		configurer();
		producer = new KafkaProducer<>(props);
		journal.info("Open topic {}...", kafkaTopic);
		StreamsBuilder builder = new StreamsBuilder();
		Topology topology = builder.build();
		Serde<CustomerAvro> serdeCustomer = new SpecificAvroSerde<>();
		StoreBuilder<KeyValueStore<String, CustomerAvro>> personStoreSupplier = Stores.keyValueStoreBuilder(
				Stores.persistentKeyValueStore("personProcessStore"), Serdes.String(),
				Serdes.serdeFrom(serdeCustomer.serializer(), serdeCustomer.deserializer()));
		topology.addSource("person$", kafkaTopic)
				.addProcessor("selection", PersonProcessor2::new, "person$")
				.addStateStore(personStoreSupplier, "selection");
//		store = personStoreSupplier.build();
		KafkaStreams streams = new KafkaStreams(topology, props);
		streams.start();
		store = waitUntilStoreIsQueryable("personProcessStore", streams);
	}
	public ReadOnlyKeyValueStore<K, V> waitUntilStoreIsQueryable(final String storeName, final KafkaStreams streams)
			throws InterruptedException {
		journal.info("Waiting for store ...");
		while (true) {
			try {
				return streams.store(storeName, QueryableStoreTypes.<K, V>keyValueStore());
			} catch (InvalidStateStoreException ignored) {
				// Logging
				journal.info(String.format("topic(%s) appid(%s): %s", kafkaTopic, kafkaAppid, ignored.getMessage()));
				// store not yet ready for querying
				Thread.sleep(500);
			}
		}
	}

But invoking KeyValueStore::put throws this error:

14:19:25.063 DEBUG f.l.b.e.c.person.PersonProcessor2 - traitement objet: 0, 2
14:19:25.063 DEBUG f.l.b.e.c.person.PersonProcessor2 - enregistrement message: key:0, value:{"id": "0", "compName": "island", "add3": "1183 Walnut Court", "add4": "1797 Schwallie Crescent", "add5": "911 Eller Ridge", "add6": "1311 Langan Square", "add7": "1430 Lithopolis Ridge", "countryCode": "handled", "active": "2", "siretCode": "should"}
14:19:26.432 INFO  f.l.b.e.EncaissementApplication - Started EncaissementApplication in 6.084 seconds (JVM running for 7.275)
14:20:06.191 ERROR o.a.k.s.p.internals.AssignedTasks - stream-thread [pr_loc_u0_person-740dd4e3-b3e1-41be-9aeb-8d9386331280-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=person$, topic=pr_loc_person_priv_v1, partition=0, offset=0
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:235)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:404)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:318)
	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
	at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
	at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
	at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
	at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:68)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
	at fr.laposte.bscc.encaissement.coclico.person.PersonProcessor2.lambda$5(PersonProcessor2.java:44)
	at java.util.Optional.ifPresent(Optional.java:159)
	at fr.laposte.bscc.encaissement.coclico.person.PersonProcessor2.process(PersonProcessor2.java:42)
	at fr.laposte.bscc.encaissement.coclico.person.PersonProcessor2.process(PersonProcessor2.java:1)

I don't understand what's wrong :-(

mjsax commented on May 9, 2024

The Avro Serde needs to be configured. Because you create the Serde object in your own code (Serde<CustomerAvro> serdeCustomer = new SpecificAvroSerde<>();), which is absolutely fine, you also need to configure it in your code: call serdeCustomer.configure() to pass in the schema registry information.
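
For illustration, a minimal sketch of that configuration step. The Confluent Avro serdes read the registry address from the "schema.registry.url" setting, and Serde#configure takes a config map plus an isKey flag. The URL and the class name SerdeConfigSketch below are placeholders (assumptions, not taken from this thread), and the configure call itself is shown as a comment so the snippet runs without the Confluent jars.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper: the class name and the registry URL are illustrative assumptions.
final class SerdeConfigSketch {

    // Builds the config map that the Confluent Avro serdes expect.
    static Map<String, String> serdeConfig(String schemaRegistryUrl) {
        return Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
    }

    public static void main(String[] args) {
        Map<String, String> config = serdeConfig("http://localhost:8081");

        // With kafka-streams-avro-serde on the classpath, the serde from the
        // question would be configured before it is handed to the store builder:
        //
        //   Serde<CustomerAvro> serdeCustomer = new SpecificAvroSerde<>();
        //   serdeCustomer.configure(config, false); // false: used for values, not keys

        System.out.println(config);
    }
}
```

The second argument tells the serde whether it handles record keys or values; for the store in the question the Avro serde is the value serde, so false is the likely choice.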

mjsax commented on May 9, 2024

@dahuchao Seems this issue is resolved. Closing for now. Feel free to reopen.

jdang67 commented on May 9, 2024

I'm using the same technique to create a new StateStore<String, CustomObject>, where CustomObject is an Avro type, and I still see the exception: Error deserializing Avro message for id -1.
Any working example to share?

mjsax commented on May 9, 2024

Can you share a code snippet showing how you create the store and serdes?
