Comments (2)
- Create a stream against a DLQ topic:
CREATE STREAM dlq_bigquery_1 (
  key_col VARCHAR KEY,
  value_col VARCHAR,
  err_msg ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (
  WRAP_SINGLE_VALUE = false,
  kafka_topic = 'dlq-lcc-gqx2qn',
  value_format = 'json'
);
- Create another stream to hold DLQ messages from many other DLQ topics:
CREATE STREAM all_dlq (
  key_col VARCHAR KEY,
  value_col VARCHAR,
  err_msg ARRAY<STRUCT<key STRING, value STRING>>
) WITH (
  kafka_topic = 'all_dlq',
  partitions = 1,
  value_format = 'json'
);
- Insert the records from each DLQ topic into the all_dlq stream:
INSERT INTO all_dlq
  SELECT key_col,
         value_col,
         TRANSFORM(err_msg, x => STRUCT(key := x->key, value := FROM_BYTES(x->value, 'base64'))) AS err_msg
  FROM dlq_bigquery_1;
- Sink to BigQuery:
CREATE SINK CONNECTOR ...
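The key trick in the INSERT step is converting the raw BYTES header values into text so they fit the VARCHAR schema of all_dlq: ksqlDB's FROM_BYTES(col, 'base64') renders bytes as a base64 string. As an illustration only (not part of the recipe), the equivalent transformation in Python looks like this:

```python
import base64

def from_bytes_base64(raw: bytes) -> str:
    """Illustrative analogue of ksqlDB's FROM_BYTES(col, 'base64'):
    render raw header BYTES as base64 text so they can live in a STRING column."""
    return base64.b64encode(raw).decode("ascii")

# A DLQ header value arrives as raw bytes; the INSERT above stores it as text.
print(from_bytes_base64(b"lcc-gqx2qn"))  # → bGNjLWdxeDJxbg==
```

Downstream consumers of all_dlq would base64-decode the value field to recover the original header bytes.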
from kafka-tutorials.
A sample DLQ input record produced by the S3 sink connector:
Value of the record:
{
  "ordertime": 1497014222380,
  "orderid": 18,
  "itemid": "Item_184",
  "address": {
    "city": "Mountain View",
    "state": "CA",
    "zipcode": 94041
  }
}
Headers of the record:
[
  { "key": "__connect.errors.topic", "stringValue": "pksqlc-okr9jACCOMPLISHED_FEMALE_READERS" },
  { "key": "__connect.errors.partition", "stringValue": "0" },
  { "key": "__connect.errors.offset", "stringValue": "4957217" },
  { "key": "__connect.errors.connector.name", "stringValue": "lcc-gqx2qn" },
  { "key": "__connect.errors.task.id", "stringValue": "0" },
  { "key": "__connect.errors.stage", "stringValue": "VALUE_CONVERTER" },
  { "key": "__connect.errors.class.name", "stringValue": "io.confluent.connect.json.JsonSchemaConverter" },
  { "key": "__connect.errors.exception.class.name", "stringValue": "org.apache.kafka.connect.errors.DataException" },
  { "key": "__connect.errors.exception.message", "stringValue": "Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: " },
  { "key": "__connect.errors.exception.stacktrace", "stringValue": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:233)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)\n\tat java.base/java.lang.Thread.run(Thread.java:831)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:176)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:231)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)\n\t... 17 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:111)\n\t... 20 more\n" }
]
Key of the record:
18
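Since each DLQ header is a key/stringValue pair like those above, a small helper can fold the header array into a dict for quick triage of why a record was dead-lettered. This is a hypothetical sketch (the function name and sample input are illustrative, not from the recipe):

```python
import json

def dlq_headers_to_dict(headers_json: str) -> dict:
    """Fold a Connect DLQ header array (key/stringValue pairs) into a dict."""
    return {h["key"]: h["stringValue"] for h in json.loads(headers_json)}

# Abridged sample headers, mirroring the record shown above.
headers_json = """[
  {"key": "__connect.errors.topic", "stringValue": "pksqlc-okr9jACCOMPLISHED_FEMALE_READERS"},
  {"key": "__connect.errors.stage", "stringValue": "VALUE_CONVERTER"},
  {"key": "__connect.errors.exception.class.name",
   "stringValue": "org.apache.kafka.connect.errors.DataException"}
]"""

info = dlq_headers_to_dict(headers_json)
print(info["__connect.errors.stage"])  # → VALUE_CONVERTER
```

For the record above, this would surface at a glance that the failure happened in the VALUE_CONVERTER stage with a DataException ("Unknown magic byte!"), i.e. the topic's values were not Schema Registry-framed JSON.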