prefect-planetary-computer

Visit the full docs here to see additional examples and the API reference.

Prefect integrations with the Microsoft Planetary Computer (PC).

Overview

This collection includes a Credentials Block 🔑 to store and retrieve a PC subscription key and Jupyter Hub token, with convenience methods to easily interact with the PC Data Catalog 🌍 and Dask Gateway 🚀 server.
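
For example, a minimal sketch of registering the block so it can later be loaded by name in flows (the block name and key values here are placeholders):

```python
from prefect_planetary_computer import PlanetaryComputerCredentials

# Placeholder values: the subscription key comes from your PC account,
# the API token from the PC Jupyter Hub.
PlanetaryComputerCredentials(
    subscription_key="sub-key",
    hub_api_token="hub-token",
).save("pc-credentials", overwrite=True)
```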

For more information about:

  • using Azure services with Prefect and the Planetary Computer, check out the prefect-azure collection.
  • the integration between Prefect and Dask, check out the prefect-dask collection.
  • taking advantage of the Planetary Computer data catalog and compute resources, check out the Planetary Computer documentation.

Resources

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Installation

Install prefect-planetary-computer with pip:

pip install prefect-planetary-computer

Requires Python 3.8 or newer.

We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.
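
For example, with the standard library's venv module:

```
python -m venv prefect-pc-env
source prefect-pc-env/bin/activate
pip install prefect-planetary-computer
```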

These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

Usage

!!! note
    - The following examples are adapted from Planetary Computer - Scale with Dask.
    - They require the following additional packages:
      ```
      pip install xarray zarr adlfs netcdf4 prefect_azure
      ```
    - Make sure to share the same Python dependencies - in particular `dask` and `distributed` - between the flow execution environment, the Dask Scheduler, and the Workers, [as explained in the Dask docs](https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments); see the version-check sketch below.
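
One way to catch version mismatches early is dask's built-in check, sketched here with the block name and image used in the examples below:

```python
from prefect_planetary_computer import PlanetaryComputerCredentials

pc_credentials = PlanetaryComputerCredentials.load("PC_BLOCK_NAME")

# get_versions(check=True) raises ValueError if the scheduler, workers,
# and local environment disagree on key package versions
# (dask, distributed, ...).
with pc_credentials.new_gateway_cluster(
    name="version-check",
    image="pangeo/pangeo-notebook:latest",
) as cluster:
    cluster.scale(1)  # start one worker so worker versions are compared too
    client = cluster.get_client()
    client.wait_for_workers(1)
    client.get_versions(check=True)
```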

Computing Dask Collections

Computations over Dask collections, such as Dask DataFrames, can run within a Prefect task by creating a Dask Gateway cluster through the credentials block inside the flow or task itself.

```python
# Prefect tasks are executed using the default ConcurrentTaskRunner
# Dask collection computations are executed on a new temporary Dask cluster

import xarray as xr

from prefect import flow, task, get_run_logger
from prefect_planetary_computer import PlanetaryComputerCredentials

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload

pc_credentials = PlanetaryComputerCredentials.load("PC_BLOCK_NAME")
bs_credentials = AzureBlobStorageCredentials.load("BS_BLOCK_NAME")

@task
def compute_mean(asset):
    logger = get_run_logger()

    with pc_credentials.new_gateway_cluster(
        name="test-cluster",
        image="pangeo/pangeo-notebook:latest"
    ) as cluster:

        cluster.adapt(minimum=2, maximum=10)
        client = cluster.get_client()

        ds = xr.open_zarr(
            asset.href,
            **asset.extra_fields["xarray:open_kwargs"],
            storage_options=asset.extra_fields["xarray:storage_options"]
        )
        logger.info(f"Daymet dataset info\n: {ds}")
    
        timeseries = ds["tmin"].mean(dim=["x", "y"]).compute()
        logger.info(f"Mean timeseries info\n: {timeseries}")

    return timeseries

@flow
def pc_dask_flow():

    # get a configured PySTAC client
    catalog = pc_credentials.get_stac_catalog()

    # compute the minimum daily temperature averaged over all of Hawaii, 
    # using the Daymet dataset
    asset = catalog.get_collection("daymet-daily-hi").assets["zarr-abfs"]
    prefect_future = compute_mean.submit(asset)
    timeseries = prefect_future.result()

    # save NetCDF timeseries file
    timeseries.to_netcdf("timeseries.nc")

    # upload to 'my-container' blob storage container
    with open("timeseries.nc", "rb") as f:
        blob = blob_storage_upload(
            data=f.read(),
            container="my-container",
            blob="timeseries.nc",
            blob_storage_credentials=bs_credentials,
            overwrite=False,
        )

    # return the blob name of the uploaded timeseries object
    return blob

pc_dask_flow()
```

Using the Dask Task Runner

Prefect's prefect_dask.DaskTaskRunner automatically instantiates a temporary Dask cluster at flow execution time, enabling submission of both Prefect tasks and Dask collection computations.

!!! warning
    - prefect-dask requires `distributed==2022.2.0; python_version < '3.8'` and `distributed>=2022.5.0,<=2023.3.1` otherwise.
    - When using Prefect Cloud, less configuration is needed on the Dask Workers side - you can get started for free.

```python
# Both Prefect tasks and Dask collection computations are executed
# on a new temporary Dask cluster
import xarray as xr

from prefect import flow, task, get_run_logger
from prefect_planetary_computer import PlanetaryComputerCredentials

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload

from prefect_dask import get_dask_client 

pc_credentials = PlanetaryComputerCredentials.load("PC_BLOCK_NAME")
bs_credentials = AzureBlobStorageCredentials.load("BS_BLOCK_NAME")

pc_runner = pc_credentials.get_dask_task_runner(
    cluster_kwargs={
        "image": "pangeo/pangeo-notebook:latest",
    },
    adapt_kwargs={'minimum': 1, 'maximum': 10, 'active': True}
)

@task
def compute_mean(asset):
    logger = get_run_logger()

    with get_dask_client() as client:
        ds = xr.open_zarr(
            asset.href,
            **asset.extra_fields["xarray:open_kwargs"],
            storage_options=asset.extra_fields["xarray:storage_options"]
        )
        logger.info(f"Daymet dataset info\n: {ds}")

        timeseries = ds["tmin"].mean(dim=["x", "y"]).compute()
        logger.info(f"Mean timeseries info\n: {timeseries}")

    return timeseries

@flow(task_runner=pc_runner)
def pc_dask_flow():
    
    # get a configured PySTAC client
    catalog = pc_credentials.get_stac_catalog()

    # compute the minimum daily temperature averaged over all of Hawaii, 
    # using the Daymet dataset
    asset = catalog.get_collection("daymet-daily-hi").assets["zarr-abfs"]

    mean_task = compute_mean.submit(asset)
    timeseries = mean_task.result()

    # save NetCDF timeseries file
    timeseries.to_netcdf("timeseries.nc")

    # upload to 'my-container' blob storage container
    with open("timeseries.nc", "rb") as f:
        blob = blob_storage_upload(
            data=f.read(),
            container="my-container",
            blob="timeseries.nc",
            blob_storage_credentials=bs_credentials,
            overwrite=False,
        )

    # return the blob name of the uploaded timeseries object
    return blob

pc_dask_flow()
```

Feedback

If you encounter any bugs while using prefect-planetary-computer, feel free to open an issue in the prefect-planetary-computer repository.

If you have any questions or issues while using prefect-planetary-computer, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-planetary-computer for updates too!

Contributing

If you'd like to help fix an issue or add a feature to prefect-planetary-computer, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies: `pip install -e ".[dev]"`
  4. Make desired changes
  5. Add tests
  6. Insert an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit: `pre-commit install`
  8. git commit, git push, and create a pull request


prefect-planetary-computer's Issues

Provide configured DaskTaskRunner instead of subclassed task runner

Expectation / Proposal

Instead of defining a new specific class for a DaskTaskRunner using the PC gateway, it would be better to provide the user with a credentials getter that accepts PC cluster options and returns a base DaskTaskRunner configured with the necessary defaults.

Traceback / Example

```python
from prefect import flow
from prefect_planetary_computer import PlanetaryComputerCredentials

pc_credentials = PlanetaryComputerCredentials.load("BLOCK_NAME")
pc_runner = pc_credentials.get_dask_task_runner()

@flow(task_runner=pc_runner)
def my_flow():
    ...
```
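
Since the getter should accept PC cluster options, a sketch of forwarding them, mirroring the `get_dask_task_runner` arguments shown in the Usage section:

```python
from prefect import flow
from prefect_planetary_computer import PlanetaryComputerCredentials

pc_credentials = PlanetaryComputerCredentials.load("BLOCK_NAME")

# Forward PC cluster options to the underlying DaskTaskRunner defaults.
pc_runner = pc_credentials.get_dask_task_runner(
    cluster_kwargs={"image": "pangeo/pangeo-notebook:latest"},
    adapt_kwargs={"minimum": 1, "maximum": 10, "active": True},
)

@flow(task_runner=pc_runner)
def my_flow():
    ...
```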

Add specialised TaskRunner for PC Dask Gateway

Expectation / Proposal

Add a PlanetaryComputerTaskRunner, inheriting from prefect_dask.DaskTaskRunner, with PC-related defaults.

Traceback / Example

```python
import dask

from prefect import flow, task
from prefect_dask import get_dask_client
from prefect_planetary_computer import PlanetaryComputerCredentials
from prefect_planetary_computer.task_runners import PlanetaryComputerTaskRunner

pc_task_runner = PlanetaryComputerTaskRunner(
  credentials=PlanetaryComputerCredentials.load("BLOCK_NAME")
)

@task
def compute_task():
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # client.compute returns a Future; materialize it before the
        # client context closes
        summary_df = client.compute(df.describe()).result()
    return summary_df

@flow(task_runner=pc_task_runner)
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()
```

Windows Tests actions fail

Problem

Windows Tests actions fail during the "Install dependencies" step - i.e. this one.

Traceback / Example

```
Getting requirements to build editable: started
  Getting requirements to build editable: finished with status 'error'
  error: subprocess-exited-with-error

  Getting requirements to build editable did not run successfully.
  exit code: 1

  [21 lines of output]
  Traceback (most recent call last):
    File "C:\hostedtoolcache\windows\Python\3.9.13\x64\lib\site-packages\pip\_vendor\pyproject_hooks\_in_process\_in_process.py", line 353, in <module>
      main()
    File "C:\hostedtoolcache\windows\Python\3.9.13\x64\lib\site-packages\pip\_vendor\pyproject_hooks\_in_process\_in_process.py", line 335, in main
      json_out['return_val'] = hook(**hook_input['kwargs'])
    File "C:\hostedtoolcache\windows\Python\3.9.13\x64\lib\site-packages\pip\_vendor\pyproject_hooks\_in_process\_in_process.py", line 132, in get_requires_for_build_editable
      return hook(config_settings)
    File "C:\Users\runneradmin\AppData\Local\Temp\pip-build-env-d5o5hepf\overlay\Lib\site-packages\setuptools\build_meta.py", line 450, in get_requires_for_build_editable
      return self.get_requires_for_build_wheel(config_settings)
    File "C:\Users\runneradmin\AppData\Local\Temp\pip-build-env-d5o5hepf\overlay\Lib\site-packages\setuptools\build_meta.py", line 341, in get_requires_for_build_wheel
      return self._get_build_requires(config_settings, requirements=['wheel'])
    File "C:\Users\runneradmin\AppData\Local\Temp\pip-build-env-d5o5hepf\overlay\Lib\site-packages\setuptools\build_meta.py", line 323, in _get_build_requires
      self.run_setup()
    File "C:\Users\runneradmin\AppData\Local\Temp\pip-build-env-d5o5hepf\overlay\Lib\site-packages\setuptools\build_meta.py", line 487, in run_setup
      super(_BuildMetaLegacyBackend,
    File "C:\Users\runneradmin\AppData\Local\Temp\pip-build-env-d5o5hepf\overlay\Lib\site-packages\setuptools\build_meta.py", line 338, in run_setup
      exec(code, locals())
    File "<string>", line 12, in <module>
    File "C:\hostedtoolcache\windows\Python\3.9.13\x64\lib\encodings\cp1252.py", line 23, in decode
      return codecs.charmap_decode(input,self.errors,decoding_table)[0]
  UnicodeDecodeError: 'charmap' codec can't decode byte 0x8d in position 2394: character maps to <undefined>
  [end of output]

  note: This error originates from a subprocess, and is likely not a problem with pip.
error: subprocess-exited-with-error

Getting requirements to build editable did not run successfully.
exit code: 1

See above for output.

note: This error originates from a subprocess, and is likely not a problem with pip.
Error: Process completed with exit code 1.
```

Provide a PC credentials block and related clients

Provide a credentials block that handles the following:

Expectation / Proposal

The user should be able to load the credentials block and easily obtain a PySTAC client and a Dask Gateway client already configured to interact with PC data and services. The clients will be instantiated with PC-related defaults, but it should be possible to pass other parameters supported by the underlying libraries, pystac-client and dask-gateway.

Traceback / Example

```python
from prefect_planetary_computer import PlanetaryComputerCredentials

pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")

pc_stac_client = pc_credentials_block.get_pystac_client()
# do something with it (e.g. query a given collection)

gateway_client = pc_credentials_block.get_dask_gateway_client()
# do something with it (e.g. instantiate a new cluster)
```
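
For instance, a hypothetical query through the configured PySTAC client (the collection ID, bounding box, and date range are illustrative):

```python
from prefect_planetary_computer import PlanetaryComputerCredentials

pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")
pc_stac_client = pc_credentials_block.get_pystac_client()

# Illustrative search: Sentinel-2 L2A scenes over a small area in June 2023.
search = pc_stac_client.search(
    collections=["sentinel-2-l2a"],
    bbox=[-122.5, 47.5, -122.0, 48.0],
    datetime="2023-06-01/2023-06-30",
)
items = list(search.items())
print(f"Found {len(items)} items")
```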

Pydantic v2 support from Prefect main branch

Expectation / Proposal

Prefect now supports Pydantic v2, and this is breaking tests against the Prefect main branch.

The easiest way to fix it is to import from pydantic.v1, as explained in the Pydantic migration guide.

Traceback / Example

```python
from pydantic import VERSION as PYDANTIC_VERSION

if PYDANTIC_VERSION.startswith("2."):
    from pydantic.v1 import Field, SecretStr
else:
    from pydantic import Field, SecretStr
```

Add credentials block method to create a new Dask cluster

Expectation / Proposal

Instantiation of a new Dask cluster should be made available by the PlanetaryComputerCredentials block, using the Dask Gateway factory already implemented.

Traceback / Example

```python
from prefect_planetary_computer import PlanetaryComputerCredentials

pc_credentials_block = PlanetaryComputerCredentials(
    subscription_key="sub-key",
    hub_api_token="hub-token"
)

cluster = pc_credentials_block.new_dask_gateway_cluster()
cluster.adapt(minimum=2, maximum=10)

# use the client for computations
client = cluster.get_client()
```
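
Assuming the returned cluster behaves like dask-gateway's GatewayCluster, it could also be used as a context manager so that PC compute resources are released when the computation finishes:

```python
from prefect_planetary_computer import PlanetaryComputerCredentials

pc_credentials_block = PlanetaryComputerCredentials.load("BLOCK_NAME")

# Assumption: the cluster supports the GatewayCluster context-manager
# protocol, shutting itself down on exit (or via cluster.shutdown()).
with pc_credentials_block.new_dask_gateway_cluster() as cluster:
    cluster.adapt(minimum=2, maximum=10)
    client = cluster.get_client()
    # run Dask computations with the client here
```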
