Skip to content

Commit

Permalink
Merge branch 'main' into feat/workflow_parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
mikita-sakalouski committed Sep 16, 2024
2 parents 9ad2fa6 + a0950c3 commit d4d51e7
Show file tree
Hide file tree
Showing 4 changed files with 1,546 additions and 1,225 deletions.
49 changes: 35 additions & 14 deletions brickflow/codegen/databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,11 @@ def _build_native_notebook_task(
depends_on: List[JobsTasksDependsOn],
**_kwargs: Any,
) -> JobsTasks:
notebook_task: JobsTasksNotebookTask = task.task_func()

try:
notebook_task: JobsTasksNotebookTask = task.task_func()
except Exception as e:
assert isinstance(notebook_task, JobsTasksNotebookTask)
except AssertionError as e:
raise ValueError(
f"Error while building notebook task {task_name}. "
f"Make sure {task_name} returns a NotebookTask object."
Expand All @@ -529,9 +531,11 @@ def _build_native_spark_jar_task(
depends_on: List[JobsTasksDependsOn],
**_kwargs: Any,
) -> JobsTasks:
spark_jar_task: JobsTasksSparkJarTask = task.task_func()

try:
spark_jar_task: JobsTasksSparkJarTask = task.task_func()
except Exception as e:
assert isinstance(spark_jar_task, JobsTasksSparkJarTask)
except AssertionError as e:
raise ValueError(
f"Error while building jar task {task_name}. "
f"Make sure {task_name} returns a SparkJarTask object."
Expand All @@ -557,9 +561,11 @@ def _build_native_spark_python_task(
depends_on: List[JobsTasksDependsOn],
**kwargs: Any,
) -> JobsTasks:
spark_python_task = task.task_func()

try:
spark_python_task: JobsTasksSparkPythonTask = task.task_func()
except Exception as e:
assert isinstance(spark_python_task, JobsTasksSparkPythonTask)
except AssertionError as e:
raise ValueError(
f"Error while building python task {task_name}. "
f"Make sure {task_name} returns a SparkPythonTask object."
Expand Down Expand Up @@ -598,13 +604,16 @@ def _build_native_run_job_task(
depends_on: List[JobsTasksDependsOn],
**_kwargs: Any,
) -> JobsTasks:
run_job_task: JobsTasksRunJobTask = task.task_func()

try:
run_job_task: JobsTasksRunJobTask = task.task_func()
except Exception as e:
assert isinstance(run_job_task, JobsTasksRunJobTask)
except AssertionError as e:
raise ValueError(
f"Error while building run job task {task_name}. "
f"Make sure {task_name} returns a RunJobTask object."
) from e

return JobsTasks(
**task_settings.to_tf_dict(), # type: ignore
run_job_task=JobsTasksRunJobTask(job_id=run_job_task.job_id),
Expand All @@ -620,14 +629,16 @@ def _build_native_sql_file_task(
depends_on: List[JobsTasksDependsOn],
**_kwargs: Any,
) -> JobsTasks:
sql_task: JobsTasksSqlTask = task.task_func()

try:
sql_task: JobsTasksSqlTask = task.task_func()
except Exception as e:
print(e)
assert isinstance(sql_task, JobsTasksSqlTask)
except AssertionError as e:
raise ValueError(
f"Error while building sql file task {task_name}. "
f"Make sure {task_name} returns a JobsTasksSqlTask object."
) from e

return JobsTasks(
**task_settings.to_tf_dict(), # type: ignore
sql_task=sql_task,
Expand All @@ -643,10 +654,11 @@ def _build_native_condition_task(
depends_on: List[JobsTasksDependsOn],
**_kwargs: Any,
) -> JobsTasks:
condition_task: JobsTasksConditionTask = task.task_func()

try:
condition_task: JobsTasksConditionTask = task.task_func()
except Exception as e:
print(e)
assert isinstance(condition_task, JobsTasksConditionTask)
except AssertionError as e:
raise ValueError(
f"Error while building If/else task {task_name}. "
f"Make sure {task_name} returns a JobsTasksConditionTask object."
Expand All @@ -668,6 +680,15 @@ def _build_dlt_task(
**_kwargs: Any,
) -> JobsTasks:
dlt_task: DLTPipeline = task.task_func()

try:
assert isinstance(dlt_task, DLTPipeline)
except AssertionError as e:
raise ValueError(
f"Error while building DLT task {task_name}. "
f"Make sure {task_name} returns a DLTPipeline object."
) from e

# tasks.append(Pipelines(**dlt_task.to_dict())) # TODO: fix this so pipeline also gets created
pipeline_ref = self.get_pipeline_reference(workflow, dlt_task)
return JobsTasks(
Expand Down
Loading

0 comments on commit d4d51e7

Please sign in to comment.