Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Skip claiming tasks that were modified during the task claiming phase (#198711) #199251

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1344,15 +1344,15 @@ describe('TaskClaiming', () => {
expect(result.docs.length).toEqual(3);
});

test('should assign startedAt value if bulkGet returns task with null startedAt', async () => {
test('should skip tasks where bulkGet returns a newer task document than the bulkPartialUpdate', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));

const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({ id: `id-3`, taskType: 'yawn' }),
mockInstance({ id: `id-4`, taskType: 'report' }),
mockInstance({ id: `id-1`, taskType: 'report', version: '123' }),
mockInstance({ id: `id-2`, taskType: 'report', version: '123' }),
mockInstance({ id: `id-3`, taskType: 'yawn', version: '123' }),
mockInstance({ id: `id-4`, taskType: 'report', version: '123' }),
];

const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
Expand All @@ -1365,7 +1365,7 @@ describe('TaskClaiming', () => {
);
store.bulkGet.mockResolvedValueOnce([
asOk({ ...fetchedTasks[0], startedAt: new Date() }),
asOk(fetchedTasks[1]),
asOk({ ...fetchedTasks[1], startedAt: new Date(), version: 'abc' }),
asOk({ ...fetchedTasks[2], startedAt: new Date() }),
asOk({ ...fetchedTasks[3], startedAt: new Date() }),
]);
Expand Down Expand Up @@ -1399,11 +1399,11 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Task id-2 has a null startedAt value, setting to current time - ownerId null, status idle',
'Task id-2 was modified during the claiming phase, skipping until the next claiming cycle.',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

Expand Down Expand Up @@ -1463,14 +1463,14 @@ describe('TaskClaiming', () => {
expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']);

expect(result.stats).toEqual({
tasksClaimed: 4,
tasksConflicted: 0,
tasksClaimed: 3,
tasksConflicted: 1,
tasksErrors: 0,
tasksUpdated: 4,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(4);
expect(result.docs.length).toEqual(3);
for (const r of result.docs) {
expect(r.startedAt).not.toBeNull();
}
Expand Down
38 changes: 16 additions & 22 deletions x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}

// perform the task object updates, deal with errors
const updatedTaskIds: string[] = [];
const updatedTasks: Record<string, PartialConcreteTaskInstance> = {};
let conflicts = 0;
let bulkUpdateErrors = 0;
let bulkGetErrors = 0;

const updateResults = await taskStore.bulkPartialUpdate(taskUpdates);
for (const updateResult of updateResults) {
if (isOk(updateResult)) {
updatedTaskIds.push(updateResult.value.id);
updatedTasks[updateResult.value.id] = updateResult.value;
} else {
const { id, type, error, status } = updateResult.error;

Expand All @@ -218,29 +218,23 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}

// perform an mget to get the full task instance for claiming
const fullTasksToRun = (await taskStore.bulkGet(updatedTaskIds)).reduce<ConcreteTaskInstance[]>(
(acc, task) => {
if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`);
bulkGetErrors++;
}
return acc;
},
[]
);

// Look for tasks that have a null startedAt value, log them and manually set a startedAt field
for (const task of fullTasksToRun) {
if (task.startedAt == null) {
const fullTasksToRun = (await taskStore.bulkGet(Object.keys(updatedTasks))).reduce<
ConcreteTaskInstance[]
>((acc, task) => {
if (isOk(task) && task.value.version !== updatedTasks[task.value.id].version) {
logger.warn(
`Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}`
`Task ${task.value.id} was modified during the claiming phase, skipping until the next claiming cycle.`
);
task.startedAt = now;
conflicts++;
} else if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`);
bulkGetErrors++;
}
}
return acc;
}, []);

// separate update for removed tasks; shouldn't happen often, so unlikely
// a performance concern, and keeps the rest of the logic simpler
Expand Down
17 changes: 12 additions & 5 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,10 @@ describe('TaskStore', () => {
refresh: false,
});

expect(result).toEqual([asOk(task)]);
expect(result).toEqual([
// New version returned after update
asOk({ ...task, version: 'Wzg0LDFd' }),
]);
});

test(`should perform partial update with minimal fields`, async () => {
Expand Down Expand Up @@ -1062,7 +1065,8 @@ describe('TaskStore', () => {
refresh: false,
});

expect(result).toEqual([asOk(task)]);
// New version returned after update
expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]);
});

test(`should perform partial update with no version`, async () => {
Expand Down Expand Up @@ -1100,7 +1104,8 @@ describe('TaskStore', () => {
refresh: false,
});

expect(result).toEqual([asOk(task)]);
// New version returned after update
expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]);
});

test(`should gracefully handle errors within the response`, async () => {
Expand Down Expand Up @@ -1183,7 +1188,8 @@ describe('TaskStore', () => {
});

expect(result).toEqual([
asOk(task1),
// New version returned after update
asOk({ ...task1, version: 'Wzg0LDFd' }),
asErr({
type: 'task',
id: '45343254',
Expand Down Expand Up @@ -1267,7 +1273,8 @@ describe('TaskStore', () => {
});

expect(result).toEqual([
asOk(task1),
// New version returned after update
asOk({ ...task1, version: 'Wzg0LDFd' }),
asErr({
type: 'task',
id: 'unknown',
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
ElasticsearchClient,
} from '@kbn/core/server';

import { decodeRequestVersion } from '@kbn/core-saved-objects-base-server-internal';
import { decodeRequestVersion, encodeVersion } from '@kbn/core-saved-objects-base-server-internal';
import { RequestTimeoutsConfig } from './config';
import { asOk, asErr, Result } from './lib/result_type';

Expand Down Expand Up @@ -427,6 +427,7 @@ export class TaskStore {
return asOk({
...doc,
id: docId,
version: encodeVersion(item.update._seq_no, item.update._primary_term),
});
});
}
Expand Down