diff --git a/.changeset/dirty-dragons-decide.md b/.changeset/dirty-dragons-decide.md new file mode 100644 index 0000000000..1aed35c4ef --- /dev/null +++ b/.changeset/dirty-dragons-decide.md @@ -0,0 +1,8 @@ +--- +"@effect/platform-browser": minor +"@effect/platform-node": minor +"@effect/platform-bun": minor +"@effect/platform": minor +--- + +lift worker shutdown to /platform implementation diff --git a/packages/platform-browser/src/internal/workerRunner.ts b/packages/platform-browser/src/internal/workerRunner.ts index 0df68ae78a..0317df8346 100644 --- a/packages/platform-browser/src/internal/workerRunner.ts +++ b/packages/platform-browser/src/internal/workerRunner.ts @@ -10,7 +10,7 @@ import type { WorkerRunner } from "../index.js" const platformRunnerImpl = Runner.PlatformRunner.of({ [Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId, - start() { + start(shutdown: Effect.Effect) { return Effect.gen(function*(_) { const port = "postMessage" in self ? self : @@ -22,15 +22,14 @@ const platformRunnerImpl = Runner.PlatformRunner.of({ }, { once: true, signal }) }))) const queue = yield* _(Queue.unbounded()) - const parentId = yield* _(Effect.fiberId) - const fiber = yield* _( + yield* _( Effect.async((resume) => { function onMessage(event: MessageEvent) { const message = (event as MessageEvent).data as Runner.BackingRunner.Message if (message[0] === 0) { queue.unsafeOffer(message[1]) } else { - fiber.unsafeInterruptAsFork(parentId) + Effect.runFork(shutdown) } } function onMessageError(error: ErrorEvent) { diff --git a/packages/platform-bun/src/internal/workerRunner.ts b/packages/platform-bun/src/internal/workerRunner.ts index 8688e12c1c..0a08398476 100644 --- a/packages/platform-bun/src/internal/workerRunner.ts +++ b/packages/platform-bun/src/internal/workerRunner.ts @@ -3,9 +3,7 @@ import * as Runner from "@effect/platform/WorkerRunner" import type * as Schema from "@effect/schema/Schema" import * as Cause from "effect/Cause" import * as Effect from "effect/Effect" -import * as Fiber from "effect/Fiber" import * as Layer from "effect/Layer" -import * as Option from "effect/Option" import * as Queue from "effect/Queue" import type * as Stream from "effect/Stream" @@ -13,14 +11,13 @@ declare const self: Worker const platformRunnerImpl = Runner.PlatformRunner.of({ [Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId, - start() { + start(shutdown: Effect.Effect) { return Effect.gen(function*(_) { if (!("postMessage" in self)) { return yield* _(Effect.die("not in a worker")) } const port = self const queue = yield* _(Queue.unbounded()) - const parent = Option.getOrThrow(Fiber.getCurrentFiber()) yield* _( Effect.async((resume) => { function onMessage(event: MessageEvent) { @@ -28,7 +25,7 @@ const platformRunnerImpl = Runner.PlatformRunner.of({ if (message[0] === 0) { queue.unsafeOffer(message[1]) } else { - parent.unsafeInterruptAsFork(parent.id()) + Effect.runFork(shutdown) } } function onError(error: ErrorEvent) { diff --git a/packages/platform-node/src/internal/workerRunner.ts b/packages/platform-node/src/internal/workerRunner.ts index 6e1ebb0e13..3674256f34 100644 --- a/packages/platform-node/src/internal/workerRunner.ts +++ b/packages/platform-node/src/internal/workerRunner.ts @@ -3,30 +3,27 @@ import * as Runner from "@effect/platform/WorkerRunner" import type * as Schema from "@effect/schema/Schema" import * as Cause from "effect/Cause" import * as Effect from "effect/Effect" -import * as Fiber from "effect/Fiber" import * as Layer from "effect/Layer" -import * as Option from "effect/Option" import * as Queue from "effect/Queue" import type * as Stream from "effect/Stream" import * as WorkerThreads from "node:worker_threads" const platformRunnerImpl = Runner.PlatformRunner.of({ [Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId, - start() { + start(shutdown: Effect.Effect) { return Effect.gen(function*(_) { if (!WorkerThreads.parentPort) { return yield* _(Effect.fail(WorkerError("spawn", "not in worker"))) } const port = WorkerThreads.parentPort const queue = yield* _(Queue.unbounded()) - const parent = Option.getOrThrow(Fiber.getCurrentFiber()) yield* _( Effect.async((resume) => { port.on("message", (message: Runner.BackingRunner.Message) => { if (message[0] === 0) { queue.unsafeOffer(message[1]) } else { - parent.unsafeInterruptAsFork(parent.id()) + Effect.runFork(shutdown) } }) port.on("messageerror", (error) => { diff --git a/packages/platform/src/WorkerRunner.ts b/packages/platform/src/WorkerRunner.ts index 99c5d082be..98a82e8bf5 100644 --- a/packages/platform/src/WorkerRunner.ts +++ b/packages/platform/src/WorkerRunner.ts @@ -51,7 +51,9 @@ export type PlatformRunnerTypeId = typeof PlatformRunnerTypeId */ export interface PlatformRunner { readonly [PlatformRunnerTypeId]: PlatformRunnerTypeId - readonly start: () => Effect.Effect> + readonly start: ( + shutdown: Effect.Effect + ) => Effect.Effect> } /** diff --git a/packages/platform/src/internal/workerRunner.ts b/packages/platform/src/internal/workerRunner.ts index 0c8152c5fb..1dce016b69 100644 --- a/packages/platform/src/internal/workerRunner.ts +++ b/packages/platform/src/internal/workerRunner.ts @@ -5,11 +5,14 @@ import * as Chunk from "effect/Chunk" import * as Context from "effect/Context" import * as Effect from "effect/Effect" import * as Either from "effect/Either" +import * as ExecutionStrategy from "effect/ExecutionStrategy" +import * as Exit from "effect/Exit" import * as Fiber from "effect/Fiber" import { identity, pipe } from "effect/Function" import * as Layer from "effect/Layer" +import * as Option from "effect/Option" import * as Queue from "effect/Queue" -import type * as Scope from "effect/Scope" +import * as Scope from "effect/Scope" import * as Stream from "effect/Stream" import * as Transferable from "../Transferable.js" import type * as Worker from "../Worker.js" @@ -32,8 +35,17 @@ export const make = ( options?: WorkerRunner.Runner.Options ) => Effect.gen(function*(_) { + const scope = yield* _(Scope.fork(yield* _(Effect.scope), ExecutionStrategy.parallel)) + const fiber = Option.getOrThrow(Fiber.getCurrentFiber()) + const shutdown = Effect.zipRight( + Scope.close(scope, Exit.unit), + Fiber.interruptFork(fiber) + ) const platform = yield* _(PlatformRunner) - const backing = yield* _(platform.start, Worker.Worker.Response>()) + const backing = yield* _( + platform.start, Worker.Worker.Response>(shutdown), + Scope.extend(scope) + ) const fiberMap = new Map>() yield* _( @@ -138,7 +150,7 @@ export const make = ( ) }), Effect.forever, - Effect.forkScoped + Effect.forkIn(scope) ) }) diff --git a/packages/rpc-workers/test/e2e.test.ts b/packages/rpc-workers/test/e2e.test.ts index 4fbfd164d4..67f26581ac 100644 --- a/packages/rpc-workers/test/e2e.test.ts +++ b/packages/rpc-workers/test/e2e.test.ts @@ -77,12 +77,10 @@ describe("e2e", () => { ).rejects.toEqual(new Error("boom")) }) - it.skip("setup", async () => { + it("setup", async () => { const channel = new MessageChannel() const closedPromise = new Promise((resolve) => { channel.port1.onmessage = (e) => { - console.log(e) - resolve(e.data) } }) @@ -107,4 +105,4 @@ describe("e2e", () => { Effect.provide(SharedPoolLive), runPromise )) -}, 10000) +}, 30000) diff --git a/packages/rpc-workers/test/e2e/worker-setup.ts b/packages/rpc-workers/test/e2e/worker-setup.ts index 61e727e601..a42405fa06 100644 --- a/packages/rpc-workers/test/e2e/worker-setup.ts +++ b/packages/rpc-workers/test/e2e/worker-setup.ts @@ -11,7 +11,6 @@ const router = Router.make(schemaWithSetup, { Layer.scoped( Name, Effect.gen(function*(_) { - console.log("setup", port) yield* _( Effect.addFinalizer(() => Effect.sync(() => port.postMessage("closed"))) )