Skip to content

Commit

Permalink
Change data structure so we can conditionally reap waiting jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanCoding committed Aug 17, 2022
1 parent 7645cc2 commit e0c59d1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
8 changes: 4 additions & 4 deletions awx/main/dispatch/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,12 @@ def add_bind_kwargs(self, body):
body.setdefault('kwargs', {})
if 'dispatch_time' in bind_kwargs:
body['kwargs']['dispatch_time'] = tz_now().isoformat()
if 'active_task_ids' in bind_kwargs:
active_task_ids = []
if 'worker_tasks' in bind_kwargs:
worker_tasks = {}
for worker in self.workers:
worker.calculate_managed_tasks()
active_task_ids.extend(list(worker.managed_tasks.keys()))
body['kwargs']['active_task_ids'] = active_task_ids
worker_tasks[worker.pid] = list(worker.managed_tasks.keys())
body['kwargs']['worker_tasks'] = worker_tasks

def up(self):
if self.full:
Expand Down
13 changes: 9 additions & 4 deletions awx/main/tasks/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,8 @@ def inspect_execution_nodes(instance_list):
execution_node_health_check.apply_async([hostname])


@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'active_task_ids'])
def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None):
@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'worker_tasks'])
def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
logger.debug("Cluster node heartbeat task.")
nowtime = now()
instance_list = list(Instance.objects.all())
Expand Down Expand Up @@ -564,8 +564,13 @@ def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None):
logger.exception('Error marking {} as lost'.format(other_inst.hostname))

# Run local reaper
if active_task_ids is not None:
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
if worker_tasks is not None:
active_task_ids = []
for task_list in worker_tasks.values():
active_task_ids.extend(task_list)
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids)
if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))


@task(queue=get_local_queuename)
Expand Down

0 comments on commit e0c59d1

Please sign in to comment.