diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 07d18a39a1dbc..f67139a39c122 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -47,6 +47,7 @@ interface OwnershipClaimingOpts { taskTypes: Set; removedTypes: Set; excludedTypes: Set; + getCapacity: (taskType?: string | undefined) => number; taskStore: TaskStore; events$: Subject; definitions: TaskTypeDictionary; @@ -109,6 +110,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise { diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index b922d10ee5cf1..b859920c29722 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -484,6 +484,34 @@ export class TaskStore { } } + async msearch(opts: SearchOpts[] = []): Promise { + const queries = opts.map((opt) => ensureQueryOnlyReturnsTaskObjects(opt)); + const body = queries.flatMap((query) => [{}, query]); + + const result = await this.esClientWithoutRetries.msearch({ + index: this.index, + ignore_unavailable: true, + body, + }); + const { responses } = result; + + const versionMap = this.createVersionMap([]); + let allTasks = new Array(); + + for (const response of responses) { + if (response.status !== 200) { + throw new Error(`Unexpected status code: ${response.status}`); + } + + const { hits } = response as estypes.MsearchMultiSearchItem; + const { hits: tasks } = hits; + this.addTasksToVersionMap(versionMap, tasks); + allTasks = allTasks.concat(this.filterTasks(tasks)); + } + + return { docs: allTasks, versionMap }; + } + private async search(opts: SearchOpts = {}): Promise { const { query } = ensureQueryOnlyReturnsTaskObjects(opts); @@ -500,27 +528,9 @@ export class TaskStore { hits: { hits: tasks }, } = result; - const versionMap = new Map(); - for (const task of tasks) { - if (task._seq_no == null || task._primary_term == null) continue; - - const esId = task._id!.startsWith('task:') ? task._id!.slice(5) : task._id!; - versionMap.set(esId, { - esId: task._id!, - seqNo: task._seq_no, - primaryTerm: task._primary_term, - }); - } - + const versionMap = this.createVersionMap(tasks); return { - docs: tasks - // @ts-expect-error @elastic/elasticsearch _source is optional - .filter((doc) => this.serializer.isRawSavedObject(doc)) - // @ts-expect-error @elastic/elasticsearch _source is optional - .map((doc) => this.serializer.rawToSavedObject(doc)) - .map((doc) => omit(doc, 'namespace') as SavedObject) - .map((doc) => savedObjectToConcreteTaskInstance(doc)) - .filter((doc): doc is ConcreteTaskInstance => !!doc), + docs: this.filterTasks(tasks), versionMap, }; } catch (e) { @@ -529,6 +539,45 @@ export class TaskStore { } } + private filterTasks( + tasks: Array> + ): ConcreteTaskInstance[] { + return ( + tasks + // @ts-expect-error @elastic/elasticsearch _source is optional + .filter((doc) => this.serializer.isRawSavedObject(doc)) + // @ts-expect-error @elastic/elasticsearch _source is optional + .map((doc) => this.serializer.rawToSavedObject(doc)) + .map((doc) => omit(doc, 'namespace') as SavedObject) + .map((doc) => savedObjectToConcreteTaskInstance(doc)) + .filter((doc): doc is ConcreteTaskInstance => !!doc) + ); + } + + private addTasksToVersionMap( + versionMap: Map, + tasks: Array> + ): void { + for (const task of tasks) { + if (task._seq_no == null || task._primary_term == null) continue; + + const esId = task._id.startsWith('task:') ? task._id.slice(5) : task._id; + versionMap.set(esId, { + esId: task._id, + seqNo: task._seq_no, + primaryTerm: task._primary_term, + }); + } + } + + private createVersionMap( + tasks: Array> + ): Map { + const versionMap = new Map(); + this.addTasksToVersionMap(versionMap, tasks); + return versionMap; + } + public async aggregate({ aggs, query,