From b9ddfc5bc623550144fa25994a8fa711a531d088 Mon Sep 17 00:00:00 2001 From: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Date: Sat, 2 Nov 2024 01:08:24 +1100 Subject: [PATCH] [8.x] Remove stale tasks from task conflict count during task claiming (#198416) (#198666) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Backport This will backport the following commits from `main` to `8.x`: - [Remove stale tasks from task conflict count during task claiming (#198416)](https://github.com/elastic/kibana/pull/198416) ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) Co-authored-by: Mike Côté --- .../server/monitoring/task_run_statistics.ts | 13 ++++++++++--- .../task_manager/server/routes/health.test.ts | 1 + .../task_manager/server/task_claimers/index.ts | 2 ++ .../server/task_claimers/strategy_mget.test.ts | 17 +++++++++++++++-- .../server/task_claimers/strategy_mget.ts | 3 ++- 5 files changed, 30 insertions(+), 6 deletions(-) diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index 517b29a54cd64..6007508451d9e 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -45,6 +45,7 @@ interface FillPoolStat extends JsonObject { claim_duration: number[]; claim_conflicts: number[]; claim_mismatches: number[]; + claim_stale_tasks: number[]; result_frequency_percent_as_number: FillPoolResult[]; persistence: TaskPersistence[]; } @@ -150,6 +151,7 @@ export function createTaskRunAggregator( const claimDurationQueue = createRunningAveragedStat(runningAverageWindowSize); const claimConflictsQueue = createRunningAveragedStat(runningAverageWindowSize); const claimMismatchesQueue = createRunningAveragedStat(runningAverageWindowSize); + const claimStaleTasksQueue = createRunningAveragedStat(runningAverageWindowSize); const polledTasksByPersistenceQueue = createRunningAveragedStat(runningAverageWindowSize); const taskPollingEvents$: Observable> = combineLatest([ @@ -161,9 +163,8 @@ export function createTaskRunAggregator( isOk(taskEvent.event) ), map((taskEvent: TaskLifecycleEvent) => { - const { result, stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {} } = ( - taskEvent.event as unknown as Ok - ).value; + const { result, stats: { tasksClaimed, tasksUpdated, tasksConflicted, staleTasks } = {} } = + (taskEvent.event as unknown as Ok).value; const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0); return { polling: { @@ -179,6 +180,9 @@ export function createTaskRunAggregator( isNumber(tasksClaimed) && isNumber(tasksUpdated) ? claimMismatchesQueue(tasksUpdated - tasksClaimed) : claimMismatchesQueue(), + claim_stale_tasks: isNumber(staleTasks) + ? claimStaleTasksQueue(staleTasks) + : claimStaleTasksQueue(), result_frequency_percent_as_number: resultFrequencyQueue(result), }, }; @@ -258,6 +262,7 @@ export function createTaskRunAggregator( claim_duration: [], claim_conflicts: [], claim_mismatches: [], + claim_stale_tasks: [], result_frequency_percent_as_number: [], persistence: [], }, @@ -347,6 +352,7 @@ export function summarizeTaskRunStat( result_frequency_percent_as_number: pollingResultFrequency, claim_conflicts: claimConflicts, claim_mismatches: claimMismatches, + claim_stale_tasks: claimStaleTasks, persistence: pollingPersistence, }, drift, @@ -373,6 +379,7 @@ export function summarizeTaskRunStat( duration: calculateRunningAverage(pollingDuration as number[]), claim_conflicts: calculateRunningAverage(claimConflicts as number[]), claim_mismatches: calculateRunningAverage(claimMismatches as number[]), + claim_stale_tasks: calculateRunningAverage(claimStaleTasks as number[]), result_frequency_percent_as_number: { ...DEFAULT_POLLING_FREQUENCIES, ...calculateFrequency(pollingResultFrequency as FillPoolResult[]), diff --git a/x-pack/plugins/task_manager/server/routes/health.test.ts b/x-pack/plugins/task_manager/server/routes/health.test.ts index e3a7eb278d225..1e06ea91a6fcf 100644 --- a/x-pack/plugins/task_manager/server/routes/health.test.ts +++ b/x-pack/plugins/task_manager/server/routes/health.test.ts @@ -942,6 +942,7 @@ function mockHealthStats(overrides = {}) { claim_conflicts: [0, 100, 75], claim_mismatches: [0, 100, 75], claim_duration: [0, 100, 75], + claim_stale_tasks: [0, 100, 75], result_frequency_percent_as_number: [ FillPoolResult.NoTasksClaimed, FillPoolResult.NoTasksClaimed, diff --git a/x-pack/plugins/task_manager/server/task_claimers/index.ts b/x-pack/plugins/task_manager/server/task_claimers/index.ts index 4b6c8b96d6ca4..178ebacf68cb9 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/index.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/index.ts @@ -40,6 +40,7 @@ export interface ClaimOwnershipResult { tasksClaimed: number; tasksLeftUnclaimed?: number; tasksErrors?: number; + staleTasks?: number; }; docs: ConcreteTaskInstance[]; timing?: TaskTiming; @@ -70,6 +71,7 @@ export function getEmptyClaimOwnershipResult(): ClaimOwnershipResult { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0, + staleTasks: 0, }, docs: [], }; diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index 0d3560c3bec6e..e089d3b2d8785 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -433,6 +433,7 @@ describe('TaskClaiming', () => { tasksConflicted: 0, tasksErrors: 0, tasksUpdated: 3, + staleTasks: 0, tasksLeftUnclaimed: 3, }); expect(result.docs.length).toEqual(3); @@ -529,6 +530,7 @@ describe('TaskClaiming', () => { tasksErrors: 0, tasksUpdated: 1, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(1); }); @@ -640,6 +642,7 @@ describe('TaskClaiming', () => { tasksErrors: 0, tasksUpdated: 1, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(1); }); @@ -737,6 +740,7 @@ describe('TaskClaiming', () => { tasksErrors: 0, tasksUpdated: 1, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(1); }); @@ -792,6 +796,7 @@ describe('TaskClaiming', () => { tasksClaimed: 0, tasksConflicted: 0, tasksUpdated: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(0); }); @@ -885,6 +890,7 @@ describe('TaskClaiming', () => { tasksErrors: 0, tasksUpdated: 2, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(2); }); @@ -978,6 +984,7 @@ describe('TaskClaiming', () => { tasksErrors: 0, tasksUpdated: 2, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(2); }); @@ -1031,7 +1038,7 @@ describe('TaskClaiming', () => { expect(mockApmTrans.end).toHaveBeenCalledWith('success'); expect(taskManagerLogger.debug).toHaveBeenCalledWith( - 'task claimer claimed: 2; stale: 1; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', + 'task claimer claimed: 2; stale: 1; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;', { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); @@ -1067,10 +1074,11 @@ describe('TaskClaiming', () => { expect(result.stats).toEqual({ tasksClaimed: 2, - tasksConflicted: 1, + tasksConflicted: 0, tasksErrors: 0, tasksUpdated: 2, tasksLeftUnclaimed: 0, + staleTasks: 1, }); expect(result.docs.length).toEqual(2); }); @@ -1197,6 +1205,7 @@ describe('TaskClaiming', () => { tasksErrors: 0, tasksUpdated: 4, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(4); }); @@ -1330,6 +1339,7 @@ describe('TaskClaiming', () => { tasksErrors: 1, tasksUpdated: 3, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(3); }); @@ -1458,6 +1468,7 @@ describe('TaskClaiming', () => { tasksErrors: 0, tasksUpdated: 4, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(4); for (const r of result.docs) { @@ -1699,6 +1710,7 @@ describe('TaskClaiming', () => { tasksErrors: 1, tasksUpdated: 3, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(3); }); @@ -1829,6 +1841,7 @@ describe('TaskClaiming', () => { tasksErrors: 0, tasksUpdated: 3, tasksLeftUnclaimed: 0, + staleTasks: 0, }); expect(result.docs.length).toEqual(3); }); diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 4b7e5ec6b3691..4e74454e8c982 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -196,7 +196,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise