From 5071f98547b8a8d20f90413b520385efc680a0c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Fri, 20 Sep 2024 13:52:26 -0400 Subject: [PATCH] Fix memory leak in task manager task runner (#193612) In this PR, I'm fixing a memory leak that was introduced in https://github.com/elastic/kibana/pull/190093 where every task runner class object wouldn't free up in memory because it subscribed to the `pollIntervalConfiguration$` observable. To fix this, I moved the observable up a class into `TaskPollingLifecycle` which only gets created once on plugin start and then pass down the pollInterval value via a function call the task runner class can call. (cherry picked from commit cf6e8b5ba971fffe2a57e1a7c573e60cc2fbe280) --- .../task_manager/server/polling_lifecycle.ts | 9 ++++++--- .../server/task_running/task_runner.test.ts | 3 +-- .../server/task_running/task_runner.ts | 14 +++++--------- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 81a65009391f6..4176f7a03312f 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -84,7 +84,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter; private logger: Logger; public pool: TaskPool; @@ -95,6 +94,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter { + this.currentPollInterval = pollInterval; + }); const emitEvent = (event: TaskLifecycleEvent) => this.events$.next(event); @@ -225,7 +228,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter this.currentPollInterval, }); }; diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index 4f21bf35619ef..d9016395b6cc2 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -9,7 +9,6 @@ import _ from 'lodash'; import sinon from 'sinon'; import { secondsFromNow } from '../lib/intervals'; import { asOk, asErr } from '../lib/result_type'; -import { BehaviorSubject } from 'rxjs'; import { createTaskRunError, TaskErrorSource, @@ -2502,7 +2501,7 @@ describe('TaskManagerRunner', () => { }), allowReadingInvalidState: opts.allowReadingInvalidState || false, strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY, - pollIntervalConfiguration$: new BehaviorSubject(500), + getPollInterval: () => 500, }); if (stage === TaskRunningStage.READY_TO_RUN) { diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 32b48c5caf58b..b68cbe1c85e53 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -11,7 +11,6 @@ * rescheduling, middleware application, etc. */ -import { Observable } from 'rxjs'; import apm from 'elastic-apm-node'; import { v4 as uuidv4 } from 'uuid'; import { withSpan } from '@kbn/apm-utils'; @@ -113,7 +112,7 @@ type Opts = { config: TaskManagerConfig; allowReadingInvalidState: boolean; strategy: string; - pollIntervalConfiguration$: Observable; + getPollInterval: () => number; } & Pick; export enum TaskRunResult { @@ -166,7 +165,7 @@ export class TaskManagerRunner implements TaskRunner { private config: TaskManagerConfig; private readonly taskValidator: TaskValidator; private readonly claimStrategy: string; - private currentPollInterval: number; + private getPollInterval: () => number; /** * Creates an instance of TaskManagerRunner. @@ -192,7 +191,7 @@ export class TaskManagerRunner implements TaskRunner { config, allowReadingInvalidState, strategy, - pollIntervalConfiguration$, + getPollInterval, }: Opts) { this.instance = asPending(sanitizeInstance(instance)); this.definitions = definitions; @@ -212,10 +211,7 @@ export class TaskManagerRunner implements TaskRunner { allowReadingInvalidState, }); this.claimStrategy = strategy; - this.currentPollInterval = config.poll_interval; - pollIntervalConfiguration$.subscribe((pollInterval) => { - this.currentPollInterval = pollInterval; - }); + this.getPollInterval = getPollInterval; } /** @@ -656,7 +652,7 @@ export class TaskManagerRunner implements TaskRunner { startedAt: this.instance.task.startedAt, schedule: updatedTaskSchedule, }, - this.currentPollInterval + this.getPollInterval() ), state, schedule: updatedTaskSchedule,