From fcb5062b5aa526a25ea2e80b75e6895e36058947 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Mon, 29 Jul 2024 14:04:18 -0400 Subject: [PATCH] fix: Make background job shutdown cleanly --- packages/base-test-utils/src/index.ts | 1 + packages/common/src/utils/abort-signal-utils.ts | 15 +++++++++++++++ packages/core/src/ceramic.ts | 9 +++------ packages/core/src/shutdown-signal.ts | 6 ++++++ 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/packages/base-test-utils/src/index.ts b/packages/base-test-utils/src/index.ts index 8a452d9ef2..41faa04f89 100644 --- a/packages/base-test-utils/src/index.ts +++ b/packages/base-test-utils/src/index.ts @@ -54,6 +54,7 @@ export class BaseTestUtils { throw new Error(baseErrMsg + ': ' + customMsg) } + // TODO: De-dupe this with `delayOrAbort` in abort-signal-utils.ts static async delay(ms: number, signal?: AbortSignal): Promise { return new Promise((resolve, reject) => { const timeout = setTimeout(() => resolve(), ms) diff --git a/packages/common/src/utils/abort-signal-utils.ts b/packages/common/src/utils/abort-signal-utils.ts index 007622d25a..304a594dfd 100644 --- a/packages/common/src/utils/abort-signal-utils.ts +++ b/packages/common/src/utils/abort-signal-utils.ts @@ -66,3 +66,18 @@ export async function abortable( original.removeEventListener('abort', onAbort) }) } + +export async function delayOrAbort(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => resolve(), ms) + if (signal) { + const handleAbort = () => { + clearTimeout(timeout) + signal.removeEventListener('abort', handleAbort) + reject(signal.reason) + } + if (signal.aborted) handleAbort() + signal.addEventListener('abort', handleAbort) + } + }) +} diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index 5e2f18cd36..79f3a69009 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -24,6 +24,7 @@ import { CeramicSigner, StreamStateLoader, StreamReaderWriter, + delayOrAbort, } from '@ceramicnetwork/common' import { DEFAULT_TRACE_SAMPLE_RATIO, @@ -683,12 +684,8 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { let attemptNum = 0 let backoffMs = 100 const maxBackoffMs = 1000 * 60 // Caps off at checking once per minute - const delay = async function (ms) { - return new Promise((resolve) => setTimeout(() => resolve(), ms)) - } - // eslint-disable-next-line no-constant-condition - while (true) { + while (!this._shutdownSignal.isShuttingDown()) { try { await this.dispatcher.getFromIpfs(model.cid) if (attemptNum > 0) { @@ -704,7 +701,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`) } - await delay(backoffMs) + await this._shutdownSignal.abortable((signal) => delayOrAbort(backoffMs, signal)) attemptNum++ if (backoffMs <= maxBackoffMs) { backoffMs *= 2 diff --git a/packages/core/src/shutdown-signal.ts b/packages/core/src/shutdown-signal.ts index aa9ee1ab72..f061b88b0f 100644 --- a/packages/core/src/shutdown-signal.ts +++ b/packages/core/src/shutdown-signal.ts @@ -7,6 +7,7 @@ import { Observer, Subject } from 'rxjs' */ export class ShutdownSignal { private subject: Subject = new Subject() + private shuttingDown = false /** * Subscribers to the signal. @@ -20,6 +21,11 @@ export class ShutdownSignal { */ abort(): void { this.subject.complete() + this.shuttingDown = true + } + + isShuttingDown(): boolean { + return this.shuttingDown } /**