Skip to content

Commit

Permalink
[8.x] Remove stale tasks from task conflict count during task claiming (
Browse files Browse the repository at this point in the history
#198416) (#198666)

# Backport

This will backport the following commits from `main` to `8.x`:
- [Remove stale tasks from task conflict count during task claiming
(#198416)](#198416)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Mike
Côté","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-11-01T12:19:04Z","message":"Remove
stale tasks from task conflict count during task claiming
(#198416)\n\nIn this PR, I'm removing the count of stale tasks from the
number of\r\nconflicts during the claiming cycle. I am also adding a new
property to\r\nthe task manager health report (`claim_stale_tasks`) so
we can track\r\nthose separately to ensure we have the proper page
size.\r\n\r\n## To verify\r\nApply the following diff, observe the new
`claim_stale_tasks` in the TM\r\nhealth API and that conflicts are
0\r\n```\r\ndiff --git
a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts\r\nindex
4e74454e8c9..35d7fd872d8 100644\r\n---
a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts\r\n+++
b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts\r\n@@
-145,6 +145,7 @@ async function claimAvailableTasks(opts:
TaskClaimerOpts): Promise<ClaimOwnershi\r\n }\r\n\r\n if (\r\n+ false
&&\r\n searchVersion.seqNo === latestVersion.seqNo &&\r\n
searchVersion.primaryTerm === latestVersion.primaryTerm\r\n )
{\r\n```","sha":"37ebf29f87047e8b96e3c2cd378c647a4f2ca797","branchLabelMapping":{"^v9.0.0$":"main","^v8.17.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Feature:Task
Manager","Team:ResponseOps","v9.0.0","backport:prev-minor","v8.17.0"],"title":"Remove
stale tasks from task conflict count during task
claiming","number":198416,"url":"https://github.com/elastic/kibana/pull/198416","mergeCommit":{"message":"Remove
stale tasks from task conflict count during task claiming
(#198416)\n\nIn this PR, I'm removing the count of stale tasks from the
number of\r\nconflicts during the claiming cycle. I am also adding a new
property to\r\nthe task manager health report (`claim_stale_tasks`) so
we can track\r\nthose separately to ensure we have the proper page
size.\r\n\r\n## To verify\r\nApply the following diff, observe the new
`claim_stale_tasks` in the TM\r\nhealth API and that conflicts are
0\r\n```\r\ndiff --git
a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts\r\nindex
4e74454e8c9..35d7fd872d8 100644\r\n---
a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts\r\n+++
b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts\r\n@@
-145,6 +145,7 @@ async function claimAvailableTasks(opts:
TaskClaimerOpts): Promise<ClaimOwnershi\r\n }\r\n\r\n if (\r\n+ false
&&\r\n searchVersion.seqNo === latestVersion.seqNo &&\r\n
searchVersion.primaryTerm === latestVersion.primaryTerm\r\n )
{\r\n```","sha":"37ebf29f87047e8b96e3c2cd378c647a4f2ca797"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/198416","number":198416,"mergeCommit":{"message":"Remove
stale tasks from task conflict count during task claiming
(#198416)\n\nIn this PR, I'm removing the count of stale tasks from the
number of\r\nconflicts during the claiming cycle. I am also adding a new
property to\r\nthe task manager health report (`claim_stale_tasks`) so
we can track\r\nthose separately to ensure we have the proper page
size.\r\n\r\n## To verify\r\nApply the following diff, observe the new
`claim_stale_tasks` in the TM\r\nhealth API and that conflicts are
0\r\n```\r\ndiff --git
a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts\r\nindex
4e74454e8c9..35d7fd872d8 100644\r\n---
a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts\r\n+++
b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts\r\n@@
-145,6 +145,7 @@ async function claimAvailableTasks(opts:
TaskClaimerOpts): Promise<ClaimOwnershi\r\n }\r\n\r\n if (\r\n+ false
&&\r\n searchVersion.seqNo === latestVersion.seqNo &&\r\n
searchVersion.primaryTerm === latestVersion.primaryTerm\r\n )
{\r\n```","sha":"37ebf29f87047e8b96e3c2cd378c647a4f2ca797"}},{"branch":"8.x","label":"v8.17.0","branchLabelMappingKey":"^v8.17.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Mike Côté <[email protected]>
  • Loading branch information
kibanamachine and mikecote authored Nov 1, 2024
1 parent 4371eb8 commit b9ddfc5
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ interface FillPoolStat extends JsonObject {
claim_duration: number[];
claim_conflicts: number[];
claim_mismatches: number[];
claim_stale_tasks: number[];
result_frequency_percent_as_number: FillPoolResult[];
persistence: TaskPersistence[];
}
Expand Down Expand Up @@ -150,6 +151,7 @@ export function createTaskRunAggregator(
const claimDurationQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimConflictsQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimMismatchesQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimStaleTasksQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const polledTasksByPersistenceQueue =
createRunningAveragedStat<TaskPersistence>(runningAverageWindowSize);
const taskPollingEvents$: Observable<Pick<TaskRunStat, 'polling'>> = combineLatest([
Expand All @@ -161,9 +163,8 @@ export function createTaskRunAggregator(
isOk<ClaimAndFillPoolResult, unknown>(taskEvent.event)
),
map((taskEvent: TaskLifecycleEvent) => {
const { result, stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {} } = (
taskEvent.event as unknown as Ok<ClaimAndFillPoolResult>
).value;
const { result, stats: { tasksClaimed, tasksUpdated, tasksConflicted, staleTasks } = {} } =
(taskEvent.event as unknown as Ok<ClaimAndFillPoolResult>).value;
const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0);
return {
polling: {
Expand All @@ -179,6 +180,9 @@ export function createTaskRunAggregator(
isNumber(tasksClaimed) && isNumber(tasksUpdated)
? claimMismatchesQueue(tasksUpdated - tasksClaimed)
: claimMismatchesQueue(),
claim_stale_tasks: isNumber(staleTasks)
? claimStaleTasksQueue(staleTasks)
: claimStaleTasksQueue(),
result_frequency_percent_as_number: resultFrequencyQueue(result),
},
};
Expand Down Expand Up @@ -258,6 +262,7 @@ export function createTaskRunAggregator(
claim_duration: [],
claim_conflicts: [],
claim_mismatches: [],
claim_stale_tasks: [],
result_frequency_percent_as_number: [],
persistence: [],
},
Expand Down Expand Up @@ -347,6 +352,7 @@ export function summarizeTaskRunStat(
result_frequency_percent_as_number: pollingResultFrequency,
claim_conflicts: claimConflicts,
claim_mismatches: claimMismatches,
claim_stale_tasks: claimStaleTasks,
persistence: pollingPersistence,
},
drift,
Expand All @@ -373,6 +379,7 @@ export function summarizeTaskRunStat(
duration: calculateRunningAverage(pollingDuration as number[]),
claim_conflicts: calculateRunningAverage(claimConflicts as number[]),
claim_mismatches: calculateRunningAverage(claimMismatches as number[]),
claim_stale_tasks: calculateRunningAverage(claimStaleTasks as number[]),
result_frequency_percent_as_number: {
...DEFAULT_POLLING_FREQUENCIES,
...calculateFrequency<FillPoolResult>(pollingResultFrequency as FillPoolResult[]),
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/routes/health.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,7 @@ function mockHealthStats(overrides = {}) {
claim_conflicts: [0, 100, 75],
claim_mismatches: [0, 100, 75],
claim_duration: [0, 100, 75],
claim_stale_tasks: [0, 100, 75],
result_frequency_percent_as_number: [
FillPoolResult.NoTasksClaimed,
FillPoolResult.NoTasksClaimed,
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/task_claimers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export interface ClaimOwnershipResult {
tasksClaimed: number;
tasksLeftUnclaimed?: number;
tasksErrors?: number;
staleTasks?: number;
};
docs: ConcreteTaskInstance[];
timing?: TaskTiming;
Expand Down Expand Up @@ -70,6 +71,7 @@ export function getEmptyClaimOwnershipResult(): ClaimOwnershipResult {
tasksUpdated: 0,
tasksConflicted: 0,
tasksClaimed: 0,
staleTasks: 0,
},
docs: [],
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ describe('TaskClaiming', () => {
tasksConflicted: 0,
tasksErrors: 0,
tasksUpdated: 3,
staleTasks: 0,
tasksLeftUnclaimed: 3,
});
expect(result.docs.length).toEqual(3);
Expand Down Expand Up @@ -529,6 +530,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 1,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(1);
});
Expand Down Expand Up @@ -640,6 +642,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 1,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(1);
});
Expand Down Expand Up @@ -737,6 +740,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 1,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(1);
});
Expand Down Expand Up @@ -792,6 +796,7 @@ describe('TaskClaiming', () => {
tasksClaimed: 0,
tasksConflicted: 0,
tasksUpdated: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(0);
});
Expand Down Expand Up @@ -885,6 +890,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 2,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(2);
});
Expand Down Expand Up @@ -978,6 +984,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 2,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(2);
});
Expand Down Expand Up @@ -1031,7 +1038,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 2; stale: 1; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 2; stale: 1; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

Expand Down Expand Up @@ -1067,10 +1074,11 @@ describe('TaskClaiming', () => {

expect(result.stats).toEqual({
tasksClaimed: 2,
tasksConflicted: 1,
tasksConflicted: 0,
tasksErrors: 0,
tasksUpdated: 2,
tasksLeftUnclaimed: 0,
staleTasks: 1,
});
expect(result.docs.length).toEqual(2);
});
Expand Down Expand Up @@ -1197,6 +1205,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 4,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(4);
});
Expand Down Expand Up @@ -1330,6 +1339,7 @@ describe('TaskClaiming', () => {
tasksErrors: 1,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(3);
});
Expand Down Expand Up @@ -1458,6 +1468,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 4,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(4);
for (const r of result.docs) {
Expand Down Expand Up @@ -1699,6 +1710,7 @@ describe('TaskClaiming', () => {
tasksErrors: 1,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(3);
});
Expand Down Expand Up @@ -1829,6 +1841,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(3);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi

// perform the task object updates, deal with errors
const updatedTaskIds: string[] = [];
let conflicts = staleTasks.length;
let conflicts = 0;
let bulkUpdateErrors = 0;
let bulkGetErrors = 0;

Expand Down Expand Up @@ -288,6 +288,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
tasksClaimed: fullTasksToRun.length,
tasksLeftUnclaimed: leftOverTasks.length,
tasksErrors: bulkUpdateErrors + bulkGetErrors,
staleTasks: staleTasks.length,
},
docs: fullTasksToRun,
timing: stopTaskTimer(),
Expand Down

0 comments on commit b9ddfc5

Please sign in to comment.