Skip to content

Commit

Permalink
[Response Ops][Task Manager] Adding background task to mark removed t…
Browse files Browse the repository at this point in the history
…ask types as `unrecognized` (elastic#199057)

Resolves elastic#192686

## Summary

Creates a background task to search for removed task types and mark them
as unrecognized. Removes the current logic that does this during the
task claim cycle for both task claim strategies.

---------

Co-authored-by: kibanamachine <[email protected]>
(cherry picked from commit be949d6)
  • Loading branch information
ymao1 committed Nov 11, 2024
1 parent 7b081fe commit 29a4719
Show file tree
Hide file tree
Showing 22 changed files with 547 additions and 556 deletions.
13 changes: 11 additions & 2 deletions x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { TaskManagerConfig } from './config';
import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware';
import { removeIfExists } from './lib/remove_if_exists';
import { setupSavedObjects, BACKGROUND_TASK_NODE_SO_NAME, TASK_SO_NAME } from './saved_objects';
import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './task_type_dictionary';
import { TaskDefinitionRegistry, TaskTypeDictionary } from './task_type_dictionary';
import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store';
import { createManagedConfiguration } from './lib/create_managed_configuration';
import { TaskScheduling } from './task_scheduling';
Expand All @@ -45,6 +45,10 @@ import { metricsStream, Metrics } from './metrics';
import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector';
import { TaskPartitioner } from './lib/task_partitioner';
import { getDefaultCapacity } from './lib/get_default_capacity';
import {
registerMarkRemovedTasksAsUnrecognizedDefinition,
scheduleMarkRemovedTasksAsUnrecognizedDefinition,
} from './removed_tasks/mark_removed_tasks_as_unrecognized';

export interface TaskManagerSetupContract {
/**
Expand Down Expand Up @@ -221,6 +225,11 @@ export class TaskManagerPlugin
}

registerDeleteInactiveNodesTaskDefinition(this.logger, core.getStartServices, this.definitions);
registerMarkRemovedTasksAsUnrecognizedDefinition(
this.logger,
core.getStartServices,
this.definitions
);

if (this.config.unsafe.exclude_task_types.length) {
this.logger.warn(
Expand Down Expand Up @@ -332,7 +341,6 @@ export class TaskManagerPlugin
this.taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
unusedTypes: REMOVED_TYPES,
logger: this.logger,
executionContext,
taskStore,
Expand Down Expand Up @@ -384,6 +392,7 @@ export class TaskManagerPlugin
});

scheduleDeleteInactiveNodesTaskDefinition(this.logger, taskScheduling).catch(() => {});
scheduleMarkRemovedTasksAsUnrecognizedDefinition(this.logger, taskScheduling).catch(() => {});

return {
fetch: (opts: SearchOpts): Promise<FetchResult> => taskStore.fetch(opts),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ describe('TaskPollingLifecycle', () => {
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
unusedTypes: [],
definitions: new TaskTypeDictionary(taskManagerLogger),
middleware: createInitialMiddleware(),
startingCapacity: 20,
Expand Down
3 changes: 0 additions & 3 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ export interface ITaskEventEmitter<T> {
export type TaskPollingLifecycleOpts = {
logger: Logger;
definitions: TaskTypeDictionary;
unusedTypes: string[];
taskStore: TaskStore;
config: TaskManagerConfig;
middleware: Middleware;
Expand Down Expand Up @@ -115,7 +114,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
config,
taskStore,
definitions,
unusedTypes,
executionContext,
usageCounter,
taskPartitioner,
Expand Down Expand Up @@ -153,7 +151,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
maxAttempts: config.max_attempts,
excludedTaskTypes: config.unsafe.exclude_task_types,
definitions,
unusedTypes,
logger: this.logger,
getAvailableCapacity: (taskType?: string) => this.pool.availableCapacity(taskType),
taskPartitioner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ describe('mark_available_tasks_as_claimed', () => {
fieldUpdates,
claimableTaskTypes: definitions.getAllTypes(),
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => {
return { ...accumulator, [type]: maxAttempts || defaultMaxAttempts };
}, {}),
Expand Down Expand Up @@ -153,8 +152,6 @@ if (doc['task.runAt'].size()!=0) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
}`,
Expand All @@ -167,7 +164,6 @@ if (doc['task.runAt'].size()!=0) {
},
claimableTaskTypes: ['sampleTask', 'otherTask'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
sampleTask: 5,
otherTask: 1,
Expand Down Expand Up @@ -242,7 +238,6 @@ if (doc['task.runAt'].size()!=0) {
fieldUpdates,
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,13 @@ export interface UpdateFieldsAndMarkAsFailedOpts {
};
claimableTaskTypes: string[];
skippedTaskTypes: string[];
unusedTaskTypes: string[];
taskMaxAttempts: { [field: string]: number };
}

export const updateFieldsAndMarkAsFailed = ({
fieldUpdates,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
taskMaxAttempts,
}: UpdateFieldsAndMarkAsFailedOpts): ScriptClause => {
const setScheduledAtScript = `if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
Expand All @@ -227,8 +225,6 @@ export const updateFieldsAndMarkAsFailed = ({
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
${setScheduledAtAndMarkAsClaimed}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
}`,
Expand All @@ -238,7 +234,6 @@ export const updateFieldsAndMarkAsFailed = ({
fieldUpdates,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
taskMaxAttempts,
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ describe('TaskClaiming', () => {
strategy: 'non-default',
definitions,
excludedTaskTypes: [],
unusedTypes: [],
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getAvailableCapacity: () => 10,
Expand Down Expand Up @@ -134,7 +133,6 @@ describe('TaskClaiming', () => {
strategy: 'default',
definitions,
excludedTaskTypes: [],
unusedTypes: [],
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getAvailableCapacity: () => 10,
Expand Down
4 changes: 0 additions & 4 deletions x-pack/plugins/task_manager/server/queries/task_claiming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export interface TaskClaimingOpts {
logger: Logger;
strategy: string;
definitions: TaskTypeDictionary;
unusedTypes: string[];
taskStore: TaskStore;
maxAttempts: number;
excludedTaskTypes: string[];
Expand Down Expand Up @@ -92,7 +91,6 @@ export class TaskClaiming {
private readonly taskClaimingBatchesByType: TaskClaimingBatches;
private readonly taskMaxAttempts: Record<string, number>;
private readonly excludedTaskTypes: string[];
private readonly unusedTypes: string[];
private readonly taskClaimer: TaskClaimerFn;
private readonly taskPartitioner: TaskPartitioner;

Expand All @@ -111,7 +109,6 @@ export class TaskClaiming {
this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions);
this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions));
this.excludedTaskTypes = opts.excludedTaskTypes;
this.unusedTypes = opts.unusedTypes;
this.taskClaimer = getTaskClaimer(this.logger, opts.strategy);
this.events$ = new Subject<TaskClaim>();
this.taskPartitioner = opts.taskPartitioner;
Expand Down Expand Up @@ -178,7 +175,6 @@ export class TaskClaiming {
taskStore: this.taskStore,
events$: this.events$,
getCapacity: this.getAvailableCapacity,
unusedTypes: this.unusedTypes,
definitions: this.definitions,
taskMaxAttempts: this.taskMaxAttempts,
excludedTaskTypes: this.excludedTaskTypes,
Expand Down
Loading

0 comments on commit 29a4719

Please sign in to comment.