Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] Don't reset number of skipped runs as long as there is a validation error #172327

Merged
merged 2 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export type SuccessfulRunResult = {
*/
state: Record<string, unknown>;
taskRunError?: DecoratedError;
skipAttempts?: number;
} & (
| // ensure a SuccessfulRunResult can either specify a new `runAt` or a new `schedule`, but not both
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2300,6 +2300,7 @@ describe('TaskManagerRunner', () => {
asOk({
state: {},
taskRunError,
skipAttempts: 20,
})
);
});
Expand Down
60 changes: 37 additions & 23 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,23 @@ export class TaskManagerRunner implements TaskRunner {
}
}

const result = taskParamsValidation?.error
const hasSkipError = !isUndefined(taskParamsValidation?.error);
let shouldSkip = false;
let shouldKeepSkipAttempts = false;

if (hasSkipError) {
const reachedMaxSkipAttempts = this.hasReachedMaxSkipAttempts(modifiedContext.taskInstance);
shouldSkip = !reachedMaxSkipAttempts;
shouldKeepSkipAttempts = reachedMaxSkipAttempts;
}

const result = shouldSkip
? taskParamsValidation
: await this.executionContext.withContext(ctx, () =>
withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run())
);

const validatedResult = this.validateResult(result);
const validatedResult = this.validateResult(shouldKeepSkipAttempts, result);
const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
this.processResult(validatedResult, stopTaskTimer())
);
Expand Down Expand Up @@ -371,8 +381,7 @@ export class TaskManagerRunner implements TaskRunner {

private validateTaskParams({ taskInstance }: RunContext) {
let error;
const { state, taskType, params, id, numSkippedRuns = 0 } = taskInstance;
const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig;
const { state, taskType, params, id } = taskInstance;

try {
const paramsSchema = this.definition.paramsSchema;
Expand All @@ -381,22 +390,15 @@ export class TaskManagerRunner implements TaskRunner {
}
} catch (err) {
this.logger.warn(`Task (${taskType}/${id}) has a validation error: ${err.message}`);
if (numSkippedRuns < maxAttempts) {
error = createSkipError(err);
} else {
this.logger.warn(
`Task Manager has reached the max skip attempts for task ${taskType}/${id}`
);
}
error = createSkipError(err);
}

return { ...(error ? { error } : {}), state };
}

private async validateIndirectTaskParams({ taskInstance }: RunContext) {
let error;
const { state, taskType, id, numSkippedRuns = 0 } = taskInstance;
const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig;
const { state, taskType, id } = taskInstance;
const indirectParamsSchema = this.definition.indirectParamsSchema;

if (this.task?.loadIndirectParams && !!indirectParamsSchema) {
Expand All @@ -410,13 +412,7 @@ export class TaskManagerRunner implements TaskRunner {
this.logger.warn(
`Task (${taskType}/${id}) has a validation error in its indirect params: ${err.message}`
);
if (numSkippedRuns < maxAttempts) {
error = createSkipError(err);
} else {
this.logger.warn(
`Task Manager has reached the max skip attempts for task ${taskType}/${id}`
);
}
error = createSkipError(err);
}
}
}
Expand Down Expand Up @@ -554,11 +550,17 @@ export class TaskManagerRunner implements TaskRunner {
}

private validateResult(
shouldKeepSkipAttempts: boolean,
result?: SuccessfulRunResult | FailedRunResult | void
): Result<SuccessfulRunResult, FailedRunResult> {
return isFailedRunResult(result)
? asErr({ ...result, error: result.error })
: asOk(result || EMPTY_RUN_RESULT);
: asOk({
...(result || EMPTY_RUN_RESULT),
...(shouldKeepSkipAttempts
? { skipAttempts: this.requeueInvalidTasksConfig.max_attempts }
: {}),
});
}

private async releaseClaimAndIncrementAttempts(): Promise<Result<ConcreteTaskInstance, Error>> {
Expand Down Expand Up @@ -662,14 +664,14 @@ export class TaskManagerRunner implements TaskRunner {
state,
attempts = 0,
skipAttempts,
}: SuccessfulRunResult & { attempts: number; skipAttempts: number }) => {
}: SuccessfulRunResult & { attempts: number }) => {
const { startedAt, schedule, numSkippedRuns } = this.instance.task;
const { taskRunError } = unwrap(result);
let requeueInvalidTaskAttempts = skipAttempts || numSkippedRuns || 0;

// Alerting TaskRunner returns SuccessResult even though there is an error
// therefore we use "taskRunError" to be sure that there wasn't any error
if (isUndefined(skipAttempts) && taskRunError === undefined) {
if (isUndefined(skipAttempts) && isUndefined(taskRunError)) {
requeueInvalidTaskAttempts = 0;
}

Expand Down Expand Up @@ -838,6 +840,18 @@ export class TaskManagerRunner implements TaskRunner {
? this.definition.maxAttempts
: this.defaultMaxAttempts;
}

/**
 * Reports whether the given task instance has used up its allowed skipped runs.
 *
 * Compares the instance's `numSkippedRuns` (treated as 0 when undefined) with
 * `requeueInvalidTasksConfig.max_attempts`, and logs a warning when the limit
 * has been reached.
 *
 * @param taskInstance - the task instance whose skip counter is inspected
 * @returns `true` when `numSkippedRuns` is greater than or equal to the limit, otherwise `false`
 */
private hasReachedMaxSkipAttempts(taskInstance: ConcreteTaskInstance) {
  const { numSkippedRuns: skippedRuns = 0 } = taskInstance;
  const skipLimit = this.requeueInvalidTasksConfig.max_attempts;
  const limitReached = skippedRuns >= skipLimit;

  if (limitReached) {
    // Only warn once the limit is hit; below the limit the caller is not notified.
    this.logger.warn(
      `Task Manager has reached the max skip attempts for task ${taskInstance.taskType}/${taskInstance.id}`
    );
  }

  return limitReached;
}
}

function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export default function ({ getService }: FtrProviderContext) {
expect(task.numSkippedRuns).to.eql(2);
});

let newLastRun: string;
await retry.try(async () => {
const task = await currentTask(createdTask.id);
expect(task.attempts).to.eql(0);
Expand All @@ -63,6 +64,18 @@ export default function ({ getService }: FtrProviderContext) {
expect(task.numSkippedRuns).to.eql(2);
// keeps rescheduling after skips
expect(new Date(task.runAt).getTime()).to.greaterThan(new Date(lastRunAt).getTime());
newLastRun = task.runAt;
});

// should keep running the rule after 2 skips and 1 successful run
await retry.try(async () => {
const task = await currentTask(createdTask.id);
expect(task.attempts).to.eql(0);
expect(task.retryAt).to.eql(null);
// the number of skipped runs remains unchanged
expect(task.numSkippedRuns).to.eql(2);
// keeps rescheduling after skips
expect(new Date(task.runAt).getTime()).to.greaterThan(new Date(newLastRun).getTime());
});
});

Expand Down
Loading