Skip to content

Commit

Permalink
[Response Ops][Task Manager] Stop polling on Kibana shutdown (#195415)
Browse files Browse the repository at this point in the history
Resolves #160329

## Summary

Stop polling when task manager `stop()` is called. When Kibana receives
a `SIGTERM` signal, all the plugin stop functions are called. When TM
receives this signal, it should immediately stop claiming any new tasks
and then there is a grace period before kubernetes kills the pod that
allows any running tasks to complete.

I experimented with removing the code that prevents the event log from
indexing any additional documents after the `stop` signal is received,
but I received a bulk indexing error `There are no living connections`
even thought Elasticsearch was up and running so it seems that some of
the core functionality that the event log uses are gone at this point.

## To Verify

1. Add a log indicating that polling is occuring

```
--- a/x-pack/plugins/task_manager/server/polling/task_poller.ts
+++ b/x-pack/plugins/task_manager/server/polling/task_poller.ts
@@ -61,6 +61,7 @@ export function createTaskPoller<T, H>({
   const subject = new Subject<Result<H, PollingError<T>>>();

   async function runCycle() {
+    console.log('polling');
     timeoutId = null;
     const start = Date.now();
     try {
```

2. Start ES and Kibana. Use `ps aux` to determine Kibana's PID
3. Send a sigterm signal to Kibana: `kill -TERM <kibana_pid>`. Task
manager should log `Stopping the task poller` and you should no longer
see the console logs indicating that TM is polling

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
ymao1 and elasticmachine authored Oct 14, 2024
1 parent 96eff23 commit 674027d
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 9 deletions.
34 changes: 34 additions & 0 deletions x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,40 @@ describe('TaskManagerPlugin', () => {
});

describe('stop', () => {
test('should stop task polling lifecycle if it is defined', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>(
pluginInitializerContextParams
);
pluginInitializerContext.node.roles.backgroundTasks = true;
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
taskManagerPlugin.setup(coreMock.createSetup(), { usageCollection: undefined });
taskManagerPlugin.start(coreStart, {
cloud: cloudMock.createStart(),
});

expect(TaskPollingLifecycle as jest.Mock<TaskPollingLifecycleClass>).toHaveBeenCalledTimes(1);

await taskManagerPlugin.stop();

expect(mockTaskPollingLifecycle.stop).toHaveBeenCalled();
});

test('should not call stop task polling lifecycle if it is not defined', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>(
pluginInitializerContextParams
);
pluginInitializerContext.node.roles.backgroundTasks = false;
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
taskManagerPlugin.setup(coreMock.createSetup(), { usageCollection: undefined });
taskManagerPlugin.start(coreStart, {
cloud: cloudMock.createStart(),
});

await taskManagerPlugin.stop();

expect(mockTaskPollingLifecycle.stop).not.toHaveBeenCalled();
});

test('should remove the current from discovery service', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>(
pluginInitializerContextParams
Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ export class TaskManagerPlugin
}

public async stop() {
// Stop polling for tasks
if (this.taskPollingLifecycle) {
this.taskPollingLifecycle.stop();
}

if (this.kibanaDiscoveryService?.isStarted()) {
this.kibanaDiscoveryService.stop();
try {
Expand Down
12 changes: 7 additions & 5 deletions x-pack/plugins/task_manager/server/polling/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ interface Opts<H> {
work: WorkFn<H>;
}

export interface TaskPoller<T, H> {
start: () => void;
stop: () => void;
events$: Observable<Result<H, PollingError<T>>>;
}

/**
* constructs a new TaskPoller stream, which emits events on demand and on a scheduled interval, waiting for capacity to be available before emitting more events.
*
Expand All @@ -45,11 +51,7 @@ export function createTaskPoller<T, H>({
pollIntervalDelay$,
getCapacity,
work,
}: Opts<H>): {
start: () => void;
stop: () => void;
events$: Observable<Result<H, PollingError<T>>>;
} {
}: Opts<H>): TaskPoller<T, H> {
const hasCapacity = () => getCapacity() > 0;
let running: boolean = false;
let timeoutId: NodeJS.Timeout | null = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const taskPollingLifecycleMock = {
create(opts: { isStarted?: boolean; events$?: Observable<TaskLifecycleEvent> }) {
return {
attemptToRun: jest.fn(),
stop: jest.fn(),
get isStarted() {
return opts.isStarted ?? true;
},
Expand Down
23 changes: 23 additions & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,29 @@ describe('TaskPollingLifecycle', () => {
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled();
});

test('stops polling if stop() is called', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const pollingLifecycle = new TaskPollingLifecycle({
elasticsearchAndSOAvailability$,
...taskManagerOpts,
config: {
...taskManagerOpts.config,
poll_interval: 100,
},
});

expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(0);
elasticsearchAndSOAvailability$.next(true);

clock.tick(50);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1);

pollingLifecycle.stop();

clock.tick(100);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1);
});

test('restarts polling once the ES and SavedObjects services become available again', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
new TaskPollingLifecycle({
Expand Down
16 changes: 12 additions & 4 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { delayOnClaimConflicts } from './polling';
import { TaskClaiming } from './queries/task_claiming';
import { ClaimOwnershipResult } from './task_claimers';
import { TaskPartitioner } from './lib/task_partitioner';
import { TaskPoller } from './polling/task_poller';

const MAX_BUFFER_OPERATIONS = 100;

Expand Down Expand Up @@ -86,7 +87,10 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
private readonly executionContext: ExecutionContextStart;

private logger: Logger;
private poller: TaskPoller<string, TimedFillPoolResult>;

public pool: TaskPool;

// all task related events (task claimed, task marked as running, etc.) are emitted through events$
private events$ = new Subject<TaskLifecycleEvent>();

Expand Down Expand Up @@ -170,7 +174,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
).pipe(tap((delay) => emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay)))));
}

const poller = createTaskPoller<string, TimedFillPoolResult>({
this.poller = createTaskPoller<string, TimedFillPoolResult>({
logger,
initialPollInterval: pollInterval,
pollInterval$: pollIntervalConfiguration$,
Expand All @@ -192,17 +196,17 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
work: this.pollForWork,
});

this.subscribeToPoller(poller.events$);
this.subscribeToPoller(this.poller.events$);

elasticsearchAndSOAvailability$.subscribe((areESAndSOAvailable) => {
if (areESAndSOAvailable) {
// start polling for work
poller.start();
this.poller.start();
} else if (!areESAndSOAvailable) {
this.logger.info(
`Stopping the task poller because Elasticsearch and/or saved-objects service became unavailable`
);
poller.stop();
this.poller.stop();
this.pool.cancelRunningTasks();
}
});
Expand All @@ -212,6 +216,10 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
return this.events$;
}

public stop() {
this.poller.stop();
}

private emitEvent = (event: TaskLifecycleEvent) => {
this.events$.next(event);
};
Expand Down

0 comments on commit 674027d

Please sign in to comment.