Skip to content

Commit

Permalink
gaarf-wf: Workflow: 'fail fast' terminates workflow as soon as an error occurred
Browse files Browse the repository at this point in the history
Full message: 'fail fast' terminates the workflow as soon as an error occurs; improved logging of errors (retry/non-retry and final error)

introduced output_path argument to pass to CF

Change-Id: Ib407db00dd1c6fb2c6d61446d1d1b9fe558ff0a2
  • Loading branch information
evil-shrike committed Jul 29, 2024
1 parent a5171d3 commit 777d505
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 27 deletions.
140 changes: 114 additions & 26 deletions gcp/workflow/workflow-ads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ main:
- bq_dataset: ${args.bq_dataset}
- bq_dataset_location: ${map.get(args, "bq_dataset_location")}
- macros: ${map.get(args, "macros")}
- bq_writer_options: ${map.get(args, "bq_writer_options")}
- writer_options: ${map.get(args, "bq_writer_options")}
- output_path: ${default(map.get(args, "output_path"), "")}
- concurrency_limit: ${default(map.get(args, "concurrency_limit"), 20)}

# get CF 'gaarf' function's URL
Expand All @@ -32,14 +33,22 @@ main:
assign:
- gaarf_main_cf_uri: ${function.body.serviceConfig.uri}

- initialize:
assign:
- error_occurred: false
- error_message: ""
- iterate_over_scripts:
parallel:
shared: [scripts, accounts]
shared: [scripts, accounts, error_occurred, error_message]
concurrency_limit: ${concurrency_limit}
for:
value: script_item
in: ${scripts}
steps:
- check_for_errors:
switch:
- condition: ${error_occurred}
next: continue
- check_for_constant:
switch:
- condition: ${text.match_regex(script_item, "_constant")}
Expand All @@ -57,33 +66,96 @@ main:
bq_dataset: ${bq_dataset}
bq_dataset_location: ${bq_dataset_location}
ads_config_path: ${ads_config_path}
bq_writer_options: ${bq_writer_options}
writer_options: ${writer_options}
output_path: ${output_path}
is_constant: true
next: continue # continue loop over queries
# parallel nested loop over accounts
- parallel_loop_over_accounts:
parallel:
shared: [accounts]
concurrency_limit: ${concurrency_limit}
for:
value: account
in: ${accounts}
steps:
- execute_script:
call: executeAdsQuery
args:
cf_uri: ${gaarf_main_cf_uri}
script_path: ${script_item}
account: ${account}
macros: ${macros}
project: ${project}
bq_dataset: ${bq_dataset}
bq_dataset_location: ${bq_dataset_location}
ads_config_path: ${ads_config_path}
bq_writer_options: ${bq_writer_options}
try:
parallel:
shared: [accounts, error_occurred, error_message]
concurrency_limit: ${concurrency_limit}
for:
value: account
in: ${accounts}
steps:
- check_for_errors_nested:
switch:
- condition: ${error_occurred}
next: continue
- execute_script:
try:
call: executeAdsQuery
args:
cf_uri: ${gaarf_main_cf_uri}
script_path: ${script_item}
account: ${account}
macros: ${macros}
project: ${project}
bq_dataset: ${bq_dataset}
bq_dataset_location: ${bq_dataset_location}
ads_config_path: ${ads_config_path}
writer_options: ${writer_options}
output_path: ${output_path}
except:
as: e
steps:
- set_error_nested:
assign:
- error_occurred: true
- error_message: ${"Error in script " + script_item + " for account " + account + ":" + e.message}
- exit_loop:
next: continue
- end_nested_loop:
next: continue
except:
as: e
steps:
- set_error:
assign:
- error_occurred: true
- error_message: ${e.message}
- returnError:
raise: ${error_message}

- final_error_check:
switch:
- condition: ${error_occurred}
next: handle_error
next: workflow_success

- handle_error:
steps:
- log_error:
call: sys.log
args:
text: ${error_message}
severity: ERROR
- fail:
raise: ${error_message}

- workflow_success:
return: "Workflow completed successfully"

- end_workflow:
return: null

executeAdsQuery:
params: [cf_uri, script_path, account, macros, project, bq_dataset, bq_dataset_location, ads_config_path, bq_writer_options, is_constant: false]
params:
[
cf_uri,
script_path,
account,
macros,
project,
bq_dataset,
bq_dataset_location,
ads_config_path,
writer_options,
output_path,
is_constant: false,
]
steps:
- init_vars:
assign:
Expand All @@ -102,9 +174,11 @@ executeAdsQuery:
bq_dataset_location: ${bq_dataset_location}
customer_id: ${account}
single_customer: true # it's important to prevent fetching child accounts for the supplied cid
writer: "bq"
output_path: ${output_path}
body:
macro: ${macros}
bq_writer_options: ${bq_writer_options}
writer_options: ${writer_options}
auth:
type: OIDC
result: script_results
Expand Down Expand Up @@ -149,6 +223,20 @@ custom_retry_predicate:
# NOTE: sometime errors happen inside Workflow and there's no any code
# (i.e. "code" can be null, so DO NOT use operand ==,<,>,>=,<= without wrapping with `default`
- condition: ${"ConnectionFailedError" in tags or "ConnectionError" in tags or "TimeoutError" in tags or code == 429 or code == 502 or code == 503 or code == 504}
return: true
next: log_call_gaarf_cf_rety
- otherwise:
return: false
steps:
- log_call_gaarf_cf_terminate:
call: sys.log
args:
text: "Non repeatable error, breaking"
severity: "ERROR"
- return_false:
return: false
- log_call_gaarf_cf_rety:
call: sys.log
args:
text: "Retrying"
severity: "WARNING"
- return_true:
return: true
4 changes: 3 additions & 1 deletion gcp/workflow/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ main:
bq_dataset_location: ${bq_dataset_location}
macros: ${map.get(args, "ads_macro")}
bq_writer_options: ${map.get(args, "bq_writer_options")}
output_path: ${map.get(args, "output_path")}
concurrency_limit: ${concurrency_limit}
workflow_ads_id: ${workflow_ads_id}
disable_strict_views: ${disable_strict_views}
Expand Down Expand Up @@ -70,7 +71,7 @@ main:
return: ${accounts}

runAdsQueries:
params: [project, location, function_name, gcs_bucket, queries_path, ads_config_path, cid, cid_ignore, customer_ids_query, customer_ids_offset, customer_ids_batchsize, bq_dataset, bq_dataset_location, macros, bq_writer_options, concurrency_limit, workflow_ads_id, disable_strict_views]
params: [project, location, function_name, gcs_bucket, queries_path, ads_config_path, cid, cid_ignore, customer_ids_query, customer_ids_offset, customer_ids_batchsize, bq_dataset, bq_dataset_location, macros, bq_writer_options, output_path, concurrency_limit, workflow_ads_id, disable_strict_views]
# NOTE: currently it's assumed that CF's project is the same as project for BQ datasets
steps:
# get CF 'gaarf-getcids' function's URL
Expand Down Expand Up @@ -181,6 +182,7 @@ runAdsQueries:
bq_dataset_location: ${bq_dataset_location}
macros: ${macros}
bq_writer_options: ${bq_writer_options}
output_path: ${output_path}
concurrency_limit: ${concurrency_limit}
result: execution_result

Expand Down

0 comments on commit 777d505

Please sign in to comment.