Comments (9)
This is typically caused by invalid data fed to avsc (encoding mismatch, data prefix, ...). It's hard to say more without data to reproduce it.
from avsc.
I created a sample schema and message that produced the same error.
{
"fields": [
{
"name": "id",
"type": [
"null",
"long"
]
},
{
"name": "name",
"type": [
"null",
{
"avro.java.string": "String",
"type": "string"
}
]
},
{
"name": "email",
"type": [
"null",
{
"avro.java.string": "String",
"type": "string"
}
]
},
{
"name": "timestamp",
"type": [
"null",
"long"
]
},
{
"name": "items",
"type": [
"null",
{
"items": {
"fields": [
{
"name": "name",
"type": [
"null",
{
"avro.java.string": "String",
"type": "string"
}
]
},
{
"name": "price",
"type": [
"null",
"double"
]
}
],
"name": "Item",
"type": "record"
},
"type": "array"
}
]
}
],
"name": "User",
"namespace": "org.example.test.testkafka",
"type": "record"
}
And a Java Kafka producer:
package org.example.test.testkafka;
// Import libraries
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Service
public class KafkaProducerExample {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.client-id}")
private String clientId;
@Value("${kafka.username}")
private String username;
@Value("${kafka.password}")
private String password;
@Value("${kafka.schema-registry-url}")
String schemaRegistryUrl;
@Value("${kafka.schema-registry-user-info}")
String schemaRegistryUserInfo;
public void test(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put("basic.auth.user.info", schemaRegistryUserInfo);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("schema.registry.url", schemaRegistryUrl);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
try (Producer<String, GenericRecord> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
// Create a User record
User user = new User();
user.setId(1L);
user.setName("John Doe");
user.setEmail("john.doe@example.com");
user.setTimestamp(System.currentTimeMillis());
// Create an Item record
Item item = new Item();
item.setName("Product A");
item.setPrice(19.99);
user.setItems(Collections.singletonList(item));
// Produce the User record to Kafka
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("example", null, user);
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
And the message:
{
"id": 1,
"name": "John Doe",
"email": "john.doe@example.com",
"timestamp": 1709667230821,
"items": [
{
"name": "Product A",
"price": 19.99
}
]
}
The producer code doesn't have enough information; we need to see what data avsc tries to decode. Can you share the value of message in your example?
I had already shared it; I am reposting it here:
{
"id": 1,
"name": "John Doe",
"email": "john.doe@example.com",
"timestamp": 1709667230821,
"items": [
{
"name": "Product A",
"price": 19.99
}
]
}
That doesn't look like message; it doesn't have the right fields:
async function handleMessage(message) { // We need this message
try {
const messageAsString = JSON.stringify(message);
const messageObject = JSON.parse(messageAsString);
const avroMessage = messageObject.message.value;
const buffer = Buffer.from(avroMessage.data); // So we can see the buffer avsc sees
let type = avro.Type.forSchema(yourAvroSchema);
let decoded = type.fromBuffer(buffer);
// ...
I added some console.log calls in between, as follows:
const messageAsString = JSON.stringify(message);
const messageObject = JSON.parse(messageAsString);
console.log('messageObject:', messageObject);
const bufferData = Buffer.from(messageObject.message.value.data);
const hexString = bufferData.toString('hex');
const formattedString = hexString.match(/../g).join(' ');
const decodedString = formattedString.toString("utf-8");
const buffer2 = Buffer.from(decodedString)
const decodedString2 = buffer2.toString("utf-8");
console.log(decodedString2);
let decoded = type.fromBuffer(bufferData);
And the output is:
node consume-schema-topic.js
messageObject: {
topic: 'example',
partition: 0,
message: {
magicByte: 2,
attributes: 0,
timestamp: '1711541805856',
offset: '0',
key: null,
value: { type: 'Buffer', data: [Array] },
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '0',
firstTimestamp: '1711541805856',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 0,
producerId: '95946902',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1711541805856',
timestampType: 0,
magicByte: 2
}
}
}
00 00 01 87 1b 02 02 02 10 4a 6f 68 6e 20 44 6f 65 02 28 6a 6f 68 6e 2e 64 6f 65 40 65 78 61 6d 70 6c 65 2e 63 6f 6d 02 d6 fa ba fd cf 63 02 02 02 12 50 72 6f 64 75 63 74 20 41 02 3d 0a d7 a3 70 fd 33 40 00
{"level":"ERROR","timestamp":"2024-03-27T12:19:23.782Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"example","partition":0,"offset":"0","stack":"TypeError: Right-hand side of 'instanceof' is not an object\n at Runner.handleMessage [as eachMessage] (C:\\Users\\myuserid\\testkafka\\consume-schema-topic.js:166:13)\n at Runner.processEachMessage (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:231:20)\nat onBatch (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:447:20)\n at Runner.handleBatch (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:461:11)\n at handler (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:58:30)\n at C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\worker.js:29:15\n at Object.run (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\utils\\sharedPromiseTo.js:14:17)\n at C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\workerQueue.js:27:38\n at Array.forEach (<anonymous>)\n at Object.push (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\workerQueue.js:27:13)","error":{}}
{"level":"ERROR","timestamp":"2024-03-27T12:19:28.065Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNonRetriableError: Right-hand side of 'instanceof' is not an object","groupId":"microservice1-consumer-group","stack":"KafkaJSNonRetriableError: Right-hand side of 'instanceof' is not an object\n at C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\retry\\index.js:55:18\nat process.processTicksAndRejections (node:internal/process/task_queues:95:5)"}
{"level":"INFO","timestamp":"2024-03-27T12:19:28.135Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"microservice1-consumer-group"}
C:\Users\myuserid\testkafka>
Thanks, that's helpful. It looks like the system you are using adds a prefix to the data; you'll need to strip it before decoding. In this example, the prefix is 5 bytes:
> b = Buffer.from(/* above data: 000001871b... */, 'hex')
> t = avro.Type.forSchema(/* above schema */)
> t.fromBuffer(b.subarray(5))
User {
id: 1,
name: 'John Doe',
email: 'john.doe@example.com',
timestamp: 1711541804715,
items: [ Item { name: 'Product A', price: 19.99 } ]
}
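To see why skipping 5 bytes lines the data up with the schema, the start of the payload can be decoded by hand. The sketch below is illustrative only: readLong and readString are hypothetical helpers written for this note, not avsc functions. It reads the first two fields (id and name) from the first 17 bytes of the dump above, relying on Avro's binary rules that a union is written as a branch index followed by the value, that longs are zigzag varints, and that strings are a length followed by UTF-8 bytes.

```javascript
// Sketch only: hand-decodes the first two fields of the User schema above.
// readLong and readString are illustrative helpers, not avsc APIs.

// Read one Avro long (variable-length, zigzag-encoded) starting at pos.
function readLong(buf, pos) {
  let n = 0n;
  let shift = 0n;
  let byte;
  do {
    if (pos >= buf.length) throw new RangeError('truncated long');
    byte = buf[pos++];
    n |= BigInt(byte & 0x7f) << shift;
    shift += 7n;
  } while (byte & 0x80);
  // Undo the zigzag mapping: even -> non-negative, odd -> negative.
  return { value: Number((n >> 1n) ^ -(n & 1n)), pos };
}

// Read one Avro string: a long byte length followed by UTF-8 bytes.
function readString(buf, pos) {
  const len = readLong(buf, pos);
  const end = len.pos + len.value;
  return { value: buf.toString('utf8', len.pos, end), pos: end };
}

// First 17 bytes of the dump above: 5-byte prefix, then id and name.
const raw = Buffer.from('000001871b020202104a6f686e20446f65', 'hex');
const payload = raw.subarray(5);

const idBranch = readLong(payload, 0);            // union index for "id"
const id = readLong(payload, idBranch.pos);       // the long itself
const nameBranch = readLong(payload, id.pos);     // union index for "name"
const name = readString(payload, nameBranch.pos); // the string itself

console.log(idBranch.value, id.value, nameBranch.value, name.value);
// prints: 1 1 1 John Doe
```

Read from offset 0 instead, the same helper returns 0, 0, and then -1 for the first three bytes, so the prefix would be taken for two null fields followed by a negative union index, which does not correspond to any branch in the schema.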
Thanks @mtth. This works as suggested. I'm just not sure whether the prefix is always 5 bytes.
I'm afraid this is out of scope of avsc. You'll need to dig into the enclosing system's setup to figure out the prefix's properties.
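For what it's worth, the producer earlier in the thread uses Confluent's KafkaAvroSerializer, and the bytes 00 00 01 87 1b are consistent with the Confluent Schema Registry wire format: one magic byte (0) followed by a 4-byte big-endian schema ID. If that is indeed the setup, the prefix should always be 5 bytes. The sketch below assumes that framing; splitConfluentFrame is a hypothetical helper name, not part of avsc or kafkajs, and it would not apply to messages written by a different serializer.

```javascript
// Sketch only: assumes the Confluent Schema Registry framing
// (magic byte 0, 4-byte big-endian schema ID, then the Avro body).
// splitConfluentFrame is a hypothetical helper, not part of avsc or kafkajs.
function splitConfluentFrame(buf) {
  if (buf.length < 5) {
    throw new Error('message shorter than the 5-byte prefix');
  }
  if (buf[0] !== 0) {
    throw new Error(`unexpected magic byte ${buf[0]}`);
  }
  return {
    schemaId: buf.readInt32BE(1), // ID of the writer schema in the registry
    payload: buf.subarray(5),     // Avro-encoded body, without the prefix
  };
}

// The prefix from the dump above, followed by the first two payload bytes.
const frame = splitConfluentFrame(Buffer.from('000001871b0202', 'hex'));
console.log(frame.schemaId, frame.payload.length); // prints: 100123 2
```

The returned payload is what would be passed to type.fromBuffer in the consumer code above, in place of b.subarray(5). Checking the magic byte first means an unframed or differently framed message fails loudly rather than being decoded from the wrong offset.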