
[feature] Enable Support for Serverless Compute #165

Draft
wants to merge 14 commits into base: main
1,499 changes: 912 additions & 587 deletions brickflow/bundles/model.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion brickflow/cli/bundles.py
@@ -225,7 +225,7 @@ def bundle_synth(**kwargs: Any) -> None:


def get_bundle_cli_version() -> str:
return config(BrickflowEnvVars.BRICKFLOW_BUNDLE_CLI_VERSION.value, "0.210.2")
return config(BrickflowEnvVars.BRICKFLOW_BUNDLE_CLI_VERSION.value, "0.228.0")


def bundle_cli_setup() -> None:
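For context, the bumped default above remains only a fallback. A minimal sketch of pinning a different version at deploy time (assuming, per the surrounding code, that `BrickflowEnvVars.BRICKFLOW_BUNDLE_CLI_VERSION.value` resolves to the variable name `BRICKFLOW_BUNDLE_CLI_VERSION` and that `config` returns the default only when it is unset):

import os

# Hypothetical override of the Databricks bundle CLI version;
# when unset, get_bundle_cli_version() falls back to "0.228.0".
os.environ["BRICKFLOW_BUNDLE_CLI_VERSION"] = "0.228.0"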
53 changes: 41 additions & 12 deletions brickflow/codegen/databricks_bundle.py
@@ -521,17 +521,26 @@ def _build_native_notebook_task(
f"Make sure {task_name} returns a NotebookTask object."
) from e

return JobsTasks(
jt = JobsTasks(
**task_settings.to_tf_dict(),
notebook_task=notebook_task,
libraries=task_libraries,
depends_on=depends_on,
task_key=task_name,
# unpack dictionary provided by cluster object, will either be key or
# existing cluster id
**task.cluster.job_task_field_dict,
# existing cluster id; if the cluster object is empty, Databricks will use serverless compute
**(task.cluster.job_task_field_dict if task.cluster else {}),
)

# Do not configure notebook library dependencies for serverless compute
if task.cluster:
jt.libraries = task_libraries
else:
_ilog.warning(
"Library definitions are not compatible with Serverless executions. "
"Use '%pip install' directly in the notebook instead."
)
return jt
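Since library definitions are skipped for serverless notebook tasks, the install moves into the notebook itself. A minimal sketch of the first cell of such a notebook (the package pin is illustrative only):

# First cell of a notebook that runs on serverless compute:
# install dependencies inline instead of attaching task libraries.
%pip install requests==2.32.3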

def _build_native_spark_jar_task(
self,
task_name: str,
@@ -595,17 +604,25 @@ def _build_native_spark_python_task(
spark_python_task.parameters.append(k)
spark_python_task.parameters.append(v)

return JobsTasks(
jt = JobsTasks(
**task_settings.to_tf_dict(),
spark_python_task=spark_python_task,
libraries=task_libraries,
depends_on=depends_on,
task_key=task_name,
# unpack dictionary provided by cluster object, will either be key or
# existing cluster id
**task.cluster.job_task_field_dict,
# existing cluster id; if the cluster object is empty, Databricks will use serverless compute
**(task.cluster.job_task_field_dict if task.cluster else {}),
)

if task.cluster:
jt.libraries = task_libraries
else:
jt.environment_key = (
"Default" # TODO: make configurable from task definition
)

return jt
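A sketch of how the hard-coded `environment_key` above is expected to pair with a job-level environment in the rendered job (shape assumed from the Databricks Jobs API, not copied from this PR's generated output):

# Hypothetical rendered job fragment for a serverless spark_python task:
job_fragment = {
    "tasks": [
        {
            "task_key": "my_python_task",
            "spark_python_task": {"python_file": "src/entry.py"},
            "environment_key": "Default",  # set by the serverless branch above
        }
    ],
    "environments": [
        {
            "environment_key": "Default",
            "spec": {"client": "1", "dependencies": ["requests==2.32.3"]},
        }
    ],
}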

def _build_native_run_job_task(
self,
task_name: str,
@@ -725,13 +742,21 @@ def _build_brickflow_entrypoint_task(
task.databricks_task_type_str: self.task_to_task_obj(task),
**task_settings.to_tf_dict(),
}, # type: ignore
libraries=task_libraries,
depends_on=depends_on,
task_key=task_name,
# unpack dictionary provided by cluster object, will either be key or
# existing cluster id
**task.cluster.job_task_field_dict,
# existing cluster id; if the cluster object is empty, Databricks will use serverless compute
**(task.cluster.job_task_field_dict if task.cluster else {}),
)

# Do not configure library dependencies for serverless compute
if task.cluster:
task_obj.libraries = task_libraries
else:
_ilog.warning(
"Library definitions are not compatible with Serverless executions. "
"Use '%pip install' directly in the 'entrypoint.py' instead."
)
return task_obj

def workflow_obj_to_tasks(
@@ -870,7 +895,6 @@ def proj_to_bundle(self) -> DatabricksAssetBundles:
job = Jobs(
name=workflow_name,
tasks=tasks,
git_source=git_conf,
tags=workflow.tags,
health=workflow.health,
job_clusters=[JobsJobClusters(**c) for c in workflow_clusters],
@@ -887,7 +911,12 @@ def proj_to_bundle(self) -> DatabricksAssetBundles:
trigger=workflow.trigger,
continuous=workflow.schedule_continuous,
parameters=workflow.parameters,
environments=workflow.environments,
)
# Only set `git_source` when it is configured; including a null value fails model validation
if git_conf:
job.git_source = git_conf

jobs[workflow_name] = job

pipelines.update(self.workflow_obj_to_pipelines(workflow))
2 changes: 1 addition & 1 deletion brickflow/engine/task.py
@@ -796,7 +796,7 @@ class Task:
task_id: str
task_func: Callable
workflow: Workflow # noqa
cluster: Cluster
cluster: Optional[Cluster] = None
description: Optional[str] = None
libraries: List[TaskLibrary] = field(default_factory=lambda: [])
depends_on: List[Union[Callable, str]] = field(default_factory=lambda: [])
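With `cluster` now optional, a task that resolves to no cluster is emitted without `job_cluster_key`/`existing_cluster_id` and therefore runs on serverless compute. A usage sketch (the `@wf.task` decorator API is assumed from brickflow's docs, not part of this diff):

from brickflow import Workflow

# Hypothetical workflow with no default_cluster and no clusters.
wf = Workflow("brickflow-serverless-demo")

@wf.task  # no cluster: codegen skips the cluster field unpacking shown above
def hello() -> None:
    print("running on serverless compute")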
50 changes: 44 additions & 6 deletions brickflow/engine/workflow.py
@@ -15,6 +15,7 @@
JobsParameters,
JobsTrigger,
JobsWebhookNotifications,
JobsEnvironments,
)
from brickflow.context import BrickflowInternalVariables
from brickflow.engine import ROOT_NODE
@@ -29,6 +30,8 @@
TaskNotFoundError,
TaskSettings,
TaskType,
PypiTaskLibrary,
WheelTaskLibrary,
)
from brickflow.engine.utils import wraps_keyerror

@@ -144,14 +147,18 @@ class Workflow:
max_tasks_in_workflow: int = 100
enable_plugins: Optional[bool] = None
parameters: Optional[List[JobsParameters]] = None
# environments should be defined for serverless workloads
environments: Optional[List[JobsEnvironments]] = None

def __post_init__(self) -> None:
self.graph.add_node(ROOT_NODE)
if self.default_cluster is None and self.clusters == []:
raise NoWorkflowComputeError(
f"Please configure default_cluster or "
f"clusters field for workflow: {self.name}"
logging.info(
"Default cluster details are not provided, switching to serverless compute."
)
self.environments = self.convert_libraries_to_environments
logging.debug(self.environments)

if self.prefix is None:
self.prefix = env_chain(
BrickflowEnvVars.BRICKFLOW_WORKFLOW_PREFIX.value,
@@ -164,7 +171,7 @@ def __post_init__(self) -> None:
BrickflowInternalVariables.workflow_suffix.value,
"",
)
if self.default_cluster is None:
if self.default_cluster is None and self.clusters:
# the default cluster is set to the first cluster if it is not configured
self.default_cluster = self.clusters[0]

@@ -254,6 +261,37 @@ def validate_schedule_configs(self) -> None:
"Please configure either PAUSED or UNPAUSED for schedule_continuous.pause_status"
)

@property
def convert_libraries_to_environments(self) -> List[Dict[Any, Any]]:
logging.info(
"Serverless workload detected, library dependencies will be converted to 'environments'!"
)
environments, dependencies = [], []
for lib in self.libraries:
if isinstance(lib, PypiTaskLibrary):
if lib.repo:
dependencies.append(
lib.repo
) # TODO: check if `--extra-index-url` is needed
dependencies.append(lib.package)
elif isinstance(lib, WheelTaskLibrary):
dependencies.append(lib.whl)
else:
logging.info(
"Serverless workload type only compatible with PyPi and Whl dependencies, skipping %s",
lib,
)
environments.append(
{
"environment_key": "Default",
"spec": {
"client": "1",
"dependencies": dependencies,
},
}
)
return environments

@property
def bfs_layers(self) -> List[str]:
return list(nx.bfs_layers(self.graph, ROOT_NODE))[1:]
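For illustration, given the field names used above (`package`/`repo` on PypiTaskLibrary, `whl` on WheelTaskLibrary), the conversion yields a single default environment; the volume path below is a made-up example:

libraries = [
    PypiTaskLibrary(package="requests==2.32.3"),
    WheelTaskLibrary(whl="/Volumes/main/libs/my_lib-0.1.0-py3-none-any.whl"),
]
# Expected shape of convert_libraries_to_environments:
environments = [
    {
        "environment_key": "Default",
        "spec": {
            "client": "1",
            "dependencies": [
                "requests==2.32.3",
                "/Volumes/main/libs/my_lib-0.1.0-py3-none-any.whl",
            ],
        },
    }
]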
@@ -339,8 +377,8 @@ def _add_task(
)

if self.default_cluster is None:
raise RuntimeError(
"Some how default cluster wasnt set please raise a github issue."
logging.info(
"Default cluster details are not provided, switching to serverless compute."
)

if self.log_timeout_warning(task_settings): # type: ignore
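Putting the workflow-level changes together, a sketch of declaring a serverless workflow whose libraries are converted to an environments spec in `__post_init__` (constructor arguments assumed from the dataclass fields above):

from brickflow.engine.task import PypiTaskLibrary
from brickflow.engine.workflow import Workflow

# No default_cluster and no clusters: instead of raising NoWorkflowComputeError,
# __post_init__ now logs the serverless switch and builds self.environments
# from these libraries.
wf = Workflow(
    "brickflow-serverless-demo",
    libraries=[PypiTaskLibrary(package="requests==2.32.3")],
)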
@@ -0,0 +1,10 @@
# DO NOT MODIFY THIS FILE - IT IS AUTO GENERATED BY BRICKFLOW AND RESERVED FOR FUTURE USAGE
projects:
brickflow-serverless-demo:
brickflow_version: auto
deployment_mode: bundle
enable_plugins: true
name: brickflow-serverless-demo
path_from_repo_root_to_project_root: .
path_project_root_to_workflows_dir: workflows
version: v1