Skip to content

Commit

Permalink
feat: Add immediate:boolean & scope:Scope to RunForkOptions
Browse files Browse the repository at this point in the history
refactor: use interrupting FiberId

fix: remove Scope option due to memory leaks

feat: add RunForkOptions to runCallback
  • Loading branch information
TylorS committed Jan 11, 2024
1 parent 48a3d40 commit 917105e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .changeset/fair-walls-sin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

Add immediate:boolean flag to runFork/runCallback
13 changes: 11 additions & 2 deletions packages/effect/src/Runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface Runtime<in R> extends Pipeable {
export interface RunForkOptions {
readonly scheduler?: Scheduler | undefined
readonly updateRefs?: ((refs: FiberRefs.FiberRefs, fiberId: FiberId.Runtime) => FiberRefs.FiberRefs) | undefined
readonly immediate?: boolean
}

/**
Expand Down Expand Up @@ -93,6 +94,14 @@ export const runSyncExit: <R>(runtime: Runtime<R>) => <E, A>(effect: Effect.Effe
*/
export const runSync: <R>(runtime: Runtime<R>) => <E, A>(effect: Effect.Effect<R, E, A>) => A = internal.unsafeRunSync

/**
* @since 2.0.0
* @category models
*/
export interface RunCallbackOptions<E, A> extends RunForkOptions {
readonly onExit?: ((exit: Exit.Exit<E, A>) => void) | undefined
}

/**
* Executes the effect asynchronously, eventually passing the exit value to
* the specified callback.
Expand All @@ -107,8 +116,8 @@ export const runCallback: <R>(
runtime: Runtime<R>
) => <E, A>(
effect: Effect.Effect<R, E, A>,
onExit?: ((exit: Exit.Exit<E, A>) => void) | undefined
) => (fiberId?: FiberId.FiberId | undefined, onExit?: ((exit: Exit.Exit<E, A>) => void) | undefined) => void =
options?: RunCallbackOptions<E, A> | undefined
) => (fiberId?: FiberId.FiberId | undefined, options?: RunCallbackOptions<E, A> | undefined) => void =
internal.unsafeRunCallback

/**
Expand Down
31 changes: 19 additions & 12 deletions packages/effect/src/internal/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ export const unsafeFork = <R>(runtime: Runtime.Runtime<R>) =>
options?: Runtime.RunForkOptions
): Fiber.RuntimeFiber<E, A> => {
const fiberId = FiberId.unsafeMake()
const effect = self

const fiberRefUpdates: ReadonlyArray.NonEmptyArray<
readonly [FiberRef.FiberRef<any>, ReadonlyArray.NonEmptyReadonlyArray<readonly [FiberId.Runtime, any]>]
> = [[core.currentContext, [[fiberId, runtime.context]]]]
Expand All @@ -55,6 +53,7 @@ export const unsafeFork = <R>(runtime: Runtime.Runtime<R>) =>
runtime.runtimeFlags
)

const effect = self
const supervisor = fiberRuntime._supervisor

// we can compare by reference here as _supervisor.none is wrapped with globalValue
Expand All @@ -66,7 +65,12 @@ export const unsafeFork = <R>(runtime: Runtime.Runtime<R>) =>

fiberScope.globalScope.add(runtime.runtimeFlags, fiberRuntime)

fiberRuntime.start(effect)
// Only an explicit false will prevent immediate execution
if (options?.immediate === false) {
fiberRuntime.resume(effect)
} else {
fiberRuntime.start(effect)
}

return fiberRuntime
}
Expand All @@ -75,22 +79,25 @@ export const unsafeFork = <R>(runtime: Runtime.Runtime<R>) =>
export const unsafeRunCallback = <R>(runtime: Runtime.Runtime<R>) =>
<E, A>(
effect: Effect.Effect<R, E, A>,
onExit?: (exit: Exit.Exit<E, A>) => void
): (fiberId?: FiberId.FiberId, onExit?: (exit: Exit.Exit<E, A>) => void) => void => {
const fiberRuntime = unsafeFork(runtime)(effect)
options: Runtime.RunCallbackOptions<E, A> = {}
): (fiberId?: FiberId.FiberId, options?: Runtime.RunCallbackOptions<E, A> | undefined) => void => {
const fiberRuntime = unsafeFork(runtime)(effect, options)

if (onExit) {
if (options.onExit) {
fiberRuntime.addObserver((exit) => {
onExit(exit)
options.onExit!(exit)
})
}

return (id, onExitInterrupt) =>
return (id, cancelOptions) =>
unsafeRunCallback(runtime)(
pipe(fiberRuntime, Fiber.interruptAs(id ?? FiberId.none)),
onExitInterrupt ?
(exit) => onExitInterrupt(Exit.flatten(exit)) :
void 0
{
...cancelOptions,
onExit: cancelOptions?.onExit
? (exit) => cancelOptions.onExit!(Exit.flatten(exit))
: undefined
}
)
}

Expand Down

0 comments on commit 917105e

Please sign in to comment.