Giter VIP home page Giter VIP logo

Comments (2)

chuck-confluent avatar chuck-confluent commented on June 17, 2024
  1. Create a stream against a DLQ topic
    CREATE STREAM dlq_bigquery_1 (
    key_col VARCHAR KEY,
    value_col VARCHAR,
    err_msg ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
    ) WITH (
    WRAP_SINGLE_VALUE=false,
    kafka_topic = 'dlq-lcc-gqx2qn',
    value_format = 'json'
    );
  2. Create another stream to hold DLQ messages from many other DLQ topics:
    CREATE STREAM all_dlq (key_col VARCHAR KEY, value_col VARCHAR, err_msg ARRAY<STRUCT<key STRING, value STRING>>)
        WITH (kafka_topic='all_dlq', partitions=1, value_format='json');
  3. Insert each DLQ topics to the all_dlq stream:
    INSERT INTO all_dlq SELECT key_col, value_col, TRANSFORM(err_msg, x => Struct(key := x->key, value := FROM_BYTES(x->value, 'base64'))) as err_msg FROM dlq_bigquery_1;
    
  4. Sink to bigquery
    CREATE SINK CONNECTOR ...

from kafka-tutorials.

hendrasutanto avatar hendrasutanto commented on June 17, 2024

A sample DLQ input record produced by the S3 sink connector:

Value of the record:
{ "ordertime": 1497014222380, "orderid": 18, "itemid": "Item_184", "address": { "city": "Mountain View", "state": "CA", "zipcode": 94041 } }

Header of the record:
[ { "key": "__connect.errors.topic", "stringValue": "pksqlc-okr9jACCOMPLISHED_FEMALE_READERS" }, { "key": "__connect.errors.partition", "stringValue": "0" }, { "key": "__connect.errors.offset", "stringValue": "4957217" }, { "key": "__connect.errors.connector.name", "stringValue": "lcc-gqx2qn" }, { "key": "__connect.errors.task.id", "stringValue": "0" }, { "key": "__connect.errors.stage", "stringValue": "VALUE_CONVERTER" }, { "key": "__connect.errors.class.name", "stringValue": "io.confluent.connect.json.JsonSchemaConverter" }, { "key": "__connect.errors.exception.class.name", "stringValue": "org.apache.kafka.connect.errors.DataException" }, { "key": "__connect.errors.exception.message", "stringValue": "Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: " }, { "key": "__connect.errors.exception.stacktrace", "stringValue": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:233)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)\n\tat java.base/java.lang.Thread.run(Thread.java:831)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:176)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:231)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)\n\t... 17 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:111)\n\t... 20 more\n" } ]

Key of the record:
18

from kafka-tutorials.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.