astronomer / astronomer-cosmos

Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code

Home Page: https://astronomer.github.io/astronomer-cosmos/

License: Apache License 2.0

Python 99.18% Starlark 0.12% Dockerfile 0.11% Shell 0.45% HTML 0.14%
airflow airflow-operators apache-airflow dbt python workflow

astronomer-cosmos's Introduction


Run your dbt Core projects as Apache Airflow DAGs and Task Groups with a few lines of code. Benefits include:

  • Run dbt projects against Airflow connections instead of dbt profiles
  • Native support for installing and running dbt in a virtual environment to avoid dependency conflicts with Airflow
  • Run tests immediately after a model is done to catch issues early
  • Utilize Airflow's data-aware scheduling to run models immediately after upstream ingestion
  • Turn each dbt model into a task/task group complete with retries, alerting, etc.

Quickstart

Check out the Getting Started guide on our docs. See more examples at /dev/dags and at the cosmos-demo repo.

Example Usage

You can render a Cosmos Airflow DAG using the DbtDag class. Here's an example with the jaffle_shop project:

"""
An example DAG that uses Cosmos to render a dbt project.
"""
import os
from datetime import datetime
from pathlib import Path
from cosmos import DbtDag, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="airflow_db",
profile_args={"schema": "public"},
),
)
# [START local_example]
basic_cosmos_dag = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
operator_args={
"install_deps": True, # install any necessary dependencies before running any dbt command
"full_refresh": True, # used only in dbt commands that support this flag
},
# normal dag parameters
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="basic_cosmos_dag",
default_args={"retries": 2},
)
# [END local_example]

This will generate an Airflow DAG in which each dbt model becomes a task group containing a run task followed by a test task.

Community

  • Join us on the Airflow Slack at #airflow-dbt

Changelog

We follow Semantic Versioning for releases. Check CHANGELOG.rst for the latest changes.

Contributing Guide

All contributions, bug reports, bug fixes, documentation improvements, and enhancements are welcome.

A detailed overview on how to contribute can be found in the Contributing Guide.

As contributors and maintainers to this project, you are expected to abide by the Contributor Code of Conduct.

License

Apache License 2.0

astronomer-cosmos's People

Contributors

agreenburg, binhnq94, chrishronek, corsettis, danmawdsleyba, david-mag, dimberman, dwreeves, fritz-astronomer, iancmoritz, jbandoro, jensenity, jlaneve, joppevos, mikewallis42, mrbones757, octiva, pankajkoti, patawan, perttus, petedejoy, pre-commit-ci[bot], raffifu, raphaelauv, rnhttr, siddiqueahmad, szecsip, tatiana, tboutaour, tjanif


astronomer-cosmos's Issues

Add support for dbt-dremio adapter

We should add all of the dbt supported platforms (and then move to the community ones). This will require two things:

  • default profile that maps to an Airflow connection
  • new extra in setup.cfg

See the Dremio setup in the dbt docs. This one may be a little more challenging because there isn't an Airflow connection type for Dremio.

Create e2e testing in astronomer-cosmos CI/CD pipeline

Ideally, we would want to enforce the following flow:

e2e testing in dev environment:

  1. a contributor opens a feature branch and merges it into the main branch (upon approval)
  2. a Cosmos Admin opens a PR into a release- branch
  3. the PR is deployed to an Astronomer sandbox with the /astro_tests DAGs so that we can run e2e tests and ensure everything still works
  4. the deploy in step 3 should use the PyPI package built from main (i.e. the version being released)

additional checks

  • before the PR from main is merged into the release- branch, have a check that ensures the version in the __init__.py file doesn't already exist on PyPI (a sketch of such a check follows this list)
  • Add pre-commit to our repo and ensure all of the code passes those checks.
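
A minimal sketch of the "version not yet on PyPI" check mentioned above. The PyPI JSON API endpoint is real; reading the local version from cosmos/__init__.py is left out here, so the hard-coded version string is purely illustrative.

import json
import sys
import urllib.request

PACKAGE = "astronomer-cosmos"
local_version = "1.0.0"  # in CI this would be parsed out of cosmos/__init__.py

with urllib.request.urlopen(f"https://pypi.org/pypi/{PACKAGE}/json") as resp:
    released_versions = set(json.load(resp)["releases"])

if local_version in released_versions:
    sys.exit(f"{PACKAGE}=={local_version} already exists on PyPI; bump the version first.")
print(f"{PACKAGE}=={local_version} is not on PyPI yet; safe to release.")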

How to use it with multi python projects on Airflow

Hello,
This project is excellent and gives a lot of usefulness and verbosity for airflow pipelines.

My question is probably more airflow related rather than cosmos.
I would like to know how to use Cosmos when all the project dependencies are self-contained in a Docker image.

The architecture where the DAGs and the tasks share the same Python runtime environment isn't scalable. I have a case where I would like another version of Python; another use case is an old/new dbt version that is not compatible with the Airflow requirements; yet another is when Airflow runs DAGs/tasks from different teams and technical stacks.

For those reasons, I choose to use airflow as a scheduler and pack all the task codes in docker images.

Could you please share your experience on how you use this project in an Airflow installation where many teams run many pipelines with different stacks?

The dbt provider doesn't support GCP connections defined with a service account key path

AIRFLOW_CTX_DAG_RUN_ID=manual__2023-01-01T14:55:15+00:00
[2023-01-31, 14:57:28 UTC] {base.py:73} INFO - Using connection ID 'jaffle_shop_dbt2' for task execution.
[2023-01-31, 14:57:28 UTC] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/cosmos/providers/dbt/core/operators.py", line 336, in execute
    result = self.build_and_run_cmd(env=self.get_env(context))
  File "/home/airflow/.local/lib/python3.7/site-packages/cosmos/providers/dbt/core/operators.py", line 231, in build_and_run_cmd
    conn_id=self.conn_id, db_override=self.db_name, schema_override=self.schema
  File "/home/airflow/.local/lib/python3.7/site-packages/cosmos/providers/dbt/core/utils/profiles_generator.py", line 193, in map_profile
    db = json.loads(conn.extra_dejson.get("keyfile_dict"))["project_id"]
  File "/usr/local/lib/python3.7/json/__init__.py", line 341, in loads
    raise TypeError(f'the JSON object must be str, bytes or bytearray, '
TypeError: the JSON object must be str, bytes or bytearray, not NoneType

In this use case, my Google Cloud connection jaffle_shop_dbt2 uses a service account key file and is set up with a keyfile path.

I believe the issue is that the dbt provider only looks at the keyfile_dict field and does not try to load the JSON payload using the keyfile_path.

"BIGQUERY_TYPE": json.loads(conn.extra_dejson.get("keyfile_dict"))["type"],
"BIGQUERY_PROJECT_ID": json.loads(conn.extra_dejson.get("keyfile_dict"))[
"project_id"
],
"BIGQUERY_PRIVATE_KEY_ID": json.loads(
conn.extra_dejson.get("keyfile_dict")
)["private_key_id"],
"BIGQUERY_PRIVATE_KEY": json.loads(conn.extra_dejson.get("keyfile_dict"))[
"private_key"
],
"BIGQUERY_CLIENT_EMAIL": json.loads(conn.extra_dejson.get("keyfile_dict"))[
"client_email"
],
"BIGQUERY_CLIENT_ID": json.loads(conn.extra_dejson.get("keyfile_dict"))[
"client_id"
],
"BIGQUERY_AUTH_URI": json.loads(conn.extra_dejson.get("keyfile_dict"))[
"auth_uri"
],
"BIGQUERY_TOKEN_URI": json.loads(conn.extra_dejson.get("keyfile_dict"))[
"token_uri"
],
"BIGQUERY_AUTH_PROVIDER_X509_CERT_URL": json.loads(
conn.extra_dejson.get("keyfile_dict")
)["auth_provider_x509_cert_url"],
"BIGQUERY_CLIENT_X509_CERT_URL": json.loads(
conn.extra_dejson.get("keyfile_dict")
)["client_x509_cert_url"],
}
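
A hedged sketch of the fallback this report suggests: if keyfile_dict is not set on the connection, load the JSON payload from the keyfile path instead. The extras field names mirror the Google provider's connection settings, but the helper itself is hypothetical.

import json
from pathlib import Path

def load_gcp_keyfile(conn) -> dict:
    extras = conn.extra_dejson
    keyfile_dict = extras.get("keyfile_dict")
    if keyfile_dict:
        # keyfile_dict may be stored as a JSON string or already as a dict
        return json.loads(keyfile_dict) if isinstance(keyfile_dict, str) else keyfile_dict
    keyfile_path = extras.get("key_path") or extras.get("keyfile_path")
    if keyfile_path:
        return json.loads(Path(keyfile_path).read_text())
    raise ValueError("Connection defines neither keyfile_dict nor a keyfile path")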

Vars parameter on dbt operators is getting parsed as a string

# current
test_operator = DbtRunOperator(
    task_id="test_task",
    conn_id="airflow_db",
    schema="public",
    vars="'{'var_1': 'val_1'}'",
    project_dir="/usr/local/airflow/dbt/jaffle_shop",
    python_venv="/usr/local/airflow/dbt_venv/bin/activate"
)

# desired
test_operator = DbtRunOperator(
    task_id="test_task",
    conn_id="airflow_db",
    schema="public",
    vars={'var_1': 'val_1'},
    project_dir="/usr/local/airflow/dbt/jaffle_shop",
    python_venv="/usr/local/airflow/dbt_venv/bin/activate",
)
Databricks connection, can't use schema

The Databricks connection's schema gets set in the db variable here
https://github.com/astronomer/astronomer-cosmos/blob/main/cosmos/providers/dbt/core/utils/profiles_generator.py#L195
and then is ultimately missed here, because the code expects it in the schema variable:
https://github.com/astronomer/astronomer-cosmos/blob/main/cosmos/providers/dbt/core/utils/profiles_generator.py#L164

The end result is
[2023-01-30, 14:12:14 GMT] {profiles_generator.py:223} ERROR - Please specify a target schema using the schema parameter.

For the moment I've worked around it by using the schema override option.

Add support for "Config" select/exclude to DbtDag & DbtTaskGroup parsers

See dbt docs on cli usage examples here

$ dbt run --select config.materialized:incremental    # run all models that are materialized incrementally
$ dbt run --select config.schema:audit                # run all models that are created in the `audit` schema
$ dbt run --select config.cluster_by:geo_country      # run all models clustered by `geo_country`

Ultimately, in our parsers, we should be able to have a new parameter that looks something like this:

# (Either the select or exclude parameter would be specified with the materialized:table config - not both)
jaffle_shop = DbtTaskGroup(
    ...
    select={'configs': ['materialized:table']},  # runs all models that are materialized as a table
    exclude={'configs': ['materialized:table']},  # runs all models that aren't materialized as a table
)
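
A hypothetical sketch of how the parser could translate such a dict into dbt CLI arguments; the config. prefix matches the dbt syntax quoted above, while the function name and shape are illustrative only.

def build_config_selector(select: dict) -> list:
    selectors = [f"config.{entry}" for entry in select.get("configs", [])]
    return ["--select", " ".join(selectors)] if selectors else []

# build_config_selector({"configs": ["materialized:table"]})
# -> ["--select", "config.materialized:table"]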

Add support for dbt-databricks adapter

We should add all of the dbt supported platforms (and then move to the community ones). This will require two things:

  • default profile that maps to an Airflow connection
  • new extra in setup.cfg

See Databricks setup in dbt docs. It looks like it's just using a Token (the same way Airflow connects to Databricks), so it should be pretty straightforward.
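
As a rough sketch (not an existing Cosmos API), a token-based mapping could follow the profile_vars pattern used for other adapters, pulling fields from an Airflow Databricks connection; the extras field names below are assumptions.

from airflow.hooks.base import BaseHook

def databricks_profile_vars(conn_id: str, schema: str) -> dict:
    """Map an Airflow Databricks connection onto dbt profile env vars (sketch)."""
    conn = BaseHook.get_connection(conn_id)
    return {
        "DATABRICKS_HOST": conn.host,
        "DATABRICKS_SCHEMA": schema,
        "DATABRICKS_HTTP_PATH": conn.extra_dejson.get("http_path"),
        "DATABRICKS_TOKEN": conn.password or conn.extra_dejson.get("token"),
    }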

Remove pydantic dependency

Right now, we require pydantic because I used it for validation. However, it's one of only 3 dependencies we have, and we should try to remove it. Options seem to be:

  • use dataclasses (a minimal sketch follows this list)
  • write the validation/logic ourselves
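
A minimal sketch of the dataclass option, assuming validation moves into __post_init__; the field names are illustrative.

from dataclasses import dataclass

@dataclass
class DbtProjectConfig:
    project_name: str
    conn_id: str

    def __post_init__(self) -> None:
        if not self.project_name:
            raise ValueError("project_name must be a non-empty string")
        if not self.conn_id:
            raise ValueError("conn_id must be a non-empty string")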

Add support for "test_name" select/exclude to DbtDag & DbtTaskGroup parsers

See dbt docs on the test_name method here.

$ dbt test --select test_name:unique            # run all instances of the `unique` test
$ dbt test --select test_name:equality          # run all instances of the `dbt_utils.equality` test
$ dbt test --select test_name:range_min_max     # run all instances of a custom schema test defined in the local project, `range_min_max`

Ultimately, in our parsers, we should be able to have a new parameter that looks something like this:

# (Either the select or exclude parameter would be specified with the unique test_name - not both)
jaffle_shop = DbtTaskGroup(
    ...
    select={'test_names': ['unique']},  # run all instances of the `unique` test
    exclude={'test_names': ['unique']},  # run all test instances except the `unique` test
)

Add support for "Path" select/exclude to DbtDag & DbtTaskGroup parsers

See dbt docs on cli usage examples here

$ dbt run --select path.to.my.models     # runs all models in a specific directory
$ dbt run --select path/to/models        # run models contained in path/to/models
$ dbt run --select path/to/my_model.sql  # run a specific model by its path

Ultimately, in our parsers, we should be able to have a new parameter that looks something like this:

# (only select or exclude parameter would be specified with staging dir not both)
jaffle_shop = DbtTaskGroup(
    ...
    select={"paths": ["../models/staging"]} # runs all models in the staging directory,
    exclude={"paths": ["../models/staging"]} # runs all models in except the staging directory
)

DbtBaseOperator doesn't recognize execution_timeout parameter

We need a solution for this:

You are starting subprocess and waiting for it, the only way to stop the waiting is to kill the process, but airflow has no notion of the sub-process you started so it cannot do much there (short of killing all the process group forcefully which it should not do), So you should manage both timeout and killing the subprocess on your own.

See more details here
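
A hedged sketch of managing the timeout ourselves, as the quoted advice suggests: start dbt in its own process group, wait with a deadline, and kill the whole group on expiry. This is not the operator's current implementation, and the group kill is POSIX-only.

import os
import signal
import subprocess

def run_with_timeout(cmd: list, timeout: int) -> int:
    proc = subprocess.Popen(cmd, start_new_session=True)  # dbt gets its own process group
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)  # kill dbt and its children
        proc.wait()
        raise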

Add support for "source" select/exclude to DbtDag & DbtTaskGroup parsers

See dbt docs on cli usage examples here

$ dbt run --select source:snowplow+    # run all models that select from Snowplow sources

Ultimately, in our parsers, we should be able to have a new parameter that looks something like this:

# (Either the select or exclude parameter would be specified with the snowplow source - not both)
jaffle_shop = DbtTaskGroup(
    ...
    select={'sources': ['snowplow+']},  # run all models that select from Snowplow sources
    exclude={'sources': ['snowplow+']},  # run all models except those that select from Snowplow sources
)

Configure testing behavior

Currently, all tests run right after each model. This should be configurable, with a few different options (a sketch of a possible config follows this list):

  • Run tests after each model (current state)
  • Run all tests after all models
  • Don't run tests
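
One hypothetical way to expose this is an enum passed to DbtDag / DbtTaskGroup; the names below are illustrative, not an existing Cosmos API.

from enum import Enum

class TestBehavior(Enum):
    AFTER_EACH = "after_each"  # current behaviour: test right after each model
    AFTER_ALL = "after_all"    # one test task after all models have run
    NONE = "none"              # don't run tests at all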

Modify venv parameter to use dbt executable path instead of venv

Currently, we are sourcing the venv here. Instead, we should reference the dbt executable so that implementing the dbt-ol wrapper is easier.

Before:

dbt_path = [bash_path, "-c", f"source {self.python_venv} && dbt {self.base_cmd} "]

After:

{self.python_venv}/bin/dbt {self.base_cmd}
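
A small sketch of the proposed "after" state, assuming python_venv points at the venv root (as in the snippet above) and that a dbt-ol wrapper may be swapped in; the helper is illustrative only.

from pathlib import Path

def build_dbt_cmd(python_venv: str, base_cmd: str, use_openlineage: bool = False) -> list:
    executable = Path(python_venv) / "bin" / ("dbt-ol" if use_openlineage else "dbt")
    return [str(executable), *base_cmd.split()]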

Add other tasks as dependencies of specific dbt tasks (and not as dependencies of the whole `DbtTaskGroup`)

At the moment, we can add additional tasks/operators as downstream or upstream dependencies to the DbtTaskGroup.
This means that (e.g. for downstream tasks), all tasks belonging to DbtTaskGroup must finish before they get executed.

Is it possible to add such tasks (e.g. a PythonOperator) as a downstream dependency to a specific task within the DbtTaskGroup rather than adding it as a dependency to the whole DbtTaskGroup?
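
One hedged way to do this with plain Airflow APIs is to look the generated task up by its full task_id (group id plus dbt node name) and wire the dependency to it directly; the exact task_id Cosmos generates is an assumption here, so inspect the rendered DAG first.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG("jaffle_with_notify", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    # ... DbtTaskGroup("jaffle_shop", ...) defined here as usual ...
    notify = PythonOperator(task_id="notify", python_callable=lambda: print("model finished"))
    # hypothetical task_id; check the Graph view for the real one
    dag.get_task("jaffle_shop.stg_customers_run") >> notify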

Tasks within a Databricks Workflow Taskgroup should support individual retries.

Is your feature request related to a problem? Please describe.
With Databricks Workflow support, we want to give users as close to an airflow-native experience as we can while still getting the advantages of running Databricks tasks as a Databricks Job. One critical feature that Airflow users expect when they run a task within Airflow is the ability to retry a task if it fails. Currently, if a task within a DatabricksWorkflowTaskGroup fails, the user would need to start the entire taskgroup over (and essentially run the entire job again), which is a suboptimal experience for our users.

Describe the solution you'd like
Looking at the Databricks Job API we see that there is a "repair" function that reruns a failed task if we have the job's "repair ID" and the task's name. @fhoda has an example here of a system he built for a customer of tracking repair IDs for users.

I think we can take our current system for getting task IDs from job names and mix it with Faisal's repair_id example to build a system where Airflow can send repair requests for individual tasks, giving it control over which tasks are rerun.

Alternatively, we could offer a task-group-level option to "repair all tasks", which would be more efficient when clusters are shared between tasks. This might be a little trickier, as we'd need to add a "repair_mode" to the _CreateDatabricksWorkflowOperator.
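
For reference, a hedged sketch of what a single-task repair call could look like against the Jobs 2.1 runs/repair endpoint; run_id and rerun_tasks are real request fields, while the function wiring and task key here are illustrative.

import requests

def repair_failed_task(host: str, token: str, run_id: int, task_key: str) -> dict:
    """Ask Databricks to rerun one failed task of an existing job run (sketch)."""
    resp = requests.post(
        f"{host}/api/2.1/jobs/runs/repair",
        headers={"Authorization": f"Bearer {token}"},
        json={"run_id": run_id, "rerun_tasks": [task_key]},
    )
    resp.raise_for_status()
    return resp.json()  # contains the repair_id of the new repair run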

Add utility to get `Dataset` from dbt project + model name

Right now, a user has to write out DBT://{airflow_conn_id}/{project_name}/{model_name} themselves, which can be prone to errors if there are typos, capitalization issues, or if the format changes. We should create a dataset util that can be used along the lines of:

from cosmos.providers.dbt import get_dataset
get_dataset("my_conn_id", "project_name", "model_name")

We should also use it internally, so there's a single source of truth for how datasets are defined.
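
A minimal sketch of what such a helper could return, based on the URI format quoted above; the real utility may differ.

from airflow.datasets import Dataset

def get_dataset(conn_id: str, project_name: str, model_name: str) -> Dataset:
    return Dataset(f"DBT://{conn_id}/{project_name}/{model_name}")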

`dbt_args` with `"db_name"` target is not used for BigQuery profiles

The database (project) used for the dbt target for BigQuery is currently always the project of the service account. For example in a DbtTaskGroup if the following is passed:

    run_dbt = DbtTaskGroup(
        group_id="run_dbt",
        dbt_project_name="my_dbt_project",
        dbt_root_path="dbt",
        conn_id="google_cloud_default",
        dbt_args={
            "db_name": "gcp-target-project",
            "schema": "my-dataset",
        },
        dag=dag,
    )

"db_name": "gcp-target-project" is not used and the GCP project targeted is the one of the service account. This is because in the profiles generator, where "BIGQUERY_PROJECT" is always set to the service account project id even though the dataset (schema) is set correctly:

profile = "bigquery_profile"
profile_vars = {
"BIGQUERY_DATASET": schema,
"BIGQUERY_PROJECT": json.loads(conn.extra_dejson.get("keyfile_dict"))["project_id"],
"BIGQUERY_TYPE": json.loads(conn.extra_dejson.get("keyfile_dict"))["type"],
"BIGQUERY_PROJECT_ID": json.loads(conn.extra_dejson.get("keyfile_dict"))["project_id"],
"BIGQUERY_PRIVATE_KEY_ID": json.loads(conn.extra_dejson.get("keyfile_dict"))["private_key_id"],

The fix (I think) would simply be to set "BIGQUERY_PROJECT" from the database override instead.
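
A hedged sketch of that fix: prefer the user-supplied database/db_name override and only fall back to the service account's project. The helper name is illustrative.

import json
from typing import Optional

def bigquery_project(conn, db_override: Optional[str]) -> str:
    keyfile = json.loads(conn.extra_dejson.get("keyfile_dict"))
    return db_override or keyfile["project_id"]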

Add support for "test_type" select/exclude to DbtDag & DbtTaskGroup parsers

See dbt docs on the test_type method here.

$ dbt test --select test_type:generic        # run all generic tests
$ dbt test --select test_type:singular       # run all singular tests

Ultimately, in our parsers, we should be able to have a new parameter that looks something like this:

# (Either the select or exclude parameter would be specified with the generic test_type - not both)
jaffle_shop = DbtTaskGroup(
    ...
    select={'test_types': ['generic']},  # runs all generic tests in a dbt project
    exclude={'test_types': ['generic']},  # runs all tests except the generic ones in a dbt project
)

Add support for "tags" select/exclude to DbtDag & DbtTaskGroup parsers

See dbt docs on cli usage examples here

$ dbt run --select tag:nightly    # run all models with the `nightly` tag

Ultimately, in our parsers, we should be able to have a new parameter that looks something like this:

# (Either the select or exclude parameter would be specified with the nightly tag - not both)
jaffle_shop = DbtTaskGroup(
    ...
    select={'tags': ['nightly']},  # runs all models that have the nightly tag
    exclude={'tags': ['nightly']},  # runs all models that don't have the nightly tag
)

Note: we already have this as the dbt_tags arg, so we'll just need to fold that parameter into the select parameter.

Add DbtDepsOperator to DbtDag and DbtTaskGroup

The extract DAG for the mrr-playbook example was failing after initializing the cosmos-dev sandbox. This is because mrr-playbook has a packages.yml file with defined dependencies. After creating the DbtDepsOperator and discussing with @jlaneve, we decided that a DbtDepsOperator should be generated as the first task of any DbtDag or DbtTaskGroup (if a packages.yml exists in that project).

We'll work on this after this is merged.
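
A hypothetical sketch of that behaviour (the helper and the operator signature are assumptions): when the project contains a packages.yml, prepend a deps task and make the project's entry-point tasks depend on it.

from pathlib import Path

def maybe_add_deps_task(project_dir: str, entrypoint_tasks: list, deps_operator_cls):
    """Prepend a dbt deps task if the project declares packages (sketch)."""
    if not (Path(project_dir) / "packages.yml").exists():
        return None
    deps_task = deps_operator_cls(task_id="dbt_deps", project_dir=project_dir)
    for task in entrypoint_tasks:
        deps_task >> task
    return deps_task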

Users should be able to define python packages at job level and task level

Problem Statement

Currently, when users define a Databricks job, they have the ability to add pip packages to their Python environment at the task level. However, the DatabricksWorkflowTaskGroup does not expose this functionality, meaning that our users can only run jobs on base Databricks clusters. This is especially blocking because Databricks job clusters do not allow Python libraries to be defined at cluster creation, so there is no way to add the libraries to the job_cluster_spec.

Proposed Solution

We propose adding the ability to define pip packages at both the task group and task level within the DatabricksWorkflowTaskGroup. This will look like an array of strings that specify packages. Here is an example of what the DAG could look like:

with dag:
    # [START howto_databricks_workflow_notebook]
    task_group = DatabricksWorkflowTaskGroup(
        group_id="test_workflow",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_clusters=job_cluster_spec,
        notebook_params=[],
        notebook_packages=[
            {
                "pypi": {
                    "package": "simplejson"
                }
            },
            {
                "pypi": {
                    "package": "tensorflow"
                }
            }],
    )
    with task_group:
        notebook_1 = DatabricksNotebookOperator(
            task_id="notebook_1",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Users/[email protected]/Test workflow",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
            notebook_packages=[{
                "pypi": {
                    "package": "foo",
                    "repo": "http://pypi.org"
                }
            }],
        )
        notebook_2 = DatabricksNotebookOperator(
            task_id="notebook_2",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Users/[email protected]/Test workflow",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
            notebook_params={
                "foo": "bar",
            },
        )
        notebook_1 >> notebook_2

Implementation details

The big caveat here is that there is no job-level definition for Python packages in the Databricks Jobs API. So when we offer a task-group-level package definition, it is more of a convenience that would need to be applied to every task. Also note that we would want separate "notebook_packages" and "python_packages" arguments, as Databricks distinguishes between notebooks and Python scripts.

Once the global and task-level packages have been merged, we would modify the convert_to_databricks_workflow_task function to include the "libraries" in the task definition json.

        "tasks": [
            {
                "task_key": "example_databricks_workflow__test_workflow__notebook_1",
                "notebook_task": {
                    "notebook_path": "/Users/[email protected]/Test workflow",
                    "source": "WORKSPACE"
                },
                "job_cluster_key": "Shared_job_cluster",
                "libraries": [
                    {
                        "pypi": {
                            "package": "simplejson"
                        }
                    },
                    {
                        "pypi": {
                            "package": "tensorflow"
                        }
                    },
                    {
                        "pypi": {
                            "package": "foo",
                            "repo": "http://pypi.org"
                        }
                    }
                ],
                "timeout_seconds": 0,
                "email_notifications": {}
            },

Please note that we are using dictionaries, as there is a wide array of sources users can pull libraries from (e.g. S3, jars from Maven, etc.), so it would be a pain to define and maintain these connections.
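
A small sketch of the merge step described above: group-level packages are applied to every task, with task-level entries appended and duplicates dropped. The function name is illustrative.

def merge_packages(group_packages: list, task_packages: list) -> list:
    """Combine task-group-level and task-level library dicts for one task (sketch)."""
    merged = list(group_packages)
    for pkg in task_packages:
        if pkg not in merged:
            merged.append(pkg)
    return merged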
