diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.test.ts b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts
index b878c25c36906..12b68f9526a22 100644
--- a/x-pack/plugins/task_manager/server/polling/task_poller.test.ts
+++ b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts
@@ -310,6 +310,52 @@ describe('TaskPoller', () => {
     expect(handler).toHaveBeenCalledWith(asOk(3));
   });
 
+  test('continues polling if getCapacity throws an error', async () => {
+    const pollInterval = 100;
+
+    const handler = jest.fn();
+    let callCount = 0;
+    const work = jest.fn(async () => callCount); // resolves with the getCapacity call count so far
+    const poller = createTaskPoller({
+      initialPollInterval: pollInterval,
+      logger: loggingSystemMock.create().get(),
+      pollInterval$: of(pollInterval),
+      pollIntervalDelay$: of(0),
+      work,
+      getCapacity: () => {
+        callCount++;
+        if (callCount === 2) { // only the second getCapacity call throws
+          throw new Error('error getting capacity');
+        }
+        return 2;
+      },
+    });
+    poller.events$.subscribe(handler);
+    poller.start();
+
+    clock.tick(pollInterval);
+    await new Promise((resolve) => setImmediate(resolve));
+
+    expect(handler).toHaveBeenCalledWith(asOk(1));
+
+    clock.tick(pollInterval);
+    await new Promise((resolve) => setImmediate(resolve));
+
+    const expectedError = new PollingError(
+      'Failed to poll for work: Error: error getting capacity',
+      PollingErrorType.WorkError,
+      none
+    );
+    expect(handler).toHaveBeenCalledWith(asErr(expectedError)); // the throw is expected as an error event
+    expect(handler.mock.calls[1][0].error.type).toEqual(PollingErrorType.WorkError);
+    expect(handler).not.toHaveBeenCalledWith(asOk(2));
+
+    clock.tick(pollInterval);
+    await new Promise((resolve) => setImmediate(resolve));
+
+    expect(handler).toHaveBeenCalledWith(asOk(3)); // a later cycle is expected to emit a result again
+  });
+
   test(`doesn't start polling until start is called`, async () => {
     const pollInterval = 100;
 
diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.ts b/x-pack/plugins/task_manager/server/polling/task_poller.ts
index 64d17fad2f81a..038cd48ac527e 100644
--- 
a/x-pack/plugins/task_manager/server/polling/task_poller.ts
+++ b/x-pack/plugins/task_manager/server/polling/task_poller.ts
@@ -61,14 +61,15 @@ export function createTaskPoller({
   async function runCycle() {
     timeoutId = null;
     const start = Date.now();
-    if (hasCapacity()) {
-      try {
+    try { // hasCapacity() runs inside the try, so a throw from it is reported as a WorkError
+      if (hasCapacity()) {
         const result = await work();
         subject.next(asOk(result));
-      } catch (e) {
-        subject.next(asPollingError(e, PollingErrorType.WorkError));
       }
+    } catch (e) {
+      subject.next(asPollingError(e, PollingErrorType.WorkError));
     }
+
     if (running) {
       // Set the next runCycle call
       timeoutId = setTimeout(
diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts
index 3919860d27061..593be2d5497ec 100644
--- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts
+++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts
@@ -1343,6 +1343,137 @@ describe('TaskClaiming', () => {
       expect(result.docs.length).toEqual(3);
     });
 
+    test('should assign startedAt value if bulkGet returns task with null startedAt', async () => {
+      const store = taskStoreMock.create({ taskManagerId: 'test-test' });
+      store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
+
+      const fetchedTasks = [
+        mockInstance({ id: `id-1`, taskType: 'report' }),
+        mockInstance({ id: `id-2`, taskType: 'report' }),
+        mockInstance({ id: `id-3`, taskType: 'yawn' }),
+        mockInstance({ id: `id-4`, taskType: 'report' }),
+      ];
+
+      const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
+      store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap });
+      store.getDocVersions.mockResolvedValueOnce(docLatestVersions);
+      store.bulkPartialUpdate.mockResolvedValueOnce(
+        [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[3]].map(
+          getPartialUpdateResult
+        )
+      );
+      store.bulkGet.mockResolvedValueOnce([
+        asOk({ ...fetchedTasks[0], startedAt: new Date() }),
+        asOk(fetchedTasks[1]), // id-2 is returned as fetched, without the startedAt override
+        asOk({ ...fetchedTasks[2], startedAt: new Date() }),
+        asOk({ ...fetchedTasks[3], startedAt: new Date() }),
+      ]);
+
+      const taskClaiming = new TaskClaiming({
+        logger: taskManagerLogger,
+        strategy: CLAIM_STRATEGY_MGET,
+        definitions: taskDefinitions,
+        taskStore: store,
+        excludedTaskTypes: [],
+        unusedTypes: [],
+        maxAttempts: 2,
+        getAvailableCapacity: () => 10,
+        taskPartitioner,
+      });
+
+      const [resultOrErr] = await getAllAsPromise(
+        taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() })
+      );
+
+      if (!isOk(resultOrErr)) { // an error result fails the test through the expectation below
+        expect(resultOrErr).toBe(undefined);
+      }
+
+      const result = unwrap(resultOrErr) as ClaimOwnershipResult;
+
+      expect(apm.startTransaction).toHaveBeenCalledWith(
+        TASK_MANAGER_MARK_AS_CLAIMED,
+        TASK_MANAGER_TRANSACTION_TYPE
+      );
+      expect(mockApmTrans.end).toHaveBeenCalledWith('success');
+
+      expect(taskManagerLogger.debug).toHaveBeenCalledWith(
+        'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
+        { tags: ['taskClaiming', 'claimAvailableTasksMget'] }
+      );
+      expect(taskManagerLogger.warn).toHaveBeenCalledWith(
+        'Task id-2 has a null startedAt value, setting to current time - ownerId null, status idle',
+        { tags: ['taskClaiming', 'claimAvailableTasksMget'] }
+      );
+
+      expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
+        size: 40,
+        seq_no_primary_term: true,
+      });
+      expect(store.getDocVersions).toHaveBeenCalledWith([
+        'task:id-1',
+        'task:id-2',
+        'task:id-3',
+        'task:id-4',
+      ]);
+      expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1);
+      expect(store.bulkPartialUpdate).toHaveBeenCalledWith([
+        {
+          id: fetchedTasks[0].id,
+          version: fetchedTasks[0].version,
+          scheduledAt: fetchedTasks[0].runAt,
+          attempts: 1,
+          ownerId: 'test-test',
+          retryAt: new Date('1970-01-01T00:05:30.000Z'),
+          status: 'running',
+          startedAt: new Date('1970-01-01T00:00:00.000Z'),
+        },
+        {
+          id: fetchedTasks[1].id,
+          version: fetchedTasks[1].version,
+          scheduledAt: fetchedTasks[1].runAt,
+          attempts: 1,
+          ownerId: 'test-test',
+          retryAt: new Date('1970-01-01T00:05:30.000Z'),
+          status: 'running',
+          startedAt: new Date('1970-01-01T00:00:00.000Z'),
+        },
+        {
+          id: fetchedTasks[2].id,
+          version: fetchedTasks[2].version,
+          scheduledAt: fetchedTasks[2].runAt,
+          attempts: 1,
+          ownerId: 'test-test',
+          retryAt: new Date('1970-01-01T00:05:30.000Z'),
+          status: 'running',
+          startedAt: new Date('1970-01-01T00:00:00.000Z'),
+        },
+        {
+          id: fetchedTasks[3].id,
+          version: fetchedTasks[3].version,
+          scheduledAt: fetchedTasks[3].runAt,
+          attempts: 1,
+          ownerId: 'test-test',
+          retryAt: new Date('1970-01-01T00:05:30.000Z'),
+          status: 'running',
+          startedAt: new Date('1970-01-01T00:00:00.000Z'),
+        },
+      ]);
+      expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']);
+
+      expect(result.stats).toEqual({
+        tasksClaimed: 4,
+        tasksConflicted: 0,
+        tasksErrors: 0,
+        tasksUpdated: 4,
+        tasksLeftUnclaimed: 0,
+      });
+      expect(result.docs.length).toEqual(4);
+      for (const r of result.docs) { // every returned doc is expected to carry a non-null startedAt
+        expect(r.startedAt).not.toBeNull();
+      }
+    });
+
     test('should throw when error when bulk getting all full task docs', async () => {
       const store = taskStoreMock.create({ taskManagerId: 'test-test' });
       store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
@@ -2295,7 +2426,7 @@ describe('TaskClaiming', () => {
         user: 'dabo',
         scope: ['reporting', 'ceo'],
         ownerId: taskManagerId,
-        startedAt: null,
+        startedAt: new Date(),
         retryAt: null,
         scheduledAt: new Date(),
         traceparent: 'newParent',
diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
index 432d7f183ce39..aa69742998c74 100644
--- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
+++ 
b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -247,6 +247,16 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise