
flink-sql-lineage's Introduction


I’m currently working on LLM agents and big data.

Talks

  1. [Apache CommunityOverCode Asia 2023] FlinkSQL field lineage and data permission solution
  2. [Flink Forward Asia 2023] Entering the Future: Empowering Flink Intelligence with Large Models
  3. [6th Golden Ape Awards] 2023 Big Data Industry Annual Trend Figure

Research

  1. Smart Public Transportation Sensing: Enhancing Perception and Data Management for Efficient and Safety Operations [paper]
    Tianyu Zhang, Xin Jin, Song Bai, Yuxin Peng, Ye Li and Jun Zhang

flink-sql-lineage's People

Contributors

asteriachiu, dependabot[bot], hamawhitegg, tonybobam, yangstar


flink-sql-lineage's Issues

[Bug] Flink CEP SQL column lineage parse error

Here are my test CEP SQL and the DDL statements:

CREATE TABLE `sink_table`(`agent_id` STRING, `room_id` STRING, `application_id` STRING, `type` STRING, `begin_time` BIGINT, `end_time` BIGINT) WITH(...)

CREATE TABLE `source_table`(`agent_id` STRING, `room_id` STRING, `create_time` BIGINT, `type` STRING, `application_id` STRING, `connect_time` BIGINT, `row_time` AS PROCTIME()) WITH(...)

insert into sink_table (agent_id,room_id,application_id,type,begin_time,end_time) 
select agent_id,room_id,application_id,type,begin_time,end_time 
	from source_table 
	match_recognize (partition by agent_id,room_id,application_id order by row_time 
	measures AF.type as type,last(BF.create_time) as begin_time,last(AF.create_time) as end_time 
	one row per match 
	after match SKIP PAST LAST ROW 
	pattern (BF+ AF) WITHIN INTERVAL '1' HOUR 
	define BF as BF.type = 'assign',AF as AF.type = 'pick_up' ) as T

Given the logic in RelMdColumnOrigins, the lineage result for my CEP SQL is wrong; for example, the lineage of the sink_table application_id column is incorrect. The mappings I would expect are sketched below.
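For reference, a sketch of the mappings I would expect, written in the String[][] convention the project's tests use (the exact set of rows is my assumption, derived from the MATCH_RECOGNIZE semantics of the query above):

String[][] expectedArray = {
        // PARTITION BY columns pass through unchanged
        {"source_table", "agent_id", "sink_table", "agent_id"},
        {"source_table", "room_id", "sink_table", "room_id"},
        {"source_table", "application_id", "sink_table", "application_id"},
        // MEASURES: AF.type, LAST(BF.create_time), LAST(AF.create_time)
        {"source_table", "type", "sink_table", "type"},
        {"source_table", "create_time", "sink_table", "begin_time"},
        {"source_table", "create_time", "sink_table", "end_time"}
};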

Execute SQL: USE Flink16_memory.`default` throws an exception

2024-06-19 11:09:09,971 INFO main com.hw.lineage.flink.LineageServiceImpl.executeSql:158 - Execute SQL: USE Flink16_memory.default (fails at startup with the exception below)

Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name [Flink16_memory] does not exist.
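The message suggests the catalog was never registered before the USE statement ran. A minimal fix sketch, assuming the same context.execute harness as the tests below and Flink's built-in generic in-memory catalog factory (the catalog name and type property here are assumptions about this setup):

// Hypothetical: register the in-memory catalog before switching to it.
context.execute("CREATE CATALOG Flink16_memory WITH ('type' = 'generic_in_memory')");
context.execute("USE Flink16_memory.`default`");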

Test failure on the main branch

@Slf4j
public class SimpleTest extends AbstractBasicTest {

    @Before
    public void createTable() {
        createMysqlUser();
        createOdsUser();
    }

    @Test
    public void concat() {
        String sql = "insert into mysql_user " +
                "select id, birthday, concat(first_name, last_name) as full_name " +
                "from ods_user";
        List<Result> actualList = context.parseFieldLineage(sql);
        log.info("Linage Result: ");
        actualList.forEach(e -> log.info(e.toString()));
    }

    protected void createMysqlUser() {
        context.execute("DROP TABLE IF EXISTS mysql_user ");
        context.execute("CREATE TABLE IF NOT EXISTS mysql_user (" +
                "       id                        BIGINT           ," +
                "       birthday                  TIMESTAMP(3)     ," +
                "       full_name                 STRING            " +
                ") WITH ( " +
                "       'connector' = 'jdbc'                 ," +
                "       'url'       = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
                "       'username'  = 'root'                 ," +
                "       'password'  = 'xxx'          ," +
                "       'table-name'= 'mysql_user' " +
                ")"
        );
    }

    protected void createOdsUser() {
        context.execute("DROP TABLE IF EXISTS ods_user ");
        context.execute("CREATE TABLE IF NOT EXISTS ods_user (" +
                "       id                        BIGINT           ," +
                "       birthday                  TIMESTAMP(3)     ," +
                "       first_name                STRING           ," +
                "       last_name                 STRING           ," +
                "       company_name              STRING           " +
                ") WITH ( " +
                "       'connector' = 'jdbc'                 ," +
                "       'url'       = 'jdbc:mysql://127.0.0.1:3306/demo?useSSL=false&characterEncoding=UTF-8'," +
                "       'username'  = 'root'                 ," +
                "       'password'  = 'xxx'          ," +
                "       'table-name'= 'ods_user' " +
                ")"
        );
    }
}

If I remove the company_name field from ods_user, parsing succeeds. Is there something wrong with how I've written this? A possible workaround is sketched below.
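Not a parser fix, but a possible workaround sketch: name the sink columns explicitly in the INSERT so the select list no longer has to line up positionally with every source column (whether this actually sidesteps the failure is an assumption on my part):

// Hypothetical workaround: explicit target column list on the INSERT.
String sql = "insert into mysql_user (id, birthday, full_name) " +
        "select id, birthday, concat(first_name, last_name) as full_name " +
        "from ods_user";
List<Result> actualList = context.parseFieldLineage(sql);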

Parse exception when a UDTF is used with an alias

INSERT INTO
dwd_hudi_users
SELECT
length,
name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name))
The author's original example above parses the lineage correctly, but if the SQL is changed to:
INSERT INTO
dwd_hudi_users
SELECT
length,
column1 as name,
word as company_name,
birthday,
ts,
DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
ods_mysql_users,
LATERAL TABLE (my_split_udtf (name)) as T (column1)
Parsing this SQL with the 2.0.0 version of RelMdColumnOrigins throws: org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.calcite.rel.core.TableFunctionScan.
With the main branch's RelMdColumnOrigins, the call
mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns)
yields null mappings, rel.getInputs().isEmpty() holds (a set of size 0 is returned), and the source column cannot be resolved. A repro sketch follows.
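A minimal repro sketch in the style of the project's tests (it assumes the dwd_hudi_users and ods_mysql_users tables and the my_split_udtf function are registered as in the author's examples):

@Test
public void testUdtfWithAlias() {
    // The column alias "as T (column1)" on the LATERAL TABLE is what
    // triggers the FlinkLogicalCalc/TableFunctionScan cast error.
    String sql = "INSERT INTO dwd_hudi_users " +
            "SELECT length, column1 as name, word as company_name, " +
            "       birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') " +
            "FROM ods_mysql_users, " +
            "LATERAL TABLE (my_split_udtf (name)) as T (column1)";
    List<Result> actualList = context.parseFieldLineage(sql);
    actualList.forEach(e -> log.info(e.toString()));
}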

[Bug] Wrong lineage result when UUID() is used (Flink 1.14)

Add the following test to the author's test suite:

    @Test
    public void testInsertSelectUUID() {
        String sql = "INSERT INTO dwd_hudi_users (id, name, birthday) " +
                "SELECT " +
                "   ROW_NUMBER() OVER (ORDER BY ts DESC) as id," +
                "   UUID() as name," +
                "   birthday " +
                "FROM" +
                "   ods_mysql_users";
    
        String[][] expectedArray = {
                {"ods_mysql_users", "ts", "dwd_hudi_users", "id",
                        "ROW_NUMBER() OVER (ORDER BY ts DESC NULLS LAST)"},
                {"ods_mysql_users", "name", "dwd_hudi_users", "name"},
                {"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"}
        };
        
        analyzeLineage(sql, expectedArray);
    }

The entry {"ods_mysql_users", "ts", "dwd_hudi_users", "birthday"} should instead be {"ods_mysql_users", "birthday", "dwd_hudi_users", "birthday"}. And since the source of the name field is the UUID() function, it should have no source column at all; a corrected expectation sketch follows.
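Assuming rows whose source is a non-deterministic, argument-free function such as UUID() are simply dropped, the corrected expectations would be (the transform string for id is copied from the existing tests):

String[][] expectedArray = {
        {"ods_mysql_users", "ts", "dwd_hudi_users", "id",
                "ROW_NUMBER() OVER (ORDER BY ts DESC NULLS LAST)"},
        // name comes from UUID(), which reads no source column,
        // so no lineage row is expected for it.
        {"ods_mysql_users", "birthday", "dwd_hudi_users", "birthday"}
};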

When a WHERE clause compares a field to a constant, the source field is optimized into a constant and that field's lineage is lost

Given the following Flink SQL as input:
CREATE TABLE demo_log_01 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
CREATE TABLE demo_log_02 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
CREATE TABLE demo_log_05 (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
  'connector' = 'print'
);
insert into demo_log_05
select a.user_id,b.item_id,b.behavior,a.dt,a.hh from 
(select  user_id,item_id,behavior,dt,hh from demo_log_01 where dt='2023-17-02') a 
left join 
(select  user_id,item_id,behavior,dt,hh from demo_log_02) b
on a.user_id = b.user_id

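The planner folds dt = '2023-17-02' into a constant, so the a.dt output loses its origin. What I would expect, sketched in the tests' array convention (my assumption of the full result set):

String[][] expectedArray = {
        {"demo_log_01", "user_id", "demo_log_05", "user_id"},
        {"demo_log_02", "item_id", "demo_log_05", "item_id"},
        {"demo_log_02", "behavior", "demo_log_05", "behavior"},
        // The row below is the one lost to constant folding:
        {"demo_log_01", "dt", "demo_log_05", "dt"},
        {"demo_log_01", "hh", "demo_log_05", "hh"}
};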

Discussion: implementing job-level lineage

I ran the project locally and tried the basic features; it works well, very impressive! @HamaWhiteGG, do you plan to support lineage between jobs? I've recently been working on lineage for a Flink real-time platform as well, so we could discuss it together: showing the lineage among sources, tasks, and sinks.

[Discussion] Adding an AtlasHook implementation to Flink

Current progress of the Flink community's Atlas integration: https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit#heading=h.mfdxvgyhozur

PoC of the changes needed on the Flink side: https://github.com/gyfora/atlas/blob/flink-bridge/addons/flink-bridge/src/main/java/org/apache/atlas/flink/hook/FlinkAtlasHook.java

If you're interested, you could refer to that author's implementation; it is minimally invasive to Flink and fairly generic.
This part of the code has already been merged into the Atlas 2.2.0 trunk; some PoC validation on the Flink side still remains.

FlinkSQL lineage result misses one column

Here is my SQL script:

-- source1
DROP TABLE IF EXISTS `dwd_status_log`;
CREATE TABLE `dwd_status_log`
(
    mechine STRING,
    timenow STRING,
    `value` STRING,
    pre_value STRING,
    PRIMARY KEY (timenow) NOT ENFORCED
)  
WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_status_log',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json'
);

-- source2
DROP TABLE IF EXISTS dim_equipment_info;
CREATE TABLE IF NOT EXISTS dim_equipment_info (
    id STRING 
    ,station_type STRING 
    ,equipment_no STRING 
    ,emp_name STRING 
    ,emp_phone STRING 
    ,vendor STRING 
    ,site STRING 
    ,`floor` STRING 
    ,line STRING 
    ,model STRING 
    ,stage STRING 
    ,id_value STRING 
    ,emp_info STRING
    ,PRIMARY KEY ( id ) NOT ENFORCED
) 
WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:4000/data_warehouse_p_10046?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true',
   'table-name' = 'dim_equipment_info',
   'username' = 'root',
   'password' = 'xxx'
);


-- sink
DROP TABLE IF EXISTS dws_alarm_status_count;
CREATE TABLE IF NOT EXISTS dws_alarm_status_count (
    mechine_id STRING 
    ,mechine_name STRING 
    ,trigger_date STRING 
    ,alarm_status_count BIGINT 
    ,PRIMARY KEY ( mechine_id,trigger_date ) NOT ENFORCED
) 
WITH (
    'connector' = 'kafka',
    'topic' = 'dws_alarm_status_count',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json'
);

-- trans
INSERT INTO 
    dws_alarm_status_count 
SELECT
    dsl.mechine AS mechine_id,
    dei.equipment_no AS mechine_name,
    DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd') AS `trigger_date`,
    COUNT(*) AS alarm_status_count
FROM 
    dwd_status_log dsl 
LEFT JOIN 
    dim_equipment_info dei 
ON 
    dsl.mechine = dei.id 
WHERE 
    dsl.`value` = '4' 
GROUP BY 
DATE_FORMAT(dsl.timenow, 'yyyy-MM-dd'), dsl.mechine,dei.equipment_no;

The lineage for the last column, alarm_status_count, is missing from the result.

The transform field of the lineage result is null when parsing a UDAF

// A UDAF that concatenates its three STRING arguments into one value.
public class TestAggregateFunction extends AggregateFunction<String, TestAggregateFunction.TestAggregateAcc> {

    public void accumulate(TestAggregateAcc acc, String param1, String param2, String param3) {
        acc.test = param1 + param2 + param3;
    }

    @Override
    public String getValue(TestAggregateAcc accumulator) {
        return accumulator.test;
    }

    @Override
    public TestAggregateAcc createAccumulator() {
        return new TestAggregateAcc();
    }

    public static class TestAggregateAcc {
        public String test;
    }
}

create function test_aggregate as 'com.hw.lineage.flink.aggregatefunction.TestAggregateFunction'

INSERT INTO dwd_hudi_users SELECT id, name, test_aggregate(concat_ws('_', name, 'test'), name, 'test'), birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') FROM ods_mysql_users group by id, name, birthday, ts

When the UDAF is parsed, the transform field of the lineage result is null, e.g.:
LineageResult(sourceCatalog=hive, sourceDatabase=default, sourceTable=ods_mysql_users, sourceColumn=name, targetCatalog=hive, targetDatabase=default, targetTable=dwd_hudi_users, targetColumn=company_name, transform=null)
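What I would expect instead, assuming the transform field should carry the expression that produced the target column (the exact rendering of the call is my assumption):

// Hypothetical expected output: transform carries the UDAF call expression.
LineageResult(sourceCatalog=hive, sourceDatabase=default, sourceTable=ods_mysql_users,
        sourceColumn=name, targetCatalog=hive, targetDatabase=default,
        targetTable=dwd_hudi_users, targetColumn=company_name,
        transform=test_aggregate(concat_ws('_', name, 'test'), name, 'test'))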

CROSS JOIN UNNEST lineage parsing fails; the RelMdColumnOrigins.getColumnOrigins implementation probably needs to be extended. Could you look into supporting this when you have time?

CREATE TABLE kafka_source (
id bigint,
data ARRAY<row<c1 bigint,c2 string>>
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'hadoop101:9092',
'topic' = 't1',
'properties.group.id' = 'g1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);

CREATE TABLE sink_print (
id bigint,
c1 bigint,
c2 string
) WITH (
'connector' = 'print'
);

insert into sink_print
SELECT id, t.c1, t.c2
FROM kafka_source
CROSS JOIN UNNEST(data) AS t(c1,c2)
;

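For context, the lineage I would expect: c1 and c2 both originate from nested fields of kafka_source.data. Sketched in the tests' array convention (how nested fields should be rendered, data versus data.c1/data.c2, is an open question; the plain data form below is an assumption):

String[][] expectedArray = {
        {"kafka_source", "id", "sink_print", "id"},
        // Assumed rendering; data.c1 / data.c2 would be more precise
        // if the parser supports nested-field origins.
        {"kafka_source", "data", "sink_print", "c1"},
        {"kafka_source", "data", "sink_print", "c2"}
};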

Error when parsing a UDAF whose number of inputs and UDF arguments are equal


public class TestAggregateFunction extends AggregateFunction<String, TestAggregateFunction.TestAggregateAcc> {

    public void accumulate(TestAggregateAcc acc, String param1, String param2, String param3) {
        acc.test = param1 + param2 + param3;
    }

    @Override
    public String getValue(TestAggregateAcc accumulator) {
        return accumulator.test;
    }

    @Override
    public TestAggregateAcc createAccumulator() {
        return new TestAggregateAcc();
    }

    public static class TestAggregateAcc {
        public String test;
    }
}

create function test_aggregate as 'com.hw.lineage.flink.aggregatefunction.TestAggregateFunction'

  String sql = "INSERT INTO dwd_hudi_users " +
                "SELECT " +
                "   id ," +
                "   name ," +
                "   test_aggregate(concat_ws('_', name, email), address, 'test')," +
                "   birthday ," +
                "   ts ," +
                "   DATE_FORMAT(birthday, 'yyyyMMdd') " +
                "FROM" +
                "   ods_mysql_user_detail group by id, name, birthday, ts ";
context.execute("CREATE TABLE IF NOT EXISTS ods_mysql_user_detail (" +
                "       id                  BIGINT PRIMARY KEY NOT ENFORCED ," +
                "       name                STRING                          ," +
                "       birthday            TIMESTAMP(3)                    ," +
                "       ts                  TIMESTAMP(3)                    ," +
                "       email               STRING                          ," +
                "       address             STRING                          ," +
                "       proc_time as proctime()                              " +
                ") WITH ( " +
                "       'connector' = 'mysql-cdc'            ," +
                "       'hostname'  = '127.0.0.1'       ," +
                "       'port'      = '3306'                 ," +
                "       'username'  = 'root'                 ," +
                "       'password'  = 'xxx'          ," +
                "       'server-time-zone' = 'Asia/Shanghai' ," +
                "       'database-name' = 'demo'             ," +
                "       'table-name'    = 'users' " +
                ")");
