From 2b0c750ff96c14bd8e69555fbeb871fa3e8eb4ab Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Mon, 8 Jul 2024 19:42:25 -0400 Subject: [PATCH 1/7] some starting changes --- .../server/task_claimers/strategy_mget.ts | 3 + .../plugins/task_manager/server/task_store.ts | 89 ++++++++++++++----- 2 files changed, 72 insertions(+), 20 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 07d18a39a1dbc..f67139a39c122 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -47,6 +47,7 @@ interface OwnershipClaimingOpts { taskTypes: Set; removedTypes: Set; excludedTypes: Set; + getCapacity: (taskType?: string | undefined) => number; taskStore: TaskStore; events$: Subject; definitions: TaskTypeDictionary; @@ -109,6 +110,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise { diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index b922d10ee5cf1..b859920c29722 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -484,6 +484,34 @@ export class TaskStore { } } + async msearch(opts: SearchOpts[] = []): Promise { + const queries = opts.map((opt) => ensureQueryOnlyReturnsTaskObjects(opt)); + const body = queries.flatMap((query) => [{}, query]); + + const result = await this.esClientWithoutRetries.msearch({ + index: this.index, + ignore_unavailable: true, + body, + }); + const { responses } = result; + + const versionMap = this.createVersionMap([]); + let allTasks = new Array(); + + for (const response of responses) { + if (response.status !== 200) { + throw new Error(`Unexpected status code: ${response.status}`); + } + + const { hits } = response as estypes.MsearchMultiSearchItem; + const { hits: tasks } = hits; + this.addTasksToVersionMap(versionMap, tasks); + allTasks = allTasks.concat(this.filterTasks(tasks)); + } + + return { docs: allTasks, versionMap }; + } + private async search(opts: SearchOpts = {}): Promise { const { query } = ensureQueryOnlyReturnsTaskObjects(opts); @@ -500,27 +528,9 @@ export class TaskStore { hits: { hits: tasks }, } = result; - const versionMap = new Map(); - for (const task of tasks) { - if (task._seq_no == null || task._primary_term == null) continue; - - const esId = task._id!.startsWith('task:') ? task._id!.slice(5) : task._id!; - versionMap.set(esId, { - esId: task._id!, - seqNo: task._seq_no, - primaryTerm: task._primary_term, - }); - } - + const versionMap = this.createVersionMap(tasks); return { - docs: tasks - // @ts-expect-error @elastic/elasticsearch _source is optional - .filter((doc) => this.serializer.isRawSavedObject(doc)) - // @ts-expect-error @elastic/elasticsearch _source is optional - .map((doc) => this.serializer.rawToSavedObject(doc)) - .map((doc) => omit(doc, 'namespace') as SavedObject) - .map((doc) => savedObjectToConcreteTaskInstance(doc)) - .filter((doc): doc is ConcreteTaskInstance => !!doc), + docs: this.filterTasks(tasks), versionMap, }; } catch (e) { @@ -529,6 +539,45 @@ export class TaskStore { } } + private filterTasks( + tasks: Array> + ): ConcreteTaskInstance[] { + return ( + tasks + // @ts-expect-error @elastic/elasticsearch _source is optional + .filter((doc) => this.serializer.isRawSavedObject(doc)) + // @ts-expect-error @elastic/elasticsearch _source is optional + .map((doc) => this.serializer.rawToSavedObject(doc)) + .map((doc) => omit(doc, 'namespace') as SavedObject) + .map((doc) => savedObjectToConcreteTaskInstance(doc)) + .filter((doc): doc is ConcreteTaskInstance => !!doc) + ); + } + + private addTasksToVersionMap( + versionMap: Map, + tasks: Array> + ): void { + for (const task of tasks) { + if (task._seq_no == null || task._primary_term == null) continue; + + const esId = task._id.startsWith('task:') ? task._id.slice(5) : task._id; + versionMap.set(esId, { + esId: task._id, + seqNo: task._seq_no, + primaryTerm: task._primary_term, + }); + } + } + + private createVersionMap( + tasks: Array> + ): Map { + const versionMap = new Map(); + this.addTasksToVersionMap(versionMap, tasks); + return versionMap; + } + public async aggregate({ aggs, query, From e7d505fbe98127d5ee33309a77b8b9000f33ea3f Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Tue, 9 Jul 2024 01:04:00 -0400 Subject: [PATCH 2/7] close, I think --- .../mark_available_tasks_as_claimed.ts | 49 +++--- .../task_claimers/strategy_mget.test.ts | 24 +-- .../server/task_claimers/strategy_mget.ts | 149 ++++++++++++++---- .../task_manager/server/task_store.mock.ts | 2 + .../plugins/task_manager/server/task_store.ts | 7 +- 5 files changed, 159 insertions(+), 72 deletions(-) diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts index 0c241aeef14b8..1d8f590ab2a26 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts @@ -6,7 +6,7 @@ */ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { TaskTypeDictionary } from '../task_type_dictionary'; -import { TaskStatus, TaskPriority } from '../task'; +import { TaskStatus, TaskPriority, ConcreteTaskInstance } from '../task'; import { ScriptBasedSortClause, ScriptClause, @@ -15,23 +15,6 @@ import { MustNotCondition, } from './query_clauses'; -export function taskWithLessThanMaxAttempts(type: string, maxAttempts: number): MustCondition { - return { - bool: { - must: [ - { term: { 'task.taskType': type } }, - { - range: { - 'task.attempts': { - lt: maxAttempts, - }, - }, - }, - ], - }, - }; -} - export function tasksOfType(taskTypes: string[]): estypes.QueryDslQueryContainer { return { bool: { @@ -166,12 +149,42 @@ function getSortByPriority(definitions: TaskTypeDictionary): estypes.SortCombina }; } +// getClaimSort() is used to generate sort bits for the ES query +// should align with claimSort() below export function getClaimSort(definitions: TaskTypeDictionary): estypes.SortCombinations[] { const sortByPriority = getSortByPriority(definitions); if (!sortByPriority) return [SortByRunAtAndRetryAt]; return [sortByPriority, SortByRunAtAndRetryAt]; } +// claimSort() is used to sort tasks returned from a claimer +// should align with getClaimSort() above +export function claimSort( + definitions: TaskTypeDictionary, + tasks: ConcreteTaskInstance[] +): ConcreteTaskInstance[] { + const priorityMap: Record = {}; + tasks.forEach((task) => { + const taskType = task.taskType; + const priority = definitions.get(taskType)?.priority || TaskPriority.Normal; + priorityMap[taskType] = priority; + }); + + return tasks.sort(compare); + + function compare(a: ConcreteTaskInstance, b: ConcreteTaskInstance) { + const priorityA = priorityMap[a.taskType] || TaskPriority.Normal; + const priorityB = priorityMap[b.taskType] || TaskPriority.Normal; + if (priorityA > priorityB) return -1; + if (priorityA < priorityB) return 1; + + const runA = a.runAt?.valueOf() || a.retryAt?.valueOf() || 0; + const runB = b.runAt?.valueOf() || b.retryAt?.valueOf() || 0; + + return runA - runB; + } +} + export interface UpdateFieldsAndMarkAsFailedOpts { fieldUpdates: { [field: string]: string | number | Date; diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index 0306f9dda3da8..cb356e495cc24 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -123,7 +123,7 @@ describe('TaskClaiming', () => { } for (let i = 0; i < hits.length; i++) { - store.fetch.mockResolvedValueOnce({ docs: hits[i], versionMap: versionMaps[i] }); + store.msearch.mockResolvedValueOnce({ docs: hits[i], versionMap: versionMaps[i] }); store.getDocVersions.mockResolvedValueOnce(versionMaps[i]); const oneBulkResult = hits[i].map((hit) => asOk(hit)); store.bulkUpdate.mockResolvedValueOnce(oneBulkResult); @@ -189,7 +189,7 @@ describe('TaskClaiming', () => { ); expect(mockApmTrans.end).toHaveBeenCalledWith('success'); - expect(store.fetch.mock.calls).toMatchObject({}); + expect(store.msearch.mock.calls).toMatchObject({}); expect(store.getDocVersions.mock.calls).toMatchObject({}); return results.map((result, index) => ({ result, @@ -223,8 +223,8 @@ describe('TaskClaiming', () => { }, }); - store.fetch.mockReset(); - store.fetch.mockRejectedValue(new Error('Oh no')); + store.msearch.mockReset(); + store.msearch.mockRejectedValue(new Error('Oh no')); await expect( getAllAsPromise( @@ -349,24 +349,14 @@ describe('TaskClaiming', () => { const taskStore = taskStoreMock.create({ taskManagerId }); taskStore.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); for (const docs of taskCycles) { - taskStore.fetch.mockResolvedValueOnce({ docs, versionMap: new Map() }); - taskStore.updateByQuery.mockResolvedValueOnce({ - updated: docs.length, - version_conflicts: 0, - total: docs.length, - }); + taskStore.msearch.mockResolvedValueOnce({ docs, versionMap: new Map() }); } - taskStore.fetch.mockResolvedValue({ docs: [], versionMap: new Map() }); - taskStore.updateByQuery.mockResolvedValue({ - updated: 0, - version_conflicts: 0, - total: 0, - }); + taskStore.msearch.mockResolvedValue({ docs: [], versionMap: new Map() }); const taskClaiming = new TaskClaiming({ logger: taskManagerLogger, - strategy: 'default', + strategy: CLAIM_STRATEGY_MGET, definitions, excludedTaskTypes: [], unusedTypes: [], diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index f67139a39c122..14ac70ad5c3b8 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -41,19 +41,6 @@ import { import { TaskStore, SearchOpts } from '../task_store'; import { isOk, asOk } from '../lib/result_type'; -interface OwnershipClaimingOpts { - claimOwnershipUntil: Date; - size: number; - taskTypes: Set; - removedTypes: Set; - excludedTypes: Set; - getCapacity: (taskType?: string | undefined) => number; - taskStore: TaskStore; - events$: Subject; - definitions: TaskTypeDictionary; - taskMaxAttempts: Record; -} - const SIZE_MULTIPLIER_FOR_TASK_FETCH = 4; export function claimAvailableTasksMget(opts: TaskClaimerOpts): Observable { @@ -262,6 +249,19 @@ interface SearchAvailableTasksResponse { versionMap: Map; } +interface SearchAvailableTasksOpts { + claimOwnershipUntil: Date; + size: number; + taskTypes: Set; + removedTypes: Set; + excludedTypes: Set; + getCapacity: (taskType?: string | undefined) => number; + taskStore: TaskStore; + events$: Subject; + definitions: TaskTypeDictionary; + taskMaxAttempts: Record; +} + async function searchAvailableTasks({ definitions, taskTypes, @@ -271,31 +271,110 @@ async function searchAvailableTasks({ getCapacity, size, taskMaxAttempts, -}: OwnershipClaimingOpts): Promise { - const searchedTypes = Array.from(taskTypes) - .concat(Array.from(removedTypes)) - .filter((type) => !excludedTypes.has(type)); - const queryForScheduledTasks = mustBeAllOf( - // Task must be enabled - EnabledTask, - // a task type that's not excluded (may be removed or not) - OneOfTaskTypes('task.taskType', searchedTypes), - // Either a task with idle status and runAt <= now or - // status running or claiming with a retryAt <= now. - shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt), - // must have a status that isn't 'unrecognized' - RecognizedTask - ); +}: SearchAvailableTasksOpts): Promise { + const claimPartitions = buildClaimPartitions({ + types: taskTypes, + excludedTypes, + removedTypes, + getCapacity, + definitions, + }); const sort: NonNullable = getClaimSort(definitions); - const query = matchesClauses(queryForScheduledTasks, filterDownBy(InactiveTasks)); + const searches: SearchOpts[] = []; + + // not handling removed types yet + + // add search for unlimited types + if (claimPartitions.unlimitedTypes.length > 0) { + const queryForScheduledTasks = mustBeAllOf( + // Task must be enabled + EnabledTask, + // a task type that's not excluded (may be removed or not) + OneOfTaskTypes('task.taskType', claimPartitions.unlimitedTypes), + // Either a task with idle status and runAt <= now or + // status running or claiming with a retryAt <= now. + shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt), + // must have a status that isn't 'unrecognized' + RecognizedTask + ); + + const query = matchesClauses(queryForScheduledTasks, filterDownBy(InactiveTasks)); + searches.push({ + query, + sort, + size, + seq_no_primary_term: true, + }); + } - return await taskStore.fetch({ - query, - sort, - size, - seq_no_primary_term: true, - }); + // add searches for limited types + for (const [type, capacity] of claimPartitions.limitedTypes) { + const queryForScheduledTasks = mustBeAllOf( + // Task must be enabled + EnabledTask, + // Specific task type + OneOfTaskTypes('task.taskType', [type]), + // Either a task with idle status and runAt <= now or + // status running or claiming with a retryAt <= now. + shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt), + // must have a status that isn't 'unrecognized' + RecognizedTask + ); + + const query = matchesClauses(queryForScheduledTasks, filterDownBy(InactiveTasks)); + searches.push({ + query, + sort, + size: capacity, + seq_no_primary_term: true, + }); + } + + return await taskStore.msearch(searches); +} + +interface ClaimPartitions { + removedTypes: string[]; + unlimitedTypes: string[]; + limitedTypes: Map; +} + +interface BuildClaimPartitionsOpts { + types: Set; + excludedTypes: Set; + removedTypes: Set; + getCapacity: (taskType?: string) => number; + definitions: TaskTypeDictionary; +} + +function buildClaimPartitions(opts: BuildClaimPartitionsOpts): ClaimPartitions { + const result: ClaimPartitions = { + removedTypes: [], + unlimitedTypes: [], + limitedTypes: new Map(), + }; + + const { types, excludedTypes, removedTypes, getCapacity, definitions } = opts; + for (const type of types) { + if (excludedTypes.has(type)) continue; + + if (removedTypes.has(type)) { + result.removedTypes.push(type); + continue; + } + + const definition = definitions.get(type); + if (definition.maxConcurrency == null) { + result.unlimitedTypes.push(definition.type); + continue; + } + + const capacity = getCapacity(definition.type); + result.limitedTypes.set(definition.type, capacity); + } + + return result; } function applyLimitedConcurrency( diff --git a/x-pack/plugins/task_manager/server/task_store.mock.ts b/x-pack/plugins/task_manager/server/task_store.mock.ts index c15518eaed510..7cf051f406532 100644 --- a/x-pack/plugins/task_manager/server/task_store.mock.ts +++ b/x-pack/plugins/task_manager/server/task_store.mock.ts @@ -33,6 +33,8 @@ export const taskStoreMock = { bulkGet: jest.fn(), bulkGetVersions: jest.fn(), getDocVersions: jest.fn(), + search: jest.fn(), + msearch: jest.fn(), index, taskManagerId, } as unknown as jest.Mocked; diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index b859920c29722..2b554f2de2a69 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -39,6 +39,7 @@ import { import { TaskTypeDictionary } from './task_type_dictionary'; import { AdHocTaskCounter } from './lib/adhoc_task_counter'; import { TaskValidator } from './task_validator'; +import { claimSort } from './queries/mark_available_tasks_as_claimed'; export interface StoreOpts { esClient: ElasticsearchClient; @@ -509,7 +510,9 @@ export class TaskStore { allTasks = allTasks.concat(this.filterTasks(tasks)); } - return { docs: allTasks, versionMap }; + const allSortedTasks = claimSort(this.definitions, allTasks); + + return { docs: allSortedTasks, versionMap }; } private async search(opts: SearchOpts = {}): Promise { @@ -559,7 +562,7 @@ export class TaskStore { tasks: Array> ): void { for (const task of tasks) { - if (task._seq_no == null || task._primary_term == null) continue; + if (task._id == null || task._seq_no == null || task._primary_term == null) continue; const esId = task._id.startsWith('task:') ? task._id.slice(5) : task._id; versionMap.set(esId, { From 0d002432334bdbf554b276b7dea9ffb7f84d22ad Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Tue, 9 Jul 2024 18:35:56 -0400 Subject: [PATCH 3/7] fix jest test --- .../lib/task_selector_by_capacity.ts | 40 +++++++++++++++++++ .../task_claimers/strategy_mget.test.ts | 21 +++++++++- .../server/task_claimers/strategy_mget.ts | 39 ++---------------- 3 files changed, 62 insertions(+), 38 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/task_claimers/lib/task_selector_by_capacity.ts diff --git a/x-pack/plugins/task_manager/server/task_claimers/lib/task_selector_by_capacity.ts b/x-pack/plugins/task_manager/server/task_claimers/lib/task_selector_by_capacity.ts new file mode 100644 index 0000000000000..531357436c0bf --- /dev/null +++ b/x-pack/plugins/task_manager/server/task_claimers/lib/task_selector_by_capacity.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ConcreteTaskInstance } from '../../task'; +import { isLimited, TaskClaimingBatches } from '../../queries/task_claiming'; + +// given a list of tasks and capacity info, select the tasks that meet capacity +export function selectTasksByCapacity( + tasks: ConcreteTaskInstance[], + batches: TaskClaimingBatches +): ConcreteTaskInstance[] { + // create a map of task type - concurrency + const limitedBatches = batches.filter(isLimited); + const limitedMap = new Map(); + for (const limitedBatch of limitedBatches) { + const { tasksTypes, concurrency } = limitedBatch; + limitedMap.set(tasksTypes, concurrency); + } + + // apply the limited concurrency + const result: ConcreteTaskInstance[] = []; + for (const task of tasks) { + const concurrency = limitedMap.get(task.taskType); + if (concurrency == null) { + result.push(task); + continue; + } + + if (concurrency > 0) { + result.push(task); + limitedMap.set(task.taskType, concurrency - 1); + } + } + + return result; +} diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index cb356e495cc24..8db8694efff8b 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -349,10 +349,26 @@ describe('TaskClaiming', () => { const taskStore = taskStoreMock.create({ taskManagerId }); taskStore.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); for (const docs of taskCycles) { - taskStore.msearch.mockResolvedValueOnce({ docs, versionMap: new Map() }); + const versionMap = new Map(); + const docVersions = new Map(); + for (const doc of docs) { + const esId = `task:${doc.id}`; + versionMap.set(doc.id, { esId, seqNo: 42, primaryTerm: 666 }); + docVersions.set(esId, { esId, seqNo: 42, primaryTerm: 666 }); + } + taskStore.msearch.mockResolvedValueOnce({ docs, versionMap }); + taskStore.getDocVersions.mockResolvedValueOnce(docVersions); + taskStore.bulkUpdate.mockResolvedValueOnce( + docs.map((doc) => { + doc = { ...doc, retryAt: null }; + return asOk(doc); + }) + ); } taskStore.msearch.mockResolvedValue({ docs: [], versionMap: new Map() }); + taskStore.getDocVersions.mockResolvedValue(new Map()); + taskStore.bulkUpdate.mockResolvedValue([]); const taskClaiming = new TaskClaiming({ logger: taskManagerLogger, @@ -406,7 +422,8 @@ describe('TaskClaiming', () => { retryAt: null, scheduledAt: new Date(), traceparent: 'newParent', - }) + }), + event?.timing ) ); }); diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 14ac70ad5c3b8..1237408e9ff9d 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -20,11 +20,7 @@ import { TaskTypeDictionary } from '../task_type_dictionary'; import { TaskClaimerOpts, ClaimOwnershipResult, getEmptyClaimOwnershipResult } from '.'; import { ConcreteTaskInstance, TaskStatus, ConcreteTaskInstanceVersion } from '../task'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; -import { - isLimited, - TASK_MANAGER_MARK_AS_CLAIMED, - TaskClaimingBatches, -} from '../queries/task_claiming'; +import { TASK_MANAGER_MARK_AS_CLAIMED } from '../queries/task_claiming'; import { TaskClaim, asTaskClaimEvent, startTaskTimer } from '../task_events'; import { shouldBeOneOf, mustBeAllOf, filterDownBy, matchesClauses } from '../queries/query_clauses'; @@ -40,6 +36,7 @@ import { import { TaskStore, SearchOpts } from '../task_store'; import { isOk, asOk } from '../lib/result_type'; +import { selectTasksByCapacity } from './lib/task_selector_by_capacity'; const SIZE_MULTIPLIER_FOR_TASK_FETCH = 4; @@ -142,7 +139,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise(); - for (const limitedBatch of limitedBatches) { - const { tasksTypes, concurrency } = limitedBatch; - limitedMap.set(tasksTypes, concurrency); - } - - // apply the limited concurrency - const result: ConcreteTaskInstance[] = []; - for (const task of tasks) { - const concurrency = limitedMap.get(task.taskType); - if (concurrency == null) { - result.push(task); - continue; - } - - if (concurrency > 0) { - result.push(task); - limitedMap.set(task.taskType, concurrency - 1); - } - } - - return result; -} From 450acb39c6cb6060bc38ba48f39b0fae698a3e74 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Mon, 29 Jul 2024 23:02:06 -0400 Subject: [PATCH 4/7] add test for claimSort() --- .../mark_available_tasks_as_claimed.test.ts | 128 ++++++++++++++++++ .../mark_available_tasks_as_claimed.ts | 32 +++-- 2 files changed, 151 insertions(+), 9 deletions(-) diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts index 103bc17004ef0..76df8b7ae5584 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts @@ -18,8 +18,11 @@ import { RecognizedTask, OneOfTaskTypes, tasksWithPartitions, + claimSort, } from './mark_available_tasks_as_claimed'; +import { TaskStatus, TaskPriority, ConcreteTaskInstance } from '../task'; + import { TaskTypeDictionary } from '../task_type_dictionary'; import { mockLogger } from '../test_utils'; @@ -304,4 +307,129 @@ if (doc['task.runAt'].size()!=0) { } `); }); + + // Tests sorting 3 tasks with different priorities, runAt/retryAt values + // running the sort over all permutations of them. + describe('claimSort', () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + normalPriorityTask: { + title: 'normal priority', + createTaskRunner: () => ({ run: () => Promise.resolve() }), + priority: TaskPriority.Normal, // 50 + }, + noPriorityTask: { + title: 'no priority', + createTaskRunner: () => ({ run: () => Promise.resolve() }), + priority: undefined, // 50 + }, + lowPriorityTask: { + title: 'low priority', + createTaskRunner: () => ({ run: () => Promise.resolve() }), + priority: TaskPriority.Low, // 1 + }, + }); + + // possible ordering of tasks before sort + const permutations = [ + [0, 1, 2], + [0, 2, 1], + [1, 0, 2], + [1, 2, 0], + [2, 0, 1], + [2, 1, 0], + ]; + + test('works correctly with same dates, different priorities', () => { + const date = new Date(); + const baseTasks: ConcreteTaskInstance[] = []; + + // push in reverse order + baseTasks.push(buildTaskInstance({ taskType: 'lowPriorityTask', runAt: date })); + baseTasks.push(buildTaskInstance({ taskType: 'noPriorityTask', runAt: date })); + baseTasks.push(buildTaskInstance({ taskType: 'normalPriorityTask', runAt: date })); + + for (const perm of permutations) { + const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]]; + const sorted = claimSort(definitions, tasks); + // all we know is low should be last + expect(sorted[2]).toBe(baseTasks[0]); + } + }); + + test('works correctly with same priorities, different dates', () => { + const baseDate = new Date('2024-07-29T00:00:00Z').valueOf(); + const baseTasks: ConcreteTaskInstance[] = []; + + // push in reverse order + baseTasks.push( + buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate + 1000) }) + ); + baseTasks.push(buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate) })); + baseTasks.push( + buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate - 1000) }) + ); + + for (const perm of permutations) { + const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]]; + const sorted = claimSort(definitions, tasks); + expect(sorted[0]).toBe(baseTasks[2]); + expect(sorted[1]).toBe(baseTasks[1]); + expect(sorted[2]).toBe(baseTasks[0]); + } + }); + + test('works correctly with mixed of runAt and retryAt values', () => { + const baseDate = new Date('2024-07-29T00:00:00Z').valueOf(); + const baseTasks: ConcreteTaskInstance[] = []; + + // push in reverse order + baseTasks.push( + buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate + 1000) }) + ); + baseTasks.push( + buildTaskInstance({ + taskType: 'noPriorityTask', + runAt: new Date(baseDate - 2000), + retryAt: new Date(baseDate), // should use this value + }) + ); + baseTasks.push( + buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate - 1000) }) + ); + + for (const perm of permutations) { + const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]]; + const sorted = claimSort(definitions, tasks); + expect(sorted[0]).toBe(baseTasks[2]); + expect(sorted[1]).toBe(baseTasks[1]); + expect(sorted[2]).toBe(baseTasks[0]); + } + }); + }); }); + +interface BuildTaskOpts { + taskType: string; + runAt: Date; + retryAt?: Date; +} + +let id = 1; + +function buildTaskInstance(opts: BuildTaskOpts): ConcreteTaskInstance { + const { taskType, runAt, retryAt } = opts; + return { + taskType, + id: `${id++}`, + runAt, + retryAt: retryAt || null, + scheduledAt: runAt, + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + state: {}, + params: {}, + ownerId: null, + }; +} diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts index d1e4898f6bd26..f6a91acf8bbc1 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts @@ -157,8 +157,9 @@ export function getClaimSort(definitions: TaskTypeDictionary): estypes.SortCombi return [sortByPriority, SortByRunAtAndRetryAt]; } -// claimSort() is used to sort tasks returned from a claimer -// should align with getClaimSort() above +// claimSort() is used to sort tasks returned from a claimer by priority and date. +// Kept here so it should align with getClaimSort() above. +// Returns a copy of the tasks passed in. export function claimSort( definitions: TaskTypeDictionary, tasks: ConcreteTaskInstance[] @@ -166,23 +167,36 @@ export function claimSort( const priorityMap: Record = {}; tasks.forEach((task) => { const taskType = task.taskType; - const priority = definitions.get(taskType)?.priority || TaskPriority.Normal; + const priority = getPriority(definitions, taskType); priorityMap[taskType] = priority; }); - return tasks.sort(compare); + return tasks.slice().sort(compare); function compare(a: ConcreteTaskInstance, b: ConcreteTaskInstance) { - const priorityA = priorityMap[a.taskType] || TaskPriority.Normal; - const priorityB = priorityMap[b.taskType] || TaskPriority.Normal; + // sort by priority, descending + const priorityA = priorityMap[a.taskType] ?? TaskPriority.Normal; + const priorityB = priorityMap[b.taskType] ?? TaskPriority.Normal; + if (priorityA > priorityB) return -1; if (priorityA < priorityB) return 1; - const runA = a.runAt?.valueOf() || a.retryAt?.valueOf() || 0; - const runB = b.runAt?.valueOf() || b.retryAt?.valueOf() || 0; + // then sort by retry/runAt, ascending + const runA = a.retryAt?.valueOf() ?? a.runAt.valueOf() ?? 0; + const runB = b.retryAt?.valueOf() ?? b.runAt.valueOf() ?? 0; + + if (runA < runB) return -1; + if (runA > runB) return 1; + + return 0; + } +} - return runA - runB; +function getPriority(definitions: TaskTypeDictionary, taskType: string): TaskPriority { + if (definitions.has(taskType)) { + return definitions.get(taskType).priority ?? TaskPriority.Normal; } + return TaskPriority.Normal; } export interface UpdateFieldsAndMarkAsFailedOpts { From 842d4023a447ed91b81627faee668b0bcb2a1e16 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Thu, 22 Aug 2024 16:55:06 -0400 Subject: [PATCH 5/7] changes from PR review --- .../server/task_claimers/strategy_mget.ts | 14 ++++++++++---- x-pack/plugins/task_manager/server/task_store.ts | 5 ++++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index e739aa479caf8..dfdf41b160541 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -362,7 +362,7 @@ async function searchAvailableTasks({ ); searches.push({ query: queryUnlimitedTasks, - sort, + sort, // note: we could optimize this to not sort on priority, for this case size, seq_no_primary_term: true, }); @@ -382,7 +382,11 @@ async function searchAvailableTasks({ RecognizedTask ); - const query = matchesClauses(queryForLimitedTasks, filterDownBy(InactiveTasks)); + const query = matchesClauses( + queryForLimitedTasks, + filterDownBy(InactiveTasks), + tasksWithPartitions(partitions) + ); searches.push({ query, sort, @@ -432,8 +436,10 @@ function buildClaimPartitions(opts: BuildClaimPartitionsOpts): ClaimPartitions { continue; } - const capacity = getCapacity(definition.type); - result.limitedTypes.set(definition.type, capacity); + const capacity = getCapacity(definition.type) / definition.cost; + if (capacity !== 0) { + result.limitedTypes.set(definition.type, capacity); + } } return result; diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index acbf22feff115..9d661b288d5ea 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -505,6 +505,7 @@ export class TaskStore { } } + // like search(), only runs multiple searches in parallel returning the combined results async msearch(opts: SearchOpts[] = []): Promise { const queries = opts.map((opt) => ensureQueryOnlyReturnsTaskObjects(opt)); const body = queries.flatMap((query) => [{}, query]); @@ -521,7 +522,9 @@ export class TaskStore { for (const response of responses) { if (response.status !== 200) { - throw new Error(`Unexpected status code: ${response.status}`); + const err = new Error(`Unexpected status code from taskStore::msearch: ${response.status}`); + this.errors$.next(err); + throw err; } const { hits } = response as estypes.MsearchMultiSearchItem; From 4b357c1abb8556b91c3b12635136235deab0a7e1 Mon Sep 17 00:00:00 2001 From: Mike Cote Date: Fri, 23 Aug 2024 07:45:05 -0400 Subject: [PATCH 6/7] Copy search jest tests --- .../task_manager/server/task_store.test.ts | 135 ++++++++++++++++++ .../plugins/task_manager/server/task_store.ts | 4 +- 2 files changed, 138 insertions(+), 1 deletion(-) diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 9bc1a64140647..6417fef233cc7 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -360,6 +360,141 @@ describe('TaskStore', () => { }); }); + describe('msearch', () => { + let store: TaskStore; + let esClient: ReturnType['asInternalUser']; + let childEsClient: ReturnType< + typeof elasticsearchServiceMock.createClusterClient + >['asInternalUser']; + + beforeAll(() => { + esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + childEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + esClient.child.mockReturnValue(childEsClient as unknown as Client); + store = new TaskStore({ + logger: mockLogger(), + index: 'tasky', + taskManagerId: '', + serializer, + esClient, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, + allowReadingInvalidState: false, + requestTimeouts: { + update_by_query: 1000, + }, + }); + }); + + async function testMsearch( + optsArray: SearchOpts[], + hitsArray: Array> = [] + ) { + childEsClient.msearch.mockResponse({ + took: 0, + responses: hitsArray.map((hits) => ({ + hits, + took: 0, + _shards: { + failed: 0, + successful: 1, + total: 1, + }, + timed_out: false, + status: 200, + })), + }); + + const result = await store.msearch(optsArray); + + expect(childEsClient.msearch).toHaveBeenCalledTimes(1); + + return { + result, + args: childEsClient.msearch.mock.calls[0][0], + }; + } + + test('empty call filters by type, sorts by runAt and id', async () => { + const { args } = await testMsearch([{}], []); + expect(args).toMatchObject({ + index: 'tasky', + body: [ + {}, + { + sort: [{ 'task.runAt': 'asc' }], + query: { term: { type: 'task' } }, + }, + ], + }); + }); + + test('allows multiple custom queries', async () => { + const { args } = await testMsearch( + [ + { + query: { + term: { 'task.taskType': 'foo' }, + }, + }, + { + query: { + term: { 'task.taskType': 'bar' }, + }, + }, + ], + [] + ); + + expect(args).toMatchObject({ + body: [ + {}, + { + query: { + bool: { + must: [{ term: { type: 'task' } }, { term: { 'task.taskType': 'foo' } }], + }, + }, + }, + {}, + { + query: { + bool: { + must: [{ term: { type: 'task' } }, { term: { 'task.taskType': 'bar' } }], + }, + }, + }, + ], + }); + }); + + test('pushes error from call cluster to errors$', async () => { + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + childEsClient.msearch.mockResponse({ + took: 0, + responses: [ + { + took: 0, + _shards: { + failed: 0, + successful: 1, + total: 1, + }, + timed_out: false, + status: 429, + }, + ], + } as estypes.MsearchResponse); + await expect(store.msearch([{}])).rejects.toThrowErrorMatchingInlineSnapshot( + `"Unexpected status code from taskStore::msearch: 429"` + ); + expect(await firstErrorPromise).toMatchInlineSnapshot( + `[Error: Unexpected status code from taskStore::msearch: 429]` + ); + }); + }); + describe('aggregate', () => { let store: TaskStore; let esClient: ReturnType['asInternalUser']; diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 9d661b288d5ea..12a1f256c585b 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -507,7 +507,9 @@ export class TaskStore { // like search(), only runs multiple searches in parallel returning the combined results async msearch(opts: SearchOpts[] = []): Promise { - const queries = opts.map((opt) => ensureQueryOnlyReturnsTaskObjects(opt)); + const queries = opts.map(({ sort = [{ 'task.runAt': 'asc' }], ...opt }) => + ensureQueryOnlyReturnsTaskObjects({ sort, ...opt }) + ); const body = queries.flatMap((query) => [{}, query]); const result = await this.esClientWithoutRetries.msearch({ From 48806ca765520d2ed7edd07eeb6f7353848fc0fc Mon Sep 17 00:00:00 2001 From: Mike Cote Date: Mon, 26 Aug 2024 13:11:25 -0400 Subject: [PATCH 7/7] Add size multiplier for limited concurrency tasks --- .../plugins/task_manager/server/task_claimers/strategy_mget.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index dfdf41b160541..dce4bf66e57db 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -390,7 +390,7 @@ async function searchAvailableTasks({ searches.push({ query, sort, - size: capacity, + size: capacity * SIZE_MULTIPLIER_FOR_TASK_FETCH, seq_no_primary_term: true, }); }