starrocks-connector-for-apache-flink's Introduction

StarRocks Connector for Apache Flink®

The connector supports reading data from and writing data to StarRocks through Apache Flink®.

Documentation

For the user manual of the released version of the Flink connector, please visit the StarRocks official documentation.

For the new features in the snapshot version of the Flink connector, please see the docs in this repo.

LICENSE

The connector is under the Apache License 2.0.

starrocks-connector-for-apache-flink's People

Contributors

baisui1981, banmoy, bigdata-kuxingseng, chaplinthink, chenhaifengkeda, dixingxing0, emsnap, ggke, hehuiyuan, hellolilyliuyi, hffariel, imay, jameswangchen, jin-h, lyang77, qingdongzeng3, shouweikun, szza, waittttting, xlfjcg, yuchengxin, zaorangyang

starrocks-connector-for-apache-flink's Issues

Flink SQL table sink does not write to the StarRocks table

  1. Versions
    1. Flink: 1.13.3
    2. Kafka: 2.x
    3. StarRocks: 2.0.1
    4. Connector: 1.2.1-flink_1.13_2.12
    5. Writes use exactly-once semantics; checkpoints are triggered every 30 seconds.
  2. Problem description
    1. Exactly-once
      1. The job consumes Kafka data upstream and writes it directly to a StarRocks table with exactly-once semantics. The Flink SQL job runs in streaming mode. While it is running there are no errors and every checkpoint succeeds, but no data is written to the StarRocks table. The Flink TM logs show no errors either.
      2. If the Flink job is stopped, the latest data becomes queryable in the table about two minutes later.
      3. While the job is running, StarRocks receives no stream load requests at all; the moment the job is stopped, it suddenly receives a very large number of stream load requests.
      4. All data produced while the job was running is lost.
    2. At-least-once
      1. After switching from exactly-once to at-least-once and configuring the relevant flush parameters, writes work normally, but cumulative compaction increases noticeably; other metrics look normal.
  3. Other
    1. 16 similar Flink SQL jobs were running, all consuming Kafka data upstream and writing to StarRocks tables downstream. 3 of them showed this problem; two of those had an upstream Kafka rate of 35k records/s and the third 1.5k records/s.
    2. A strange log line:
    Label [f95c2db9-6345-4b25-800c-f8a3e0755de7] has already been used.

The format is JSON and dynamic column update is specified. Why doesn't the code add the columns header by default, as it does for CSV?

Version:
flink-connector-starrocks:1.2.3_flink-1.15
flink:1.15.0

environment:
idea

Question:
When dynamic column update is specified and the format is CSV, there is no need to set sink.properties.columns: the columns header is built from the field names in the flink-connector-starrocks code when the Flink SQL table is created. For JSON, however, this is not done. Is that because JSON requires different handling than CSV, or was the logic simply not added for JSON in the flink-connector-starrocks code?

Relevant source code:

//CLASS:StarRocksStreamLoadVisitor.java 
//METHOD:doHttpPut
        try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
            HttpPut httpPut = new HttpPut(loadUrl);
            Map<String, String> props = sinkOptions.getSinkStreamLoadProperties();
            for (Map.Entry<String,String> entry : props.entrySet()) {
                httpPut.setHeader(entry.getKey(), entry.getValue());
            }
            if (!props.containsKey("columns") && ((sinkOptions.supportUpsertDelete() && !__opAutoProjectionInJson) || StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat()))) {
                String cols = String.join(",", Arrays.asList(fieldNames).stream().map(f -> String.format("`%s`", f.trim().replace("`", ""))).collect(Collectors.toList()));
                if (cols.length() > 0 && sinkOptions.supportUpsertDelete()) {
                    cols += String.format(",%s", StarRocksSinkOP.COLUMN_KEY); //COLUMN_KEY="__op"
                }
                httpPut.setHeader("columns", cols);
            }

Finally:

  1. If the JSON logic was simply omitted from the code, I can submit a PR to add it (a sketch of the idea follows below).
  2. If it was designed this way on purpose, please explain the reasoning behind the design.
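
For reference, a minimal sketch of the change this issue seems to be asking for, reusing the names from the doHttpPut snippet above and assuming StreamLoadFormat also has a JSON constant; this is an illustration, not the connector's actual code:

// Hypothetical variant of the condition above that also builds the `columns` header for JSON.
boolean csvFormat = StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat());
boolean jsonFormat = StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat());
if (!props.containsKey("columns")
        && ((sinkOptions.supportUpsertDelete() && !__opAutoProjectionInJson) || csvFormat || jsonFormat)) {
    String cols = Arrays.stream(fieldNames)
            .map(f -> String.format("`%s`", f.trim().replace("`", "")))
            .collect(Collectors.joining(","));
    if (cols.length() > 0 && sinkOptions.supportUpsertDelete()) {
        cols += String.format(",%s", StarRocksSinkOP.COLUMN_KEY); // COLUMN_KEY = "__op"
    }
    httpPut.setHeader("columns", cols);
}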

Can you provide a version for Flink 1.14?

The Flink cluster I have installed is 1.14.x. If flink-connector-starrocks is 1.1.14_flink-1.13_2.12, the task cannot be executed because the packages' paths or classes have changed. Thank you.

Can we change the scope of the Flink dependencies to provided?

As you can see, the Flink dependencies currently use the compile scope.

This causes unnecessary trouble: when I add flink-connector-starrocks to my pom.xml,
I must exclude the Flink dependencies in advance.

So, can we change the scope of the Flink dependencies to provided?

Threads do not shut down correctly after Flink failure recovery

Environment:
flink : 1.12
flink-connector-starrocks : 1.1.4
TM slot number : 8
flink job recovered 10 times.

Description:

  1. Since the slot number is 8, each TM should have 8 async flush threads, but after 10 recoveries the thread count increased to 176, so we should add a 'shutdown' operation for these threads (see the sketch after this list).
    image

  2. starrocks-interval-sink-thread is not shut down correctly either; I will dig deeper and try to fix it.
    image
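
A minimal sketch of the kind of shutdown the first point asks for, assuming the async flush work runs on a dedicated single-thread executor; the class and field names here are illustrative, not the connector's actual ones:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncFlushLifecycle {
    // Stand-in for the connector's async flush thread.
    private final ExecutorService flushExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "starrocks-async-flush"));

    // To be called from the sink's close() so recovered jobs do not leak threads.
    public void close() throws InterruptedException {
        flushExecutor.shutdown();
        if (!flushExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
            flushExecutor.shutdownNow(); // interrupt a flush that is stuck
        }
    }
}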

Limit a stream load batch of JSON data to at most 100 MB

The StarRocks server limits a single JSON-format load to 100 MB, and this parameter cannot be changed. When a JSON batch exceeds 100 MB, the load fails. It would be good if the connector could cap the size of a JSON batch at 100 MB in EXACTLY_ONCE mode.

Why does version 1.2.1 shade so many jars, such as Jackson?

I upgraded the connector from 1.1.16 to 1.2.1 and found errors like these in my code:
exception com.fasterxml.jackson.core.JsonProcessingException is never thrown in the body of the corresponding try statement,
unreported exception com.starrocks.shade.com.fasterxml.jackson.core.JsonProcessingException; must be caught or declared to be thrown.
This is because the pom.xml shades many jars. Why is this done?

Error "parse auth info failed" in FE when using SinkFunction<String> sink(StarRocksSinkOptions sinkOptions)

There is an error "parse auth info failed" in the FE when using SinkFunction sink(StarRocksSinkOptions sinkOptions):

[BaseAction.getAuthorizationInfo():315] parse auth info failed, Authorization header null, url /api/xxxxxxxx fail to process url: /api/xxxxxxxxxxxxx
com.starrocks.http.UnauthorizedException: Need auth information

But there is an Authorization header in the HTTP PUT request when I debug the Flink process.

And when I implement a sample sink myself using HttpClients with the "Authorization" header, the stream load succeeds.

The version of flink-connector-starrocks is 1.1.10_flink-1.11.

Current running txns on db 11027 is 100, larger than limit 100?

Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"current running txns on db 11027 is 100, larger than limit 100","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"a848b543-0040-4f29-839d-fb9dbb7bd61c","LoadBytes":0,"StreamLoadPutTimeMs":0,"NumberTotalRows":0,"WriteDataTimeMs":0,"TxnId":-1,"LoadTimeMs":0,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0}

flink cdc + starrocks connector: data import does not support updates to StarRocks primary key columns

Scenario:
1. In the MySQL table, the date column is not part of the primary key.
2. In the StarRocks table, the date column is part of the primary key.
Data is imported with Flink SQL via flink cdc mysql + starrocks connector.

After updating the date column in the MySQL table, the StarRocks table shows:
1. The row keyed by the old value is not deleted.
2. A new row keyed by the new value is inserted.
In other words, both the old and the new record exist at the same time.

The flink cdc source splits an UPDATE into two independent records, UPDATE_BEFORE and UPDATE_AFTER,
and the starrocks connector source code handles them as follows:

public enum StarRocksSinkOP {
    UPSERT, DELETE;

    public static final String COLUMN_KEY = "__op";

    static StarRocksSinkOP parse(RowKind kind) {
        if (RowKind.INSERT.equals(kind) || RowKind.UPDATE_AFTER.equals(kind)) {
            // mapped to UPSERT
            return UPSERT;
        }
        if (RowKind.DELETE.equals(kind) || RowKind.UPDATE_BEFORE.equals(kind)) {
            return DELETE;
        }
        throw new RuntimeException("Unsupported row kind.");
    }
}

In theory this should mirror the inserts, updates, and deletes made in MySQL.

Flink job restarts break exactly-once writes

Symptom:
Flink restarted because of an unrelated problem (for example, checkpoint persistence failure), and afterwards the connector's writes to StarRocks showed both duplicated and lost data.
As shown in the figures below:

image

image

Problem with the connector's logic:
In exactly-once mode, the data is not actually written out within the checkpoint it belongs to. In theory, when a checkpoint is triggered, all of that checkpoint's data should be written out, and only after the write succeeds should the checkpoint be considered successful; that would be exactly-once in the true sense.

Consider supporting schema-less writes to StarRocks in Flink SQL mode

Scenario

The schema of the upstream data changes frequently. To avoid restarting the job all the time, we want to rely on StarRocks' ability to parse JSON against the table schema: the Flink job outputs a JSON payload, and when StarRocks receives the JSON it parses it into table rows according to the table's current schema. As long as the StarRocks table schema is kept in sync with the upstream data, the downstream can pick up data for newly added columns in time without touching the Flink job.

Current state

The low-level API can already do this, but it cannot be used in SQL mode.
image

Suggestion

In SQL mode, let the user choose between schema mode and schema-less mode through a configuration option, for example:

create table sr_sink (
  biz_type int,
  vehicle_id string,
  tags array
) with (
  'connector' = 'starrocks',
  'jdbc-url' = '',
  'load-url' = '',
  'database-name' = 'test',
  'table-name' = 'test_with_schema',
  'username' = '****',
  'password' = '*****',
  'sink.with_no_schema' = 'false',
  'sink.parallelism' = '2'
);

create table sr_sink (
  jsonString string
) with (
  'connector' = 'starrocks',
  'jdbc-url' = '',
  'load-url' = '',
  'database-name' = 'test',
  'table-name' = 'test_with_no_schema',
  'username' = '****',
  'password' = '*****',
  'sink.with_no_schema' = 'true',
  'sink.parallelism' = '2'
);

The stream load default timeout overrides the system's stream load timeout setting

image
image

The connector's requests carry a default 60-second timeout, and when handling the request the FE prefers the request's timeout over the system's stream load timeout setting. As a result, any sink request that takes more than one minute is stopped and fails; the logic here conflicts with the FE's.
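
A possible workaround sketch while this is unresolved, assuming the connector forwards sink.properties.* verbatim as stream load headers (as the doHttpPut snippet in an earlier issue suggests) and lets a user-supplied timeout take precedence over its 60-second default; all values are illustrative:

StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", "jdbc:mysql://fe-host:9030")
        .withProperty("load-url", "fe-host:8030")
        .withProperty("database-name", "db")
        .withProperty("table-name", "tbl")
        .withProperty("username", "user")
        .withProperty("password", "pwd")
        // Raise the per-request stream load timeout (in seconds) above the 60s default.
        .withProperty("sink.properties.timeout", "600")
        .build();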

package failed

package failed, what could be the problem?
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-gpg-plugin:1.5:sign (sign-artifacts) on project flink-connector-starrocks: Exit code: 127 -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-gpg-plugin:1.5:sign (sign-artifacts) on project flink-connector-starrocks: Exit code: 127
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:216)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
    at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call(MultiThreadedBuilder.java:188)
    at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call(MultiThreadedBuilder.java:184)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.maven.plugin.MojoExecutionException: Exit code: 127
    at org.apache.maven.plugin.gpg.GpgSigner.generateSignatureForFile(GpgSigner.java:168)
    at org.apache.maven.plugin.gpg.AbstractGpgSigner.generateSignatureForArtifact(AbstractGpgSigner.java:205)
    at org.apache.maven.plugin.gpg.GpgSignAttachedMojo.execute(GpgSignAttachedMojo.java:140)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
    ... 11 more
[ERROR]
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Cannot insert into StarRocks when using canal-json

tableEnv.executeSql("CREATE TABLE tb_order(\n" +
" id INT,\n" +
" order_num STRING,\n" +
" counter_code STRING,\n" +
" price INT,\n" +
" status INT \n" +
" ) WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'tb_order',\n" +
"'scan.startup.mode' = 'latest-offset',\n" +
"'properties.bootstrap.servers' = 'hadoop001.test.com:9092,hadoop002.test.com:9092,hadoop003.test.com:9092 ', \n" +
"'format' = 'canal-json', \n" +
"'properties.group.id' = 'liqitest',\n" +
// "'json.fail-on-missing-field' = 'false', \n" +
"'canal-json.ignore-parse-errors' = 'true' \n "+
")");

    tableEnv.executeSql(
            "CREATE TABLE mytest03 (" +
                    "id INT," +
                    "order_num STRING," +
                    "counter_code STRING," +
                    "price INT ," +
                    "status INT " +
                    //" PRIMARY KEY (id) NOT ENFORCED"+
                    ") WITH ( " +
                    "'connector' = 'starrocks'," +
                    "'jdbc-url'='jdbc:mysql://fat-starrocks.xxx.com:9030?characterEncoding=utf-8&useSSL=false'," +
                    "'load-url'='fat-xx.ppdapi.com:8030'," +
                    "'database-name' = 'gouya_test'," +
                    "'table-name' = 'mytest03'," +
                    "'username' = 'xxxx'," +
                    "'password' = 'xxx'," +
                    "'sink.buffer-flush.max-rows' = '65000'," +
                    "'sink.buffer-flush.max-bytes' = '67108864'," +
                    "'sink.buffer-flush.interval-ms' = '1000'," +
                    "'sink.properties.column_separator' = '\\x01'," +
                    "'sink.properties.row_delimiter' = '\\x02'," +
                    "'sink.max-retries' = '3'" +
                    ")"

    );

tableEnv.executeSql("INSERT INTO mytest03 " +
"SELECT id, order_num,counter_code, price,status FROM tb_order ").print();
}

No data is inserted into StarRocks.

The data format for time types

I have built a real-time synchronization channel from a MySQL table to StarRocks.
MySQL table DDL:

CREATE TABLE `base` (
  `base_id` int(11) NOT NULL,
  `start_time` datetime DEFAULT NULL,
  `update_date` date DEFAULT NULL,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`base_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

and the StarRocks table DDL:

CREATE TABLE `base` (
  `base_id` int(11) NULL COMMENT "",
  `start_time` datetime NULL COMMENT "",
  `update_date` date NULL COMMENT "",
  `update_time` datetime NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`base_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`base_id`) BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"

Then I made some modifications to existing rows:

insert into base(base_id,start_time,update_date,update_time) values(2,now(),now(),now());

and received the following row sent by the Flink CDC MySQL connector, for example:

start_time:1639831884000,update_time:2021-12-18T04:51:24Z,base_id:3,update_date:18979

As a result, the values of start_time and update_date are NULL:

select * from base;

+---------+------------+-------------+---------------------+
| base_id | start_time | update_date | update_time |
+---------+------------+-------------+---------------------+
| 1 | NULL | NULL | 2021-12-17 09:21:20 |
+---------+------------+-------------+---------------------+

My question is: must date or datetime values be formatted like '2021-12-17' or '2021-12-17 09:21:20'?
Can a date not be given as an integer number of days since 1970-01-01?
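
For reference, the CDC values in the example row are epoch-based: 18979 is a number of days since 1970-01-01 and 1639831884000 is epoch milliseconds. A small self-contained sketch of turning them into the 'yyyy-MM-dd' / 'yyyy-MM-dd HH:mm:ss' forms the question mentions (which zone to use depends on how the CDC source encoded the value):

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EpochToStarRocks {
    private static final DateTimeFormatter DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        long startTimeMillis = 1639831884000L; // CDC datetime encoded as epoch milliseconds
        int updateDateDays = 18979;            // CDC date encoded as days since 1970-01-01

        LocalDateTime startTime =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(startTimeMillis), ZoneOffset.UTC);
        LocalDate updateDate = LocalDate.ofEpochDay(updateDateDays);

        System.out.println(startTime.format(DATETIME)); // 2021-12-18 12:51:24 when read as UTC
        System.out.println(updateDate);                 // 2021-12-18
    }
}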

Project compilation error

Are there documented compilation and packaging steps? Could you share them? These classes cannot be found:
import com.starrocks.shade.org.apache.thrift.TException;
import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol;
import com.starrocks.shade.org.apache.thrift.protocol.TProtocol;
import com.starrocks.shade.org.apache.thrift.transport.TSocket;
import com.starrocks.shade.org.apache.thrift.transport.TTransportException;

Implementation of exactly-once semantic in the sink function is totally wrong!

If we refer to the official doc about exactly-once semantic of starRocks sink connector, we can see descriptions below:

The overall process is as follows:

  1. Save data and its label at each checkpoint that is completed at a specific checkpoint interval.
  2. After data and labels are saved, block the flushing of data cached in the state at the first invoke after each checkpoint is completed.

Unfortunately, the current implementation of this semantic in the StarRocks connector is wrong. It uses Flink's operator state to hold the buffered entities during the synchronous phase of the snapshot and flushes them to StarRocks on the next #invoke call. However, this cannot handle the case where the overall checkpoint fails. If checkpoint-42 needs all 3 sub-tasks to complete, subtask-0 succeeding while subtask-1 fails will cause checkpoint-42 to fail in the end. Data will nevertheless be sent to StarRocks by subtask-0 in the #invoke call after the failed checkpoint-42. Thus, once the job fails and restores from the last successful checkpoint-41, some data from subtask-0 will be stored twice.

We should introduce a CheckpointListener with notifyCheckpointComplete to implement this semantic (a rough sketch follows). Better still, we should leverage two-phase-commit semantics to achieve exactly-once with better performance.
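
For illustration, a rough sketch (not the connector's actual code; operator-state handling and retries are elided) of how a sink could gate the flush on checkpoint completion, assuming Flink 1.12+ where CheckpointListener lives in org.apache.flink.api.common.state:

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public abstract class CheckpointGatedSink<T> extends RichSinkFunction<T> implements CheckpointListener {

    private final List<T> buffer = new ArrayList<>();
    // Rows frozen per checkpoint id; persisting them in operator state is elided here.
    private final Map<Long, List<T>> pendingByCheckpoint = new ConcurrentHashMap<>();

    @Override
    public void invoke(T value, Context context) {
        buffer.add(value); // buffer only; never flush from invoke
    }

    // Would be called from CheckpointedFunction#snapshotState with the checkpoint id.
    protected void freeze(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(buffer));
        buffer.clear();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Flush only after the whole checkpoint has succeeded, so a checkpoint that fails
        // on another subtask never leaks data from this one.
        List<T> rows = pendingByCheckpoint.remove(checkpointId);
        if (rows != null && !rows.isEmpty()) {
            streamLoad(rows);
        }
    }

    protected abstract void streamLoad(List<T> rows);
}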

Flink job blocked by snapshotState

Environment:

flink : 1.12
flink-connector-starrocks : 1.1.4

Description:

Our user reported a Flink job that was blocked:

  1. It cannot consume any data from Kafka
  2. Backpressure is high
  3. Checkpoints keep failing

After digging we found a subtask blocked in StarRocksDynamicSinkFunction#snapshotState:
image

Checking the async flush thread instances:
image

So it seems one async flush thread was gone (176 StarRocksSinkManager instances but only 175 async flush threads), and the put operation blocked because the blocking queue capacity is 1: flushQueue.put(new Tuple3<>("", 0l, null));

Checking the JVM and GC: around the time the async flush thread disappeared, an Old Gen GC occurred on the TM; I think this may have caused the async flush thread to disappear:
image

I think we can do two things to improve this:

  1. Add a configuration sink.buffer-offer.timeout-ms and use flushQueue.offer(tuple3, timeout, unit) instead of flushQueue.put(tuple3); if the offer fails, throw an exception so that Flink recovers the job automatically (see the sketch below).
  2. Improve exception handling for the async flush thread.

I will try to submit a PR.
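
A minimal sketch of point 1 above, with illustrative names (the queue stands in for the connector's flushQueue and the timeout for the proposed sink.buffer-offer.timeout-ms):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FlushQueueOffer {
    private final LinkedBlockingQueue<Object> flushQueue = new LinkedBlockingQueue<>(1);
    private final long offerTimeoutMs = 60_000L; // proposed sink.buffer-offer.timeout-ms

    public void enqueue(Object batch) throws InterruptedException {
        // offer(...) with a timeout instead of put(...): if the async flush thread has died,
        // fail fast so Flink restarts the job rather than blocking snapshotState forever.
        if (!flushQueue.offer(batch, offerTimeoutMs, TimeUnit.MILLISECONDS)) {
            throw new RuntimeException(
                    "Timed out offering a batch to the flush queue; the async flush thread may be gone.");
        }
    }
}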

StarRocksStreamLoadFailedException: all partitions have no load data

Precondition:
1. Checkpointing is enabled.
2. sink.semantic = EXACTLY_ONCE

Exception:
Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response: {"Status":"Fail","BeginTxnTimeMs":0,"Message":"all partitions have no load data","NumberUnselectedRows":1,"CommitAndPublishTimeMs":0,"Label":"2da8b57b-bdad-4f73-b79c-b8c4664b3ad6","LoadBytes":59,"StreamLoadPutTimeMs":1,"NumberTotalRows":1,"WriteDataTimeMs":2,"TxnId":248101,"LoadTimeMs":4,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0}

Reason:
When a checkpoint is triggered, the StarRocksDynamicSinkFunction#snapshotState method is executed, which adds a Tuple2 to the checkpointedState object and waits for the next checkpoint to write it to StarRocks. If the buffer is empty, a StarRocksStreamLoadFailedException is thrown.

Suggestion:
So I think the buffer should be checked for emptiness before it is added to checkpointedState (see the sketch below).
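
A minimal sketch of the suggested guard, with illustrative names standing in for the connector's actual fields:

// Skip snapshotting an empty buffer so the next checkpoint does not trigger a
// stream load with zero rows ("all partitions have no load data").
if (!bufferedRows.isEmpty()) {
    checkpointedState.add(Tuple2.of(label, bufferedRows));
}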

Unable to insert into StarRocks with a date field when using flink-connector-starrocks

When I use flink-connector-starrocks to insert records into StarRocks, the insert works when there is no date field. When I add a date field, the following error occurs:
com.starrocks.connector.flink.table.StarRocksDynamicSinkFunction [] - Unsupported row data invoked: [%s]
This is my code:

import java.util.Date;

public class RequestData {

    /**
     * Current date (used for StarRocks partitioning and bucketing)
     */
    private Date curDate;

}

SinkFunction starRocksSink = StarRocksSink.sink(
    TableSchema.builder()
        .field("cur_date", DataTypes.DATE())
        .field("id", DataTypes.STRING())
        .build(),
    StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", "jdbc:mysql://XXX:9030,XXX:9030,XXX:9030")
        .withProperty("load-url", "XXX:8030;XXX:8030;XXX:8030")
        .withProperty("username", "root")
        .withProperty("password", "root")
        .withProperty("database-name", "test")
        .withProperty("table-name", "logData_url")
        .withProperty("sink.properties.format", "json")
        .withProperty("sink.properties.strip_outer_array", "true")
        .build(),
    (slots, requestLog) -> {
        slots[0] = requestLog.getCurDate();
        slots[1] = UUID.randomUUID().toString();
    }
);

my Table structure:

CREATE TABLE test.logData_url
(
cur_date DATE,
id STRING
)
ENGINE=olap
COMMENT "log data"
DISTRIBUTED BY HASH(id) BUCKETS 32
PROPERTIES ("storage_type"="column");

I print out the inserted parameters:

{"id":"8bb09e5e110845f39sf00df668ef3e80","curDate":1643126400000}

I guess there is a problem with the format of curDate.

How should I deal with this problem? Thank you.
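
A hedged sketch of one way around this, assuming the sink can serialize a DataTypes.DATE() column value when it is a java.time.LocalDate or a 'yyyy-MM-dd' string rather than a raw java.util.Date epoch value (this is an assumption, not something documented in this thread):

import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

public class CurDateConversion {
    // Convert the java.util.Date field to a LocalDate (or a "yyyy-MM-dd" string)
    // before placing it in the sink's slot array, instead of passing the raw Date.
    static LocalDate toDateSlot(Date curDate) {
        return curDate.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    }

    public static void main(String[] args) {
        // 1643126400000 is the curDate value from the printed parameters above.
        System.out.println(toDateSlot(new Date(1643126400000L))); // 2022-01-26 in UTC+8, 2022-01-25 in UTC
    }
}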

Predicate pushdown causes a SQL parse error when reading a StarRocks table

The SQL:

insert into sinkTable
select item_key,
       vehicle_id,
       item_type, item_value, modify_time
from HTWSource
where CHAR_LENGTH(vehicle_id) < 10 ;

The error at job startup is:

Caused by: java.lang.RuntimeException: Request of get query plan failed with code 500 {"exception":"The Sql is invalid","status":500}
    at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryPlan(StarRocksQueryPlanVisitor.java:134) ~[?:?]
    at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryInfo(StarRocksQueryPlanVisitor.java:64) ~[?:?]
    at com.starrocks.connector.flink.table.source.StarRocksSourceCommonFunc.getQueryInfo(StarRocksSourceCommonFunc.java:207) ~[?:?]
    at com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.<init>(StarRocksDynamicSourceFunction.java:83) ~[?:?]

The logs show that the SQL was parsed into:

21:06:09,066 INFO  com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor [] - query sql [select * from `bh_vehicle_iot_stream`.`htw_profile_item_info_sr` where (10)]

Consider resetting flushException to null in checkFlushException

We should consider resetting flushException to null in checkFlushException, as we came across a scenario like this:

  1. Our user developed a UDTF and wrapped collect(Row.of(key, value)) in a try/catch, where he just logged and swallowed the exception.
  2. The collect(row) triggered writeRecord and the flush operation (which offers the batch into the flushQueue).
  3. The asyncFlush operation caused an exception (in our case: "the length of input is too long than schema."), and the flush thread stored the exception in the flushException field.
  4. The following collect(row) calls in the UDTF triggered writeRecord and checkFlushException; since the exception was swallowed by user code and flushException was never reset to null, every collect(row) hit the same exception.
  5. The user found the error log and increased the column length (which would fix the issue), but Flink kept printing the original exception message anyway.

In this case, the Flink job never surfaces the flushException as a failure, and it keeps running even though it cannot write any data into StarRocks (see the sketch below).
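
A minimal sketch of the proposed change, with illustrative field names; whether clearing the stored exception is safe for the job's failure semantics is exactly what this issue asks the maintainers to consider:

// Surface the stored flush failure once, then clear it, so a single bad batch does not
// keep poisoning every later write after the root cause (e.g. column length) is fixed.
private void checkFlushException() {
    Exception cause = flushException;
    if (cause != null) {
        flushException = null; // reset so later flushes can report fresh state
        throw new RuntimeException("Writing records to StarRocks failed.", cause);
    }
}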

ClassNotFoundException: org.apache.flink.calcite.shaded.com.google.common.base.Preconditions

Caused by: java.lang.ClassNotFoundException: org.apache.flink.calcite.shaded.com.google.common.base.Preconditions
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 7 more

The connector cannot find the context on a BE node (and the node is random)

When Flink connects to StarRocks to export data, the program runs normally but suddenly fails at some point with a message like: Failed to get next from be -> ip:[10.151.217.3] NOT_FOUND msg:[context_id: be9a2c1f-2119-4331-ad6b-23dd0698f555 not found]. The IP is random and differs on every run, and the data is still exported correctly.
image

Reason: there are 280 rows couldn't find a partition

  The Flink job that consumes data from Kafka into StarRocks failed because some rows couldn't find a partition. Sometimes we can't guarantee that every row finds its own partition, so I hope the Flink job keeps running normally even when it hits an exception like:
Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:

{"Status":"Fail","BeginTxnTimeMs":1,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"94952a3a-32ce-44bd-a9ef-97392411fa45","LoadBytes":2867274,"StreamLoadPutTimeMs":4,"NumberTotalRows":1725,"WriteDataTimeMs":2879,"TxnId":360,"LoadTimeMs":2885,"ErrorURL":"http://192.168.xx.xxx:8040/api/_load_error_log?file=__shard_34/error_log_insert_stmt_3d41ccd3-cc28-acf2-faa0-71e2679d8fb6_3d41ccd3cc28acf2_faa071e2679d8fb6","ReadDataTimeMs":15,"NumberLoadedRows":1445,"NumberFilteredRows":280}
{"streamLoadErrorLog":"Reason: there are 280 rows couldn't find a partition. src line: [[2021-12-22 15:37:00, '17de1122ff5307-04554ab9f1db16-5373e62-1395396-17de1122ff6950', 'c0cbf7ba22784439b38199145b791998', '1.0.0', 'xxxxx', 0]]; \n"}

Q: What should I do, other than making sure every row finds its own partition? (One possibility is sketched below.)
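
One hedged option: stream load accepts a max_filter_ratio parameter, and the connector appears to forward sink.properties.* as stream load headers (see the doHttpPut snippet in an earlier issue), so tolerating a bounded share of unroutable rows could look like the sketch below. Values are illustrative, and this trades silently dropped rows for job stability.

StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", "jdbc:mysql://fe-host:9030")
        .withProperty("load-url", "fe-host:8030")
        .withProperty("database-name", "db")
        .withProperty("table-name", "tbl")
        .withProperty("username", "user")
        .withProperty("password", "pwd")
        // Allow up to 20% of rows in a load to be filtered (e.g. no matching partition)
        // without failing the whole batch.
        .withProperty("sink.properties.max_filter_ratio", "0.2")
        .build();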

Get the table name from Kafka and sink the data into the corresponding table

Get the table name from Kafka and sink the data into the corresponding table. Can this be achieved by modifying the source code? What needs to be done in the invoke method? I tried to initialize the sink options and sink manager in the invoke method, but it didn't work.

Examples:
kafka datastream<Tuple2<String, String>> : ("{"score": "99", "name": "stephen"}", "tmp_sr_test_api_2")

StarRocksSink : StarRocksSinkOptions.builder()
.withProperty("jdbc-url", ConfigCommon.FLINK_STARROCKS_JDBC_URL)
.withProperty("load-url", ConfigCommon.FLINK_STARROCKS_LOAD_URL)
.withProperty("username", ConfigCommon.STARROCKS_USER)
.withProperty("password", ConfigCommon.STARROCKS_PASSWORD)
.withProperty("table-name", "tmp_sr_test_api_1")
.withProperty("database-name", ConfigCommon.DATABASE_NAME)
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build()

Finally, I want the data to be added to the "tmp_sr_test_api_2" table.
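
Since the sink options are fixed per sink instance, one workaround sketch (not a connector feature) is to split the stream by the table name carried in the tuple and attach one StarRocks sink per target table; the table list and ConfigCommon fields below follow the example above:

// records: DataStream<Tuple2<String, String>> where f0 is the JSON payload and f1 the target table.
for (String table : java.util.Arrays.asList("tmp_sr_test_api_1", "tmp_sr_test_api_2")) {
    records
        .filter(t -> table.equals(t.f1))
        .map(t -> t.f0)
        .addSink(StarRocksSink.sink(
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", ConfigCommon.FLINK_STARROCKS_JDBC_URL)
                .withProperty("load-url", ConfigCommon.FLINK_STARROCKS_LOAD_URL)
                .withProperty("username", ConfigCommon.STARROCKS_USER)
                .withProperty("password", ConfigCommon.STARROCKS_PASSWORD)
                .withProperty("database-name", ConfigCommon.DATABASE_NAME)
                .withProperty("table-name", table) // one sink per target table
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build()));
}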

Flink Connector 1.2: garbled characters when querying StarRocks

1. The script is as follows:
image
2. Start a YARN session for the SQL client:
./yarn-session.sh -jm 1024m -tm 4096m
./sql-client.sh embedded -i etl_starrocks2kafka_assassin_employee_post.sql
3. Query:
image
4. The result is as follows:
image

Improve StarRocks sink metrics

I think we can add some metrics for StarRocks sink, such as 'totalFlushCount', 'totalFlushErrorCount', 'WriteDataTimeMs', etc.
I'll try to submit a PR.
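
For reference, a minimal sketch of registering such counters through Flink's metric group inside a rich sink function; the class and metric names are illustrative:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MeteredStarRocksSink<T> extends RichSinkFunction<T> {
    private transient Counter totalFlushCount;
    private transient Counter totalFlushErrorCount;

    @Override
    public void open(Configuration parameters) {
        totalFlushCount = getRuntimeContext().getMetricGroup().counter("totalFlushCount");
        totalFlushErrorCount = getRuntimeContext().getMetricGroup().counter("totalFlushErrorCount");
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        try {
            // ... buffering / flush logic would go here ...
            totalFlushCount.inc();
        } catch (Exception e) {
            totalFlushErrorCount.inc();
            throw e;
        }
    }
}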

Multiple Scala versions

  1. The pom currently pins the Scala version to 2.12. It would be helpful to build and publish the connector for the different Flink and Scala version combinations, e.g. 1.1.13_flink-1.13.0_2.11, 1.1.13_flink-1.13.0_2.12, 1.1.13_flink-1.13.1_2.11, 1.1.13_flink-1.13.1_2.12, distinguishing not only the Flink version but also the Scala version, and upload them to the ** repository, similar to the connector jars Flink itself provides.
  2. When packaging, only include the connector's own sources in the jar instead of bundling third-party jars, so that users can resolve dependency version conflicts more easily. Below are the changes I made to the pom for my own use: the Scala version is pinned to 2.12 and only the source files are packaged.

<artifactId>flink-connector-starrocks_2.12</artifactId>
<version>1.1.13_flink-1.13.3</version>
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <file_encoding>UTF-8</file_encoding>
    <maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
    <maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
    <flink.version>1.13.3</flink.version>
    <scala.version>2.12</scala.version>
</properties>
<build>
    <plugins>
        <!-- Test runner plugin; skips all test classes matching the naming convention before packaging -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.22.2</version>
            <configuration>
                <skipTests>true</skipTests>
            </configuration>
        </plugin>
        <!-- Packaging plugin; different properties control the packaging form -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <id>copy</id>
                    <phase>package</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
                <args>
                    <arg>-target:jvm-1.8</arg>
                </args>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-source-plugin</artifactId>
            <version>2.2.1</version>
            <executions>
                <execution>
                    <id>attach-sources</id>
                    <goals>
                        <goal>jar-no-fork</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
    <resources>
        <!-- Package resource files -->
        <resource>
            <directory>src/main/resources</directory>
            <includes>
                <include>**</include>
            </includes>
            <filtering>false</filtering>
        </resource>
    </resources>
</build>

Stream Load error: Failed to parse json as array. error: A string is opened, but never closed.

We are on StarRocks 2.2. When stream loading a JSON file, the load succeeds if the JSON array contains only a few records, but if the array contains too many records it returns this result:
{
"TxnId": 342,
"Label": "3576b838-6185-4dbc-9ada-b36729d80ee1",
"Status": "Fail",
"Message": "Failed to parse json as array. error: A string is opened, but never closed.",
"NumberTotalRows": 0,
"NumberLoadedRows": 0,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 60632,
"LoadTimeMs": 17,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 13,
"CommitAndPublishTimeMs": 0
}
Why is this? Is there something wrong with my StarRocks configuration?

PS:
The load command:
curl -v --location-trusted -u 'name:password' \
-H "format: json" -H "jsonpaths: ["$.score", "$.grade", "$.name", "$.id", "$.age"]" \
-H "strip_outer_array: true" \
-T b.json \
http://.../api/test/student/_stream_load

Part of the data in b.json:
[{"score":"124","grade":1,"name":"张三","id":109,"age":"18"},{"score":"941","grade":1,"name":"张三","id":926,"age":"18"},{"score":"942","grade":1,"name":"张三","id":927,"age":"18"},{"score":"943","grade":1,"name":"张三","id":928,"age":"18"},{"score":"944","grade":1,"name":"张三","id":929,"age":"18"},{"score":"945","grade":1,"name":"张三","id":930,"age":"18"},{"score":"946","grade":1,"name":"张三","id":931,"age":"18"},{"score":"947","grade":1,"name":"张三","id":932,"age":"18"},{"score":"948","grade":1,"name":"张三","id":933,"age":"18"},{"score":"949","grade":1,"name":"张三","id":934,"age":"18"},{"score":"950","grade":1,"name":"张三","id":935,"age":"18"},{"score":"951","grade":1,"name":"张三","id":936,"age":"18"},{"score":"952","grade":1,"name":"张三","id":937,"age":"18"},{"score":"953","grade":1,"name":"张三","id":938,"age":"18"},{"score":"954","grade":1,"name":"张三","id":939,"age":"18"},{"score":"968","grade":1,"name":"张三","id":953,"age":"18"},{"score":"981","grade":1,"name":"张三","id":966,"age":"18"},{"score":"982","grade":1,"name":"张三","id":967,"age":"18"},{"score":"983","grade":1,"name":"张三","id":968,"age":"18"},{"score":"984","grade":1,"name":"张三","id":969,"age":"18"},{"score":"985","grade":1,"name":"张三","id":970,"age":"18"},{"score":"986","grade":1,"name":"张三","id":971,"age":"18"},{"score":"987","grade":1,"name":"张三","id":972,"age":"18"},{"score":"988","grade":1,"name":"张三","id":973,"age":"18"},{"score":"989","grade":1,"name":"张三","id":974,"age":"18"},{"score":"990","grade":1,"name":"张三","id":975,"age":"18"},{"score":"991","grade":1,"name":"张三","id":976,"age":"18"},{"score":"1009","grade":1,"name":"张三","id":994,"age":"18"},{"score":"1010","grade":1,"name":"张三","id":995,"age":"18"},{"score":"1011","grade":1,"name":"张三","id":996,"age":"18"},{"score":"1012","grade":1,"name":"张三","id":997,"age":"18"},{"score":"1013","grade":1,"name":"张三","id":998,"age":"18"},{"score":"1014","grade":1,"name":"张三","id":999,"age":"18"}]

About Flink version support

1. flink-1.11.0 is not compatible with v1.2.1. Will compatibility with flink-1.11.0 be considered later? (A source connector is needed; the current flink-1.11 branch has no source.)
2. Is support for flink-1.15 (about to be released) planned?
