Skip to content

Commit

Permalink
[Response Ops][Task Manager] Handle errors in getCapacity function …
Browse files Browse the repository at this point in the history
…during task polling (elastic#194759)

## Summary

* Moves the `getCapacity` call made during task polling inside the try/catch,
so that any error thrown by this function is caught and logged under the
`Failed to poll for work` message and polling continues.
* In the `mget` claim strategy, performs a final check after the
`bulkGet` for tasks with a null `startedAt` value. If any tasks
meet this condition, logs some basic info (task id, `ownerId` and status) and
manually assigns `startedAt` to the current time. This is a stop-gap measure to
help us understand why we might be seeing tasks with null `startedAt` values.
In the future we may choose to filter out these tasks from running in this cycle.
  • Loading branch information
ymao1 authored Oct 3, 2024
1 parent da4a872 commit 6827ba4
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 5 deletions.
46 changes: 46 additions & 0 deletions x-pack/plugins/task_manager/server/polling/task_poller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,52 @@ describe('TaskPoller', () => {
expect(handler).toHaveBeenCalledWith(asOk(3));
});

// Checks that an exception thrown by getCapacity is surfaced as a WorkError polling
// event and that later poll cycles still run.
test('continues polling if getCapacity throws error fails', async () => {
  const pollInterval = 100;
  const eventSpy = jest.fn();
  let capacityChecks = 0;

  // Tick the test clock by one poll interval, then yield so pending async work can settle.
  const advanceOneCycle = async () => {
    clock.tick(pollInterval);
    await new Promise((resolve) => setImmediate(resolve));
  };

  const poller = createTaskPoller<string, number>({
    initialPollInterval: pollInterval,
    logger: loggingSystemMock.create().get(),
    pollInterval$: of(pollInterval),
    pollIntervalDelay$: of(0),
    // work resolves with the number of capacity checks performed so far
    work: jest.fn(async () => capacityChecks),
    getCapacity: () => {
      capacityChecks += 1;
      // only the second capacity check fails
      if (capacityChecks !== 2) {
        return 2;
      }
      throw new Error('error getting capacity');
    },
  });
  poller.events$.subscribe(eventSpy);
  poller.start();

  // first cycle: capacity check succeeds and work runs
  await advanceOneCycle();
  expect(eventSpy).toHaveBeenCalledWith(asOk(1));

  // second cycle: capacity check throws, so an error event is emitted instead of a result
  await advanceOneCycle();
  const capacityFailure = new PollingError<string>(
    'Failed to poll for work: Error: error getting capacity',
    PollingErrorType.WorkError,
    none
  );
  expect(eventSpy).toHaveBeenCalledWith(asErr(capacityFailure));
  expect(eventSpy.mock.calls[1][0].error.type).toEqual(PollingErrorType.WorkError);
  expect(eventSpy).not.toHaveBeenCalledWith(asOk(2));

  // third cycle: polling carried on after the failure
  await advanceOneCycle();
  expect(eventSpy).toHaveBeenCalledWith(asOk(3));
});

test(`doesn't start polling until start is called`, async () => {
const pollInterval = 100;

Expand Down
9 changes: 5 additions & 4 deletions x-pack/plugins/task_manager/server/polling/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@ export function createTaskPoller<T, H>({
async function runCycle() {
timeoutId = null;
const start = Date.now();
if (hasCapacity()) {
try {
try {
if (hasCapacity()) {
const result = await work();
subject.next(asOk(result));
} catch (e) {
subject.next(asPollingError<T>(e, PollingErrorType.WorkError));
}
} catch (e) {
subject.next(asPollingError<T>(e, PollingErrorType.WorkError));
}

if (running) {
// Set the next runCycle call
timeoutId = setTimeout(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,137 @@ describe('TaskClaiming', () => {
expect(result.docs.length).toEqual(3);
});

// Exercises the mget claim strategy when bulkGet hands back a task without a startedAt
// value: the claimer is expected to log a warning for that task and still return every
// claimed doc with startedAt populated.
test('should assign startedAt value if bulkGet returns task with null startedAt', async () => {
  const store = taskStoreMock.create({ taskManagerId: 'test-test' });
  store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));

  const fetchedTasks = [
    mockInstance({ id: `id-1`, taskType: 'report' }),
    mockInstance({ id: `id-2`, taskType: 'report' }),
    mockInstance({ id: `id-3`, taskType: 'yawn' }),
    mockInstance({ id: `id-4`, taskType: 'report' }),
  ];

  const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
  store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap });
  store.getDocVersions.mockResolvedValueOnce(docLatestVersions);
  store.bulkPartialUpdate.mockResolvedValueOnce(fetchedTasks.map(getPartialUpdateResult));
  // id-2 is returned untouched (no startedAt override); the other three get a startedAt.
  store.bulkGet.mockResolvedValueOnce([
    asOk({ ...fetchedTasks[0], startedAt: new Date() }),
    asOk(fetchedTasks[1]),
    asOk({ ...fetchedTasks[2], startedAt: new Date() }),
    asOk({ ...fetchedTasks[3], startedAt: new Date() }),
  ]);

  const taskClaiming = new TaskClaiming({
    logger: taskManagerLogger,
    strategy: CLAIM_STRATEGY_MGET,
    definitions: taskDefinitions,
    taskStore: store,
    excludedTaskTypes: [],
    unusedTypes: [],
    maxAttempts: 2,
    getAvailableCapacity: () => 10,
    taskPartitioner,
  });

  const [resultOrErr] = await getAllAsPromise(
    taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() })
  );

  // Fail with a readable diff if the claim cycle produced an error result.
  if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
    expect(resultOrErr).toBe(undefined);
  }

  const result = unwrap(resultOrErr) as ClaimOwnershipResult;

  expect(apm.startTransaction).toHaveBeenCalledWith(
    TASK_MANAGER_MARK_AS_CLAIMED,
    TASK_MANAGER_TRANSACTION_TYPE
  );
  expect(mockApmTrans.end).toHaveBeenCalledWith('success');

  expect(taskManagerLogger.debug).toHaveBeenCalledWith(
    'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
    { tags: ['taskClaiming', 'claimAvailableTasksMget'] }
  );
  // Only the task that came back without a startedAt should be warned about.
  expect(taskManagerLogger.warn).toHaveBeenCalledWith(
    'Task id-2 has a null startedAt value, setting to current time - ownerId null, status idle',
    { tags: ['taskClaiming', 'claimAvailableTasksMget'] }
  );

  expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
    size: 40,
    seq_no_primary_term: true,
  });
  expect(store.getDocVersions).toHaveBeenCalledWith([
    'task:id-1',
    'task:id-2',
    'task:id-3',
    'task:id-4',
  ]);
  expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1);
  // Every fetched task is expected to be claimed with the same partial-update payload shape.
  expect(store.bulkPartialUpdate).toHaveBeenCalledWith(
    fetchedTasks.map((task) => ({
      id: task.id,
      version: task.version,
      scheduledAt: task.runAt,
      attempts: 1,
      ownerId: 'test-test',
      retryAt: new Date('1970-01-01T00:05:30.000Z'),
      status: 'running',
      startedAt: new Date('1970-01-01T00:00:00.000Z'),
    }))
  );
  expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']);

  expect(result.stats).toEqual({
    tasksClaimed: 4,
    tasksConflicted: 0,
    tasksErrors: 0,
    tasksUpdated: 4,
    tasksLeftUnclaimed: 0,
  });
  expect(result.docs.length).toEqual(4);
  for (const doc of result.docs) {
    // `.not.toBeNull()` alone would accept `undefined`; reject both nullish values.
    expect(doc.startedAt).not.toBeNull();
    expect(doc.startedAt).toBeDefined();
  }
});

test('should throw when error when bulk getting all full task docs', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
Expand Down Expand Up @@ -2295,7 +2426,7 @@ describe('TaskClaiming', () => {
user: 'dabo',
scope: ['reporting', 'ceo'],
ownerId: taskManagerId,
startedAt: null,
startedAt: new Date(),
retryAt: null,
scheduledAt: new Date(),
traceparent: 'newParent',
Expand Down
10 changes: 10 additions & 0 deletions x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
[]
);

// Look for tasks that have a null startedAt value, log them and manually set a startedAt field
for (const task of fullTasksToRun) {
if (task.startedAt == null) {
logger.warn(
`Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}`
);
task.startedAt = now;
}
}

// separate update for removed tasks; shouldn't happen often, so unlikely
// a performance concern, and keeps the rest of the logic simpler
let removedCount = 0;
Expand Down

0 comments on commit 6827ba4

Please sign in to comment.