getindata / dbt-airflow-factory
Library to convert DBT manifest metadata to Airflow tasks
License: Apache License 2.0
Hi guys,
When I run this factory with the latest Airflow (2.5.0), I get the following error:
ERROR [airflow.models.dagbag.DagBag] Failed to import: /Users/maverick/src/ezdihar/tpch-airflow/dags/dbt.py
Traceback (most recent call last):
File "/Users/maverick/.virtualenvs/tpch-airflow-efoi/lib/python3.10/site-packages/airflow/models/dagbag.py", line 339, in parse
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 883, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "/Users/maverick/src/ezdihar/tpch-airflow/dags/dbt.py", line 5, in <module>
dag = AirflowDagFactory(path.dirname(path.abspath(__file__)), "base").create()
File "/Users/maverick/.virtualenvs/tpch-airflow-efoi/lib/python3.10/site-packages/dbt_airflow_factory/airflow_dag_factory.py", line 94, in create
self.create_tasks()
File "/Users/maverick/.virtualenvs/tpch-airflow-efoi/lib/python3.10/site-packages/dbt_airflow_factory/airflow_dag_factory.py", line 104, in create_tasks
start = self._create_starting_task()
File "/Users/maverick/.virtualenvs/tpch-airflow-efoi/lib/python3.10/site-packages/dbt_airflow_factory/airflow_dag_factory.py", line 118, in _create_starting_task
return self._builder.create_seed_task()
File "/Users/maverick/.virtualenvs/tpch-airflow-efoi/lib/python3.10/site-packages/dbt_airflow_factory/tasks_builder/builder.py", line 63, in create_seed_task
return self.operator_builder.create("dbt_seed", "seed")
File "/Users/maverick/.virtualenvs/tpch-airflow-efoi/lib/python3.10/site-packages/dbt_airflow_factory/k8s/k8s_operator.py", line 52, in create
return self._create(self._prepare_arguments(command, model, additional_dbt_args), name)
File "/Users/maverick/.virtualenvs/tpch-airflow-efoi/lib/python3.10/site-packages/dbt_airflow_factory/k8s/k8s_operator.py", line 75, in _create
return KubernetesPodOperator(
File "/Users/maverick/.virtualenvs/tpch-airflow-efoi/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 411, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
File "/Users/maverick/.virtualenvs/tpch-airflow-efoi/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 243, in __init__
raise AirflowException(
airflow.exceptions.AirflowException: Specifying resources for the launched pod with 'resources' is deprecated. Use 'container_resources' instead.
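For reference, recent versions of the cncf.kubernetes provider expect pod resources as a Kubernetes client object passed via container_resources, rather than the deprecated resources dict. A minimal sketch of what the operator call needs to look like (task name, image, and resource values here are illustrative, not taken from the factory's code):

from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

pod = KubernetesPodOperator(
    task_id="dbt_seed",
    name="dbt-seed",
    image="change_me",
    # Deprecated: resources={"limits": {...}, "requests": {...}}
    # Current provider API: pass a V1ResourceRequirements object instead.
    container_resources=k8s.V1ResourceRequirements(
        limits={"memory": "512M", "cpu": "1"},
        requests={"memory": "512M", "cpu": "1"},
    ),
)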
While trying to run the DAG on on-premise Kubernetes with the in_cluster flag:
image_pull_policy: IfNotPresent
namespace: change_me
labels:
  runner: airflow
envs:
  POSTGRES_USER: "{{ var.value.get('POSTGRES_USER', 'default') }}"
  POSTGRES_PASSWORD: "{{ var.value.get('POSTGRES_PASSWORD', 'default') }}"
is_delete_operator_pod: True
in_cluster: True
resources:
  limit:
    memory: 512M
    cpu: 1
  requests:
    memory: 512M
    cpu: 1
I'm getting an error:
Invalid connection configuration. Options kube_config_path, kube_config, in_cluster are mutually exclusive. You can only use one option at a time.
As a sanity check, I ran a simple KPO with a standard config, which works fine:
from pendulum import datetime, duration
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=5),
}

with DAG(
    dag_id="example_kubernetes_pod", schedule="@once", default_args=default_args
) as dag:
    k = KubernetesPodOperator(
        namespace="change_me",
        image="hello-world",
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=True,  # if True, look up config in the cluster; if False, look for a kubeconfig file
        is_delete_operator_pod=True,
        get_logs=True,
        config_file=None,
    )
The problem arises due to this default value, which triggers the connection config check. The easiest solution for this kind of problem is to change the default value from "~/.kube/config" to None, as is done inside the KPO itself. I've tried to reset the default value by overriding config_file with None inside the YAML file, but due to serialization this cannot be done: the parser changes it to a string rather than leaving the original None.
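A sketch of the change proposed above: default config_file to None on the factory's Kubernetes parameters class, mirroring what the KPO itself does (the class name comes from the traceback earlier, but the exact field layout below is an assumption for illustration):

from dataclasses import dataclass
from typing import Optional

@dataclass
class KubernetesExecutionParameters:
    # Before (assumed): config_file: str = "~/.kube/config"
    # After: default to None so that in_cluster=True no longer clashes
    # with a non-None config_file in the connection configuration check.
    config_file: Optional[str] = None
    in_cluster: Optional[bool] = None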
Hi there,
I seem to have come across an issue with retry_delay after following the documentation and tests. I know it's not much to go on, but is there a reason why I get a default_args error?
Broken DAG: [/opt/airflow/dags/dag.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/dbt_airflow_factory/airflow_dag_factory.py", line 58, in init
self.airflow_config = self._read_config(dag_path, env, airflow_config_file_name)
File "/home/airflow/.local/lib/python3.7/site-packages/dbt_airflow_factory/airflow_dag_factory.py", line 114, in _read_config
if "retry_delay" in config["default_args"]:
KeyError: 'default_args'
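The code path in the traceback indexes config["default_args"] unconditionally, so the KeyError suggests the environment's airflow.yml is missing the default_args section entirely. A minimal airflow.yml fragment that should get past this check (the key set and the seconds-based retry_delay format are assumptions inferred from the traceback, not confirmed against the docs):

default_args:
  owner: airflow
  retries: 1
  retry_delay: 300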
Not all packages required by dbt-airflow-factory are installed by default. Steps to reproduce:
python -c "from dbt_airflow_factory.airflow_dag_factory import AirflowDagFactory"
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "<REDACTED>/.venv/lib/python3.11/site-packages/dbt_airflow_factory/airflow_dag_factory.py", line 5, in <module>
from airflow import DAG
ModuleNotFoundError: No module named 'airflow'
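A likely workaround until the dependency metadata is fixed: install Airflow explicitly alongside the factory in the same environment (for example, pip install "apache-airflow>=2.0" dbt-airflow-factory; the version pin is illustrative), since the factory imports airflow at module import time.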
BashExecutionParameters does not accept kwargs the way other classes such as EcsExecutionParameters and KubernetesExecutionParameters do. This leads to failures whenever bash.yml is specified:
Traceback (most recent call last):
File "<REDACTED>/dags/dbt_airflow_factory_dag.py", line 4, in <module>
dag = AirflowDagFactory(path.dirname(path.abspath(__file__)), "dev").create()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<REDACTED>/.venv/lib/python3.11/site-packages/dbt_airflow_factory/airflow_dag_factory.py", line 69, in __init__
).create()
^^^^^^^^
File "<REDACTED>/.venv/lib/python3.11/site-packages/dbt_airflow_factory/builder_factory.py", line 77, in create
self._create_operator_builder(execution_env_type, dbt_params),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<REDACTED>/.venv/lib/python3.11/site-packages/dbt_airflow_factory/builder_factory.py", line 108, in _create_operator_builder
BashExecutionParametersLoader.create_config(
File "<REDACTED>/.venv/lib/python3.11/site-packages/dbt_airflow_factory/bash/bash_parameters_loader.py", line 12, in create_config
return BashExecutionParameters(**config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: BashExecutionParameters.__init__() got an unexpected keyword argument 'type'
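A sketch of a fix consistent with the error above: let the constructor absorb and discard unknown keys such as 'type', the way the ECS and Kubernetes parameter classes reportedly do (the execution_script field and its default are assumptions about the real class, for illustration only):

class BashExecutionParameters:
    def __init__(self, execution_script: str = "dbt", **kwargs) -> None:
        # Accept and ignore extra keys coming from bash.yml (e.g. 'type')
        # instead of raising TypeError.
        self.execution_script = execution_script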
First of all, amazing work, glad to have come across this library.
I had a question regarding the usage: it mentions a structure to follow, in which dbt's manifest.json file is placed. By default, dbt generates this file inside the <root dbt dir>/target/ directory. Am I right in thinking that the structure outlined needs to sit inside <root dbt dir>/target/? So like the below:
<root dbt dir>
├── target
│ ├── config
│ │ ├── airflow.yml
│ │ ├── dbt.yml
│ │ └── k8s.yml
│ └── dev
│ └── dbt.yml
│ ├── manifest.json
│ └── dag.py
Would we also need to put the entire dbt directory inside the Airflow DAGs directory? Or can we change where the dbt manifest.json file is generated? If so, how do we do that?
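(For what it's worth, dbt does allow relocating its artifacts: the target-path setting in dbt_project.yml, or the --target-path flag on recent dbt versions, controls where manifest.json is written, so the manifest could be emitted into the DAGs directory instead of moving the whole dbt project there.)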
Hello - I am sorry if this was already answered, but is there a way to get this to work with Managed Airflow in AWS (Amazon MWAA)?
Hi there!
We (Astronomer) would like to cross-link to this project and were wondering whether it also works with dbt Cloud? :-)
Thanks!