
flink-connector-clickhouse-1.13's Introduction

flink-connector-clickhouse

Flink version 1.13+

Supports both sink and source; supports real-time (streaming) writes as well as offline (batch) writes.

DROP TABLE if exists test.lbs_ck;
CREATE TABLE if not exists test.lbs_ck (
   ts BIGINT,
   id STRING,
   geohash12 STRING,
   loc_type STRING,
   wifimac STRING,
   id_type STRING,
   .....
   address STRING,
   PRIMARY KEY(ts, id) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',  -- use the clickhouse connector
    'url' = 'clickhouse://xxxxx:8123',  -- any node in the cluster
    'username' = '',
    'password' = '',
    'database-name' = 'test',
    'table-name' = 'lbs',
    ----- sink options -----
    'sink.batch-size' = '1000000',  -- rows per batch insert
    'sink.flush-interval' = '5000',  -- flush interval in ms, default 1s
    'sink.max-retries' = '3',  -- maximum number of retries
    'sink.partition-strategy' = 'hash', -- partition strategy: hash / balanced / shuffle
    'sink.partition-key' = 'id',
    'sink.write-local' = 'true',  -- whether to write to the local tables
    'sink.ignore-delete' = 'true',
    ----- source (lookup) options -----
    'lookup.cache.max-rows' = '100',
    'lookup.cache.ttl' = '10',
    'lookup.max-retries' = '3'
);
-- 1. When sink.partition-strategy is 'hash', sink.partition-key must be configured and sink.write-local must be 'true' so that records are written to the local tables;
--    the hash function is murmur3_32, matching the murmurHash3_32() sharding used by ClickHouse distributed tables.
-- 2. When sink.write-local is 'false', records are written to the distributed table; sink.partition-strategy has no effect and distribution depends on the ClickHouse distributed-table configuration.
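
To make the local/distributed distinction concrete, here is a minimal sketch of what the ClickHouse-side tables could look like. It is illustrative only: the cluster name my_cluster, the lbs_local table name, and the abbreviated column list are assumptions, not part of this project.

-- Hypothetical local table on each shard (column list abbreviated).
CREATE TABLE test.lbs_local ON CLUSTER my_cluster (
    ts Int64,
    id String,
    address String
) ENGINE = MergeTree()
ORDER BY (ts, id);

-- Hypothetical distributed table sharded by murmurHash3_32(id): the same
-- routing the connector applies itself when 'sink.partition-strategy' = 'hash'
-- with 'sink.partition-key' = 'id' and 'sink.write-local' = 'true'.
CREATE TABLE test.lbs ON CLUSTER my_cluster AS test.lbs_local
ENGINE = Distributed(my_cluster, test, lbs_local, murmurHash3_32(id));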

CREATE TABLE test.lbs (
    ts BIGINT,
    id STRING,
    geohash12 STRING,
    loc_type STRING,
    wifimac STRING,
    id_type STRING,
    .....
    address STRING,
    row_timestamp as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)),  -- convert the BIGINT epoch millis to a Flink TIMESTAMP
    proctime as PROCTIME(),   -- computed column providing a processing-time attribute
    WATERMARK FOR row_timestamp as row_timestamp - INTERVAL '5' SECOND  -- define the watermark on row_timestamp, making it the event-time column
) WITH (
    'connector' = 'kafka',  -- use the kafka connector
    'topic' = 'LBS',
    'scan.startup.mode' = 'latest-offset',
    --'scan.startup.mode' = 'earliest-offset',
    'properties.group.id' = 'group1',
    'properties.bootstrap.servers' = 'xxxx1:9092,xxxx2:9092',  -- kafka broker addresses
    'format' = 'csv',  -- source format is csv
    'csv.disable-quote-character' = 'true',
    'csv.ignore-parse-errors' = 'false',
    'csv.field-delimiter' = '|',
    'csv.null-literal' = ''
);
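
As a quick sanity check of the epoch-millis conversion in the computed column above, the expression can be evaluated on its own; the input value below is made up for illustration, and the rendered timestamp depends on the session time zone:

-- Hypothetical standalone check: BIGINT division truncates the milliseconds
-- before FROM_UNIXTIME converts the seconds to a 'yyyy-MM-dd HH:mm:ss' string.
SELECT TO_TIMESTAMP(FROM_UNIXTIME(1700000000123 / 1000)) AS row_timestamp;
-- 2023-11-14 22:13:20 when the session time zone is UTC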

INSERT INTO test.lbs_ck SELECT ..... FROM test.lbs;

flink-connector-clickhouse-1.13's People

Contributors

liekkassmile


flink-connector-clickhouse-1.13's Issues

Using the source options with a Flink lookup join fails with java.lang.IllegalArgumentException: open() failed

I packaged the jar and put it under the Flink lib directory. Flink DDL:
CREATE TABLE ipos_qtlsd (
    Id BIGINT,
    pid BIGINT,
    tn_id BIGINT,
    djbh STRING,
    ydjh STRING,
    rq INT,
    yyrq INT,
    zddm STRING,
    lastchanged TIMESTAMP,
    proctime as PROCTIME(),
    PRIMARY KEY (Id) NOT ENFORCED
) COMMENT 'POS retail order' WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '',
    'port' = '3306',
    'username' = '',
    'password' = '',
    'database-name' = 'ipos',
    'table-name' = 'ipos_qtlsd',
    'scan.startup.mode' = 'latest-offset',
    'server-time-zone' = 'Asia/Shanghai'
);

CREATE TABLE dim_shop_info (
    shop_code STRING,
    shop_name STRING,
    busi_type STRING
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://...:8123',
    'username' = 'default',
    'password' = '******',
    'database-name' = 'default',
    'table-name' = 'dim_shop_info',
    'lookup.cache.max-rows' = '100',
    'lookup.cache.ttl' = '10',
    'lookup.max-retries' = '3'
);

Lookup join statement:
SELECT
    t1.Id,
    t1.pid,
    t1.tn_id,
    t1.zddm,
    t2.shop_name,
    t2.busi_type
FROM ipos_qtlsd AS t1
LEFT JOIN dim_shop_info FOR SYSTEM_TIME AS OF t1.proctime AS t2
    ON t1.zddm = t2.shop_code;

Error: (screenshot attached in the original issue)
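
One way to narrow down an open() failure like this is to first confirm that the ClickHouse source path works on its own, without the lookup join. A minimal check, reusing the dim_shop_info DDL above (the LIMIT value is arbitrary; run in batch mode if the streaming planner rejects LIMIT):

-- Scan the dimension table directly to confirm the connector can open a
-- connection and read rows before adding the lookup join back in.
SELECT shop_code, shop_name, busi_type
FROM dim_shop_info
LIMIT 10;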

genericRowData throws a NullPointerException

May I ask why writeRecordToOneExecutor regenerates a RowData when calling addBatch, while writeRecordToAllExecutors does not?
Then, when writing, genericRowData throws the following exception:
2023-02-23 16:20:48,226 ERROR com.glab.flink.connector.clickhouse.table.internal.ClickHouseShardSinkFunction [] - null
java.lang.NullPointerException: null
at org.apache.flink.table.data.GenericRowData.getShort(GenericRowData.java:144) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at com.glab.flink.connector.clickhouse.table.internal.ClickHouseShardSinkFunction.genericRowData(ClickHouseShardSinkFunction.java:211) ~[flink-test.jar:?]
at com.glab.flink.connector.clickhouse.table.internal.ClickHouseShardSinkFunction.writeRecordToOneExecutor(ClickHouseShardSinkFunction.java:163) [flink-test.jar:?]
at com.glab.flink.connector.clickhouse.table.internal.ClickHouseShardSinkFunction.invoke(ClickHouseShardSinkFunction.java:140) [flink-test.jar:?]
at com.glab.flink.connector.clickhouse.table.internal.ClickHouseShardSinkFunction.invoke(ClickHouseShardSinkFunction.java:33) [flink-test.jar:?]
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72) [flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-test.jar:?]
at StreamExecCalc$984.processElement(Unknown Source) [flink-table-blink_2.12-1.12.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) [flink-test.jar:?]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:87) [flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) [flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-test.jar:?]
at StreamExecCalc$28.processElement(Unknown Source) [flink-table-blink_2.12-1.12.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) [flink-test.jar:?]
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [flink-connector-kafka_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) [flink-connector-kafka_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) [flink-connector-kafka_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) [flink-connector-kafka_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-test.jar:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-test.jar:?]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215) [flink-test.jar:?]

Thanks!

Error when starting the SQL client

With Flink 1.13, starting the SQL client fails with an error.
With Flink 1.14, the table is created successfully, but a SELECT fails saying LogFactory cannot be found.
What could be going wrong?
