Skip to content

Commit

Permalink
Allow manifest, task fifo and done fifo to be prefixed so we can debu…
Browse files Browse the repository at this point in the history
…g multiple services at the same time.
  • Loading branch information
cccs-sgaron committed Jun 2, 2020
1 parent 6cc8c02 commit bc10f8e
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions assemblyline_service_client/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
DEFAULT_API_KEY = 'ThisIsARandomAuthKey...ChangeMe!'
SUPPORTED_API = 'v1'
TASK_REQUEST_TIMEOUT = int(os.environ.get('TASK_REQUEST_TIMEOUT', 30))
TASK_FIFO_PATH = "/tmp/task.fifo"
DONE_FIFO_PATH = "/tmp/done.fifo"


# The number of tasks a service will complete before stopping, letting the environment start a new container.
# By default there is no limit, but this lets the orchestration environment set one
Expand All @@ -50,12 +49,14 @@ def __init__(self, shutdown_timeout=SHUTDOWN_SECONDS_LIMIT, api_host=None, api_k
container_id=None, register_only=False, container_mode=False):
super().__init__('assemblyline.service.task_handler', shutdown_timeout=shutdown_timeout)

self.service_manifest_yml = os.path.join(tempfile.gettempdir(), 'service_manifest.yml')
self.service_manifest_yml = f"/tmp/{os.environ.get('RUNTIME_PREFIX', 'service')}_manifest.yml"

self.status = None
self.register_only = register_only
self.container_mode = container_mode
self.wait_start = None
self.task_fifo_path = f"/tmp/{os.environ.get('RUNTIME_PREFIX', 'service')}_task.fifo"
self.done_fifo_path = f"/tmp/{os.environ.get('RUNTIME_PREFIX', 'service')}_done.fifo"
self.task_fifo = None
self.done_fifo = None
self.tasks_processed = 0
Expand Down Expand Up @@ -157,11 +158,10 @@ def update_service_manifest(self, data):
yaml.safe_dump(self.service_manifest_data, yml_fh)

# noinspection PyBroadException
@staticmethod
def cleanup_working_directory(folder_path):
def cleanup_working_directory(self, folder_path):
for file in os.listdir(folder_path):
file_path = os.path.join(folder_path, file)
if file_path != TASK_FIFO_PATH or file_path != DONE_FIFO_PATH:
if file_path != self.task_fifo_path or file_path != self.done_fifo_path:
try:
if os.path.isfile(file_path):
os.unlink(file_path)
Expand Down Expand Up @@ -287,19 +287,19 @@ def try_run(self):
def connect_pipes(self):
# Start task receiving fifo
self.log.info('Waiting for receive task named pipe to be ready...')
while not os.path.exists(TASK_FIFO_PATH):
while not os.path.exists(self.task_fifo_path):
if not self.running:
return
time.sleep(1)
self.task_fifo = open(TASK_FIFO_PATH, "w")
self.task_fifo = open(self.task_fifo_path, "w")

# Start task completing fifo
self.log.info('Waiting for complete task named pipe to be ready...')
while not os.path.exists(DONE_FIFO_PATH):
while not os.path.exists(self.done_fifo_path):
if not self.running:
return
time.sleep(1)
self.done_fifo = open(DONE_FIFO_PATH, "r")
self.done_fifo = open(self.done_fifo_path, "r")

def initialize_service(self):
self.status = STATUSES.INITIALIZING
Expand Down

0 comments on commit bc10f8e

Please sign in to comment.