Skip to content

Commit

Permalink
interrupt current fiber on worker interrupt
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jan 9, 2024
1 parent 94043c2 commit 446898e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 25 deletions.
23 changes: 6 additions & 17 deletions packages/platform-browser/test/Worker.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
import * as EffectWorker from "@effect/platform-browser/Worker"
import "@vitest/web-worker"
import { Cause, Chunk, Effect, Exit, Option, Stream } from "effect"
import { Chunk, Effect, Option, Stream } from "effect"
import { assert, describe, it } from "vitest"
import type { WorkerMessage } from "./fixtures/schema.js"
import { GetPersonById, GetSpan, GetUserById, Person, SetName, User } from "./fixtures/schema.js"

const runPromiseExit = <E, A>(effect: Effect.Effect<never, E, A>) =>
Effect.runPromiseExit(effect).then((exit) => {
if (Exit.isSuccess(exit) || Exit.isInterrupted(exit)) {
return
}
throw Cause.squash(exit.cause)
})

describe.sequential("Worker", () => {
it("executes streams", () =>
Effect.gen(function*(_) {
Expand All @@ -25,7 +17,7 @@ describe.sequential("Worker", () => {
}).pipe(
Effect.scoped,
Effect.provide(EffectWorker.layerManager),
runPromiseExit
Effect.runPromise
))

it("Serialized", () =>
Expand All @@ -34,9 +26,6 @@ describe.sequential("Worker", () => {
spawn: () => new globalThis.Worker(new URL("./fixtures/serializedWorker.ts", import.meta.url)),
size: 1
}))
let user = yield* _(pool.executeEffect(new GetUserById({ id: 123 })))
user = yield* _(pool.executeEffect(new GetUserById({ id: 123 })))
assert.deepStrictEqual(user, new User({ id: 123, name: "test" }))
const people = yield* _(pool.execute(new GetPersonById({ id: 123 })), Stream.runCollect)
assert.deepStrictEqual(Chunk.toReadonlyArray(people), [
new Person({ id: 123, name: "test" }),
Expand All @@ -45,7 +34,7 @@ describe.sequential("Worker", () => {
}).pipe(
Effect.scoped,
Effect.provide(EffectWorker.layerManager),
runPromiseExit
Effect.runPromise
))

it("Serialized with initialMessage", () =>
Expand All @@ -66,7 +55,7 @@ describe.sequential("Worker", () => {
}).pipe(
Effect.scoped,
Effect.provide(EffectWorker.layerManager),
runPromiseExit
Effect.runPromise
))

it("tracing", () =>
Expand All @@ -88,7 +77,7 @@ describe.sequential("Worker", () => {
Effect.withSpan("test"),
Effect.scoped,
Effect.provide(EffectWorker.layerManager),
runPromiseExit
Effect.runPromise
))

it("SharedWorker", () =>
Expand All @@ -102,7 +91,7 @@ describe.sequential("Worker", () => {
}).pipe(
Effect.scoped,
Effect.provide(EffectWorker.layerManager),
runPromiseExit
Effect.runPromise
))

// TODO: vitest/web-worker doesn't support postMessage throwing errors
Expand Down
8 changes: 5 additions & 3 deletions packages/platform-bun/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import * as Runner from "@effect/platform/WorkerRunner"
import type * as Schema from "@effect/schema/Schema"
import * as Cause from "effect/Cause"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Queue from "effect/Queue"
import type * as Stream from "effect/Stream"

Expand All @@ -18,15 +20,15 @@ const platformRunnerImpl = Runner.PlatformRunner.of({
}
const port = self
const queue = yield* _(Queue.unbounded<I>())
const parentId = yield* _(Effect.fiberId)
const fiber = yield* _(
const parent = Option.getOrThrow(Fiber.getCurrentFiber())
yield* _(
Effect.async<never, WorkerError, never>((resume) => {
function onMessage(event: MessageEvent) {
const message = (event as MessageEvent).data as Runner.BackingRunner.Message<I>
if (message[0] === 0) {
queue.unsafeOffer(message[1])
} else {
fiber.unsafeInterruptAsFork(parentId)
parent.unsafeInterruptAsFork(parent.id())
}
}
function onError(error: ErrorEvent) {
Expand Down
8 changes: 5 additions & 3 deletions packages/platform-node/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import * as Runner from "@effect/platform/WorkerRunner"
import type * as Schema from "@effect/schema/Schema"
import * as Cause from "effect/Cause"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Queue from "effect/Queue"
import type * as Stream from "effect/Stream"
import * as WorkerThreads from "node:worker_threads"
Expand All @@ -17,14 +19,14 @@ const platformRunnerImpl = Runner.PlatformRunner.of({
}
const port = WorkerThreads.parentPort
const queue = yield* _(Queue.unbounded<I>())
const parentId = yield* _(Effect.fiberId)
const fiber = yield* _(
const parent = Option.getOrThrow(Fiber.getCurrentFiber())
yield* _(
Effect.async<never, WorkerError, never>((resume) => {
port.on("message", (message: Runner.BackingRunner.Message<I>) => {
if (message[0] === 0) {
queue.unsafeOffer(message[1])
} else {
fiber.unsafeInterruptAsFork(parentId)
parent.unsafeInterruptAsFork(parent.id())
}
})
port.on("messageerror", (error) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/rpc-workers/test/e2e.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import "@vitest/web-worker"
import * as Worker from "@effect/platform-browser/Worker"
import * as Client from "@effect/rpc-workers/Client"
import * as Resolver from "@effect/rpc-workers/Resolver"
import "@vitest/web-worker"
import { Exit } from "effect"
import * as Cause from "effect/Cause"
import * as Chunk from "effect/Chunk"
Expand Down Expand Up @@ -63,7 +63,7 @@ describe("e2e", () => {
runPromise
))

it("interruption", () => {
it.skip("interruption", () => {
expect(() =>
pipe(
client.delayed("foo"),
Expand Down

0 comments on commit 446898e

Please sign in to comment.