
starrocks-connector-for-kafka's Introduction

Download | Docs | Benchmarks | Demo


StarRocks, a Linux Foundation project, is the next-generation data platform designed to make data-intensive real-time analytics fast and easy. It delivers query speeds 5 to 10 times faster than other popular solutions. StarRocks performs real-time analytics well even while historical records are being updated, and it can easily enhance real-time analytics with historical data from data lakes. With StarRocks, you can get rid of denormalized tables and still get the best performance and flexibility.

Learn more 👉🏻 Introduction to StarRocks



Features

  • 🚀 Native vectorized SQL engine: StarRocks adopts vectorization technology to make full use of the parallel computing power of the CPU, achieving sub-second query returns in multi-dimensional analyses, which is 5 to 10 times faster than previous systems.
  • 📊 Standard SQL: StarRocks supports ANSI SQL syntax (TPC-H and TPC-DS are fully supported). It is also compatible with the MySQL protocol, so various clients and BI software can be used to access StarRocks.
  • 💡 Smart query optimization: StarRocks can optimize complex queries through its cost-based optimizer (CBO). With a better execution plan, data analysis efficiency is greatly improved.
  • ⚡ Real-time update: The primary-key-based update model of StarRocks can perform upsert/delete operations according to the primary key and achieves efficient queries even under concurrent updates.
  • 🪟 Intelligent materialized view: Materialized views in StarRocks are automatically refreshed during data import and automatically selected when a query is executed.
  • ✨ Querying data in data lakes directly: StarRocks allows direct access to data in Apache Hive™, Apache Iceberg™, and Apache Hudi™ without importing.
  • 🎛️ Resource management: This feature allows StarRocks to limit resource consumption for queries and implement isolation and efficient use of resources among tenants in the same cluster.
  • 💠 Easy to maintain: The simple architecture makes StarRocks easy to deploy, maintain, and scale out. StarRocks tunes its query plan agilely, balances resources when the cluster is scaled in or out, and recovers data replicas automatically under node failure.

Architecture Overview

StarRocks's streamlined architecture is mainly composed of two modules: Frontend (FE) and Backend (BE). The entire system eliminates single points of failure through seamless and horizontal scaling of FE and BE, as well as replication of metadata and data.

Starting from version 3.0, StarRocks supports a new shared-data architecture, which can provide better scalability and lower costs.


Resources

📚 Read the docs

Section   Description
Deploy    Learn how to run and configure StarRocks.
Articles  How-tos, tutorials, best practices, and architecture articles.
Docs      Full documentation.
Blogs     StarRocks deep dives and user stories.

โ“ Get support


Contributing to StarRocks

We welcome all kinds of contributions from the community, individuals and partners. We owe our success to your active involvement.

  1. See Contributing.md to get started.
  2. Set up the StarRocks development environment.
  3. Understand our GitHub workflow for opening a pull request; use this PR Template when submitting a pull request.
  4. Pick a good first issue and start contributing.

๐Ÿ“ License: StarRocks is licensed under Apache License 2.0.

👥 Community Membership: Learn more about different contributor roles in the StarRocks community.


Used By

This project is used by the following companies. Learn more about their use cases.


starrocks-connector-for-kafka's Issues

We can't use the Debezium JDBC sink because Hibernate doesn't work with StarRocks (Debezium JDBC insert error)

The reason we can't use the Debezium JDBC sink is that Hibernate doesn't work with StarRocks. If we fix Hibernate, then we don't need our own sink (unless you want to use Stream Load).

2024-04-23 15:29:34 org.hibernate.service.spi.ServiceException: Unable to create requested service [org.hibernate.engine.jdbc.env.spi.JdbcEnvironment] due to: Unable to determine Dialect without JDBC metadata (please set 'javax.persistence.jdbc.url', 'hibernate.connection.url', or 'hibernate.dialect')
2024-04-23 15:29:34     at org.hibernate.service.internal.AbstractServiceRegistryImpl.createService(AbstractServiceRegistryImpl.java:277)
2024-04-23 15:29:34     at org.hibernate.service.internal.AbstractServiceRegistryImpl.initializeService(AbstractServiceRegistryImpl.java:239)
2024-04-23 15:29:34     at org.hibernate.service.internal.AbstractServiceRegistryImpl.getService(AbstractServiceRegistryImpl.java:216)
2024-04-23 15:29:34     at org.hibernate.boot.model.relational.Database.<init>(Database.java:45)
2024-04-23 15:29:34     at org.hibernate.boot.internal.InFlightMetadataCollectorImpl.getDatabase(InFlightMetadataCollectorImpl.java:231)
2024-04-23 15:29:34     at org.hibernate.boot.internal.InFlightMetadataCollectorImpl.<init>(InFlightMetadataCollectorImpl.java:199)
2024-04-23 15:29:34     at org.hibernate.boot.model.process.spi.MetadataBuildingProcess.complete(MetadataBuildingProcess.java:169)
2024-04-23 15:29:34     at org.hibernate.boot.model.process.spi.MetadataBuildingProcess.build(MetadataBuildingProcess.java:128)
2024-04-23 15:29:34     at org.hibernate.boot.internal.MetadataBuilderImpl.build(MetadataBuilderImpl.java:451)
2024-04-23 15:29:34     at org.hibernate.boot.internal.MetadataBuilderImpl.build(MetadataBuilderImpl.java:102)
2024-04-23 15:29:34     at org.hibernate.cfg.Configuration.buildSessionFactory(Configuration.java:910)
2024-04-23 15:29:34     at org.hibernate.cfg.Configuration.buildSessionFactory(Configuration.java:960)
2024-04-23 15:29:34     at io.debezium.connector.jdbc.JdbcSinkConnectorTask.start(JdbcSinkConnectorTask.java:75)
2024-04-23 15:29:34     at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:329)
2024-04-23 15:29:34     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
2024-04-23 15:29:34     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
2024-04-23 15:29:34     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
2024-04-23 15:29:34     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2024-04-23 15:29:34     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2024-04-23 15:29:34     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2024-04-23 15:29:34     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2024-04-23 15:29:34     at java.base/java.lang.Thread.run(Thread.java:829)
2024-04-23 15:29:34 Caused by: org.hibernate.HibernateException: Unable to determine Dialect without JDBC metadata (please set 'javax.persistence.jdbc.url', 'hibernate.connection.url', or 'hibernate.dialect')
2024-04-23 15:29:34     at org.hibernate.engine.jdbc.dialect.internal.DialectFactoryImpl.determineDialect(DialectFactoryImpl.java:190)
2024-04-23 15:29:34     at org.hibernate.engine.jdbc.dialect.internal.DialectFactoryImpl.buildDialect(DialectFactoryImpl.java:86)
2024-04-23 15:29:34     at org.hibernate.engine.jdbc.env.internal.JdbcEnvironmentInitiator.getJdbcEnvironmentWithDefaults(JdbcEnvironmentInitiator.java:229)
2024-04-23 15:29:34     at org.hibernate.engine.jdbc.env.internal.JdbcEnvironmentInitiator.getJdbcEnvironmentUsingJdbcMetadata(JdbcEnvironmentInitiator.java:381)
2024-04-23 15:29:34     at org.hibernate.engine.jdbc.env.internal.JdbcEnvironmentInitiator.initiateService(JdbcEnvironmentInitiator.java:193)
2024-04-23 15:29:34     at org.hibernate.engine.jdbc.env.internal.JdbcEnvironmentInitiator.initiateService(JdbcEnvironmentInitiator.java:69)
2024-04-23 15:29:34     at org.hibernate.boot.registry.internal.StandardServiceRegistryImpl.initiateService(StandardServiceRegistryImpl.java:119)
2024-04-23 15:29:34     at org.hibernate.service.internal.AbstractServiceRegistryImpl.createService(AbstractServiceRegistryImpl.java:264)
2024-04-23 15:29:34     ... 21 more
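
For context, here is a minimal sketch of the kind of Debezium JDBC sink configuration involved, assuming StarRocks is reached through the FE's MySQL-protocol port (9030 by default); the connector class comes from the stack trace above, while the host, database, and topic names are placeholders borrowed from the other issues on this page. Whether Hibernate can resolve a dialect from StarRocks's JDBC metadata is exactly what fails here.

{
    "name": "starrocks-jdbc-sink",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "topics": "tpcds.public.customer",
        "connection.url": "jdbc:mysql://starrocks-fe:9030/tpcds",
        "connection.username": "root",
        "connection.password": "",
        "insert.mode": "insert"
    }
}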

Debezium instructions do not work; the Debezium-recommended connector defaults should be used

From https://docs.starrocks.io/docs/loading/Kafka-connector-starrocks/. Gist with the commands and the test environment: https://gist.github.com/alberttwong/a6d180c4eafecf9bdcf764196ca3d961

No row inserted or updated

If you create a connector with the following configuration:

{
    "name": "tpcds-customer-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "tpcds",
        "topic.prefix": "tpcds",
        "table.include.list": "public.customer",
        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "drop"
    }
}

The record produced to the topic then looks like the following, but no row is inserted or updated in StarRocks:
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": true,
                "field": "col_001"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_002"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_003"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_004"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_005"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_006"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_007"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_008"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_009"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_010"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_011"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_012"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_013"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_014"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_015"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_016"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_017"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_018"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "_sling_loaded_at"
            },
            {
                "type": "int32",
                "optional": false,
                "field": "__op"
            }
        ],
        "optional": false,
        "name": "tpcds.public.customer.Value"
    },
    "payload": {
        "col_001": 6,
        "col_002": "AAAAAAAAGAAAAAAA",
        "col_003": 213219,
        "col_004": 6374,
        "col_005": 27082,
        "col_006": 2451883,
        "col_007": 2451853,
        "col_008": "Ms.",
        "col_009": "Brunilda aaa bbb ccc",
        "col_010": "Sharp",
        "col_011": "Y",
        "col_012": 4,
        "col_013": 12,
        "col_014": 1925,
        "col_015": "SURINAME",
        "col_016": null,
        "col_017": "[email protected]",
        "col_018": 2452430,
        "_sling_loaded_at": 1713464143,
        "__op": 0
    }
}
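
One thing worth checking (an assumption, not something confirmed in this issue): the record above carries a schema/payload envelope, so the sink's JSON converter needs value.converter.schemas.enable set to true in order to unwrap the payload before loading; with it set to false, the whole envelope is treated as the row. Below is a sketch of a StarRocks sink configuration consistent with the source connector above, with the host, table mapping, and password as placeholders.

{
    "name": "starrocks-kafka-connector",
    "config": {
        "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
        "topics": "tpcds.public.customer",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "starrocks.http.url": "starrocks-fe:8030",
        "starrocks.database.name": "tpcds",
        "starrocks.topic2table.map": "tpcds.public.customer:customer",
        "starrocks.username": "root",
        "starrocks.password": "root_password",
        "sink.properties.strip_outer_array": "true"
    }
}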

Invalid value for configuration starrocks.password: String must be non-empty

By default, StarRocks comes with the "root" user and no password. We should allow blank/empty passwords.

curl -i localhost:8083/connectors/ -H "Content-Type: application/json" -X POST -d '{
    "name": "starrocks-kafka-connector",
    "config": {
        "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
        "topics": "tpcds.public.customer",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "false",
        "starrocks.http.url": "starrocks-fe:8030",
        "starrocks.topic2table.map": "tpcds.public.customer:customer",
        "starrocks.username": "root",
        "starrocks.password": " ",
        "starrocks.database.name": "tpcds",
        "sink.properties.strip_outer_array": "true"
    }
}'
HTTP/1.1 400 Bad Request
Date: Thu, 18 Apr 2024 16:29:48 GMT
Content-Type: application/json
Content-Length: 296
Server: Jetty(9.4.52.v20230823)

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value  for configuration starrocks.password: String must be non-empty\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}[kafka@13160e022df3 ~]$ 

got "error: The JSON element does not have the requested type" while following the guide to load data

When I followed the Kafka-connector-starrocks guide to load data to StarRocks, I got the error message below.

ERROR [starrocks-kafka-connector|task-0] catch exception, wait rollback  (com.starrocks.data.load.stream.v2.StreamLoadManagerV2:424)
com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: example_db, table: test-sqlite-jdbc-accounts, label: -0aba9ed9-fd83-41c3-afcf-9be12bbc926f, 
responseBody: {
    "Status": "DATA_QUALITY_ERROR",
    "Message": "Failed to iterate document stream as object. error: The JSON element does not have the requested type.",
    "ErrorURL": "http://10.5.0.3:8040/api/_load_error_log?file=error_log_174479fdeb882113_29549abd35d3bdb2"
}
errorLog: Error: Data quality error: Failed to iterate document stream as object. error: The JSON element does not have the requested type.. Row: parser current location: [{"id":19,"name":"cathy"}]

        at com.starrocks.data.load.stream.TransactionStreamLoader.prepare(TransactionStreamLoader.java:221)
        at com.starrocks.data.load.stream.v2.TransactionTableRegion.commit(TransactionTableRegion.java:247)
        at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.lambda$init$0(StreamLoadManagerV2.java:210)
        at java.lang.Thread.run(Thread.java:750)

I can get the same error message if I run the command below.

curl --location-trusted -u root             \
    -d '[{"id":19,"name":"cathy"}]'                \
    -H "label:test-3"                  \
    -H "format: JSON"    \
    -H "columns:id,name" \
    -H "Expect:100-continue" \
    -XPUT http://localhost:8030/api/example_db/test-sqlite-jdbc-accounts/_stream_load

But if I add the header -H "strip_outer_array:true", it succeeds.
So I fixed the error by adding "sink.properties.strip_outer_array=true" to the properties file of the Kafka-connector-starrocks plugin, because that configuration item makes kafka-connector-starrocks add the "strip_outer_array:true" header to the HTTP request when it performs the Stream Load.
My question is: could kafka-connector-starrocks add the "strip_outer_array:true" header automatically, or avoid wrapping the data in '[ ]'?
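
For reference, here is a minimal sketch of the connector properties file with the fix applied; the host, database, topic, and credentials are placeholders taken from the error message above, and only the last line is the actual fix:

name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=test-sqlite-jdbc-accounts
starrocks.http.url=localhost:8030
starrocks.database.name=example_db
starrocks.username=root
starrocks.password=root_password
# Tell Stream Load that each request body is a JSON array rather than a single object.
sink.properties.strip_outer_array=true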

BTW, I submitted a PR to modify the guide by adding "sink.properties.strip_outer_array=true" to the properties file.
StarRocks/starrocks#41218

MySQL datetime type data is not being synchronized in StarRocks.

Issue: MySQL datetime type data is being synchronized as null in StarRocks.

There are create_ymdt and update_ymdt values in the topic:

	"create_ymdt": 1702407600000,
        "update_ymdt": 1702407600000,

But those values are being synchronized as null in StarRocks.
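
One possible explanation (an assumption, not something confirmed in this issue) is that Debezium emits MySQL DATETIME values as epoch-millisecond integers, as the sample above shows, while a StarRocks DATETIME column cannot parse a raw integer, so the value ends up NULL. Below is a sketch of Kafka Connect's built-in TimestampConverter SMT converting the two fields to formatted strings before the sink sees them (one transform instance per field; field names follow the sample above):

transforms=create_ymdt,update_ymdt
transforms.create_ymdt.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.create_ymdt.field=create_ymdt
transforms.create_ymdt.target.type=string
transforms.create_ymdt.format=yyyy-MM-dd HH:mm:ss
transforms.update_ymdt.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.update_ymdt.field=update_ymdt
transforms.update_ymdt.target.type=string
transforms.update_ymdt.format=yyyy-MM-dd HH:mm:ss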

format for duplicate table insert, unique key table upsert and delete messages

PRODUCER starrocks-sink-duplicate_table-insert-message
topic: postgres.public.customer
{
    "col_001": 6,
    "col_002": "AAAAAAAAGAAAAAAA",
    "col_003": 213219,
    "col_004": 6374,
    "col_005": 27082,
    "col_006": 2451883,
    "col_007": 2451853,
    "col_008": "Ms.",
    "col_009": "Brunilda aaa bbb ccc ddd eee",
    "col_010": "Sharp",
    "col_011": "Y",
    "col_012": 4,
    "col_013": 12,
    "col_014": 1925,
    "col_015": "SURINAME",
    "col_016": null,
    "col_017": "[email protected]",
    "col_018": 2452430,
    "_sling_loaded_at": 1713464143
}

PRODUCER starrocks-sink-unique_table-delete-message
topic: postgres.public.customer
{
    "col_001": 1,
    "col_002": "AAAAAAAAGAAAAAAA",
    "col_003": 213219,
    "col_004": 6374,
    "col_005": 27082,
    "col_006": 2451883,
    "col_007": 2451853,
    "col_008": "Ms.",
    "col_009": "Brunilda aaa",
    "col_010": "Sharp",
    "col_011": "Y",
    "col_012": 4,
    "col_013": 12,
    "col_014": 1925,
    "col_015": "SURINAME",
    "col_016": null,
    "col_017": "[email protected]",
    "col_018": 2452430,
    "_sling_loaded_at": 1713464143,
    "__op": 1
}

PRODUCER starrocks-sink-unique_table-upsert-message
topic: postgres.public.customer
{
    "col_001": 1,
    "col_002": "AAAAAAAAGAAAAAAA",
    "col_003": 213219,
    "col_004": 6374,
    "col_005": 27082,
    "col_006": 2451883,
    "col_007": 2451853,
    "col_008": "Ms.",
    "col_009": "Brunilda bbbccc",
    "col_010": "Sharp",
    "col_011": "Y",
    "col_012": 4,
    "col_013": 12,
    "col_014": 1925,
    "col_015": "SURINAME",
    "col_016": null,
    "col_017": "[email protected]",
    "col_018": 2452430,
    "_sling_loaded_at": 1713464143,
    "__op": 0
}
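
For these messages to drive upserts and deletes, the target StarRocks table needs a primary key matching the identifying column. Below is a minimal sketch of such a table, assuming col_001 is the key; column types are inferred from the record schema shown earlier (int32 becomes INT, int64 becomes BIGINT, string becomes STRING), and "replication_num" = "1" is only for a single-node test environment. Note that __op is not a table column: the connector uses it only to decide between upsert (0) and delete (1) during the load.

-- Sketch of a Primary Key target table for the upsert/delete messages above.
CREATE TABLE customer (
    col_001 INT NOT NULL,
    col_002 STRING,
    col_003 INT,
    col_004 INT,
    col_005 INT,
    col_006 INT,
    col_007 INT,
    col_008 STRING,
    col_009 STRING,
    col_010 STRING,
    col_011 STRING,
    col_012 INT,
    col_013 INT,
    col_014 INT,
    col_015 STRING,
    col_016 STRING,
    col_017 STRING,
    col_018 INT,
    _sling_loaded_at BIGINT
) PRIMARY KEY (col_001)
DISTRIBUTED BY HASH (col_001)
PROPERTIES ("replication_num" = "1");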

Kafka Connector gives java.lang.NullPointerException: Cannot invoke JsonConverterConfig.decimalFormat()

Steps to reproduce the behavior (Required)

I'm trying to run the Kafka connector on a Strimzi Kafka Connect cluster with these values:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: starrocks-kafka-connector
  labels:
    strimzi.io/cluster: my-debezium-connect-cluster
spec:
  class: com.starrocks.connector.kafka.StarRocksSinkConnector
  tasksMax: 1
  config:
    name: starrocks-kafka-connector
    tasks.max: 1
    topics: ...
    starrocks.http.url: 10....:8030
    starrocks.username: root
    starrocks.password: ....
    starrocks.database.name: kafkatest

    key.converter: io.confluent.connect.json.JsonSchemaConverter
    key.converter.schema.registry.url: http://debezium-schema-registry:8081
    key.converter.decimal.format: NUMERIC
    value.converter: io.confluent.connect.json.JsonSchemaConverter
    value.converter.schema.registry.url: http://debezium-schema-registry:8081    
    key.converter.decimal.format: NUMERIC

    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.add.fields: op,source.ts_ms
    transforms.unwrap.delete.handling.mode: rewrite
    transforms.unwrap.drop.tombstones: false    

But I get this error message:

2024-01-14 22:11:46,129 ERROR [starrocks-kafka-connector|task-0] WorkerSinkTask{id=starrocks-kafka-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-starrocks-kafka-connector-0]
Caused by: java.lang.NullPointerException: Cannot invoke "com.starrocks.connector.kafka.json.JsonConverterConfig.decimalFormat()" because "config" is null
        at com.starrocks.connector.kafka.json.JsonConverter$1.toJson(JsonConverter.java:141)
        at com.starrocks.connector.kafka.json.JsonConverter.convertToJson(JsonConverter.java:569)
        at com.starrocks.connector.kafka.json.JsonConverter.convertToJson(JsonConverter.java:656)
        at com.starrocks.connector.kafka.StarRocksSinkTask.getRecordFromSinkRecord(StarRocksSinkTask.java:220)
        at com.starrocks.connector.kafka.StarRocksSinkTask.put(StarRocksSinkTask.java:258)

I tried to add or remove this, but nothing changed:

    key.converter.decimal.format: NUMERIC

I have also tried:

    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true

But the same result.

StarRocks version (Required)

StarRocks version 3.1.7-7e897e2
