
dbt-flink-adapter's People

Contributors

gliter, grzegorz-liter-getindata, kalondar, kosmag, maciejmaciejko-gid, zqwu

dbt-flink-adapter's Issues

dbt seed does not work as expected

New to dbt adapters, so please correct me if anything here is wrong.

Description:

If you run dbt seed twice, it simply duplicates your CSV data in the backend.

In some situations that is fine, e.g. seeding into Kafka topics, since Flink cannot clear data in Kafka.
In other situations, e.g. with a Hive catalog and a managed table, seed should (see the sketch below):

  • drop the table first
  • recreate the table
  • insert the rows
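
A minimal sketch of the statement sequence such a seed run could issue; the table name, columns, and values are made-up placeholders, and the CREATE would carry whatever catalog/connector configuration the target actually needs:

-- drop the previous seed relation, if any
DROP TABLE IF EXISTS my_seed;

-- recreate it with columns inferred from the CSV header
CREATE TABLE my_seed (
  `id` DECIMAL,
  `name` STRING
);

-- load the CSV rows
INSERT INTO my_seed VALUES
  (1, 'alice'),
  (2, 'bob');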

The current seeds.sql has some problems:

  • it is quite complex and even handles views, but seeds can't be materialized as views (correct me if I'm wrong)
  • in macros/seed/seeds.sql, adapter.get_relation always returns None

Error when following the tutorial

The tutorial at https://getindata.com/blog/dbt-run-real-time-analytics-on-apache-flink-announcing-the-dbt-flink-adapter/ may be out of date.

dbt seed
07:18:49  Flink adapter: Statement executed. Status ERROR, operation handle: dd120861-1790-43c6-a768-36be4fc59ea1
07:18:49  Flink adapter: Exception thrown during execution: Statement execution failed
07:18:49  1 of 4 ERROR creating sql table model high_loan ................................ [ERROR in 0.18s]
07:18:49  2 of 4 START sql view model joined_data ........................................ [RUN]
07:18:49  Flink adapter: Restored session from file. Session handle: 9d054d51-803c-455e-849b-8523d9e022f6
07:18:49  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x10a742ca0>
07:18:49  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.5", "profile_name": "chainbook", "target_name": "dev", "node_id": "model.chainbook.joined_data"} */

  create view if not exists /*TODO joined_data__dbt_tmp*/ joined_data /** mode('streaming')*/ as (
    select cs.event_timestamp, cs.user_id, cs.event, trx.source, trx.target, trx.amount, trx.deposit_balance_after_trx, trx.credit_balance_after_trx
from clickstream as cs
join trx as trx
on cs.user_id = trx.user_id
and cs.event_timestamp = trx.event_timestamp
  );"
07:18:49  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "fa5f6472-f4b7-45f9-90bd-44963987bb52"}
SQL gateway response: {"operationHandle": "8e463291-1c1e-443a-bdbf-7bd66df3e3aa"}
07:18:49  Flink adapter: Statement executed. Status ERROR, operation handle: 8e463291-1c1e-443a-bdbf-7bd66df3e3aa
07:18:49  Flink adapter: Exception thrown during execution: Statement execution failed
07:18:49  2 of 4 ERROR creating sql view model joined_data ............................... [ERROR in 0.15s]
07:18:49  3 of 4 SKIP relation _default_database.daily_spending .......................... [SKIP]
07:18:49  4 of 4 SKIP relation _default_database.joined_data_output ...................... [SKIP]
07:18:49  
07:18:49  Finished running 3 table models, 1 view model, 1 hook in 0 hours 0 minutes and 1.07 seconds (1.07s).
07:18:49  
07:18:49  Completed with 2 errors and 0 warnings:
07:18:49  
07:18:49  Runtime Error in model high_loan (models/high_loan.sql)
07:18:49    Statement execution failed
07:18:49  
07:18:49  Runtime Error in model joined_data (models/joined_data.sql)
07:18:49    Statement execution failed
07:18:49  
07:18:49  Done. PASS=0 WARN=0 ERROR=2 SKIP=2 TOTAL=4
dbt run
07:21:22  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "6bd0d202-6158-46fa-af1a-905339e9a35e"}
SQL gateway response: {"operationHandle": "51fdd553-25dc-40bd-b95e-66d80aedba0f"}
07:21:22  Flink adapter: Statement executed. Status ERROR, operation handle: 51fdd553-25dc-40bd-b95e-66d80aedba0f
07:21:22  Flink adapter: Exception thrown during execution: Statement execution failed
07:21:22  1 of 4 ERROR creating sql table model high_loan ................................ [ERROR in 0.18s]
07:21:22  2 of 4 START sql view model joined_data ........................................ [RUN]
07:21:22  Flink adapter: Restored session from file. Session handle: 9d054d51-803c-455e-849b-8523d9e022f6
07:21:22  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x1083918b0>
07:21:22  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.5", "profile_name": "chainbook", "target_name": "dev", "node_id": "model.chainbook.joined_data"} */

  create view if not exists /*TODO joined_data__dbt_tmp*/ joined_data /** mode('streaming')*/ as (
    select cs.event_timestamp, cs.user_id, cs.event, trx.source, trx.target, trx.amount, trx.deposit_balance_after_trx, trx.credit_balance_after_trx
from clickstream as cs
join trx as trx
on cs.user_id = trx.user_id
and cs.event_timestamp = trx.event_timestamp
  );"
07:21:22  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "5268d476-f7eb-47a1-8d43-5a450e80d0c6"}
SQL gateway response: {"operationHandle": "ffdda273-3e09-4269-ac85-f5b97a58711d"}
07:21:22  Flink adapter: Statement executed. Status ERROR, operation handle: ffdda273-3e09-4269-ac85-f5b97a58711d
07:21:22  Flink adapter: Exception thrown during execution: Statement execution failed
07:21:22  2 of 4 ERROR creating sql view model joined_data ............................... [ERROR in 0.17s]
07:21:22  3 of 4 SKIP relation _default_database.daily_spending .......................... [SKIP]
07:21:22  4 of 4 SKIP relation _default_database.joined_data_output ...................... [SKIP]
07:21:22  
07:21:22  Finished running 3 table models, 1 view model, 1 hook in 0 hours 0 minutes and 1.07 seconds (1.07s).
07:21:22  
07:21:22  Completed with 2 errors and 0 warnings:
07:21:22  
07:21:22  Runtime Error in model high_loan (models/high_loan.sql)
07:21:22    Statement execution failed
07:21:22  
07:21:22  Runtime Error in model joined_data (models/joined_data.sql)
07:21:22    Statement execution failed
07:21:22  
07:21:22  Done. PASS=0 WARN=0 ERROR=2 SKIP=2 TOTAL=4

Tutorial `dbt run` errors, but the job is running

Following the tutorial at https://getindata.com/blog/dbt-run-real-time-analytics-on-apache-flink-announcing-the-dbt-flink-adapter/, after downloading the Kafka Table Connector (curl -LO https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.0/flink-sql-connector-kafka-1.16.0.jar) into the right place, dbt run returns:

17:08:33  Running with dbt=1.3.3
17:08:33  Found 4 models, 0 tests, 0 snapshots, 0 analyses, 297 macros, 1 operation, 0 seed files, 2 sources, 0 exposures, 0 metrics
17:08:33
17:08:33
17:08:33  Running 1 on-run-start hook
17:08:33  Flink adapter: Session created: af8d9593-f26b-4aae-b43f-d41bbff8e65c
17:08:33  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x1064dee60>
17:08:33  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.3", "profile_name": "example1", "target_name": "dev", "connection_name": "master"} */







CREATE TABLE IF NOT EXISTS clickstream /** mode('streaming')*/ (
 `event_timestamp` TIMESTAMP(3),
 `user_id` DECIMAL,
 `event` STRING
, WATERMARK FOR event_timestamp AS event_timestamp
)
with (
 'connector' = 'kafka',
 'properties.bootstrap.servers' = 'kafka:29092',
 'scan.startup.mode' = 'earliest-offset',
 'value.format' = 'json',
 'value.json.encode.decimal-as-plain-number' = 'true',
 'value.json.timestamp-format.standard' = 'ISO-8601',
 'properties.group.id' = 'dbt',
 'topic' = 'clickstream'

);

  "
17:08:33  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "4fd944af-96be-4cd2-bcf2-1fab3586c9a3"}
SQL gateway response: {"operationHandle": "4994f297-501c-4ec1-a0c0-b3d4e1a2b30f"}
17:08:34  Flink adapter: Statement executed. Status FINISHED, operation handle: 4994f297-501c-4ec1-a0c0-b3d4e1a2b30f
17:08:34  Flink adapter: SQL rows returned: [{'kind': 'INSERT', 'fields': ['OK']}]
17:08:34  Flink adapter: Buffered: 1 rows
17:08:34  Flink adapter: SQL rows returned: []
17:08:34  Flink adapter: Buffered: 0 rows
17:08:34  Flink adapter: Fetched results from Flink: [('OK',)]
17:08:34  Flink adapter: Returned results from adapter: [('OK',)]
17:08:34  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x1064dee60>
17:08:34  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.3", "profile_name": "example1", "target_name": "dev", "connection_name": "master"} */







CREATE TABLE IF NOT EXISTS trx /** mode('streaming')*/ (
 `event_timestamp` TIMESTAMP(3),
 `user_id` DECIMAL,
 `source` STRING,
 `target` STRING,
 `amount` DECIMAL,
 `deposit_balance_after_trx` DECIMAL,
 `credit_balance_after_trx` DECIMAL
, WATERMARK FOR event_timestamp AS event_timestamp
)
with (
 'connector' = 'kafka',
 'properties.bootstrap.servers' = 'kafka:29092',
 'scan.startup.mode' = 'earliest-offset',
 'value.format' = 'json',
 'value.json.encode.decimal-as-plain-number' = 'true',
 'value.json.timestamp-format.standard' = 'ISO-8601',
 'properties.group.id' = 'dbt',
 'topic' = 'trx'

);

  "
17:08:34  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "14f2770a-833e-4b34-b7d8-4d4a25967e0a"}
SQL gateway response: {"operationHandle": "9dee1626-d008-4f86-80f6-3a169e0629eb"}
17:08:34  Flink adapter: Statement executed. Status FINISHED, operation handle: 9dee1626-d008-4f86-80f6-3a169e0629eb
17:08:34  Flink adapter: SQL rows returned: [{'kind': 'INSERT', 'fields': ['OK']}]
17:08:34  Flink adapter: Buffered: 1 rows
17:08:34  Flink adapter: SQL rows returned: []
17:08:34  Flink adapter: Buffered: 0 rows
17:08:34  Flink adapter: Fetched results from Flink: [('OK',)]
17:08:34  Flink adapter: Returned results from adapter: [('OK',)]
17:08:34  1 of 1 START hook: dbt_flink.on-run-start.0 .................................... [RUN]
17:08:34  1 of 1 OK hook: dbt_flink.on-run-start.0 ....................................... [OK in 0.00s]
17:08:34
17:08:34  Concurrency: 1 threads (target='dev')
17:08:34
17:08:34  1 of 4 START sql table model high_loan ......................................... [RUN]
17:08:34  Flink adapter: Restored session from file. Session handle: af8d9593-f26b-4aae-b43f-d41bbff8e65c
17:08:34  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x1065fbe50>
17:08:34  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.3", "profile_name": "example1", "target_name": "dev", "node_id": "model.example1.high_loan"} */







  create  table
    high_loan
    /** mode('streaming')*/
  with (
     'connector' = 'kafka',
     'properties.bootstrap.servers' = 'kafka:29092',
     'scan.startup.mode' = 'earliest-offset',
     'value.format' = 'json',
     'value.json.encode.decimal-as-plain-number' = 'true',
     'value.json.timestamp-format.standard' = 'ISO-8601',
     'properties.group.id' = 'dbt',
     'topic' = 'high-loan'

  )
  as (
    select *
from trx
where source = 'credit'
and target = 'deposit'
and amount > 5000
  );
  "
17:08:34  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "c1121476-0dbd-4114-9319-c4e05d7a0491"}
SQL gateway response: {"operationHandle": "5e766b11-a03f-4380-8ec0-dec9b97c8166"}
17:08:36  Flink adapter: Statement executed. Status FINISHED, operation handle: 5e766b11-a03f-4380-8ec0-dec9b97c8166
17:08:36  1 of 4 OK created sql table model high_loan .................................... [FINISHED in 1.74s]
17:08:36  2 of 4 START sql table model joined_data ....................................... [RUN]
17:08:36  Flink adapter: Restored session from file. Session handle: af8d9593-f26b-4aae-b43f-d41bbff8e65c
17:08:36  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x10661a260>
17:08:36  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.3", "profile_name": "example1", "target_name": "dev", "node_id": "model.example1.joined_data"} */







  create  table
    joined_data
    /** mode('streaming')*/
  with (
     'connector' = 'kafka',
     'properties.bootstrap.servers' = 'kafka:29092',
     'scan.startup.mode' = 'earliest-offset',
     'value.format' = 'json',
     'value.json.encode.decimal-as-plain-number' = 'true',
     'value.json.timestamp-format.standard' = 'ISO-8601',
     'properties.group.id' = 'dbt'

  )
  as (
    select cs.event_timestamp, cs.user_id, cs.event, trx.source, trx.target, trx.amount, trx.deposit_balance_after_trx, trx.credit_balance_after_trx
from clickstream as cs
join trx as trx
on cs.user_id = trx.user_id
and cs.event_timestamp = trx.event_timestamp
  );
  "
17:08:36  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "ffbb848f-a82a-46dc-87f3-aecd655afcbb"}
SQL gateway response: {"operationHandle": "b993870a-013f-4720-a1c6-336cd1689a13"}
17:08:36  Flink adapter: Statement executed. Status ERROR, operation handle: b993870a-013f-4720-a1c6-336cd1689a13
17:08:36  Flink adapter: Exception thrown during execution: Statement execution failed
17:08:36  2 of 4 ERROR creating sql table model joined_data .............................. [ERROR in 0.14s]
17:08:36  3 of 4 SKIP relation _default_database.daily_spending .......................... [SKIP]
17:08:36  4 of 4 SKIP relation _default_database.joined_data_output ...................... [SKIP]
17:08:36
17:08:36  Finished running 4 table models, 1 hook in 0 hours 0 minutes and 3.17 seconds (3.17s).
17:08:36
17:08:36  Completed with 1 error and 0 warnings:
17:08:36
17:08:36  Runtime Error in model joined_data (models/joined_data.sql)
17:08:36    Statement execution failed
17:08:36
17:08:36  Done. PASS=1 WARN=0 ERROR=1 SKIP=2 TOTAL=4

But the Flink UI looks good:

[screenshot of the Flink web UI]

SET 'execution.runtime-mode' runs on every statement

For example, this model:

{{ config( 
    materialized='table',
    pre_hook=[
        "SET 'execution.runtime-mode' = 'streaming';",
        "set 'execution.checkpointing.interval'='20s';",
        "set 'execution.checkpointing.mode'='AT_LEAST_ONCE';"
    ]
  ) 
}}

select * from {{ source('my_source', 'topic01') }}

This will be translated into:

SET 'execution.runtime-mode' = 'batch'
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.runtime-mode' = 'batch'
set 'execution.checkpointing.interval'='20s';
SET 'execution.runtime-mode' = 'batch'
set 'execution.checkpointing.mode'='AT_LEAST_ONCE';
SET 'execution.runtime-mode' = 'batch'
create  table xxxx_balabala

There are two ways to handle this:

  1. When the statement is already a "set xx=yy", do not automatically add another "SET 'execution.runtime-mode' = 'batch'".

    This will not help when there are many models, e.g. model-1 is batch and model-2 is streaming: running the create for model-1 sets the mode to batch, and running the create for model-2 afterwards gives no guarantee that the current mode is still streaming.

  2. Simply leave the mode under user control,
    e.g. in the model's config: pre_hook = "set xx=yy"
    I prefer this solution (see the sketch below); it keeps everything transparent, even if it is somewhat inconvenient.
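
For illustration, under option 2, and assuming the adapter stops injecting its own SET before each statement, the example above would be expected to translate into only the user-controlled statements:

SET 'execution.runtime-mode' = 'streaming';
set 'execution.checkpointing.interval'='20s';
set 'execution.checkpointing.mode'='AT_LEAST_ONCE';
create table xxxx_balabala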

Add support for detecting existing relations/updating model

Currently, when we run dbt run twice in the same session, we get exceptions that the table already exists:

 Caused by: org.apache.flink.table.catalog.exceptions.TableAlreadyExistException: Table (or view) default_database.joined_data already exists in Catalog default_catalog.

This task requires that we first investigate how it should be handled properly by DBT and then implement what is possible.
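
Until we know how dbt expects this to be handled, one possible interim behaviour would be to drop the existing relation before recreating it, e.g. (a sketch using the relation from the error above):

DROP TABLE IF EXISTS joined_data;
-- or, when the model is materialized as a view:
DROP VIEW IF EXISTS joined_data;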

upgrade to support dbt-core v1.7.0

Background

Minor version v1.7 is targeted for final release on October 26, 2023. As a maintainer of a dbt adapter, we strongly encourage you to release a corresponding minor version increment to ensure users of your adapter can make use of this new minor version.

How to upgrade

dbt-labs/dbt-core#8307 is an open discussion with more detailed information. If you have questions, please put them there!

Create Flink SQL gateway mock for component tests

We should have a mock for the SQL Gateway (either a mock client or a mock web server) with utility methods so we can create component tests.

It would be worth considering functionality that lets us simply switch from the mock to a real instance, so that with some simple tags we could mix component tests against the mock with e2e tests against a real instance.

Implement cleanup mechanism

When running dbt run we typically end up with some running pipelines. There is a dbt clean command; it is worth investigating what it is for and whether we should use it to stop deployed jobs.
If not, it would be nice to provide some scripts for cleanup.

Fix dbt test in streaming environment

dbt tests are written as negative scenarios, meaning the test SQL should return no failing rows; in a database environment this yields a row with count = 0, but in streaming the query returns no results at all.

Example query issued by dbt:

select
    count(*) as failures,
    count(*) <> 0 as should_warn,
    count(*) <> 0 as should_error
from (
    select /** fetch_timeout_ms(5000) */
  *
from balance_change
where
  event = 'income'
  and balance_change < 0
    
    ) dbt_internal_test

Create a way to extract common connector_properties to dbt_project.yml

dbt allows moving common keys from config into dbt_project.yml. The problem is that it only allows moving an entire key, and when overriding in models, seeds, or sources it overwrites the entire key. This is a problem for connector_properties, which is a dictionary.

My initial proposal is to add a separate key, e.g. "default_connector_properties", that could be used in dbt_project.yml and would get merged with connector_properties (see the sketch below).
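
For illustration, with the shared Kafka settings moved to default_connector_properties in dbt_project.yml and only the topic kept on the source, the rendered DDL would be expected to contain the union of both (a sketch reusing the clickstream source from the examples above):

CREATE TABLE IF NOT EXISTS clickstream (
  `event_timestamp` TIMESTAMP(3),
  `user_id` DECIMAL,
  `event` STRING
)
with (
  -- merged in from default_connector_properties in dbt_project.yml
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'kafka:29092',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json',
  'properties.group.id' = 'dbt',
  -- kept from the source's own connector_properties
  'topic' = 'clickstream'
);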

Add watermark support

Watermarks are defined in DDL as part of the column list:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

An initial idea could be to provide the watermark config in the source, like:

version: 2
sources:
  - name: kafka
    tables:
      - name: clickstream
        config:
          connector_properties:
            connector: 'kafka'
            'properties.bootstrap.servers': 'kafka:29092'
            'topic': 'clickstream'
            'scan.startup.mode': 'earliest-offset'
            'value.format': 'json'
            'properties.group.id': 'dbt'
            'value.json.encode.decimal-as-plain-number': 'true'
        columns:
          - name: event_timestamp
            data_type: TIMESTAMP(3)
          - name: user_id
            data_type: DECIMAL
          - name: balance
            data_type: DECIMAL
          - name: loan_balance
            data_type: DECIMAL
          - name: event
            data_type: STRING
        watermark:
          column: event_timestamp
          # strategies are defined at https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark
          strategy: event_timestamp - INTERVAL '5' SECOND

Currently we have no mechanism to provide something similar.
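
For illustration, the config above would be expected to resolve to DDL roughly like:

CREATE TABLE IF NOT EXISTS clickstream (
  `event_timestamp` TIMESTAMP(3),
  `user_id` DECIMAL,
  `balance` DECIMAL,
  `loan_balance` DECIMAL,
  `event` STRING,
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
)
with (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'kafka:29092',
  'topic' = 'clickstream',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json',
  'properties.group.id' = 'dbt',
  'value.json.encode.decimal-as-plain-number' = 'true'
);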

ref works incorrectly with views

Views are created with a __dbt_tmp suffix:

create view joined_data__dbt_tmp /** mode('streaming')*/ as (
    select cs.event_timestamp, cs.user_id, cs.event, trx.source, trx.target, trx.amount, trx.deposit_balance_after_trx, trx.credit_balance_after_trx
from clickstream as cs
join trx as trx
on cs.user_id = trx.user_id
and cs.event_timestamp = trx.event_timestamp
  );"

But {{ ref('joined_data') }} returns just joined_data.
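
So any downstream model selecting from the ref compiles to something like the following and fails, since only the suffixed view exists in the catalog (a sketch):

select *
from joined_data;  -- fails: only joined_data__dbt_tmp was created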

As a temporary fix I will make views not use the __dbt_tmp suffix.

DBT Opens separate sessions for `source` hook and `test/run` step

$ dbt test
...
13:58:02  Flink adapter: Session created: 3fff658f-6ee5-4067-9346-404d62282a6f
13:58:02  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x7f92822f88b0>
13:58:02  Flink adapter: Preparing statement "/* {"app": "dbt", "dbt_version": "1.3.1", "profile_name": "example1", "target_name": "dev", "connection_name": "master"} */

    


CREATE TABLE IF NOT EXISTS clickstream (
 `event_timestamp` DECIMAL,
 `user_id` DECIMAL,
 `balance` DECIMAL,
 `loan_balance` DECIMAL,
 `event` STRING

)
with (
 'connector' = 'kafka',
 'properties.bootstrap.servers' = 'kafka:29092',
 'topic' = 'clickstream',
 'scan.startup.mode' = 'earliest-offset',
 'value.format' = 'json',
 'properties.group.id' = 'dbt',
 'value.json.encode.decimal-as-plain-number' = 'true'

);

...

13:58:03  Flink adapter: Fetched results: [('OK',)]
13:58:03  1 of 1 START hook: dbt_flink.on-run-start.0 .................................... [RUN]
13:58:03  1 of 1 OK hook: dbt_flink.on-run-start.0 ....................................... [OK in 0.00s]
13:58:03  
13:58:03  Concurrency: 1 threads (target='dev')
13:58:03  
13:58:03  1 of 1 START test no_negative_income ........................................... [RUN]
13:58:03  Flink adapter: Session created: 18e484c3-ddfc-4043-8125-cc3bce323a3a

upgrade to support dbt-core v1.5.0

Background

The latest version of dbt Core, dbt-core==1.5.0rc1, was published on April 13, 2023 (PyPI | GitHub).

How to upgrade

dbt-labs/dbt-core#7213 is an open discussion with more detailed information. If you have questions, please put them there!

The above linked guide has more information, but below is a high-level checklist of work that would enable a successful 1.5.0 release of your adapter.

  • Add support for Python 3.11 (if you haven't already)
  • Add support for relevant tests (there are a lot of new ones!)
  • Add support for model contracts
  • Add support for materialized views (this will likely be bumped to 1.6.0)

the next minor release: 1.6.0

FYI, dbt-core==1.6.0 is expected to be released at the end of July, with a release cut at least two weeks prior.

KeyError: 'nextResultUri'

When trying to run, I get KeyError: 'nextResultUri'.

The response data is:

{'resultType': 'EOS', 'isQueryResult': False, 'resultKind': 'SUCCESS', 'results': {'columns': [{'name': 'result', 'logicalType': {'type': 'VARCHAR', 'nullable': True, 'length': 2147483647}, 'comment': None}], 'rowFormat': 'JSON', 'data': []}}

traceback:

13:23:19  Traceback (most recent call last):
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/main.py", line 135, in main
    results, succeeded = handle_and_check(args)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/main.py", line 198, in handle_and_check
    task, res = run_from_args(parsed)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/main.py", line 245, in run_from_args
    results = task.run()
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/task/runnable.py", line 472, in run
    result = self.execute_with_hooks(selected_uids)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/task/runnable.py", line 434, in execute_with_hooks
    self.before_run(adapter, selected_uids)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/task/run.py", line 428, in before_run
    self.safe_run_hooks(adapter, RunHookType.Start, {})
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/task/run.py", line 402, in safe_run_hooks
    self.run_hooks(adapter, hook_type, extra_context)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/task/run.py", line 351, in run_hooks
    sql = self.get_hook_sql(adapter, hook, idx, num_hooks, extra_context)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/task/run.py", line 307, in get_hook_sql
    compiled = compiler.compile_node(hook, self.manifest, extra_context)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/compilation.py", line 535, in compile_node
    node = self._compile_node(node, manifest, extra_context)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/compilation.py", line 394, in _compile_node
    compiled_node.compiled_code = jinja.get_rendered(
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/clients/jinja.py", line 587, in get_rendered
    return render_template(template, ctx, node)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/clients/jinja.py", line 542, in render_template
    return template.render(ctx)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/jinja2/environment.py", line 1301, in render
    self.environment.handle_exception()
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/jinja2/environment.py", line 936, in handle_exception
    raise rewrite_traceback_stack(source=source)
  File "<template>", line 1, in top-level template code
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/jinja2/sandbox.py", line 393, in call
    return __context.call(__obj, *args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/clients/jinja.py", line 326, in __call__
    return self.call_macro(*args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/clients/jinja.py", line 253, in call_macro
    return macro(*args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/jinja2/runtime.py", line 777, in _invoke
    rv = self._func(*arguments)
  File "<template>", line 26, in template
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/jinja2/sandbox.py", line 393, in call
    return __context.call(__obj, *args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/clients/jinja.py", line 326, in __call__
    return self.call_macro(*args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/clients/jinja.py", line 253, in call_macro
    return macro(*args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/jinja2/runtime.py", line 777, in _invoke
    rv = self._func(*arguments)
  File "<template>", line 2, in template
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/jinja2/sandbox.py", line 393, in call
    return __context.call(__obj, *args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/clients/jinja.py", line 326, in __call__
    return self.call_macro(*args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/clients/jinja.py", line 253, in call_macro
    return macro(*args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/jinja2/runtime.py", line 777, in _invoke
    rv = self._func(*arguments)
  File "<template>", line 11, in template
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/jinja2/sandbox.py", line 393, in call
    return __context.call(__obj, *args, **kwargs)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/adapters/base/impl.py", line 278, in execute
    return self.connections.execute(sql=sql, auto_begin=auto_begin, fetch=fetch)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/adapters/sql/connections.py", line 126, in execute
    table = self.get_result_from_cursor(cursor)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/adapters/sql/connections.py", line 114, in get_result_from_cursor
    rows = cursor.fetchall()
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/adapters/flink/handler.py", line 49, in fetchall
    self._buffer_results()
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/dbt/adapters/flink/handler.py", line 137, in _buffer_results
    result = self.last_operation.get_result(next_page=next_page)
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/flink/sqlgateway/operation.py", line 94, in get_result
    return SqlGatewayResultParser.parse_result(response.json())
  File "/Users/cheskeltwersky/Documents/dbt-place/pipelines/test/venv/lib/python3.10/site-packages/flink/sqlgateway/result_parser.py", line 35, in parse_result
    next_result_url = data["nextResultUri"]
KeyError: 'nextResultUri'

NotImplementedError when following tutorial

ENV:

python --version
3.11.5

NotImplementedError when following the tutorial at https://getindata.com/blog/dbt-run-real-time-analytics-on-apache-flink-announcing-the-dbt-flink-adapter/:

File "/Users/haojiangfeng/dbt/dbt-env/bin/dbt", line 5, in <module>
    from dbt.main import main
  File "/Users/haojiangfeng/dbt/dbt-env/lib/python3.11/site-packages/dbt/main.py", line 3, in <module>
    from dbt.logger import log_cache_events, log_manager
  File "/Users/haojiangfeng/dbt/dbt-env/lib/python3.11/site-packages/dbt/logger.py", line 17, in <module>
    from dbt.dataclass_schema import dbtClassMixin
  File "/Users/haojiangfeng/dbt/dbt-env/lib/python3.11/site-packages/dbt/dataclass_schema.py", line 37, in <module>
    class dbtClassMixin(DataClassDictMixin, JsonSchemaMixin):
  File "/Users/haojiangfeng/dbt/dbt-env/lib/python3.11/site-packages/mashumaro/mixins/dict.py", line 16, in __init_subclass__
    builder.add_from_dict()
  File "/Users/haojiangfeng/dbt/dbt-env/lib/python3.11/site-packages/mashumaro/core/meta/builder.py", line 419, in add_from_dict
    dialects_feature = self.is_code_generation_option_enabled(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/haojiangfeng/dbt/dbt-env/lib/python3.11/site-packages/mashumaro/core/meta/builder.py", line 594, in is_code_generation_option_enabled
    type_name(ancestor)
  File "/Users/haojiangfeng/dbt/dbt-env/lib/python3.11/site-packages/mashumaro/core/meta/helpers.py", line 150, in type_name
    elif is_generic(t) and not is_type_origin:
         ^^^^^^^^^^^^^
  File "/Users/haojiangfeng/dbt/dbt-env/lib/python3.11/site-packages/mashumaro/core/meta/helpers.py", line 217, in is_generic
    raise NotImplementedError
NotImplementedError

Minimal steps to reproduce:

python3 -m venv ~/.virtualenvs/xxx
dbt init
source ~/.virtualenvs/xxx/bin/activate
pip3 install dbt-flink-adapter

upgrade to support dbt-core v1.6.0

Background

Minor version v1.6 is targeted for final release on July 27, 2023. As a maintainer of a dbt adapter, we strongly encourage you to release a corresponding minor version increment to ensure users of your adapter can make use of this new minor version.

How to upgrade

dbt-labs/dbt-core#7958 is an open discussion with more detailed information. If you have questions, please put them there!

The above linked guide has more information, but below is a high-level checklist of work that would enable a successful 1.6.0 release of your adapter:

### Tasks
- [ ] SUPPORT: materialized views
- [ ] SUPPORT: new `clone` command
- [ ] BEHIND THE SCENES: Drop support for Python 3.7 (if you haven't already)
- [ ] BEHIND THE SCENES: new arg for `adapter.execute()`
- [ ] BEHIND THE SCENES: ensure support for revamped `dbt debug`
- [ ] BEHIND THE SCENES: Add support for new/modified relevant tests

the next minor release: 1.7.0

FYI, dbt-core==1.7.0 is expected to be released on October 12, 2023 in time for Coalesce, the annual analytics engineering conference!

Add column name quotes using back tick (`)

Without backtick quoting, Flink will try to interpret column names as keywords; for example, when a column is named "timestamp" the statement fails.

Flink SQL> create table clickstream (timestamp DECIMAL,user_id DECIMAL,balance DECIMAL,load_balance DECIMAL,event STRING) with (
>        'connector' = 'kafka',
>        'properties.bootstrap.servers' = 'kafka:29092',
>        'topic' = 'clickstream',
>        'value.format' = 'json',
>        'properties.group.id' = 'dbt-seed',
>        'value.json.encode.decimal-as-plain-number' = 'true'
>       
>     )
> 
> ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "timestamp" at line 1, column 27.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    

Flink SQL> create table clickstream (`timestamp` DECIMAL,user_id DECIMAL,balance DECIMAL,load_balance DECIMAL,event STRING) with (
>        'connector' = 'kafka',
>        'properties.bootstrap.servers' = 'kafka:29092',
>        'topic' = 'clickstream',
>        'value.format' = 'json',
>        'properties.group.id' = 'dbt-seed',
>        'value.json.encode.decimal-as-plain-number' = 'true'
>       
>     )
> ;
[INFO] Execute statement succeed.

Add support for primary key definitions in source

We should be able to define a primary key for a source table: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#primary-key
It is needed, for example, for event-time temporal joins: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#event-time-temporal-join

Example:

select *
> from clickstream
> left join trx for SYSTEM_TIME as of clickstream.event_timestamp
> on clickstream.user_id = trx.user_id
> ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
FlinkLogicalJoin(condition=[AND(=($1, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($0, $3, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[left])
  FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, clickstream, watermark=[event_timestamp]]], fields=[event_timestamp, user_id, event])
  FlinkLogicalSnapshot(period=[$cor1.event_timestamp])
    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, trx, watermark=[event_timestamp]]], fields=[event_timestamp, user_id, source, target, amount, deposit_balance_after_trx, credit_balance_after_trx])
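
For reference, the generated source DDL would need to declare the key in the column list, roughly like the sketch below; how to expose this in the source YAML is the open question, and note that the connector itself must support primary keys (e.g. upsert-kafka rather than plain kafka with json):

CREATE TABLE IF NOT EXISTS trx (
  `event_timestamp` TIMESTAMP(3),
  `user_id` DECIMAL,
  `amount` DECIMAL,
  -- Flink requires primary keys to be declared NOT ENFORCED
  PRIMARY KEY (`user_id`) NOT ENFORCED,
  WATERMARK FOR event_timestamp AS event_timestamp
)
with (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'kafka:29092',
  'topic' = 'trx',
  'key.format' = 'json',
  'value.format' = 'json'
);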

Add support for computed and metadata columns

Flink allows providing computed and metadata columns during source table creation:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#columns

The ideal solution would be to provide them under the columns config, like:

- name: clickstream
        config:
          type: streaming
          connector_properties:
            ...
          watermark:
            column: event_timestamp
            strategy: event_timestamp
        columns:
          - name: event_timestamp
            data_type: TIMESTAMP(3)
          - name: cost
            computed_as: price * quantity
          - name: record_time
            data_type: TIMESTAMP_LTZ(3)
            metadata_from: timestamp
            ...

Which would resolve to:

create table clickstream (
  event_timestamp TIMESTAMP(3),
  cost AS price * quantity,
  record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
)

Alternatively, let's put it under config; my initial idea is:

- name: clickstream
        config:
          type: streaming
          connector_properties:
            ...
          watermark:
            column: event_timestamp
            strategy: event_timestamp
          computed_columns: # I am happy to discuss this name
            - name: cost
              computed_as: price * quantity
            - name: record_time
              data_type: TIMESTAMP_LTZ(3)
              metadata_from: timestamp
        columns:
          - name: event_timestamp
            data_type: TIMESTAMP(3)
            ...

Fix versioning

dbt adapter plugins should have a version that is in line with the dbt-core version, or should declare the dbt-core version dependency in another way than deriving it from their own version.

Current setup.py

# require a compatible minor version (~=), prerelease if this is a prerelease
def _get_dbt_core_version():
    parts = _get_plugin_version_dict()
    minor = "{major}.{minor}.0".format(**parts)
    pre = parts["prekind"] + "1" if parts["prekind"] else ""
    return f"{minor}{pre}"
