Skip to content

Commit

Permalink
[Response Ops][Task Manager] change task claiming interface to stop u…
Browse files Browse the repository at this point in the history
…sing observables (elastic#196196)

Resolves elastic#184952

## Summary

Changing task claimers to return promises instead of observables. This
is a code refactor and should not have any effect on task claiming
functionality.

---------

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
2 people authored and tiansivive committed Oct 29, 2024
1 parent 07094c6 commit ffb7067
Show file tree
Hide file tree
Showing 10 changed files with 515 additions and 539 deletions.
93 changes: 38 additions & 55 deletions x-pack/plugins/task_manager/server/lib/fill_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,68 +12,58 @@ import { TaskPoolRunResult } from '../task_pool';
import { asOk, Result } from './result_type';
import { ConcreteTaskInstance, TaskStatus } from '../task';
import { TaskManagerRunner } from '../task_running/task_runner';
import { from, Observable } from 'rxjs';
import { ClaimOwnershipResult } from '../queries/task_claiming';

jest.mock('../task_running/task_runner');

describe('fillPool', () => {
function mockFetchAvailableTasks(
tasksToMock: number[][]
): () => Observable<Result<ClaimOwnershipResult, FillPoolResult>> {
const claimCycles: ConcreteTaskInstance[][] = tasksToMock.map((ids) => mockTaskInstances(ids));
tasksToMock: number[]
): () => Promise<Result<ClaimOwnershipResult, FillPoolResult>> {
const tasks: ConcreteTaskInstance[] = tasksToMock.map((id) => mockTaskInstance(id));
return () =>
from(
claimCycles.map((tasks) =>
asOk({
stats: {
tasksUpdated: tasks?.length ?? 0,
tasksConflicted: 0,
tasksClaimed: 0,
},
docs: tasks,
})
)
Promise.resolve(
asOk({
stats: {
tasksUpdated: tasks?.length ?? 0,
tasksConflicted: 0,
tasksClaimed: 0,
},
docs: tasks,
})
);
}

const mockTaskInstances = (ids: number[]): ConcreteTaskInstance[] =>
ids.map((id) => ({
id: `${id}`,
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(0),
scheduledAt: new Date(0),
const mockTaskInstance = (id: number): ConcreteTaskInstance => ({
id: `${id}`,
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(0),
scheduledAt: new Date(0),
startedAt: new Date(0),
retryAt: new Date(0),
state: {
startedAt: new Date(0),
retryAt: new Date(0),
state: {
startedAt: new Date(0),
},
taskType: '',
params: {},
ownerId: null,
}));

test('fills task pool with all claimed tasks until fetchAvailableTasks stream closes', async () => {
const tasks = [
[1, 2, 3],
[4, 5],
];
},
taskType: '',
params: {},
ownerId: null,
});

test('fills task pool with all claimed tasks', async () => {
const tasks = [1, 2, 3, 4, 5];
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
const converter = _.identity;

await fillPool(fetchAvailableTasks, converter, run);

expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3, 4, 5]));
expect(_.flattenDeep(run.args)).toEqual(tasks.map((id) => mockTaskInstance(id)));
});

test('calls the converter on the records prior to running', async () => {
const tasks = [
[1, 2, 3],
[4, 5],
];
const tasks = [1, 2, 3, 4, 5];
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (instance: ConcreteTaskInstance) =>
Expand All @@ -91,14 +81,13 @@ describe('fillPool', () => {
instance.id as unknown as TaskManagerRunner;

try {
const fetchAvailableTasks = () =>
new Observable<Result<ClaimOwnershipResult, FillPoolResult>>((obs) =>
obs.error('fetch is not working')
);
const fetchAvailableTasks = () => {
throw new Error('fetch is not working');
};

await fillPool(fetchAvailableTasks, converter, run);
} catch (err) {
expect(err.toString()).toBe('fetch is not working');
expect(err.toString()).toBe('Error: fetch is not working');
expect(run.called).toBe(false);
}
});
Expand All @@ -109,10 +98,7 @@ describe('fillPool', () => {
instance.id as unknown as TaskManagerRunner;

try {
const tasks = [
[1, 2, 3],
[4, 5],
];
const tasks = [1, 2, 3, 4, 5];
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);

await fillPool(fetchAvailableTasks, converter, run);
Expand All @@ -123,10 +109,7 @@ describe('fillPool', () => {

test('throws exception from converter', async () => {
try {
const tasks = [
[1, 2, 3],
[4, 5],
];
const tasks = [1, 2, 3, 4, 5];
const fetchAvailableTasks = mockFetchAvailableTasks(tasks);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (instance: ConcreteTaskInstance) => {
Expand Down
109 changes: 37 additions & 72 deletions x-pack/plugins/task_manager/server/lib/fill_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
* 2.0.
*/

import { Observable } from 'rxjs';
import { concatMap, last } from 'rxjs';
import { ClaimOwnershipResult } from '../queries/task_claiming';
import { ConcreteTaskInstance } from '../task';
import { WithTaskTiming, startTaskTimer } from '../task_events';
import { TaskPoolRunResult } from '../task_pool';
import { TaskManagerRunner } from '../task_running';
import { Result, map as mapResult, asErr, asOk } from './result_type';
import { Result, isOk } from './result_type';

export enum FillPoolResult {
Failed = 'Failed',
Expand All @@ -23,17 +21,6 @@ export enum FillPoolResult {
PoolFilled = 'PoolFilled',
}

type FillPoolAndRunResult = Result<
{
result: TaskPoolRunResult;
stats?: ClaimOwnershipResult['stats'];
},
{
result: FillPoolResult;
stats?: ClaimOwnershipResult['stats'];
}
>;

export type ClaimAndFillPoolResult = Partial<Pick<ClaimOwnershipResult, 'stats'>> & {
result: FillPoolResult;
};
Expand All @@ -52,66 +39,44 @@ export type TimedFillPoolResult = WithTaskTiming<ClaimAndFillPoolResult>;
* @param converter - a function that converts task records to the appropriate task runner
*/
export async function fillPool(
fetchAvailableTasks: () => Observable<Result<ClaimOwnershipResult, FillPoolResult>>,
fetchAvailableTasks: () => Promise<Result<ClaimOwnershipResult, FillPoolResult>>,
converter: (taskInstance: ConcreteTaskInstance) => TaskManagerRunner,
run: (tasks: TaskManagerRunner[]) => Promise<TaskPoolRunResult>
): Promise<TimedFillPoolResult> {
return new Promise((resolve, reject) => {
const stopTaskTimer = startTaskTimer();
const augmentTimingTo = (
result: FillPoolResult,
stats?: ClaimOwnershipResult['stats']
): TimedFillPoolResult => ({
result,
stats,
timing: stopTaskTimer(),
});
fetchAvailableTasks()
.pipe(
// each ClaimOwnershipResult will be sequencially consumed an ran using the `run` handler
concatMap(async (res) =>
mapResult<ClaimOwnershipResult, FillPoolResult, Promise<FillPoolAndRunResult>>(
res,
async ({ docs, stats }) => {
if (!docs.length) {
return asOk({ result: TaskPoolRunResult.NoTaskWereRan, stats });
}
return asOk(
await run(docs.map(converter)).then((runResult) => ({
result: runResult,
stats,
}))
);
},
async (fillPoolResult) => asErr({ result: fillPoolResult })
)
),
// when the final call to `run` completes, we'll complete the stream and emit the
// final accumulated result
last()
)
.subscribe(
(claimResults) => {
resolve(
mapResult(
claimResults,
({ result, stats }) => {
switch (result) {
case TaskPoolRunResult.RanOutOfCapacity:
return augmentTimingTo(FillPoolResult.RanOutOfCapacity, stats);
case TaskPoolRunResult.RunningAtCapacity:
return augmentTimingTo(FillPoolResult.RunningAtCapacity, stats);
case TaskPoolRunResult.NoTaskWereRan:
return augmentTimingTo(FillPoolResult.NoTasksClaimed, stats);
default:
return augmentTimingTo(FillPoolResult.PoolFilled, stats);
}
},
({ result, stats }) => augmentTimingTo(result, stats)
)
);
},
(err) => reject(err)
);
const stopTaskTimer = startTaskTimer();
const augmentTimingTo = (
result: FillPoolResult,
stats?: ClaimOwnershipResult['stats']
): TimedFillPoolResult => ({
result,
stats,
timing: stopTaskTimer(),
});

const claimResults = await fetchAvailableTasks();
if (isOk(claimResults)) {
if (!claimResults.value.docs.length) {
return augmentTimingTo(FillPoolResult.NoTasksClaimed, claimResults.value.stats);
}

const taskPoolRunResult = await run(claimResults.value.docs.map(converter)).then(
(runResult) => ({
result: runResult,
stats: claimResults.value.stats,
})
);

switch (taskPoolRunResult.result) {
case TaskPoolRunResult.RanOutOfCapacity:
return augmentTimingTo(FillPoolResult.RanOutOfCapacity, taskPoolRunResult.stats);
case TaskPoolRunResult.RunningAtCapacity:
return augmentTimingTo(FillPoolResult.RunningAtCapacity, taskPoolRunResult.stats);
case TaskPoolRunResult.NoTaskWereRan:
return augmentTimingTo(FillPoolResult.NoTasksClaimed, taskPoolRunResult.stats);
default:
return augmentTimingTo(FillPoolResult.PoolFilled, taskPoolRunResult.stats);
}
}

return augmentTimingTo(claimResults.error);
}
Loading

0 comments on commit ffb7067

Please sign in to comment.