Skip to content

Commit

Permalink
some starting changes
Browse files Browse the repository at this point in the history
  • Loading branch information
pmuellr committed Jul 8, 2024
1 parent 94b9a48 commit 2b0c750
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ interface OwnershipClaimingOpts {
taskTypes: Set<string>;
removedTypes: Set<string>;
excludedTypes: Set<string>;
getCapacity: (taskType?: string | undefined) => number;
taskStore: TaskStore;
events$: Subject<TaskClaim>;
definitions: TaskTypeDictionary;
Expand Down Expand Up @@ -109,6 +110,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
taskStore,
events$,
claimOwnershipUntil,
getCapacity,
size: initialCapacity * SIZE_MULTIPLIER_FOR_TASK_FETCH,
taskMaxAttempts,
});
Expand Down Expand Up @@ -266,6 +268,7 @@ async function searchAvailableTasks({
removedTypes,
excludedTypes,
taskStore,
getCapacity,
size,
taskMaxAttempts,
}: OwnershipClaimingOpts): Promise<SearchAvailableTasksResponse> {
Expand Down
89 changes: 69 additions & 20 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,34 @@ export class TaskStore {
}
}

async msearch(opts: SearchOpts[] = []): Promise<FetchResult> {
const queries = opts.map((opt) => ensureQueryOnlyReturnsTaskObjects(opt));
const body = queries.flatMap((query) => [{}, query]);

const result = await this.esClientWithoutRetries.msearch<SavedObjectsRawDoc['_source']>({
index: this.index,
ignore_unavailable: true,
body,
});
const { responses } = result;

const versionMap = this.createVersionMap([]);
let allTasks = new Array<ConcreteTaskInstance>();

for (const response of responses) {
if (response.status !== 200) {
throw new Error(`Unexpected status code: ${response.status}`);
}

const { hits } = response as estypes.MsearchMultiSearchItem<SavedObjectsRawDoc['_source']>;
const { hits: tasks } = hits;
this.addTasksToVersionMap(versionMap, tasks);
allTasks = allTasks.concat(this.filterTasks(tasks));
}

return { docs: allTasks, versionMap };
}

private async search(opts: SearchOpts = {}): Promise<FetchResult> {
const { query } = ensureQueryOnlyReturnsTaskObjects(opts);

Expand All @@ -500,27 +528,9 @@ export class TaskStore {
hits: { hits: tasks },
} = result;

const versionMap = new Map<string, ConcreteTaskInstanceVersion>();
for (const task of tasks) {
if (task._seq_no == null || task._primary_term == null) continue;

const esId = task._id!.startsWith('task:') ? task._id!.slice(5) : task._id!;
versionMap.set(esId, {
esId: task._id!,
seqNo: task._seq_no,
primaryTerm: task._primary_term,
});
}

const versionMap = this.createVersionMap(tasks);
return {
docs: tasks
// @ts-expect-error @elastic/elasticsearch _source is optional
.filter((doc) => this.serializer.isRawSavedObject(doc))
// @ts-expect-error @elastic/elasticsearch _source is optional
.map((doc) => this.serializer.rawToSavedObject(doc))
.map((doc) => omit(doc, 'namespace') as SavedObject<SerializedConcreteTaskInstance>)
.map((doc) => savedObjectToConcreteTaskInstance(doc))
.filter((doc): doc is ConcreteTaskInstance => !!doc),
docs: this.filterTasks(tasks),
versionMap,
};
} catch (e) {
Expand All @@ -529,6 +539,45 @@ export class TaskStore {
}
}

private filterTasks(
tasks: Array<estypes.SearchHit<SavedObjectsRawDoc['_source']>>
): ConcreteTaskInstance[] {
return (
tasks
// @ts-expect-error @elastic/elasticsearch _source is optional
.filter((doc) => this.serializer.isRawSavedObject(doc))
// @ts-expect-error @elastic/elasticsearch _source is optional
.map((doc) => this.serializer.rawToSavedObject(doc))
.map((doc) => omit(doc, 'namespace') as SavedObject<SerializedConcreteTaskInstance>)
.map((doc) => savedObjectToConcreteTaskInstance(doc))
.filter((doc): doc is ConcreteTaskInstance => !!doc)
);
}

private addTasksToVersionMap(
versionMap: Map<string, ConcreteTaskInstanceVersion>,
tasks: Array<estypes.SearchHit<SavedObjectsRawDoc['_source']>>
): void {
for (const task of tasks) {
if (task._seq_no == null || task._primary_term == null) continue;

const esId = task._id.startsWith('task:') ? task._id.slice(5) : task._id;
versionMap.set(esId, {
esId: task._id,
seqNo: task._seq_no,
primaryTerm: task._primary_term,
});
}
}

private createVersionMap(
tasks: Array<estypes.SearchHit<SavedObjectsRawDoc['_source']>>
): Map<string, ConcreteTaskInstanceVersion> {
const versionMap = new Map<string, ConcreteTaskInstanceVersion>();
this.addTasksToVersionMap(versionMap, tasks);
return versionMap;
}

public async aggregate<TSearchRequest extends AggregationOpts>({
aggs,
query,
Expand Down

0 comments on commit 2b0c750

Please sign in to comment.