Skip to content

Commit

Permalink
Feat: Interrupt Flag (#251)
Browse files Browse the repository at this point in the history
* Introduced  `--fail-fast` flag to cause sayn to skip the remaining execution if a Task in the DAG fails.
  • Loading branch information
hustic authored Jan 16, 2024
1 parent b897984 commit 729eda2
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 6 deletions.
51 changes: 48 additions & 3 deletions sayn/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
full_load=False,
start_dt=None,
end_dt=None,
fail_fast=False,
):
super().__init__()

Expand Down Expand Up @@ -68,6 +69,9 @@ def __init__(
if upstream_prod is not None:
self.run_arguments.upstream_prod = upstream_prod

if fail_fast is not None:
self.run_arguments.fail_fast = fail_fast

self.start_app()


Expand Down Expand Up @@ -117,6 +121,13 @@ def parser_process(value, state):
"--debug", "-d", is_flag=True, default=False, help="Include debug messages"
)

click_fail_fast = click.option(
"--fail-fast",
is_flag=True,
default=False,
help="Interrupt remaining task execution on first failure.",
)


def click_filter(func):
func = click.option(
Expand Down Expand Up @@ -168,6 +179,7 @@ def click_incremental(func):

def click_run_options(func):
func = click_debug(func)
func = click_fail_fast(func)
func = click.option("--profile", "-p", help="Profile from settings to use")(func)
func = click_incremental(func)
func = click_filter(func)
Expand All @@ -190,7 +202,17 @@ def init(sayn_project_name):

@cli.command(help="Compile sql tasks.")
@click_run_options
def compile(debug, tasks, exclude, upstream_prod, profile, full_load, start_dt, end_dt):
def compile(
debug,
tasks,
exclude,
upstream_prod,
profile,
full_load,
start_dt,
end_dt,
fail_fast,
):

tasks = [i for t in tasks for i in t.strip().split(" ")]
exclude = [i for t in exclude for i in t.strip().split(" ")]
Expand All @@ -204,6 +226,7 @@ def compile(debug, tasks, exclude, upstream_prod, profile, full_load, start_dt,
full_load,
start_dt,
end_dt,
fail_fast,
)

app.compile()
Expand All @@ -215,7 +238,17 @@ def compile(debug, tasks, exclude, upstream_prod, profile, full_load, start_dt,

@cli.command(help="Run SAYN tasks.")
@click_run_options
def run(debug, tasks, exclude, upstream_prod, profile, full_load, start_dt, end_dt):
def run(
debug,
tasks,
exclude,
upstream_prod,
profile,
full_load,
start_dt,
end_dt,
fail_fast,
):

tasks = [i for t in tasks for i in t.strip().split(" ")]
exclude = [i for t in exclude for i in t.strip().split(" ")]
Expand All @@ -229,6 +262,7 @@ def run(debug, tasks, exclude, upstream_prod, profile, full_load, start_dt, end_
full_load,
start_dt,
end_dt,
fail_fast,
)

app.run()
Expand All @@ -240,7 +274,17 @@ def run(debug, tasks, exclude, upstream_prod, profile, full_load, start_dt, end_

@cli.command(help="Test SAYN tasks.")
@click_run_options
def test(debug, tasks, exclude, upstream_prod, profile, full_load, start_dt, end_dt):
def test(
debug,
tasks,
exclude,
upstream_prod,
profile,
full_load,
start_dt,
end_dt,
fail_fast,
):

tasks = [i for t in tasks for i in t.strip().split(" ")]
exclude = [i for t in exclude for i in t.strip().split(" ")]
Expand All @@ -254,6 +298,7 @@ def test(debug, tasks, exclude, upstream_prod, profile, full_load, start_dt, end
full_load,
start_dt,
end_dt,
fail_fast,
)

app.test()
Expand Down
22 changes: 21 additions & 1 deletion sayn/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class Folders:
command: Command = Command.UNDEFINED
upstream_prod: bool = False
is_prod: bool = False
fail_fast: bool = False

include: Set[str]
exclude: Set[str]
Expand Down Expand Up @@ -609,11 +610,15 @@ def execute_dag(self):
self.tracker.start_stage(
self.run_arguments.command.value, tasks=list(tasks_in_query.keys())
)
interrupt_flag = False

for task in self.tasks.values():
if not task.in_query:
continue

if interrupt_flag:
task.fail_fast = True

# We force the run/compile so that the skipped status can be calculated,
# but we only report if the task is in the query
# if task.in_query:
Expand All @@ -634,6 +639,9 @@ def execute_dag(self):
"finish_stage", duration=datetime.now() - start_ts, result=result
)

if self.run_arguments.fail_fast and result.is_err:
interrupt_flag = True

self.tracker.finish_current_stage(
tasks={k: v.status for k, v in tasks_in_query.items()},
test=True if self.run_arguments.command == Command.TEST else False,
Expand All @@ -643,7 +651,19 @@ def execute_dag(self):

def finish_app(self, error=None):
duration = datetime.now() - self.app_start_ts
if error is not None:
if self.run_arguments.fail_fast and error is not None:
self.tracker.report_event(
event="finish_stage",
duration=duration,
tasks={
k: v.status
if v.status in (TaskStatus.SUCCEEDED, TaskStatus.FAILED)
else TaskStatus.SKIPPED
for k, v in self.tasks.items()
},
)
sys.exit(-1)
elif error is not None:
self.tracker.report_event(
event="finish_app",
duration=duration,
Expand Down
3 changes: 3 additions & 0 deletions sayn/logging/fancy_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def task_stage_finish(self, stage, duration, result):
if result.error.code == "parent_errors":
self.spinner.text_color = "yellow"
self.spinner.warn()
if result.error.code == "interrupted":
self.spinner.text_color = "yellow"
self.spinner.warn()
elif (
result.error.code == "setup_error"
and result.error.details["status"].value == "skipped"
Expand Down
4 changes: 4 additions & 0 deletions sayn/logging/log_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ def error_result(self, duration, error): # noqa: C901
f"Skipping due to ancestors errors: {parents} ({duration})"
)

elif error.code == "interrupted":
level = "warning"
message = self.warn(f"Skipping due to Interrupt signal ({duration})")

elif error.code == "setup_error":
if error.details["status"].value == "skipped":
level = "warning"
Expand Down
5 changes: 5 additions & 0 deletions sayn/tasks/task_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(
self.used_connections = set()
self.tracker = tracker
self.runner = None
self.fail_fast = False

self.name = name
self.group = group
Expand Down Expand Up @@ -266,6 +267,10 @@ def set_parameters(
self.compiler.update_globals(**task_parameters)

def check_skip(self):
if self.fail_fast:
self.status = TaskStatus.SKIPPED
return Err("task", "interrupted")

if self.run_arguments["command"] != "test":
failed_parents = {
p.name: p.status
Expand Down
Loading

0 comments on commit 729eda2

Please sign in to comment.