Scylla CDC Source Connector

Overview

Scylla CDC Source Connector is a source connector that captures row-level changes in the tables of a Scylla cluster. It is a Debezium connector, compatible with Kafka Connect (Kafka 2.6.0+) and built on top of the scylla-cdc-java library.

The connector reads the CDC log for specified tables and produces a Kafka message for each row-level INSERT, UPDATE or DELETE operation. The connector is able to split reading the CDC log across multiple processes: it can start a separate Kafka Connect task for reading each Vnode of the Scylla cluster, allowing for high throughput. You can limit the number of started tasks by using the tasks.max property.

Scylla CDC Source Connector seamlessly handles schema changes and topology changes (adding or removing nodes from the Scylla cluster). The connector is fault-tolerant, retrying reads from Scylla in case of failure. It periodically saves the current position in the Scylla CDC log using Kafka Connect offset tracking (configurable by the offset.flush.interval.ms parameter). If the connector is stopped, it is able to resume reading from the previously saved offset. Scylla CDC Source Connector has at-least-once semantics.

The connector has the following capabilities:

  • Kafka Connect connector using Debezium framework
  • Replication of row-level changes from Scylla using Scylla CDC:
    • INSERT
    • UPDATE
    • DELETE (single row deletes)
  • High scalability - able to split work across multiple Kafka Connect workers
  • Fault tolerant - connector periodically saves its progress and can resume from previously saved offset (with at-least-once semantics)
  • Support for many standard Kafka Connect converters, such as JSON and Avro
  • Compatible with standard Kafka Connect transformations
  • Metadata about CDC events - each generated Kafka message contains information about the source, such as timestamp and table name
  • Seamless handling of schema changes and topology changes (adding, removing nodes from Scylla cluster)

The connector has the following limitations:

  • Only Kafka 2.6.0+ is supported
  • Only row-level operations are produced (INSERT, UPDATE, DELETE):
    • Partition deletes - those changes are ignored
    • Row range deletes - those changes are ignored
  • No support for collection types (LIST, SET, MAP) and UDT - columns with those types are omitted from generated messages
  • No support for preimage and postimage - changes only contain those columns that were modified, not the entire row before/after change. More information here

Connector installation

Building

Prebuilt packages

You can download the connector as a prebuilt package:

  1. JAR with dependencies (fat JAR): scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar
  2. Confluent Hub package (ZIP): ScyllaDB-scylla-cdc-source-connector-1.0.1.zip

The artifacts are also available in the Maven Central Repository - we recommend using the "JAR with dependencies" file there.

Building from source

You can also build the connector from source by using the following commands:

git clone https://github.com/scylladb/scylla-cdc-source-connector.git
cd scylla-cdc-source-connector
mvn clean package

The connector JAR file will be available in the scylla-cdc-kafka-connect/target/fat-jar directory.

Installation

Copy the JAR file containing the connector into your Kafka Connect deployment and append the directory containing it to your Kafka Connect plugin path (the plugin.path configuration property).
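
For example, on a typical deployment the steps might look like this (a minimal sketch; the paths are illustrative, adjust them to your installation):

# Copy the fat JAR into a plugin directory:
mkdir -p /opt/kafka/plugins/scylla-cdc-source-connector
cp scylla-cdc-source-connector-1.0.1-jar-with-dependencies.jar \
    /opt/kafka/plugins/scylla-cdc-source-connector/

# In your Kafka Connect worker configuration (e.g. connect-distributed.properties):
# plugin.path=/opt/kafka/plugins

Restart the Kafka Connect workers afterwards so that the new plugin is discovered.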

Configuration

Scylla CDC Source Connector exposes many configuration properties. These are the most important:

  • scylla.name (required): A unique name that identifies the Scylla cluster and that is used as a prefix for all schemas and topics. The logical name allows you to easily differentiate between your different Scylla cluster deployments. Each distinct Scylla installation should have a separate namespace and be monitored by at most one Scylla CDC Source Connector. It should consist of alphanumeric or underscore (_) characters.
  • scylla.cluster.ip.addresses (required): List of IP addresses of nodes in the Scylla cluster that the connector will use to open initial connections to the cluster. Provided as a comma-separated list of host:port pairs (host1:port1,host2:port2).
  • scylla.table.names (required): List of CDC-enabled table names for the connector to read. See Change Data Capture (CDC) for more information about configuring CDC on Scylla. Provided as a comma-separated list of pairs <keyspace name>.<table name>.
  • scylla.user (optional): The username to connect to Scylla with. If not set, no authentication is attempted.
  • scylla.password (optional): The password to connect to Scylla with. If not set, no authentication is attempted.

See additional configuration properties in the "Advanced administration" section.

Example configuration (as .properties file):

name=ScyllaCDCSourceConnector
connector.class=com.scylladb.cdc.debezium.connector.ScyllaConnector
scylla.name=MyScyllaCluster
scylla.cluster.ip.addresses=127.0.0.1:9042,127.0.0.2:9042
scylla.table.names=ks.my_table

tasks.max=10
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

auto.create.topics.enable=true

This configuration will capture row-level changes in the ks.my_table table from the Scylla cluster (127.0.0.1, 127.0.0.2). Change data capture events will appear on the MyScyllaCluster.ks.my_table Kafka topic, encoded as JSON with schema information.

Scylla CDC Source Connector writes events to a separate Kafka topic for each source Scylla table. The topic name is logicalName.keyspaceName.tableName (the logical name is configured by the scylla.name property). You can turn on automatic topic creation by using the auto.create.topics.enable property.

Data change events

Scylla CDC Source Connector generates a data change event for each row-level INSERT, UPDATE or DELETE operation. Each event consists of a key and a value.

Debezium and Kafka Connect are designed around continuous streams of event messages, and the structure of these events may change over time. This could be difficult for consumers to deal with, so Kafka Connect makes each event self-contained. Every message key and value has two parts: a schema and a payload. The schema describes the structure of the payload, while the payload contains the actual data.

Data change event key

The data change event's key will contain a field for each column in the primary key (partition key and clustering key).

For example, given this Scylla table and INSERT operation:

CREATE TABLE ks.t(
    pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};

INSERT INTO ks.t(pk, ck, v) VALUES (1, 1, 'example row');

The data change event's key will look like this (with JSON serializer and schema enabled):

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": true,
        "field": "ck"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "pk"
      }
    ],
    "optional": false,
    "name": "MyScyllaCluster.ks.my_table.Key"
  },
  "payload": {
    "ck": 1,
    "pk": 1
  }
}

Data change event value

The data change event's value consists of a schema and a payload section. The payload of every data change event contains the following fields:

  • op: the type of operation. c for INSERT, u for UPDATE, d for DELETE.
  • before: an optional field with the state of the row before the event occurred. Present in DELETE data change events.
  • after: an optional field with the state of the row after the event occurred. Present in UPDATE and INSERT data change events.
  • ts_ms: the time at which the connector processed the event.
  • source: metadata about the source of the event:
    • name: the logical name of the Scylla cluster (scylla.name).
    • ts_ms: the time that the change was made in the database (in milliseconds). You can compute the difference between source.ts_ms and the (top-level) ts_ms to determine the lag between the source Scylla change and the connector (see the sketch after this list).
    • ts_us: the time that the change was made in the database (in microseconds).
    • keyspace_name, table_name: the names of the keyspace and table this data change event originated from.
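
As an illustration of the lag computation mentioned above, here is a minimal sketch in Java (using the Jackson library and the timestamps from the INSERT example below; the class name is arbitrary):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LagExample {
    public static void main(String[] args) throws Exception {
        // Trimmed-down payload containing only the two relevant timestamps.
        String json = "{\"payload\": {\"ts_ms\": 1611578838754,"
                + " \"source\": {\"ts_ms\": 1611578778701}}}";
        JsonNode payload = new ObjectMapper().readTree(json).get("payload");
        long connectorTs = payload.get("ts_ms").asLong();            // processed by the connector
        long sourceTs = payload.get("source").get("ts_ms").asLong(); // written in Scylla
        System.out.println("lag ms: " + (connectorTs - sourceTs));   // prints: lag ms: 60053
    }
}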

Cell representation

Operations in Scylla, such as INSERT or UPDATE, do not have to modify all columns of a row. To differentiate between the non-modification of a column and inserting/updating a NULL, all non-primary-key columns are wrapped in a structure containing a single value field. For example, given this Scylla table and UPDATE operation:

CREATE TABLE ks.t(
    pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};

INSERT INTO ks.t(pk, ck, v) VALUES (1, 1, 'example row');
UPDATE ks.t SET v = 'new value' WHERE pk = 1 AND ck = 1;

The v column will be represented as:

...
    "v": {
        "value": "new value"
    }
...

In case of UPDATE setting v to NULL:

UPDATE ks.t SET v = NULL WHERE pk = 1 AND ck = 1;

The v column will be represented as:

...
    "v": {
        "value": null
    }
...

If the operation did not modify the v column, the data change event will contain the following representation of v:

...
    "v": null
...

See the UPDATE example for a full data change event's value.

Single Message Transformations (SMTs)

The connector provides two single message transformations (SMTs): ScyllaExtractNewRecordState (class: com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewRecordState) and ScyllaFlattenColumns (com.scylladb.cdc.debezium.connector.transforms.ScyllaFlattenColumns).
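
To enable either transformation, register it in the connector configuration like any other Kafka Connect SMT; a minimal sketch (the alias flatten is arbitrary):

transforms=flatten
transforms.flatten.type=com.scylladb.cdc.debezium.connector.transforms.ScyllaFlattenColumns

To use ScyllaExtractNewRecordState instead, point the transform's type at com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewRecordState.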

ScyllaExtractNewRecordState

ScyllaExtractNewRecordState works exactly like io.debezium.transforms.ExtractNewRecordState (in fact, it is invoked underneath), but it also flattens the structure by extracting values from the aforementioned single-field structures. This transformation makes the message structure simpler (and easier to use with e.g. Elasticsearch), but it makes it impossible to differentiate between a NULL value and non-modification. Given the following message:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": true,
        "field": "ck"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "pk"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": true,
            "field": "value"
          }
        ],
        "optional": true,
        "name": "NS2.ks.t.v.Cell",
        "field": "v"
      }
    ],
    "optional": false,
    "name": "NS2.ks.t.After"
  },
  "payload": {
    "ck": 2,
    "pk": 20,
    "v": {
      "value": 3
    }
  }
}

then the same message transformed by ScyllaExtractNewRecordState would be:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": true,
        "field": "ck"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "pk"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "v"
      }
    ],
    "optional": false,
    "name": "NS2.ks.t.After"
  },
  "payload": {
    "ck": 2,
    "pk": 20,
    "v": 3
  }
}

Notice that the v field is no longer wrapped in a value structure.

ScyllaFlattenColumns

ScyllaFlattenColumns flattens columns that are wrapped in a value structure, such as:

"v": {
  "value": 3
}

transforming it into:

"v": 3

Compared to the ScyllaExtractNewRecordState transformation, ScyllaFlattenColumns does not remove any additional metadata or modify the message in any other way.

For example, running the transformation on this message:

{
  "source": {
    "version": "1.1.4",
    "connector": "scylla",
    "name": "SMTExample",
    "ts_ms": 1706890860030,
    "snapshot": {
      "string": "false"
    },
    "db": "ks",
    "keyspace_name": "ks",
    "table_name": "t",
    "ts_us": 1706890860030414
  },
  "before": null,
  "after": {
    "SMTExample.ks.t.Before": {
      "ck": 7,
      "pk": 1,
      "v": {
        "value": 7
      }
    }
  },
  "op": {
    "string": "c"
  },
  "ts_ms": {
    "long": 1706890892952
  },
  "transaction": null
}

will result in the following message:

{
  "source": {
    "version": "1.1.4",
    "connector": "scylla",
    "name": "SMTExample",
    "ts_ms": 1706890860030,
    "snapshot": {
      "string": "false"
    },
    "db": "ks",
    "keyspace_name": "ks",
    "table_name": "t",
    "ts_us": 1706890860030414
  },
  "before": null,
  "after": {
    "SMTExample.ks.t.Before": {
      "ck": 7,
      "pk": 1,
      "v": 7
    }
  },
  "op": {
    "string": "c"
  },
  "ts_ms": {
    "long": 1706890892952
  },
  "transaction": null
}

while ScyllaExtractNewRecordState would produce:

{
  "ck": 7,
  "pk": 1,
  "v": 7
}

INSERT example

Given this Scylla table and INSERT operation:

CREATE TABLE ks.t(
    pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};

INSERT INTO ks.t(pk, ck, v) VALUES (1, 1, 'example row');

The connector will generate the following data change event's value (with JSON serializer and schema enabled):

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_us"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "keyspace_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table_name"
          }
        ],
        "optional": false,
        "name": "com.scylladb.cdc.debezium.connector",
        "field": "source"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": true,
            "field": "ck"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "pk"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "value"
              }
            ],
            "optional": true,
            "name": "MyScyllaCluster.ks.my_table.v.Cell",
            "field": "v"
          }
        ],
        "optional": true,
        "name": "MyScyllaCluster.ks.my_table.Before",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": true,
            "field": "ck"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "pk"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "value"
              }
            ],
            "optional": true,
            "name": "MyScyllaCluster.ks.my_table.v.Cell",
            "field": "v"
          }
        ],
        "optional": true,
        "name": "MyScyllaCluster.ks.my_table.After",
        "field": "after"
      },
      {
        "type": "string",
        "optional": true,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "MyScyllaCluster.ks.my_table.Envelope"
  },
  "payload": {
    "source": {
      "version": "1.0.1-SNAPSHOT",
      "connector": "scylla",
      "name": "MyScyllaCluster",
      "ts_ms": 1611578778701,
      "ts_us": 1611578778701813,
      "snapshot": "false",
      "db": "ks",
      "keyspace_name": "ks",
      "table_name": "my_table"
    },
    "before": null,
    "after": {
      "ck": 1,
      "pk": 1,
      "v": {
        "value": "example row"
      }
    },
    "op": "c",
    "ts_ms": 1611578838754,
    "transaction": null
  }
}

UPDATE example

Given this Scylla table and UPDATE operations:

CREATE TABLE ks.t(
    pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};

UPDATE ks.t SET v = 'new value' WHERE pk = 1 AND ck = 1;
UPDATE ks.t SET v = NULL WHERE pk = 1 AND ck = 1;

The connector will generate the following data change event's value (with JSON serializer and schema enabled) for the first UPDATE. Note that the schema is omitted as it is the same as in the INSERT example:

{
  "schema": {},
  "payload": {
    "source": {
      "version": "1.0.1-SNAPSHOT",
      "connector": "scylla",
      "name": "MyScyllaCluster",
      "ts_ms": 1611578808701,
      "ts_us": 1611578808701321,
      "snapshot": "false",
      "db": "ks",
      "keyspace_name": "ks",
      "table_name": "my_table"
    },
    "before": null,
    "after": {
      "ck": 1,
      "pk": 1,
      "v": {
        "value": "new value"
      }
    },
    "op": "u",
    "ts_ms": 1611578868758,
    "transaction": null
  }
}

Data change event's value for the second UPDATE:

{
  "schema": {},
  "payload": {
    "source": {
      "version": "1.0.1-SNAPSHOT",
      "connector": "scylla",
      "name": "MyScyllaCluster",
      "ts_ms": 1611578808701,
      "ts_us": 1611578808701341,
      "snapshot": "false",
      "db": "ks",
      "keyspace_name": "ks",
      "table_name": "my_table"
    },
    "before": null,
    "after": {
      "ck": 1,
      "pk": 1,
      "v": {
        "value": null
      }
    },
    "op": "u",
    "ts_ms": 1611578868758,
    "transaction": null
  }
}

DELETE example

Given this Scylla table and DELETE operation:

CREATE TABLE ks.t(
    pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};

DELETE FROM ks.t WHERE pk = 1 AND ck = 1;

The connector will generate the following data change event's value (with JSON serializer and schema enabled). Note that the schema is omitted as it is the same as in the INSERT example:

{
  "schema": {},
  "payload": {
    "source": {
      "version": "1.0.1-SNAPSHOT",
      "connector": "scylla",
      "name": "MyScyllaCluster",
      "ts_ms": 1611578808701,
      "ts_us": 1611578808701919,
      "snapshot": "false",
      "db": "ks",
      "keyspace_name": "ks",
      "table_name": "my_table"
    },
    "before": {
      "ck": 1,
      "pk": 1,
      "v": null
    },
    "after": null,
    "op": "d",
    "ts_ms": 1611578868759,
    "transaction": null
  }
}

Advanced administration

Advanced configuration parameters

In addition to the configuration parameters described in the "Configuration" section, Scylla CDC Source Connector exposes the following (non-required) configuration parameters:

  • scylla.query.time.window.size: The size of the time windows queried by the connector. Changes are queried using SELECT statements with a time restriction whose width is defined by this parameter. Value expressed in milliseconds.
  • scylla.confidence.window.size: The size of the confidence window. It is necessary for the connector to avoid reading too-fresh data from the CDC log due to the eventual consistency of Scylla. The problem could appear when a newer write reaches a replica before some older write. For a short period of time, when reading, it is possible for the replica to return only the newer write. The connector mitigates this problem by not reading a window of the most recent changes (controlled by this parameter). Value expressed in milliseconds.
  • scylla.consistency.level: The consistency level of CDC table read queries. This consistency level is used only for read queries to the CDC log table. By default, the QUORUM level is used.
  • scylla.local.dc: The name of the local Scylla datacenter. This datacenter name will be used to set up the connection to Scylla, prioritizing sending requests to the nodes in the local datacenter. If not set, no particular datacenter will be prioritized.

Configuration for large Scylla clusters

Offset (progress) storage

Scylla CDC Source Connector reads the CDC log by querying at Vnode granularity. It uses Kafka Connect to store the current progress (offset) for each Vnode. By default, there are 256 Vnodes per Scylla node. Kafka Connect stores those offsets in its connect-offsets internal topic, which can grow large for big Scylla clusters. You can minimize the size of this topic by adjusting the following configuration options on it:

  1. segment.bytes or segment.ms - lowering them will make the compaction process trigger more often.
  2. cleanup.policy=delete and setting retention.ms to at least the TTL value of your Scylla CDC table (in milliseconds; the Scylla default is 24 hours). Using this configuration, older offsets will be deleted. By setting retention.ms to at least the TTL value of your Scylla CDC table, we make sure to delete only those offsets that have already expired in the source Scylla CDC table (see the sketch below).
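
For example, assuming a CDC table TTL of 24 hours (86400000 ms) and the default connect-offsets topic name, point 2 could be applied with the standard Kafka CLI (the broker address is a placeholder):

kafka-configs.sh --bootstrap-server localhost:9092 --alter \
    --entity-type topics --entity-name connect-offsets \
    --add-config cleanup.policy=delete,retention.ms=86400000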

tasks.max property

By adjusting the tasks.max property, you can configure how many Kafka Connect worker tasks will be started. By scaling up the number of nodes in your Kafka Connect cluster (and the tasks.max number), you can achieve higher throughput. In general, the tasks.max property should be greater than or equal to the number of nodes in the Kafka Connect cluster, to allow the connector to start on each node. The tasks.max property should also be greater than or equal to the number of nodes in your Scylla cluster, especially if those nodes have a high shard count (32 or greater), as they have a large number of CDC streams.


scylla-cdc-source-connector's Issues

Kafka Connect Scylla Connector tasks getting deleted from status topic

While testing Scylla CDC using Kafka Connect, tasks are automatically getting removed, leading to no CDC events being streamed even when there are write ops on the table where the CDC stream has been enabled.

Connector Config

{ "name": "cdc-platform-scylla-load-test-3", "config": { "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector", "scylla.user": "***", "auto.create.topics.enable": "true", "scylla.table.names": "scyllacdcloadtest.livestream", "tasks.max": "50", "scylla.cluster.ip.addresses": "172.19.0.103:19042,172.19.0.104:19042", "scylla.password": "***", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "scylla.name": "livestream-cdc-load-testing", "key.converter": "org.apache.kafka.connect.json.JsonConverter" } }

Table details:

CREATE TABLE scyllacdcloadtest.livestream (
    livestream_id text PRIMARY KEY,
    createreceivetime bigint,
    createtime bigint,
    endreceivetime bigint,
    endtime bigint,
    status text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'IncrementalCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

Image showing write ops on the table where CDC is enabled.

Image showing CDC Kafka topic messages/sec.

List kafka connect tasks returns

curl --location --request GET '100.98.4.123:8083/connectors'

[ "cdc-platform-scylla-load-test-3" ]

Also, describing the Kafka Connect connector returns

curl --location --request GET '100.98.1.157:8083/connectors/cdc-platform-scylla-load-test-3'

{ "name": "cdc-platform-scylla-load-test-3", "config": { "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector", "scylla.user": "***", "auto.create.topics.enable": "true", "scylla.table.names": "scyllacdcloadtest.livestream", "tasks.max": "50", "scylla.cluster.ip.addresses": "172.19.0.103:19042,172.19.0.104:19042", "scylla.password": "***", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "name": "cdc-platform-scylla-load-test-3", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "scylla.name": "livestream-cdc-load-testing", "key.converter": "org.apache.kafka.connect.json.JsonConverter" }, "tasks": [], "type": "source" }
which shows there are no tasks running.

Also, there are null values for the task keys in the status topic of Kafka Connect.

Any support plan changing consistency level when reading cdc log?

Hi.

Just reading the CDC log on Scylla takes too long, over a few seconds. When reading the CDC log, the connector seems to query other datacenters as well. Are there any plans to support a setting that changes the consistency level when reading CDC logs? For example LOCAL_QUORUM, EACH_QUORUM, etc.

[Environment]

DC US_WEST_1 => AWS EC2 i3en.6xlarge * 3
DC EU_CENTRAL_1 => AWS EC2 i3en.6xlarge * 3

[Config]

CREATE KEYSPACE IF NOT EXISTS test_service WITH REPLICATION = {
'class' : 'NetworkTopologyStrategy',
'AWS_US_WEST_1' : 3,
'AWS_EU_CENTRAL_1' : 3
};

[SourceConnector Config]

  "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
  "tasks.max": "3",
  "scylla.cluster.ip.addresses": "'${database_url}'",
  "scylla.user": "'${database_user}'",
  "scylla.password": "'${database_password}'",
  "scylla.name": "cdc-data.test",
  "scylla.table.names": "'${table_include_list}'",
  "scylla.query.time.window.size": "5000",
  "scylla.confidence.window.size": "5000",
  "producer.override.acks": "-1",
  "producer.override.max.in.flight.requests.per.connection": "1",
  "producer.override.compression.type": "snappy",
  "producer.override.linger.ms": "50",
  "producer.override.batch.size": "327680",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topic.creation.default.replication.factor": "'${replication_factor}'",
  "topic.creation.default.partitions": "11"


feature request: support for preimage

As a consumer of my CDC event stream (Kafka topic), with table cdc preimages enabled, I'd like to also receive data of the preimage *_cdc_log record (cdc$operation=0).

This would allow me to fully utilise the change event for stream processing use cases.

Optional: either follow the CDC setting of the (source) table in question, or have the scylla-cdc-source-connector explicitly configure (enable/disable) the processing of preimages.

Example use cases:

  • UPDATE to a table account where I'd like to determine if the col accountname has changed (+value from 'a' -> 'b')
  • construct the full updated record (~postimage) to have a complete new record of my document, e.g. for streaming to other systems / databases

Add / use `poll.interval.ms` config option (!= `scylla.query.time.window.size`)

Description

To allow tuning/customising the behaviour of one's source connector setup, I'd like to also have a config option poll.interval.ms in addition to scylla.query.time.window.size, which currently effectively defines both the query time window size and the query interval for a 'live' / caught-up worker task.

As per my understanding/reasoning, poll.interval.ms would/should be smaller than scylla.query.time.window.size, with the latter being applied during the catch-up / init phase.

Workers (connect tasks) would ideally evenly scatter queries for the assigned array of streamIds / streamIdGroups (scylla-cdc-java worker task?).

Config Field Definition

poll.interval.ms
Positive integer value that specifies the frequency in milliseconds with which the connector should poll for new data in each worker task (Vnode). Defaults to 15,000 milliseconds.

  • Type: Integer
  • Importance: Low
  • Default: 15000
  • Description: Frequency in ms to poll for new data in each table.

References

Pulsar compatibility

Could we add Apache Pulsar compatibility to this same CDC source connector, or would we need to clone it and create a side-by-side project compatible with Apache Pulsar?

Could an event be lost if the tasks.max setting conditions written in the README.md are not met?

Hi
Could an event be lost if the tasks.max setting conditions written in the README.md are not met?

  • tasks.max >= kafka connect cluster nodes (It's ok, 3 >= 3)
  • tasks.max >= number of nodes in Scylla cluster (It's not ok, 3 < 12)
In general, the tasks.max property should be greater or equal the number of nodes in Kafka Connect cluster, to allow the connector to start on each node. tasks.max property should also be greater or equal the number of nodes in your Scylla cluster.

[Infra]

Kafka Connect Cluster Node Size : 3
Total Multi DC ScyllaDB Cluster Node Size : 12
Each ScyllaDB Cluster Node Size : 6 (DC : AWS EU_CENTRAL_1), 6 (DC : AWS US_WEST_1)

[Scylla DB Source Connector's configuration]

"connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
"tasks.max": "3",
"scylla.cluster.ip.addresses": "'${database_url}'",
"scylla.user": "'${database_user}'",
"scylla.password": "'${database_password}'",
"scylla.name": "cdc-data.test",
"scylla.table.names": "'${table_include_list}'",
"scylla.query.time.window.size": "5000",
"scylla.confidence.window.size": "5000",
"producer.override.acks": "-1",
"producer.override.max.in.flight.requests.per.connection": "1",
"producer.override.compression.type": "snappy",
"producer.override.linger.ms": "50",
"producer.override.batch.size": "327680",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topic.creation.default.replication.factor": "'${replication_factor}'",
"topic.creation.default.partitions": "11"

Detected performance impact query

In scylla-cdc-source-connector, the following query pattern is used to retrieve the changed data from the CDC table.

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" IN ? AND "cdc$time">? AND "cdc$time"<=?;

"cdc$stream_id" IN ?
The query pattern seems to be sending a lookup request to all the ScyllaDB Cluster Nodes. So ScyllaDB will see spikes in load and crash.

How about improving this query pattern by issuing the lookups in parallel inside the CDC connector and merging the results (see the sketch after these queries)?

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" = "stream_id_A" AND "cdc$time">? AND "cdc$time"<=?;

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" = "stream_id_B" AND "cdc$time">? AND "cdc$time"<=?;

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" = "stream_id_C" AND "cdc$time">? AND "cdc$time"<=?;

....

SELECT * FROM test.user_scylla_cdc_log WHERE "cdc$stream_id" = "stream_id_Z" AND "cdc$time">? AND "cdc$time"<=?;


Then, merge all query results in the source connector.
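
A minimal sketch of the proposed pattern using the Java driver 3.x API (the session, stream id list, window bounds and row handling are assumptions for illustration):

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class PerStreamQuery {
    // Query each stream id separately (in parallel) and merge results client-side,
    // instead of a single IN(...) query that hits many nodes at once.
    static void queryWindow(Session session, List<ByteBuffer> streamIds,
                            UUID afterTime, UUID upToTime) {
        PreparedStatement ps = session.prepare(
                "SELECT * FROM test.user_scylla_cdc_log WHERE \"cdc$stream_id\" = ?"
                        + " AND \"cdc$time\" > ? AND \"cdc$time\" <= ?");
        List<ResultSetFuture> futures = new ArrayList<>();
        for (ByteBuffer streamId : streamIds) {
            futures.add(session.executeAsync(ps.bind(streamId, afterTime, upToTime)));
        }
        for (ResultSetFuture future : futures) {
            for (Row row : future.getUninterruptibly()) {
                System.out.println(row); // merge rows into a single change stream here
            }
        }
    }
}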

Currently, ScyllaDB has failed after attaching the Source Connector in the production environment.


[ScyllaDB Cluster Environment]

  • Total nodes: 12
  • AWS_US_WEST_1: 6 nodes
  • AWS_EU_CENTRAL_1: 6 nodes

[Scylla CDC Source Connector Configuration]

  "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
  "tasks.max": "3",
  "scylla.cluster.ip.addresses": "'${database_url}'",
  "scylla.user": "'${database_user}'",
  "scylla.password": "'${database_password}'",
  "scylla.name": "cdc-data.test",
  "scylla.table.names": "test.a,test.b",
  "scylla.query.time.window.size": "10000",
  "scylla.confidence.window.size": "5000",
  "scylla.consistency.level": "LOCAL_QUORUM",
  "scylla.local.dc": "AWS_US_WEST_1",
  "producer.override.acks": "-1",
  "producer.override.max.in.flight.requests.per.connection": "1",
  "producer.override.compression.type": "snappy",
  "producer.override.linger.ms": "50",
  "producer.override.batch.size": "327680",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.partitions": "11"


CDC log stream state (cdc$time) persisted via connect topic `connect-offsets`

Hi, when looking at the data published to the connect-offsets topic I noticed the latest window state is tracked by

  • 'connector name'
  • array (per table..?) of tuple4
    • keyspace_name
    • table_name
    • vnode_id
    • generation_start


Why is this at the vnode_id level and where does this information come from?
When querying the table the vnode_id is not used as a query condition, right?

Further implication (maybe?):
The topic connect-offsets is created by kafka connect (not the scylla connector) and is not a compacted topic.
Running a simple test (scylla.query.time.window.size: 2000) with 1 connector, 1 task and 1 table resulted in ~1M messages on the docker-connect-offsets topic.
@pkgonan may I ask if you've got numbers to confirm this for a more comprehensive setup?

@haaawk how is this topic consumed upon connector (re)start / task/consumer rebalancing? From beginning?


Update 2021-12-15:

ℹ️ For reference: the part on connect-offsets already has been well described and addressed in a section in the repo README:

#### Offset (progress) storage
Scylla CDC Source Connector reads the CDC log by quering on [Vnode](https://docs.scylladb.com/architecture/ringarchitecture/) granularity level. It uses Kafka Connect to store current progress (offset) for each Vnode. By default, there are 256 Vnodes per each Scylla node. Kafka Connect stores those offsets in its `connect-offsets` internal topic, but it could grow large in case of big Scylla clusters. You can minimize this topic size, by adjusting the following configuration options on this topic:
1. `segment.bytes` or `segment.ms` - lowering them will make the compaction process trigger more often.
2. `cleanup.policy=delete` and setting `retention.ms` to at least the TTL value of your Scylla CDC table (in milliseconds; Scylla default is 24 hours). Using this configuration, older offsets will be deleted. By setting `retention.ms` to at least the TTL value of your Scylla CDC table, we make sure to delete only those offsets that have already expired in the source Scylla CDC table.


feature request: allow to define initial ChangeAgeLimit

As a connect user/admin I'd like to be able to configure scylla.change.age.limit.

(quoting scylla-cdc-go)

When the library starts for the first time it has to start consuming changes from some point in time. This parameter defines how far in the past it needs to look. If the value of the parameter is set to an hour, then the library will only read historical changes that are no older than an hour.

Consuming from a 'connector::table::vnodeId::streamId' should start from ~
max(persistedState, now()-tableCdcTtl, now()-changeAgeLimit)
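
Expressed as a small Java sketch (names invented for illustration), the starting point would be the latest of the three bounds:

import java.time.Duration;
import java.time.Instant;

public class StartPoint {
    // Latest of: last persisted offset, now minus the CDC table TTL,
    // and now minus the configured change age limit.
    static Instant compute(Instant persistedState, Duration tableCdcTtl, Duration changeAgeLimit) {
        Instant now = Instant.now();
        Instant start = persistedState;
        if (now.minus(tableCdcTtl).isAfter(start)) start = now.minus(tableCdcTtl);
        if (now.minus(changeAgeLimit).isAfter(start)) start = now.minus(changeAgeLimit);
        return start;
    }
}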

References

Kafka MaskField transform problem

Hi, I have a user table with a column named ssn that holds SSN#. When the source connector publishes to the Kafka topic I want to mask out the SSN# field, so I use the MaskField transform (https://docs.confluent.io/platform/current/connect/transforms/maskfield.html#maskfield). I set up the transform with the following properties:
transforms=dataMask
transforms.dataMask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.dataMask.fields=ssn
transforms.dataMask.replacement=
But I'm not seeing the field being masked. I wonder whether the field name should not be the same as the database column name? Thanks for any help.

Runtime error while starting connector

Stack trace:

Caused by: java.lang.NoSuchMethodError: 'org.apache.kafka.connect.source.SourceConnectorContext com.scylladb.cdc.debezium.connector.ScyllaConnector.context()'
at com.scylladb.cdc.debezium.connector.ScyllaConnector.buildMaster(ScyllaConnector.java:67)
at com.scylladb.cdc.debezium.connector.ScyllaConnector.validate(ScyllaConnector.java:138)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:318)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:672)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:669)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:299)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:248)

Better errors in case automatic topic creation is disabled

If automatic topic creation is disabled on a Kafka Connect cluster, the connector will not start successfully. We should make sure that it gives clear error messages in that case, and that this scenario is documented and explained in the docs (including the names of the topics to create: the target topic and the heartbeat topic).
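
If this scenario applies to you, a possible workaround sketch is to pre-create both topics with the standard Kafka CLI (partition and replication counts are placeholders; the heartbeat topic name follows the __debezium-heartbeat.<scylla.name> pattern visible in the logs quoted elsewhere on this page):

kafka-topics.sh --bootstrap-server localhost:9092 --create \
    --topic MyScyllaCluster.ks.my_table --partitions 10 --replication-factor 3
kafka-topics.sh --bootstrap-server localhost:9092 --create \
    --topic __debezium-heartbeat.MyScyllaCluster --partitions 1 --replication-factor 3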

cdc events are not sent to kafka

Hi.
CDC events are not sent to Kafka.
When we tested in the dev environment (Single DC) it worked well, but in the production environment (Multi DC) it did not work.

When a create & update & delete command is executed on my_table (CDC enabled), a CDC log is generated to my_table_scylla_cdc_log successfully, but the CDC event is not sent to the Kafka topic. However, the heartbeat event is produced to Kafka successfully. (Kafka Topic : __debezium-heartbeat.cdc-data.test)

If an error log occurred, we could tell what the problem is, but it is difficult to know what the problem is because no error log occurs.

[Versions]

Kafka Broker Version : 2.6.0
Confluent Kafka Connect Version : 6.1.1
scylla-cdc-source-connector Version : 1.0.0
ScyllaDB Open Source Version : 4.4.1

[Configs - Same in all environments.]

  "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
  "tasks.max": "3",  
  "scylla.cluster.ip.addresses": "'${database_url}'",  
  "scylla.user": "'${database_user}'",
  "scylla.password": "'${database_password}'",
  "scylla.name": "cdc-data.test",
  "scylla.table.names": "'${table_include_list}'",
  "scylla.query.time.window.size": "1000",
  "scylla.confidence.window.size": "1000",
  "producer.override.acks": "-1",
  "producer.override.max.in.flight.requests.per.connection": "1",
  "producer.override.compression.type": "snappy",
  "producer.override.linger.ms": "50",
  "producer.override.batch.size": "327680",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topic.creation.default.replication.factor": "'${replication_factor}'",
  "topic.creation.default.partitions": "11"

CREATE TABLE IF NOT EXISTS test_service.my_table (
    user_id varchar,
    blocked_user_id varchar,
    type varchar,
    created_at timeuuid,
    PRIMARY KEY ((user_id, blocked_user_id))
) WITH cdc = {'enabled': true, 'ttl': 172800};

[Dev Environment Config - Single DC]

CREATE KEYSPACE IF NOT EXISTS test_service WITH REPLICATION = {
'class' : 'NetworkTopologyStrategy',
'ap-northeast-1' : 3
};

[Production Environment Config - Multi DC]

CREATE KEYSPACE IF NOT EXISTS test_service WITH REPLICATION = {
'class' : 'NetworkTopologyStrategy',
'us-west-1' : 3,
'eu-central-1' : 3
};

[Confluent Kafka Connect Log]

[2021-04-26 08:01:55,208] INFO    tasks.max = 3 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.table.names = test_service.my_table (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    producer.override.batch.size = 327680 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.cluster.ip.addresses = AA:9042,BB:9042,CC:9042 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    heartbeat.interval.ms = 30000 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.password = ******** (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    producer.override.linger.ms = 50 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.query.time.window.size = 1000 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    task.class = com.scylladb.cdc.debezium.connector.ScyllaConnectorTask (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    producer.override.acks = -1 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    scylla.confidence.window.size = 1000 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    topic.creation.default.replication.factor = 2 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    name = my_table_db_connector (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    errors.tolerance = all (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,208] INFO    errors.log.enable = true (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,209] INFO    scylla.name = cdc-data.test (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,209] INFO    producer.override.max.in.flight.requests.per.connection = 1 (io.debezium.connector.common.BaseSourceTask:102)
[2021-04-26 08:01:55,219] INFO Requested thread factory for connector ScyllaConnector, id = cdc-data.test named = change-event-source-coordinator (io.debezium.util.Threads:270)
[2021-04-26 08:01:55,219] INFO Creating thread debezium-scyllaconnector-cdc-data.test-change-event-source-coordinator (io.debezium.util.Threads:287)
[2021-04-26 08:01:55,220] INFO WorkerSourceTask{id=my_table_db_connector-1} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2021-04-26 08:01:55,220] INFO Metrics registered (io.debezium.pipeline.ChangeEventSourceCoordinator:91)
[2021-04-26 08:01:55,220] INFO Context created (io.debezium.pipeline.ChangeEventSourceCoordinator:94)
[2021-04-26 08:01:55,220] INFO Snapshot ended with SnapshotResult [status=SKIPPED, offset=com.scylladb.cdc.debezium.connector.ScyllaOffsetContext@58cdf7fb] (io.debezium.pipeline.ChangeEventSourceCoordinator:106)
[2021-04-26 08:01:55,220] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:60)
[2021-04-26 08:01:55,220] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:139)
[2021-04-26 08:01:55,220] INFO Using native clock to generate timestamps. (shaded.com.scylladb.cdc.driver3.driver.core.ClockFactory:57)
===== Using optimized driver!!! =====
[2021-04-26 08:01:55,220] INFO ===== Using optimized driver!!! ===== (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:186)
[2021-04-26 08:01:55,222] INFO Requested thread factory for connector ScyllaConnector, id = cdc-data.test named = change-event-source-coordinator (io.debezium.util.Threads:270)
[2021-04-26 08:01:55,222] INFO Creating thread debezium-scyllaconnector-cdc-data.test-change-event-source-coordinator (io.debezium.util.Threads:287)
[2021-04-26 08:01:55,223] INFO WorkerSourceTask{id=my_table_db_connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2021-04-26 08:01:55,253] WARN Unable to register the MBean 'debezium.scylla:type=connector-metrics,context=snapshot,server=cdc-data.test': debezium.scylla:type=connector-metrics,context=snapshot,server=cdc-data.test (io.debezium.pipeline.ChangeEventSourceCoordinator:56)
[2021-04-26 08:01:55,253] WARN Unable to register the MBean 'debezium.scylla:type=connector-metrics,context=streaming,server=cdc-data.test': debezium.scylla:type=connector-metrics,context=streaming,server=cdc-data.test (io.debezium.pipeline.ChangeEventSourceCoordinator:56)
[2021-04-26 08:01:55,253] INFO Metrics registered (io.debezium.pipeline.ChangeEventSourceCoordinator:91)
[2021-04-26 08:01:55,253] INFO Context created (io.debezium.pipeline.ChangeEventSourceCoordinator:94)
[2021-04-26 08:01:55,253] INFO Snapshot ended with SnapshotResult [status=SKIPPED, offset=com.scylladb.cdc.debezium.connector.ScyllaOffsetContext@7b13f3ae] (io.debezium.pipeline.ChangeEventSourceCoordinator:106)
[2021-04-26 08:01:55,254] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:60)
[2021-04-26 08:01:55,254] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:139)
[2021-04-26 08:01:55,254] INFO Using native clock to generate timestamps. (shaded.com.scylladb.cdc.driver3.driver.core.ClockFactory:57)
===== Using optimized driver!!! =====
[2021-04-26 08:01:55,254] INFO ===== Using optimized driver!!! ===== (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:186)
[2021-04-26 08:01:55,831] INFO Using data-center name 'us-west-1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (shaded.com.scylladb.cdc.driver3.driver.core.policies.DCAwareRoundRobinPolicy:110)
[2021-04-26 08:01:55,832] INFO New Cassandra host /AA:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /BB:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /CC:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /DD:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO Using data-center name 'us-west-1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (shaded.com.scylladb.cdc.driver3.driver.core.policies.DCAwareRoundRobinPolicy:110)
[2021-04-26 08:01:55,832] INFO New Cassandra host /EE:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /FF:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /AA:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /BB:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /DD:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /CC:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /EE:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)
[2021-04-26 08:01:55,832] INFO New Cassandra host /FF:9042 added (shaded.com.scylladb.cdc.driver3.driver.core.Cluster:1812)

[Below is a log that does not occur often; it occurred only once.]

[2021-04-26 05:19:11,602] INFO Query SELECT * FROM test_service.my_table_scylla_cdc_log WHERE "cdc$stream_id" IN ? AND "cdc$time">? AND "cdc$time"<=?; is not prepared on null, preparing before retrying executing. Seeing this message a few times is fine, but seeing it a lot may be source of performance problems (shaded.com.scylladb.cdc.driver3.driver.core.RequestHandler:822)
[2021-04-26 05:19:11,604] INFO Query SELECT * FROM test_service.my_table_scylla_cdc_log WHERE "cdc$stream_id" IN ? AND "cdc$time">? AND "cdc$time"<=?; is not prepared on null, preparing before retrying executing. Seeing this message a few times is fine, but seeing it a lot may be source of performance problems (shaded.com.scylladb.cdc.driver3.driver.core.RequestHandler:822)
[2021-04-26 05:19:11,644] INFO Query SELECT * FROM test_service.my_table_scylla_cdc_log WHERE "cdc$stream_id" IN ? AND "cdc$time">? AND "cdc$time"<=?; is not prepared on null, preparing before retrying executing. Seeing this message a few times is fine, but seeing it a lot may be source of performance problems (shaded.com.scylladb.cdc.driver3.driver.core.RequestHandler:822)

feature request: support for postimage

As a consumer of my CDC event stream (Kafka topic), with table cdc postimages enabled, I'd like to also receive data of the postimage *_cdc_log record (cdc$operation=9).

This would allow me to fully utilise the change event for stream processing use cases.

Without the CDC postimage record included in the message to Kafka, the postimage is lost. Enriching the record as part of stream processing would not only result in extra read operations to Scylla (network IO, latency, ..) but it is also impossible to fetch the actual point-in-time row postimage of the change event (since the row might have changed again in the meantime, or might no longer exist..)

Optional: either follow the CDC setting of the (source) table in question, or have the scylla-cdc-source-connector explicitly configure (enable/disable) the processing of postimages.

Example use cases:

  • have the full latest record available for operations where not all columns are written, e.g. for streaming to other systems / databases.

feature request: support for collection types (LIST, SET, MAP) and UDT

As a consumer of my CDC event stream (Kafka topic), with table cdc enabled and collection types (LIST, SET, MAP) and UDT used, I'd like to receive change data of all columns of the *_cdc_log record, incl. collection type + UDT fields.

This would allow me to utilise the change event for stream processing as no data is omitted.

Example use cases:

  • any consumer for a table cdc event where collection type / UDT cols have changed

java.lang.NoSuchFieldError: tlm with kafka 3.*

When creating a connector, it fails with a stacktrace:

java.lang.NoSuchFieldError: tlm
              at org.apache.log4j.MDCFriend.fixForJava9(MDCFriend.java:11)
              at org.slf4j.impl.Log4jMDCAdapter.<clinit>(Log4jMDCAdapter.java:38)
              at org.slf4j.impl.StaticMDCBinder.getMDCA(StaticMDCBinder.java:59)
              at org.slf4j.MDC.bwCompatibleGetMDCAdapterFromBinder(MDC.java:99)
              at org.slf4j.MDC.<clinit>(MDC.java:108)
              at org.apache.kafka.connect.util.LoggingContext.<init>(LoggingContext.java:209)
              at org.apache.kafka.connect.util.LoggingContext.forConnector(LoggingContext.java:104)
              at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:282)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1803)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getConnectorStartingCallable$37(DistributedHerder.java:1809)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
              at java.base/java.lang.Thread.run(Thread.java:833)

Happens with Kafka 3.3.1, 3.4.0 (quay.io/strimzi/kafka). Does not happen with Kafka 2.8.1.

Reducing data observation lag during CDC generation switches

By "data observation lag" I refer to the phenomenon where the data is present in CDC log tables but we don't read it yet. We may introduce such a lag intentionally in order to minimize the chance that an out-of-order write-to-the-past appears which our query window will miss - this is why the "confidence window" concept exists in scylla-cdc-java and scylla-cdc-go. But this lag may also appear unintentionally, as a side effect of library/application design/implementation. Such unintentional lag appears in the source connector during CDC generation switches (which most commonly happen on Scylla cluster topology changes).

Currently the design is roughly as follows. There is a number of "worker" processes and there is a "master" process.

Each worker periodically queries a subset of streams in the current generation. Roughly every 60 seconds (configurable; I'll call this offset_flush_interval), each worker saves its offsets to some kind of persistent storage (there is one offset per stream, denoting that the worker has read all changes up to this offset in that stream).

The master queries the CDC generations table(s) roughly every 30 seconds (configurable; I'll call this generation_fetch_interval; in code it's called sleepBeforeGenerationDoneMs, but I don't like this name) to check if there are any new generations. If it sees that there is a generation succeeding the currently operating one, it queries the offsets of all workers from the persistent storage. When it sees that all offsets are >= the timestamp of the succeeding generation, it turns off the workers and starts new ones which query streams from the new generation.

This design may introduce a huge data observation lag which is unnecessary. New generations appear in the generation table(s) roughly 2 * ring_delay before they start operating, where ring_delay is a Scylla configuration parameter that nobody ever changes (except in tests) and is equal to 30s. So in practice new generations appear 60s before they start operating, speaking in terms of the clock of the Scylla node which creates the generation, and we can probably safely assume that the clocks of all our processes fit within a few-seconds interval, so we can speak in terms of the clock of our master process. This means that the master knows about a generation very early (say, 50s before it starts operating) and can take steps to get rid of the observation lag.

Consider the following example scenario with the current design. Let X be some time point.

  1. At X - 2s the master queries the generations table and sees no new generations.
  2. At X - 1s the workers store their offsets, each offset equal to X - 1s.
  3. At X a new generation appears in the tables with timestamp X + 60s (so that's when it starts operating).
  4. At X + 28s and X + 58s the master queries the generations table and sees the new generation, but does nothing, because the stored offsets are still less than the new generation's timestamp (X - 1s < X + 60s).
  5. At X + 59s each worker stores its offsets, each offset equal to X + 59s.
  6. At X + 88s (X + 58s + 30s) and X + 118s (X + 58s + 60s) the master queries the generations table and sees the new generation, but as before does nothing (X + 59s < X + 60s).
  7. At X + 119s (X + 59s + 60s) each worker again stores its offsets, each offset equal to X + 119s.
  8. At X + 148s (X + 58s + 90s) the master finally sees that all stored offsets are >= X + 60s (the generation timestamp), so it performs the switch.

So the new workers are created at X + 148s, but the generation started operating at X + 60s. We get a ~90s lag (90s - epsilon, where epsilon = 2s in my example, so 88s here) before we start observing data from the new generation!

This doesn't have to be the case. Consider the following alternative design (and I'm sure there are many more different/better designs):

  1. As soon as the master sees a new generation in the table (at X + 28s in the above example), it tells the existing workers that they should query no further than the generation's timestamp (X + 60s).
  2. For each worker, as soon as it queries the last window (the window which intersects the X + 60s time point), it persists its offsets and informs the master.
  3. As soon as the master learns that each worker queried the last window, it creates new workers.

Then the observation lag is independent of offset_flush_interval because the workers will truncate this interval when they learn about a new generation (they'll do the last flush earlier than usual). Furthermore, if generation_fetch_interval < 2 * ring_delay = 60s, the master will learn about the new generation before it starts operating. Then the observation lag will depend only on the querying frequency of each worker, the confidence window, and the communication delays between master and workers; thus, assuming that the communication delay is small, the lag will be roughly the same as if no generation switch was performed.
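
A minimal sketch of this alternative protocol (hypothetical names; this is a design sketch, not the connector's actual code):

import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the alternative design: workers cap their query windows at the
// new generation's timestamp and report when they are done; the last one to
// finish triggers the switch, instead of waiting for offset flushes and
// master polls to line up.
final class SwitchCoordinator {
    private final Instant endBound; // the new generation's timestamp
    private final AtomicInteger remainingWorkers;
    private final Runnable startNewGenerationWorkers; // assumed callback

    SwitchCoordinator(int workerCount, Instant newGenerationTimestamp,
                      Runnable startNewGenerationWorkers) {
        this.endBound = newGenerationTimestamp;
        this.remainingWorkers = new AtomicInteger(workerCount);
        this.startNewGenerationWorkers = startNewGenerationWorkers;
    }

    // Step 1: each worker caps every query window at the end bound.
    Instant capWindowEnd(Instant proposedWindowEnd) {
        return proposedWindowEnd.isBefore(endBound) ? proposedWindowEnd : endBound;
    }

    // Step 2: a worker calls this once, right after it has queried the window
    // intersecting the end bound and persisted its offsets.
    void workerFinished() {
        // Step 3: the last worker to finish triggers the switch immediately.
        if (remainingWorkers.decrementAndGet() == 0) {
            startNewGenerationWorkers.run();
        }
    }
}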

Kafka Connector Vulnerabilities

Confluent regularly performs security scans on Confluent Hub connectors, as per Confluent’s security policy. Unfortunately, this connector has been flagged as having unacceptable vulnerabilities, and our policy is to escalate the connector to removal stages unless we receive confirmation that the issues are being addressed by the partner.

I have attached the vulnerability scan. Please note that we acknowledge two exceptions for vulnerabilities raised:

  1. Partner confirms that the vulnerability is a false positive.
  2. Partner confirms that the issue is valid but not exploitable.

Could you please urgently acknowledge receipt of this email, and as soon as possible thereafter let us know ScyllaDB's position on these vulnerabilities.

If you require further information on any of the above, please do not hesitate to get in touch.

Best regards,

Confluent CCET Team

scylladb.csv

MultiDC cluster: observing 100% CPU when connecting a Kafka Debezium source connector

We have a multi-DC setup with 3 nodes in DC1 and 3 nodes in DC2. Both DCs are in the same region and the same subnet; this is done because we require separate clusters for reading and writing data.

Our setup creates a new table in DC1 every day at midnight, with CDC enabled.

At the same time, we also create a Kafka source connector every day to consume the CDC logs from DC2.
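
For context, the daily setup looks roughly like this (a sketch with invented keyspace/table names, using the DataStax Java driver; CDC is enabled per table in Scylla):

import com.datastax.oss.driver.api.core.CqlSession;

// Sketch of the daily table creation described above; names are invented.
public class DailyTableCreator {
    public static void main(String[] args) {
        // Connects to localhost:9042 by default; configure contact points as needed.
        try (CqlSession session = CqlSession.builder().build()) {
            session.execute(
                "CREATE TABLE IF NOT EXISTS app.events_20240101 ("
                + "  pk int PRIMARY KEY,"
                + "  v int"
                + ") WITH cdc = {'enabled': true}");
        }
    }
}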

Issue:
At around midnight, when creating a new source connector, we observe that the ScyllaDB servers in DC2 consume 100% CPU.

We increased the CPU from 16 cores to 32, but we see the same behavior.
Once the Kafka connector creates its topic and starts reading data from the CDC log, the ScyllaDB CPU usage cools down.

Logs:

The logs in syslog show reader_concurrency_semaphore messages for that time period.

Any expert thoughts are appreciated.
Thanks in advance.

before field is null in Debezium format message

This is an example of an update-operation Debezium-format message produced when I use scylla-cdc-source-connector to fetch CDC data from Scylla. The before field is null, so it cannot be used together with Flink's DebeziumJson format:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "keyspace_name" },
          { "type": "string", "optional": false, "field": "table_name" },
          { "type": "int64", "optional": false, "field": "ts_us" }
        ],
        "optional": false,
        "name": "com.scylladb.cdc.debezium.connector",
        "field": "source"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [ { "type": "string", "optional": true, "field": "value" } ],
            "optional": true,
            "name": "MyScyllaCluster.dev.test_cdc.a.Cell",
            "field": "a"
          },
          { "type": "int32", "optional": true, "field": "pk" },
          {
            "type": "struct",
            "fields": [ { "type": "int32", "optional": true, "field": "value" } ],
            "optional": true,
            "name": "MyScyllaCluster.dev.test_cdc.v.Cell",
            "field": "v"
          }
        ],
        "optional": true,
        "name": "MyScyllaCluster.dev.test_cdc.Before",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [ { "type": "string", "optional": true, "field": "value" } ],
            "optional": true,
            "name": "MyScyllaCluster.dev.test_cdc.a.Cell",
            "field": "a"
          },
          { "type": "int32", "optional": true, "field": "pk" },
          {
            "type": "struct",
            "fields": [ { "type": "int32", "optional": true, "field": "value" } ],
            "optional": true,
            "name": "MyScyllaCluster.dev.test_cdc.v.Cell",
            "field": "v"
          }
        ],
        "optional": true,
        "name": "MyScyllaCluster.dev.test_cdc.After",
        "field": "after"
      },
      { "type": "string", "optional": true, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "MyScyllaCluster.dev.test_cdc.Envelope"
  },
  "payload": {
    "source": {
      "version": "1.0.1",
      "connector": "scylla",
      "name": "MyScyllaCluster",
      "ts_ms": 1676889174620,
      "snapshot": "false",
      "db": "dev",
      "keyspace_name": "dev",
      "table_name": "test_cdc",
      "ts_us": 1676889174620301
    },
    "before": null,
    "after": {
      "a": null,
      "pk": 1,
      "v": { "value": 2222222 }
    },
    "op": "u",
    "ts_ms": 1676889216086,
    "transaction": null
  }
}

https://github.com/apache/flink/blob/release-1.15.2/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java#L146
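
Roughly paraphrasing the check behind that link (a hedged approximation, not verbatim Flink code): for op == "u" the deserializer emits an UPDATE_BEFORE row built from before and an UPDATE_AFTER row built from after, and it fails when before is null - which it always is here, since this connector does not produce preimages:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

// Approximation of Flink's update handling in DebeziumJsonDeserializationSchema;
// the real code and exception message differ in detail.
final class UpdateEmitSketch {
    static void emitUpdate(GenericRowData before, GenericRowData after,
                           Collector<RowData> out) {
        if (before == null) {
            // This is the failure hit with scylla-cdc-source-connector messages.
            throw new IllegalStateException("The \"before\" field of UPDATE message is null");
        }
        before.setRowKind(RowKind.UPDATE_BEFORE);
        after.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(before);
        out.collect(after);
    }
}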

Scylla CDC Connector Tests

Currently the CDC connector is severely lacking in the tests department. This results in unnecessarily long rounds of manual testing, even when introducing the smallest of changes. And even after such testing, it is hard to tell whether it was extensive enough.

What kinds of tests would be useful:

Unit tests – Currently there are 0 unit tests. The first step should be covering at least the critical parts. This would greatly help with quickly checking that nothing important breaks with new changes.

Integration tests – Here debezium-connector-cassandra may be a good reference. We could similarly set up Scylla using Testcontainers and the java-driver, and then thoroughly check whether the CDC tables are translated into Kafka messages correctly, without yet sending them (see the sketch at the end of this issue).

E2E tests – The most expensive type, but they would save a lot of the pain of setting up every component manually. It would be good to have at least one setup with a recent Scylla version and one type of Kafka cluster.

Other nice-to-haves:

Stress tests – Mainly to see if anything breaks only under load. We could also think later about tracking performance.

“Nemesis” type of tests – Scenarios where we intentionally throw a wrench in between important operations. For example: does the connector correctly resume if we crash it after processing a pre-image event but before processing the insert it relates to? Will the offset be correct and the pre-image reread upon restart?

Other Debezium connectors that may be used as references:
https://github.com/debezium/debezium-connector-spanner
https://github.com/debezium/debezium-connector-jdbc (This is a sink connector)
https://github.com/debezium/debezium-connector-cassandra
https://github.com/debezium/debezium-examples (Not a connector repo, but has some end-to-end examples)
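
A minimal sketch of what such an integration-test setup could look like with Testcontainers, the Java driver and JUnit 5 (the image tag, keyspace and table names are placeholders, and the connector-side assertions are omitted):

import com.datastax.oss.driver.api.core.CqlSession;
import java.net.InetSocketAddress;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

// Skeleton of a possible integration test; not existing code from this repository.
class ScyllaCdcIntegrationTest {
    @Test
    void translatesCdcRowsIntoRecords() {
        try (GenericContainer<?> scylla =
                new GenericContainer<>(DockerImageName.parse("scylladb/scylla:5.2"))
                    .withCommand("--smp", "1")
                    .withExposedPorts(9042)) {
            scylla.start();
            try (CqlSession session = CqlSession.builder()
                    .addContactPoint(new InetSocketAddress(
                        scylla.getHost(), scylla.getMappedPort(9042)))
                    .withLocalDatacenter("datacenter1")
                    .build()) {
                session.execute("CREATE KEYSPACE ks WITH replication = "
                    + "{'class': 'SimpleStrategy', 'replication_factor': 1}");
                session.execute("CREATE TABLE ks.t (pk int PRIMARY KEY, v int) "
                    + "WITH cdc = {'enabled': true}");
                session.execute("INSERT INTO ks.t (pk, v) VALUES (1, 2)");
                // Next: run the connector task against this cluster and assert
                // that the CDC log row is translated into the expected record.
            }
        }
    }
}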
