
prefect-gcp's Introduction

Note

Active development of this project has moved within PrefectHQ/prefect. The code can be found here and documentation here. Please open issues and PRs against PrefectHQ/prefect instead of this repository.

prefect-gcp

prefect-gcp makes it easy to leverage the capabilities of Google Cloud Platform (GCP) in your flows, featuring support for Vertex AI, Cloud Run, BigQuery, Cloud Storage, and Secret Manager.

Visit the full docs here.

Installation

To start using prefect-gcp:

pip install prefect-gcp

To install extras, see here.

Contributing

Thanks for thinking about chipping in! Check out this step-by-step guide on how to get started.

prefect-gcp's People

Contributors

acgourley, ahuang11, biancaines, dependabot[bot], desertaxle, grf53, hakimkac99, iobruno, jakekaplan, japerry911, jawnsy, jeremy-thomas-roc, jhamet93, kevingrismore, kevmo, khoinguyen19k8, parkedwards, peytonrunyan, prefect-collection-synchronizer[bot], razvanvilceanu, rishav8989, saint1991, salman1993, serinamarie, taljaards, urimandujano, wangjoshuah, willraphaelson, zanieb, zzstoatzz

prefect-gcp's Issues

Persist Labels from Prefect to Vertex AI Job

Currently, Prefect passes labels to the Deployment object consisting of data like flow-id, flow-name, etc. It would be nice if this data could also be set on the Vertex AI Custom Job, to allow filtering by these attributes (especially when so many jobs are running that it is hard to attribute a job to a flow).

Expectation / Proposal

Vertex AI Custom Jobs have labels set from the data specified by Prefect.
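As a rough sketch of what forwarding could involve: GCP label keys and values are limited to 63 characters and to lowercase letters, digits, underscores, and dashes, so Prefect-side labels would need normalizing first. The label names below (`prefect.io/flow-name`, `prefect.io/flow-id`) and both helper functions are hypothetical, not part of prefect-gcp.

```python
import re
from typing import Dict

MAX_LABEL_LENGTH = 63  # GCP limit for both label keys and label values


def to_gcp_label(text: str) -> str:
    """Lowercase the text, replace unsupported characters with '_', and truncate."""
    cleaned = re.sub(r"[^a-z0-9_-]", "_", text.lower())
    return cleaned[:MAX_LABEL_LENGTH]


def to_gcp_labels(prefect_labels: Dict[str, str]) -> Dict[str, str]:
    """Normalize every key and value; this sketch does not check that keys start with a letter."""
    return {to_gcp_label(key): to_gcp_label(value) for key, value in prefect_labels.items()}


# Hypothetical Prefect labels, used only for illustration
labels = to_gcp_labels(
    {"prefect.io/flow-name": "My ETL Flow", "prefect.io/flow-id": "1234-abcd"}
)
print(labels)
```

The normalized dictionary is the kind of value that could then be attached to the custom job's labels.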

Path problems using GcsBucket with Windows

Bug Summary

While using GcsBucket from the prefect_gcp.cloud_storage module on my Windows system,
I am having issues writing to a folder in GCS using upload_from_path, and also problems downloading a directory using the get_directory function.

Issues

upload_path = gcs_block.upload_from_path(
        from_path=f"{path}",
        to_path="folder/file"
    )

This does not upload into a folder; instead, a backslash becomes part of the file name in GCS.
From what I have seen in the source code, this is caused by the use of Path(), which on my machine converts the forward slashes (/) to backslashes (\) before uploading to GCS, so the blob is renamed rather than placed in a folder.
This also occurs if I set a default Bucket Folder on the block.
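The separator conversion described here can be reproduced on any operating system with `PureWindowsPath`, which mimics how `Path` behaves on Windows. This is only a minimal illustration of the suspected cause, not the library's code.

```python
from pathlib import PureWindowsPath

# PureWindowsPath behaves like Path does on a Windows machine
to_path = PureWindowsPath("folder/file")

as_windows = str(to_path)      # the string form uses backslashes
as_posix = to_path.as_posix()  # as_posix() keeps forward slashes

print(as_windows)
print(as_posix)
```

GCS treats only forward slashes as folder separators in the web interface, so the first form yields a single blob whose name contains a backslash.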

Furthermore, when trying to use the get_directory function:

folder_path = gcs_block.get_directory(
        from_path=gcs_path,
        local_path=f"../data/"
    )

The source code uses os.path rather than pathlib, so it has trouble copying the folder to the given local path.
Is there a way to copy the files themselves without the prefixed path copied from the bucket?
Also, the returned list of file paths is always empty, even when the files have been created on the local system.

Is there a way to make this work on Windows, or will it only work on a POSIX system?
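One way to drop the bucket prefix, sketched under the assumption that blob names always use forward slashes: treat the blob name as a POSIX path, make it relative to `from_path`, and rebuild it under the local directory with the platform's own separators. The function name `local_destination` is hypothetical.

```python
from pathlib import Path, PurePosixPath


def local_destination(blob_name: str, from_path: str, local_path: str) -> Path:
    """Map a blob name to a local path, omitting the from_path prefix."""
    # Blob names are always '/'-separated, regardless of the local OS
    relative = PurePosixPath(blob_name).relative_to(PurePosixPath(from_path))
    # Rebuild with the native separator of the machine running the flow
    return Path(local_path).joinpath(*relative.parts)


dest = local_destination("raw/2023/trips.parquet", from_path="raw", local_path="data")
print(dest)
```

`relative_to` raises `ValueError` when the blob is not under `from_path`, which makes a mismatched prefix visible instead of silently writing to an unexpected location.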

System

Host Computer

OS Name:                   Microsoft Windows 11 Pro
OS Version:                10.0.22621 N/A Build 22621
System Model:              ASUS TUF Dash F15
Python:                       Python 3.9.16

Prefect Version

Version:             2.7.7
API version:         0.8.4
Python version:      3.9.16
Git commit:          e8ca30b8
Built:               Fri, Jan 6, 2023 4:25 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         hosted

Support Application default credentials

For the GCP library, it looks like we always need to have the credential file (see https://prefecthq.github.io/prefect-gcp/credentials/#prefect_gcp.credentials.GcpCredentials). However, when running on Google services you can normally use an attached Application Default Credential and don't have to pass the credential along, which is the recommended approach in production. My reading of the docs indicates that only the https://googleapis.dev/python/google-auth/1.15.0/user-guide.html#service-account-private-key-files method is currently supported.

It should be easy to support via the SDK. We normally do it via https://googleapis.dev/python/google-auth/1.15.0/user-guide.html#application-default-credentials.

import google.auth

credentials, project = google.auth.default()

vs

from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '/path/to/key.json')

scoped_credentials = credentials.with_scopes(
    ['https://www.googleapis.com/auth/cloud-platform'])

Prefect 1.x supported ADC as well (https://github.com/PrefectHQ/prefect/blob/1.x/src/prefect/agent/vertex/agent.py#L73)

GcsBucket#get_directory should return the list of paths of downloaded files

Proposal

GcsBucket#get_directory should return the list of paths of downloaded files because this is the only way to know which files are newly downloaded from downstream tasks.

Example

Its signature could become something like:

async def get_directory(self, from_path: Optional[str] = None, local_path: Optional[str] = None ) -> List[Union[str, Path]]
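A minimal stand-in for the proposed behavior, using an in-memory dictionary in place of a real bucket so that it runs without GCP access. Only the signature shape comes from the proposal; the `blobs` parameter and the body are assumptions made for illustration.

```python
import asyncio
import tempfile
from pathlib import Path
from typing import Dict, List, Optional


async def get_directory(
    blobs: Dict[str, bytes],  # hypothetical stand-in for the bucket contents
    from_path: Optional[str] = None,
    local_path: Optional[str] = None,
) -> List[Path]:
    """Write every blob under from_path into local_path and return the written paths."""
    prefix = from_path or ""
    target = Path(local_path or ".")
    downloaded: List[Path] = []
    for name, data in sorted(blobs.items()):
        if not name.startswith(prefix):
            continue
        destination = target.joinpath(*name.split("/"))
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_bytes(data)
        downloaded.append(destination)
    return downloaded


fake_bucket = {"raw/a.txt": b"a", "raw/b.txt": b"b", "other/c.txt": b"c"}
with tempfile.TemporaryDirectory() as tmp:
    paths = asyncio.run(get_directory(fake_bucket, from_path="raw/", local_path=tmp))
    names = [p.relative_to(tmp).as_posix() for p in paths]
    all_exist = all(p.exists() for p in paths)
print(names)
```

A downstream task can then iterate over the returned list instead of scanning the local directory to work out what was downloaded.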

Allow prefect-gcp classes to intake Credentials in addition to GcpCredentials

Right now, the prefect-gcp classes require a GcpCredentials object to instantiate, and GcpCredentials can only be created as a Block with a supplied service account key.

However, for our use case, we prefer using application default credentials rather than always having to supply service account keys.

Could we add support for passing Credentials objects into prefect-gcp classes directly?

Example:

@flow
def run_vertex_custom_training_job():
    logger = get_run_logger()
    # Produces a Credentials object
    gcp_credentials = GcpCredentials().get_credentials_from_service_account()
    # Produces a GcpCredentials object
    # gcp_credentials = GcpCredentials().load("ds-cash-production")
    
    job = VertexAICustomTrainingJob(
        command=["echo", "hello world"],
        region="us-east1",
        image="us-docker.pkg.dev/cloudrun/container/job:latest",
        gcp_credentials=gcp_credentials,
    )
    job.run()

References:
VertexAICustomTrainingJob
GcpCredentials
load()

Forward slash ("/") gets replaced by ("\\") when executing on windows 11

Hi everyone, I'm having an issue with Prefect on Windows.
prefect-gcp converts / (forward slashes) into \\ (double backslashes) during upload tasks,
so the blob ends up being named, for example:
foo\\bar\\foobar.txt
instead of:
foo/bar/foobar.txt
which would be represented as "folders" in the GCS web interface.
We're not sure whether the same issue happens on macOS or Linux.
We've tried not only upload_from_path but also upload_from_file_object.
Here's the full task in question:

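from pathlib import Path

from prefect import task
from prefect_gcp.cloud_storage import GcsBucket


@task(log_prints=True)
def write_gcs(path: Path) -> None:
    """Write data to GCS."""
    gcp_cloud_storage_bucket_block = GcsBucket.load("gcs-de-zoomcamp")
    print(f"Writing {path} to GCS")
    # On Windows, this Path is rendered with backslashes in the blob name
    gcp_cloud_storage_bucket_block.upload_from_path(from_path=path, to_path=path)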

The variable path is created earlier as path = Path(f"data/{color}/{dataset_file}.parquet").

I've tried the following:

  • removing to_path
  • passing the path as a string (str(path))

I think part of the solution is to pass Path.as_posix() when Prefect is executed on Windows so that the blob name becomes OS-agnostic, but I could be wrong.
Thanks in advance for any feedback.
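A small helper along the lines suggested here, assuming relative paths and assuming that a backslash is never a legitimate character in a blob name. The name `to_blob_name` is hypothetical; it accepts either a string or a path object and always returns forward slashes.

```python
from pathlib import PurePath, PurePosixPath, PureWindowsPath
from typing import Union


def to_blob_name(path: Union[str, PurePath]) -> str:
    """Return a '/'-separated blob name for a relative path, whatever the local OS."""
    # PureWindowsPath accepts both '\\' and '/' as separators, so either style normalizes
    return PureWindowsPath(str(path)).as_posix()


print(to_blob_name("data\\yellow\\trips.parquet"))
print(to_blob_name(PurePosixPath("data/yellow/trips.parquet")))
```

Passing `to_path=to_blob_name(path)` would then give the same blob name on Windows, macOS, and Linux.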

Upgrade Cloud Run to use API v2

Hello!

I am working on upgrading Cloud Run to use API v2, primarily for its new beta feature of significantly longer timeouts.
link to API docs

Expectation / Proposal

That we can use v2 Cloud Run API after this update.

  • I would like to help contribute a pull request to resolve this!
    • The PR is In Progress
    • I will most likely need editing help after opening PR, as I haven't contributed to this repo before. Making a lot of progress though, will share when ready for review. Is there anyone specific that will be doing review, that I can work with towards the end?

Thanks!

A `create` method for GcsBucket

When uploading files, the upload crashes if the bucket does not exist.

Expectation / Proposal

On block initialization, automatically check whether the bucket exists and create it if it does not.
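The check-then-create logic could look roughly like the sketch below. It is written against a duck-typed client with `lookup_bucket` (returning `None` for a missing bucket) and `create_bucket`, which is the shape the google-cloud-storage `Client` offers; the `FakeStorageClient` class is a hypothetical in-memory stand-in so the example runs without GCP access.

```python
from typing import Any, Dict, Optional


def get_or_create_bucket(client: Any, bucket_name: str) -> Any:
    """Return the bucket if it exists, otherwise create it first."""
    bucket = client.lookup_bucket(bucket_name)  # None when the bucket is missing
    if bucket is None:
        bucket = client.create_bucket(bucket_name)
    return bucket


class FakeStorageClient:
    """Hypothetical in-memory stand-in for a storage client."""

    def __init__(self) -> None:
        self.buckets: Dict[str, str] = {}
        self.created = 0

    def lookup_bucket(self, name: str) -> Optional[str]:
        return self.buckets.get(name)

    def create_bucket(self, name: str) -> str:
        self.created += 1
        self.buckets[name] = name
        return name


client = FakeStorageClient()
first = get_or_create_bucket(client, "my-bucket")
second = get_or_create_bucket(client, "my-bucket")
print(client.created)
```

The second call finds the existing bucket, so the creation path runs only once.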

Add listdir similar method to gcs bucket

I would like to be able to write conditions for what data should be taken from the gcp bucket. At the moment I am storing all data in separate directories (based on yyyy_mm names) on a single bucket and it would be helpful to have a way to check all the directories inside the bucket before downloading one based on a specific condition.
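Because GCS has a flat namespace, the "directories" can be derived from blob names. The sketch below assumes the blob names have already been listed and that folders follow the yyyy_mm naming described above; `list_top_level_folders` is a hypothetical helper.

```python
from typing import Iterable, List


def list_top_level_folders(blob_names: Iterable[str]) -> List[str]:
    """Return the distinct first path segments of blobs that live inside a folder."""
    folders = {name.split("/", 1)[0] for name in blob_names if "/" in name}
    return sorted(folders)


blob_names = [
    "2022_12/a.csv",
    "2023_01/a.csv",
    "2023_01/b.csv",
    "2023_02/a.csv",
    "README.md",
]
folders = list_top_level_folders(blob_names)

# yyyy_mm names sort chronologically as plain strings, so a string comparison is enough
recent = [folder for folder in folders if folder >= "2023_01"]
print(recent)
```

Only the folders that satisfy the condition would then be passed on to the download step.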

Vertex AI Custom Training Job does not show up as a Block option in the UI

Expectation / Proposal

VertexAICustomTrainingJob inherits from Infrastructure and also has the block slug name & type so I was expecting it to show up as an option when you go to the "Cloud UI > Blocks > Add Block". However, there is no infrastructure block option for Vertex AI (screenshot).

_block_type_name = "Vertex AI Custom Training Job"

Screen Shot 2023-02-03 at 8 56 16 AM

Add "gs://" to bucket path if it's not present

The cloud storage block currently asks for a bucket path. If you click on "copy the path to the clipboard" it will give you just the bucket name. If you provide just the bucket name to the block, you will get the error gcsfs.retry.HttpError: Invalid bucket name: <bucket_name>. If you then rename the path to gs://<bucket_name> it will work.

We should be able to handle this case without raising an error.
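The normalization itself is small. A minimal sketch, with `ensure_gs_prefix` as a hypothetical helper name:

```python
def ensure_gs_prefix(bucket_path: str) -> str:
    """Prepend 'gs://' unless the path already starts with it."""
    prefix = "gs://"
    return bucket_path if bucket_path.startswith(prefix) else prefix + bucket_path


print(ensure_gs_prefix("my-bucket"))
print(ensure_gs_prefix("gs://my-bucket"))
```

Applying it is idempotent, so a value copied from the console and a full `gs://` path both end up in the same form.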

The provided path to the service account is invalid (type=value_error)

Bug summary

I deployed a flow in a Docker container by pulling the image from Docker Hub and running it against an Orion server on my local machine. The flow downloads raw data, does some transformation, then uploads it to a GCP bucket. Everything runs well except for the GCP credentials block.

Reproduction

Flows

Dockerfile

FROM prefecthq/prefect:2.7.7-python3.9

COPY docker-requirements.txt .
RUN pip install -r docker-requirements.txt --trusted-host pypi.python.org --no-cache-dir
COPY flows /opt/prefect/flows
RUN mkdir -p /opt/prefect/data/yellow

I built a deployment by using the following configuration for the Docker block

{
"image": "delvo1919/prefect:zoom",
"image_pull_policy": "ALWAYS",
"networks": ['host'],
"auto_remove": True
}

I created the GCP block with the following config

from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket

credentials_block = GcpCredentials(
    service_account_file="~/.google/credentials/google_credentials.json"  # enter your credentials info or use the file method.
)
credentials_block.save("zoom-gcp-creds", overwrite=True)


bucket_block = GcsBucket(
    gcp_credentials=GcpCredentials.load("zoom-gcp-creds"),
    bucket="my-bucket",  # insert your  GCS bucket name
)

bucket_block.save("zoom-gcs", overwrite=True)


Afterwards I built the deployment

docker_deploy.py

from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from parameterized_flow import etl_parent_flow

docker_block = DockerContainer.load("zoom")

docker_dep = Deployment.build_from_flow(
    flow=etl_parent_flow, name="docker-flow", infrastructure=docker_block
)

if __name__ == "__main__":
    docker_dep.apply()

python docker_deploy.py

I then ran the deployment after pushing the docker image to dockerhub.

prefect deployment run etl-parent-flow/docker-flow

Prefect server

The server to run the flow is a standard Orion one.

prefect orion start

The agent is also standard.

prefect agent start -q default

Error

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 696, in load
    return cls._from_block_document(block_document)
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 551, in _from_block_document
    block = block_cls.parse_obj(block_document.data)
  File "pydantic/main.py", line 527, in pydantic.main.BaseModel.parse_obj
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 183, in __init__
    super().__init__(*args, **kwargs)
  File "pydantic/main.py", line 342, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for GcsBucket
gcp_credentials -> service_account_file
  The provided path to the service account is invalid (type=value_error)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1445, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "flows/03_deployment/parameterized_flow.py", line 42, in write_gcs
    gcp_block = GcsBucket.load("zoom-gcs")
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 226, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 710, in load
    raise RuntimeError(
RuntimeError: Unable to load 'zoom-gcs' of block type None due to failed validation. To load without validation, try loading again with `validate=False`.
03:58:27 PM
write_gcs-67f8f48e-0
Finished in state Failed('Task run encountered an exception: Traceback (most recent call last):\n  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 696, in load\n    return cls._from_block_document(block_document)\n  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 551, in _from_block_document\n    block = block_cls.parse_obj(block_document.data)\n  File "pydantic/main.py", line 527, in pydantic.main.BaseModel.parse_obj\n  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 183, in __init__\n    super().__init__(*args, **kwargs)\n  File "pydantic/main.py", line 342, in pydantic.main.BaseModel.__init__\npydantic.error_wrappers.ValidationError: 1 validation error for GcsBucket\ngcp_credentials -> service_account_file\n  The provided path to the service account is invalid (type=value_error)\n\nThe above exception was the direct cause of the following exception:\n\nRuntimeError: Unable to load \'zoom-gcs\' of block type None due to failed validation. To load without validation, try loading again with `validate=False`.\n')
03:58:27 PM
write_gcs-67f8f48e-0
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 696, in load
    return cls._from_block_document(block_document)
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 551, in _from_block_document
    block = block_cls.parse_obj(block_document.data)
  File "pydantic/main.py", line 527, in pydantic.main.BaseModel.parse_obj
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 183, in __init__
    super().__init__(*args, **kwargs)
  File "pydantic/main.py", line 342, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for GcsBucket
gcp_credentials -> service_account_file
  The provided path to the service account is invalid (type=value_error)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 636, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "flows/03_deployment/parameterized_flow.py", line 56, in etl_web_to_gcs
    write_gcs(path)
  File "/usr/local/lib/python3.9/site-packages/prefect/tasks.py", line 436, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 926, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1064, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.9/site-packages/prefect/states.py", line 89, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1445, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "flows/03_deployment/parameterized_flow.py", line 42, in write_gcs
    gcp_block = GcsBucket.load("zoom-gcs")
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 226, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 710, in load
    raise RuntimeError(
RuntimeError: Unable to load 'zoom-gcs' of block type None due to failed validation. To load without validation, try loading again with `validate=False`.
03:58:27 PM
Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):\n  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 696, in load\n    return cls._from_block_document(block_document)\n  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 551, in _from_block_document\n    block = block_cls.parse_obj(block_document.data)\n  File "pydantic/main.py", line 527, in pydantic.main.BaseModel.parse_obj\n  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 183, in __init__\n    super().__init__(*args, **kwargs)\n  File "pydantic/main.py", line 342, in pydantic.main.BaseModel.__init__\npydantic.error_wrappers.ValidationError: 1 validation error for GcsBucket\ngcp_credentials -> service_account_file\n  The provided path to the service account is invalid (type=value_error)\n\nThe above exception was the direct cause of the following exception:\n\nRuntimeError: Unable to load \'zoom-gcs\' of block type None due to failed validation. To load without validation, try loading again with `validate=False`.\n')

System

Host machine

OS: Fedora Linux 37 (Workstation Edition) x86_64
Host: ASUS TUF Dash
Kernel: 6.0.9-300.fc37.x86_64
Python: 3.11

Prefect

Version:             2.7.9
API version:         0.8.4
Python version:      3.11.0
Git commit:          42b80f18
Built:               Thu, Jan 19, 2023 4:59 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted

Remark

I guess that, for some reason, Prefect inside the container cannot read the credential file on my local machine. I tried mounting the path to my credential file in the Dockerfile, but that did not work either.
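The validation error is consistent with the block storing only a path, which then has to exist wherever the block is loaded. One possible workaround, sketched here, is to read the key file on the host and store its contents rather than its path. The helper `read_service_account_info` is hypothetical, and handing the resulting dictionary to the credentials block (for example through a `service_account_info` field) is an assumption to verify against the prefect-gcp documentation.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def read_service_account_info(key_file: str) -> Dict[str, Any]:
    """Load the JSON key so its contents, not its path, can be stored."""
    path = Path(key_file).expanduser()  # resolves a leading '~' on the current machine
    if not path.is_file():
        raise FileNotFoundError(f"No service account key file at {path}")
    return json.loads(path.read_text())


# Demonstration with a throwaway key file containing dummy values
with tempfile.TemporaryDirectory() as tmp:
    key_path = Path(tmp) / "google_credentials.json"
    key_path.write_text(json.dumps({"type": "service_account", "project_id": "demo"}))
    info = read_service_account_info(str(key_path))
print(info["project_id"])
```

Stored contents travel with the block, so the container no longer needs the host's file system layout.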

Create GCS Project Push Step

Add a project step that allows users to push their project to a GCS bucket at a given path. Should accept credentials that can be used for authentication with GCP.

Create GCS Project Pull Step

Add a project step that allows users to pull their project from a GCS bucket at a given path. Should accept credentials that can be used for authentication with GCP.

Fix Official Docs on upload_from_dataframe

Expectation / Proposal

It was recently noticed on DataTalksClub's Slack that the official documentation for upload_from_dataframe lists parquet_gz among the valid options, which is inaccurate; it should be parquet_gzip.

Since the MkDocs site is generated from the method docstring, I propose that we fix the docstring to avoid further errors or confusion among users.

Traceback / Example

import pandas as pd
from prefect_gcp.cloud_storage import GcsBucket

pandas_df = pd.read_csv("some_url")

gcs_bucket = GcsBucket.load("my-bucket")
gcs_bucket.upload_from_dataframe(
   df=pandas_df, 
   to_path="/path/to/gcs/blob.parquet", 
   serialization_format='parquet_gz'
)

This raises a KeyError because parquet_gz is not a valid value, contrary to what the documentation suggests.
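A KeyError of this kind is what an enum lookup by name produces for an unknown member. The enum below is a hypothetical mirror of the serialization options: only parquet_gzip is confirmed in this report, and the other members are assumed for illustration.

```python
from enum import Enum


class SerializationFormat(Enum):
    """Hypothetical mirror of the accepted formats; the member set is an assumption."""

    CSV = "csv"
    CSV_GZIP = "csv_gzip"
    PARQUET = "parquet"
    PARQUET_SNAPPY = "parquet_snappy"
    PARQUET_GZIP = "parquet_gzip"


def parse_format(name: str) -> SerializationFormat:
    """Look up a format by name; an unknown name raises KeyError."""
    return SerializationFormat[name.upper()]


print(parse_format("parquet_gzip"))

try:
    parse_format("parquet_gz")
except KeyError as error:
    print(f"invalid format: {error}")
```

Validating the string this way before calling the upload makes the mismatch between documented and accepted names fail early and visibly.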

'SSLError' (_ssl.c:2483) on upload_from_path method on GcsBucket instance

I am running into 'SSLError' (_ssl.c:2483) when using the upload_from_path method of GcsBucket object. Any help would be greatly appreciated.

Python File

from pathlib import Path
from prefect_gcp.cloud_storage import GcsBucket

color = "yellow"
year = 2019
month = 2
dataset_file = f"{color}_tripdata_{year}-{month:02}"

data_dir = f"data/{color}"
Path(data_dir).mkdir(parents=True, exist_ok=True)
path = Path(f"{data_dir}/{dataset_file}.parquet")

gcp_cloud_storage_bucket_block = GcsBucket.load("zoom-gcs")
print(gcp_cloud_storage_bucket_block.list_folders())
gcp_cloud_storage_bucket_block.upload_from_path(from_path=path, to_path=path.as_posix())

Log Output

    raise SSLError(e, request=request)
requests.exceptions.SSLError: HTTPSConnectionPool(host='storage.googleapis.com', port=443): Max retries exceeded with url: /upload/storage/v1/b/prefect-de-zoomcamp247/o?uploadType=resumable&upload_id=ADPycdtK3ltdrHHYEXTE4TsWTiLf8b6kj9QPhuYMytVcWE6U2b7gumRC0A0Ganjzy4DzoXCzbQ3FWfxvLaEyxTAm_Ee7 (Caused by SSLError(SSLWantWriteError(3, 'The operation did not complete (write) (_ssl.c:2483)')))

Unable to set Vertex runner disk size (but was able to in prefect 1)

Expectation / Proposal

In prefect v1 we could specify the machine parameters (like boot disk size) of the vertex runner. This functionality seems missing from this v2 version. It's critical for my use case which deals with large data processing.

I am open to any workaround that allows me to edit the "workerPoolSpecs" ultimately sent to GCP. I've tried editing my local aiplatform.py to ask for a 500 GB disk as follows:

from google.cloud.aiplatform_v1.types.machine_resources import DiskSpec

container_spec=container_spec, machine_spec=machine_spec, replica_count=1, disk_spec=DiskSpec(boot_disk_type="pd-ssd", boot_disk_size_gb=500)

But even if I save the VertexAICustomTrainingJob with save("name", overwrite=True) and re-run the deployment, new jobs on Vertex AI still show "bootDiskSizeGb": 100 in the run parameters, so the change does not seem to take effect.
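For reference, this is roughly the shape one would expect a worker pool entry to take in the job's run parameters once a disk size is applied. It is a plain-dictionary sketch using REST-style field names; `worker_pool_spec` is a hypothetical helper, and the machine type is an assumed example value.

```python
import json
from typing import Any, Dict


def worker_pool_spec(
    image_uri: str,
    machine_type: str,
    boot_disk_size_gb: int = 100,  # the value the runs above kept reporting
    boot_disk_type: str = "pd-ssd",
) -> Dict[str, Any]:
    """Build one workerPoolSpecs entry with an explicit diskSpec."""
    return {
        "machineSpec": {"machineType": machine_type},
        "replicaCount": 1,
        "containerSpec": {"imageUri": image_uri},
        "diskSpec": {
            "bootDiskType": boot_disk_type,
            "bootDiskSizeGb": boot_disk_size_gb,
        },
    }


spec = worker_pool_spec(
    image_uri="us-docker.pkg.dev/cloudrun/container/job:latest",
    machine_type="n1-standard-4",  # assumed example machine type
    boot_disk_size_gb=500,
)
print(json.dumps(spec, indent=2))
```

Comparing this against the parameters shown on a submitted job is a quick way to confirm whether a local edit actually reached the request.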

VPC Connector annotations are in the wrong place in CloudRunJob

Expectation / Proposal

When I create a flow using the CloudRunJob infrastructure block with a vpc_connector_name specified, I receive an error when the run.googleapis.com/v1 API creates the resource. Investigating, we found that the annotations inside the jobs_body are implemented in the wrong place (in "spec.metadata.annotations"); according to the docs, they must be located inside "spec.template.metadata.annotations".

We ran a lot of tests and managed to fix it.
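A sketch of a request body with the annotation in the location named above, spec.template.metadata.annotations, and nothing VPC-related on the Job's own metadata. The function `job_body` is hypothetical and the body is reduced to the fields needed to show the nesting; it is not the library's actual implementation.

```python
from typing import Any, Dict, Optional

VPC_ANNOTATION = "run.googleapis.com/vpc-access-connector"


def job_body(name: str, image: str, vpc_connector_name: Optional[str] = None) -> Dict[str, Any]:
    """Build a minimal Cloud Run v1 Job body with the VPC annotation on the execution template."""
    template_annotations: Dict[str, str] = {}
    if vpc_connector_name:
        template_annotations[VPC_ANNOTATION] = vpc_connector_name
    return {
        "apiVersion": "run.googleapis.com/v1",
        "kind": "Job",
        # Job-level metadata: the API rejects the VPC annotation here
        "metadata": {"name": name, "annotations": {}},
        "spec": {
            "template": {
                # Execution-level metadata: the location the docs call for
                "metadata": {"annotations": template_annotations},
                "spec": {"template": {"spec": {"containers": [{"image": image}]}}},
            }
        },
    }


body = job_body("my-job", "us-docker.pkg.dev/cloudrun/container/job:latest", "my-connector")
print(body["spec"]["template"]["metadata"]["annotations"])
```

This matches the error text, which lists Revision and Execution as the kinds that support the annotation, not Job.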

Traceback / Example

State message: Submission failed. googleapiclient.errors.HttpError: <HttpError 400 when requesting https://us-west2-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/ba-basic/jobs?alt=json returned "metadata.annotations[run.googleapis.com/vpc-access-connector]: Annotation 'run.googleapis.com/vpc-access-connector' is not supported on resources of kind 'Job'. Supported kinds are: Revision, Execution". Details: "[{'@type': 'type.googleapis.com/google.rpc.BadRequest', 'fieldViolations': [{'field': 'metadata.annotations[run.googleapis.com/vpc-access-connector]', 'description': "Annotation 'run.googleapis.com/vpc-access-connector' is not supported on resources of kind 'Job'. Supported kinds are: Revision, Execution"}]}]">

name 'BigQueryClient' is not defined

Installed via 'poetry add prefect_gcp'

Importing this:

from prefect import flow, task
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file

the second line triggers this exception:

Traceback (most recent call last):
  File "main.py", line 4, in <module>
    from prefect_gcp import GcpCredentials
  File "/Users/timodechau/Library/Caches/pypoetry/virtualenvs/datafunctions-_z1_YbqT-py3.9/lib/python3.9/site-packages/prefect_gcp/__init__.py", line 2, in <module>
    from .credentials import GcpCredentials # noqa
  File "/Users/timodechau/Library/Caches/pypoetry/virtualenvs/datafunctions-_z1_YbqT-py3.9/lib/python3.9/site-packages/prefect_gcp/credentials.py", line 62, in <module>
    class GcpCredentials:
  File "/Users/timodechau/Library/Caches/pypoetry/virtualenvs/datafunctions-_z1_YbqT-py3.9/lib/python3.9/site-packages/prefect_gcp/credentials.py", line 174, in GcpCredentials
    ) -> BigQueryClient:
NameError: name 'BigQueryClient' is not defined
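One plausible reading of this traceback is that BigQueryClient is imported only when an optional dependency is installed, while the return annotation is evaluated as soon as the class body runs (as it is on the Python 3.9 shown here). Under that assumption, quoting the annotation defers the lookup so that the import of the module no longer fails. The class `ExampleCredentials` is a hypothetical stand-in, not the library's code.

```python
try:
    # Optional dependency: may be missing when the BigQuery extra is not installed
    from google.cloud.bigquery import Client as BigQueryClient
except ImportError:
    BigQueryClient = None


class ExampleCredentials:
    """Hypothetical stand-in for a credentials class with an optional client."""

    # The quoted annotation is stored as a string and not looked up at definition time
    def get_bigquery_client(self) -> "BigQueryClient":
        if BigQueryClient is None:
            raise RuntimeError("Install the BigQuery dependency to use this method.")
        return BigQueryClient()


print(ExampleCredentials.get_bigquery_client.__annotations__["return"])
```

With this arrangement the missing dependency surfaces only when the BigQuery method is actually called, with a message that says what to install.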

`list_blobs` and `list_folders` log incorrect bucket name

Expectation / Proposal

The list_blobs and list_folders methods on GcsBucket should log which bucket and path they are listing blobs or folders for.

Traceback / Example

When running this code:

from prefect_gcp import GcsBucket

bucket = GcsBucket(
    bucket="my-bucket",
)

bucket.list_folders()

The log output is:

08:30:20.932 | INFO    | prefect.GcsBucket - Listing folders in bucket None.
08:30:21.002 | INFO    | prefect.GcsBucket - Listing blobs in bucket None.

Instead, the log messages should be:

08:30:20.932 | INFO    | prefect.GcsBucket - Listing folders in bucket 'my-bucket'.
08:30:21.002 | INFO    | prefect.GcsBucket - Listing blobs in bucket 'my-bucket'.

When running this code:

from prefect_gcp import GcsBucket

bucket = GcsBucket(
    bucket="my-bucket",
    bucket_folder="my-folder",
)

bucket.list_folders()

The log output is:

08:30:20.932 | INFO    | prefect.GcsBucket - Listing folders in bucket my-folder.
08:30:21.002 | INFO    | prefect.GcsBucket - Listing blobs in bucket my-folder.

Instead, the log messages should be something like:

08:30:20.932 | INFO    | prefect.GcsBucket - Listing folders in folder 'my-folder' in bucket 'my-bucket'.
08:30:21.002 | INFO    | prefect.GcsBucket - Listing blobs in folder 'my-folder' in bucket 'my-bucket'.
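The expected messages can be produced by a small formatting helper. This is a sketch of the desired output only; `listing_message` is a hypothetical name and not the method used inside GcsBucket.

```python
from typing import Optional


def listing_message(kind: str, bucket: str, bucket_folder: Optional[str] = None) -> str:
    """Build the log line, mentioning the folder only when one is configured."""
    if bucket_folder:
        return f"Listing {kind} in folder {bucket_folder!r} in bucket {bucket!r}."
    return f"Listing {kind} in bucket {bucket!r}."


print(listing_message("folders", "my-bucket"))
print(listing_message("blobs", "my-bucket", "my-folder"))
```

Keeping the bucket and folder as separate arguments avoids the mix-up shown above, where the folder name was printed in the bucket position.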

Creating CloudRunJob Block Error -> TypeError: Object of type 'SecretDict' is not JSON serializable

Hello,

I have been using this blogpost as a walkthrough for setting up Prefect Agent with Compute Engine, and flows with Cloud Run Job Block.

Unfortunately, I think I am running into issues with the GCP Credentials block.

Specifically I am receiving this error when I save a Cloud Run Job block:

File "pydantic/main.py", line 505, in pydantic.main.BaseModel.json
  File "/opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "pydantic/json.py", line 90, in pydantic.json.pydantic_encoder
TypeError: Object of type 'SecretDict' is not JSON serializable
Error: Process completed with exit code 1.

I have the following dependencies in my Poetry pyproject.toml. Any idea whether I am missing anything, or is this a bug in prefect-gcp?

[tool.poetry.dependencies]
python = "~3.10"
prefect-gcp = "^0.2.2"
google-cloud-storage = "^2.7.0"
google-cloud-bigquery = "^3.4.1"
google-auth-oauthlib = "^0.8.0"
google-auth = "^2.15.0"
pydata-google-auth = "^1.4.0"

Whatever it is, is there any way I could help address this if it's not an error on my end?

Thanks team.

Upload pandas DataFrame to GoogleCloudStorageBucket

Issue

  • As of 0.2.6, GcsBucket does not let developers upload a pandas DataFrame as a .csv or .parquet file (or their compressed variants) to a GCS blob.

  • Therefore, users are forced to first persist the DataFrame to a local filesystem path and only then upload it from that path with upload_from_path, upload_from_folder, or upload_from_file

Proposal:

  • One can write a DataFrame into an io.BytesIO() buffer and pass that buffer to upload_from_file_object()

  • Include an upload_from_dataframe method for convenience and a better developer experience; it would handle the boilerplate code and allow DataFrame serialization as .csv, .csv.gz, .parquet, .parquet.snappy, and .parquet.gz
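
The in-memory buffer idea from the proposal can be sketched without touching the local filesystem. To stay dependency-free, this sketch uses the standard library `csv` and `gzip` modules instead of pandas; a DataFrame writer would fill the same kind of buffer. `rows_to_csv_buffer` is a hypothetical name, and the resulting buffer is what one would hand to an upload call such as upload_from_file_object.

```python
import csv
import gzip
import io
from typing import Iterable, Sequence


def rows_to_csv_buffer(
    header: Sequence[str], rows: Iterable[Sequence], compress: bool = False
) -> io.BytesIO:
    """Serialize tabular rows to an in-memory CSV buffer, optionally gzip-compressed."""
    text = io.StringIO()
    writer = csv.writer(text, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    payload = text.getvalue().encode("utf-8")
    if compress:
        payload = gzip.compress(payload)
    # A fresh BytesIO starts at position 0, ready to be read by an uploader.
    return io.BytesIO(payload)


buffer = rows_to_csv_buffer(["id", "name"], [[1, "a"], [2, "b"]])
print(buffer.read().decode("utf-8"))
```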

Traceback / Example

Create Prefect GCP Worker that works with Cloud Run v2

I am going to work on creating a Prefect GCP Worker that works with the Cloud Run v2 Block (in this PR). This worker will be very similar to the current worker, but will handle Cloud Run v2 instead of Cloud Run v1.

Expectation / Proposal

  • Worker v2 class

  • Base Worker class

    • since a lot of the code in Worker v1 will be reused in Worker v2
  • I would like to help contribute a pull request to resolve this!

    • I will start working on this now

Add External Table options for bigquery_create_table

It would be really awesome if the bigquery_create_table task supported options to build an ExternalConfig so we could define our BigQuery table as an external table. I made an attempt to write a version of this task in my own project, and it works for my needs, but doesn't cover all the available options for external tables.

@task
async def bigquery_create_table(
    dataset: str,
    table: str,
    gcp_credentials: "GcpCredentials",
    schema: List[SchemaField] = None,
    clustering_fields: List[str] = None,
    time_partitioning: TimePartitioning = None,
    project: Optional[str] = None,
    location: str = "US",
    external_config: Dict = None,
) -> str:
    """
    Creates table in BigQuery.
    Args:
        dataset: Name of the dataset in which the table will be created.
        table: Name of a table to create.
        schema: Schema to use when creating the table.
        gcp_credentials: Credentials to use for authentication with GCP.
        clustering_fields: List of fields to cluster the table by.
        time_partitioning: `bigquery.TimePartitioning` object specifying a partitioning
            of the newly created table
        project: Project to initialize the BigQuery Client with; if
            not provided, will default to the one inferred from your credentials.
        location: location of the dataset that will be written to.
        external_config: Source format, source uris, and additional settings.
    Returns:
        Table name.
    Example:
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.bigquery import bigquery_create_table
        from google.cloud.bigquery import SchemaField
        @flow
        def example_bigquery_create_table_flow():
            gcp_credentials = GcpCredentials(project="project")
            schema = [
                SchemaField("number", field_type="INTEGER", mode="REQUIRED"),
                SchemaField("text", field_type="STRING", mode="REQUIRED"),
                SchemaField("bool", field_type="BOOLEAN")
            ]
            result = bigquery_create_table(
                dataset="dataset",
                table="test_table",
                schema=schema,
                gcp_credentials=gcp_credentials
            )
            return result
        example_bigquery_create_table_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Creating %s.%s", dataset, table)

    client = gcp_credentials.get_bigquery_client(project=project, location=location)
    try:
        partial_get_dataset = partial(client.get_dataset, dataset)
        dataset_ref = await to_thread.run_sync(partial_get_dataset)
    except NotFound:
        logger.debug("Dataset %s not found, creating", dataset)
        partial_create_dataset = partial(client.create_dataset, dataset)
        dataset_ref = await to_thread.run_sync(partial_create_dataset)

    table_ref = dataset_ref.table(table)
    try:
        partial_get_table = partial(client.get_table, table_ref)
        await to_thread.run_sync(partial_get_table)
        logger.info("%s.%s already exists", dataset, table)
    except NotFound:
        logger.debug("Table %s not found, creating", table)

        table_obj = Table(table_ref, schema=schema)

        # external config stuff added here
        if external_config:
            external_data_configuration = ExternalConfig(
                external_config["source_format"]
            )

            external_data_configuration.source_uris = external_config["source_uris"]

            if external_config.get("hive_partitioning"):
                hive_partitioning_options = HivePartitioningOptions()
                hive_partitioning_options.mode = external_config["hive_partitioning"][
                    "mode"
                ]
                hive_partitioning_options.source_uri_prefix = external_config[
                    "hive_partitioning"
                ]["source_uri_prefix"]
                hive_partitioning_options.require_partition_filter = external_config[
                    "hive_partitioning"
                ]["require_partition_filter"]
                external_data_configuration.hive_partitioning = (
                    hive_partitioning_options
                )

            if external_config.get("autodetect"):
                external_data_configuration.autodetect = external_config["autodetect"]

            if external_config["source_format"] == "BIGTABLE":
                external_data_configuration.bigtable_options = external_config[
                    "bigtable_options"
                ]

            elif external_config["source_format"] == "CSV":
                external_data_configuration.csv_options = external_config["csv_options"]

            elif external_config["source_format"] == "GOOGLE_SHEETS":
                external_data_configuration.google_sheets_options = external_config[
                    "google_sheets_options"
                ]

            table_obj.external_data_configuration = external_data_configuration

        # cluster for optimal data sorting/access
        if clustering_fields:
            table_obj.clustering_fields = clustering_fields

        # partitioning
        if time_partitioning:
            table_obj.time_partitioning = time_partitioning

        partial_create_table = partial(client.create_table, table_obj)
        await to_thread.run_sync(partial_create_table)

    return table
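
For reference, here is a sketch of the `external_config` dictionary shape that the task above reads. `make_external_config` is a hypothetical helper, not part of prefect-gcp; it always sets the `hive_partitioning` and `autodetect` keys because the task indexes them directly, and it passes format-specific options through unchanged.

```python
from typing import Any, Dict, List, Optional

# Formats for which the task above reads an extra options key.
FORMAT_OPTION_KEYS = {
    "BIGTABLE": "bigtable_options",
    "CSV": "csv_options",
    "GOOGLE_SHEETS": "google_sheets_options",
}


def make_external_config(
    source_format: str,
    source_uris: List[str],
    hive_partitioning: Optional[Dict[str, Any]] = None,
    autodetect: bool = False,
    format_options: Any = None,
) -> Dict[str, Any]:
    """Build a dict containing every key the task looks up."""
    config: Dict[str, Any] = {
        "source_format": source_format,
        "source_uris": source_uris,
        "hive_partitioning": hive_partitioning,
        "autodetect": autodetect,
    }
    option_key = FORMAT_OPTION_KEYS.get(source_format)
    if option_key is not None:
        if format_options is None:
            raise ValueError(f"{source_format} requires {option_key}")
        config[option_key] = format_options
    return config


config = make_external_config(
    "PARQUET",
    ["gs://my-bucket/data/*"],
    hive_partitioning={
        "mode": "AUTO",
        "source_uri_prefix": "gs://my-bucket/data",
        "require_partition_filter": False,
    },
)
print(sorted(config))
```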

GcsBucket does not work with deployments

  File "/Users/andrew/mambaforge/envs/gcp/lib/python3.10/site-packages/prefect/deployments.py", line 318, in location
    if not self.storage.basepath.endswith("/")
AttributeError: 'GcsBucket' object has no attribute 'basepath'

Expectation / Proposal

It should work like Gcs

Traceback / Example

Fix `upload_from_dataframe` Compressed Parquet serialization to .gz.parquet & .snappy.parquet

Expectation / Proposal

When upload_from_dataframe was implemented, we went with the file extensions .parquet.snappy and .parquet.gz for the compressed parquet serialization formats.

However, it recently came to my attention that tools like Tad and others used to visualize tabular data expect the file name to end with .parquet.

Meaning, it works if one renames file.parquet.snappy to file.snappy.parquet, or file.parquet.gz to file.gz.parquet. But that's a hassle for the Developer Experience if you're dealing with multiple files.

I also noticed that Distributed Computing Frameworks such as Apache Spark and Apache Flink are actually saving compressed parquets as .snappy.parquet or .gz.parquet instead.

Therefore, I believe, it'd be much better to be compliant with the industry standard distributed frameworks (Spark and Flink), and also have a better developer experience for people browsing data with Tad or others.
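
The renaming described above could be sketched as a small suffix mapping. `normalize_parquet_name` and `PROPOSED_SUFFIXES` are hypothetical names for illustration; this is not the code that upload_from_dataframe uses.

```python
# Old suffix -> suffix proposed in this issue.
PROPOSED_SUFFIXES = {
    ".parquet.snappy": ".snappy.parquet",
    ".parquet.gz": ".gz.parquet",
}


def normalize_parquet_name(name: str) -> str:
    """Rewrite a compressed-parquet file name so that it ends with .parquet."""
    for old, new in PROPOSED_SUFFIXES.items():
        if name.endswith(old):
            return name[: -len(old)] + new
    # Names that already end in .parquet (or anything else) are left alone.
    return name


print(normalize_parquet_name("file.parquet.snappy"))
print(normalize_parquet_name("data/file.parquet.gz"))
```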

Traceback / Example

Accept other kwargs for file upload task

The file upload task does not provide a content_type variable:

async def cloud_storage_upload_blob_from_file(

Hence, it relies on the default behavior of Google Cloud SDK's MIME type detection, and cannot be overridden: https://googleapis.dev/python/storage/latest/storage/blobs.html#google.cloud.storage.blob.Blob.upload_from_filename

The above documentation says that this method's MIME type detection works as follows:

Upload this blob’s contents from the content of a named file.

The content type of the upload will be determined in order of precedence:

  • The value passed in to this method (if not None)
  • The value stored on the current blob
  • The value given by mimetypes.guess_type
  • The default value (‘application/octet-stream’)
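
The precedence quoted above can be sketched with the standard library alone. `resolve_content_type` is a hypothetical function that mirrors the documented order; it is not the Google Cloud SDK's implementation.

```python
import mimetypes
from typing import Optional

DEFAULT_CONTENT_TYPE = "application/octet-stream"


def resolve_content_type(
    filename: str,
    explicit: Optional[str] = None,
    blob_content_type: Optional[str] = None,
) -> str:
    """Pick a content type using the documented order of precedence."""
    if explicit is not None:
        return explicit  # 1. value passed in to the method
    if blob_content_type is not None:
        return blob_content_type  # 2. value stored on the current blob
    guessed, _encoding = mimetypes.guess_type(filename)
    return guessed or DEFAULT_CONTENT_TYPE  # 3. guess, else 4. default


print(resolve_content_type("report.txt"))
print(resolve_content_type("report.txt", explicit="text/markdown"))
```

Without a content_type argument on the task, only the last three steps are reachable, which is the limitation this issue describes.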

Add timeoutSeconds integration with Cloud Run TaskSpec

This section:

"spec": {"containers": containers} # TaskSpec

could be enhanced to pass more arguments to the Cloud Run TaskSpec object. It currently only passes containers.

The parameter I need for my project is timeoutSeconds:

Duration in seconds the task may be active before the system will actively try to mark it failed and kill associated containers. This applies per attempt of a task, meaning each retry can run for the full timeout.

Reason:

The timeout parameter (L279) of CloudRunJob doesn't update the timeoutSeconds in the YAML file for the same job sent to GCP. Therefore the default timeout of 10min (see docs) is enforced on GCP. As a Prefect 2.0 user I currently can't run any Cloud Run Jobs longer than 10min because they get automatically shut down by the GCP job handler.
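
A minimal sketch of what passing the timeout through might look like. `build_task_spec` is a hypothetical helper, not the CloudRunJob implementation, and the exact value type the Cloud Run API expects for timeoutSeconds should be checked against its reference.

```python
from typing import Any, Dict, List, Optional


def build_task_spec(
    containers: List[Dict[str, Any]], timeout_seconds: Optional[int] = None
) -> Dict[str, Any]:
    """Build a TaskSpec dict that includes timeoutSeconds when one is given."""
    spec: Dict[str, Any] = {"containers": containers}
    if timeout_seconds is not None:
        # Omitting the key leaves the platform default in effect.
        spec["timeoutSeconds"] = timeout_seconds
    return spec


containers = [{"image": "gcr.io/my-project/my-image"}]  # placeholder image name
print(build_task_spec(containers, timeout_seconds=3600))
```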

Bug: trigger_dbt_cli_command for BigQuery target fails, if credentials are provided with service_account_info

I use the prefect-dbt package together with prefect-gcp to run dbt cli commands against BigQuery.
The flow raises an exception if I use service_account_info instead of service_account_file in the GcpCredentials object.

Note: As the exception is about GcpCredentials, I opened the issue in this repository. If the issue is instead related to prefect-dbt, let me know.

Steps to reproduce

  1. Create a flow with trigger_dbt_cli_command and using BigQueryTargetConfigs as target.
    from prefect import flow, task, get_run_logger
    from prefect.task_runners import SequentialTaskRunner
    from prefect_gcp import GcpCredentials
    from logging import Logger
    from prefect_dbt.cli.credentials import DbtCliProfile
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    from prefect_dbt.cli.configs import BigQueryTargetConfigs
    import json
    
    def create_dbt_profile():
        credentials = GcpCredentials(
            service_account_info=json.dumps({
      "type": "ssss",
      "project_id": "ssss",
      "private_key_id": "ssss",
      "private_key": "sssss",
      "client_email": "ssss",
      "client_id": "sssss",
      "auth_uri": "ssssss",
      "token_uri": "sssssss",
      "auth_provider_x509_cert_url": "ssssss",
      "client_x509_cert_url": "ssssss"
    })
            # service_account_file = "/mypath/google_sa_keys/vn_at_vol_at_bigquery.json"
        )
    
        target_configs = BigQueryTargetConfigs(
            schema="dbt_anigg",
            project="ssss",
            credentials=credentials,
            location="EU",
            maximum_bytes_billed= 10000000000,
            type="bigquery",
            threads=8
        )
    
        dbt_cli_profile = DbtCliProfile(
            name="test",
            target="dev",
            target_configs=target_configs
        )
    
        return dbt_cli_profile
    
    # %%
    @flow(name="run_dbt_pipeline",
    description="Runs a dbt pipeline containing dbt test and dbt run for the specified selects",
    task_runner=SequentialTaskRunner, timeout_seconds=3600)
    def run_dbt_pipeline(should_test = True, should_run = True):
        """
        Runs a dbt pipeline containing dbt test and dbt run for the specified selects.
        The steps (test and run) can be selected or deselected via flow parameters.
        """
        logger = get_run_logger()
        dbt_cli_profile = create_dbt_profile()
    
        if should_test:
            result = trigger_dbt_cli_command(
                "dbt test",
                overwrite_profiles=True,
                dbt_cli_profile=dbt_cli_profile,
                profiles_dir="./",
                project_dir="/mydir"
            )
        return
    
    
    # %%
    if __name__ == "__main__":
        run_dbt_pipeline(should_test=True, should_run=False)
  2. Run the flow

Result

The flow runs into an exception. Summary:

pydantic.error_wrappers.ValidationError: 1 validation error for GcpCredentials
service_account_info
  JSON object must be str, bytes or bytearray (type=type_error.json)

See the full exception at the bottom of this issue.

Expected

No exception

Additional info

  • it's the same outcome whether one json.dumps the service account dict or directly uses the dict
  • If one uses the service_account_file parameter instead of service_account_info - everything works as expected!

Full Exception

13:15:56.822 | ERROR   | Flow run 'enlightened-raptor' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "/home/andreas/github/datawarehouse/prefect/flows/dbt/dbt_flows.py", line 77, in <module>
    run_dbt_pipeline(should_test=True, should_run=False)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/engine.py", line 236, in create_then_begin_flow_run
    return state.result()
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/engine.py", line 587, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 114, in run_sync_in_interruptible_worker_thread
    async with anyio.create_task_group() as tg:
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 96, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/andreas/github/datawarehouse/prefect/flows/dbt/dbt_flows.py", line 65, in run_dbt_pipeline
    result = trigger_dbt_cli_command(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/engine.py", line 727, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/engine.py", line 859, in create_task_run_then_submit
    return await future._result()
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/futures.py", line 227, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/task_runners.py", line 214, in submit
    result = await run_fn(**run_kwargs)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/engine.py", line 1000, in begin_task_run
    return await orchestrate_task_run(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/engine.py", line 1065, in orchestrate_task_run
    resolved_parameters = await resolve_inputs(parameters)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/engine.py", line 1291, in resolve_inputs
    return await run_sync_in_worker_thread(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 300, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 300, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 266, in visit_nested
    return visit_collection(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 313, in visit_collection
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 313, in <listcomp>
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 266, in visit_nested
    return visit_collection(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 313, in visit_collection
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 313, in <listcomp>
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 266, in visit_nested
    return visit_collection(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/utilities/collections.py", line 323, in visit_collection
    model_instance = typ(
  File "/home/andreas/miniconda3/envs/prefect-dbt/lib/python3.10/site-packages/prefect/blocks/core.py", line 165, in __init__
    super().__init__(*args, **kwargs)
  File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for GcpCredentials
service_account_info
  JSON object must be str, bytes or bytearray (type=type_error.json)

Setting VPC Cloud Run Blocks

We are trying to upgrade to Orion and start using GCP Cloud Run blocks to run our new jobs. However, the integration seems incomplete: it does not support setting the VPC for the Cloud Run job. Google supports this through its Python API, but prefect-gcp does not expose it. Can you advise if we are right? @ahuang11

https://github.com/PrefectHQ/prefect-gcp/blob/main/prefect_gcp/cloud_run.py

https://cloud.google.com/run/docs/reference/rpc/google.cloud.run.v1#executiontemplatespec

Add collection sync workflow using cruft

Add cruft to repo to allow synchronization of this collection with the original template.
Cruft can be added by running cruft link. Note that a starting commit will need to be specified.
Using the commit of the prefect-collection-template closest to the generation date of this repo
is a good default.

VertexAICustomTrainingJob does not have accelerator_count

Expectation / Proposal

Using the VertexAICustomTrainingJob class to attach a GPU to a custom training job should work. However, the MachineSpec it submits does not include an accelerator_count, so specifying an accelerator_type breaks this block.

Traceback / Example

"Submission failed. Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 72, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1030, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 910, in _end_unary_response_blocking
    raise _InactiveRpcError(state) # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "List of found errors: 1.Field: job_spec.worker_pool_specs[0].machine_spec.accelerator_type; Message: Both accelerator_type and accelerator_count should be specified or none. "
    debug_error_string = "UNKNOWN:Error received from peer ipv4:74.125.69.95:443 {created_time:"2023-04-24T13:37:52.582665137+00:00", grpc_status:3, grpc_message:"List of found errors:\t1.Field: job_spec.worker_pool_specs[0].machine_spec.accelerator_type; Message: Both accelerator_type and accelerator_count should be specified or none.\t"}"
>

The above exception was the direct cause of the following exception:

google.api_core.exceptions.InvalidArgument: 400 List of found errors: 1.Field: job_spec.worker_pool_specs[0].machine_spec.accelerator_type; Message: Both accelerator_type and accelerator_count should be specified or none. [field_violations { field: "job_spec.worker_pool_specs[0].machine_spec.accelerator_type" description: "Both accelerator_type and accelerator_count should be specified or none." } ]"

I opened #174 for this fix
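
The constraint reported by the API can be sketched as a local check before submission. `build_machine_spec` is a hypothetical helper returning a plain dict; it is not the code from the referenced fix, and the machine and accelerator names are only examples.

```python
from typing import Any, Dict, Optional


def build_machine_spec(
    machine_type: str,
    accelerator_type: Optional[str] = None,
    accelerator_count: Optional[int] = None,
) -> Dict[str, Any]:
    """Build a machine spec dict, requiring accelerator type and count together."""
    if (accelerator_type is None) != (accelerator_count is None):
        raise ValueError(
            "Both accelerator_type and accelerator_count should be specified or none."
        )
    spec: Dict[str, Any] = {"machine_type": machine_type}
    if accelerator_type is not None:
        spec["accelerator_type"] = accelerator_type
        spec["accelerator_count"] = accelerator_count
    return spec


print(build_machine_spec("n1-standard-4", "NVIDIA_TESLA_T4", 1))
```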

Pydantic type inconsistency on service_account_info parameter

Hi there,

We were trying to use the GcpCredentials block by passing a service_account_info value, but Pydantic validation does not allow it to be a dictionary, even though the _get_credentials_from_service_account method expects one. Running the code below raises an error.

gcp_secret = {"xx": "yy"...}
gcp_credentials = GcpCredentials(
    service_account_info=gcp_secret,
    project="project"
)

Looking at the code in credentials.py, it seems the types are not compatible:

    service_account_file: Optional[Path] = None
    service_account_info: Optional[Json] = None    #HERE
    project: Optional[str] = None

    @staticmethod
    def _get_credentials_from_service_account(
        service_account_file: Optional[str] = None,
        service_account_info: Optional[Dict[str, str]] = None,    #vs HERE
    ) -> Credentials:

Our workaround was to change the value from Optional[Json] to Optional[Dict], which seems to have fixed the problem, but more testing would be needed.
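
One possible way to reconcile the two types, shown only as a sketch, is to accept either a JSON string or an already-parsed dictionary and normalize to a dictionary. `normalize_service_account_info` is a hypothetical function, not the fix adopted in prefect-gcp.

```python
import json
from typing import Any, Dict, Optional, Union


def normalize_service_account_info(
    value: Union[str, bytes, Dict[str, Any], None]
) -> Optional[Dict[str, Any]]:
    """Return service account info as a dict, parsing JSON text if needed."""
    if value is None:
        return None
    if isinstance(value, (str, bytes, bytearray)):
        value = json.loads(value)
    if not isinstance(value, dict):
        raise TypeError("service_account_info must be a JSON object or a dict")
    return value


info = {"type": "service_account", "project_id": "project"}
print(normalize_service_account_info(info) == normalize_service_account_info(json.dumps(info)))
```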

vpc-access-connector error in Google Cloud Run Work Pool

Expectation / Proposal

I believe the default YAML file for the Google Cloud Run work pool needs to be updated to match Google's change to the location of the vpc-access-connector annotation. The annotation should be moved to the template's metadata, according to: VPC with Connectors

I changed the YAML file accordingly in the Advanced tab, which resolved the error:

{
  "job_configuration": {
    "command": "{{ command }}",
    "env": "{{ env }}",
    "labels": "{{ labels }}",
    "name": "{{ name }}",
    "region": "{{ region }}",
    "credentials": "{{ credentials }}",
    "job_body": {
      "apiVersion": "run.googleapis.com/v1",
      "kind": "Job",
      "metadata": {
        "name": "{{ name }}",
        "annotations": {
          "run.googleapis.com/launch-stage": "BETA"
        }
      },
      "spec": {
        "template": {
          "metadata": {
            "annotations": {
              "run.googleapis.com/vpc-access-connector": "{{ vpc_connector_name }}"
            }
          },

Traceback / Example

googleapiclient.errors.HttpError: <HttpError 400 when requesting https://northamerica-northeast2-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/bettercartanalytics/jobs?alt=json returned "metadata.annotations[run.googleapis.com/vpc-access-connector]: Annotation 'run.googleapis.com/vpc-access-connector' is not supported on resources of kind 'Service'. Supported kinds are: Revision, Execution". Details: "[{'@type': 'type.googleapis.com/google.rpc.BadRequest', 'fieldViolations': [{'field': 'metadata.annotations[run.googleapis.com/vpc-access-connector]', 'description': "Annotation 'run.googleapis.com/vpc-access-connector' is not supported on resources of kind 'Service'. Supported kinds are: Revision, Execution"}]}]">
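
The relocation described in this issue can be sketched as a transformation on the job body dictionary. `move_vpc_annotation` is a hypothetical helper and assumes the annotation currently sits under the job's top-level metadata; it is not the worker's actual code.

```python
import copy
from typing import Any, Dict

VPC_ANNOTATION = "run.googleapis.com/vpc-access-connector"


def move_vpc_annotation(job_body: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of job_body with the VPC annotation under spec.template.metadata."""
    body = copy.deepcopy(job_body)
    annotations = body.get("metadata", {}).get("annotations", {})
    if VPC_ANNOTATION not in annotations:
        return body  # nothing to move
    connector = annotations.pop(VPC_ANNOTATION)
    template_metadata = (
        body.setdefault("spec", {}).setdefault("template", {}).setdefault("metadata", {})
    )
    template_metadata.setdefault("annotations", {})[VPC_ANNOTATION] = connector
    return body


job_body = {
    "metadata": {
        "name": "my-job",
        "annotations": {
            "run.googleapis.com/launch-stage": "BETA",
            VPC_ANNOTATION: "my-connector",
        },
    },
    "spec": {"template": {}},
}
print(move_vpc_annotation(job_body)["spec"]["template"]["metadata"])
```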
