Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Response Ops][Task Manager] change task claiming interface to stop using observables #196196

Merged
merged 7 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 38 additions & 55 deletions x-pack/plugins/task_manager/server/lib/fill_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,68 +12,58 @@ import { TaskPoolRunResult } from '../task_pool';
import { asOk, Result } from './result_type';
import { ConcreteTaskInstance, TaskStatus } from '../task';
import { TaskManagerRunner } from '../task_running/task_runner';
import { from, Observable } from 'rxjs';
import { ClaimOwnershipResult } from '../queries/task_claiming';

jest.mock('../task_running/task_runner');

describe('fillPool', () => {
function mockFetchAvailableTasks(
tasksToMock: number[][]
): () => Observable<Result<ClaimOwnershipResult, FillPoolResult>> {
const claimCycles: ConcreteTaskInstance[][] = tasksToMock.map((ids) => mockTaskInstances(ids));
tasksToMock: number[]
): () => Promise<Result<ClaimOwnershipResult, FillPoolResult>> {
const tasks: ConcreteTaskInstance[] = tasksToMock.map((id) => mockTaskInstances(id));
return () =>
from(
claimCycles.map((tasks) =>
asOk({
stats: {
tasksUpdated: tasks?.length ?? 0,
tasksConflicted: 0,
tasksClaimed: 0,
},
docs: tasks,
})
)
Promise.resolve(
asOk({
stats: {
tasksUpdated: tasks?.length ?? 0,
tasksConflicted: 0,
tasksClaimed: 0,
},
docs: tasks,
})
);
}

const mockTaskInstances = (ids: number[]): ConcreteTaskInstance[] =>
ids.map((id) => ({
id: `${id}`,
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(0),
scheduledAt: new Date(0),
const mockTaskInstances = (id: number): ConcreteTaskInstance => ({
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const mockTaskInstances = (id: number): ConcreteTaskInstance => ({
const mockTaskInstance = (id: number): ConcreteTaskInstance => ({

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 5da0cf8

id: `${id}`,
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(0),
scheduledAt: new Date(0),
startedAt: new Date(0),
retryAt: new Date(0),
state: {
startedAt: new Date(0),
retryAt: new Date(0),
state: {
startedAt: new Date(0),
},
taskType: '',
params: {},
ownerId: null,
}));

test('fills task pool with all claimed tasks until fetchAvailableTasks stream closes', async () => {
const tasks = [
[1, 2, 3],
[4, 5],
];
},
taskType: '',
params: {},
ownerId: null,
});

test('fills task pool with all claimed tasks', async () => {
const tasks = [1, 2, 3, 4, 5];
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
const converter = _.identity;

await fillPool(fetchAvailableTasks, converter, run);

expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3, 4, 5]));
expect(_.flattenDeep(run.args)).toEqual(tasks.map((id) => mockTaskInstances(id)));
});

test('calls the converter on the records prior to running', async () => {
const tasks = [
[1, 2, 3],
[4, 5],
];
const tasks = [1, 2, 3, 4, 5];
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (instance: ConcreteTaskInstance) =>
Expand All @@ -91,14 +81,13 @@ describe('fillPool', () => {
instance.id as unknown as TaskManagerRunner;

try {
const fetchAvailableTasks = () =>
new Observable<Result<ClaimOwnershipResult, FillPoolResult>>((obs) =>
obs.error('fetch is not working')
);
const fetchAvailableTasks = () => {
throw new Error('fetch is not working');
};

await fillPool(fetchAvailableTasks, converter, run);
} catch (err) {
expect(err.toString()).toBe('fetch is not working');
expect(err.toString()).toBe('Error: fetch is not working');
expect(run.called).toBe(false);
}
});
Expand All @@ -109,10 +98,7 @@ describe('fillPool', () => {
instance.id as unknown as TaskManagerRunner;

try {
const tasks = [
[1, 2, 3],
[4, 5],
];
const tasks = [1, 2, 3, 4, 5];
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);

await fillPool(fetchAvailableTasks, converter, run);
Expand All @@ -123,10 +109,7 @@ describe('fillPool', () => {

test('throws exception from converter', async () => {
try {
const tasks = [
[1, 2, 3],
[4, 5],
];
const tasks = [1, 2, 3, 4, 5];
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (instance: ConcreteTaskInstance) => {
Expand Down
109 changes: 37 additions & 72 deletions x-pack/plugins/task_manager/server/lib/fill_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
* 2.0.
*/

import { Observable } from 'rxjs';
import { concatMap, last } from 'rxjs';
import { ClaimOwnershipResult } from '../queries/task_claiming';
import { ConcreteTaskInstance } from '../task';
import { WithTaskTiming, startTaskTimer } from '../task_events';
import { TaskPoolRunResult } from '../task_pool';
import { TaskManagerRunner } from '../task_running';
import { Result, map as mapResult, asErr, asOk } from './result_type';
import { Result, isOk } from './result_type';

export enum FillPoolResult {
Failed = 'Failed',
Expand All @@ -23,17 +21,6 @@ export enum FillPoolResult {
PoolFilled = 'PoolFilled',
}

type FillPoolAndRunResult = Result<
{
result: TaskPoolRunResult;
stats?: ClaimOwnershipResult['stats'];
},
{
result: FillPoolResult;
stats?: ClaimOwnershipResult['stats'];
}
>;

export type ClaimAndFillPoolResult = Partial<Pick<ClaimOwnershipResult, 'stats'>> & {
result: FillPoolResult;
};
Expand All @@ -52,66 +39,44 @@ export type TimedFillPoolResult = WithTaskTiming<ClaimAndFillPoolResult>;
* @param converter - a function that converts task records to the appropriate task runner
*/
export async function fillPool(
fetchAvailableTasks: () => Observable<Result<ClaimOwnershipResult, FillPoolResult>>,
fetchAvailableTasks: () => Promise<Result<ClaimOwnershipResult, FillPoolResult>>,
converter: (taskInstance: ConcreteTaskInstance) => TaskManagerRunner,
run: (tasks: TaskManagerRunner[]) => Promise<TaskPoolRunResult>
): Promise<TimedFillPoolResult> {
return new Promise((resolve, reject) => {
const stopTaskTimer = startTaskTimer();
const augmentTimingTo = (
result: FillPoolResult,
stats?: ClaimOwnershipResult['stats']
): TimedFillPoolResult => ({
result,
stats,
timing: stopTaskTimer(),
});
fetchAvailableTasks()
.pipe(
// each ClaimOwnershipResult will be sequencially consumed an ran using the `run` handler
concatMap(async (res) =>
mapResult<ClaimOwnershipResult, FillPoolResult, Promise<FillPoolAndRunResult>>(
res,
async ({ docs, stats }) => {
if (!docs.length) {
return asOk({ result: TaskPoolRunResult.NoTaskWereRan, stats });
}
return asOk(
await run(docs.map(converter)).then((runResult) => ({
result: runResult,
stats,
}))
);
},
async (fillPoolResult) => asErr({ result: fillPoolResult })
)
),
// when the final call to `run` completes, we'll complete the stream and emit the
// final accumulated result
last()
)
.subscribe(
(claimResults) => {
resolve(
mapResult(
claimResults,
({ result, stats }) => {
switch (result) {
case TaskPoolRunResult.RanOutOfCapacity:
return augmentTimingTo(FillPoolResult.RanOutOfCapacity, stats);
case TaskPoolRunResult.RunningAtCapacity:
return augmentTimingTo(FillPoolResult.RunningAtCapacity, stats);
case TaskPoolRunResult.NoTaskWereRan:
return augmentTimingTo(FillPoolResult.NoTasksClaimed, stats);
default:
return augmentTimingTo(FillPoolResult.PoolFilled, stats);
}
},
({ result, stats }) => augmentTimingTo(result, stats)
)
);
},
(err) => reject(err)
);
const stopTaskTimer = startTaskTimer();
const augmentTimingTo = (
result: FillPoolResult,
stats?: ClaimOwnershipResult['stats']
): TimedFillPoolResult => ({
result,
stats,
timing: stopTaskTimer(),
});

const claimResults = await fetchAvailableTasks();
if (isOk(claimResults)) {
if (!claimResults.value.docs.length) {
return augmentTimingTo(FillPoolResult.NoTasksClaimed, claimResults.value.stats);
}

const taskPoolRunResult = await run(claimResults.value.docs.map(converter)).then(
(runResult) => ({
result: runResult,
stats: claimResults.value.stats,
})
);

switch (taskPoolRunResult.result) {
case TaskPoolRunResult.RanOutOfCapacity:
return augmentTimingTo(FillPoolResult.RanOutOfCapacity, taskPoolRunResult.stats);
case TaskPoolRunResult.RunningAtCapacity:
return augmentTimingTo(FillPoolResult.RunningAtCapacity, taskPoolRunResult.stats);
case TaskPoolRunResult.NoTaskWereRan:
return augmentTimingTo(FillPoolResult.NoTasksClaimed, taskPoolRunResult.stats);
default:
return augmentTimingTo(FillPoolResult.PoolFilled, taskPoolRunResult.stats);
}
}

return augmentTimingTo(claimResults.error);
}
Loading