Skip to content

Commit

Permalink
Merge pull request #49 from cloudbeds/fix/worker-termination
Browse files Browse the repository at this point in the history
fix(compileTypesAsync): improve worker termination logic
  • Loading branch information
steven-pribilinskiy authored Oct 24, 2024
2 parents 7c86483 + e4c8264 commit 13384f8
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 45 deletions.
14 changes: 2 additions & 12 deletions src/compileTypes/__tests__/compileTypesWorker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';

import type { FederationConfig } from '../../models';
import { compileTypes } from '../compileTypes';
import type { CompileTypesWorkerMessage, ExitMessage } from '../compileTypesWorker';
import type { CompileTypesWorkerMessage } from '../compileTypesWorker';
import { rewritePathsWithExposedFederatedModules } from '../rewritePathsWithExposedFederatedModules';
import { workerLogger } from '../workerLogger';

Expand Down Expand Up @@ -34,7 +34,7 @@ describe('compileTypesWorker', () => {
const mockCompileTypes = vi.mocked(compileTypes);
const mockRewritePaths = vi.mocked(rewritePathsWithExposedFederatedModules);

let messageHandler: (message: CompileTypesWorkerMessage | ExitMessage) => void;
let messageHandler: (message: CompileTypesWorkerMessage) => void;

beforeEach(async () => {
vi.resetAllMocks();
Expand All @@ -56,16 +56,6 @@ describe('compileTypesWorker', () => {
vi.resetModules();
});

test('handles exit message', () => {
const exitSpy = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never);
const exitMessage: ExitMessage = { type: 'exit' };

messageHandler(exitMessage);

expect(workerLogger.log).toHaveBeenCalledWith('Exiting by request');
expect(exitSpy).toHaveBeenCalledWith(0);
});

test('handles successful compilation and rewrite', () => {
const workerMessage: CompileTypesWorkerMessage = {
tsconfigPath: 'tsconfig.json',
Expand Down
50 changes: 28 additions & 22 deletions src/compileTypes/compileTypesAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,68 +7,74 @@ import type {
CompileTypesWorkerResultMessage,
} from './compileTypesWorker';

let worker: Worker | null = null;
let workerIndex = 0;
const activeWorkers = new Map<number, Worker>();
let workerIndex = 1;

export function compileTypesAsync(
params: CompileTypesWorkerMessage,
loggerHint = '',
): Promise<void> {
const logger = getLogger();
workerIndex++;
const innerWorkerIndex = workerIndex;

return new Promise((resolve, reject) => {
if (worker) {
logger.log(`Terminating existing worker process #${innerWorkerIndex}`);
worker.postMessage({ type: 'exit' });
}
activeWorkers.forEach((worker, index) => {
logger.log(`Terminating existing worker process #${index}`);
worker.terminate();
});
activeWorkers.clear();

const workerPath = path.join(__dirname, 'compileTypesWorker.js');
worker = new Worker(workerPath);
const currentWorkerIndex = workerIndex++;
const worker = new Worker(path.join(__dirname, 'compileTypesWorker.js'));
activeWorkers.set(currentWorkerIndex, worker);

return new Promise((resolve, reject) => {
worker.on('message', (result: CompileTypesWorkerResultMessage) => {
switch (result.status) {
case 'log':
logger[result.level](`[Worker] run #${innerWorkerIndex}:`, result.message);
logger[result.level](`[Worker] run #${currentWorkerIndex}:`, result.message);
return;
case 'success':
resolve();
break;
case 'failure':
logger.warn(
`[Worker] run #${innerWorkerIndex}: Failed to compile types for exposed modules.`,
`[Worker] run #${currentWorkerIndex}: Failed to compile types for exposed modules.`,
loggerHint,
);
reject(new Error('Failed to compile types for exposed modules.'));
break;
case 'error':
logger.warn(
`[Worker] run #${innerWorkerIndex}: Error compiling types for exposed modules.`,
`[Worker] run #${currentWorkerIndex}: Error compiling types for exposed modules.`,
loggerHint,
);
reject(result.error);
break;
default:
logger.error(`[Worker]: Received unknown status: ${(result as Dict).status}`);
break;
}
worker?.postMessage({ type: 'exit' });
worker = null;

worker.terminate();
});

worker.on('error', error => {
logger.warn(`[Worker] run #${innerWorkerIndex}: Unexpected error.`, loggerHint);
logger.warn(`[Worker] run #${currentWorkerIndex}: Unexpected error.`, loggerHint);
logger.log(error);
reject(error);
worker?.postMessage({ type: 'exit' });
worker = null;
worker.terminate();
});

worker.on('exit', code => {
if (code === null || code === 0) {
const isActiveWorker = activeWorkers.has(currentWorkerIndex);
if (isActiveWorker) {
activeWorkers.delete(currentWorkerIndex);
}

if (!code || !isActiveWorker) {
resolve();
} else {
reject(new Error(`[Worker] run #${innerWorkerIndex}: Process exited with code ${code}`));
reject(new Error(`[Worker] run #${currentWorkerIndex}: Process exited with code ${code}`));
}
worker = null;
});

worker.postMessage(params);
Expand Down
13 changes: 2 additions & 11 deletions src/compileTypes/compileTypesWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,14 @@ export type CompileTypesWorkerMessage = CompileTypesParams & {
federationConfig: FederationConfig;
};

export type ExitMessage = {
type: 'exit';
};

export type CompileTypesWorkerResultMessage =
| { status: 'success' }
| { status: 'failure' }
| CompileTypesWorkerResultMessageError
| { status: 'log'; level: LogLevel; message: string };

parentPort?.on('message', (message: CompileTypesWorkerMessage | ExitMessage) => {
if ((message as ExitMessage).type === 'exit') {
workerLogger.log('Exiting by request');
process.exit(0);
}

const { federationConfig, ...params } = message as CompileTypesWorkerMessage;
parentPort?.on('message', (message: CompileTypesWorkerMessage) => {
const { federationConfig, ...params } = message;

try {
const startTime = performance.now();
Expand Down

0 comments on commit 13384f8

Please sign in to comment.