Skip to content

Commit

Permalink
Remove stale tasks from task conflict count during task claiming (ela…
Browse files Browse the repository at this point in the history
…stic#198416)

In this PR, I'm removing the count of stale tasks from the number of
conflicts during the claiming cycle. I am also adding a new property to
the task manager health report (`claim_stale_tasks`) so we can track
those separately to ensure we have the proper page size.

## To verify
Apply the following diff, observe the new `claim_stale_tasks` in the TM
health API and that conflicts are 0
```
diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
index 4e74454e8c9..35d7fd872d8 100644
--- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
+++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
@@ -145,6 +145,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
     }

     if (
+      false &&
       searchVersion.seqNo === latestVersion.seqNo &&
       searchVersion.primaryTerm === latestVersion.primaryTerm
     ) {
```
  • Loading branch information
mikecote authored Nov 1, 2024
1 parent 115dbec commit 37ebf29
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ interface FillPoolStat extends JsonObject {
claim_duration: number[];
claim_conflicts: number[];
claim_mismatches: number[];
claim_stale_tasks: number[];
result_frequency_percent_as_number: FillPoolResult[];
persistence: TaskPersistence[];
}
Expand Down Expand Up @@ -150,6 +151,7 @@ export function createTaskRunAggregator(
const claimDurationQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimConflictsQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimMismatchesQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimStaleTasksQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const polledTasksByPersistenceQueue =
createRunningAveragedStat<TaskPersistence>(runningAverageWindowSize);
const taskPollingEvents$: Observable<Pick<TaskRunStat, 'polling'>> = combineLatest([
Expand All @@ -161,9 +163,8 @@ export function createTaskRunAggregator(
isOk<ClaimAndFillPoolResult, unknown>(taskEvent.event)
),
map((taskEvent: TaskLifecycleEvent) => {
const { result, stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {} } = (
taskEvent.event as unknown as Ok<ClaimAndFillPoolResult>
).value;
const { result, stats: { tasksClaimed, tasksUpdated, tasksConflicted, staleTasks } = {} } =
(taskEvent.event as unknown as Ok<ClaimAndFillPoolResult>).value;
const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0);
return {
polling: {
Expand All @@ -179,6 +180,9 @@ export function createTaskRunAggregator(
isNumber(tasksClaimed) && isNumber(tasksUpdated)
? claimMismatchesQueue(tasksUpdated - tasksClaimed)
: claimMismatchesQueue(),
claim_stale_tasks: isNumber(staleTasks)
? claimStaleTasksQueue(staleTasks)
: claimStaleTasksQueue(),
result_frequency_percent_as_number: resultFrequencyQueue(result),
},
};
Expand Down Expand Up @@ -258,6 +262,7 @@ export function createTaskRunAggregator(
claim_duration: [],
claim_conflicts: [],
claim_mismatches: [],
claim_stale_tasks: [],
result_frequency_percent_as_number: [],
persistence: [],
},
Expand Down Expand Up @@ -347,6 +352,7 @@ export function summarizeTaskRunStat(
result_frequency_percent_as_number: pollingResultFrequency,
claim_conflicts: claimConflicts,
claim_mismatches: claimMismatches,
claim_stale_tasks: claimStaleTasks,
persistence: pollingPersistence,
},
drift,
Expand All @@ -373,6 +379,7 @@ export function summarizeTaskRunStat(
duration: calculateRunningAverage(pollingDuration as number[]),
claim_conflicts: calculateRunningAverage(claimConflicts as number[]),
claim_mismatches: calculateRunningAverage(claimMismatches as number[]),
claim_stale_tasks: calculateRunningAverage(claimStaleTasks as number[]),
result_frequency_percent_as_number: {
...DEFAULT_POLLING_FREQUENCIES,
...calculateFrequency<FillPoolResult>(pollingResultFrequency as FillPoolResult[]),
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/routes/health.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,7 @@ function mockHealthStats(overrides = {}) {
claim_conflicts: [0, 100, 75],
claim_mismatches: [0, 100, 75],
claim_duration: [0, 100, 75],
claim_stale_tasks: [0, 100, 75],
result_frequency_percent_as_number: [
FillPoolResult.NoTasksClaimed,
FillPoolResult.NoTasksClaimed,
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/task_claimers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export interface ClaimOwnershipResult {
tasksClaimed: number;
tasksLeftUnclaimed?: number;
tasksErrors?: number;
staleTasks?: number;
};
docs: ConcreteTaskInstance[];
timing?: TaskTiming;
Expand Down Expand Up @@ -70,6 +71,7 @@ export function getEmptyClaimOwnershipResult(): ClaimOwnershipResult {
tasksUpdated: 0,
tasksConflicted: 0,
tasksClaimed: 0,
staleTasks: 0,
},
docs: [],
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ describe('TaskClaiming', () => {
tasksConflicted: 0,
tasksErrors: 0,
tasksUpdated: 3,
staleTasks: 0,
tasksLeftUnclaimed: 3,
});
expect(result.docs.length).toEqual(3);
Expand Down Expand Up @@ -529,6 +530,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 1,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(1);
});
Expand Down Expand Up @@ -640,6 +642,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 1,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(1);
});
Expand Down Expand Up @@ -737,6 +740,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 1,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(1);
});
Expand Down Expand Up @@ -792,6 +796,7 @@ describe('TaskClaiming', () => {
tasksClaimed: 0,
tasksConflicted: 0,
tasksUpdated: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(0);
});
Expand Down Expand Up @@ -885,6 +890,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 2,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(2);
});
Expand Down Expand Up @@ -978,6 +984,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 2,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(2);
});
Expand Down Expand Up @@ -1031,7 +1038,7 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 2; stale: 1; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 2; stale: 1; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

Expand Down Expand Up @@ -1067,10 +1074,11 @@ describe('TaskClaiming', () => {

expect(result.stats).toEqual({
tasksClaimed: 2,
tasksConflicted: 1,
tasksConflicted: 0,
tasksErrors: 0,
tasksUpdated: 2,
tasksLeftUnclaimed: 0,
staleTasks: 1,
});
expect(result.docs.length).toEqual(2);
});
Expand Down Expand Up @@ -1197,6 +1205,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 4,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(4);
});
Expand Down Expand Up @@ -1330,6 +1339,7 @@ describe('TaskClaiming', () => {
tasksErrors: 1,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(3);
});
Expand Down Expand Up @@ -1458,6 +1468,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 4,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(4);
for (const r of result.docs) {
Expand Down Expand Up @@ -1699,6 +1710,7 @@ describe('TaskClaiming', () => {
tasksErrors: 1,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(3);
});
Expand Down Expand Up @@ -1829,6 +1841,7 @@ describe('TaskClaiming', () => {
tasksErrors: 0,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(3);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi

// perform the task object updates, deal with errors
const updatedTaskIds: string[] = [];
let conflicts = staleTasks.length;
let conflicts = 0;
let bulkUpdateErrors = 0;
let bulkGetErrors = 0;

Expand Down Expand Up @@ -288,6 +288,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
tasksClaimed: fullTasksToRun.length,
tasksLeftUnclaimed: leftOverTasks.length,
tasksErrors: bulkUpdateErrors + bulkGetErrors,
staleTasks: staleTasks.length,
},
docs: fullTasksToRun,
timing: stopTaskTimer(),
Expand Down

0 comments on commit 37ebf29

Please sign in to comment.