From 7f72a50e3f2bba61ab6cdd20d28cab0dd3fe66ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Wed, 6 Nov 2024 22:15:31 -0500 Subject: [PATCH] Skip claiming tasks that were modified during the task claiming phase (#198711) Resolves https://github.com/elastic/kibana/issues/196300 In this PR, I'm removing the fallback we had when `startedAt` value is missing (https://github.com/elastic/kibana/pull/194759) in favour of dropping the task document from the claiming cycle. The additional logs that were added showed that tasks that were missing a `startedAt` value was because they were being re-created during the exact same time they were being claimed, causing the task to have an `idle` status and a missing `startedAt` value. Given the scenario, it feels better to drop them from getting claimed and to instead try again at the next claim cycle where the race condition shouldn't occur. (cherry picked from commit a19dd8ea97bae73dc69ba71e74917e80f091a8a1) --- .../task_claimers/strategy_mget.test.ts | 24 ++++++------ .../server/task_claimers/strategy_mget.ts | 38 ++++++++----------- .../task_manager/server/task_store.test.ts | 17 ++++++--- .../plugins/task_manager/server/task_store.ts | 3 +- 4 files changed, 42 insertions(+), 40 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index e089d3b2d8785..92f2dba988575 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -1344,15 +1344,15 @@ describe('TaskClaiming', () => { expect(result.docs.length).toEqual(3); }); - test('should assign startedAt value if bulkGet returns task with null startedAt', async () => { + test('should skip tasks where bulkGet returns a newer task document than the bulkPartialUpdate', async () => { const store = taskStoreMock.create({ taskManagerId: 'test-test' }); store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); const fetchedTasks = [ - mockInstance({ id: `id-1`, taskType: 'report' }), - mockInstance({ id: `id-2`, taskType: 'report' }), - mockInstance({ id: `id-3`, taskType: 'yawn' }), - mockInstance({ id: `id-4`, taskType: 'report' }), + mockInstance({ id: `id-1`, taskType: 'report', version: '123' }), + mockInstance({ id: `id-2`, taskType: 'report', version: '123' }), + mockInstance({ id: `id-3`, taskType: 'yawn', version: '123' }), + mockInstance({ id: `id-4`, taskType: 'report', version: '123' }), ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); @@ -1365,7 +1365,7 @@ describe('TaskClaiming', () => { ); store.bulkGet.mockResolvedValueOnce([ asOk({ ...fetchedTasks[0], startedAt: new Date() }), - asOk(fetchedTasks[1]), + asOk({ ...fetchedTasks[1], startedAt: new Date(), version: 'abc' }), asOk({ ...fetchedTasks[2], startedAt: new Date() }), asOk({ ...fetchedTasks[3], startedAt: new Date() }), ]); @@ -1399,11 +1399,11 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.warn).toHaveBeenCalledWith( - 'Task id-2 has a null startedAt value, setting to current time - ownerId null, status idle', + 'Task id-2 was modified during the claiming phase, skipping until the next claiming cycle.', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); @@ -1463,14 +1463,14 @@ describe('TaskClaiming', () => { expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']); expect(result.stats).toEqual({ - tasksClaimed: 4, - tasksConflicted: 0, + tasksClaimed: 3, + tasksConflicted: 1, tasksErrors: 0, - tasksUpdated: 4, + tasksUpdated: 3, tasksLeftUnclaimed: 0, staleTasks: 0, }); - expect(result.docs.length).toEqual(4); + expect(result.docs.length).toEqual(3); for (const r of result.docs) { expect(r.startedAt).not.toBeNull(); } diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 4e74454e8c982..cd3efdc783008 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -195,7 +195,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise = {}; let conflicts = 0; let bulkUpdateErrors = 0; let bulkGetErrors = 0; @@ -203,7 +203,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise( - (acc, task) => { - if (isOk(task)) { - acc.push(task.value); - } else { - const { id, type, error } = task.error; - logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`); - bulkGetErrors++; - } - return acc; - }, - [] - ); - - // Look for tasks that have a null startedAt value, log them and manually set a startedAt field - for (const task of fullTasksToRun) { - if (task.startedAt == null) { + const fullTasksToRun = (await taskStore.bulkGet(Object.keys(updatedTasks))).reduce< + ConcreteTaskInstance[] + >((acc, task) => { + if (isOk(task) && task.value.version !== updatedTasks[task.value.id].version) { logger.warn( - `Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}` + `Task ${task.value.id} was modified during the claiming phase, skipping until the next claiming cycle.` ); - task.startedAt = now; + conflicts++; + } else if (isOk(task)) { + acc.push(task.value); + } else { + const { id, type, error } = task.error; + logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`); + bulkGetErrors++; } - } + return acc; + }, []); // separate update for removed tasks; shouldn't happen often, so unlikely // a performance concern, and keeps the rest of the logic simpler diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 2238381552861..cbb1c44dde3fc 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -1020,7 +1020,10 @@ describe('TaskStore', () => { refresh: false, }); - expect(result).toEqual([asOk(task)]); + expect(result).toEqual([ + // New version returned after update + asOk({ ...task, version: 'Wzg0LDFd' }), + ]); }); test(`should perform partial update with minimal fields`, async () => { @@ -1062,7 +1065,8 @@ describe('TaskStore', () => { refresh: false, }); - expect(result).toEqual([asOk(task)]); + // New version returned after update + expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]); }); test(`should perform partial update with no version`, async () => { @@ -1100,7 +1104,8 @@ describe('TaskStore', () => { refresh: false, }); - expect(result).toEqual([asOk(task)]); + // New version returned after update + expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]); }); test(`should gracefully handle errors within the response`, async () => { @@ -1183,7 +1188,8 @@ describe('TaskStore', () => { }); expect(result).toEqual([ - asOk(task1), + // New version returned after update + asOk({ ...task1, version: 'Wzg0LDFd' }), asErr({ type: 'task', id: '45343254', @@ -1267,7 +1273,8 @@ describe('TaskStore', () => { }); expect(result).toEqual([ - asOk(task1), + // New version returned after update + asOk({ ...task1, version: 'Wzg0LDFd' }), asErr({ type: 'task', id: 'unknown', diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index b7f1cec3f5567..0946c5c18d328 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -26,7 +26,7 @@ import { ElasticsearchClient, } from '@kbn/core/server'; -import { decodeRequestVersion } from '@kbn/core-saved-objects-base-server-internal'; +import { decodeRequestVersion, encodeVersion } from '@kbn/core-saved-objects-base-server-internal'; import { RequestTimeoutsConfig } from './config'; import { asOk, asErr, Result } from './lib/result_type'; @@ -427,6 +427,7 @@ export class TaskStore { return asOk({ ...doc, id: docId, + version: encodeVersion(item.update._seq_no, item.update._primary_term), }); }); }