From e0c59d12c1eff78533d8a329e1f4a4ee7c037814 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 17 Aug 2022 16:00:30 -0400 Subject: [PATCH] Change data structure so we can conditionally reap waiting jobs --- awx/main/dispatch/pool.py | 8 ++++---- awx/main/tasks/system.py | 13 +++++++++---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 6a65df95ded0..527b4b52fbae 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -441,12 +441,12 @@ def add_bind_kwargs(self, body): body.setdefault('kwargs', {}) if 'dispatch_time' in bind_kwargs: body['kwargs']['dispatch_time'] = tz_now().isoformat() - if 'active_task_ids' in bind_kwargs: - active_task_ids = [] + if 'worker_tasks' in bind_kwargs: + worker_tasks = {} for worker in self.workers: worker.calculate_managed_tasks() - active_task_ids.extend(list(worker.managed_tasks.keys())) - body['kwargs']['active_task_ids'] = active_task_ids + worker_tasks[worker.pid] = list(worker.managed_tasks.keys()) + body['kwargs']['worker_tasks'] = worker_tasks def up(self): if self.full: diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 598b3a680fa4..d4f067115e1c 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -483,8 +483,8 @@ def inspect_execution_nodes(instance_list): execution_node_health_check.apply_async([hostname]) -@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'active_task_ids']) -def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None): +@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'worker_tasks']) +def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None): logger.debug("Cluster node heartbeat task.") nowtime = now() instance_list = list(Instance.objects.all()) @@ -564,8 +564,13 @@ def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None): logger.exception('Error marking {} as lost'.format(other_inst.hostname)) # Run local reaper - if active_task_ids is not None: - reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time)) + if worker_tasks is not None: + active_task_ids = [] + for task_list in worker_tasks.values(): + active_task_ids.extend(task_list) + reaper.reap(instance=this_inst, excluded_uuids=active_task_ids) + if max(len(task_list) for task_list in worker_tasks.values()) <= 1: + reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time)) @task(queue=get_local_queuename)