Skip to content

Commit

Permalink
lift worker shutdown to /platform implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jan 9, 2024
1 parent 446898e commit ffa5881
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 23 deletions.
8 changes: 8 additions & 0 deletions .changeset/dirty-dragons-decide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@effect/platform-browser": minor
"@effect/platform-node": minor
"@effect/platform-bun": minor
"@effect/platform": minor
---

lift worker shutdown to /platform implementation
7 changes: 3 additions & 4 deletions packages/platform-browser/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { WorkerRunner } from "../index.js"

const platformRunnerImpl = Runner.PlatformRunner.of({
[Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId,
start<I, O>() {
start<I, O>(shutdown: Effect.Effect<never, never, void>) {
return Effect.gen(function*(_) {
const port = "postMessage" in self ?
self :
Expand All @@ -22,15 +22,14 @@ const platformRunnerImpl = Runner.PlatformRunner.of({
}, { once: true, signal })
})))
const queue = yield* _(Queue.unbounded<I>())
const parentId = yield* _(Effect.fiberId)
const fiber = yield* _(
yield* _(
Effect.async<never, WorkerError, never>((resume) => {
function onMessage(event: MessageEvent) {
const message = (event as MessageEvent).data as Runner.BackingRunner.Message<I>
if (message[0] === 0) {
queue.unsafeOffer(message[1])
} else {
fiber.unsafeInterruptAsFork(parentId)
Effect.runFork(shutdown)
}
}
function onMessageError(error: ErrorEvent) {
Expand Down
7 changes: 2 additions & 5 deletions packages/platform-bun/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,29 @@ import * as Runner from "@effect/platform/WorkerRunner"
import type * as Schema from "@effect/schema/Schema"
import * as Cause from "effect/Cause"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Queue from "effect/Queue"
import type * as Stream from "effect/Stream"

declare const self: Worker

const platformRunnerImpl = Runner.PlatformRunner.of({
[Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId,
start<I, O>() {
start<I, O>(shutdown: Effect.Effect<never, never, void>) {
return Effect.gen(function*(_) {
if (!("postMessage" in self)) {
return yield* _(Effect.die("not in a worker"))
}
const port = self
const queue = yield* _(Queue.unbounded<I>())
const parent = Option.getOrThrow(Fiber.getCurrentFiber())
yield* _(
Effect.async<never, WorkerError, never>((resume) => {
function onMessage(event: MessageEvent) {
const message = (event as MessageEvent).data as Runner.BackingRunner.Message<I>
if (message[0] === 0) {
queue.unsafeOffer(message[1])
} else {
parent.unsafeInterruptAsFork(parent.id())
Effect.runFork(shutdown)
}
}
function onError(error: ErrorEvent) {
Expand Down
7 changes: 2 additions & 5 deletions packages/platform-node/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,27 @@ import * as Runner from "@effect/platform/WorkerRunner"
import type * as Schema from "@effect/schema/Schema"
import * as Cause from "effect/Cause"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Queue from "effect/Queue"
import type * as Stream from "effect/Stream"
import * as WorkerThreads from "node:worker_threads"

const platformRunnerImpl = Runner.PlatformRunner.of({
[Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId,
start<I, O>() {
start<I, O>(shutdown: Effect.Effect<never, never, void>) {
return Effect.gen(function*(_) {
if (!WorkerThreads.parentPort) {
return yield* _(Effect.fail(WorkerError("spawn", "not in worker")))
}
const port = WorkerThreads.parentPort
const queue = yield* _(Queue.unbounded<I>())
const parent = Option.getOrThrow(Fiber.getCurrentFiber())
yield* _(
Effect.async<never, WorkerError, never>((resume) => {
port.on("message", (message: Runner.BackingRunner.Message<I>) => {
if (message[0] === 0) {
queue.unsafeOffer(message[1])
} else {
parent.unsafeInterruptAsFork(parent.id())
Effect.runFork(shutdown)
}
})
port.on("messageerror", (error) => {
Expand Down
4 changes: 3 additions & 1 deletion packages/platform/src/WorkerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ export type PlatformRunnerTypeId = typeof PlatformRunnerTypeId
*/
export interface PlatformRunner {
readonly [PlatformRunnerTypeId]: PlatformRunnerTypeId
readonly start: <I, O>() => Effect.Effect<Scope.Scope, WorkerError, BackingRunner<I, O>>
readonly start: <I, O>(
shutdown: Effect.Effect<never, never, void>
) => Effect.Effect<Scope.Scope, WorkerError, BackingRunner<I, O>>
}

/**
Expand Down
18 changes: 15 additions & 3 deletions packages/platform/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import * as Chunk from "effect/Chunk"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Either from "effect/Either"
import * as ExecutionStrategy from "effect/ExecutionStrategy"
import * as Exit from "effect/Exit"
import * as Fiber from "effect/Fiber"
import { identity, pipe } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Queue from "effect/Queue"
import type * as Scope from "effect/Scope"
import * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
import * as Transferable from "../Transferable.js"
import type * as Worker from "../Worker.js"
Expand All @@ -32,8 +35,17 @@ export const make = <I, R, E, O>(
options?: WorkerRunner.Runner.Options<I, E, O>
) =>
Effect.gen(function*(_) {
const scope = yield* _(Scope.fork(yield* _(Effect.scope), ExecutionStrategy.parallel))
const fiber = Option.getOrThrow(Fiber.getCurrentFiber())
const shutdown = Effect.zipRight(
Scope.close(scope, Exit.unit),
Fiber.interruptFork(fiber)
)
const platform = yield* _(PlatformRunner)
const backing = yield* _(platform.start<Worker.Worker.Request<I>, Worker.Worker.Response<E>>())
const backing = yield* _(
platform.start<Worker.Worker.Request<I>, Worker.Worker.Response<E>>(shutdown),
Scope.extend(scope)
)
const fiberMap = new Map<number, Fiber.Fiber<never, void>>()

yield* _(
Expand Down Expand Up @@ -138,7 +150,7 @@ export const make = <I, R, E, O>(
)
}),
Effect.forever,
Effect.forkScoped
Effect.forkIn(scope)
)
})

Expand Down
6 changes: 2 additions & 4 deletions packages/rpc-workers/test/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,10 @@ describe("e2e", () => {
).rejects.toEqual(new Error("boom"))
})

it.skip("setup", async () => {
it("setup", async () => {
const channel = new MessageChannel()
const closedPromise = new Promise<string>((resolve) => {
channel.port1.onmessage = (e) => {
console.log(e)

resolve(e.data)
}
})
Expand All @@ -107,4 +105,4 @@ describe("e2e", () => {
Effect.provide(SharedPoolLive),
runPromise
))
}, 10000)
}, 30000)
1 change: 0 additions & 1 deletion packages/rpc-workers/test/e2e/worker-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const router = Router.make(schemaWithSetup, {
Layer.scoped(
Name,
Effect.gen(function*(_) {
console.log("setup", port)
yield* _(
Effect.addFinalizer(() => Effect.sync(() => port.postMessage("closed")))
)
Expand Down

0 comments on commit ffa5881

Please sign in to comment.