Skip to content

Commit

Permalink
fix: Make background job shutdown cleanly
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody committed Jul 29, 2024
1 parent fdd4738 commit fcb5062
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
1 change: 1 addition & 0 deletions packages/base-test-utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export class BaseTestUtils {
throw new Error(baseErrMsg + ': ' + customMsg)
}

// TODO: De-dupe this with `delayOrAbort` in abort-signal-utils.ts
static async delay(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => resolve(), ms)
Expand Down
15 changes: 15 additions & 0 deletions packages/common/src/utils/abort-signal-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,18 @@ export async function abortable<T>(
original.removeEventListener('abort', onAbort)
})
}

export async function delayOrAbort(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => resolve(), ms)
if (signal) {
const handleAbort = () => {
clearTimeout(timeout)
signal.removeEventListener('abort', handleAbort)
reject(signal.reason)
}
if (signal.aborted) handleAbort()
signal.addEventListener('abort', handleAbort)
}
})
}
9 changes: 3 additions & 6 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
CeramicSigner,
StreamStateLoader,
StreamReaderWriter,
delayOrAbort,
} from '@ceramicnetwork/common'
import {
DEFAULT_TRACE_SAMPLE_RATIO,
Expand Down Expand Up @@ -683,12 +684,8 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
let attemptNum = 0
let backoffMs = 100
const maxBackoffMs = 1000 * 60 // Caps off at checking once per minute
const delay = async function (ms) {
return new Promise<void>((resolve) => setTimeout(() => resolve(), ms))
}

// eslint-disable-next-line no-constant-condition
while (true) {
while (!this._shutdownSignal.isShuttingDown()) {
try {
await this.dispatcher.getFromIpfs(model.cid)
if (attemptNum > 0) {
Expand All @@ -704,7 +701,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`)
}

await delay(backoffMs)
await this._shutdownSignal.abortable((signal) => delayOrAbort(backoffMs, signal))
attemptNum++
if (backoffMs <= maxBackoffMs) {
backoffMs *= 2
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/shutdown-signal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Observer, Subject } from 'rxjs'
*/
export class ShutdownSignal {
private subject: Subject<void> = new Subject()
private shuttingDown = false

/**
* Subscribers to the signal.
Expand All @@ -20,6 +21,11 @@ export class ShutdownSignal {
*/
abort(): void {
this.subject.complete()
this.shuttingDown = true
}

isShuttingDown(): boolean {
return this.shuttingDown
}

/**
Expand Down

0 comments on commit fcb5062

Please sign in to comment.