From 67f138ee9e667d8272d2b396480a3b9ae07ee45a Mon Sep 17 00:00:00 2001 From: Jiawei Wu Date: Thu, 21 Nov 2024 23:45:29 -0800 Subject: [PATCH] remove ephemeral task from task manager --- .../task_manager/server/config.test.ts | 70 ++- x-pack/plugins/task_manager/server/config.ts | 13 +- .../server/ephemeral_task_lifecycle.mock.ts | 24 - .../server/ephemeral_task_lifecycle.test.ts | 414 ------------------ .../server/ephemeral_task_lifecycle.ts | 210 --------- x-pack/plugins/task_manager/server/index.ts | 10 - .../managed_configuration.test.ts | 12 - .../lib/calculate_health_status.test.ts | 5 - .../server/lib/log_health_metrics.test.ts | 1 - .../server/metrics/create_aggregator.test.ts | 4 - x-pack/plugins/task_manager/server/mocks.ts | 2 - .../monitoring/capacity_estimation.test.ts | 133 +----- .../server/monitoring/capacity_estimation.ts | 26 +- .../configuration_statistics.test.ts | 4 - .../ephemeral_task_statistics.test.ts | 384 ---------------- .../monitoring/ephemeral_task_statistics.ts | 127 ------ .../task_manager/server/monitoring/index.ts | 2 - .../monitoring/monitoring_stats_stream.ts | 28 +- .../monitoring/task_run_statistics.test.ts | 36 +- .../server/monitoring/task_run_statistics.ts | 3 - .../task_manager/server/plugin.test.ts | 24 - x-pack/plugins/task_manager/server/plugin.ts | 24 +- .../server/polling_lifecycle.test.ts | 4 - .../task_manager/server/polling_lifecycle.ts | 4 +- .../task_manager/server/routes/health.test.ts | 44 +- x-pack/plugins/task_manager/server/task.ts | 10 - .../task_manager/server/task_events.ts | 24 - .../task_running/ephemeral_task_runner.ts | 396 ----------------- .../server/task_running/errors.ts | 20 - .../server/task_running/task_runner.ts | 1 - .../server/task_scheduling.mock.ts | 1 - .../server/task_scheduling.test.ts | 124 +----- .../task_manager/server/task_scheduling.ts | 155 +------ .../task_manager_usage_collector.test.ts | 68 --- .../usage/task_manager_usage_collector.ts | 52 --- .../task_manager/server/usage/types.ts | 23 - .../sample_task_plugin/server/init_routes.ts | 39 -- .../check_registered_task_types.ts | 1 - .../test_suites/task_manager/health_route.ts | 1 - .../task_manager/task_management.ts | 204 --------- .../config.ts | 2 - .../server/init_routes.ts | 39 -- .../sample_task_plugin_mget/server/plugin.ts | 34 -- .../test_suites/task_manager/health_route.ts | 1 - .../task_manager/task_management.ts | 204 --------- 45 files changed, 85 insertions(+), 2922 deletions(-) delete mode 100644 x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.mock.ts delete mode 100644 x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts delete mode 100644 x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts delete mode 100644 x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts delete mode 100644 x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts delete mode 100644 x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 2193620e0c99..63b9b587af95 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -19,10 +19,6 @@ describe('config validation', () => { "active_nodes_lookback": "30s", "interval": 10000, }, - "ephemeral_tasks": Object { - "enabled": false, - "request_capacity": 10, - }, "event_loop_delay": Object { "monitor": true, "warn_threshold": 5000, @@ -82,10 +78,6 @@ describe('config validation', () => { "active_nodes_lookback": "30s", "interval": 10000, }, - "ephemeral_tasks": Object { - "enabled": false, - "request_capacity": 10, - }, "event_loop_delay": Object { "monitor": true, "warn_threshold": 5000, @@ -143,10 +135,6 @@ describe('config validation', () => { "active_nodes_lookback": "30s", "interval": 10000, }, - "ephemeral_tasks": Object { - "enabled": false, - "request_capacity": 10, - }, "event_loop_delay": Object { "monitor": true, "warn_threshold": 5000, @@ -296,4 +284,62 @@ describe('config validation', () => { `"[discovery.active_nodes_lookback]: active node lookback duration cannot exceed five minutes"` ); }); + + test('should not throw if ephemeral_tasks is defined', () => { + const config: Record = { + ephemeral_tasks: { + enabled: true, + request_capacity: 20, + }, + }; + + expect(configSchema.validate(config)).toMatchInlineSnapshot(` + Object { + "allow_reading_invalid_state": true, + "auto_calculate_default_ech_capacity": false, + "claim_strategy": "mget", + "discovery": Object { + "active_nodes_lookback": "30s", + "interval": 10000, + }, + "ephemeral_tasks": Object { + "enabled": true, + "request_capacity": 20, + }, + "event_loop_delay": Object { + "monitor": true, + "warn_threshold": 5000, + }, + "kibanas_per_partition": 2, + "max_attempts": 3, + "metrics_reset_interval": 30000, + "monitored_aggregated_stats_refresh_rate": 60000, + "monitored_stats_health_verbose_log": Object { + "enabled": false, + "level": "debug", + "warn_delayed_task_start_in_seconds": 60, + }, + "monitored_stats_required_freshness": 4000, + "monitored_stats_running_average_window": 50, + "monitored_task_execution_thresholds": Object { + "custom": Object {}, + "default": Object { + "error_threshold": 90, + "warn_threshold": 80, + }, + }, + "poll_interval": 500, + "request_capacity": 1000, + "request_timeouts": Object { + "update_by_query": 30000, + }, + "unsafe": Object { + "authenticate_background_task_utilization": true, + "exclude_task_types": Array [], + }, + "version_conflict_threshold": 80, + "worker_utilization_running_average_window": 5, + } + `); + }); }); diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index 002f18380a74..8d58cb9fdc87 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -16,7 +16,6 @@ export const DEFAULT_MAX_WORKERS = 10; export const DEFAULT_POLL_INTERVAL = 3000; export const MGET_DEFAULT_POLL_INTERVAL = 500; export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80; -export const DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY = MAX_WORKERS_LIMIT; // Monitoring Constants // =================== @@ -100,16 +99,8 @@ export const configSchema = schema.object( max: MAX_DISCOVERY_INTERVAL_MS, }), }), - ephemeral_tasks: schema.object({ - enabled: schema.boolean({ defaultValue: false }), - /* How many requests can Task Manager buffer before it rejects new requests. */ - request_capacity: schema.number({ - // a nice round contrived number, feel free to change as we learn how it behaves - defaultValue: 10, - min: 1, - max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY, - }), - }), + /* Allows for old kibana config to start kibana without crashing since ephemeral tasks are deprecated*/ + ephemeral_tasks: schema.maybe(schema.any()), event_loop_delay: eventLoopDelaySchema, kibanas_per_partition: schema.number({ defaultValue: DEFAULT_KIBANAS_PER_PARTITION, diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.mock.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.mock.ts deleted file mode 100644 index d107bcdddf50..000000000000 --- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.mock.ts +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle'; -import { TaskLifecycleEvent } from './polling_lifecycle'; -import { of, Observable } from 'rxjs'; - -export const ephemeralTaskLifecycleMock = { - create(opts: { events$?: Observable; getQueuedTasks?: () => number }) { - return { - attemptToRun: jest.fn(), - get events() { - return opts.events$ ?? of(); - }, - get queuedTasks() { - return opts.getQueuedTasks ? opts.getQueuedTasks() : 0; - }, - } as unknown as jest.Mocked; - }, -}; diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts deleted file mode 100644 index ec4595915777..000000000000 --- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts +++ /dev/null @@ -1,414 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { Subject } from 'rxjs'; - -import { TaskLifecycleEvent } from './polling_lifecycle'; -import { createInitialMiddleware } from './lib/middleware'; -import { TaskTypeDictionary } from './task_type_dictionary'; -import { mockLogger } from './test_utils'; -import { asErr, asOk } from './lib/result_type'; -import { FillPoolResult } from './lib/fill_pool'; -import { EphemeralTaskLifecycle, EphemeralTaskLifecycleOpts } from './ephemeral_task_lifecycle'; -import { v4 as uuidv4 } from 'uuid'; -import { asTaskPollingCycleEvent, asTaskRunEvent, TaskPersistence } from './task_events'; -import { TaskRunResult } from './task_running'; -import { TaskPoolRunResult } from './task_pool'; -import { TaskPoolMock } from './task_pool/task_pool.mock'; -import { executionContextServiceMock } from '@kbn/core/server/mocks'; -import { taskManagerMock } from './mocks'; - -jest.mock('./constants', () => ({ - CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: ['report'], -})); - -const executionContext = executionContextServiceMock.createSetupContract(); - -describe('EphemeralTaskLifecycle', () => { - function initTaskLifecycleParams({ - config, - ...optOverrides - }: { - config?: Partial; - } & Partial> = {}) { - const taskManagerLogger = mockLogger(); - const poolCapacity = jest.fn(); - const pool = TaskPoolMock.create(poolCapacity); - const lifecycleEvent$ = new Subject(); - const elasticsearchAndSOAvailability$ = new Subject(); - const opts: EphemeralTaskLifecycleOpts = { - logger: taskManagerLogger, - definitions: new TaskTypeDictionary(taskManagerLogger), - executionContext, - config: { - discovery: { - active_nodes_lookback: '30s', - interval: 10000, - }, - kibanas_per_partition: 2, - max_attempts: 9, - poll_interval: 6000000, - version_conflict_threshold: 80, - request_capacity: 1000, - allow_reading_invalid_state: false, - monitored_aggregated_stats_refresh_rate: 5000, - monitored_stats_required_freshness: 5000, - monitored_stats_running_average_window: 50, - monitored_stats_health_verbose_log: { - enabled: true, - level: 'debug', - warn_delayed_task_start_in_seconds: 60, - }, - monitored_task_execution_thresholds: { - default: { - error_threshold: 90, - warn_threshold: 80, - }, - custom: {}, - }, - ephemeral_tasks: { - enabled: true, - request_capacity: 10, - }, - unsafe: { - exclude_task_types: [], - authenticate_background_task_utilization: true, - }, - event_loop_delay: { - monitor: true, - warn_threshold: 5000, - }, - worker_utilization_running_average_window: 5, - metrics_reset_interval: 3000, - claim_strategy: 'update_by_query', - request_timeouts: { - update_by_query: 1000, - }, - auto_calculate_default_ech_capacity: false, - ...config, - }, - elasticsearchAndSOAvailability$, - pool, - lifecycleEvent: lifecycleEvent$, - middleware: createInitialMiddleware(), - ...optOverrides, - }; - - opts.definitions.registerTaskDefinitions({ - foo: { - title: 'foo', - createTaskRunner: jest.fn(), - }, - }); - - pool.run.mockResolvedValue(Promise.resolve(TaskPoolRunResult.RunningAllClaimedTasks)); - - return { poolCapacity, lifecycleEvent$, pool, elasticsearchAndSOAvailability$, opts }; - } - - describe('constructor', () => { - test('avoids unnecesery subscription if ephemeral tasks are disabled', () => { - const { opts } = initTaskLifecycleParams({ - config: { - ephemeral_tasks: { - enabled: false, - request_capacity: 10, - }, - }, - }); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const task = taskManagerMock.createTask(); - expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asErr(task)); - }); - - test('queues up tasks when ephemeral tasks are enabled', () => { - const { opts } = initTaskLifecycleParams(); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const task = taskManagerMock.createTask(); - expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task)); - }); - - test('rejects tasks when ephemeral tasks are enabled and queue is full', () => { - const { opts } = initTaskLifecycleParams({ - config: { ephemeral_tasks: { enabled: true, request_capacity: 2 } }, - }); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const task = taskManagerMock.createTask(); - expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task)); - const task2 = taskManagerMock.createTask(); - expect(ephemeralTaskLifecycle.attemptToRun(task2)).toMatchObject(asOk(task2)); - - const rejectedTask = taskManagerMock.createTask(); - expect(ephemeralTaskLifecycle.attemptToRun(rejectedTask)).toMatchObject(asErr(rejectedTask)); - }); - - test('pulls tasks off queue when a polling cycle completes', () => { - const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams(); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const task = taskManagerMock.createTask({ id: `my-phemeral-task` }); - expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task)); - - poolCapacity.mockReturnValue({ - availableCapacity: 10, - }); - - lifecycleEvent$.next( - asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })) - ); - - expect(pool.run).toHaveBeenCalledTimes(1); - - const taskRunners = pool.run.mock.calls[0][0]; - expect(taskRunners).toHaveLength(1); - expect(`${taskRunners[0]}`).toMatchInlineSnapshot(`"foo \\"my-phemeral-task\\" (Ephemeral)"`); - }); - - test('pulls tasks off queue when a task run completes', () => { - const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams(); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const task = taskManagerMock.createTask({ id: `my-phemeral-task` }); - expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task)); - - poolCapacity.mockReturnValue({ - availableCapacity: 10, - }); - - lifecycleEvent$.next( - asTaskRunEvent( - uuidv4(), - asOk({ - task: taskManagerMock.createTask(), - result: TaskRunResult.Success, - persistence: TaskPersistence.Ephemeral, - isExpired: false, - }) - ) - ); - - expect(pool.run).toHaveBeenCalledTimes(1); - - const taskRunners = pool.run.mock.calls[0][0]; - expect(taskRunners).toHaveLength(1); - expect(`${taskRunners[0]}`).toMatchInlineSnapshot(`"foo \\"my-phemeral-task\\" (Ephemeral)"`); - }); - - test('pulls as many tasks off queue as it has capacity for', () => { - const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams(); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const tasks = [ - taskManagerMock.createTask(), - taskManagerMock.createTask(), - taskManagerMock.createTask(), - ]; - expect(ephemeralTaskLifecycle.attemptToRun(tasks[0])).toMatchObject(asOk(tasks[0])); - expect(ephemeralTaskLifecycle.attemptToRun(tasks[1])).toMatchObject(asOk(tasks[1])); - expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2])); - - poolCapacity.mockReturnValue({ - availableCapacity: 2, - }); - - lifecycleEvent$.next( - asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })) - ); - - expect(pool.run).toHaveBeenCalledTimes(1); - - const taskRunners = pool.run.mock.calls[0][0]; - expect(taskRunners).toHaveLength(2); - expect(`${taskRunners[0]}`).toEqual(`foo "${tasks[0].id}" (Ephemeral)`); - expect(`${taskRunners[1]}`).toEqual(`foo "${tasks[1].id}" (Ephemeral)`); - }); - - test('pulls only as many tasks of the same type as is allowed by maxConcurrency', () => { - const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams(); - - opts.definitions.registerTaskDefinitions({ - report: { - title: 'report', - maxConcurrency: 1, - createTaskRunner: jest.fn(), - }, - }); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const firstLimitedTask = taskManagerMock.createTask({ taskType: 'report' }); - const secondLimitedTask = taskManagerMock.createTask({ taskType: 'report' }); - // both are queued - expect(ephemeralTaskLifecycle.attemptToRun(firstLimitedTask)).toMatchObject( - asOk(firstLimitedTask) - ); - expect(ephemeralTaskLifecycle.attemptToRun(secondLimitedTask)).toMatchObject( - asOk(secondLimitedTask) - ); - - // pool has capacity for both - poolCapacity.mockReturnValue({ - availableCapacity: 10, - }); - pool.getUsedCapacityByType.mockReturnValue(0); - - lifecycleEvent$.next( - asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })) - ); - - expect(pool.run).toHaveBeenCalledTimes(1); - - const taskRunners = pool.run.mock.calls[0][0]; - expect(taskRunners).toHaveLength(1); - expect(`${taskRunners[0]}`).toEqual(`report "${firstLimitedTask.id}" (Ephemeral)`); - }); - - test('when pulling tasks from the queue, it takes into account the maxConcurrency of tasks that are already in the pool', () => { - const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams(); - - opts.definitions.registerTaskDefinitions({ - report: { - title: 'report', - maxConcurrency: 1, - createTaskRunner: jest.fn(), - }, - }); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const firstLimitedTask = taskManagerMock.createTask({ taskType: 'report' }); - const secondLimitedTask = taskManagerMock.createTask({ taskType: 'report' }); - // both are queued - expect(ephemeralTaskLifecycle.attemptToRun(firstLimitedTask)).toMatchObject( - asOk(firstLimitedTask) - ); - expect(ephemeralTaskLifecycle.attemptToRun(secondLimitedTask)).toMatchObject( - asOk(secondLimitedTask) - ); - - // pool has capacity in general - poolCapacity.mockReturnValue({ - availableCapacity: 2, - }); - // but when we ask how many it has occupied by type - wee always have one worker already occupied by that type - pool.getUsedCapacityByType.mockReturnValue(1); - - lifecycleEvent$.next( - asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })) - ); - - expect(pool.run).toHaveBeenCalledTimes(0); - - // now we release the worker in the pool and cause another cycle in the epheemral queue - pool.getUsedCapacityByType.mockReturnValue(0); - lifecycleEvent$.next( - asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })) - ); - - expect(pool.run).toHaveBeenCalledTimes(1); - const taskRunners = pool.run.mock.calls[0][0]; - expect(taskRunners).toHaveLength(1); - expect(`${taskRunners[0]}`).toEqual(`report "${firstLimitedTask.id}" (Ephemeral)`); - }); - }); - - test('pulls tasks with both maxConcurrency and unlimited concurrency', () => { - const { pool, poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams(); - - opts.definitions.registerTaskDefinitions({ - report: { - title: 'report', - maxConcurrency: 1, - createTaskRunner: jest.fn(), - }, - }); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const fooTasks = [ - taskManagerMock.createTask(), - taskManagerMock.createTask(), - taskManagerMock.createTask(), - ]; - expect(ephemeralTaskLifecycle.attemptToRun(fooTasks[0])).toMatchObject(asOk(fooTasks[0])); - - const firstLimitedTask = taskManagerMock.createTask({ taskType: 'report' }); - expect(ephemeralTaskLifecycle.attemptToRun(firstLimitedTask)).toMatchObject( - asOk(firstLimitedTask) - ); - - expect(ephemeralTaskLifecycle.attemptToRun(fooTasks[1])).toMatchObject(asOk(fooTasks[1])); - - const secondLimitedTask = taskManagerMock.createTask({ taskType: 'report' }); - expect(ephemeralTaskLifecycle.attemptToRun(secondLimitedTask)).toMatchObject( - asOk(secondLimitedTask) - ); - - expect(ephemeralTaskLifecycle.attemptToRun(fooTasks[2])).toMatchObject(asOk(fooTasks[2])); - - // pool has capacity for all - poolCapacity.mockReturnValue({ - availableCapacity: 10, - }); - pool.getUsedCapacityByType.mockReturnValue(0); - - lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))); - - expect(pool.run).toHaveBeenCalledTimes(1); - - const taskRunners = pool.run.mock.calls[0][0]; - expect(taskRunners).toHaveLength(4); - const asStrings = taskRunners.map((taskRunner) => `${taskRunner}`); - expect(asStrings).toContain(`foo "${fooTasks[0].id}" (Ephemeral)`); - expect(asStrings).toContain(`report "${firstLimitedTask.id}" (Ephemeral)`); - expect(asStrings).toContain(`foo "${fooTasks[1].id}" (Ephemeral)`); - expect(asStrings).toContain(`foo "${fooTasks[2].id}" (Ephemeral)`); - }); - - test('properly removes from the queue after pulled', () => { - const { poolCapacity, opts, lifecycleEvent$ } = initTaskLifecycleParams(); - - const ephemeralTaskLifecycle = new EphemeralTaskLifecycle(opts); - - const tasks = [ - taskManagerMock.createTask(), - taskManagerMock.createTask(), - taskManagerMock.createTask(), - ]; - expect(ephemeralTaskLifecycle.attemptToRun(tasks[0])).toMatchObject(asOk(tasks[0])); - expect(ephemeralTaskLifecycle.attemptToRun(tasks[1])).toMatchObject(asOk(tasks[1])); - expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2])); - - expect(ephemeralTaskLifecycle.queuedTasks).toBe(3); - poolCapacity.mockReturnValue({ - availableCapacity: 1, - }); - lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))); - expect(ephemeralTaskLifecycle.queuedTasks).toBe(2); - - poolCapacity.mockReturnValue({ - availableCapacity: 1, - }); - lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))); - expect(ephemeralTaskLifecycle.queuedTasks).toBe(1); - - poolCapacity.mockReturnValue({ - availableCapacity: 1, - }); - lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))); - expect(ephemeralTaskLifecycle.queuedTasks).toBe(0); - }); -}); diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts deleted file mode 100644 index c7ee267b848e..000000000000 --- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { Subject, Observable, Subscription } from 'rxjs'; -import { filter } from 'rxjs'; -import { Logger, ExecutionContextStart } from '@kbn/core/server'; - -import { Result, asErr, asOk } from './lib/result_type'; -import { TaskManagerConfig } from './config'; - -import { asTaskManagerStatEvent, isTaskRunEvent, isTaskPollingCycleEvent } from './task_events'; -import { Middleware } from './lib/middleware'; -import { EphemeralTaskInstance } from './task'; -import { TaskTypeDictionary } from './task_type_dictionary'; -import { TaskLifecycleEvent } from './polling_lifecycle'; -import { EphemeralTaskManagerRunner } from './task_running/ephemeral_task_runner'; -import { TaskPool } from './task_pool'; - -export interface EphemeralTaskLifecycleOpts { - logger: Logger; - definitions: TaskTypeDictionary; - config: TaskManagerConfig; - middleware: Middleware; - elasticsearchAndSOAvailability$: Observable; - pool: TaskPool; - lifecycleEvent: Observable; - executionContext: ExecutionContextStart; -} - -export type EphemeralTaskInstanceRequest = Omit; - -export class EphemeralTaskLifecycle { - private definitions: TaskTypeDictionary; - private pool: TaskPool; - private lifecycleEvent: Observable; - // all task related events (task claimed, task marked as running, etc.) are emitted through events$ - private events$ = new Subject(); - private ephemeralTaskQueue: Array<{ - task: EphemeralTaskInstanceRequest; - enqueuedAt: number; - }> = []; - private logger: Logger; - private config: TaskManagerConfig; - private middleware: Middleware; - private lifecycleSubscription: Subscription = Subscription.EMPTY; - private readonly executionContext: ExecutionContextStart; - - constructor({ - logger, - middleware, - definitions, - pool, - lifecycleEvent, - config, - executionContext, - }: EphemeralTaskLifecycleOpts) { - this.logger = logger; - this.middleware = middleware; - this.definitions = definitions; - this.pool = pool; - this.lifecycleEvent = lifecycleEvent; - this.config = config; - this.executionContext = executionContext; - - if (this.enabled) { - this.lifecycleSubscription = this.lifecycleEvent - .pipe( - filter((e) => { - const hasPollingCycleCompleted = isTaskPollingCycleEvent(e); - if (hasPollingCycleCompleted) { - this.emitEvent( - asTaskManagerStatEvent('queuedEphemeralTasks', asOk(this.queuedTasks)) - ); - } - return ( - // when a polling cycle or a task run have just completed - (hasPollingCycleCompleted || isTaskRunEvent(e)) && - // we want to know when the queue has ephemeral task run requests - this.queuedTasks > 0 && - this.getCapacity() > 0 - ); - }) - ) - .subscribe((e) => { - let overallCapacity = this.getCapacity(); - const capacityByType = new Map(); - const tasksWithinCapacity = [...this.ephemeralTaskQueue] - .filter(({ task }) => { - if (overallCapacity > 0) { - if (!capacityByType.has(task.taskType)) { - capacityByType.set(task.taskType, this.getCapacity(task.taskType)); - } - if (capacityByType.get(task.taskType)! > 0) { - overallCapacity--; - capacityByType.set(task.taskType, capacityByType.get(task.taskType)! - 1); - return true; - } - } - }) - .map((ephemeralTask) => { - const index = this.ephemeralTaskQueue.indexOf(ephemeralTask); - if (index >= 0) { - this.ephemeralTaskQueue.splice(index, 1); - } - this.emitEvent( - asTaskManagerStatEvent( - 'ephemeralTaskDelay', - asOk(Date.now() - ephemeralTask.enqueuedAt) - ) - ); - return this.createTaskRunnerForTask(ephemeralTask.task); - }); - - if (tasksWithinCapacity.length) { - this.pool - .run(tasksWithinCapacity) - .then((successTaskPoolRunResult) => { - this.logger.debug( - `Successful ephemeral task lifecycle resulted in: ${successTaskPoolRunResult}` - ); - }) - .catch((error) => { - this.logger.debug(`Failed ephemeral task lifecycle resulted in: ${error}`); - }); - } - }); - } - } - - public get enabled(): boolean { - return this.config.ephemeral_tasks.enabled; - } - - public get events(): Observable { - return this.events$; - } - - private getCapacity = (taskType?: string) => - taskType && this.definitions.get(taskType)?.maxConcurrency - ? Math.max( - Math.min( - this.pool.availableCapacity(), - this.definitions.get(taskType)!.maxConcurrency! - - this.pool.getUsedCapacityByType(taskType) - ), - 0 - ) - : this.pool.availableCapacity(); - - private emitEvent = (event: TaskLifecycleEvent) => { - this.events$.next(event); - }; - - public attemptToRun(task: EphemeralTaskInstanceRequest) { - if (this.lifecycleSubscription.closed) { - return asErr(task); - } - return pushIntoSetWithTimestamp( - this.ephemeralTaskQueue, - this.config.ephemeral_tasks.request_capacity, - task - ); - } - - public get queuedTasks() { - return this.ephemeralTaskQueue.length; - } - - private createTaskRunnerForTask = ( - instance: EphemeralTaskInstanceRequest - ): EphemeralTaskManagerRunner => { - return new EphemeralTaskManagerRunner({ - logger: this.logger, - instance: { - ...instance, - startedAt: new Date(), - }, - definitions: this.definitions, - beforeRun: this.middleware.beforeRun, - beforeMarkRunning: this.middleware.beforeMarkRunning, - onTaskEvent: this.emitEvent, - executionContext: this.executionContext, - }); - }; -} - -/** - * Pushes values into a bounded set - * @param set A Set of generic type T - * @param maxCapacity How many values are we allowed to push into the set - * @param value A value T to push into the set if it is there - */ -function pushIntoSetWithTimestamp( - set: Array<{ - task: EphemeralTaskInstanceRequest; - enqueuedAt: number; - }>, - maxCapacity: number, - task: EphemeralTaskInstanceRequest -): Result { - if (set.length >= maxCapacity) { - return asErr(task); - } - set.push({ task, enqueuedAt: Date.now() }); - return asOk(task); -} diff --git a/x-pack/plugins/task_manager/server/index.ts b/x-pack/plugins/task_manager/server/index.ts index 79ec16b52987..fbd7a6ecfd00 100644 --- a/x-pack/plugins/task_manager/server/index.ts +++ b/x-pack/plugins/task_manager/server/index.ts @@ -17,7 +17,6 @@ export const plugin = async (initContext: PluginInitializerContext) => { export type { TaskInstance, ConcreteTaskInstance, - EphemeralTask, TaskRunCreatorFunction, RunContext, IntervalSchedule, @@ -32,7 +31,6 @@ export { isUnrecoverableError, throwUnrecoverableError, throwRetryableError, - isEphemeralTaskRejectedDueToCapacityError, createTaskRunError, TaskErrorSource, } from './task_running'; @@ -57,14 +55,6 @@ export const config: PluginConfigDescriptor = { schema: configSchema, deprecations: ({ deprecate }) => { return [ - deprecate('ephemeral_tasks.enabled', 'a future version', { - level: 'warning', - message: `Configuring "xpack.task_manager.ephemeral_tasks.enabled" is deprecated and will be removed in a future version. Remove this setting to increase task execution resiliency.`, - }), - deprecate('ephemeral_tasks.request_capacity', 'a future version', { - level: 'warning', - message: `Configuring "xpack.task_manager.ephemeral_tasks.request_capacity" is deprecated and will be removed in a future version. Remove this setting to increase task execution resiliency.`, - }), deprecate('max_workers', 'a future version', { level: 'warning', message: `Configuring "xpack.task_manager.max_workers" is deprecated and will be removed in a future version. Remove this setting and use "xpack.task_manager.capacity" instead.`, diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts index ab1d1bc0498f..f7b124ba02f8 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -69,10 +69,6 @@ describe('managed configuration', () => { }, custom: {}, }, - ephemeral_tasks: { - enabled: true, - request_capacity: 10, - }, unsafe: { exclude_task_types: [], authenticate_background_task_utilization: true, @@ -192,10 +188,6 @@ describe('managed configuration', () => { }, custom: {}, }, - ephemeral_tasks: { - enabled: true, - request_capacity: 10, - }, unsafe: { exclude_task_types: [], authenticate_background_task_utilization: true, @@ -318,10 +310,6 @@ describe('managed configuration', () => { }, custom: {}, }, - ephemeral_tasks: { - enabled: true, - request_capacity: 10, - }, unsafe: { exclude_task_types: [], authenticate_background_task_utilization: true, diff --git a/x-pack/plugins/task_manager/server/lib/calculate_health_status.test.ts b/x-pack/plugins/task_manager/server/lib/calculate_health_status.test.ts index b973a5c1cd5e..28f083e68b82 100644 --- a/x-pack/plugins/task_manager/server/lib/calculate_health_status.test.ts +++ b/x-pack/plugins/task_manager/server/lib/calculate_health_status.test.ts @@ -42,10 +42,6 @@ const config = { }, custom: {}, }, - ephemeral_tasks: { - enabled: false, - request_capacity: 10, - }, unsafe: { exclude_task_types: [], authenticate_background_task_utilization: true, @@ -195,7 +191,6 @@ const getStatsWithTimestamp = ({ persistence: { recurring: 95, non_recurring: 5, - ephemeral: 0, }, result_frequency_percent_as_number: { taskType1: { diff --git a/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts b/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts index 7df06865d30e..739d9638fa42 100644 --- a/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts +++ b/x-pack/plugins/task_manager/server/lib/log_health_metrics.test.ts @@ -507,7 +507,6 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth { persistence: { [TaskPersistence.Recurring]: 10, [TaskPersistence.NonRecurring]: 10, - [TaskPersistence.Ephemeral]: 10, }, result_frequency_percent_as_number: {}, }, diff --git a/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts b/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts index e56d57e17055..27d41a2c9f3f 100644 --- a/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts +++ b/x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts @@ -41,10 +41,6 @@ const config: TaskManagerConfig = { }, kibanas_per_partition: 2, allow_reading_invalid_state: false, - ephemeral_tasks: { - enabled: true, - request_capacity: 10, - }, event_loop_delay: { monitor: true, warn_threshold: 5000, diff --git a/x-pack/plugins/task_manager/server/mocks.ts b/x-pack/plugins/task_manager/server/mocks.ts index 71638bc88368..69d0d40121f1 100644 --- a/x-pack/plugins/task_manager/server/mocks.ts +++ b/x-pack/plugins/task_manager/server/mocks.ts @@ -27,10 +27,8 @@ const createStartMock = () => { bulkRemove: jest.fn(), schedule: jest.fn(), runSoon: jest.fn(), - ephemeralRunNow: jest.fn(), ensureScheduled: jest.fn(), removeIfExists: jest.fn().mockResolvedValue(Promise.resolve()), // it's a promise and there are some places where it's followed by `.catch()` - supportsEphemeralTasks: jest.fn(), bulkUpdateSchedules: jest.fn(), bulkSchedule: jest.fn(), bulkDisable: jest.fn(), diff --git a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts index 23ef344c197f..e6e001685610 100644 --- a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.test.ts @@ -35,12 +35,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -56,7 +50,6 @@ describe('estimateCapacity', () => { }, // no non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 0, recurring: 100, }, @@ -92,12 +85,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 2400, - p90: 2500, - p95: 3200, - p99: 3500, - }, non_recurring: { p50: 1400, p90: 1500, @@ -113,7 +100,6 @@ describe('estimateCapacity', () => { }, // no non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 0, recurring: 100, }, @@ -153,7 +139,6 @@ describe('estimateCapacity', () => { duration_by_persistence: {}, // no non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 0, recurring: 100, }, @@ -189,12 +174,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -210,7 +189,6 @@ describe('estimateCapacity', () => { }, // no non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 0, recurring: 100, }, @@ -247,12 +225,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -268,7 +240,6 @@ describe('estimateCapacity', () => { }, // no non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 0, recurring: 100, }, @@ -304,12 +275,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -325,7 +290,6 @@ describe('estimateCapacity', () => { }, // no non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 0, recurring: 100, }, @@ -374,12 +338,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -394,8 +352,6 @@ describe('estimateCapacity', () => { }, }, persistence: { - // 50% of tasks are non-recurring/ephemeral executions in the system in recent history - ephemeral: 25, non_recurring: 25, recurring: 50, }, @@ -418,7 +374,7 @@ describe('estimateCapacity', () => { }); }); - test('estimates the min required kibana instances when there is sufficient capacity for recurring but not for non-recurring/ephemeral', async () => { + test('estimates the min required kibana instances when there is sufficient capacity for recurring but not for non-recurring', async () => { const provisionedKibanaInstances = 2; const recurringTasksPerMinute = 251; // 50% for non-recurring/epehemral + half of recurring task workload @@ -456,12 +412,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -476,8 +426,6 @@ describe('estimateCapacity', () => { }, }, persistence: { - // 50% of tasks are non-recurring/ephemeral executions in the system in recent history - ephemeral: 25, non_recurring: 25, recurring: 50, }, @@ -541,12 +489,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -562,7 +504,6 @@ describe('estimateCapacity', () => { }, // 20% average of non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 20, recurring: 80, }, @@ -607,12 +548,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -628,7 +563,6 @@ describe('estimateCapacity', () => { }, // no non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 20, recurring: 80, }, @@ -673,12 +607,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -694,7 +622,6 @@ describe('estimateCapacity', () => { }, // 20% average of non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 20, recurring: 80, }, @@ -739,12 +666,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -761,7 +682,6 @@ describe('estimateCapacity', () => { persistence: { recurring: 0, non_recurring: 70, - ephemeral: 30, }, result_frequency_percent_as_number: {}, }, @@ -776,7 +696,7 @@ describe('estimateCapacity', () => { observed: { observed_kibana_instances: 1, avg_recurring_required_throughput_per_minute: 29, - // we obesrve 100% capacity on non-recurring/ephemeral tasks, which is 200tpm + // we obesrve 100% capacity on non-recurring tasks, which is 200tpm // and add to that the 29tpm for recurring tasks avg_required_throughput_per_minute_per_kibana: 229, }, @@ -816,12 +736,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -838,7 +752,6 @@ describe('estimateCapacity', () => { persistence: { recurring: 0, non_recurring: 70, - ephemeral: 30, }, result_frequency_percent_as_number: {}, }, @@ -853,12 +766,12 @@ describe('estimateCapacity', () => { observed: { observed_kibana_instances: 1, avg_recurring_required_throughput_per_minute: 210, - // we obesrve 100% capacity on non-recurring/ephemeral tasks, which is 200tpm + // we obesrve 100% capacity on non-recurring tasks, which is 200tpm // and add to that the 210tpm for recurring tasks avg_required_throughput_per_minute_per_kibana: 410, }, proposed: { - // we propose provisioning 3 instances for recurring + non-recurring/ephemeral + // we propose provisioning 3 instances for recurring + non-recurring provisioned_kibana: 3, // but need at least 2 for recurring min_required_kibana: 2, @@ -890,12 +803,6 @@ describe('estimateCapacity', () => { execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -911,7 +818,6 @@ describe('estimateCapacity', () => { }, // no non-recurring executions in the system in recent history persistence: { - ephemeral: 0, non_recurring: 0, recurring: 100, }, @@ -935,30 +841,6 @@ function mockStats( runtime: Partial['runtime']['value']> = {} ): CapacityEstimationParams { return { - ephemeral: { - status: HealthStatus.OK, - timestamp: new Date().toISOString(), - value: { - load: { - p50: 4, - p90: 6, - p95: 6, - p99: 6, - }, - executionsPerCycle: { - p50: 4, - p90: 6, - p95: 6, - p99: 6, - }, - queuedTasks: { - p50: 4, - p90: 6, - p95: 6, - p99: 6, - }, - }, - }, configuration: { status: HealthStatus.OK, timestamp: new Date().toISOString(), @@ -1026,12 +908,6 @@ function mockStats( execution: { duration: {}, duration_by_persistence: { - ephemeral: { - p50: 400, - p90: 500, - p95: 1200, - p99: 1500, - }, non_recurring: { p50: 400, p90: 500, @@ -1046,7 +922,6 @@ function mockStats( }, }, persistence: { - ephemeral: 0, non_recurring: 30, recurring: 70, }, diff --git a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts index acbf1284b21b..8b25cfbf3693 100644 --- a/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts +++ b/x-pack/plugins/task_manager/server/monitoring/capacity_estimation.ts @@ -104,9 +104,9 @@ export function estimateCapacity( /** * On average, how much of this kibana's capacity has been historically used to execute - * non-recurring and ephemeral tasks + * non-recurring tasks */ - const averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana = percentageOf( + const averageCapacityUsedByNonRecurringTasksPerKibana = percentageOf( capacityPerMinutePerKibana, percentageOf(averageLoadPercentage, 100 - percentageOfExecutionsUsedByRecurringTasks) ); @@ -116,14 +116,14 @@ export function estimateCapacity( * for recurring tasks */ const averageCapacityAvailableForRecurringTasksPerKibana = - capacityPerMinutePerKibana - averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana; + capacityPerMinutePerKibana - averageCapacityUsedByNonRecurringTasksPerKibana; /** - * At times a cluster might experience spikes of NonRecurring/Ephemeral tasks which swamp Task Manager - * causing it to spend all its capacity on NonRecurring/Ephemeral tasks, which makes it much harder + * At times a cluster might experience spikes of NonRecurring tasks which swamp Task Manager + * causing it to spend all its capacity on NonRecurring tasks, which makes it much harder * to estimate the required capacity. * This is easy to identify as load will usually max out or all the workers are busy executing non-recurring - * or ephemeral tasks, and none are running recurring tasks. + * tasks, and none are running recurring tasks. */ const hasTooLittleCapacityToEstimateRequiredNonRecurringCapacity = averageLoadPercentage === 100 || averageCapacityAvailableForRecurringTasksPerKibana === 0; @@ -165,24 +165,24 @@ export function estimateCapacity( averageRecurringRequiredPerMinute / minRequiredKibanaInstances; /** - * assuming the historical capacity needed for ephemeral and non-recurring tasks, plus + * assuming the historical capacity needed for non-recurring tasks, plus * the amount we know each kibana would need for recurring tasks, how much capacity would * each kibana need if following the minRequiredKibanaInstances? */ const averageRequiredThroughputPerMinutePerKibana = - averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana * + averageCapacityUsedByNonRecurringTasksPerKibana * (assumedKibanaInstances / minRequiredKibanaInstances) + averageRecurringRequiredPerMinute / minRequiredKibanaInstances; const assumedAverageRecurringRequiredThroughputPerMinutePerKibana = averageRecurringRequiredPerMinute / assumedKibanaInstances; /** - * assuming the historical capacity needed for ephemeral and non-recurring tasks, plus + * assuming the historical capacity needed for non-recurring tasks, plus * the amount we know each kibana would need for recurring tasks, how much capacity would * each kibana need if the assumed current number were correct? */ const assumedRequiredThroughputPerMinutePerKibana = - averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana + + averageCapacityUsedByNonRecurringTasksPerKibana + averageRecurringRequiredPerMinute / assumedKibanaInstances; const { status, reason } = getHealthStatus(logger, { @@ -281,11 +281,7 @@ function getAverageDuration( durations: Partial> ): Result { const result = stats.mean( - [ - durations.ephemeral?.p50 ?? 0, - durations.non_recurring?.p50 ?? 0, - durations.recurring?.p50 ?? 0, - ].filter((val) => val > 0) + [durations.non_recurring?.p50 ?? 0, durations.recurring?.p50 ?? 0].filter((val) => val > 0) ); if (isNaN(result)) { return asErr(result); diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts index 1bcd3e286d4a..9a0084fcdf9e 100644 --- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts @@ -38,10 +38,6 @@ describe('Configuration Statistics Aggregator', () => { }, custom: {}, }, - ephemeral_tasks: { - enabled: true, - request_capacity: 10, - }, unsafe: { exclude_task_types: [], authenticate_background_task_utilization: true, diff --git a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts deleted file mode 100644 index ac16070d7c13..000000000000 --- a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.test.ts +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { v4 as uuidv4 } from 'uuid'; -import { Subject, Observable } from 'rxjs'; -import stats from 'stats-lite'; -import { take, bufferCount, skip, map } from 'rxjs'; - -import { ConcreteTaskInstance, TaskStatus } from '../task'; -import { - asTaskRunEvent, - TaskTiming, - asTaskManagerStatEvent, - TaskPersistence, -} from '../task_events'; -import { asOk } from '../lib/result_type'; -import { TaskLifecycleEvent } from '../polling_lifecycle'; -import { TaskRunResult } from '../task_running'; -import { - createEphemeralTaskAggregator, - summarizeEphemeralStat, - SummarizedEphemeralTaskStat, - EphemeralTaskStat, -} from './ephemeral_task_statistics'; -import { AggregatedStat } from '../lib/runtime_statistics_aggregator'; -import { ephemeralTaskLifecycleMock } from '../ephemeral_task_lifecycle.mock'; -import { times, takeRight, take as takeLeft } from 'lodash'; - -describe('Ephemeral Task Statistics', () => { - test('returns the average size of the ephemeral queue', async () => { - const queueSize = [2, 6, 10, 10, 10, 6, 2, 0, 0]; - const events$ = new Subject(); - const getQueuedTasks = jest.fn(); - const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({ - events$: events$ as Observable, - getQueuedTasks, - }); - - const runningAverageWindowSize = 5; - const ephemeralTaskAggregator = createEphemeralTaskAggregator( - ephemeralTaskLifecycle, - runningAverageWindowSize, - 10 - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.queuedTasks).toMatchObject({ - p50: stats.percentile(window, 0.5), - p90: stats.percentile(window, 0.9), - p95: stats.percentile(window, 0.95), - p99: stats.percentile(window, 0.99), - }); - } - - return new Promise((resolve) => { - ephemeralTaskAggregator - .pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeEphemeralStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value: summarizeEphemeralStat(value).value, - })), - take(queueSize.length), - bufferCount(queueSize.length) - ) - .subscribe((taskStats: Array>) => { - expectWindowEqualsUpdate(taskStats[0], queueSize.slice(0, 1)); - expectWindowEqualsUpdate(taskStats[1], queueSize.slice(0, 2)); - expectWindowEqualsUpdate(taskStats[2], queueSize.slice(0, 3)); - expectWindowEqualsUpdate(taskStats[3], queueSize.slice(0, 4)); - expectWindowEqualsUpdate(taskStats[4], queueSize.slice(0, 5)); - // from the 6th value, begin to drop old values as out window is 5 - expectWindowEqualsUpdate(taskStats[5], queueSize.slice(1, 6)); - expectWindowEqualsUpdate(taskStats[6], queueSize.slice(2, 7)); - expectWindowEqualsUpdate(taskStats[7], queueSize.slice(3, 8)); - resolve(); - }); - - for (const size of queueSize) { - events$.next(asTaskManagerStatEvent('queuedEphemeralTasks', asOk(size))); - } - }); - }); - - test('returns the average number of ephemeral tasks executed per polling cycle', async () => { - const tasksQueueSize = [5, 2, 5, 0]; - const executionsPerCycle = [5, 0, 5]; - // we expect one event per "task queue size event", and we simmulate - // tasks being drained after each one of theseevents, so we expect - // the first cycle to show zero drained tasks - const expectedTasksDrainedEvents = [0, ...executionsPerCycle]; - - const events$ = new Subject(); - const getQueuedTasks = jest.fn(); - const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({ - events$: events$ as Observable, - getQueuedTasks, - }); - - const runningAverageWindowSize = 5; - const ephemeralTaskAggregator = createEphemeralTaskAggregator( - ephemeralTaskLifecycle, - runningAverageWindowSize, - 10 - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.executionsPerCycle).toMatchObject({ - p50: stats.percentile(window, 0.5), - p90: stats.percentile(window, 0.9), - p95: stats.percentile(window, 0.95), - p99: stats.percentile(window, 0.99), - }); - } - - return new Promise((resolve) => { - ephemeralTaskAggregator - .pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeEphemeralStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value: summarizeEphemeralStat(value).value, - })), - take(tasksQueueSize.length), - bufferCount(tasksQueueSize.length) - ) - .subscribe((taskStats: Array>) => { - taskStats.forEach((taskStat, index) => { - expectWindowEqualsUpdate( - taskStat, - takeRight(takeLeft(expectedTasksDrainedEvents, index + 1), runningAverageWindowSize) - ); - }); - resolve(); - }); - - for (const tasksDrainedInCycle of executionsPerCycle) { - events$.next( - asTaskManagerStatEvent('queuedEphemeralTasks', asOk(tasksQueueSize.shift() ?? 0)) - ); - times(tasksDrainedInCycle, () => { - events$.next(mockTaskRunEvent()); - }); - } - events$.next( - asTaskManagerStatEvent('queuedEphemeralTasks', asOk(tasksQueueSize.shift() ?? 0)) - ); - }); - }); - - test('returns the average load added per polling cycle cycle by ephemeral tasks', async () => { - const tasksExecuted = [0, 5, 10, 10, 10, 5, 5, 0, 0, 0, 0, 0]; - const expectedLoad = [0, 50, 100, 100, 100, 50, 50, 0, 0, 0, 0, 0]; - - const events$ = new Subject(); - const getQueuedTasks = jest.fn(); - const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({ - events$: events$ as Observable, - getQueuedTasks, - }); - - const runningAverageWindowSize = 5; - const capacity = 10; - const ephemeralTaskAggregator = createEphemeralTaskAggregator( - ephemeralTaskLifecycle, - runningAverageWindowSize, - capacity - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.load).toMatchObject({ - p50: stats.percentile(window, 0.5), - p90: stats.percentile(window, 0.9), - p95: stats.percentile(window, 0.95), - p99: stats.percentile(window, 0.99), - }); - } - - return new Promise((resolve) => { - ephemeralTaskAggregator - .pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeEphemeralStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value: summarizeEphemeralStat(value).value, - })), - take(tasksExecuted.length), - bufferCount(tasksExecuted.length) - ) - .subscribe((taskStats: Array>) => { - taskStats.forEach((taskStat, index) => { - expectWindowEqualsUpdate( - taskStat, - takeRight(takeLeft(expectedLoad, index + 1), runningAverageWindowSize) - ); - }); - resolve(); - }); - - for (const tasksExecutedInCycle of tasksExecuted) { - times(tasksExecutedInCycle, () => { - events$.next(mockTaskRunEvent()); - }); - events$.next(asTaskManagerStatEvent('queuedEphemeralTasks', asOk(0))); - } - }); - }); -}); - -test('returns the average load added per polling cycle cycle by ephemeral tasks when load exceeds capacity', async () => { - const tasksExecuted = [0, 5, 10, 20, 15, 10, 5, 0, 0, 0, 0, 0]; - const expectedLoad = [0, 50, 100, 200, 150, 100, 50, 0, 0, 0, 0, 0]; - - const events$ = new Subject(); - const getQueuedTasks = jest.fn(); - const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({ - events$: events$ as Observable, - getQueuedTasks, - }); - - const runningAverageWindowSize = 5; - const capacity = 10; - const ephemeralTaskAggregator = createEphemeralTaskAggregator( - ephemeralTaskLifecycle, - runningAverageWindowSize, - capacity - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.load).toMatchObject({ - p50: stats.percentile(window, 0.5), - p90: stats.percentile(window, 0.9), - p95: stats.percentile(window, 0.95), - p99: stats.percentile(window, 0.99), - }); - } - - return new Promise((resolve) => { - ephemeralTaskAggregator - .pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeEphemeralStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value: summarizeEphemeralStat(value).value, - })), - take(tasksExecuted.length), - bufferCount(tasksExecuted.length) - ) - .subscribe((taskStats: Array>) => { - taskStats.forEach((taskStat, index) => { - expectWindowEqualsUpdate( - taskStat, - takeRight(takeLeft(expectedLoad, index + 1), runningAverageWindowSize) - ); - }); - resolve(); - }); - - for (const tasksExecutedInCycle of tasksExecuted) { - times(tasksExecutedInCycle, () => { - events$.next(mockTaskRunEvent()); - }); - events$.next(asTaskManagerStatEvent('queuedEphemeralTasks', asOk(0))); - } - }); -}); - -test('returns the average delay experienced by tasks in the ephemeral queue', async () => { - const taskDelays = [100, 150, 500, 100, 100, 200, 2000, 10000, 20000, 100]; - - const events$ = new Subject(); - const getQueuedTasks = jest.fn(); - const ephemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({ - events$: events$ as Observable, - getQueuedTasks, - }); - - const runningAverageWindowSize = 5; - const ephemeralTaskAggregator = createEphemeralTaskAggregator( - ephemeralTaskLifecycle, - runningAverageWindowSize, - 10 - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.delay).toMatchObject({ - p50: stats.percentile(window, 0.5), - p90: stats.percentile(window, 0.9), - p95: stats.percentile(window, 0.95), - p99: stats.percentile(window, 0.99), - }); - } - - return new Promise((resolve) => { - ephemeralTaskAggregator - .pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeEphemeralStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value: summarizeEphemeralStat(value).value, - })), - take(taskDelays.length), - bufferCount(taskDelays.length) - ) - .subscribe((taskStats: Array>) => { - taskStats.forEach((taskStat, index) => { - expectWindowEqualsUpdate( - taskStat, - takeRight(takeLeft(taskDelays, index + 1), runningAverageWindowSize) - ); - }); - resolve(); - }); - - for (const delay of taskDelays) { - events$.next(asTaskManagerStatEvent('ephemeralTaskDelay', asOk(delay))); - } - }); -}); - -const mockTaskRunEvent = ( - overrides: Partial = {}, - timing: TaskTiming = { - start: 0, - stop: 0, - }, - result: TaskRunResult = TaskRunResult.Success -) => { - const task = mockTaskInstance(overrides); - const persistence = TaskPersistence.Recurring; - return asTaskRunEvent(task.id, asOk({ task, persistence, result, isExpired: false }), timing); -}; - -const mockTaskInstance = (overrides: Partial = {}): ConcreteTaskInstance => ({ - id: uuidv4(), - attempts: 0, - status: TaskStatus.Running, - version: '123', - runAt: new Date(), - scheduledAt: new Date(), - startedAt: new Date(), - retryAt: new Date(Date.now() + 5 * 60 * 1000), - state: {}, - taskType: 'alerting:test', - params: { - alertId: '1', - }, - ownerId: null, - ...overrides, -}); diff --git a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts deleted file mode 100644 index d02080a56a1a..000000000000 --- a/x-pack/plugins/task_manager/server/monitoring/ephemeral_task_statistics.ts +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { map, filter, startWith, buffer, share } from 'rxjs'; -import { JsonObject } from '@kbn/utility-types'; -import { combineLatest, Observable, zip } from 'rxjs'; -import { isOk, Ok } from '../lib/result_type'; -import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator'; -import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle'; -import { TaskLifecycleEvent } from '../polling_lifecycle'; -import { isTaskRunEvent, isTaskManagerStatEvent } from '../task_events'; -import { - AveragedStat, - calculateRunningAverage, - createRunningAveragedStat, -} from './task_run_calculators'; -import { HealthStatus } from './monitoring_stats_stream'; - -export interface EphemeralTaskStat extends JsonObject { - queuedTasks: number[]; - executionsPerCycle: number[]; - load: number[]; - delay: number[]; -} - -export interface SummarizedEphemeralTaskStat extends JsonObject { - queuedTasks: AveragedStat; - executionsPerCycle: AveragedStat; - load: AveragedStat; -} -export function createEphemeralTaskAggregator( - ephemeralTaskLifecycle: EphemeralTaskLifecycle, - runningAverageWindowSize: number, - capacity: number -): AggregatedStatProvider { - const ephemeralTaskRunEvents$ = ephemeralTaskLifecycle.events.pipe( - filter((taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent)) - ); - - const ephemeralQueueSizeEvents$: Observable = ephemeralTaskLifecycle.events.pipe( - filter( - (taskEvent: TaskLifecycleEvent) => - isTaskManagerStatEvent(taskEvent) && - taskEvent.id === 'queuedEphemeralTasks' && - isOk(taskEvent.event) - ), - map((taskEvent: TaskLifecycleEvent) => { - return (taskEvent.event as unknown as Ok).value; - }), - // as we consume this stream twice below (in the buffer, and the zip) - // we want to use share, otherwise ther'll be 2 subscribers and both will emit event - share() - ); - - const ephemeralQueueExecutionsPerCycleQueue = - createRunningAveragedStat(runningAverageWindowSize); - const ephemeralQueuedTasksQueue = createRunningAveragedStat(runningAverageWindowSize); - const ephemeralTaskLoadQueue = createRunningAveragedStat(runningAverageWindowSize); - const ephemeralPollingCycleBasedStats$ = zip( - ephemeralTaskRunEvents$.pipe( - buffer(ephemeralQueueSizeEvents$), - map((taskEvents: TaskLifecycleEvent[]) => taskEvents.length) - ), - ephemeralQueueSizeEvents$ - ).pipe( - map(([tasksRanSincePreviousQueueSize, ephemeralQueueSize]) => ({ - queuedTasks: ephemeralQueuedTasksQueue(ephemeralQueueSize), - executionsPerCycle: ephemeralQueueExecutionsPerCycleQueue(tasksRanSincePreviousQueueSize), - load: ephemeralTaskLoadQueue(calculateWorkerLoad(capacity, tasksRanSincePreviousQueueSize)), - })), - startWith({ - queuedTasks: [], - executionsPerCycle: [], - load: [], - }) - ); - - const ephemeralTaskDelayQueue = createRunningAveragedStat(runningAverageWindowSize); - const ephemeralTaskDelayEvents$: Observable = ephemeralTaskLifecycle.events.pipe( - filter( - (taskEvent: TaskLifecycleEvent) => - isTaskManagerStatEvent(taskEvent) && - taskEvent.id === 'ephemeralTaskDelay' && - isOk(taskEvent.event) - ), - map((taskEvent: TaskLifecycleEvent) => { - return ephemeralTaskDelayQueue((taskEvent.event as unknown as Ok).value); - }), - startWith([]) - ); - - return combineLatest([ephemeralPollingCycleBasedStats$, ephemeralTaskDelayEvents$]).pipe( - map(([stats, delay]: [Omit, EphemeralTaskStat['delay']]) => { - return { - key: 'ephemeral', - value: { ...stats, delay }, - } as AggregatedStat; - }) - ); -} - -function calculateWorkerLoad(maxWorkers: number, tasksExecuted: number) { - return Math.round((tasksExecuted * 100) / maxWorkers); -} - -export function summarizeEphemeralStat({ - queuedTasks, - executionsPerCycle, - load, - delay, -}: EphemeralTaskStat): { value: SummarizedEphemeralTaskStat; status: HealthStatus } { - return { - value: { - queuedTasks: calculateRunningAverage(queuedTasks.length ? queuedTasks : [0]), - load: calculateRunningAverage(load.length ? load : [0]), - executionsPerCycle: calculateRunningAverage( - executionsPerCycle.length ? executionsPerCycle : [0] - ), - delay: calculateRunningAverage(delay.length ? delay : [0]), - }, - status: HealthStatus.OK, - }; -} diff --git a/x-pack/plugins/task_manager/server/monitoring/index.ts b/x-pack/plugins/task_manager/server/monitoring/index.ts index 5dc024b53de1..fdcfe8aecebf 100644 --- a/x-pack/plugins/task_manager/server/monitoring/index.ts +++ b/x-pack/plugins/task_manager/server/monitoring/index.ts @@ -16,7 +16,6 @@ import { import { TaskStore } from '../task_store'; import { TaskPollingLifecycle } from '../polling_lifecycle'; import { ManagedConfiguration } from '../lib/create_managed_configuration'; -import { EphemeralTaskLifecycle } from '../ephemeral_task_lifecycle'; import { AdHocTaskCounter } from '../lib/adhoc_task_counter'; import { TaskTypeDictionary } from '../task_type_dictionary'; @@ -37,7 +36,6 @@ export interface CreateMonitoringStatsOpts { adHocTaskCounter: AdHocTaskCounter; taskDefinitions: TaskTypeDictionary; taskPollingLifecycle?: TaskPollingLifecycle; - ephemeralTaskLifecycle?: EphemeralTaskLifecycle; } export function createMonitoringStats( diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts index 1237af9e68eb..b89f242741b0 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts @@ -16,12 +16,6 @@ import { SummarizedWorkloadStat, WorkloadStat, } from './workload_statistics'; -import { - EphemeralTaskStat, - createEphemeralTaskAggregator, - SummarizedEphemeralTaskStat, - summarizeEphemeralStat, -} from './ephemeral_task_statistics'; import { createTaskRunAggregator, summarizeTaskRunStat, @@ -45,7 +39,6 @@ export interface MonitoringStats { configuration?: MonitoredStat; workload?: MonitoredStat; runtime?: MonitoredStat; - ephemeral?: MonitoredStat; utilization?: MonitoredStat; }; } @@ -72,7 +65,6 @@ export interface RawMonitoringStats { configuration?: RawMonitoredStat; workload?: RawMonitoredStat; runtime?: RawMonitoredStat; - ephemeral?: RawMonitoredStat; capacity_estimation?: RawMonitoredStat; }; } @@ -86,7 +78,6 @@ export function createAggregators({ taskDefinitions, adHocTaskCounter, taskPollingLifecycle, - ephemeralTaskLifecycle, }: CreateMonitoringStatsOpts): AggregatedStatProvider { const aggregators: AggregatedStatProvider[] = [ createConfigurationAggregator(config, managedConfig), @@ -111,15 +102,6 @@ export function createAggregators({ ) ); } - if (ephemeralTaskLifecycle && ephemeralTaskLifecycle.enabled) { - aggregators.push( - createEphemeralTaskAggregator( - ephemeralTaskLifecycle, - config.monitored_stats_running_average_window, - managedConfig.startingCapacity - ) - ); - } return merge(...aggregators); } @@ -156,7 +138,7 @@ export function summarizeMonitoringStats( { // eslint-disable-next-line @typescript-eslint/naming-convention last_update, - stats: { runtime, workload, configuration, ephemeral, utilization }, + stats: { runtime, workload, configuration, utilization }, }: MonitoringStats, config: TaskManagerConfig, assumedKibanaInstances: number @@ -188,14 +170,6 @@ export function summarizeMonitoringStats( }, } : {}), - ...(ephemeral - ? { - ephemeral: { - timestamp: ephemeral.timestamp, - ...summarizeEphemeralStat(ephemeral.value), - }, - } - : {}), }, assumedKibanaInstances ); diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts index fa6fbe0d3a2b..800e2129b706 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts @@ -455,10 +455,7 @@ describe('Task Run Statistics', () => { { start: 0, stop: 0 }, TaskRunResult.Success ), - mockTaskRunEvent({}, { start: 0, stop: 0 }, TaskRunResult.Success, TaskPersistence.Ephemeral), - mockTaskRunEvent({}, { start: 0, stop: 0 }, TaskRunResult.Success, TaskPersistence.Ephemeral), mockTaskRunEvent({}, { start: 0, stop: 0 }, TaskRunResult.Success), - mockTaskRunEvent({}, { start: 0, stop: 0 }, TaskRunResult.Success, TaskPersistence.Ephemeral), mockTaskRunEvent( { schedule: { interval: '3s' } }, { start: 0, stop: 0 }, @@ -490,79 +487,52 @@ describe('Task Run Statistics', () => { .toMatchInlineSnapshot(` Array [ Object { - "ephemeral": 0, "non_recurring": 100, "recurring": 0, }, Object { - "ephemeral": 0, "non_recurring": 100, "recurring": 0, }, Object { - "ephemeral": 0, "non_recurring": 67, "recurring": 33, }, Object { - "ephemeral": 0, "non_recurring": 75, "recurring": 25, }, Object { - "ephemeral": 0, "non_recurring": 80, "recurring": 20, }, Object { - "ephemeral": 0, "non_recurring": 60, "recurring": 40, }, Object { - "ephemeral": 0, "non_recurring": 40, "recurring": 60, }, Object { - "ephemeral": 0, "non_recurring": 60, "recurring": 40, }, Object { - "ephemeral": 0, "non_recurring": 60, "recurring": 40, }, Object { - "ephemeral": 0, "non_recurring": 40, "recurring": 60, }, Object { - "ephemeral": 20, - "non_recurring": 40, + "non_recurring": 60, "recurring": 40, }, Object { - "ephemeral": 40, - "non_recurring": 40, - "recurring": 20, - }, - Object { - "ephemeral": 40, - "non_recurring": 40, - "recurring": 20, - }, - Object { - "ephemeral": 60, - "non_recurring": 20, - "recurring": 20, - }, - Object { - "ephemeral": 60, - "non_recurring": 20, - "recurring": 20, + "non_recurring": 60, + "recurring": 40, }, ] `); diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index 6007508451d9..37bdf0498461 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -92,7 +92,6 @@ interface ResultFrequency extends JsonObject { export interface TaskPersistenceTypes extends JsonObject { [TaskPersistence.Recurring]: T; [TaskPersistence.NonRecurring]: T; - [TaskPersistence.Ephemeral]: T; } type ResultFrequencySummary = ResultFrequency & { @@ -247,7 +246,6 @@ export function createTaskRunAggregator( duration_by_persistence: { [TaskPersistence.Recurring]: [], [TaskPersistence.NonRecurring]: [], - [TaskPersistence.Ephemeral]: [], }, result_frequency_percent_as_number: {}, persistence: [], @@ -401,7 +399,6 @@ export function summarizeTaskRunStat( persistence: { [TaskPersistence.Recurring]: 0, [TaskPersistence.NonRecurring]: 0, - [TaskPersistence.Ephemeral]: 0, ...calculateFrequency(persistence), }, result_frequency_percent_as_number: mapValues( diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index 80109e062414..9592af3d75c3 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -17,9 +17,6 @@ import { cloudMock } from '@kbn/cloud-plugin/public/mocks'; import { taskPollingLifecycleMock } from './polling_lifecycle.mock'; import { TaskPollingLifecycle } from './polling_lifecycle'; import type { TaskPollingLifecycle as TaskPollingLifecycleClass } from './polling_lifecycle'; -import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock'; -import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle'; -import type { EphemeralTaskLifecycle as EphemeralTaskLifecycleClass } from './ephemeral_task_lifecycle'; let mockTaskPollingLifecycle = taskPollingLifecycleMock.create({}); jest.mock('./polling_lifecycle', () => { @@ -30,15 +27,6 @@ jest.mock('./polling_lifecycle', () => { }; }); -let mockEphemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({}); -jest.mock('./ephemeral_task_lifecycle', () => { - return { - EphemeralTaskLifecycle: jest.fn().mockImplementation(() => { - return mockEphemeralTaskLifecycle; - }), - }; -}); - const deleteCurrentNodeSpy = jest.spyOn(KibanaDiscoveryService.prototype, 'deleteCurrentNode'); const discoveryIsStarted = jest.spyOn(KibanaDiscoveryService.prototype, 'isStarted'); @@ -69,10 +57,6 @@ const pluginInitializerContextParams = { }, custom: {}, }, - ephemeral_tasks: { - enabled: false, - request_capacity: 10, - }, unsafe: { exclude_task_types: [], authenticate_background_task_utilization: true, @@ -94,8 +78,6 @@ describe('TaskManagerPlugin', () => { beforeEach(() => { mockTaskPollingLifecycle = taskPollingLifecycleMock.create({}); (TaskPollingLifecycle as jest.Mock).mockClear(); - mockEphemeralTaskLifecycle = ephemeralTaskLifecycleMock.create({}); - (EphemeralTaskLifecycle as jest.Mock).mockClear(); }); describe('setup', () => { @@ -164,9 +146,6 @@ describe('TaskManagerPlugin', () => { }); expect(TaskPollingLifecycle as jest.Mock).toHaveBeenCalledTimes(1); - expect( - EphemeralTaskLifecycle as jest.Mock - ).toHaveBeenCalledTimes(1); }); test('should not initialize task polling lifecycle if node.roles.backgroundTasks is false', async () => { @@ -181,9 +160,6 @@ describe('TaskManagerPlugin', () => { }); expect(TaskPollingLifecycle as jest.Mock).not.toHaveBeenCalled(); - expect( - EphemeralTaskLifecycle as jest.Mock - ).not.toHaveBeenCalled(); }); }); diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index cd820d1e7078..42a979bf9606 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -35,8 +35,7 @@ import { createManagedConfiguration } from './lib/create_managed_configuration'; import { TaskScheduling } from './task_scheduling'; import { backgroundTaskUtilizationRoute, healthRoute, metricsRoute } from './routes'; import { createMonitoringStats, MonitoringStats } from './monitoring'; -import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle'; -import { EphemeralTask, ConcreteTaskInstance } from './task'; +import { ConcreteTaskInstance } from './task'; import { registerTaskManagerUsageCollector } from './usage'; import { TASK_MANAGER_INDEX } from './constants'; import { AdHocTaskCounter } from './lib/adhoc_task_counter'; @@ -67,7 +66,6 @@ export type TaskManagerStartContract = Pick< TaskScheduling, | 'schedule' | 'runSoon' - | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules' | 'bulkEnable' @@ -78,7 +76,6 @@ export type TaskManagerStartContract = Pick< Pick & { removeIfExists: TaskStore['remove']; } & { - supportsEphemeralTasks: () => boolean; getRegisteredTypes: () => string[]; }; @@ -92,7 +89,6 @@ export class TaskManagerPlugin implements Plugin { private taskPollingLifecycle?: TaskPollingLifecycle; - private ephemeralTaskLifecycle?: EphemeralTaskLifecycle; private taskManagerId?: string; private usageCounter?: UsageCounter; private config: TaskManagerConfig; @@ -218,8 +214,6 @@ export class TaskManagerPlugin usageCollection, monitoredHealth$, monitoredUtilization$, - this.config.ephemeral_tasks.enabled, - this.config.ephemeral_tasks.request_capacity, this.config.unsafe.exclude_task_types ); } @@ -350,17 +344,6 @@ export class TaskManagerPlugin ...managedConfiguration, taskPartitioner, }); - - this.ephemeralTaskLifecycle = new EphemeralTaskLifecycle({ - config: this.config!, - definitions: this.definitions, - logger: this.logger, - executionContext, - middleware: this.middleware, - elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!, - pool: this.taskPollingLifecycle.pool, - lifecycleEvent: this.taskPollingLifecycle.events, - }); } createMonitoringStats({ @@ -372,7 +355,6 @@ export class TaskManagerPlugin adHocTaskCounter: this.adHocTaskCounter, taskDefinitions: this.definitions, taskPollingLifecycle: this.taskPollingLifecycle, - ephemeralTaskLifecycle: this.ephemeralTaskLifecycle, }).subscribe((stat) => this.monitoringStats$.next(stat)); metricsStream({ @@ -387,7 +369,6 @@ export class TaskManagerPlugin logger: this.logger, taskStore, middleware: this.middleware, - ephemeralTaskLifecycle: this.ephemeralTaskLifecycle, taskManagerId: taskStore.taskManagerId, }); @@ -409,9 +390,6 @@ export class TaskManagerPlugin bulkEnable: (...args) => taskScheduling.bulkEnable(...args), bulkDisable: (...args) => taskScheduling.bulkDisable(...args), bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args), - ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task), - supportsEphemeralTasks: () => - this.config.ephemeral_tasks.enabled && this.shouldRunBackgroundTasks, getRegisteredTypes: () => this.definitions.getAllTypes(), bulkUpdateState: (...args) => taskScheduling.bulkUpdateState(...args), }; diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index a408bd3f634d..1ccbe57debe2 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -84,10 +84,6 @@ describe('TaskPollingLifecycle', () => { }, custom: {}, }, - ephemeral_tasks: { - enabled: true, - request_capacity: 10, - }, unsafe: { exclude_task_types: [], authenticate_background_task_utilization: true, diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index fb6776fa34f2..91f32d7201ea 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -26,7 +26,6 @@ import { asTaskPollingCycleEvent, TaskManagerStat, asTaskManagerStatEvent, - EphemeralTaskRejectedDueToCapacity, TaskManagerMetric, } from './task_events'; import { fillPool, FillPoolResult, TimedFillPoolResult } from './lib/fill_pool'; @@ -71,8 +70,7 @@ export type TaskLifecycleEvent = | TaskRunRequest | TaskPollingCycle | TaskManagerStat - | TaskManagerMetric - | EphemeralTaskRejectedDueToCapacity; + | TaskManagerMetric; /** * The public interface into the task manager system. diff --git a/x-pack/plugins/task_manager/server/routes/health.test.ts b/x-pack/plugins/task_manager/server/routes/health.test.ts index 1e06ea91a6fc..f8b716ba0d15 100644 --- a/x-pack/plugins/task_manager/server/routes/health.test.ts +++ b/x-pack/plugins/task_manager/server/routes/health.test.ts @@ -299,7 +299,6 @@ describe('healthRoute', () => { const warnRuntimeStat = mockHealthStats(); const warnConfigurationStat = mockHealthStats(); const warnWorkloadStat = mockHealthStats(); - const warnEphemeralStat = mockHealthStats(); const stats$ = new Subject(); @@ -334,15 +333,13 @@ describe('healthRoute', () => { stats$.next(warnConfigurationStat); await sleep(1001); stats$.next(warnWorkloadStat); - await sleep(1001); - stats$.next(warnEphemeralStat); expect(await serviceStatus).toMatchObject({ level: ServiceStatusLevels.degraded, summary: `Task Manager is unhealthy - Reason: ${reason}`, }); - expect(logHealthMetrics).toBeCalledTimes(4); + expect(logHealthMetrics).toBeCalledTimes(3); expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({ id, timestamp: expect.any(String), @@ -367,14 +364,6 @@ describe('healthRoute', () => { summarizeMonitoringStats(logger, warnWorkloadStat, getTaskManagerConfig({})) ), }); - expect(logHealthMetrics.mock.calls[3][0]).toMatchObject({ - id, - timestamp: expect.any(String), - status: expect.any(String), - ...ignoreCapacityEstimation( - summarizeMonitoringStats(logger, warnEphemeralStat, getTaskManagerConfig({})) - ), - }); }); it(`logs at an error level if the status is error`, async () => { @@ -402,7 +391,6 @@ describe('healthRoute', () => { const errorRuntimeStat = mockHealthStats(); const errorConfigurationStat = mockHealthStats(); const errorWorkloadStat = mockHealthStats(); - const errorEphemeralStat = mockHealthStats(); const stats$ = new Subject(); @@ -437,15 +425,13 @@ describe('healthRoute', () => { stats$.next(errorConfigurationStat); await sleep(1001); stats$.next(errorWorkloadStat); - await sleep(1001); - stats$.next(errorEphemeralStat); expect(await serviceStatus).toMatchObject({ level: ServiceStatusLevels.degraded, summary: `Task Manager is unhealthy - Reason: ${reason}`, }); - expect(logHealthMetrics).toBeCalledTimes(4); + expect(logHealthMetrics).toBeCalledTimes(3); expect(logHealthMetrics.mock.calls[0][0]).toMatchObject({ id, timestamp: expect.any(String), @@ -470,14 +456,6 @@ describe('healthRoute', () => { summarizeMonitoringStats(logger, errorWorkloadStat, getTaskManagerConfig({})) ), }); - expect(logHealthMetrics.mock.calls[3][0]).toMatchObject({ - id, - timestamp: expect.any(String), - status: expect.any(String), - ...ignoreCapacityEstimation( - summarizeMonitoringStats(logger, errorEphemeralStat, getTaskManagerConfig({})) - ), - }); }); it('returns a error status if the overall stats have not been updated within the required hot freshness', async () => { @@ -548,9 +526,6 @@ describe('healthRoute', () => { workload: { timestamp: expect.any(String), }, - ephemeral: { - timestamp: expect.any(String), - }, runtime: { timestamp: expect.any(String), value: { @@ -653,9 +628,6 @@ describe('healthRoute', () => { workload: { timestamp: expect.any(String), }, - ephemeral: { - timestamp: expect.any(String), - }, runtime: { timestamp: expect.any(String), value: { @@ -737,9 +709,6 @@ describe('healthRoute', () => { workload: { timestamp: expect.any(String), }, - ephemeral: { - timestamp: expect.any(String), - }, runtime: { timestamp: expect.any(String), value: { @@ -952,15 +921,6 @@ function mockHealthStats(overrides = {}) { }, }, }, - ephemeral: { - timestamp: new Date().toISOString(), - value: { - load: [], - executionsPerCycle: [], - queuedTasks: [], - delay: [], - }, - }, }, }; return merge(stub, overrides) as unknown as MonitoringStats; diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index bbe2935bdfc6..a6f5ae05e781 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -471,16 +471,6 @@ export interface ConcreteTaskInstanceVersion { error?: string; } -/** - * A task instance that has an id and is ready for storage. - */ -export type EphemeralTask = Pick< - ConcreteTaskInstance, - 'taskType' | 'params' | 'state' | 'scope' | 'enabled' ->; -export type EphemeralTaskInstance = EphemeralTask & - Pick; - export type SerializedConcreteTaskInstance = Omit< ConcreteTaskInstance, 'state' | 'params' | 'scheduledAt' | 'startedAt' | 'retryAt' | 'runAt' diff --git a/x-pack/plugins/task_manager/server/task_events.ts b/x-pack/plugins/task_manager/server/task_events.ts index c0ae11528f84..f3df6c230e8e 100644 --- a/x-pack/plugins/task_manager/server/task_events.ts +++ b/x-pack/plugins/task_manager/server/task_events.ts @@ -13,14 +13,12 @@ import { Result, Err } from './lib/result_type'; import { ClaimAndFillPoolResult } from './lib/fill_pool'; import { PollingError } from './polling'; import { DecoratedError, TaskRunResult } from './task_running'; -import { EphemeralTaskInstanceRequest } from './ephemeral_task_lifecycle'; import type { EventLoopDelayConfig } from './config'; import { TaskManagerMetrics } from './metrics/task_metrics_collector'; export enum TaskPersistence { Recurring = 'recurring', NonRecurring = 'non_recurring', - Ephemeral = 'ephemeral', } export enum TaskEventType { @@ -31,7 +29,6 @@ export enum TaskEventType { TASK_POLLING_CYCLE = 'TASK_POLLING_CYCLE', TASK_MANAGER_METRIC = 'TASK_MANAGER_METRIC', TASK_MANAGER_STAT = 'TASK_MANAGER_STAT', - EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY = 'EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY', } export interface TaskTiming { @@ -82,7 +79,6 @@ export type TaskMarkRunning = TaskEvent; export type TaskRun = TaskEvent; export type TaskClaim = TaskEvent; export type TaskRunRequest = TaskEvent; -export type EphemeralTaskRejectedDueToCapacity = TaskEvent; export type TaskPollingCycle = TaskEvent>; export type TaskManagerMetric = TaskEvent; @@ -90,8 +86,6 @@ export type TaskManagerStats = | 'load' | 'pollingDelay' | 'claimDuration' - | 'queuedEphemeralTasks' - | 'ephemeralTaskDelay' | 'workerUtilization' | 'runDelay'; export type TaskManagerStat = TaskEvent; @@ -187,19 +181,6 @@ export function asTaskManagerMetricEvent( }; } -export function asEphemeralTaskRejectedDueToCapacityEvent( - id: string, - event: Result, - timing?: TaskTiming -): EphemeralTaskRejectedDueToCapacity { - return { - id, - type: TaskEventType.EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY, - event, - timing, - }; -} - export function isTaskMarkRunningEvent( taskEvent: TaskEvent ): taskEvent is TaskMarkRunning { @@ -236,8 +217,3 @@ export function isTaskManagerMetricEvent( ): taskEvent is TaskManagerStat { return taskEvent.type === TaskEventType.TASK_MANAGER_METRIC; } -export function isEphemeralTaskRejectedDueToCapacityEvent( - taskEvent: TaskEvent -): taskEvent is EphemeralTaskRejectedDueToCapacity { - return taskEvent.type === TaskEventType.EPHEMERAL_TASK_DELAYED_DUE_TO_CAPACITY; -} diff --git a/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts b/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts deleted file mode 100644 index 365169b04889..000000000000 --- a/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts +++ /dev/null @@ -1,396 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -/* - * This module contains the core logic for running an individual task. - * It handles the full lifecycle of a task run, including error handling, - * rescheduling, middleware application, etc. - */ - -import apm from 'elastic-apm-node'; -import { v4 as uuidv4 } from 'uuid'; -import { withSpan } from '@kbn/apm-utils'; -import { identity } from 'lodash'; -import { Logger, ExecutionContextStart } from '@kbn/core/server'; - -import { Middleware } from '../lib/middleware'; -import { asOk, asErr, eitherAsync, Result } from '../lib/result_type'; -import { - TaskRun, - TaskMarkRunning, - asTaskRunEvent, - asTaskMarkRunningEvent, - startTaskTimer, - TaskTiming, - TaskPersistence, -} from '../task_events'; -import { intervalFromDate } from '../lib/intervals'; -import { - CancellableTask, - ConcreteTaskInstance, - isFailedRunResult, - SuccessfulRunResult, - FailedRunResult, - TaskStatus, - EphemeralTaskInstance, -} from '../task'; -import { TaskTypeDictionary } from '../task_type_dictionary'; -import { - asPending, - asReadyToRun, - EMPTY_RUN_RESULT, - isPending, - isReadyToRun, - TaskRunner, - TaskRunningInstance, - TaskRunResult, - TASK_MANAGER_RUN_TRANSACTION_TYPE, - TASK_MANAGER_TRANSACTION_TYPE, - TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING, -} from './task_runner'; - -type Opts = { - logger: Logger; - definitions: TaskTypeDictionary; - instance: EphemeralTaskInstance; - onTaskEvent?: (event: TaskRun | TaskMarkRunning) => void; - executionContext: ExecutionContextStart; -} & Pick; - -// ephemeral tasks cannot be rescheduled or scheduled to run again in the future -type EphemeralSuccessfulRunResult = Omit; -type EphemeralFailedRunResult = Omit; - -/** - * - * @export - * @class EphemeralTaskManagerRunner - * @implements {TaskRunner} - */ -export class EphemeralTaskManagerRunner implements TaskRunner { - private task?: CancellableTask; - private instance: TaskRunningInstance; - private definitions: TaskTypeDictionary; - private logger: Logger; - private beforeRun: Middleware['beforeRun']; - private beforeMarkRunning: Middleware['beforeMarkRunning']; - private onTaskEvent: (event: TaskRun | TaskMarkRunning) => void; - private uuid: string; - private readonly executionContext: ExecutionContextStart; - - /** - * Creates an instance of EphemeralTaskManagerRunner. - * @param {Opts} opts - * @prop {Logger} logger - The task manager logger - * @prop {TaskDefinition} definition - The definition of the task being run - * @prop {EphemeralTaskInstance} instance - The record describing this particular task instance - * @prop {BeforeRunFunction} beforeRun - A function that adjusts the run context prior to running the task - * @memberof TaskManagerRunner - */ - constructor({ - instance, - definitions, - logger, - beforeRun, - beforeMarkRunning, - onTaskEvent = identity, - executionContext, - }: Opts) { - this.instance = asPending(asConcreteInstance(sanitizeInstance(instance))); - this.definitions = definitions; - this.logger = logger; - this.beforeRun = beforeRun; - this.beforeMarkRunning = beforeMarkRunning; - this.onTaskEvent = onTaskEvent; - this.executionContext = executionContext; - this.uuid = uuidv4(); - } - - /** - * Gets the id of this task instance. - */ - public get id() { - return this.instance.task.id; - } - - /** - * Gets the exeuction id of this task instance. - */ - public get taskExecutionId() { - return `${this.id}::${this.uuid}`; - } - - /** - * Test whether given execution ID identifies a different execution of this same task - * @param id - */ - public isSameTask(executionId: string) { - return executionId.startsWith(this.id); - } - - /** - * Gets the task type of this task instance. - */ - public get taskType() { - return this.instance.task.taskType; - } - - /** - * Get the stage this TaskRunner is at - */ - public get stage() { - return this.instance.stage; - } - - /** - * Gets the task defintion from the dictionary. - */ - public get definition() { - return this.definitions.get(this.taskType); - } - - /** - * Gets the time at which this task will expire. - */ - public get expiration() { - return intervalFromDate( - // if the task is running, use it's started at, otherwise use the timestamp at - // which it was last updated - // this allows us to catch tasks that remain in Pending/Finalizing without being - // cleaned up - isReadyToRun(this.instance) ? this.instance.task.startedAt : this.instance.timestamp, - this.definition?.timeout - )!; - } - - /** - * Gets the duration of the current task run - */ - public get startedAt() { - return this.instance.task.startedAt; - } - - /** - * Gets whether or not this task has run longer than its expiration setting allows. - */ - public get isExpired() { - return this.expiration < new Date(); - } - - /** - * Returns true whenever the task is ad hoc and has ran out of attempts. When true before - * running a task, the task should be deleted instead of ran. - */ - public get isAdHocTaskAndOutOfAttempts() { - return false; - } - - public get isEphemeral() { - return true; - } - - /** - * Returns a log-friendly representation of this task. - */ - public toString() { - return `${this.taskType} "${this.id}" (Ephemeral)`; - } - - /** - * Runs the task, handling the task result, errors, etc, rescheduling if need - * be. NOTE: the time of applying the middleware's beforeRun is incorporated - * into the total timeout time the task in configured with. We may decide to - * start the timer after beforeRun resolves - * - * @returns {Promise>} - */ - public async run(): Promise> { - const definition = this.definition; - - if (!definition) { - throw new Error(`Running ephemeral task ${this} failed because it has no definition`); - } - - if (!isReadyToRun(this.instance)) { - throw new Error( - `Running ephemeral task ${this} failed as it ${ - isPending(this.instance) ? `isn't ready to be ran` : `has already been ran` - }` - ); - } - this.logger.debug(`Running ephemeral task ${this}`); - const apmTrans = apm.startTransaction(this.taskType, TASK_MANAGER_RUN_TRANSACTION_TYPE, { - childOf: this.instance.task.traceparent, - }); - apmTrans?.addLabels({ ephemeral: true }); - - const modifiedContext = await this.beforeRun({ - taskInstance: asConcreteInstance(this.instance.task), - }); - const stopTaskTimer = startTaskTimer(); - try { - this.task = definition.createTaskRunner(modifiedContext); - const ctx = { - type: 'task manager', - name: `run ephemeral ${this.instance.task.taskType}`, - id: this.instance.task.id, - description: 'run ephemeral task', - }; - const result = await this.executionContext.withContext(ctx, () => - withSpan({ name: 'ephemeral run', type: 'task manager' }, () => this.task!.run()) - ); - const validatedResult = this.validateResult(result); - const processedResult = await withSpan( - { name: 'process ephemeral result', type: 'task manager' }, - () => this.processResult(validatedResult, stopTaskTimer()) - ); - if (apmTrans) apmTrans.end('success'); - return processedResult; - } catch (err) { - this.logger.error(`Task ${this} failed: ${err}`); - // in error scenario, we can not get the RunResult - const processedResult = await withSpan( - { name: 'process ephemeral result', type: 'task manager' }, - () => - this.processResult( - asErr({ error: err, state: modifiedContext.taskInstance.state }), - stopTaskTimer() - ) - ); - if (apmTrans) apmTrans.end('failure'); - return processedResult; - } - } - - /** - * Used by the non-ephemeral task runner - */ - public async removeTask(): Promise {} - - /** - * Noop for Ephemeral tasks - * - * @returns {Promise} - */ - public async markTaskAsRunning(): Promise { - if (!isPending(this.instance)) { - throw new Error( - `Marking ephemeral task ${this} as running has failed as it ${ - isReadyToRun(this.instance) ? `is already running` : `has already been ran` - }` - ); - } - - const apmTrans = apm.startTransaction( - TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING, - TASK_MANAGER_TRANSACTION_TYPE - ); - apmTrans?.addLabels({ entityId: this.taskType }); - - const now = new Date(); - try { - const { taskInstance } = await this.beforeMarkRunning({ - taskInstance: asConcreteInstance(this.instance.task), - }); - - this.instance = asReadyToRun({ - ...taskInstance, - status: TaskStatus.Running, - startedAt: now, - attempts: taskInstance.attempts + 1, - retryAt: null, - }); - - if (apmTrans) apmTrans.end('success'); - this.onTaskEvent(asTaskMarkRunningEvent(this.id, asOk(this.instance.task))); - return true; - } catch (error) { - if (apmTrans) apmTrans.end('failure'); - this.onTaskEvent(asTaskMarkRunningEvent(this.id, asErr(error))); - } - return false; - } - - /** - * Attempts to cancel the task. - * - * @returns {Promise} - */ - public async cancel() { - const { task } = this; - if (task?.cancel) { - // it will cause the task state of "running" to be cleared - this.task = undefined; - return task.cancel(); - } - - this.logger.debug(`The ephemral task ${this} is not cancellable.`); - } - - private validateResult( - result?: SuccessfulRunResult | FailedRunResult | void - ): Result { - return isFailedRunResult(result) - ? asErr({ ...result, error: result.error }) - : asOk(result || EMPTY_RUN_RESULT); - } - - private async processResult( - result: Result, - taskTiming: TaskTiming - ): Promise> { - await eitherAsync( - result, - async ({ state }: EphemeralSuccessfulRunResult) => { - this.onTaskEvent( - asTaskRunEvent( - this.id, - asOk({ - task: { ...this.instance.task, state }, - persistence: TaskPersistence.Ephemeral, - result: TaskRunResult.Success, - isExpired: false, - }), - taskTiming - ) - ); - }, - async ({ error, state }: EphemeralFailedRunResult) => { - this.onTaskEvent( - asTaskRunEvent( - this.id, - asErr({ - task: { ...this.instance.task, state }, - persistence: TaskPersistence.Ephemeral, - result: TaskRunResult.Failed, - isExpired: false, - error, - }), - taskTiming - ) - ); - } - ); - return result; - } -} - -function sanitizeInstance(instance: EphemeralTaskInstance): EphemeralTaskInstance { - return { - ...instance, - params: instance.params || {}, - state: instance.state || {}, - }; -} - -function asConcreteInstance(instance: EphemeralTaskInstance): ConcreteTaskInstance { - return { - ...instance, - attempts: 0, - retryAt: null, - }; -} diff --git a/x-pack/plugins/task_manager/server/task_running/errors.ts b/x-pack/plugins/task_manager/server/task_running/errors.ts index e7063a92556e..e30c99c68c60 100644 --- a/x-pack/plugins/task_manager/server/task_running/errors.ts +++ b/x-pack/plugins/task_manager/server/task_running/errors.ts @@ -5,7 +5,6 @@ * 2.0. */ import { TaskErrorSource } from '../../common'; -import { EphemeralTask } from '../task'; export { TaskErrorSource }; @@ -23,19 +22,6 @@ export interface DecoratedError extends Error { [source]?: TaskErrorSource; } -export class EphemeralTaskRejectedDueToCapacityError extends Error { - private _task: EphemeralTask; - - constructor(message: string, task: EphemeralTask) { - super(message); - this._task = task; - } - - public get task() { - return this._task; - } -} - function isTaskManagerError(error: unknown): error is DecoratedError { return Boolean(error && (error as DecoratedError)[code]); } @@ -87,9 +73,3 @@ export function getErrorSource(error: Error | DecoratedError): TaskErrorSource | export function isUserError(error: Error | DecoratedError) { return getErrorSource(error) === TaskErrorSource.USER; } - -export function isEphemeralTaskRejectedDueToCapacityError( - error: Error | EphemeralTaskRejectedDueToCapacityError -) { - return Boolean(error && error instanceof EphemeralTaskRejectedDueToCapacityError); -} diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 9f9dadbc27c9..4077190258e4 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -79,7 +79,6 @@ export interface TaskRunner { id: string; taskExecutionId: string; stage: string; - isEphemeral?: boolean; toString: () => string; isSameTask: (executionId: string) => boolean; isAdHocTaskAndOutOfAttempts: boolean; diff --git a/x-pack/plugins/task_manager/server/task_scheduling.mock.ts b/x-pack/plugins/task_manager/server/task_scheduling.mock.ts index 64dd923d6712..1981031b553c 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.mock.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.mock.ts @@ -14,7 +14,6 @@ const createTaskSchedulingMock = () => { ensureScheduled: jest.fn(), schedule: jest.fn(), runSoon: jest.fn(), - ephemeralRunNow: jest.fn(), } as unknown as jest.Mocked; }; diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index df5f93a50ec8..29bffbded5ec 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -6,20 +6,15 @@ */ import sinon from 'sinon'; -import { BehaviorSubject, Observable, Subject } from 'rxjs'; import moment from 'moment'; -import { asTaskRunEvent, TaskPersistence } from './task_events'; -import { TaskLifecycleEvent } from './polling_lifecycle'; import { TaskScheduling } from './task_scheduling'; -import { asErr, asOk } from './lib/result_type'; +import { asOk } from './lib/result_type'; import { TaskStatus } from './task'; import { createInitialMiddleware } from './lib/middleware'; import { taskStoreMock } from './task_store.mock'; -import { TaskRunResult } from './task_running'; import { mockLogger } from './test_utils'; import { TaskTypeDictionary } from './task_type_dictionary'; -import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock'; import { taskManagerMock } from './mocks'; import { omit } from 'lodash'; @@ -52,7 +47,6 @@ describe('TaskScheduling', () => { logger: mockLogger(), middleware: createInitialMiddleware(), definitions, - ephemeralTaskLifecycle: ephemeralTaskLifecycleMock.create({}), taskManagerId: '123', }; @@ -835,122 +829,6 @@ describe('TaskScheduling', () => { }); }); - describe('ephemeralRunNow', () => { - test('runs a task ephemerally', async () => { - const ephemeralEvents$ = new BehaviorSubject>({}); - const ephemeralTask = taskManagerMock.createTask({ - state: { - foo: 'bar', - }, - }); - const customEphemeralTaskLifecycleMock = ephemeralTaskLifecycleMock.create({ - events$: ephemeralEvents$ as Observable, - }); - - customEphemeralTaskLifecycleMock.attemptToRun.mockImplementation((value) => { - return { - tag: 'ok', - value, - }; - }); - - const middleware = createInitialMiddleware(); - middleware.beforeSave = jest.fn().mockImplementation(async () => { - return { taskInstance: ephemeralTask }; - }); - const taskScheduling = new TaskScheduling({ - ...taskSchedulingOpts, - middleware, - ephemeralTaskLifecycle: customEphemeralTaskLifecycleMock, - }); - - const result = taskScheduling.ephemeralRunNow(ephemeralTask); - ephemeralEvents$.next( - asTaskRunEvent( - 'v4uuid', - asOk({ - task: { - ...ephemeralTask, - id: 'v4uuid', - }, - result: TaskRunResult.Success, - persistence: TaskPersistence.Ephemeral, - isExpired: false, - }) - ) - ); - await expect(result).resolves.toEqual({ id: 'v4uuid', state: { foo: 'bar' } }); - }); - - test('rejects ephemeral task if lifecycle returns an error', async () => { - const ephemeralEvents$ = new Subject(); - const ephemeralTask = taskManagerMock.createTask({ - state: { - foo: 'bar', - }, - }); - const customEphemeralTaskLifecycleMock = ephemeralTaskLifecycleMock.create({ - events$: ephemeralEvents$, - }); - - customEphemeralTaskLifecycleMock.attemptToRun.mockImplementation((value) => { - return asErr(value); - }); - - const middleware = createInitialMiddleware(); - middleware.beforeSave = jest.fn().mockImplementation(async () => { - return { taskInstance: ephemeralTask }; - }); - const taskScheduling = new TaskScheduling({ - ...taskSchedulingOpts, - middleware, - ephemeralTaskLifecycle: customEphemeralTaskLifecycleMock, - }); - - const result = taskScheduling.ephemeralRunNow(ephemeralTask); - ephemeralEvents$.next( - asTaskRunEvent( - 'v4uuid', - asOk({ - task: { - ...ephemeralTask, - id: 'v4uuid', - }, - result: TaskRunResult.Failed, - persistence: TaskPersistence.Ephemeral, - isExpired: false, - }) - ) - ); - - await expect(result).rejects.toMatchInlineSnapshot( - `[Error: Ephemeral Task of type foo was rejected]` - ); - }); - - test('rejects ephemeral task if ephemeralTaskLifecycle is not defined', async () => { - const ephemeralTask = taskManagerMock.createTask({ - state: { - foo: 'bar', - }, - }); - const middleware = createInitialMiddleware(); - middleware.beforeSave = jest.fn().mockImplementation(async () => { - return { taskInstance: ephemeralTask }; - }); - const taskScheduling = new TaskScheduling({ - ...taskSchedulingOpts, - middleware, - ephemeralTaskLifecycle: undefined, - }); - - const result = taskScheduling.ephemeralRunNow(ephemeralTask); - await expect(result).rejects.toMatchInlineSnapshot( - `[Error: Ephemeral Task of type foo was rejected because ephemeral tasks are not supported]` - ); - }); - }); - describe('bulkSchedule', () => { test('allows scheduling tasks', async () => { const taskScheduling = new TaskScheduling(taskSchedulingOpts); diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 1a66cddfa637..6972e8eeabf5 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -5,29 +5,14 @@ * 2.0. */ -import { filter, take } from 'rxjs'; import pMap from 'p-map'; - -import { v4 as uuidv4 } from 'uuid'; -import { chunk, flatten, pick } from 'lodash'; -import { Subject } from 'rxjs'; +import { chunk, flatten } from 'lodash'; import agent from 'elastic-apm-node'; import { Logger } from '@kbn/core/server'; -import { either, isErr, mapErr } from './lib/result_type'; -import { - ErroredTask, - ErrResultOf, - isTaskClaimEvent, - isTaskRunEvent, - isTaskRunRequestEvent, - OkResultOf, - RanTask, -} from './task_events'; import { Middleware } from './lib/middleware'; import { parseIntervalAsMillisecond } from './lib/intervals'; import { ConcreteTaskInstance, - EphemeralTask, IntervalSchedule, TaskInstanceWithDeprecatedFields, TaskInstanceWithId, @@ -35,9 +20,6 @@ import { } from './task'; import { TaskStore } from './task_store'; import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields'; -import { TaskLifecycleEvent } from './polling_lifecycle'; -import { EphemeralTaskLifecycle } from './ephemeral_task_lifecycle'; -import { EphemeralTaskRejectedDueToCapacityError } from './task_running'; import { retryableBulkUpdate } from './lib/retryable_bulk_update'; import { ErrorOutput } from './lib/bulk_operation_buffer'; @@ -46,7 +28,6 @@ const BULK_ACTION_SIZE = 100; export interface TaskSchedulingOpts { logger: Logger; taskStore: TaskStore; - ephemeralTaskLifecycle?: EphemeralTaskLifecycle; middleware: Middleware; taskManagerId: string; } @@ -76,10 +57,8 @@ export interface RunNowResult { export class TaskScheduling { private store: TaskStore; - private ephemeralTaskLifecycle?: EphemeralTaskLifecycle; private logger: Logger; private middleware: Middleware; - private taskManagerId: string; /** * Initializes the task manager, preventing any further addition of middleware, @@ -89,9 +68,7 @@ export class TaskScheduling { constructor(opts: TaskSchedulingOpts) { this.logger = opts.logger; this.middleware = opts.middleware; - this.ephemeralTaskLifecycle = opts.ephemeralTaskLifecycle; this.store = opts.taskStore; - this.taskManagerId = opts.taskManagerId; } /** @@ -284,68 +261,6 @@ export class TaskScheduling { return { id: task.id }; } - /** - * Run an ad-hoc task in memory without persisting it into ES or distributing the load across the cluster. - * - * @param task - The ephemeral task being queued. - * @returns {Promise} - */ - public async ephemeralRunNow( - task: EphemeralTask, - options?: Record - ): Promise { - if (!this.ephemeralTaskLifecycle) { - throw new EphemeralTaskRejectedDueToCapacityError( - `Ephemeral Task of type ${task.taskType} was rejected because ephemeral tasks are not supported`, - task - ); - } - const id = uuidv4(); - const { taskInstance: modifiedTask } = await this.middleware.beforeSave({ - ...options, - taskInstance: task, - }); - return new Promise(async (resolve, reject) => { - try { - // The actual promise returned from this function is resolved after the awaitTaskRunResult promise resolves. - // However, we do not wait to await this promise, as we want later execution to happen in parallel. - // The awaitTaskRunResult promise is resolved once the ephemeral task is successfully executed (technically, when a TaskEventType.TASK_RUN is emitted with the same id). - // However, the ephemeral task won't even get into the queue until the subsequent this.ephemeralTaskLifecycle.attemptToRun is called (which puts it in the queue). - // The reason for all this confusion? Timing. - // In the this.ephemeralTaskLifecycle.attemptToRun, it's possible that the ephemeral task is put into the queue and processed before this function call returns anything. - // If that happens, putting the awaitTaskRunResult after would just hang because the task already completed. We need to listen for the completion before we add it to the queue to avoid this possibility. - const { cancel, resolveOnCancel } = cancellablePromise(); - this.awaitTaskRunResult(id, resolveOnCancel) - .then((arg: RunNowResult) => { - resolve(arg); - }) - .catch((err: Error) => { - reject(err); - }); - const attemptToRunResult = this.ephemeralTaskLifecycle!.attemptToRun({ - id, - scheduledAt: new Date(), - runAt: new Date(), - status: TaskStatus.Idle, - ownerId: this.taskManagerId, - ...modifiedTask, - }); - - if (isErr(attemptToRunResult)) { - cancel(); - reject( - new EphemeralTaskRejectedDueToCapacityError( - `Ephemeral Task of type ${task.taskType} was rejected`, - task - ) - ); - } - } catch (error) { - reject(error); - } - }); - } - /** * Schedules a task with an Id * @@ -366,63 +281,6 @@ export class TaskScheduling { } } - private awaitTaskRunResult(taskId: string, cancel?: Promise): Promise { - return new Promise((resolve, reject) => { - if (!this.ephemeralTaskLifecycle) { - reject( - new Error( - `Failed to run task "${taskId}" because ephemeral tasks are not supported. Rescheduled the task to ensure it is picked up as soon as possible.` - ) - ); - } - // listen for all events related to the current task - const subscription = this.ephemeralTaskLifecycle!.events.pipe( - filter(({ id }: TaskLifecycleEvent) => id === taskId) - ).subscribe((taskEvent: TaskLifecycleEvent) => { - if (isTaskClaimEvent(taskEvent)) { - mapErr(async (error: Error) => { - // reject if any error event takes place for the requested task - subscription.unsubscribe(); - }, taskEvent.event); - } else { - either, ErrResultOf>( - taskEvent.event, - (taskInstance: OkResultOf) => { - // resolve if the task has run sucessfully - if (isTaskRunEvent(taskEvent)) { - resolve(pick((taskInstance as RanTask).task, ['id', 'state'])); - subscription.unsubscribe(); - } - }, - async (errorResult: ErrResultOf) => { - // reject if any error event takes place for the requested task - subscription.unsubscribe(); - return reject( - new Error( - `Failed to run task "${taskId}": ${ - isTaskRunRequestEvent(taskEvent) - ? `Task Manager is at capacity, please try again later` - : isTaskRunEvent(taskEvent) - ? `${(errorResult as ErroredTask).error}` - : `${errorResult}` - }` - ) - ); - } - ); - } - }); - - if (cancel) { - cancel - .then(() => { - subscription.unsubscribe(); - }) - .catch(() => {}); - } - }); - } - private async getNonRunningTask(taskId: string) { const task = await this.store.get(taskId); switch (task.status) { @@ -438,17 +296,6 @@ export class TaskScheduling { } } -const cancellablePromise = () => { - const boolStream = new Subject(); - return { - cancel: () => boolStream.next(true), - resolveOnCancel: boolStream - .pipe(take(1)) - .toPromise() - .then(() => {}), - }; -}; - const randomlyOffsetRunTimestamp: (task: ConcreteTaskInstance) => ConcreteTaskInstance = (task) => { const now = Date.now(); const maximumOffsetTimestamp = now + 1000 * 60 * 5; // now + 5 minutes diff --git a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts index db511677439b..baa26638f8d3 100644 --- a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts +++ b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts @@ -26,43 +26,6 @@ describe('registerTaskManagerUsageCollector', () => { let collector: Collector; const logger = loggingSystemMock.createLogger(); - it('should report telemetry on the ephemeral queue', async () => { - const monitoringStats$ = new Subject(); - const monitoringUtilization$ = new Subject(); - const usageCollectionMock = createUsageCollectionSetupMock(); - const fetchContext = createCollectorFetchContextMock(); - usageCollectionMock.makeUsageCollector.mockImplementation((config) => { - collector = new Collector(logger, config); - return createUsageCollectionSetupMock().makeUsageCollector(config); - }); - - registerTaskManagerUsageCollector( - usageCollectionMock, - monitoringStats$, - monitoringUtilization$, - true, - 10, - [] - ); - - const mockHealth = getMockMonitoredHealth(); - monitoringStats$.next(mockHealth); - const mockUtilization = getMockMonitoredUtilization(); - monitoringUtilization$.next(mockUtilization); - await sleep(1001); - - expect(usageCollectionMock.makeUsageCollector).toBeCalled(); - const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage; - expect(telemetry.ephemeral_tasks_enabled).toBe(true); - expect(telemetry.ephemeral_request_capacity).toBe(10); - expect(telemetry.ephemeral_stats).toMatchObject({ - status: mockHealth.stats.ephemeral?.status, - load: mockHealth.stats.ephemeral?.value.load, - executions_per_cycle: mockHealth.stats.ephemeral?.value.executionsPerCycle, - queued_tasks: mockHealth.stats.ephemeral?.value.queuedTasks, - }); - }); - it('should report telemetry on the excluded task types', async () => { const monitoringStats$ = new Subject(); const monitoringUtilization$ = new Subject(); @@ -77,8 +40,6 @@ describe('registerTaskManagerUsageCollector', () => { usageCollectionMock, monitoringStats$, monitoringUtilization$, - true, - 10, ['actions:*'] ); @@ -107,8 +68,6 @@ describe('registerTaskManagerUsageCollector', () => { usageCollectionMock, monitoringStats$, monitoringUtilization$, - true, - 10, ['actions:*'] ); @@ -146,8 +105,6 @@ describe('registerTaskManagerUsageCollector', () => { usageCollectionMock, monitoringStats$, monitoringUtilization$, - true, - 10, ['actions:*'] ); @@ -216,30 +173,6 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth { }, }, }, - ephemeral: { - status: HealthStatus.OK, - timestamp: new Date().toISOString(), - value: { - load: { - p50: 4, - p90: 6, - p95: 6, - p99: 6, - }, - executionsPerCycle: { - p50: 4, - p90: 6, - p95: 6, - p99: 6, - }, - queuedTasks: { - p50: 4, - p90: 6, - p95: 6, - p99: 6, - }, - }, - }, runtime: { timestamp: new Date().toISOString(), status: HealthStatus.OK, @@ -263,7 +196,6 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth { persistence: { [TaskPersistence.Recurring]: 10, [TaskPersistence.NonRecurring]: 10, - [TaskPersistence.Ephemeral]: 10, }, result_frequency_percent_as_number: {}, }, diff --git a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts index 6c8809c5c3d9..56594be045f9 100644 --- a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts +++ b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts @@ -16,8 +16,6 @@ export function createTaskManagerUsageCollector( usageCollection: UsageCollectionSetup, monitoringStats$: Observable, monitoredUtilization$: Observable, - ephemeralTasksEnabled: boolean, - ephemeralRequestCapacity: number, excludeTaskTypes: string[] ) { let lastMonitoredHealth: MonitoredHealth | null = null; @@ -37,29 +35,6 @@ export function createTaskManagerUsageCollector( }, fetch: async () => { return { - ephemeral_tasks_enabled: ephemeralTasksEnabled, - ephemeral_request_capacity: ephemeralRequestCapacity, - ephemeral_stats: { - status: lastMonitoredHealth?.stats.ephemeral?.status ?? '', - queued_tasks: { - p50: lastMonitoredHealth?.stats.ephemeral?.value.queuedTasks.p50 ?? 0, - p90: lastMonitoredHealth?.stats.ephemeral?.value.queuedTasks.p90 ?? 0, - p95: lastMonitoredHealth?.stats.ephemeral?.value.queuedTasks.p95 ?? 0, - p99: lastMonitoredHealth?.stats.ephemeral?.value.queuedTasks.p99 ?? 0, - }, - load: { - p50: lastMonitoredHealth?.stats.ephemeral?.value.load.p50 ?? 0, - p90: lastMonitoredHealth?.stats.ephemeral?.value.load.p90 ?? 0, - p95: lastMonitoredHealth?.stats.ephemeral?.value.load.p95 ?? 0, - p99: lastMonitoredHealth?.stats.ephemeral?.value.load.p99 ?? 0, - }, - executions_per_cycle: { - p50: lastMonitoredHealth?.stats.ephemeral?.value.executionsPerCycle.p50 ?? 0, - p90: lastMonitoredHealth?.stats.ephemeral?.value.executionsPerCycle.p90 ?? 0, - p95: lastMonitoredHealth?.stats.ephemeral?.value.executionsPerCycle.p95 ?? 0, - p99: lastMonitoredHealth?.stats.ephemeral?.value.executionsPerCycle.p99 ?? 0, - }, - }, task_type_exclusion: excludeTaskTypes, failed_tasks: Object.entries(lastMonitoredHealth?.stats.workload?.value.task_types!).reduce( (numb, [key, val]) => { @@ -88,29 +63,6 @@ export function createTaskManagerUsageCollector( }; }, schema: { - ephemeral_tasks_enabled: { type: 'boolean' }, - ephemeral_request_capacity: { type: 'short' }, - ephemeral_stats: { - status: { type: 'keyword' }, - queued_tasks: { - p50: { type: 'long' }, - p90: { type: 'long' }, - p95: { type: 'long' }, - p99: { type: 'long' }, - }, - load: { - p50: { type: 'long' }, - p90: { type: 'long' }, - p95: { type: 'long' }, - p99: { type: 'long' }, - }, - executions_per_cycle: { - p50: { type: 'long' }, - p90: { type: 'long' }, - p95: { type: 'long' }, - p99: { type: 'long' }, - }, - }, task_type_exclusion: { type: 'array', items: { type: 'keyword' } }, failed_tasks: { type: 'long' }, recurring_tasks: { @@ -130,16 +82,12 @@ export function registerTaskManagerUsageCollector( usageCollection: UsageCollectionSetup, monitoringStats$: Observable, monitoredUtilization$: Observable, - ephemeralTasksEnabled: boolean, - ephemeralRequestCapacity: number, excludeTaskTypes: string[] ) { const collector = createTaskManagerUsageCollector( usageCollection, monitoringStats$, monitoredUtilization$, - ephemeralTasksEnabled, - ephemeralRequestCapacity, excludeTaskTypes ); usageCollection.registerCollector(collector); diff --git a/x-pack/plugins/task_manager/server/usage/types.ts b/x-pack/plugins/task_manager/server/usage/types.ts index 0e98d1d0685a..97e990599be8 100644 --- a/x-pack/plugins/task_manager/server/usage/types.ts +++ b/x-pack/plugins/task_manager/server/usage/types.ts @@ -7,29 +7,6 @@ export interface TaskManagerUsage { task_type_exclusion: string[]; - ephemeral_tasks_enabled: boolean; - ephemeral_request_capacity: number; - ephemeral_stats: { - status: string; - queued_tasks: { - p50: number; - p90: number; - p95: number; - p99: number; - }; - load: { - p50: number; - p90: number; - p95: number; - p99: number; - }; - executions_per_cycle: { - p50: number; - p90: number; - p95: number; - p99: number; - }; - }; failed_tasks: number; recurring_tasks: { actual_service_time: number; diff --git a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts index c5927d894911..89a75eba663a 100644 --- a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts +++ b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts @@ -216,45 +216,6 @@ export function initRoutes( } ); - router.post( - { - path: `/api/sample_tasks/ephemeral_run_now`, - validate: { - body: schema.object({ - task: schema.object({ - taskType: schema.string(), - state: schema.recordOf(schema.string(), schema.any()), - params: schema.recordOf(schema.string(), schema.any()), - }), - }), - }, - }, - async function ( - context: RequestHandlerContext, - req: KibanaRequest< - any, - any, - { - task: { - taskType: string; - params: Record; - state: Record; - }; - }, - any - >, - res: KibanaResponseFactory - ): Promise> { - const { task } = req.body; - try { - const taskManager = await taskManagerStart; - return res.ok({ body: await taskManager.ephemeralRunNow(task) }); - } catch (err) { - return res.ok({ body: { task, error: `${err}` } }); - } - } - ); - router.post( { path: `/api/sample_tasks/ensure_scheduled`, diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index a6bf7e7e9d5f..37f0587e984a 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -30,7 +30,6 @@ export default function ({ getService }: FtrProviderContext) { 'sampleTaskWithLimitedConcurrency', 'sampleTaskWithSingleConcurrency', 'singleAttemptSampleTask', - 'taskWhichExecutesOtherTasksEphemerally', 'timedTask', 'timedTaskWithLimitedConcurrency', 'timedTaskWithSingleConcurrency', diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts index 8aba3d162473..d27e33948758 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts @@ -302,7 +302,6 @@ export default function ({ getService }: FtrProviderContext) { expect(typeof execution.duration.sampleTask.p95).to.eql('number'); expect(typeof execution.duration.sampleTask.p99).to.eql('number'); - expect(typeof execution.persistence.ephemeral).to.eql('number'); expect(typeof execution.persistence.non_recurring).to.eql('number'); expect(typeof execution.persistence.recurring).to.eql('number'); diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts index d291e6cfb710..34ef9c2481bc 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts @@ -197,20 +197,6 @@ export default function ({ getService }: FtrProviderContext) { .then((response: { body: BulkUpdateTaskResult }) => response.body); } - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // function runEphemeralTaskNow(task: { - // taskType: string; - // params: Record; - // state: Record; - // }) { - // return supertest - // .post('/api/sample_tasks/ephemeral_run_now') - // .set('kbn-xsrf', 'xxx') - // .send({ task }) - // .expect(200) - // .then((response) => response.body); - // } - function scheduleTaskIfNotExists(task: Partial) { return supertest .post('/api/sample_tasks/ensure_scheduled') @@ -919,196 +905,6 @@ export default function ({ getService }: FtrProviderContext) { expect(task.runAt).to.eql(scheduledRunAt); }); }); - - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // it('should return the resulting task state when asked to run an ephemeral task now', async () => { - // const ephemeralTask = await runEphemeralTaskNow({ - // taskType: 'sampleTask', - // params: {}, - // state: {}, - // }); - - // await retry.try(async () => { - // expect( - // (await historyDocs()).filter((taskDoc) => taskDoc._source.taskId === ephemeralTask.id) - // .length - // ).to.eql(1); - - // expect(ephemeralTask.state.count).to.eql(1); - // }); - - // const secondEphemeralTask = await runEphemeralTaskNow({ - // taskType: 'sampleTask', - // params: {}, - // // pass state from previous ephemeral run as input for the second run - // state: ephemeralTask.state, - // }); - - // // ensure state is cumulative - // expect(secondEphemeralTask.state.count).to.eql(2); - - // await retry.try(async () => { - // // ensure new id is produced for second task execution - // expect( - // (await historyDocs()).filter((taskDoc) => taskDoc._source.taskId === ephemeralTask.id) - // .length - // ).to.eql(1); - // expect( - // (await historyDocs()).filter( - // (taskDoc) => taskDoc._source.taskId === secondEphemeralTask.id - // ).length - // ).to.eql(1); - // }); - // }); - - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // it('Epheemral task run should only run one instance of a task if its maxConcurrency is 1', async () => { - // const ephemeralTaskWithSingleConcurrency: { - // state: { - // executions: Array<{ - // result: { - // id: string; - // state: { - // timings: Array<{ - // start: number; - // stop: number; - // }>; - // }; - // }; - // }>; - // }; - // } = await runEphemeralTaskNow({ - // taskType: 'taskWhichExecutesOtherTasksEphemerally', - // params: { - // tasks: [ - // { - // taskType: 'timedTaskWithSingleConcurrency', - // params: { delay: 1000 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithSingleConcurrency', - // params: { delay: 1000 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithSingleConcurrency', - // params: { delay: 1000 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithSingleConcurrency', - // params: { delay: 1000 }, - // state: {}, - // }, - // ], - // }, - // state: {}, - // }); - - // ensureOverlappingTasksDontExceedThreshold( - // ephemeralTaskWithSingleConcurrency.state.executions, - // // make sure each task intersects with any other task - // 0 - // ); - // }); - - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // it('Ephemeral task run should only run as many instances of a task as its maxConcurrency will allow', async () => { - // const ephemeralTaskWithSingleConcurrency: { - // state: { - // executions: Array<{ - // result: { - // id: string; - // state: { - // timings: Array<{ - // start: number; - // stop: number; - // }>; - // }; - // }; - // }>; - // }; - // } = await runEphemeralTaskNow({ - // taskType: 'taskWhichExecutesOtherTasksEphemerally', - // params: { - // tasks: [ - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // ], - // }, - // state: {}, - // }); - - // ensureOverlappingTasksDontExceedThreshold( - // ephemeralTaskWithSingleConcurrency.state.executions, - // // make sure each task intersects with, at most, 1 other task - // 1 - // ); - // }); - - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // it('Ephemeral task executions cant exceed the max workes in Task Manager', async () => { - // const ephemeralTaskWithSingleConcurrency: { - // state: { - // executions: Array<{ - // result: { - // id: string; - // state: { - // timings: Array<{ - // start: number; - // stop: number; - // }>; - // }; - // }; - // }>; - // }; - // } = await runEphemeralTaskNow({ - // taskType: 'taskWhichExecutesOtherTasksEphemerally', - // params: { - // tasks: times(20, () => ({ - // taskType: 'timedTask', - // params: { delay: 100 }, - // state: {}, - // })), - // }, - // state: {}, - // }); - - // ensureOverlappingTasksDontExceedThreshold( - // ephemeralTaskWithSingleConcurrency.state.executions, - // // make sure each task intersects with, at most, 9 other tasks (as max workes is 10) - // 9 - // ); - // }); }); // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 diff --git a/x-pack/test/task_manager_claimer_update_by_query/config.ts b/x-pack/test/task_manager_claimer_update_by_query/config.ts index ddae0d7b04d3..0d6f89b1c11a 100644 --- a/x-pack/test/task_manager_claimer_update_by_query/config.ts +++ b/x-pack/test/task_manager_claimer_update_by_query/config.ts @@ -30,8 +30,6 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) { '--xpack.eventLog.indexEntries=true', '--xpack.task_manager.claim_strategy="update_by_query"', '--xpack.task_manager.monitored_aggregated_stats_refresh_rate=5000', - '--xpack.task_manager.ephemeral_tasks.enabled=false', - '--xpack.task_manager.ephemeral_tasks.request_capacity=100', '--xpack.task_manager.metrics_reset_interval=40000', `--xpack.stack_connectors.enableExperimental=${JSON.stringify([ 'crowdstrikeConnectorOn', diff --git a/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/init_routes.ts b/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/init_routes.ts index acdbae0b0033..77cfc468f3f1 100644 --- a/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/init_routes.ts +++ b/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/init_routes.ts @@ -219,45 +219,6 @@ export function initRoutes( } ); - router.post( - { - path: `/api/sample_tasks/ephemeral_run_now`, - validate: { - body: schema.object({ - task: schema.object({ - taskType: schema.string(), - state: schema.recordOf(schema.string(), schema.any()), - params: schema.recordOf(schema.string(), schema.any()), - }), - }), - }, - }, - async function ( - context: RequestHandlerContext, - req: KibanaRequest< - any, - any, - { - task: { - taskType: string; - params: Record; - state: Record; - }; - }, - any - >, - res: KibanaResponseFactory - ): Promise> { - const { task } = req.body; - try { - const taskManager = await taskManagerStart; - return res.ok({ body: await taskManager.ephemeralRunNow(task) }); - } catch (err) { - return res.ok({ body: { task, error: `${err}` } }); - } - } - ); - router.post( { path: `/api/sample_tasks/ensure_scheduled`, diff --git a/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/plugin.ts b/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/plugin.ts index eceb750c207b..c390d9388c93 100644 --- a/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/plugin.ts +++ b/x-pack/test/task_manager_claimer_update_by_query/plugins/sample_task_plugin_mget/server/plugin.ts @@ -15,7 +15,6 @@ import { TaskManagerSetupContract, TaskManagerStartContract, ConcreteTaskInstance, - EphemeralTask, } from '@kbn/task-manager-plugin/server'; import { DEFAULT_MAX_WORKERS } from '@kbn/task-manager-plugin/server/config'; import { TaskPriority } from '@kbn/task-manager-plugin/server/task'; @@ -45,8 +44,6 @@ export class SampleTaskManagerFixturePlugin const taskTestingEvents = new EventEmitter(); taskTestingEvents.setMaxListeners(DEFAULT_MAX_WORKERS * 2); - const tmStart = this.taskManagerStart; - const defaultSampleTaskConfig = { timeout: '1m', // This task allows tests to specify its behavior (whether it reschedules itself, whether it errors, etc) @@ -311,37 +308,6 @@ export class SampleTaskManagerFixturePlugin 'A task that can only have two concurrent instance and tracks its execution timing.', ...taskWithTiming, }, - taskWhichExecutesOtherTasksEphemerally: { - title: 'Task Which Executes Other Tasks Ephemerally', - description: 'A sample task used to validate how ephemeral tasks are executed.', - maxAttempts: 1, - timeout: '60s', - createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => ({ - async run() { - const { - params: { tasks = [] }, - } = taskInstance; - - const tm = await tmStart; - const executions = await Promise.all( - (tasks as EphemeralTask[]).map(async (task) => { - return tm - .ephemeralRunNow(task) - .then((result) => ({ - result, - })) - .catch((error) => ({ - error, - })); - }) - ); - - return { - state: { executions }, - }; - }, - }), - }, }); taskManager.addMiddleware({ diff --git a/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/health_route.ts b/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/health_route.ts index c5efe2622055..607a67f92f68 100644 --- a/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/health_route.ts +++ b/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/health_route.ts @@ -299,7 +299,6 @@ export default function ({ getService }: FtrProviderContext) { expect(typeof execution.duration.sampleTask.p95).to.eql('number'); expect(typeof execution.duration.sampleTask.p99).to.eql('number'); - expect(typeof execution.persistence.ephemeral).to.eql('number'); expect(typeof execution.persistence.non_recurring).to.eql('number'); expect(typeof execution.persistence.recurring).to.eql('number'); diff --git a/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/task_management.ts b/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/task_management.ts index f03023fb10ee..33c72ffd1de9 100644 --- a/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/task_management.ts +++ b/x-pack/test/task_manager_claimer_update_by_query/test_suites/task_manager/task_management.ts @@ -200,20 +200,6 @@ export default function ({ getService }: FtrProviderContext) { .then((response: { body: BulkUpdateTaskResult }) => response.body); } - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // function runEphemeralTaskNow(task: { - // taskType: string; - // params: Record; - // state: Record; - // }) { - // return supertest - // .post('/api/sample_tasks/ephemeral_run_now') - // .set('kbn-xsrf', 'xxx') - // .send({ task }) - // .expect(200) - // .then((response) => response.body); - // } - function scheduleTaskIfNotExists(task: Partial) { return supertest .post('/api/sample_tasks/ensure_scheduled') @@ -915,196 +901,6 @@ export default function ({ getService }: FtrProviderContext) { expect(task.runAt).to.eql(scheduledRunAt); }); }); - - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // it('should return the resulting task state when asked to run an ephemeral task now', async () => { - // const ephemeralTask = await runEphemeralTaskNow({ - // taskType: 'sampleTask', - // params: {}, - // state: {}, - // }); - - // await retry.try(async () => { - // expect( - // (await historyDocs()).filter((taskDoc) => taskDoc._source.taskId === ephemeralTask.id) - // .length - // ).to.eql(1); - - // expect(ephemeralTask.state.count).to.eql(1); - // }); - - // const secondEphemeralTask = await runEphemeralTaskNow({ - // taskType: 'sampleTask', - // params: {}, - // // pass state from previous ephemeral run as input for the second run - // state: ephemeralTask.state, - // }); - - // // ensure state is cumulative - // expect(secondEphemeralTask.state.count).to.eql(2); - - // await retry.try(async () => { - // // ensure new id is produced for second task execution - // expect( - // (await historyDocs()).filter((taskDoc) => taskDoc._source.taskId === ephemeralTask.id) - // .length - // ).to.eql(1); - // expect( - // (await historyDocs()).filter( - // (taskDoc) => taskDoc._source.taskId === secondEphemeralTask.id - // ).length - // ).to.eql(1); - // }); - // }); - - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // it('Epheemral task run should only run one instance of a task if its maxConcurrency is 1', async () => { - // const ephemeralTaskWithSingleConcurrency: { - // state: { - // executions: Array<{ - // result: { - // id: string; - // state: { - // timings: Array<{ - // start: number; - // stop: number; - // }>; - // }; - // }; - // }>; - // }; - // } = await runEphemeralTaskNow({ - // taskType: 'taskWhichExecutesOtherTasksEphemerally', - // params: { - // tasks: [ - // { - // taskType: 'timedTaskWithSingleConcurrency', - // params: { delay: 1000 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithSingleConcurrency', - // params: { delay: 1000 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithSingleConcurrency', - // params: { delay: 1000 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithSingleConcurrency', - // params: { delay: 1000 }, - // state: {}, - // }, - // ], - // }, - // state: {}, - // }); - - // ensureOverlappingTasksDontExceedThreshold( - // ephemeralTaskWithSingleConcurrency.state.executions, - // // make sure each task intersects with any other task - // 0 - // ); - // }); - - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // it('Ephemeral task run should only run as many instances of a task as its maxConcurrency will allow', async () => { - // const ephemeralTaskWithSingleConcurrency: { - // state: { - // executions: Array<{ - // result: { - // id: string; - // state: { - // timings: Array<{ - // start: number; - // stop: number; - // }>; - // }; - // }; - // }>; - // }; - // } = await runEphemeralTaskNow({ - // taskType: 'taskWhichExecutesOtherTasksEphemerally', - // params: { - // tasks: [ - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // { - // taskType: 'timedTaskWithLimitedConcurrency', - // params: { delay: 100 }, - // state: {}, - // }, - // ], - // }, - // state: {}, - // }); - - // ensureOverlappingTasksDontExceedThreshold( - // ephemeralTaskWithSingleConcurrency.state.executions, - // // make sure each task intersects with, at most, 1 other task - // 1 - // ); - // }); - - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 - // it('Ephemeral task executions cant exceed the max workes in Task Manager', async () => { - // const ephemeralTaskWithSingleConcurrency: { - // state: { - // executions: Array<{ - // result: { - // id: string; - // state: { - // timings: Array<{ - // start: number; - // stop: number; - // }>; - // }; - // }; - // }>; - // }; - // } = await runEphemeralTaskNow({ - // taskType: 'taskWhichExecutesOtherTasksEphemerally', - // params: { - // tasks: times(20, () => ({ - // taskType: 'timedTask', - // params: { delay: 100 }, - // state: {}, - // })), - // }, - // state: {}, - // }); - - // ensureOverlappingTasksDontExceedThreshold( - // ephemeralTaskWithSingleConcurrency.state.executions, - // // make sure each task intersects with, at most, 9 other tasks (as max workes is 10) - // 9 - // ); - // }); }); // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139