Skip to content

Commit

Permalink
add FiberMap/FiberSet.join api (#2244)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Mar 4, 2024
1 parent 5f7925b commit e3ff789
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 10 deletions.
21 changes: 21 additions & 0 deletions .changeset/healthy-turkeys-serve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
"effect": patch
---

add FiberMap/FiberSet.join api

This api can be used to propogate failures back to a parent fiber, in case any of the fibers added to the FiberMap/FiberSet fail with an error.

Example:

```ts
import { Effect, FiberSet } from "effect";

Effect.gen(function* (_) {
const set = yield* _(FiberSet.make());
yield* _(FiberSet.add(set, Effect.runFork(Effect.fail("error"))));

// parent fiber will fail with "error"
yield* _(FiberSet.join(set));
});
```
50 changes: 44 additions & 6 deletions packages/effect/src/FiberMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import * as Effect from "effect/Effect"
import type * as Scope from "effect/Scope"
import type { NoSuchElementException } from "./Cause.js"
import * as Cause from "./Cause.js"
import * as Deferred from "./Deferred.js"
import * as Exit from "./Exit.js"
import * as Fiber from "./Fiber.js"
import * as FiberId from "./FiberId.js"
import { dual } from "./Function.js"
Expand All @@ -30,11 +33,12 @@ export type TypeId = typeof TypeId
* @since 2.0.0
* @categories models
*/
export interface FiberMap<K, A = unknown, E = unknown>
export interface FiberMap<in out K, out A = unknown, out E = unknown>
extends Pipeable, Inspectable.Inspectable, Iterable<[K, Fiber.RuntimeFiber<A, E>]>
{
readonly [TypeId]: TypeId
readonly backing: MutableHashMap.MutableHashMap<K, Fiber.RuntimeFiber<A, E>>
readonly deferred: Deferred.Deferred<never, unknown>
}

/**
Expand Down Expand Up @@ -65,9 +69,13 @@ const Proto = {
}
}

const unsafeMake = <K, A = unknown, E = unknown>(): FiberMap<K, A, E> => {
const unsafeMake = <K, A = unknown, E = unknown>(
backing: MutableHashMap.MutableHashMap<K, Fiber.RuntimeFiber<A, E>>,
deferred: Deferred.Deferred<never, E>
): FiberMap<K, A, E> => {
const self = Object.create(Proto)
self.backing = MutableHashMap.empty()
self.backing = backing
self.deferred = deferred
return self
}

Expand Down Expand Up @@ -97,7 +105,14 @@ const unsafeMake = <K, A = unknown, E = unknown>(): FiberMap<K, A, E> => {
* @categories constructors
*/
export const make = <K, A = unknown, E = unknown>(): Effect.Effect<FiberMap<K, A, E>, never, Scope.Scope> =>
Effect.acquireRelease(Effect.sync(() => unsafeMake<K, A, E>()), clear)
Effect.acquireRelease(
Effect.map(Deferred.make<never, E>(), (deferred) =>
unsafeMake<K, A, E>(
MutableHashMap.empty(),
deferred
)),
clear
)

/**
* Create an Effect run function that is backed by a FiberMap.
Expand Down Expand Up @@ -159,11 +174,14 @@ export const unsafeSet: {
previous.value.unsafeInterruptAsFork(interruptAs ?? FiberId.none)
}
MutableHashMap.set(self.backing, key, fiber)
fiber.addObserver((_) => {
fiber.addObserver((exit) => {
const current = MutableHashMap.get(self.backing, key)
if (Option.isSome(current) && fiber === current.value) {
MutableHashMap.remove(self.backing, key)
}
if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) {
Deferred.unsafeDone(self.deferred, exit as any)
}
})
})

Expand Down Expand Up @@ -372,5 +390,25 @@ export const runtime: <K, A, E>(
* @since 2.0.0
* @categories combinators
*/
export const size = <K, E, A>(self: FiberMap<K, E, A>): Effect.Effect<number> =>
export const size = <K, A, E>(self: FiberMap<K, A, E>): Effect.Effect<number> =>
Effect.sync(() => MutableHashMap.size(self.backing))

/**
* Join all fibers in the FiberMap. If any of the Fiber's in the map terminate with a failure,
* the returned Effect will terminate with the first failure that occurred.
*
* @since 2.0.0
* @categories combinators
* @example
* import { Effect, FiberMap } from "effect";
*
* Effect.gen(function* (_) {
* const map = yield* _(FiberMap.make());
* yield* _(FiberMap.set(map, "a", Effect.runFork(Effect.fail("error"))));
*
* // parent fiber will fail with "error"
* yield* _(FiberMap.join(map));
* });
*/
export const join = <K, A, E>(self: FiberMap<K, A, E>): Effect.Effect<never, E> =>
Deferred.await(self.deferred as Deferred.Deferred<never, E>)
42 changes: 38 additions & 4 deletions packages/effect/src/FiberSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
*/
import * as Effect from "effect/Effect"
import type * as Scope from "effect/Scope"
import * as Cause from "./Cause.js"
import * as Deferred from "./Deferred.js"
import * as Exit from "./Exit.js"
import * as Fiber from "./Fiber.js"
import { dual } from "./Function.js"
import * as Inspectable from "./Inspectable.js"
Expand Down Expand Up @@ -31,6 +34,7 @@ export interface FiberSet<out A = unknown, out E = unknown>
{
readonly [TypeId]: TypeId
readonly backing: Set<Fiber.RuntimeFiber<A, E>>
readonly deferred: Deferred.Deferred<never, unknown>
}

/**
Expand Down Expand Up @@ -61,9 +65,13 @@ const Proto = {
}
}

const unsafeMake = <A, E>(): FiberSet<A, E> => {
const unsafeMake = <A, E>(
backing: Set<Fiber.RuntimeFiber<A, E>>,
deferred: Deferred.Deferred<never, unknown>
): FiberSet<A, E> => {
const self = Object.create(Proto)
self.backing = new Set()
self.backing = backing
self.deferred = deferred
return self
}

Expand Down Expand Up @@ -93,7 +101,10 @@ const unsafeMake = <A, E>(): FiberSet<A, E> => {
* @categories constructors
*/
export const make = <A = unknown, E = unknown>(): Effect.Effect<FiberSet<A, E>, never, Scope.Scope> =>
Effect.acquireRelease(Effect.sync(() => unsafeMake<A, E>()), clear)
Effect.acquireRelease(
Effect.map(Deferred.make<never, unknown>(), (deferred) => unsafeMake(new Set(), deferred)),
clear
)

/**
* Create an Effect run function that is backed by a FiberSet.
Expand Down Expand Up @@ -136,8 +147,11 @@ export const unsafeAdd: {
return
}
self.backing.add(fiber)
fiber.addObserver((_) => {
fiber.addObserver((exit) => {
self.backing.delete(fiber)
if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) {
Deferred.unsafeDone(self.deferred, exit as any)
}
})
})

Expand Down Expand Up @@ -264,3 +278,23 @@ export const runtime: <A, E>(
* @categories combinators
*/
export const size = <A, E>(self: FiberSet<A, E>): Effect.Effect<number> => Effect.sync(() => self.backing.size)

/**
* Join all fibers in the FiberSet. If any of the Fiber's in the set terminate with a failure,
* the returned Effect will terminate with the first failure that occurred.
*
* @since 2.0.0
* @categories combinators
* @example
* import { Effect, FiberSet } from "effect";
*
* Effect.gen(function* (_) {
* const set = yield* _(FiberSet.make());
* yield* _(FiberSet.add(set, Effect.runFork(Effect.fail("error"))));
*
* // parent fiber will fail with "error"
* yield* _(FiberSet.join(set));
* });
*/
export const join = <A, E>(self: FiberSet<A, E>): Effect.Effect<never, E> =>
Deferred.await(self.deferred as Deferred.Deferred<never, E>)
11 changes: 11 additions & 0 deletions packages/effect/test/FiberMap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,15 @@ describe("FiberMap", () => {

assert.strictEqual(yield* _(Ref.get(ref)), 10)
}))

it.scoped("join", () =>
Effect.gen(function*(_) {
const map = yield* _(FiberMap.make<string>())
FiberMap.unsafeSet(map, "a", Effect.runFork(Effect.unit))
FiberMap.unsafeSet(map, "b", Effect.runFork(Effect.unit))
FiberMap.unsafeSet(map, "c", Effect.runFork(Effect.fail("fail")))
FiberMap.unsafeSet(map, "d", Effect.runFork(Effect.fail("ignored")))
const result = yield* _(FiberMap.join(map), Effect.flip)
assert.strictEqual(result, "fail")
}))
})
10 changes: 10 additions & 0 deletions packages/effect/test/FiberSet.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,14 @@ describe("FiberSet", () => {

assert.strictEqual(yield* _(Ref.get(ref)), 10)
}))

it.scoped("join", () =>
Effect.gen(function*(_) {
const set = yield* _(FiberSet.make())
FiberSet.unsafeAdd(set, Effect.runFork(Effect.unit))
FiberSet.unsafeAdd(set, Effect.runFork(Effect.unit))
FiberSet.unsafeAdd(set, Effect.runFork(Effect.fail("fail")))
const result = yield* _(FiberSet.join(set), Effect.flip)
assert.strictEqual(result, "fail")
}))
})

0 comments on commit e3ff789

Please sign in to comment.