diff --git a/x-pack/plugins/task_manager/server/integration_tests/task_manager_capacity_based_claiming.test.ts b/x-pack/plugins/task_manager/server/integration_tests/task_manager_capacity_based_claiming.test.ts new file mode 100644 index 0000000000000..ab2a397f60f8c --- /dev/null +++ b/x-pack/plugins/task_manager/server/integration_tests/task_manager_capacity_based_claiming.test.ts @@ -0,0 +1,327 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { v4 as uuidV4 } from 'uuid'; +import type { TestElasticsearchUtils, TestKibanaUtils } from '@kbn/core-test-helpers-kbn-server'; +import { schema } from '@kbn/config-schema'; +import { times } from 'lodash'; +import { TaskCost, TaskStatus } from '../task'; +import type { TaskClaimingOpts } from '../queries/task_claiming'; +import { TaskManagerPlugin, type TaskManagerStartContract } from '../plugin'; +import { injectTask, setupTestServers, retry } from './lib'; +import { CreateMonitoringStatsOpts } from '../monitoring'; +import { filter, map } from 'rxjs'; +import { isTaskManagerWorkerUtilizationStatEvent } from '../task_events'; +import { TaskLifecycleEvent } from '../polling_lifecycle'; +import { Ok } from '../lib/result_type'; + +const POLLING_INTERVAL = 5000; +const { TaskPollingLifecycle: TaskPollingLifecycleMock } = jest.requireMock('../polling_lifecycle'); +jest.mock('../polling_lifecycle', () => { + const actual = jest.requireActual('../polling_lifecycle'); + return { + ...actual, + TaskPollingLifecycle: jest.fn().mockImplementation((opts) => { + return new actual.TaskPollingLifecycle(opts); + }), + }; +}); + +const { createMonitoringStats: createMonitoringStatsMock } = jest.requireMock('../monitoring'); +jest.mock('../monitoring', () => { + const actual = jest.requireActual('../monitoring'); + return { + ...actual, + createMonitoringStats: jest.fn().mockImplementation((opts) => { + return new actual.createMonitoringStats(opts); + }), + }; +}); + +const mockTaskTypeNormalCostRunFn = jest.fn(); +const mockCreateTaskRunnerNormalCost = jest.fn(); +const mockTaskTypeNormalCost = { + title: 'Normal cost task', + description: '', + cost: TaskCost.Normal, + stateSchemaByVersion: { + 1: { + up: (state: Record) => ({ foo: state.foo || '' }), + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + createTaskRunner: mockCreateTaskRunnerNormalCost.mockImplementation(() => ({ + run: mockTaskTypeNormalCostRunFn, + })), +}; +const mockTaskTypeXLCostRunFn = jest.fn(); +const mockCreateTaskRunnerXLCost = jest.fn(); +const mockTaskTypeXLCost = { + title: 'XL cost task', + description: '', + cost: TaskCost.ExtraLarge, + stateSchemaByVersion: { + 1: { + up: (state: Record) => ({ foo: state.foo || '' }), + schema: schema.object({ + foo: schema.string(), + }), + }, + }, + createTaskRunner: mockCreateTaskRunnerXLCost.mockImplementation(() => ({ + run: mockTaskTypeXLCostRunFn, + })), +}; +jest.mock('../queries/task_claiming', () => { + const actual = jest.requireActual('../queries/task_claiming'); + return { + ...actual, + TaskClaiming: jest.fn().mockImplementation((opts: TaskClaimingOpts) => { + opts.definitions.registerTaskDefinitions({ + _normalCostType: mockTaskTypeNormalCost, + _xlCostType: mockTaskTypeXLCost, + }); + return new actual.TaskClaiming(opts); + }), + }; +}); + +const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start'); + +describe('capacity based claiming', () => { + const taskIdsToRemove: string[] = []; + let esServer: TestElasticsearchUtils; + let kibanaServer: TestKibanaUtils; + let taskManagerPlugin: TaskManagerStartContract; + let createMonitoringStatsOpts: CreateMonitoringStatsOpts; + + beforeAll(async () => { + const setupResult = await setupTestServers({ + xpack: { + task_manager: { + claim_strategy: `mget`, + capacity: 10, + poll_interval: POLLING_INTERVAL, + unsafe: { + exclude_task_types: ['[A-Za-z]*'], + }, + }, + }, + }); + esServer = setupResult.esServer; + kibanaServer = setupResult.kibanaServer; + + expect(taskManagerStartSpy).toHaveBeenCalledTimes(1); + taskManagerPlugin = taskManagerStartSpy.mock.results[0].value; + + expect(TaskPollingLifecycleMock).toHaveBeenCalledTimes(1); + + expect(createMonitoringStatsMock).toHaveBeenCalledTimes(1); + createMonitoringStatsOpts = createMonitoringStatsMock.mock.calls[0][0]; + }); + + afterAll(async () => { + if (kibanaServer) { + await kibanaServer.stop(); + } + if (esServer) { + await esServer.stop(); + } + }); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + afterEach(async () => { + while (taskIdsToRemove.length > 0) { + const id = taskIdsToRemove.pop(); + await taskManagerPlugin.removeIfExists(id!); + } + }); + + it('should claim tasks to full capacity', async () => { + const backgroundTaskLoads: number[] = []; + createMonitoringStatsOpts.taskPollingLifecycle?.events + .pipe( + filter(isTaskManagerWorkerUtilizationStatEvent), + map((taskEvent: TaskLifecycleEvent) => { + return (taskEvent.event as unknown as Ok).value; + }) + ) + .subscribe((load: number) => { + backgroundTaskLoads.push(load); + }); + const taskRunAtDates: Date[] = []; + mockTaskTypeNormalCostRunFn.mockImplementation(() => { + taskRunAtDates.push(new Date()); + return { state: { foo: 'test' } }; + }); + + // inject 10 normal cost tasks with the same runAt value + const ids: string[] = []; + times(10, () => ids.push(uuidV4())); + + const runAt = new Date(); + for (const id of ids) { + await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + id, + taskType: '_normalCostType', + params: {}, + state: { foo: 'test' }, + stateVersion: 1, + runAt, + enabled: true, + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + retryAt: null, + ownerId: null, + }); + taskIdsToRemove.push(id); + } + + await retry(async () => { + expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(10); + }); + + expect(taskRunAtDates.length).toBe(10); + + // run at dates should be within a few seconds of each other + const firstRunAt = taskRunAtDates[0].getTime(); + const lastRunAt = taskRunAtDates[taskRunAtDates.length - 1].getTime(); + + expect(lastRunAt - firstRunAt).toBeLessThanOrEqual(1000); + + // background task load should be 0 or 100 since we're only running these tasks + for (const load of backgroundTaskLoads) { + expect(load === 0 || load === 100).toBe(true); + } + }); + + it('should claim tasks until the next task will exceed capacity', async () => { + const backgroundTaskLoads: number[] = []; + createMonitoringStatsOpts.taskPollingLifecycle?.events + .pipe( + filter(isTaskManagerWorkerUtilizationStatEvent), + map((taskEvent: TaskLifecycleEvent) => { + return (taskEvent.event as unknown as Ok).value; + }) + ) + .subscribe((load: number) => { + backgroundTaskLoads.push(load); + }); + const now = new Date(); + const taskRunAtDates: Array<{ runAt: Date; type: string }> = []; + mockTaskTypeNormalCostRunFn.mockImplementation(() => { + taskRunAtDates.push({ type: 'normal', runAt: new Date() }); + return { state: { foo: 'test' } }; + }); + mockTaskTypeXLCostRunFn.mockImplementation(() => { + taskRunAtDates.push({ type: 'xl', runAt: new Date() }); + return { state: { foo: 'test' } }; + }); + + // inject 6 normal cost tasks for total cost of 12 + const ids: string[] = []; + times(6, () => ids.push(uuidV4())); + const runAt1 = new Date(now.valueOf() - 5); + for (const id of ids) { + await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + id, + taskType: '_normalCostType', + params: {}, + state: { foo: 'test' }, + stateVersion: 1, + runAt: runAt1, + enabled: true, + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + retryAt: null, + ownerId: null, + }); + taskIdsToRemove.push(id); + } + + // inject 1 XL cost task that will put us over the max cost capacity of 20 + const xlid = uuidV4(); + const runAt2 = now; + await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + id: xlid, + taskType: '_xlCostType', + params: {}, + state: { foo: 'test' }, + stateVersion: 1, + runAt: runAt2, + enabled: true, + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + retryAt: null, + ownerId: null, + }); + taskIdsToRemove.push(xlid); + + // inject one more normal cost task + const runAt3 = new Date(now.valueOf() + 5); + const lastid = uuidV4(); + await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, { + id: lastid, + taskType: '_normalCostType', + params: {}, + state: { foo: 'test' }, + stateVersion: 1, + runAt: runAt3, + enabled: true, + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + retryAt: null, + ownerId: null, + }); + taskIdsToRemove.push(lastid); + + // retry until all tasks have been run + await retry(async () => { + expect(mockTaskTypeNormalCostRunFn).toHaveBeenCalledTimes(7); + expect(mockTaskTypeXLCostRunFn).toHaveBeenCalledTimes(1); + }); + + expect(taskRunAtDates.length).toBe(8); + + const firstRunAt = taskRunAtDates[0].runAt.getTime(); + + // the first 6 tasks should have been run at the same time (adding some fudge factor) + // and they should all be normal cost tasks + for (let i = 0; i < 6; i++) { + expect(taskRunAtDates[i].type).toBe('normal'); + expect(taskRunAtDates[i].runAt.getTime() - firstRunAt).toBeLessThanOrEqual(500); + } + + // the next task should be XL cost task and be run after one polling interval has passed (with some fudge factor) + expect(taskRunAtDates[6].type).toBe('xl'); + expect(taskRunAtDates[6].runAt.getTime() - firstRunAt).toBeGreaterThan(POLLING_INTERVAL - 500); + + // last task should be normal cost and be run after one polling interval has passed + expect(taskRunAtDates[7].type).toBe('normal'); + expect(taskRunAtDates[7].runAt.getTime() - firstRunAt).toBeGreaterThan(POLLING_INTERVAL - 500); + + // background task load should be 0 or 60 or 100 since we're only running these tasks + // should be 100 during the claim cycle where we claimed 6 normal tasks but left the large capacity task in the queue + // should be 60 during the next claim cycle where we claimed the large capacity task and the normal capacity: 10 + 2 / 20 = 60% + for (const load of backgroundTaskLoads) { + expect(load === 0 || load === 60 || load === 100).toBe(true); + } + }); +}); diff --git a/x-pack/plugins/task_manager/server/task_claimers/index.test.ts b/x-pack/plugins/task_manager/server/task_claimers/index.test.ts index d4501a0a021ff..72ab2f9a695e8 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/index.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/index.test.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { getTaskClaimer } from '.'; +import { getTaskClaimer, isTaskTypeExcluded } from '.'; import { mockLogger } from '../test_utils'; import { claimAvailableTasksUpdateByQuery } from './strategy_update_by_query'; import { claimAvailableTasksMget } from './strategy_mget'; @@ -37,3 +37,15 @@ describe('task_claimers/index', () => { }); }); }); + +describe('isTaskTypeExcluded', () => { + test('returns false when task type is not in the excluded list', () => { + expect(isTaskTypeExcluded(['otherTaskType'], 'taskType')).toBe(false); + expect(isTaskTypeExcluded(['otherTaskType*'], 'taskType')).toBe(false); + }); + + test('returns true when task type is in the excluded list', () => { + expect(isTaskTypeExcluded(['taskType'], 'taskType')).toBe(true); + expect(isTaskTypeExcluded(['task*'], 'taskType')).toBe(true); + }); +}); diff --git a/x-pack/plugins/task_manager/server/task_claimers/index.ts b/x-pack/plugins/task_manager/server/task_claimers/index.ts index 2b7e48c85f167..ff4f9f6131120 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/index.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/index.ts @@ -8,6 +8,7 @@ import { Subject, Observable } from 'rxjs'; import { Logger } from '@kbn/core/server'; +import minimatch from 'minimatch'; import { TaskStore } from '../task_store'; import { TaskClaim, TaskTiming } from '../task_events'; import { TaskTypeDictionary } from '../task_type_dictionary'; @@ -72,3 +73,13 @@ export function getEmptyClaimOwnershipResult(): ClaimOwnershipResult { docs: [], }; } + +export function isTaskTypeExcluded(excludedTaskTypePatterns: string[], taskType: string) { + for (const excludedTypePattern of excludedTaskTypePatterns) { + if (minimatch(taskType, excludedTypePattern)) { + return true; + } + } + + return false; +} diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 7962fdd2b6f8a..b2751803e8dc3 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -19,7 +19,12 @@ import apm from 'elastic-apm-node'; import { Subject, Observable } from 'rxjs'; import { TaskTypeDictionary } from '../task_type_dictionary'; -import { TaskClaimerOpts, ClaimOwnershipResult, getEmptyClaimOwnershipResult } from '.'; +import { + TaskClaimerOpts, + ClaimOwnershipResult, + getEmptyClaimOwnershipResult, + isTaskTypeExcluded, +} from '.'; import { ConcreteTaskInstance, TaskStatus, ConcreteTaskInstanceVersion, TaskCost } from '../task'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; import { @@ -50,7 +55,7 @@ interface OwnershipClaimingOpts { size: number; taskTypes: Set; removedTypes: Set; - excludedTypes: Set; + excludedTaskTypes: string[]; taskStore: TaskStore; events$: Subject; definitions: TaskTypeDictionary; @@ -103,13 +108,12 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise { const searchedTypes = Array.from(taskTypes) .concat(Array.from(removedTypes)) - .filter((type) => !excludedTypes.has(type)); + .filter((type) => !isTaskTypeExcluded(excludedTaskTypes, type)); const queryForScheduledTasks = mustBeAllOf( // Task must be enabled EnabledTask, diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts index 1cb9d8942a55a..807ee8ca4397f 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts @@ -9,14 +9,18 @@ * This module contains helpers for managing the task manager storage layer. */ import apm from 'elastic-apm-node'; -import minimatch from 'minimatch'; import { Subject, Observable, from, of } from 'rxjs'; import { mergeScan } from 'rxjs'; import { groupBy, pick } from 'lodash'; import { asOk } from '../lib/result_type'; import { TaskTypeDictionary } from '../task_type_dictionary'; -import { TaskClaimerOpts, ClaimOwnershipResult, getEmptyClaimOwnershipResult } from '.'; +import { + TaskClaimerOpts, + ClaimOwnershipResult, + getEmptyClaimOwnershipResult, + isTaskTypeExcluded, +} from '.'; import { ConcreteTaskInstance } from '../task'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; import { isLimited, TASK_MANAGER_MARK_AS_CLAIMED } from '../queries/task_claiming'; @@ -132,16 +136,6 @@ function emitEvents(events$: Subject, events: TaskClaim[]) { events.forEach((event) => events$.next(event)); } -function isTaskTypeExcluded(excludedTaskTypes: string[], taskType: string) { - for (const excludedType of excludedTaskTypes) { - if (minimatch(taskType, excludedType)) { - return true; - } - } - - return false; -} - async function markAvailableTasksAsClaimed({ definitions, excludedTaskTypes,