
prefect-dbt's Introduction

Note

Active development of this project has moved within PrefectHQ/prefect. The code can be found here and documentation here. Please open issues and PRs against PrefectHQ/prefect instead of this repository.

prefect-dbt

Visit the full docs here to see additional examples and the API reference.

With prefect-dbt, you can easily trigger and monitor dbt Cloud jobs, execute dbt Core CLI commands, and incorporate other services, like Snowflake, into your dbt runs!

Check out the examples below to get started!

Getting Started

Be sure to install prefect-dbt and save a block to run the examples below!

Integrate dbt Cloud jobs with Prefect flows

If you have an existing dbt Cloud job, take advantage of the flow, run_dbt_cloud_job.

This flow triggers the job and waits until the job run is finished.

If certain nodes fail, run_dbt_cloud_job efficiently retries the specific, unsuccessful nodes.

from prefect import flow

from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job

@flow
def run_dbt_job_flow():
    result = run_dbt_cloud_job(
        dbt_cloud_job=DbtCloudJob.load("my-block-name"),
        targeted_retries=5,
    )
    return result

run_dbt_job_flow()

Integrate dbt Core CLI commands with Prefect flows

prefect-dbt also supports execution of dbt Core CLI commands.

To get started, if you don't have a DbtCoreOperation block already saved, set the commands that you want to run; they can include a mix of dbt and non-dbt commands.

Then, optionally specify the project_dir.

If profiles_dir is unset, it will try to use the DBT_PROFILES_DIR environment variable. If that's also not set, it will use the default directory $HOME/.dbt/.
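The lookup order described above can be sketched in a few lines. This is a minimal illustration using only the standard library, not the library's actual implementation; the function name `resolve_profiles_dir` and its `env` parameter are assumptions made for the example.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def resolve_profiles_dir(
    profiles_dir: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Path:
    """Return the directory in which profiles.yml is expected (hypothetical helper)."""
    env = os.environ if env is None else env
    if profiles_dir:
        # 1. An explicitly passed profiles_dir wins
        return Path(profiles_dir).expanduser()
    from_env = env.get("DBT_PROFILES_DIR")
    if from_env:
        # 2. Otherwise fall back to the DBT_PROFILES_DIR environment variable
        return Path(from_env).expanduser()
    # 3. Finally use the default directory under the user's home
    return Path.home() / ".dbt"


print(resolve_profiles_dir(None, env={"DBT_PROFILES_DIR": "/opt/dbt"}))
```

Passing `env` explicitly keeps the helper easy to test without touching the real process environment.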

Using an existing profile

If you already have an existing dbt profile, specify the profiles_dir where profiles.yml is located.

from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def trigger_dbt_flow() -> str:
    result = DbtCoreOperation(
        commands=["pwd", "dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER"
    ).run()
    return result

trigger_dbt_flow()

Writing a new profile

To set up a new profile, first save and load a DbtCliProfile block and use it in DbtCoreOperation.

Then, specify profiles_dir where profiles.yml will be written.

from prefect import flow
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation

@flow
def trigger_dbt_flow():
    dbt_cli_profile = DbtCliProfile.load("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
    with DbtCoreOperation(
        commands=["dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER",
        dbt_cli_profile=dbt_cli_profile,
    ) as dbt_operation:
        dbt_process = dbt_operation.trigger()
        # do other things before waiting for completion
        dbt_process.wait_for_completion()
        result = dbt_process.fetch_result()
    return result

trigger_dbt_flow()
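For orientation, a dbt profiles.yml is a nested mapping: profile name, then the default `target`, then `outputs` keyed by target name. The sketch below builds that shape as a plain dictionary; `build_profile` and the example values are hypothetical, and this is not how prefect-dbt itself writes the file. JSON is printed only to avoid a third-party YAML dependency.

```python
import json
from typing import Any, Dict


def build_profile(name: str, target: str, target_configs: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the nested mapping that dbt expects in profiles.yml (hypothetical helper)."""
    return {
        name: {
            "target": target,  # which output dbt uses by default
            "outputs": {target: dict(target_configs)},
        }
    }


profile = build_profile(
    name="jaffle_shop",  # hypothetical profile name
    target="dev",  # hypothetical target name
    target_configs={"type": "postgres", "schema": "public", "threads": 4},
)
print(json.dumps(profile, indent=2))
```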

Resources

If you need help getting started with or using dbt, please consult the dbt documentation.

Installation

To use prefect-dbt with dbt Cloud:

pip install prefect-dbt

To use dbt Core (CLI):

pip install "prefect-dbt[cli]"

To use dbt Core with Snowflake profiles:

pip install "prefect-dbt[snowflake]"

To use dbt Core with BigQuery profiles:

pip install "prefect-dbt[bigquery]"

To use dbt Core with Postgres profiles:

pip install "prefect-dbt[postgres]"

!!! warning "Some dbt Core profiles require additional installation"

According to dbt's [Databricks setup page](https://docs.getdbt.com/reference/warehouse-setups/databricks-setup), users must first install the adapter:

```bash
pip install dbt-databricks
```

Check out the [desired profile setup page](https://docs.getdbt.com/reference/profiles.yml) on the sidebar for others.

Requires an installation of Python 3.8+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

Saving credentials to block

Note, to use the load method on Blocks, you must already have a block document saved through code or saved through the UI.

!!! info "Registering blocks"

Register blocks in this module to
[view and edit them](https://docs.prefect.io/ui/blocks/)
on Prefect Cloud:

```bash
prefect block register -m prefect_dbt
```

A list of available blocks in prefect-dbt and their setup instructions can be found here.

dbt Cloud

To create a dbt Cloud credentials block:

  1. Head over to your dbt Cloud profile.
  2. Log in to your dbt Cloud account.
  3. Scroll down to "API" or click "API Access" on the sidebar.
  4. Copy the API Key.
  5. Click Projects on the sidebar.
  6. Copy the account ID from the URL: https://cloud.getdbt.com/settings/accounts/<ACCOUNT_ID>.
  7. Create a short script, replacing the placeholders.
from prefect_dbt.cloud import DbtCloudCredentials

DbtCloudCredentials(
    api_key="API-KEY-PLACEHOLDER",
    account_id="ACCOUNT-ID-PLACEHOLDER"
).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

Then, to create a dbt Cloud job block:

  1. Head over to your dbt home page.
  2. On the top nav bar, click on Deploy -> Jobs.
  3. Select a job.
  4. Copy the job ID from the URL: https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>
  5. Create a short script, replacing the placeholders.
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob

dbt_cloud_credentials = DbtCloudCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
dbt_cloud_job = DbtCloudJob(
    dbt_cloud_credentials=dbt_cloud_credentials,
    job_id="JOB-ID-PLACEHOLDER"
).save("JOB-BLOCK-NAME-PLACEHOLDER")
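If you would rather not copy the IDs by hand, they can be pulled out of the job URL shown in step 4. This is a small sketch that assumes the URL layout given above and numeric IDs; `parse_job_url` is a hypothetical helper, not part of prefect-dbt.

```python
import re
from typing import Dict

# Pattern follows the URL layout from step 4; numeric IDs are an assumption
JOB_URL_PATTERN = re.compile(
    r"/deploy/(?P<account_id>\d+)/projects/(?P<project_id>\d+)/jobs/(?P<job_id>\d+)"
)


def parse_job_url(url: str) -> Dict[str, int]:
    """Extract the account, project, and job IDs from a dbt Cloud job URL."""
    match = JOB_URL_PATTERN.search(url)
    if match is None:
        raise ValueError(f"Not a recognizable dbt Cloud job URL: {url!r}")
    return {key: int(value) for key, value in match.groupdict().items()}


ids = parse_job_url("https://cloud.getdbt.com/deploy/123/projects/456/jobs/789")
print(ids["job_id"])
```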

Congrats! You can now easily load the saved block, which holds your credentials:

from prefect_dbt.cloud import DbtCloudJob

DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")

dbt Core CLI

!!! info "Available TargetConfigs blocks"

The following may vary slightly depending on the service you want to incorporate.

Visit the [API Reference](cli/configs/base) to see other built-in `TargetConfigs` blocks.

If the desired service profile is not available, check out the
[Examples Catalog](examples_catalog/#clicredentials-module) to see how you can
build one from the generic `TargetConfigs` class.

To create dbt Core target config and profile blocks for BigQuery:

  1. Save and load a GcpCredentials block.
  2. Determine the schema / dataset you want to use in BigQuery.
  3. Create a short script, replacing the placeholders.
from prefect_gcp.credentials import GcpCredentials
from prefect_dbt.cli import BigQueryTargetConfigs, DbtCliProfile

credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
target_configs = BigQueryTargetConfigs(
    schema="SCHEMA-NAME-PLACEHOLDER",  # also known as dataset
    credentials=credentials,
)
target_configs.save("TARGET-CONFIGS-BLOCK-NAME-PLACEHOLDER")

dbt_cli_profile = DbtCliProfile(
    name="PROFILE-NAME-PLACEHOLDER",
    target="TARGET-NAME-PLACEHOLDER",
    target_configs=target_configs,
)
dbt_cli_profile.save("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")

Then, to create a dbt Core operation block:

  1. Determine the dbt commands you want to run.
  2. Create a short script, replacing the placeholders.
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation

dbt_cli_profile = DbtCliProfile.load("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
dbt_core_operation = DbtCoreOperation(
    commands=["DBT-CLI-COMMANDS-PLACEHOLDER"],
    dbt_cli_profile=dbt_cli_profile,
    overwrite_profiles=True,
)
dbt_core_operation.save("DBT-CORE-OPERATION-BLOCK-NAME-PLACEHOLDER")

Congrats! You can now easily load the saved block, which holds your credentials:

from prefect_dbt.cli import DbtCoreOperation

DbtCoreOperation.load("DBT-CORE-OPERATION-BLOCK-NAME-PLACEHOLDER")

Feedback

If you encounter any bugs while using prefect-dbt, feel free to open an issue in the prefect-dbt repository.

If you have any questions or issues while using prefect-dbt, you can find help in the Prefect Slack community.

Feel free to star or watch prefect-dbt for updates, too!

Contributing

If you'd like to help contribute to fix an issue or add a feature to prefect-dbt, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
pip install -e ".[dev]"
  4. Make desired changes
  5. Add tests
  6. Insert an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
pre-commit install
  8. git commit, git push, and create a pull request

prefect-dbt's People

Contributors

ahuang11, antupis, belasobral93, chrisguidry, dependabot[bot], desertaxle, discdiver, jawnsy, jsopkin, prefect-collection-synchronizer[bot], robfreedy, seanpwlms, urimandujano, zzstoatzz

prefect-dbt's Issues

Schema was not saved when creating TargetConfig block

The schema was not saved after I created a TargetConfig with Python code.

from prefect_dbt.cli.configs.base import TargetConfigs

myconfig = TargetConfigs(schema="abc", type="snowflake")
myconfig.save("testconf", True)

UI: [screenshot not preserved]

Version:
prefect v2.3.0 with prefect-dbt v0.2.0

add `trigger_dbt_cli_command` example to docs

Expectation / Proposal

I think it would be helpful to have a section (or subsection) with a code snippet and an explanation of trigger_dbt_cli_command. It is helpful for running a one-off dbt CLI command, but I had to read the prefect-dbt/cli/commands.py file to figure out what it was and how to use it. I'm willing to contribute this to the README.

Traceback / Example

Execute user testing plan

Follow through on kicking the tires of the code so far, and verify that the UX is elegant: it should make you feel confident when things are working correctly and be maximally helpful when things are going wrong.

Manage Compute Engine credentials in addition to oauth2

Hi Prefect-dbt team,

Following this thread (#56), I tried to authenticate from within a Kubernetes pod associated with a valid service account.
google.auth.default() returns Compute Engine credentials: https://google-auth.readthedocs.io/en/master/reference/google.auth.compute_engine.credentials.html

The code then fails with 'Credentials' object has no attribute 'refresh_token'.
From what I understand, the code expects https://google-auth.readthedocs.io/en/stable/reference/google.oauth2.credentials.html instead.

It would be great to be able to use the compute_engine credentials.

DBT improvements: dbt_build_task

This method in the DBT Prefect collection will run the "dbt build" command as a task and create a summarization artifact based on the results.json of the dbt build command.

  • Implementation
  • Unit Tests
  • QA
  • Merge

Execute a dbt CLI command

Running the given sample code for 'Execute a dbt CLI command' in docs

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def trigger_dbt_cli_command_flow() -> str:
    result = trigger_dbt_cli_command("dbt debug")
    return result  # Returns the last line in the CLI output

trigger_dbt_cli_command_flow()

Results in the error


Response: {'exception_message': 'Internal Server Error'}
For more information check: https://httpstatuses.com/500
Worker information:
    Approximate queue length: 0
    Pending log batch length: 6
    Pending log batch size: 2160
The log worker will attempt to send these logs again in 2.0s

The models run fine from the CLI; running them through prefect-dbt results in the error above.

Why this matters and technical considerations

Developer Pain Points:

  • I as a developer don't want to fiddle with creating custom python api calls when something more established should exist with Prefect
  • When I create my own homegrown api call from Prefect to dbt Cloud, I don't have full confidence what's a Prefect error vs. dbt Cloud error
  • I don't want to maintain integration code and/or my own api calls when the open source community may have better implementations
  • I as a developer don't know all the potential workflows dbt Cloud + Prefect can support
  • Data engineers and Analytics Engineers play baton pass and not hot potato to schedule dbt Cloud jobs elegantly via Prefect

Success Criteria:

  • "Plug and play" my dbt Cloud connection and job configuration
  • Secure: my api token isn't in plain text
  • A verified badge on the prefect registry
  • dbt Cloud jobs run successfully
  • Docs to celebrate and empower developers
    • This should also help us clarify how prefect and dbt can be complementary

Technical Considerations:

  • Store api token in environment variable?
  • Send temporary override dbt command steps that don't permanently override the job configuration: here
  • Will leverage battle-tested V2 dbt Cloud API
  • Sending useragent metadata in the API request to track Prefect usage with dbt Cloud
  • How is pagination handled in v2 dbt Cloud API?
    • you need to use the offset/limit/count and build it yourself in a loop: One main function and one that does the pagination
    • Example:GET /items?limit=20&offset=100
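The "one main function and one that does the pagination" idea from the list above might look like the following sketch. It is illustrative only: `paginate` and `fake_fetch_page` are hypothetical names, and the fake fetcher stands in for a real HTTP call such as `GET /items?limit=20&offset=100`.

```python
from typing import Callable, Iterator, List


def paginate(fetch_page: Callable[[int, int], List[dict]], limit: int = 20) -> Iterator[dict]:
    """Yield every item by requesting pages of `limit` items at increasing offsets."""
    offset = 0
    while True:
        items = fetch_page(offset, limit)
        yield from items
        if len(items) < limit:
            # A short (or empty) page means there is nothing left to fetch
            return
        offset += limit


# Stand-in data source so the example runs without network access
FAKE_ITEMS = [{"id": i} for i in range(45)]


def fake_fetch_page(offset: int, limit: int) -> List[dict]:
    """Return one page of the fake data, mimicking limit/offset query parameters."""
    return FAKE_ITEMS[offset : offset + limit]


all_items = list(paginate(fake_fetch_page, limit=20))
print(len(all_items))  # 45
```

Stopping on a short page avoids relying on a total count in the response, at the cost of one extra request when the total is an exact multiple of `limit`.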

Prefect dbt 0.2.6 coroutine error

Hi @ahuang11

Following up on this issue: sadly, it still doesn't work. I got a new error:

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1339, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 139, in trigger_dbt_cli_command
    yaml.dump(profile, f, default_flow_style=False)
  File "/usr/local/lib/python3.10/site-packages/yaml/__init__.py", line 253, in dump
    return dump_all([data], stream, Dumper=Dumper, **kwds)
  File "/usr/local/lib/python3.10/site-packages/yaml/__init__.py", line 241, in dump_all
    dumper.represent(data)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 27, in represent
    node = self.represent_data(data)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 48, in represent_data
    node = self.yaml_representers[data_types[0]](self, data)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 207, in represent_dict
    return self.represent_mapping('tag:yaml.org,2002:map', data)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 118, in represent_mapping
    node_value = self.represent_data(item_value)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 48, in represent_data
    node = self.yaml_representers[data_types[0]](self, data)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 207, in represent_dict
    return self.represent_mapping('tag:yaml.org,2002:map', data)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 118, in represent_mapping
    node_value = self.represent_data(item_value)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 48, in represent_data
    node = self.yaml_representers[data_types[0]](self, data)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 207, in represent_dict
    return self.represent_mapping('tag:yaml.org,2002:map', data)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 118, in represent_mapping
    node_value = self.represent_data(item_value)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 52, in represent_data
    node = self.yaml_multi_representers[data_type](self, data)
  File "/usr/local/lib/python3.10/site-packages/yaml/representer.py", line 317, in represent_object
    reduce = data.__reduce_ex__(2)
TypeError: cannot pickle 'coroutine' object

I guess it's related to the async function definition but don't know how to fix it
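The traceback shows `yaml.dump` reaching a coroutine object nested inside the profile dictionary, which usually means an async call was made without `await`. The sketch below is a diagnostic aid under that assumption, not the fix applied in the library; `find_coroutines`, `get_secret`, and the profile contents are hypothetical.

```python
import asyncio
import inspect
from typing import Any, List


def find_coroutines(value: Any, path: str = "") -> List[str]:
    """Return the key paths of coroutine objects nested in dicts, lists, or tuples."""
    if inspect.iscoroutine(value):
        return [path or "<root>"]
    found: List[str] = []
    if isinstance(value, dict):
        for key, item in value.items():
            found += find_coroutines(item, f"{path}.{key}" if path else str(key))
    elif isinstance(value, (list, tuple)):
        for index, item in enumerate(value):
            found += find_coroutines(item, f"{path}[{index}]")
    return found


async def get_secret() -> str:
    """Hypothetical async lookup, e.g. loading a secret value."""
    return "s3cr3t"


pending = get_secret()  # called without `await`, so this is a coroutine object
profile = {"jaffa": {"outputs": {"dev": {"password": pending, "port": 5432}}}}
bad_paths = find_coroutines(profile)
print(bad_paths)  # ['jaffa.outputs.dev.password']

# Awaiting the coroutine (here via asyncio.run) yields a plain, serializable value
profile["jaffa"]["outputs"]["dev"]["password"] = asyncio.run(pending)
```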

Saving `DbtCliProfile` with `prefect_sqlalchemy.DatabaseCredentials` raises `TypeError: Type is not JSON serializable: URL`

I'm trying to persist a DbtCliProfile block using a prefect_sqlalchemy.DatabaseCredentials, but running into a TypeError: Type is not JSON serializable: URL error.

Any ideas what's going wrong?

DEFAULT_BLOCK = "default"

credentials = DatabaseCredentials(
    driver=AsyncDriver.POSTGRESQL_ASYNCPG,
    username=os.environ.get("POSTGRES_USER", DEFAULT_BLOCK),
    password=os.environ.get("POSTGRES_PASSWORD", DEFAULT_BLOCK),
    database=os.environ.get("POSTGRES_DATABASE", DEFAULT_BLOCK),
    host=os.environ.get("POSTGRES_HOST", DEFAULT_BLOCK),
    port=os.environ.get("POSTGRES_PORT", DEFAULT_BLOCK),
)
credentials.save(DEFAULT_BLOCK)

dbt_cli_profile = DbtCliProfile(
    name="dbt_dwh",
    target=DEFAULT_BLOCK,
    target_configs=PostgresTargetConfigs(
        credentials=DatabaseCredentials.load(DEFAULT_BLOCK), 
        schema="public"
    ),
)
dbt_cli_profile.save(name="dbt_dwh")
  File "/Users/markns/Library/Caches/pypoetry/virtualenvs/alakazam-vR_Oc5bv-py3.10/lib/python3.10/site-packages/prefect/orion/utilities/schemas.py", line 124, in orjson_dumps
    return orjson.dumps(v, default=default).decode()
TypeError: Type is not JSON serializable: URL
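This class of error can be reproduced with the standard library alone. The sketch below uses `json` rather than orjson and a hypothetical `URL` stand-in class, so it only illustrates why an encoder rejects an unknown object and how a fallback encoder changes that; it is not a description of how the issue was resolved.

```python
import json
from dataclasses import dataclass


@dataclass
class URL:
    """Hypothetical stand-in for a driver-specific URL object."""

    drivername: str
    host: str
    database: str

    def __str__(self) -> str:
        return f"{self.drivername}://{self.host}/{self.database}"


block_fields = {"schema": "public", "url": URL("postgresql+asyncpg", "localhost", "postgres")}

try:
    json.dumps(block_fields)
    error = None
except TypeError as exc:
    error = str(exc)  # the encoder does not know how to handle URL

# Supplying a fallback encoder turns the unknown object into its string form
serialized = json.dumps(block_fields, default=str)
print(serialized)
```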

Add easy way to define DBT-profile in the code

Currently, defining the dbt profile inside code is a hassle, especially if a flow has multiple steps. When creating a flow, you need to specify profiles_dir and set overwrite_profiles so that the code is truly deterministic.

    credentials = DatabaseCredentials(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="postgres",
        password="password",
        database="postgres",
        host="localhost",
        port=5432,
    )

    target_configs = PostgresTargetConfigs(credentials=credentials, schema="postgres")

    dbt_cli_profile = DbtCliProfile(
        type="postgres", name="jaffa", target="jaffa_test", target_configs=target_configs
    )

    deps = trigger_dbt_cli_command(
        "dbt deps",
        profiles_dir=".",
        overwrite_profiles=True,
        project_dir=f"{project_root}/dbt/",
        dbt_cli_profile=dbt_cli_profile,
    )

    hesiod_etl = trigger_dbt_cli_command(
        f"dbt run -s +exposure:hesiod_etl --vars '{{client_key: {client_key}}}'",
        project_dir=f"{project_root}/dbt/",
        profiles_dir=".",
    )

Instead, you could do something simple like passing dbt_cli_profile on every call, which is not DRY but is much cleaner than the current way.

    credentials = DatabaseCredentials(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="postgres",
        password="password",
        database="postgres",
        host="localhost",
        port=5432,
    )

    target_configs = PostgresTargetConfigs(credentials=credentials, schema="postgres")

    dbt_cli_profile = DbtCliProfile(
        type="postgres", name="jaffa", target="jaffa_test", target_configs=target_configs
    )

    deps = trigger_dbt_cli_command(
        "dbt deps",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
    )

    hesiod_etl = trigger_dbt_cli_command(
        f"dbt run -s +exposure:hesiod_etl --vars '{{client_key: {client_key}}}'",
        project_dir=f"{project_root}/dbt/",
        dbt_cli_profile=dbt_cli_profile,
    )
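Until something like this exists, one way to cut the repetition is to bind the shared arguments once with `functools.partial`. The sketch below uses a hypothetical stand-in function, `run_dbt_command`, that only echoes its arguments; whether the same pattern behaves well when applied directly to a Prefect task is an assumption that has not been verified here.

```python
from functools import partial
from typing import Any, Dict, Optional


def run_dbt_command(
    command: str,
    project_dir: Optional[str] = None,
    profiles_dir: Optional[str] = None,
    overwrite_profiles: bool = False,
    dbt_cli_profile: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical stand-in for a dbt trigger; it just echoes the call it received."""
    return {
        "command": command,
        "project_dir": project_dir,
        "profiles_dir": profiles_dir,
        "overwrite_profiles": overwrite_profiles,
        "dbt_cli_profile": dbt_cli_profile,
    }


# Bind the settings shared by every step once
run_dbt = partial(
    run_dbt_command,
    project_dir="dbt/",
    profiles_dir=".",
    overwrite_profiles=True,
    dbt_cli_profile={"name": "jaffa", "target": "jaffa_test"},
)

deps = run_dbt("dbt deps")
models = run_dbt("dbt run")
print(deps["command"], models["command"])
```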

Show polling status updates selectively by comparing previous status vs current status

In #95 we made it debug level, but maybe we can be smarter about when to emit the logs; e.g. comparing current state to previous state


        last_state = state = current_state
        t0 = time.time()

        while state not in until_states:
            job_run = await run_sync_in_worker_thread(
                job_service_client.get_custom_job, name=full_job_name,
            )
            state = job_run.state
            if state != last_state:
                self.logger.info(f"{self._log_prefix}: State is now {state.name}.")
                last_state = state
            else:
                # Intermittently, the job will not be described. We want to respect the
                # watch timeout though.
                self.logger.debug(f"{self._log_prefix}: Job not found.")

            elapsed_time = time.time() - t0
            if timeout is not None and elapsed_time > timeout:
                raise RuntimeError(
                    f"Timed out after {elapsed_time}s while watching job for states "
                    f"{until_states!r}"
                )
            time.sleep(self.job_watch_poll_interval)
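A self-contained version of the same idea, stripped of the client and logger specifics, could look like this. It is a sketch only: `watch_states`, its parameters, and the state names are hypothetical, and the message is emitted only when the state differs from the previous poll.

```python
import time
from typing import Callable, Collection, List, Optional


def watch_states(
    get_state: Callable[[], str],
    until_states: Collection[str],
    poll_interval: float = 0.0,
    timeout: Optional[float] = None,
    log: Callable[[str], None] = print,
) -> str:
    """Poll get_state until a terminal state is reached, logging only on changes."""
    last_state: Optional[str] = None
    t0 = time.monotonic()
    while True:
        state = get_state()
        if state != last_state:
            # Emit a message only when the status actually changed
            log(f"State is now {state}.")
            last_state = state
        if state in until_states:
            return state
        elapsed_time = time.monotonic() - t0
        if timeout is not None and elapsed_time > timeout:
            raise RuntimeError(
                f"Timed out after {elapsed_time}s while watching for states "
                f"{until_states!r}"
            )
        time.sleep(poll_interval)


# Simulated sequence of poll results
states = iter(["QUEUED", "RUNNING", "RUNNING", "RUNNING", "SUCCESS"])
messages: List[str] = []
final = watch_states(lambda: next(states), {"SUCCESS", "FAILED"}, log=messages.append)
print(messages)
```

Five polls produce three messages here, because the repeated RUNNING results are skipped.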

dbt flow fails to sync back

Hi there,

I think I hit a strange bug :|

Bug summary:
I'm defining a flow that triggers a dbt cloud job, which should wait until it finishes. Fails instead.

Repro:
prefect 2.4.0
prefect-dbt 0.2.2
python 3.8.10

from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
import constants


@flow(name="trigger_dbt_cloud_job")
def trigger_dbt_cloud_job():
    run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key=constants.dbt["auth_token"],
            account_id=int(constants.dbt["account_id"]),
            domain=constants.dbt["domain"]
        ),
        job_id=int(constants.dbt["jobs"]["qa"]["dev_test_job"])
    )


trigger_dbt_cloud_job()

Expected behavior: trigger the flow and wait until it finishes.
Actual behavior: the dbt cloud job gets triggered, but the prefect flow fails while checking for job status.

Complete error message:

16:52:54.757 | INFO    | prefect.engine - Created flow run 'ingenious-chameleon' for flow 'trigger_dbt_cloud_job'
16:52:55.067 | INFO    | Flow run 'ingenious-chameleon' - Created subflow run 'manipulative-newt' for flow 'Trigger dbt Cloud job run and wait for completion'
16:52:55.119 | ERROR   | Flow run 'manipulative-newt' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 588, in orchestrate_flow_run
    result = await flow_call()
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect_dbt/cloud/jobs.py", line 287, in trigger_dbt_cloud_job_run_and_wait_for_completion
    run_id_future = get_run_id.submit(triggered_run_data_future)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/tasks.py", line 492, in submit
    return enter_task_run_engine(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 740, in enter_task_run_engine
    return flow_run_context.sync_portal.call(begin_run)
AttributeError: 'NoneType' object has no attribute 'call'
16:52:55.152 | INFO    | Flow run 'manipulative-newt' - Created task run 'Trigger dbt Cloud job run-e4229069-0' for task 'Trigger dbt Cloud job run'
16:52:55.152 | INFO    | Flow run 'manipulative-newt' - Submitted task run 'Trigger dbt Cloud job run-e4229069-0' for execution.
16:52:55.783 | INFO    | Task run 'Trigger dbt Cloud job run-e4229069-0' - Triggering run for job with ID 2
16:52:56.490 | INFO    | Task run 'Trigger dbt Cloud job run-e4229069-0' - Run successfully triggered for job with ID 2. You can view the status of this run at https://<hidden_domain_on_purpose>.getdbt.com/#/accounts/1/projects/1/runs/579/
16:52:57.111 | INFO    | Task run 'Trigger dbt Cloud job run-e4229069-0' - Finished in state Completed()
16:52:57.162 | ERROR   | Flow run 'manipulative-newt' - Finished in state Failed('Flow run encountered an exception.')
16:52:57.163 | ERROR   | Flow run 'ingenious-chameleon' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 595, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/bogdan/_dev/<hidden_domain_on_purpose>_corporate_it/orchestration-tools-poc/prefect/v2/main.py", line 9, in trigger_dbt_cloud_job
    run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 162, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 137, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/client.py", line 103, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 521, in create_and_begin_subflow_run
    return terminal_state.result()
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 143, in result
    raise data
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 588, in orchestrate_flow_run
    result = await flow_call()
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect_dbt/cloud/jobs.py", line 287, in trigger_dbt_cloud_job_run_and_wait_for_completion
    run_id_future = get_run_id.submit(triggered_run_data_future)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/tasks.py", line 492, in submit
    return enter_task_run_engine(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 740, in enter_task_run_engine
    return flow_run_context.sync_portal.call(begin_run)
AttributeError: 'NoneType' object has no attribute 'call'
16:52:57.194 | ERROR   | Flow run 'ingenious-chameleon' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "/home/bogdan/_dev/<hidden_domain_on_purpose>_corporate_it/orchestration-tools-poc/prefect/v2/main.py", line 19, in <module>
    trigger_dbt_cloud_job()
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 158, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/client.py", line 103, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 238, in create_then_begin_flow_run
    return state.result()
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 143, in result
    raise data
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 595, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/bogdan/_dev/<hidden_domain_on_purpose>_corporate_it/orchestration-tools-poc/prefect/v2/main.py", line 9, in trigger_dbt_cloud_job
    run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 162, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 137, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/client.py", line 103, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 521, in create_and_begin_subflow_run
    return terminal_state.result()
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 143, in result
    raise data
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 588, in orchestrate_flow_run
    result = await flow_call()
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect_dbt/cloud/jobs.py", line 287, in trigger_dbt_cloud_job_run_and_wait_for_completion
    run_id_future = get_run_id.submit(triggered_run_data_future)
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/tasks.py", line 492, in submit
    return enter_task_run_engine(
  File "/home/bogdan/.virtualenvs/orchestration-tools-poc-prefect-v2/lib/python3.8/site-packages/prefect/engine.py", line 740, in enter_task_run_engine
    return flow_run_context.sync_portal.call(begin_run)
AttributeError: 'NoneType' object has no attribute 'call'

Credentials missing when using SnowflakeTargetConfigs in CLIProfile

Expectation / Proposal

I define four block entities (Snowflake credentials, Snowflake connector, Snowflake target config and dbt CLI profile) and link them together. In this case credentials can be changed via block configuration.

I expected the CLI profile to hold the credential and connection information as well, but it does not. This may be related to TargetConfigs, which can't be linked to another block entity.

Traceback / Example

from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs


if __name__ == "__main__":
    credentials = SnowflakeCredentials(
        user="user",
        password="password",
        account="account.region.aws",
        role="role",
    )
    credentials.save("sf-cred", overwrite=True)

    connector = SnowflakeConnector(
        schema="public",
        database="database",
        warehouse="warehouse",
        credentials=SnowflakeCredentials.load("sf-cred"),
    )
    connector.save("sf-conn", overwrite=True)

    target_configs = SnowflakeTargetConfigs(
        connector=SnowflakeConnector.load("sf-conn"), schema="TEST"
    )
    target_configs.save("dbt-targetconf", overwrite=True)

    dbt_cli_profile = DbtCliProfile(
        name="jaffle_shop",
        target="dev",
        target_configs=SnowflakeTargetConfigs.load("dbt-targetconf"),
    )
    dbt_cli_profile.save("example", overwrite=True)

    dbt_cli_profile = DbtCliProfile.load("example").get_profile()

    print(dbt_cli_profile)

Output:
{'config': {}, 'jaffle_shop': {'target': 'dev', 'outputs': {'dev': {'type': 'snowflake', 'schema': 'TEST', 'threads': 4}}}}
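
One way to make the gap explicit is to compare the generated profile against the fields that were supplied to the blocks. The sketch below is plain Python and does not call prefect-dbt; `missing_output_keys` and the `expected` set are hypothetical names introduced only for illustration, and the set of expected fields is an assumption based on the arguments in the example above.

```python
def missing_output_keys(profile: dict, name: str, target: str, expected: set) -> set:
    """Return the expected keys that are absent from one target's output."""
    output = profile[name]["outputs"][target]
    return expected - output.keys()


# The profile printed in the output above.
profile = {
    "config": {},
    "jaffle_shop": {
        "target": "dev",
        "outputs": {"dev": {"type": "snowflake", "schema": "TEST", "threads": 4}},
    },
}

# Assumption: these are the fields passed to the credentials and connector blocks.
expected = {"user", "password", "account", "role", "database", "warehouse", "schema"}

missing = sorted(missing_output_keys(profile, "jaffle_shop", "dev", expected))
print(missing)
```

Only `schema` survives from the supplied fields; everything that came from the credentials and connector blocks is reported as missing.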


Build a user testing plan

Think through what points and clicks the analytics engineer AND the data engineer have to go through to ship a new data pipeline.

Does the UX feel elegant?

Think through what unit tests need to be made.

Clear Roles & Responsibilities

Engineers

  • Prefect Team
  • hands-on code
  • Alexander Streed
    • Andrew other teammate may help

Product Manager

  • Sung
  • leads vision and use case success
  • leads with clear documentation and examples
  • Equips engineer and user testing with context and docs
  • Pair program as needed
  • Lists OUT OF SCOPE work

Developer Experience

  • Isabela
  • Tests the development branch with dry runs with prefect example pipeline code
  • Equips engineer and user testing with context and docs
  • Pair program as needed

TargetConfigs' credentials get dropped upon loading a DbtCliProfile

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_dbt.cli import BigQueryTargetConfigs, DbtCliProfile

ENV = "abc"

@flow
def test_flow():
    credentials = GcpCredentials.load('gcp-credentials')
    bq_target_dict = {
        'schema': f'zone_hawk_{ENV}',
        'project': 'my_gcp_project',
        'threads': 20,
        'credentials': credentials
    }

    bq_target_configs = BigQueryTargetConfigs(**bq_target_dict)
    bq_target_configs.save(f'bq-target-configs-{ENV}', overwrite=True)

    target_configs = BigQueryTargetConfigs.load(f'bq-target-configs-{ENV}')
    dbt_cli_dict = {
        'name': 'bigbird',
        'target': f'{ENV}',
        'target_configs': target_configs
    }
    dbt_cli_profile = DbtCliProfile(**dbt_cli_dict)
    dbt_cli_profile.save(f'dbt-cli-profile-{ENV}', overwrite=True)
    print(dbt_cli_profile)

    dbt_cli_profile2 = DbtCliProfile.load(f"dbt-cli-profile-{ENV}")
    return dbt_cli_profile2

test_flow()

Returns:


DbtCliProfile(name='bigbird', target='abc', target_configs=TargetConfigs(extras=None, type='bigquery', threads=20), global_configs=None)

Expected:


DbtCliProfile(name='bigbird', target='abc', target_configs=BigQueryTargetConfigs(extras=None, type='bigquery', threads=20, project='my_gcp_project', credentials=GcpCredentials(service_account_file=None, service_account_info=SecretDict('{'type': '**********', 'auth_uri': '**********', 'client_id': '**********', 'token_uri': '**********', 'project_id': '**********', 'private_key': '**********', 'client_email': '**********', 'private_key_id': '**********', 'client_x509_cert_url': '**********', 'auth_provider_x509_cert_url': '**********'}'), project='myproject')), global_configs=None)

`ValueError`: The keyword, `_is_anonymous`, has already been provided in `TargetConfigs`

_is_anonymous is being included in the result of _populate_configs_json, which suggests that other block metadata fields may be included as well. Block metadata fields shouldn't be included in dbt profiles generated by this collection.

Full stacktrace of the error:

14:07:11.533 | ERROR   | Task run 'trigger_dbt_cli_command-321ca940-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/d.hinnenkamp/Desktop/test-prefect/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1185, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/Users/d.hinnenkamp/Desktop/test-prefect/.venv/lib/python3.8/site-packages/prefect_dbt/cli/commands.py", line 136, in trigger_dbt_cli_command
    profile = dbt_cli_profile.get_profile()
  File "/Users/d.hinnenkamp/Desktop/test-prefect/.venv/lib/python3.8/site-packages/prefect_dbt/cli/credentials.py", line 107, in get_profile
    "outputs": {self.target: self.target_configs.get_configs()},
  File "/Users/d.hinnenkamp/Desktop/test-prefect/.venv/lib/python3.8/site-packages/prefect_dbt/cli/configs/snowflake.py", line 78, in get_configs
    configs_json = super().get_configs()
  File "/Users/d.hinnenkamp/Desktop/test-prefect/.venv/lib/python3.8/site-packages/prefect_dbt/cli/configs/base.py", line 53, in get_configs
    return self._populate_configs_json({}, self.dict())
  File "/Users/d.hinnenkamp/Desktop/test-prefect/.venv/lib/python3.8/site-packages/prefect_dbt/cli/configs/base.py", line 34, in _populate_configs_json
    configs_json = self._populate_configs_json(configs_json, value)
  File "/Users/d.hinnenkamp/Desktop/test-prefect/.venv/lib/python3.8/site-packages/prefect_dbt/cli/configs/base.py", line 37, in _populate_configs_json
    raise ValueError(
ValueError: The keyword, _is_anonymous, has already been provided in TargetConfigs; remove duplicated keywords to continue

Reproducible example:

    snowflake_connector_block = await SnowflakeConnector.load("wld-snowflake-connection")
    connector = snowflake_connector_block
    target_configs = SnowflakeTargetConfigs(
        connector=connector
    )
    dbt_cli_profile = DbtCliProfile(
        name="profile_name",
        target="dev",
        target_configs=target_configs
    )
    debug = await trigger_dbt_cli_command.submit(
        "dbt debug",
        profiles_dir='profile',
        project_dir='wld-dbt-warehouse/dbt-project',
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
        return_state=True
    )

Originally reported by @mcfuhrt in Slack.
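
The failure mode can be illustrated without Prefect. The sketch below flattens a nested dictionary into a single level, skips keys that start with an underscore, and still raises on genuine duplicates. `flatten_configs` is a hypothetical stand-in and not the actual `_populate_configs_json` implementation; treating a leading underscore as the marker for block metadata is an assumption.

```python
from typing import Optional


def flatten_configs(fields: dict, out: Optional[dict] = None) -> dict:
    """Flatten nested dicts into one level, ignoring underscore-prefixed keys."""
    out = {} if out is None else out
    for key, value in fields.items():
        if key.startswith("_"):
            # Assumed to be block metadata (for example _is_anonymous): skip it.
            continue
        if isinstance(value, dict):
            flatten_configs(value, out)
        elif key in out:
            raise ValueError(f"The keyword, {key}, has already been provided")
        else:
            out[key] = value
    return out


# Hypothetical nested fields, with metadata repeated at every level.
fields = {
    "type": "snowflake",
    "threads": 4,
    "_is_anonymous": True,
    "connector": {
        "database": "database",
        "warehouse": "warehouse",
        "_is_anonymous": True,
        "credentials": {"user": "user", "_is_anonymous": True},
    },
}

flat = flatten_configs(fields)
print(flat)
```

Without the underscore check, the second `_is_anonymous` would trigger the duplicate-keyword error shown in the stack trace.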

Issues with logging and dbt 1.4 (protobuf)

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When running dbt tasks using prefect-dbt, we started getting errors somewhat at random (after hundreds of models had run) after updating dbt to version 1.4. The errors seem to come from the logging: either "AssertionError: feed_data after feed_eof" or "UnicodeDecodeError: 'utf-8' codec can't decode byte 0x93 in position 7998: invalid start byte". Rolling back to dbt 1.3 resolves the issue. We suspect it has something to do with dbt changing its logging to protobuf (https://github.com/dbt-labs/dbt-core/releases/tag/v1.4.0). The logs from dbt-core 1.4 are also printed in batches in Orion instead of one per line.

We searched through earlier issues with similar errors, but this one seems to have a different cause (see PrefectHQ/prefect#6335).

Reproduction

dbt-core==1.4

Running dbt run via prefect_dbt.cli.commands.trigger_dbt_cli_command on a larger project with 500+ models.

The error happens after a few hundred models have run.

Making the logging batches smaller by changing PREFECT_LOGGING_ORION_BATCH_INTERVAL, PREFECT_LOGGING_ORION_BATCH_SIZE, and PREFECT_LOGGING_ORION_MAX_LOG_SIZE seemed to make the issue appear less frequently (more models ran before the error appeared). The same was true for changes on the dbt side that made the logs smaller, such as removing coloring.

Reverting to dbt-core==1.3 makes the issue go away.

Error

09:52:24.525 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b'\x1b[0m09:5...m in 3.18s]\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b'\x1b[0m09:5...m in 3.18s]\n')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/usr/local/lib/python3.10/asyncio/subprocess.py", line 72, in pipe_data_received
    reader.feed_data(data)
  File "/usr/local/lib/python3.10/asyncio/streams.py", line 456, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof

--

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1478, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command
    result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_shell/commands.py", line 89, in shell_run_command
    async for text in TextReceiveStream(process.stdout):
  File "/usr/local/lib/python3.10/site-packages/anyio/abc/_streams.py", line 31, in __anext__
    return await self.receive()
  File "/usr/local/lib/python3.10/site-packages/anyio/streams/text.py", line 44, in receive
    decoded = self._decoder.decode(chunk)
  File "/usr/local/lib/python3.10/codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x93 in position 7998: invalid start byte
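
The second traceback can be reproduced in isolation with the standard library. The sketch below is not the prefect-shell or anyio code; `decode_chunks` is a hypothetical helper that decodes a byte stream chunk by chunk. It shows that a multi-byte character split across chunks decodes fine, that a stray 0x93 byte fails under strict decoding, and how a tolerant decoder would behave instead. Where the 0x93 byte comes from in the dbt 1.4 output is not established here.

```python
import codecs


def decode_chunks(chunks, errors="strict"):
    """Decode an iterable of byte chunks as UTF-8, one chunk at a time."""
    decoder = codecs.getincrementaldecoder("utf-8")(errors=errors)
    text = "".join(decoder.decode(chunk) for chunk in chunks)
    # Flush the decoder so a truncated trailing sequence is reported too.
    return text + decoder.decode(b"", final=True)


# A two-byte character split across chunks is handled by incremental decoding.
split_ok = [b"caf\xc3", b"\xa9 done\n"]
print(repr(decode_chunks(split_ok)))

# 0x93 cannot start a UTF-8 sequence, so strict decoding fails.
stray = [b"model ", b"\x93", b" ok\n"]
try:
    decode_chunks(stray)
except UnicodeDecodeError as exc:
    print("strict:", exc.reason)

# errors="replace" substitutes U+FFFD instead of raising.
print(repr(decode_chunks(stray, errors="replace")))
```

Replacing undecodable bytes keeps the stream alive at the cost of losing the original byte, so it is a trade-off rather than a root-cause fix.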

Versions

(From local env; we have the same issue when running dockerized in Azure on Python 3.10 with a Postgres database.)

Version:             2.7.11
API version:         0.8.4
Python version:      3.8.10
Git commit:          6b27b476
Built:               Thu, Feb 2, 2023 7:22 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.31.1

Additional context

No response

Postgres port set as string in profiles.yml

Version 0.3.0 creates an invalid dbt profile for Postgres. The port value in the target config is written as a string, which makes the dbt CLI run fail. The profile is correct with version 0.2.7.

Example from invalid profile:
port: '5432'

Should be:
port: 5432

I'm using the following function to run dbt commands:

def dbt_command(
    dbt_project_name: str,
    dbt_project_dir: str,
    database_credentials: DatabaseCredentials,
    command: str,
    threads=1,
):
    dbt_cli_profile = DbtCliProfile(
        name=dbt_project_name,
        target="dbt-runner-database",
        target_configs=PostgresTargetConfigs(
            credentials=database_credentials, schema="public", threads=threads
        ),
    )

    result = trigger_dbt_cli_command(
        command,
        profiles_dir=".",
        project_dir=dbt_project_dir,
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
    )

    return result
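
As a rough illustration of the difference between the two profiles, the sketch below converts a string port back to an integer in a target-config dictionary. `coerce_port` is a hypothetical helper, not part of prefect-dbt, and the dictionary is a made-up example of what such a target config might contain; how to hook this into the profile that prefect-dbt writes is not shown.

```python
def coerce_port(target_configs: dict) -> dict:
    """Return a copy of the target configs with a numeric string port as int."""
    fixed = dict(target_configs)
    port = fixed.get("port")
    if isinstance(port, str) and port.isdigit():
        fixed["port"] = int(port)
    return fixed


# Hypothetical target config with the port serialized as a string.
broken = {"type": "postgres", "schema": "public", "threads": 1, "port": "5432"}

fixed = coerce_port(broken)
print(fixed["port"], type(fixed["port"]).__name__)
```

The input dictionary is left untouched, so the string and integer forms can be compared side by side.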

PermissionError: [Errno 13] Permission denied: 'dbt_logs'

I am getting the following error and am not sure how to resolve it.

Code:

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command
import os

@flow
def trigger_dbt_cli_command_flow() -> str:
    os.chdir('../dbtcode/MDP')
    result = trigger_dbt_cli_command("dbt run --select example")
    return result  # Returns the last line in the CLI output

trigger_dbt_cli_command_flow()

Error:


12:15:13.828 | INFO    | prefect.engine - Created flow run 'lavender-skunk' for flow 'trigger-dbt-cli-command-flow'
12:15:14.010 | INFO    | Flow run 'lavender-skunk' - Created task run 'trigger_dbt_cli_command-321ca940-0' for task 'trigger_dbt_cli_command'
12:15:14.011 | INFO    | Flow run 'lavender-skunk' - Executing 'trigger_dbt_cli_command-321ca940-0' immediately...
12:15:14.046 | INFO    | Task run 'trigger_dbt_cli_command-321ca940-0' - Running dbt command: dbt run --select example --profiles-dir /.dbt
12:15:16.386 | INFO    | Task run 'trigger_dbt_cli_command-321ca940-0' - 12:15:16  Encountered an error:
[Errno 13] Permission denied: 'dbt_logs'

12:15:16.503 | ERROR   | Task run 'trigger_dbt_cli_command-321ca940-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1062, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command
    result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_shell/commands.py", line 90, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dbt/main.py", line 129, in main
    results, succeeded = handle_and_check(args)
  File "/usr/local/lib/python3.10/site-packages/dbt/main.py", line 192, in handle_and_check
    task, res = run_from_args(parsed)
  File "/usr/local/lib/python3.10/site-packages/dbt/main.py", line 225, in run_from_args
    log_manager.set_path(log_path)
  File "/usr/local/lib/python3.10/site-packages/dbt/logger.py", line 514, in set_path
    self._file_handler.set_path(path)
  File "/usr/local/lib/python3.10/site-packages/dbt/logger.py", line 405, in set_path
    make_log_dir_if_missing(log_dir)
  File "/usr/local/lib/python3.10/site-packages/dbt/logger.py", line 340, in make_log_dir_if_missing
    dbt.clients.system.make_directory(log_dir)
  File "/usr/local/lib/python3.10/site-packages/dbt/clients/system.py", line 112, in make_directory
    raise e
  File "/usr/local/lib/python3.10/site-packages/dbt/clients/system.py", line 106, in make_directory
    os.makedirs(path)
  File "/usr/local/lib/python3.10/os.py", line 225, in makedirs
    mkdir(name, mode)
PermissionError: [Errno 13] Permission denied: 'dbt_logs'

12:15:16.538 | ERROR   | Task run 'trigger_dbt_cli_command-321ca940-0' - Finished in state Failed('Task run encountered an exception.')
12:15:16.538 | ERROR   | Flow run 'lavender-skunk' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 566, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/prefectcode/dbt_example.py", line 8, in trigger_dbt_cli_command_flow
    result = trigger_dbt_cli_command("dbt run --select example")
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 705, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 822, in create_task_run_then_submit
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 227, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/usr/local/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1062, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command
    result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_shell/commands.py", line 90, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dbt/main.py", line 129, in main
    results, succeeded = handle_and_check(args)
  File "/usr/local/lib/python3.10/site-packages/dbt/main.py", line 192, in handle_and_check
    task, res = run_from_args(parsed)
  File "/usr/local/lib/python3.10/site-packages/dbt/main.py", line 225, in run_from_args
    log_manager.set_path(log_path)
  File "/usr/local/lib/python3.10/site-packages/dbt/logger.py", line 514, in set_path
    self._file_handler.set_path(path)
  File "/usr/local/lib/python3.10/site-packages/dbt/logger.py", line 405, in set_path
    make_log_dir_if_missing(log_dir)
  File "/usr/local/lib/python3.10/site-packages/dbt/logger.py", line 340, in make_log_dir_if_missing
    dbt.clients.system.make_directory(log_dir)
  File "/usr/local/lib/python3.10/site-packages/dbt/clients/system.py", line 112, in make_directory
    raise e
  File "/usr/local/lib/python3.10/site-packages/dbt/clients/system.py", line 106, in make_directory
    os.makedirs(path)
  File "/usr/local/lib/python3.10/os.py", line 225, in makedirs
    mkdir(name, mode)
PermissionError: [Errno 13] Permission denied: 'dbt_logs'

12:15:16.573 | ERROR   | Flow run 'lavender-skunk' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "/prefectcode/dbt_example.py", line 11, in <module>
    trigger_dbt_cli_command_flow()
  File "/usr/local/lib/python3.10/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/usr/local/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 105, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
    return state.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 566, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/prefectcode/dbt_example.py", line 8, in trigger_dbt_cli_command_flow
    result = trigger_dbt_cli_command("dbt run --select example")
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 705, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 822, in create_task_run_then_submit
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 227, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/usr/local/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1062, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command
    result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_shell/commands.py", line 90, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dbt/main.py", line 129, in main
    results, succeeded = handle_and_check(args)
  File "/usr/local/lib/python3.10/site-packages/dbt/main.py", line 192, in handle_and_check
    task, res = run_from_args(parsed)
  File "/usr/local/lib/python3.10/site-packages/dbt/main.py", line 225, in run_from_args
    log_manager.set_path(log_path)
  File "/usr/local/lib/python3.10/site-packages/dbt/logger.py", line 514, in set_path
    self._file_handler.set_path(path)
  File "/usr/local/lib/python3.10/site-packages/dbt/logger.py", line 405, in set_path
    make_log_dir_if_missing(log_dir)
  File "/usr/local/lib/python3.10/site-packages/dbt/logger.py", line 340, in make_log_dir_if_missing
    dbt.clients.system.make_directory(log_dir)
  File "/usr/local/lib/python3.10/site-packages/dbt/clients/system.py", line 112, in make_directory
    raise e
  File "/usr/local/lib/python3.10/site-packages/dbt/clients/system.py", line 106, in make_directory
    os.makedirs(path)
  File "/usr/local/lib/python3.10/os.py", line 225, in makedirs
    mkdir(name, mode)
PermissionError: [Errno 13] Permission denied: 'dbt_logs'
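
The traceback shows dbt calling os.makedirs on the relative path 'dbt_logs', which is resolved against the current working directory (presumably the directory reached by the os.chdir call in the flow). A quick diagnostic, sketched below with the standard library only, is to check whether the process may create entries in that directory before running dbt. `can_create_subdir` is a hypothetical helper name, and this is a check rather than a fix.

```python
import os
import tempfile


def can_create_subdir(parent: str) -> bool:
    """Return True if `parent` exists and the process may create entries in it."""
    # Creating a subdirectory needs write and search (execute) permission.
    return os.path.isdir(parent) and os.access(parent, os.W_OK | os.X_OK)


# A fresh temporary directory is writable by the current user.
with tempfile.TemporaryDirectory() as tmp:
    writable = can_create_subdir(tmp)
    print(tmp, writable)

# The directory dbt would use for a relative log path is the working directory.
print(os.getcwd(), can_create_subdir(os.getcwd()))
```

If the second line prints False inside the container or agent, the user running the flow lacks write permission on the dbt project directory.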

Add highlighting to logged dbt Cloud URIs

Add color to dbt Cloud URIs that are logged in any of the tasks or flows in prefect-dbt. Prefect 2.0 doesn't currently have the ability to add color to terminal logs or the logs displayed in the UI. Additional core development is needed before this item can be implemented.

DBT run forever

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

dbt tasks complete, but the flow run keeps showing as running indefinitely.

Reproduction

dbt_cloud_account_id = Secret.load("dbt-cloud-account-id").get()
dbt_cloud_token = Secret.load("dbt-cloud-token").get()
run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
    dbt_cloud_credentials=DbtCloudCredentials(
        api_key=dbt_cloud_token,
        account_id=dbt_cloud_account_id
    ),
    job_id=99228
)

Error


Versions


Version:             2.4.0
API version:         0.8.0
Python version:      3.10.4
Git commit:          513639e8
Built:               Tue, Sep 13, 2022 2:15 PM
OS/Arch:             darwin/x86_64
Profile:             garvis_prod
Server type:         cloud

Additional context

All the tasks show as succeeded, but the flow itself shows as running.

How to declare a DbtCliProfile Block

Hi 👋

I'm trying to use a DbtCliProfile block instead of defining everything in the code.
However, I don't know how to register it in the Prefect block library.

I tried to use the JSON block but got an error when loading the block.

dbt_cli_profile = DbtCliProfile.load("BLOCK_NAME").get_profile()

Do you have any input or docs on this specific point?

Thanks
