Airflow + DBT + Snowflake: a fact table and dimension tables, including SCD2 with a historical data load and incremental updates of out-of-order records via a custom dbt macro, because the out-of-the-box dbt snapshots approach cannot load out-of-order data properly. Several approaches to generating the primary key of a dimension table are shown: a hash of the natural unique key, a sequence, and the next max PK value in the table (for training and dbt testing).
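Of the primary-key strategies above, the hash of the natural key is the easiest to sketch outside the warehouse. In a dbt project this is typically done with a SQL macro (e.g. dbt_utils' `generate_surrogate_key`); the Python function below is a hypothetical stand-in that shows the same idea:

```python
import hashlib

def surrogate_key(*natural_key_parts):
    """Derive a deterministic surrogate key by hashing the natural key.

    Mirrors the common warehouse pattern MD5(col1 || '-' || col2 || ...):
    the same natural key always yields the same surrogate key, so the key
    is stable across reloads (unlike a sequence or a max+1 value, which
    depend on load order and existing table contents).
    """
    joined = "-".join("" if p is None else str(p) for p in natural_key_parts)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()
```

Stability across reloads is the main reason to prefer the hash approach when the same dimension rows may be re-processed, while a sequence or max+1 PK is simpler but order-dependent.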
DBT tests, orchestration log tables (dbt model), a table-load log (dbt pre-hook), and load-completion emails with the status of the DW tables. The Airflow DAG is built dynamically from the dbt graph.
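Building the DAG dynamically from the dbt graph usually means reading dbt's `target/manifest.json`, where every node lists its parents under `depends_on.nodes`. A minimal sketch of extracting model-to-model dependencies (the exact DAG-building code in this project may differ):

```python
def dbt_model_edges(manifest: dict) -> dict:
    """Map each dbt model to the models it depends on.

    Walks the structure of dbt's target/manifest.json: every entry under
    "nodes" lists its parents in depends_on.nodes. Non-model resources
    (tests, seeds, snapshots) and non-model parents (sources) are skipped.
    """
    edges = {}
    for node_id, node in manifest.get("nodes", {}).items():
        if node.get("resource_type") != "model":
            continue
        parents = [
            p for p in node.get("depends_on", {}).get("nodes", [])
            if p.startswith("model.")
        ]
        edges[node_id] = parents
    return edges

# In the DAG file each model then becomes one task, e.g.
#   BashOperator(task_id=model, bash_command=f"dbt run --select {model.split('.')[-1]}")
# and edges[model] names the upstream tasks to wire with >>.
```

This keeps the Airflow DAG in sync with the dbt project automatically: adding a model or a ref() changes the manifest and therefore the generated task graph.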
Not included: moving data from a source system to staging tables.
Orchestration is done using Airflow branch Python operators and Airflow variables.
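The conditional steps below are driven by `BranchPythonOperator` callables that inspect Airflow variables. A minimal sketch of one such decision, with hypothetical variable and task names (the project's actual flags may differ):

```python
def choose_branch(full_refresh_flag: str) -> str:
    """Return the task_id a BranchPythonOperator should follow.

    The flag value would come from an Airflow Variable, e.g.
    Variable.get("dw_full_refresh", default_var="false"); the task ids
    here are hypothetical placeholders.
    """
    if full_refresh_flag.strip().lower() in ("true", "1", "yes"):
        return "recreate_dw_tables"
    return "skip_recreate"

# Wiring inside the DAG definition (sketch):
#   BranchPythonOperator(
#       task_id="check_full_refresh",
#       python_callable=lambda: choose_branch(Variable.get("dw_full_refresh", "false")),
#   )
```

Keeping the decision in a plain function makes each branch unit-testable without spinning up Airflow.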
- Load Start: the load date is set, orchestration logging starts, and the DB connection is synced between dbt and Airflow.
- Conditional step: Re-creating the DW tables (dbt full-refresh mode, seed loads). Default values can be loaded into the tables at this step.
- Defining the incremental load range based on the previous load (Airflow hook + dbt analytic query)
- Conditional step: Validating that the staging data are ready/present to be loaded into the DW (Airflow sensor + dbt analytic query)
- Conditional step: Transformations in the staging area (run dbt models)
- Conditional step: Loading dimensions (run dbt models and the SCD2 macro)
- Conditional step: Loading transactional fact tables (run dbt models)
- Conditional step: Loading summary fact tables built on top of the transactional fact tables (run dbt models)
- Finalizing the load and closing the orchestration log (run dbt models), email notification (Airflow hook + dbt analytic query)
- Conditional step: Testing loaded data (dbt test)
- Conditional step: Refresh dashboards
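The dimension-load step relies on the custom SCD2 macro because dbt snapshots stamp validity from processing time, so records that arrive out of order land in the wrong position in the history. One way to make the load order-independent, which the macro implements in SQL, is to rebuild validity intervals from the full sorted history; the Python sketch below illustrates the idea (names and shapes are hypothetical):

```python
from datetime import date

def rebuild_scd2(records):
    """Rebuild SCD2 validity intervals from possibly out-of-order records.

    records: iterable of (natural_key, effective_date, payload) tuples in
    any arrival order. Sorting by key and effective date before deriving
    valid_from/valid_to makes the result independent of load order, so a
    late-arriving record simply slots into the middle of the history.
    """
    by_key = {}
    for key, eff, payload in sorted(records, key=lambda r: (r[0], r[1])):
        by_key.setdefault(key, []).append((eff, payload))
    out = []
    for key, versions in by_key.items():
        for i, (eff, payload) in enumerate(versions):
            # Each version is valid until the next version's effective date;
            # the latest version stays open-ended and is the current row.
            valid_to = versions[i + 1][0] if i + 1 < len(versions) else None
            out.append({
                "key": key, "payload": payload,
                "valid_from": eff, "valid_to": valid_to,
                "is_current": valid_to is None,
            })
    return out
```

In the warehouse the same effect is achieved with window functions (e.g. `LEAD(effective_date) OVER (PARTITION BY key ORDER BY effective_date)`), which is what a macro of this kind would typically generate.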