Skip to content

Commit

Permalink
added succeed with minor errors to NORM mode
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasbisurgi authored and MariusWirtz committed Nov 1, 2024
1 parent 4915b67 commit 27a457a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
24 changes: 19 additions & 5 deletions rushti.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
MSG_PROCESS_FAIL_WITH_ERROR_FILE = (
"Execution failed. Process: '{process}' with parameters: {parameters} with {retries} retries and status: "
"{status}, on instance: '{instance}'. Elapsed time : {time}. Error file: {error_file}")
MSG_PROCESS_HAS_MINOR_ERRORS = (
"Execution ended with minor errors but it was forced to succeed. Process: '{process}' with parameters: {parameters} with {retries} retries and status: "
"{status}, on instance: '{instance}'. Error file: {error_file}")
MSG_PROCESS_FAIL_UNEXPECTED = (
"Execution failed. Process: '{process}' with parameters: {parameters}. "
"Elapsed time: {time}. Error: {error}.")
Expand Down Expand Up @@ -198,6 +201,7 @@ def extract_task_from_line(line: str, task_class: Union[Type[Task], Type[Optimiz
else:
return Task(
instance_name=line_arguments.pop("instance"),
succeed_on_minor_errors=line_arguments.pop("succeed_on_minor_errors", False),
process_name=line_arguments.pop("process"),
parameters=line_arguments)

Expand Down Expand Up @@ -258,7 +262,7 @@ def expand_task(
require_predecessor_success=task.require_predecessor_success,
succeed_on_minor_errors = task.succeed_on_minor_errors))
elif isinstance(task, Task):
result.append(Task(task.instance_name, task.process_name, parameters=expanded_params))
result.append(Task(task.instance_name, task.process_name, parameters=expanded_params, succeed_on_minor_errors=task.succeed_on_minor_errors))
return result


Expand Down Expand Up @@ -482,11 +486,18 @@ def execute_process_with_retries(tm1: TM1Service, task: Task, retries: int):
process_name=task.process_name,
**task.parameters)

# Handle minor errors for OptimizedTask
if isinstance(task, OptimizedTask) and not success and task.succeed_on_minor_errors and status == 'HasMinorErrors':
# Handle minor errors
if not success and task.succeed_on_minor_errors and status == 'HasMinorErrors':
success = True
logging.debug(f"{task.id} returned {status} but has succeed_on_minor_errors={task.succeed_on_minor_errors}")

msg = MSG_PROCESS_HAS_MINOR_ERRORS.format(
process=task.process_name,
parameters=task.parameters,
status=status,
retries=retries,
instance=task.instance_name,
error_file=error_log_file)
logging.warning(msg)

if success:
return success, status, error_log_file, attempt

Expand Down Expand Up @@ -880,5 +891,8 @@ def exit_rushti(
exit_rushti(
overall_success=success,
executions=len(results),
successes=sum(results),
start_time=start,
end_time=end,
elapsed_time=duration)

11 changes: 6 additions & 5 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,20 @@ def __eq__(self, other):
class Task:
id = 1

def __init__(self, instance_name: str, process_name: str, parameters: Dict[str, Any] = None):
def __init__(self, instance_name: str, process_name: str, parameters: Dict[str, Any] = None, succeed_on_minor_errors: bool = False):
self.id = Task.id
self.instance_name = instance_name
self.process_name = process_name
self.parameters = parameters
self.succeed_on_minor_errors = succeed_on_minor_errors

Task.id = Task.id + 1

def translate_to_line(self):
return 'instance="{instance}" process="{process}" {parameters}\n'.format(
return 'instance="{instance}" process="{process}" succeed_on_minor_errors="{succeed_on_minor_errors}" {parameters}\n'.format(
instance=self.instance_name,
process=self.process_name,
succeed_on_minor_errors=self.succeed_on_minor_errors,
parameters=' '.join('{}="{}"'.format(parameter, value) for parameter, value in self.parameters.items()))


Expand All @@ -52,11 +54,10 @@ def __init__(self, task_id: str, instance_name: str, process_name: str, paramete
predecessors: List,
require_predecessor_success: bool,
succeed_on_minor_errors: bool = False):
super().__init__(instance_name, process_name, parameters)
super().__init__(instance_name, process_name, parameters, succeed_on_minor_errors)
self.id = task_id
self.predecessors = predecessors
self.require_predecessor_success = require_predecessor_success
self.succeed_on_minor_errors = succeed_on_minor_errors
self.successors = list()

@property
Expand Down Expand Up @@ -110,4 +111,4 @@ def flatten_to_list(object) -> list:
gather.append(item)
else:
gather.append(object)
return gather
return gather

0 comments on commit 27a457a

Please sign in to comment.