Problem Statement
Currently, when users define a Databricks job, they have the ability to add pip packages to their Python environment at the task level. However, the DatabricksWorkflowTaskGroup has not exposed this functionality, meaning that our users can only run tasks against the base cluster environment. This is especially blocking because Databricks job clusters do not allow Python libraries to be defined at job-cluster creation time, so there is no way to add the libraries to the job_cluster_spec.
Proposed Solution
We propose adding the ability to define pip packages at both the task group and the task level within the DatabricksWorkflowTaskGroup. Packages will be passed as a list of dictionaries, each describing one library (see the note on dictionaries under Implementation details). Here is an example of what the DAG could look like:
with dag:
    # [START howto_databricks_workflow_notebook]
    task_group = DatabricksWorkflowTaskGroup(
        group_id="test_workflow",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_clusters=job_cluster_spec,
        notebook_params=[],
        notebook_packages=[
            {"pypi": {"package": "simplejson"}},
            {"pypi": {"package": "tensorflow"}},
        ],
    )
    with task_group:
        notebook_1 = DatabricksNotebookOperator(
            task_id="notebook_1",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Users/[email protected]/Test workflow",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
            notebook_packages=[
                {"pypi": {"package": "foo", "repo": "http://pypi.org"}},
            ],
        )
        notebook_2 = DatabricksNotebookOperator(
            task_id="notebook_2",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Users/[email protected]/Test workflow",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
            notebook_params={
                "foo": "bar",
            },
        )
        notebook_1 >> notebook_2
Implementation details
The big caveat here is that there is no job-level definition for Python packages in the Databricks Jobs API. So when we offer a task-group-level package definition, it is more of a convenience that would need to be applied to every task in the group, as sketched below. Also note that we would want separate "notebook_packages" and "python_packages" parameters, since Databricks distinguishes between notebooks and Python scripts.
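To make the merge behaviour concrete, here is a minimal sketch of how group-level packages could be applied to each task. The helper name _merge_notebook_packages and the de-duplication rule are assumptions for illustration, not the existing operator API.

from __future__ import annotations

# Hypothetical helper: prepend task-group-level packages to each task's own
# list, skipping exact duplicates so a library is not installed twice.
def _merge_notebook_packages(
    group_packages: list[dict], task_packages: list[dict] | None
) -> list[dict]:
    merged = list(group_packages)
    for package in task_packages or []:
        if package not in merged:
            merged.append(package)
    return merged

# Example: the group-level packages plus notebook_1's own package.
libraries = _merge_notebook_packages(
    [{"pypi": {"package": "simplejson"}}, {"pypi": {"package": "tensorflow"}}],
    [{"pypi": {"package": "foo", "repo": "http://pypi.org"}}],
)
# -> three entries, matching the "libraries" list in the JSON below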
Once the group-level and task-level packages have been merged, we would modify the convert_to_databricks_workflow_task function to include the "libraries" key in the task definition JSON, producing output like the following (a minimal sketch of the change itself follows the JSON):
"tasks": [
{
"task_key": "example_databricks_workflow__test_workflow__notebook_1",
"notebook_task": {
"notebook_path": "/Users/[email protected]/Test workflow",
"source": "WORKSPACE"
},
"job_cluster_key": "Shared_job_cluster",
"libraries": [
{
"pypi": {
"package": "simplejson"
}
},
{
"pypi": {
"package": "tensorflow"
}
},
{
"pypi": {
"package": "foo",
"repo": "http://pypi.org"
}
}
],
"timeout_seconds": 0,
"email_notifications": {}
},
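For completeness, here is a minimal sketch of the conversion change, written as a standalone function; the parameter list is a simplification of the real method's signature and attributes, assumed here purely for illustration:

# Hypothetical, simplified version of convert_to_databricks_workflow_task:
# the merged package dictionaries are passed straight through as the
# Databricks "libraries" entries for the task.
def convert_to_databricks_workflow_task(
    task_key: str,
    notebook_path: str,
    source: str,
    job_cluster_key: str,
    libraries: list[dict],
) -> dict:
    return {
        "task_key": task_key,
        "notebook_task": {"notebook_path": notebook_path, "source": source},
        "job_cluster_key": job_cluster_key,
        "libraries": libraries,  # new key carrying the merged packages
        "timeout_seconds": 0,
        "email_notifications": {},
    }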
Please note that we are using dictionaries rather than a typed interface because there is a wide array of sources users can pull libraries from (e.g. S3, JARs from Maven, etc.), so defining and maintaining dedicated parameters and connections for each source would be a pain; a few representative entries are shown below.
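For reference, the Databricks Jobs API accepts several library source types beyond PyPI, and passing the dictionaries through untouched keeps all of them available. The paths and coordinates below are placeholders, not real artifacts:

# Representative library entries accepted by the Databricks Jobs API
# (placeholder paths/coordinates, not real artifacts).
notebook_packages = [
    {"pypi": {"package": "simplejson"}},                       # PyPI, default index
    {"pypi": {"package": "foo", "repo": "http://pypi.org"}},   # PyPI, custom index
    {"maven": {"coordinates": "org.example:example-lib:1.0.0"}},      # Maven coordinates
    {"whl": "dbfs:/FileStore/wheels/example-0.1-py3-none-any.whl"},   # wheel on DBFS
    {"jar": "s3://example-bucket/jars/example.jar"},            # JAR in object storage
]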