diff --git a/assemblyline_service_client/task_handler.py b/assemblyline_service_client/task_handler.py index 3984464..1a83e45 100644 --- a/assemblyline_service_client/task_handler.py +++ b/assemblyline_service_client/task_handler.py @@ -33,8 +33,7 @@ DEFAULT_API_KEY = 'ThisIsARandomAuthKey...ChangeMe!' SUPPORTED_API = 'v1' TASK_REQUEST_TIMEOUT = int(os.environ.get('TASK_REQUEST_TIMEOUT', 30)) -TASK_FIFO_PATH = "/tmp/task.fifo" -DONE_FIFO_PATH = "/tmp/done.fifo" + # The number of tasks a service will complete before stopping, letting the environment start a new container. # By default there is no limit, but this lets the orchestration environment set one @@ -50,12 +49,14 @@ def __init__(self, shutdown_timeout=SHUTDOWN_SECONDS_LIMIT, api_host=None, api_k container_id=None, register_only=False, container_mode=False): super().__init__('assemblyline.service.task_handler', shutdown_timeout=shutdown_timeout) - self.service_manifest_yml = os.path.join(tempfile.gettempdir(), 'service_manifest.yml') + self.service_manifest_yml = f"/tmp/{os.environ.get('RUNTIME_PREFIX', 'service')}_manifest.yml" self.status = None self.register_only = register_only self.container_mode = container_mode self.wait_start = None + self.task_fifo_path = f"/tmp/{os.environ.get('RUNTIME_PREFIX', 'service')}_task.fifo" + self.done_fifo_path = f"/tmp/{os.environ.get('RUNTIME_PREFIX', 'service')}_done.fifo" self.task_fifo = None self.done_fifo = None self.tasks_processed = 0 @@ -157,11 +158,10 @@ def update_service_manifest(self, data): yaml.safe_dump(self.service_manifest_data, yml_fh) # noinspection PyBroadException - @staticmethod - def cleanup_working_directory(folder_path): + def cleanup_working_directory(self, folder_path): for file in os.listdir(folder_path): file_path = os.path.join(folder_path, file) - if file_path != TASK_FIFO_PATH or file_path != DONE_FIFO_PATH: + if file_path != self.task_fifo_path or file_path != self.done_fifo_path: try: if os.path.isfile(file_path): os.unlink(file_path) @@ -287,19 +287,19 @@ def try_run(self): def connect_pipes(self): # Start task receiving fifo self.log.info('Waiting for receive task named pipe to be ready...') - while not os.path.exists(TASK_FIFO_PATH): + while not os.path.exists(self.task_fifo_path): if not self.running: return time.sleep(1) - self.task_fifo = open(TASK_FIFO_PATH, "w") + self.task_fifo = open(self.task_fifo_path, "w") # Start task completing fifo self.log.info('Waiting for complete task named pipe to be ready...') - while not os.path.exists(DONE_FIFO_PATH): + while not os.path.exists(self.done_fifo_path): if not self.running: return time.sleep(1) - self.done_fifo = open(DONE_FIFO_PATH, "r") + self.done_fifo = open(self.done_fifo_path, "r") def initialize_service(self): self.status = STATUSES.INITIALIZING