diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index e089d3b2d8785..92f2dba988575 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -1344,15 +1344,15 @@ describe('TaskClaiming', () => { expect(result.docs.length).toEqual(3); }); - test('should assign startedAt value if bulkGet returns task with null startedAt', async () => { + test('should skip tasks where bulkGet returns a newer task document than the bulkPartialUpdate', async () => { const store = taskStoreMock.create({ taskManagerId: 'test-test' }); store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); const fetchedTasks = [ - mockInstance({ id: `id-1`, taskType: 'report' }), - mockInstance({ id: `id-2`, taskType: 'report' }), - mockInstance({ id: `id-3`, taskType: 'yawn' }), - mockInstance({ id: `id-4`, taskType: 'report' }), + mockInstance({ id: `id-1`, taskType: 'report', version: '123' }), + mockInstance({ id: `id-2`, taskType: 'report', version: '123' }), + mockInstance({ id: `id-3`, taskType: 'yawn', version: '123' }), + mockInstance({ id: `id-4`, taskType: 'report', version: '123' }), ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); @@ -1365,7 +1365,7 @@ describe('TaskClaiming', () => { ); store.bulkGet.mockResolvedValueOnce([ asOk({ ...fetchedTasks[0], startedAt: new Date() }), - asOk(fetchedTasks[1]), + asOk({ ...fetchedTasks[1], startedAt: new Date(), version: 'abc' }), asOk({ ...fetchedTasks[2], startedAt: new Date() }), asOk({ ...fetchedTasks[3], startedAt: new Date() }), ]); @@ -1399,11 +1399,11 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); expect(taskManagerLogger.warn).toHaveBeenCalledWith( - 'Task id-2 has a null startedAt value, setting to current time - ownerId null, status idle', + 'Task id-2 was modified during the claiming phase, skipping until the next claiming cycle.', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); @@ -1463,14 +1463,14 @@ describe('TaskClaiming', () => { expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']); expect(result.stats).toEqual({ - tasksClaimed: 4, - tasksConflicted: 0, + tasksClaimed: 3, + tasksConflicted: 1, tasksErrors: 0, - tasksUpdated: 4, + tasksUpdated: 3, tasksLeftUnclaimed: 0, staleTasks: 0, }); - expect(result.docs.length).toEqual(4); + expect(result.docs.length).toEqual(3); for (const r of result.docs) { expect(r.startedAt).not.toBeNull(); } diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 4e74454e8c982..cd3efdc783008 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -195,7 +195,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise = {}; let conflicts = 0; let bulkUpdateErrors = 0; let bulkGetErrors = 0; @@ -203,7 +203,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise( - (acc, task) => { - if (isOk(task)) { - acc.push(task.value); - } else { - const { id, type, error } = task.error; - logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`); - bulkGetErrors++; - } - return acc; - }, - [] - ); - - // Look for tasks that have a null startedAt value, log them and manually set a startedAt field - for (const task of fullTasksToRun) { - if (task.startedAt == null) { + const fullTasksToRun = (await taskStore.bulkGet(Object.keys(updatedTasks))).reduce< + ConcreteTaskInstance[] + >((acc, task) => { + if (isOk(task) && task.value.version !== updatedTasks[task.value.id].version) { logger.warn( - `Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}` + `Task ${task.value.id} was modified during the claiming phase, skipping until the next claiming cycle.` ); - task.startedAt = now; + conflicts++; + } else if (isOk(task)) { + acc.push(task.value); + } else { + const { id, type, error } = task.error; + logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`); + bulkGetErrors++; } - } + return acc; + }, []); // separate update for removed tasks; shouldn't happen often, so unlikely // a performance concern, and keeps the rest of the logic simpler diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 2238381552861..cbb1c44dde3fc 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -1020,7 +1020,10 @@ describe('TaskStore', () => { refresh: false, }); - expect(result).toEqual([asOk(task)]); + expect(result).toEqual([ + // New version returned after update + asOk({ ...task, version: 'Wzg0LDFd' }), + ]); }); test(`should perform partial update with minimal fields`, async () => { @@ -1062,7 +1065,8 @@ describe('TaskStore', () => { refresh: false, }); - expect(result).toEqual([asOk(task)]); + // New version returned after update + expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]); }); test(`should perform partial update with no version`, async () => { @@ -1100,7 +1104,8 @@ describe('TaskStore', () => { refresh: false, }); - expect(result).toEqual([asOk(task)]); + // New version returned after update + expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]); }); test(`should gracefully handle errors within the response`, async () => { @@ -1183,7 +1188,8 @@ describe('TaskStore', () => { }); expect(result).toEqual([ - asOk(task1), + // New version returned after update + asOk({ ...task1, version: 'Wzg0LDFd' }), asErr({ type: 'task', id: '45343254', @@ -1267,7 +1273,8 @@ describe('TaskStore', () => { }); expect(result).toEqual([ - asOk(task1), + // New version returned after update + asOk({ ...task1, version: 'Wzg0LDFd' }), asErr({ type: 'task', id: 'unknown', diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index b7f1cec3f5567..0946c5c18d328 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -26,7 +26,7 @@ import { ElasticsearchClient, } from '@kbn/core/server'; -import { decodeRequestVersion } from '@kbn/core-saved-objects-base-server-internal'; +import { decodeRequestVersion, encodeVersion } from '@kbn/core-saved-objects-base-server-internal'; import { RequestTimeoutsConfig } from './config'; import { asOk, asErr, Result } from './lib/result_type'; @@ -427,6 +427,7 @@ export class TaskStore { return asOk({ ...doc, id: docId, + version: encodeVersion(item.update._seq_no, item.update._primary_term), }); }); }