Skip to content

Commit

Permalink
Updated RetryOnExceptionError class and exception_response_handler fu…
Browse files Browse the repository at this point in the history
…nction

- Implement code updates and improvements
  • Loading branch information
julian-gula committed Sep 27, 2024
1 parent 379df90 commit 70da240
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
9 changes: 6 additions & 3 deletions frinx/common/worker/exception.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os
from typing import Any
from typing import TypeAlias

from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.worker.task_result import TaskResult

RawTaskIO: TypeAlias = dict[str, Any]

class RetryOnExceptionError(Exception):
"""
Expand All @@ -23,7 +26,7 @@ def __init__(self, exception: Exception, retry_delay_seconds: int, max_retries:
self.max_retries = max_retries or int(os.getenv('WORKFLOW_TASK_MAX_RETRIES', '3'))
self.caught_exception: Exception = exception

def update_task_result(self, task, task_result: TaskResult) -> TaskResult:
def update_task_result(self, task: RawTaskIO, task_result: TaskResult[Any]) -> TaskResult[Any]:
"""
Updates the task result based on the current poll count and retry logic.
Expand All @@ -38,7 +41,7 @@ def update_task_result(self, task, task_result: TaskResult) -> TaskResult:

if self._should_retry(current_poll_count):
task_result.status = TaskResultStatus.IN_PROGRESS
task["callbackAfterSeconds"] = self.retry_delay_seconds
task['callbackAfterSeconds'] = self.retry_delay_seconds
else:
task_result.status = TaskResultStatus.FAILED

Expand All @@ -49,7 +52,7 @@ def _should_retry(self, current_poll_count: int) -> bool:
"""Determines if the task should be retried based on the current poll count."""
return current_poll_count < self.max_retries

def _log_task_status(self, task_result: TaskResult, current_poll_count: int) -> None:
def _log_task_status(self, task_result: TaskResult[Any], current_poll_count: int) -> None:
"""Logs the task status with the current poll count and exception details."""
error_name: str = type(self.caught_exception).__name__
error_info: str = str(self.caught_exception)
Expand Down
8 changes: 4 additions & 4 deletions frinx/common/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,17 @@ def exception_response_handler(self, task: RawTaskIO, error: Exception, **kwargs
match error:

case RetryOnExceptionError() as retry_on_error:
error_name: str = retry_on_error.get_caught_exception_name
error_info: str = str(error)
error_name = retry_on_error.get_caught_exception_name
error_info: str | DictAny = str(error)
task_result = retry_on_error.update_task_result(task, task_result)

case ValidationError() as validation_error:
task_result.logs = [TaskExecLog(f'{error_name}: {error}')]
error_info: str | DictAny = self._validate_exception_format(validation_error)
error_info = self._validate_exception_format(validation_error)

case _:
task_result.logs = [TaskExecLog(f'{error_name}: {error}')]
error_info: str = str(error)
error_info = str(error)

error_dict = {'error_name': error_name, 'error_info': error_info}
error_dict_with_output_path = self._parse_exception_output_path_to_dict(
Expand Down

0 comments on commit 70da240

Please sign in to comment.