Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ResponseOps][TaskManager] fix limited concurrency starvation in mget task claimer #187809

Merged
merged 16 commits into from
Aug 26, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import {
RecognizedTask,
OneOfTaskTypes,
tasksWithPartitions,
claimSort,
} from './mark_available_tasks_as_claimed';

import { TaskStatus, TaskPriority, ConcreteTaskInstance } from '../task';

import { TaskTypeDictionary } from '../task_type_dictionary';
import { mockLogger } from '../test_utils';

Expand Down Expand Up @@ -304,4 +307,129 @@ if (doc['task.runAt'].size()!=0) {
}
`);
});

// Tests sorting 3 tasks with different priorities, runAt/retryAt values
// running the sort over all permutations of them.
describe('claimSort', () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
normalPriorityTask: {
title: 'normal priority',
createTaskRunner: () => ({ run: () => Promise.resolve() }),
priority: TaskPriority.Normal, // 50
},
noPriorityTask: {
title: 'no priority',
createTaskRunner: () => ({ run: () => Promise.resolve() }),
priority: undefined, // 50
},
lowPriorityTask: {
title: 'low priority',
createTaskRunner: () => ({ run: () => Promise.resolve() }),
priority: TaskPriority.Low, // 1
},
});

// possible ordering of tasks before sort
const permutations = [
[0, 1, 2],
[0, 2, 1],
[1, 0, 2],
[1, 2, 0],
[2, 0, 1],
[2, 1, 0],
];

test('works correctly with same dates, different priorities', () => {
const date = new Date();
const baseTasks: ConcreteTaskInstance[] = [];

// push in reverse order
baseTasks.push(buildTaskInstance({ taskType: 'lowPriorityTask', runAt: date }));
baseTasks.push(buildTaskInstance({ taskType: 'noPriorityTask', runAt: date }));
baseTasks.push(buildTaskInstance({ taskType: 'normalPriorityTask', runAt: date }));

for (const perm of permutations) {
const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]];
const sorted = claimSort(definitions, tasks);
// all we know is low should be last
expect(sorted[2]).toBe(baseTasks[0]);
}
});

test('works correctly with same priorities, different dates', () => {
const baseDate = new Date('2024-07-29T00:00:00Z').valueOf();
const baseTasks: ConcreteTaskInstance[] = [];

// push in reverse order
baseTasks.push(
buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate + 1000) })
);
baseTasks.push(buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate) }));
baseTasks.push(
buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate - 1000) })
);

for (const perm of permutations) {
const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]];
const sorted = claimSort(definitions, tasks);
expect(sorted[0]).toBe(baseTasks[2]);
expect(sorted[1]).toBe(baseTasks[1]);
expect(sorted[2]).toBe(baseTasks[0]);
}
});

test('works correctly with mixed of runAt and retryAt values', () => {
const baseDate = new Date('2024-07-29T00:00:00Z').valueOf();
const baseTasks: ConcreteTaskInstance[] = [];

// push in reverse order
baseTasks.push(
buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate + 1000) })
);
baseTasks.push(
buildTaskInstance({
taskType: 'noPriorityTask',
runAt: new Date(baseDate - 2000),
retryAt: new Date(baseDate), // should use this value
})
);
baseTasks.push(
buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate - 1000) })
);

for (const perm of permutations) {
const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]];
const sorted = claimSort(definitions, tasks);
expect(sorted[0]).toBe(baseTasks[2]);
expect(sorted[1]).toBe(baseTasks[1]);
expect(sorted[2]).toBe(baseTasks[0]);
}
});
});
});

interface BuildTaskOpts {
taskType: string;
runAt: Date;
retryAt?: Date;
}

let id = 1;

function buildTaskInstance(opts: BuildTaskOpts): ConcreteTaskInstance {
const { taskType, runAt, retryAt } = opts;
return {
taskType,
id: `${id++}`,
runAt,
retryAt: retryAt || null,
scheduledAt: runAt,
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
state: {},
params: {},
ownerId: null,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { TaskStatus, TaskPriority } from '../task';
import { TaskStatus, TaskPriority, ConcreteTaskInstance } from '../task';
import {
ScriptBasedSortClause,
ScriptClause,
Expand All @@ -15,23 +15,6 @@ import {
MustNotCondition,
} from './query_clauses';

export function taskWithLessThanMaxAttempts(type: string, maxAttempts: number): MustCondition {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed a few lingering references to search-related things regarding tasks running too many attempts. I believe this got resolved in #152841; though not sure if that applies to recurring tasks. @mikecote @ymao1 ??? In any case, this function was no longer being used, so figured I might as well delete it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I don't think we enforced anything with max attempts for recurring task types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 shouldn't be used for recurring tasks, only ad-hoc (one time) tasks

return {
bool: {
must: [
{ term: { 'task.taskType': type } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
};
}

export function tasksOfType(taskTypes: string[]): estypes.QueryDslQueryContainer {
return {
bool: {
Expand Down Expand Up @@ -166,12 +149,53 @@ function getSortByPriority(definitions: TaskTypeDictionary): estypes.SortCombina
};
}

// getClaimSort() is used to generate sort bits for the ES query
// should align with claimSort() below
export function getClaimSort(definitions: TaskTypeDictionary): estypes.SortCombinations[] {
const sortByPriority = getSortByPriority(definitions);
if (!sortByPriority) return [SortByRunAtAndRetryAt];
return [sortByPriority, SortByRunAtAndRetryAt];
}

// claimSort() is used to sort tasks returned from a claimer by priority and date.
// Kept here so it should align with getClaimSort() above.
// Returns a copy of the tasks passed in.
export function claimSort(
ymao1 marked this conversation as resolved.
Show resolved Hide resolved
definitions: TaskTypeDictionary,
tasks: ConcreteTaskInstance[]
): ConcreteTaskInstance[] {
const priorityMap: Record<string, TaskPriority> = {};
tasks.forEach((task) => {
const taskType = task.taskType;
const priority = getPriority(definitions, taskType);
priorityMap[taskType] = priority;
});

return tasks.slice().sort(compare);

function compare(a: ConcreteTaskInstance, b: ConcreteTaskInstance) {
// sort by priority, descending
const priorityA = priorityMap[a.taskType] ?? TaskPriority.Normal;
const priorityB = priorityMap[b.taskType] ?? TaskPriority.Normal;

if (priorityA > priorityB) return -1;
if (priorityA < priorityB) return 1;

// then sort by retry/runAt, ascending
const runA = a.retryAt?.valueOf() ?? a.runAt.valueOf() ?? 0;
const runB = b.retryAt?.valueOf() ?? b.runAt.valueOf() ?? 0;

if (runA < runB) return -1;
if (runA > runB) return 1;

return 0;
}
}

function getPriority(definitions: TaskTypeDictionary, taskType: string): TaskPriority {
return definitions.get(taskType)?.priority ?? TaskPriority.Normal;
}

export interface UpdateFieldsAndMarkAsFailedOpts {
fieldUpdates: {
[field: string]: string | number | Date;
Expand Down
9 changes: 9 additions & 0 deletions x-pack/plugins/task_manager/server/task_claimers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,12 @@ export function isTaskTypeExcluded(excludedTaskTypePatterns: string[], taskType:

return false;
}

export function getExcludedTaskTypes(
definitions: TaskTypeDictionary,
excludedTaskTypePatterns: string[]
) {
return definitions
.getAllTypes()
.filter((taskType) => isTaskTypeExcluded(excludedTaskTypePatterns, taskType));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { ConcreteTaskInstance } from '../../task';
import { isLimited, TaskClaimingBatches } from '../../queries/task_claiming';

// given a list of tasks and capacity info, select the tasks that meet capacity
export function selectTasksByCapacity(
tasks: ConcreteTaskInstance[],
batches: TaskClaimingBatches
): ConcreteTaskInstance[] {
// create a map of task type - concurrency
const limitedBatches = batches.filter(isLimited);
const limitedMap = new Map<string, number>();
for (const limitedBatch of limitedBatches) {
const { tasksTypes, concurrency } = limitedBatch;
limitedMap.set(tasksTypes, concurrency);
}

// apply the limited concurrency
const result: ConcreteTaskInstance[] = [];
for (const task of tasks) {
const concurrency = limitedMap.get(task.taskType);
if (concurrency == null) {
result.push(task);
continue;
}

if (concurrency > 0) {
result.push(task);
limitedMap.set(task.taskType, concurrency - 1);
}
}

return result;
}
Loading