Skip to content

Commit

Permalink
[Response Ops][Task Manager] Adding jest integration test to test cap…
Browse files Browse the repository at this point in the history
…acity based claiming (#189431)

Resolves #189111

## Summary

Adds jest integration test to test cost capacity based claiming with the
`mget` claim strategy. Using this integration test, we can exclude
running other tasks other than our test types. We register a normal cost
task and an XL cost task. We test both that we can claim tasks up to
100% capacity and that we will stop claiming tasks if the next task puts
us over capacity, even if that means we're leaving capacity on the
table.

---------

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
ymao1 and elasticmachine authored Aug 22, 2024
1 parent f63bd03 commit 9653d7e
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,327 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { v4 as uuidV4 } from 'uuid';
import type { TestElasticsearchUtils, TestKibanaUtils } from '@kbn/core-test-helpers-kbn-server';
import { schema } from '@kbn/config-schema';
import { times } from 'lodash';
import { TaskCost, TaskStatus } from '../task';
import type { TaskClaimingOpts } from '../queries/task_claiming';
import { TaskManagerPlugin, type TaskManagerStartContract } from '../plugin';
import { injectTask, setupTestServers, retry } from './lib';
import { CreateMonitoringStatsOpts } from '../monitoring';
import { filter, map } from 'rxjs';
import { isTaskManagerWorkerUtilizationStatEvent } from '../task_events';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { Ok } from '../lib/result_type';

const POLLING_INTERVAL = 5000;
const { TaskPollingLifecycle: TaskPollingLifecycleMock } = jest.requireMock('../polling_lifecycle');
jest.mock('../polling_lifecycle', () => {
const actual = jest.requireActual('../polling_lifecycle');
return {
...actual,
TaskPollingLifecycle: jest.fn().mockImplementation((opts) => {
return new actual.TaskPollingLifecycle(opts);
}),
};
});

const { createMonitoringStats: createMonitoringStatsMock } = jest.requireMock('../monitoring');
jest.mock('../monitoring', () => {
const actual = jest.requireActual('../monitoring');
return {
...actual,
createMonitoringStats: jest.fn().mockImplementation((opts) => {
return new actual.createMonitoringStats(opts);
}),
};
});

const mockTaskTypeNormalCostRunFn = jest.fn();
const mockCreateTaskRunnerNormalCost = jest.fn();
const mockTaskTypeNormalCost = {
title: 'Normal cost task',
description: '',
cost: TaskCost.Normal,
stateSchemaByVersion: {
1: {
up: (state: Record<string, unknown>) => ({ foo: state.foo || '' }),
schema: schema.object({
foo: schema.string(),
}),
},
},
createTaskRunner: mockCreateTaskRunnerNormalCost.mockImplementation(() => ({
run: mockTaskTypeNormalCostRunFn,
})),
};
const mockTaskTypeXLCostRunFn = jest.fn();
const mockCreateTaskRunnerXLCost = jest.fn();
const mockTaskTypeXLCost = {
title: 'XL cost task',
description: '',
cost: TaskCost.ExtraLarge,
stateSchemaByVersion: {
1: {
up: (state: Record<string, unknown>) => ({ foo: state.foo || '' }),
schema: schema.object({
foo: schema.string(),
}),
},
},
createTaskRunner: mockCreateTaskRunnerXLCost.mockImplementation(() => ({
run: mockTaskTypeXLCostRunFn,
})),
};
jest.mock('../queries/task_claiming', () => {
const actual = jest.requireActual('../queries/task_claiming');
return {
...actual,
TaskClaiming: jest.fn().mockImplementation((opts: TaskClaimingOpts) => {
opts.definitions.registerTaskDefinitions({
_normalCostType: mockTaskTypeNormalCost,
_xlCostType: mockTaskTypeXLCost,
});
return new actual.TaskClaiming(opts);
}),
};
});

const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');

describe('capacity based claiming', () => {
const taskIdsToRemove: string[] = [];
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
let taskManagerPlugin: TaskManagerStartContract;
let createMonitoringStatsOpts: CreateMonitoringStatsOpts;

beforeAll(async () => {
const setupResult = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: `mget`,
capacity: 10,
poll_interval: POLLING_INTERVAL,
unsafe: {
exclude_task_types: ['[A-Za-z]*'],
},
},
},
});
esServer = setupResult.esServer;
kibanaServer = setupResult.kibanaServer;

expect(taskManagerStartSpy).toHaveBeenCalledTimes(1);
taskManagerPlugin = taskManagerStartSpy.mock.results[0].value;

expect(TaskPollingLifecycleMock).toHaveBeenCalledTimes(1);

expect(createMonitoringStatsMock).toHaveBeenCalledTimes(1);
createMonitoringStatsOpts = createMonitoringStatsMock.mock.calls[0][0];
});

afterAll(async () => {
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});

beforeEach(() => {
jest.clearAllMocks();
});

afterEach(async () => {
while (taskIdsToRemove.length > 0) {
const id = taskIdsToRemove.pop();
await taskManagerPlugin.removeIfExists(id!);
}
});

it('should claim tasks to full capacity', async () => {
const backgroundTaskLoads: number[] = [];
createMonitoringStatsOpts.taskPollingLifecycle?.events
.pipe(
filter(isTaskManagerWorkerUtilizationStatEvent),
map<TaskLifecycleEvent, number>((taskEvent: TaskLifecycleEvent) => {
return (taskEvent.event as unknown as Ok<number>).value;
})
)
.subscribe((load: number) => {
backgroundTaskLoads.push(load);
});
const taskRunAtDates: Date[] = [];
mockTaskTypeNormalCostRunFn.mockImplementation(() => {
taskRunAtDates.push(new Date());
return { state: { foo: 'test' } };
});

// inject 10 normal cost tasks with the same runAt value
const ids: string[] = [];
times(10, () => ids.push(uuidV4()));

const runAt = new Date();
for (const id of ids) {
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id,
taskType: '_normalCostType',
params: {},
state: { foo: 'test' },
stateVersion: 1,
runAt,
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
taskIdsToRemove.push(id);
}

await retry(async () => {
expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(10);
});

expect(taskRunAtDates.length).toBe(10);

// run at dates should be within a few seconds of each other
const firstRunAt = taskRunAtDates[0].getTime();
const lastRunAt = taskRunAtDates[taskRunAtDates.length - 1].getTime();

expect(lastRunAt - firstRunAt).toBeLessThanOrEqual(1000);

// background task load should be 0 or 100 since we're only running these tasks
for (const load of backgroundTaskLoads) {
expect(load === 0 || load === 100).toBe(true);
}
});

it('should claim tasks until the next task will exceed capacity', async () => {
const backgroundTaskLoads: number[] = [];
createMonitoringStatsOpts.taskPollingLifecycle?.events
.pipe(
filter(isTaskManagerWorkerUtilizationStatEvent),
map<TaskLifecycleEvent, number>((taskEvent: TaskLifecycleEvent) => {
return (taskEvent.event as unknown as Ok<number>).value;
})
)
.subscribe((load: number) => {
backgroundTaskLoads.push(load);
});
const now = new Date();
const taskRunAtDates: Array<{ runAt: Date; type: string }> = [];
mockTaskTypeNormalCostRunFn.mockImplementation(() => {
taskRunAtDates.push({ type: 'normal', runAt: new Date() });
return { state: { foo: 'test' } };
});
mockTaskTypeXLCostRunFn.mockImplementation(() => {
taskRunAtDates.push({ type: 'xl', runAt: new Date() });
return { state: { foo: 'test' } };
});

// inject 6 normal cost tasks for total cost of 12
const ids: string[] = [];
times(6, () => ids.push(uuidV4()));
const runAt1 = new Date(now.valueOf() - 5);
for (const id of ids) {
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id,
taskType: '_normalCostType',
params: {},
state: { foo: 'test' },
stateVersion: 1,
runAt: runAt1,
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
taskIdsToRemove.push(id);
}

// inject 1 XL cost task that will put us over the max cost capacity of 20
const xlid = uuidV4();
const runAt2 = now;
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: xlid,
taskType: '_xlCostType',
params: {},
state: { foo: 'test' },
stateVersion: 1,
runAt: runAt2,
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
taskIdsToRemove.push(xlid);

// inject one more normal cost task
const runAt3 = new Date(now.valueOf() + 5);
const lastid = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: lastid,
taskType: '_normalCostType',
params: {},
state: { foo: 'test' },
stateVersion: 1,
runAt: runAt3,
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
taskIdsToRemove.push(lastid);

// retry until all tasks have been run
await retry(async () => {
expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(7);
expect(mockTaskTypeXLCostRunFn).toHaveBeenCalledTimes(1);
});

expect(taskRunAtDates.length).toBe(8);

const firstRunAt = taskRunAtDates[0].runAt.getTime();

// the first 6 tasks should have been run at the same time (adding some fudge factor)
// and they should all be normal cost tasks
for (let i = 0; i < 6; i++) {
expect(taskRunAtDates[i].type).toBe('normal');
expect(taskRunAtDates[i].runAt.getTime() - firstRunAt).toBeLessThanOrEqual(500);
}

// the next task should be XL cost task and be run after one polling interval has passed (with some fudge factor)
expect(taskRunAtDates[6].type).toBe('xl');
expect(taskRunAtDates[6].runAt.getTime() - firstRunAt).toBeGreaterThan(POLLING_INTERVAL - 500);

// last task should be normal cost and be run after one polling interval has passed
expect(taskRunAtDates[7].type).toBe('normal');
expect(taskRunAtDates[7].runAt.getTime() - firstRunAt).toBeGreaterThan(POLLING_INTERVAL - 500);

// background task load should be 0 or 60 or 100 since we're only running these tasks
// should be 100 during the claim cycle where we claimed 6 normal tasks but left the large capacity task in the queue
// should be 60 during the next claim cycle where we claimed the large capacity task and the normal capacity: 10 + 2 / 20 = 60%
for (const load of backgroundTaskLoads) {
expect(load === 0 || load === 60 || load === 100).toBe(true);
}
});
});
14 changes: 13 additions & 1 deletion x-pack/plugins/task_manager/server/task_claimers/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { getTaskClaimer } from '.';
import { getTaskClaimer, isTaskTypeExcluded } from '.';
import { mockLogger } from '../test_utils';
import { claimAvailableTasksUpdateByQuery } from './strategy_update_by_query';
import { claimAvailableTasksMget } from './strategy_mget';
Expand Down Expand Up @@ -37,3 +37,15 @@ describe('task_claimers/index', () => {
});
});
});

describe('isTaskTypeExcluded', () => {
test('returns false when task type is not in the excluded list', () => {
expect(isTaskTypeExcluded(['otherTaskType'], 'taskType')).toBe(false);
expect(isTaskTypeExcluded(['otherTaskType*'], 'taskType')).toBe(false);
});

test('returns true when task type is in the excluded list', () => {
expect(isTaskTypeExcluded(['taskType'], 'taskType')).toBe(true);
expect(isTaskTypeExcluded(['task*'], 'taskType')).toBe(true);
});
});
11 changes: 11 additions & 0 deletions x-pack/plugins/task_manager/server/task_claimers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import { Subject, Observable } from 'rxjs';
import { Logger } from '@kbn/core/server';

import minimatch from 'minimatch';
import { TaskStore } from '../task_store';
import { TaskClaim, TaskTiming } from '../task_events';
import { TaskTypeDictionary } from '../task_type_dictionary';
Expand Down Expand Up @@ -72,3 +73,13 @@ export function getEmptyClaimOwnershipResult(): ClaimOwnershipResult {
docs: [],
};
}

export function isTaskTypeExcluded(excludedTaskTypePatterns: string[], taskType: string) {
for (const excludedTypePattern of excludedTaskTypePatterns) {
if (minimatch(taskType, excludedTypePattern)) {
return true;
}
}

return false;
}
Loading

0 comments on commit 9653d7e

Please sign in to comment.