From 33c2cdc5571e6c2f396e8170f520af39766733f7 Mon Sep 17 00:00:00 2001 From: Mike Cote Date: Tue, 29 Oct 2024 14:58:18 -0400 Subject: [PATCH] Initial commit --- .../server/task_claimers/strategy_mget.ts | 154 ++++++++++-------- 1 file changed, 88 insertions(+), 66 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 4b7e5ec6b3691..2b11111087039 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -92,7 +92,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise `task:${doc.id}`)); // filter out stale, missing and removed tasks - const currentTasks: ConcreteTaskInstance[] = []; + const tasksToClaim: ConcreteTaskInstance[] = []; const staleTasks: ConcreteTaskInstance[] = []; const missingTasks: ConcreteTaskInstance[] = []; const removedTasks: ConcreteTaskInstance[] = []; @@ -148,7 +148,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise( - (acc, task) => { + // perform an mget to get the full task instance for claiming + const partialFullTasksToRun = (await taskStore.bulkGet(updatedTaskIds)).reduce< + ConcreteTaskInstance[] + >((acc, task) => { if (isOk(task)) { acc.push(task.value); } else { @@ -228,17 +241,26 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise item.id === task.id); + if (index !== -1) { + tasksToClaim.splice(index, 1); + } + const taskCost = definitions.get(task.taskType)?.cost ?? TaskCost.Normal; + initialCapacity -= taskCost; + } + + // Look for tasks that have a null startedAt value, log them and manually set a startedAt field + for (const task of fullTasksToRun) { + if (task.startedAt == null) { + logger.warn( + `Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}` + ); + task.startedAt = now; + } } }