Skip to content

Commit

Permalink
Skip claiming tasks that were modified during the task claiming phase (
Browse files Browse the repository at this point in the history
…elastic#198711)

Resolves elastic#196300

In this PR, I'm removing the fallback we had when `startedAt` value is
missing (elastic#194759) in favour of
dropping the task document from the claiming cycle. The additional logs
that were added showed that tasks that were missing a `startedAt` value
was because they were being re-created during the exact same time they
were being claimed, causing the task to have an `idle` status and a
missing `startedAt` value. Given the scenario, it feels better to drop
them from getting claimed and to instead try again at the next claim
cycle where the race condition shouldn't occur.
  • Loading branch information
mikecote authored Nov 7, 2024
1 parent 1fa3089 commit a19dd8e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1344,15 +1344,15 @@ describe('TaskClaiming', () => {
expect(result.docs.length).toEqual(3);
});

test('should assign startedAt value if bulkGet returns task with null startedAt', async () => {
test('should skip tasks where bulkGet returns a newer task document than the bulkPartialUpdate', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));

const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({ id: `id-3`, taskType: 'yawn' }),
mockInstance({ id: `id-4`, taskType: 'report' }),
mockInstance({ id: `id-1`, taskType: 'report', version: '123' }),
mockInstance({ id: `id-2`, taskType: 'report', version: '123' }),
mockInstance({ id: `id-3`, taskType: 'yawn', version: '123' }),
mockInstance({ id: `id-4`, taskType: 'report', version: '123' }),
];

const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
Expand All @@ -1365,7 +1365,7 @@ describe('TaskClaiming', () => {
);
store.bulkGet.mockResolvedValueOnce([
asOk({ ...fetchedTasks[0], startedAt: new Date() }),
asOk(fetchedTasks[1]),
asOk({ ...fetchedTasks[1], startedAt: new Date(), version: 'abc' }),
asOk({ ...fetchedTasks[2], startedAt: new Date() }),
asOk({ ...fetchedTasks[3], startedAt: new Date() }),
]);
Expand Down Expand Up @@ -1399,11 +1399,11 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Task id-2 has a null startedAt value, setting to current time - ownerId null, status idle',
'Task id-2 was modified during the claiming phase, skipping until the next claiming cycle.',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

Expand Down Expand Up @@ -1463,14 +1463,14 @@ describe('TaskClaiming', () => {
expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']);

expect(result.stats).toEqual({
tasksClaimed: 4,
tasksConflicted: 0,
tasksClaimed: 3,
tasksConflicted: 1,
tasksErrors: 0,
tasksUpdated: 4,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(4);
expect(result.docs.length).toEqual(3);
for (const r of result.docs) {
expect(r.startedAt).not.toBeNull();
}
Expand Down
38 changes: 16 additions & 22 deletions x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}

// perform the task object updates, deal with errors
const updatedTaskIds: string[] = [];
const updatedTasks: Record<string, PartialConcreteTaskInstance> = {};
let conflicts = 0;
let bulkUpdateErrors = 0;
let bulkGetErrors = 0;

const updateResults = await taskStore.bulkPartialUpdate(taskUpdates);
for (const updateResult of updateResults) {
if (isOk(updateResult)) {
updatedTaskIds.push(updateResult.value.id);
updatedTasks[updateResult.value.id] = updateResult.value;
} else {
const { id, type, error, status } = updateResult.error;

Expand All @@ -218,29 +218,23 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}

// perform an mget to get the full task instance for claiming
const fullTasksToRun = (await taskStore.bulkGet(updatedTaskIds)).reduce<ConcreteTaskInstance[]>(
(acc, task) => {
if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`);
bulkGetErrors++;
}
return acc;
},
[]
);

// Look for tasks that have a null startedAt value, log them and manually set a startedAt field
for (const task of fullTasksToRun) {
if (task.startedAt == null) {
const fullTasksToRun = (await taskStore.bulkGet(Object.keys(updatedTasks))).reduce<
ConcreteTaskInstance[]
>((acc, task) => {
if (isOk(task) && task.value.version !== updatedTasks[task.value.id].version) {
logger.warn(
`Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}`
`Task ${task.value.id} was modified during the claiming phase, skipping until the next claiming cycle.`
);
task.startedAt = now;
conflicts++;
} else if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`);
bulkGetErrors++;
}
}
return acc;
}, []);

// separate update for removed tasks; shouldn't happen often, so unlikely
// a performance concern, and keeps the rest of the logic simpler
Expand Down
17 changes: 12 additions & 5 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,10 @@ describe('TaskStore', () => {
refresh: false,
});

expect(result).toEqual([asOk(task)]);
expect(result).toEqual([
// New version returned after update
asOk({ ...task, version: 'Wzg0LDFd' }),
]);
});

test(`should perform partial update with minimal fields`, async () => {
Expand Down Expand Up @@ -1062,7 +1065,8 @@ describe('TaskStore', () => {
refresh: false,
});

expect(result).toEqual([asOk(task)]);
// New version returned after update
expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]);
});

test(`should perform partial update with no version`, async () => {
Expand Down Expand Up @@ -1100,7 +1104,8 @@ describe('TaskStore', () => {
refresh: false,
});

expect(result).toEqual([asOk(task)]);
// New version returned after update
expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]);
});

test(`should gracefully handle errors within the response`, async () => {
Expand Down Expand Up @@ -1183,7 +1188,8 @@ describe('TaskStore', () => {
});

expect(result).toEqual([
asOk(task1),
// New version returned after update
asOk({ ...task1, version: 'Wzg0LDFd' }),
asErr({
type: 'task',
id: '45343254',
Expand Down Expand Up @@ -1267,7 +1273,8 @@ describe('TaskStore', () => {
});

expect(result).toEqual([
asOk(task1),
// New version returned after update
asOk({ ...task1, version: 'Wzg0LDFd' }),
asErr({
type: 'task',
id: 'unknown',
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
ElasticsearchClient,
} from '@kbn/core/server';

import { decodeRequestVersion } from '@kbn/core-saved-objects-base-server-internal';
import { decodeRequestVersion, encodeVersion } from '@kbn/core-saved-objects-base-server-internal';
import { RequestTimeoutsConfig } from './config';
import { asOk, asErr, Result } from './lib/result_type';

Expand Down Expand Up @@ -427,6 +427,7 @@ export class TaskStore {
return asOk({
...doc,
id: docId,
version: encodeVersion(item.update._seq_no, item.update._primary_term),
});
});
}
Expand Down

0 comments on commit a19dd8e

Please sign in to comment.