Skip to content

Commit

Permalink
Removing perf_hooks from task manager (#117294)
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 authored Nov 3, 2021
1 parent 12c0075 commit 6cf9f8c
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 48 deletions.
16 changes: 0 additions & 16 deletions x-pack/plugins/task_manager/server/lib/fill_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
* 2.0.
*/

import { performance } from 'perf_hooks';
import { Observable } from 'rxjs';
import { concatMap, last } from 'rxjs/operators';
import { ClaimOwnershipResult } from '../queries/task_claiming';
Expand Down Expand Up @@ -57,7 +56,6 @@ export async function fillPool(
converter: (taskInstance: ConcreteTaskInstance) => TaskManagerRunner,
run: (tasks: TaskManagerRunner[]) => Promise<TaskPoolRunResult>
): Promise<TimedFillPoolResult> {
performance.mark('fillPool.start');
return new Promise((resolve, reject) => {
const stopTaskTimer = startTaskTimer();
const augmentTimingTo = (
Expand All @@ -76,12 +74,6 @@ export async function fillPool(
res,
async ({ docs, stats }) => {
if (!docs.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return asOk({ result: TaskPoolRunResult.NoTaskWereRan, stats });
}
return asOk(
Expand All @@ -106,20 +98,12 @@ export async function fillPool(
({ result, stats }) => {
switch (result) {
case TaskPoolRunResult.RanOutOfCapacity:
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return augmentTimingTo(FillPoolResult.RanOutOfCapacity, stats);
case TaskPoolRunResult.RunningAtCapacity:
performance.mark('fillPool.cycle');
return augmentTimingTo(FillPoolResult.RunningAtCapacity, stats);
case TaskPoolRunResult.NoTaskWereRan:
return augmentTimingTo(FillPoolResult.NoTasksClaimed, stats);
default:
performance.mark('fillPool.cycle');
return augmentTimingTo(FillPoolResult.PoolFilled, stats);
}
},
Expand Down
14 changes: 0 additions & 14 deletions x-pack/plugins/task_manager/server/polling/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
* This module contains the logic for polling the task manager index for new work.
*/

import { performance } from 'perf_hooks';
import { after } from 'lodash';
import { Subject, merge, of, Observable, combineLatest, timer } from 'rxjs';
import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs/operators';

Expand Down Expand Up @@ -113,7 +111,6 @@ export function createTaskPoller<T, H>({
// take as many argumented calls as we have capacity for and call `work` with
// those arguments. If the queue is empty this will still trigger work to be done
concatMap(async (set: Set<T>) => {
closeSleepPerf();
return mapResult<H, Error, Result<H, PollingError<T>>>(
await promiseResult<H, Error>(
timeoutPromiseAfter<H, Error>(
Expand All @@ -126,7 +123,6 @@ export function createTaskPoller<T, H>({
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError)
);
}),
tap(openSleepPerf),
// catch errors during polling for work
catchError((err: Error) => of(asPollingError<T>(err, PollingErrorType.WorkError)))
);
Expand Down Expand Up @@ -177,13 +173,3 @@ export class PollingError<T> extends Error {
this.data = data;
}
}

const openSleepPerf = () => {
performance.mark('TaskPoller.sleep');
};
// we only want to close after an open has been called, as we're counting the time *between* work cycles
// so we'll ignore the first call to `closeSleepPerf` but we will run every subsequent call
const closeSleepPerf = after(2, () => {
performance.mark('TaskPoller.poll');
performance.measure('TaskPoller.sleepDuration', 'TaskPoller.sleep', 'TaskPoller.poll');
});
5 changes: 0 additions & 5 deletions x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
*/
import { Observable, Subject } from 'rxjs';
import moment, { Duration } from 'moment';
import { performance } from 'perf_hooks';
import { padStart } from 'lodash';
import { Logger } from '../../../../src/core/server';
import { TaskRunner } from './task_running';
Expand Down Expand Up @@ -111,7 +110,6 @@ export class TaskPool {
public run = async (tasks: TaskRunner[]): Promise<TaskPoolRunResult> => {
const [tasksToRun, leftOverTasks] = partitionListByCount(tasks, this.availableWorkers);
if (tasksToRun.length) {
performance.mark('attemptToRun_start');
await Promise.all(
tasksToRun
.filter((taskRunner) => !this.tasksInPool.has(taskRunner.id))
Expand All @@ -130,9 +128,6 @@ export class TaskPool {
.catch((err) => this.handleFailureOfMarkAsRunning(taskRunner, err));
})
);

performance.mark('attemptToRun_stop');
performance.measure('taskPool.attemptToRun', 'attemptToRun_start', 'attemptToRun_stop');
}

if (leftOverTasks.length) {
Expand Down
13 changes: 0 additions & 13 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import apm from 'elastic-apm-node';
import { withSpan } from '@kbn/apm-utils';
import { performance } from 'perf_hooks';
import { identity, defaults, flow } from 'lodash';
import {
Logger,
Expand Down Expand Up @@ -313,7 +312,6 @@ export class TaskManagerRunner implements TaskRunner {
}`
);
}
performance.mark('markTaskAsRunning_start');

const apmTrans = apm.startTransaction('taskManager', 'taskManager markTaskAsRunning');

Expand Down Expand Up @@ -372,12 +370,10 @@ export class TaskManagerRunner implements TaskRunner {
}

if (apmTrans) apmTrans.end('success');
performanceStopMarkingTaskAsRunning();
this.onTaskEvent(asTaskMarkRunningEvent(this.id, asOk(this.instance.task)));
return true;
} catch (error) {
if (apmTrans) apmTrans.end('failure');
performanceStopMarkingTaskAsRunning();
this.onTaskEvent(asTaskMarkRunningEvent(this.id, asErr(error)));
if (!SavedObjectsErrorHelpers.isConflictError(error)) {
if (!SavedObjectsErrorHelpers.isNotFoundError(error)) {
Expand Down Expand Up @@ -617,15 +613,6 @@ function howManyMsUntilOwnershipClaimExpires(ownershipClaimedUntil: Date | null)
return ownershipClaimedUntil ? ownershipClaimedUntil.getTime() - Date.now() : 0;
}

function performanceStopMarkingTaskAsRunning() {
performance.mark('markTaskAsRunning_stop');
performance.measure(
'taskRunner.markTaskAsRunning',
'markTaskAsRunning_start',
'markTaskAsRunning_stop'
);
}

// A type that extracts the Instance type out of TaskRunningStage
// This helps us to better communicate to the developer what the expected "stage"
// in a specific place in the code might be
Expand Down

0 comments on commit 6cf9f8c

Please sign in to comment.