From 747112e11a8339b42ff8ebcce5641881473ffc4c Mon Sep 17 00:00:00 2001 From: gcanti Date: Fri, 15 Sep 2023 08:00:55 +0200 Subject: [PATCH] Stream: replace Duration.Duration with Duration.DurationInput in public APIs --- .changeset/old-cars-divide.md | 5 +++ docs/modules/Stream.ts.md | 44 +++++++++++---------- src/Stream.ts | 45 +++++++++++---------- src/internal/stream.ts | 73 ++++++++++++++++------------------- test/Stream/repeating.ts | 13 +++++++ 5 files changed, 99 insertions(+), 81 deletions(-) create mode 100644 .changeset/old-cars-divide.md diff --git a/.changeset/old-cars-divide.md b/.changeset/old-cars-divide.md new file mode 100644 index 0000000..aea6711 --- /dev/null +++ b/.changeset/old-cars-divide.md @@ -0,0 +1,5 @@ +--- +"@effect/stream": patch +--- + +Stream: replace Duration.Duration with Duration.DurationInput in public APIs diff --git a/docs/modules/Stream.ts.md b/docs/modules/Stream.ts.md index b3fd7ac..8c1b5ca 100644 --- a/docs/modules/Stream.ts.md +++ b/docs/modules/Stream.ts.md @@ -1114,7 +1114,7 @@ A stream that emits Unit values spaced by the specified duration. **Signature** ```ts -export declare const tick: (interval: Duration.Duration) => Stream +export declare const tick: (interval: Duration.DurationInput) => Stream ``` Added in v1.0.0 @@ -3785,8 +3785,8 @@ has paused typing so as to not prematurely recommend results. ```ts export declare const debounce: { - (duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, duration: Duration.Duration): Stream + (duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, duration: Duration.DurationInput): Stream } ``` @@ -4204,8 +4204,10 @@ Partitions the stream with the specified `chunkSize` or until the specified ```ts export declare const groupedWithin: { - (chunkSize: number, duration: Duration.Duration): (self: Stream) => Stream> - (self: Stream, chunkSize: number, duration: Duration.Duration): Stream> + (chunkSize: number, duration: Duration.DurationInput): ( + self: Stream + ) => Stream> + (self: Stream, chunkSize: number, duration: Duration.DurationInput): Stream> } ``` @@ -4223,8 +4225,8 @@ given duration completes. See `interruptAfter` for this behavior. ```ts export declare const haltAfter: { - (duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, duration: Duration.Duration): Stream + (duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, duration: Duration.DurationInput): Stream } ``` @@ -4335,8 +4337,8 @@ evaluation of this stream after the given `Duration`. ```ts export declare const interruptAfter: { - (duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, duration: Duration.Duration): Stream + (duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, duration: Duration.DurationInput): Stream } ``` @@ -5286,7 +5288,7 @@ export declare const throttle: { (options: { readonly cost: (chunk: Chunk.Chunk) => number readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number | undefined readonly strategy?: 'enforce' | 'shape' | undefined }): (self: Stream) => Stream @@ -5295,7 +5297,7 @@ export declare const throttle: { options: { readonly cost: (chunk: Chunk.Chunk) => number readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number | undefined readonly strategy?: 'enforce' | 'shape' | undefined } @@ -5326,7 +5328,7 @@ export declare const throttleEffect: { (options: { readonly cost: (chunk: Chunk.Chunk) => Effect.Effect readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number | undefined readonly strategy?: 'enforce' | 'shape' | undefined }): (self: Stream) => Stream @@ -5335,7 +5337,7 @@ export declare const throttleEffect: { options: { readonly cost: (chunk: Chunk.Chunk) => Effect.Effect readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number | undefined readonly strategy?: 'enforce' | 'shape' | undefined } @@ -5353,8 +5355,8 @@ Ends the stream if it does not produce a value after the specified duration. ```ts export declare const timeout: { - (duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, duration: Duration.Duration): Stream + (duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, duration: Duration.DurationInput): Stream } ``` @@ -5369,8 +5371,8 @@ duration. ```ts export declare const timeoutFail: { - (error: LazyArg, duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, error: LazyArg, duration: Duration.Duration): Stream + (error: LazyArg, duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, error: LazyArg, duration: Duration.DurationInput): Stream } ``` @@ -5385,10 +5387,10 @@ duration. ```ts export declare const timeoutFailCause: { - (cause: LazyArg>, duration: Duration.Duration): ( + (cause: LazyArg>, duration: Duration.DurationInput): ( self: Stream ) => Stream - (self: Stream, cause: LazyArg>, duration: Duration.Duration): Stream< + (self: Stream, cause: LazyArg>, duration: Duration.DurationInput): Stream< R, E | E2, A @@ -5407,10 +5409,10 @@ duration. ```ts export declare const timeoutTo: { - (duration: Duration.Duration, that: Stream): ( + (duration: Duration.DurationInput, that: Stream): ( self: Stream ) => Stream - (self: Stream, duration: Duration.Duration, that: Stream): Stream< + (self: Stream, duration: Duration.DurationInput, that: Stream): Stream< R | R2, E | E2, A | A2 diff --git a/src/Stream.ts b/src/Stream.ts index 8915095..c7bc1b4 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -830,8 +830,8 @@ export const crossWith: { * @category utils */ export const debounce: { - (duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, duration: Duration.Duration): Stream + (duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, duration: Duration.DurationInput): Stream } = internal.debounce /** @@ -1695,8 +1695,11 @@ export const grouped: { * @category utils */ export const groupedWithin: { - (chunkSize: number, duration: Duration.Duration): (self: Stream) => Stream> - (self: Stream, chunkSize: number, duration: Duration.Duration): Stream> + ( + chunkSize: number, + duration: Duration.DurationInput + ): (self: Stream) => Stream> + (self: Stream, chunkSize: number, duration: Duration.DurationInput): Stream> } = internal.groupedWithin /** @@ -1710,8 +1713,8 @@ export const groupedWithin: { * @category utils */ export const haltAfter: { - (duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, duration: Duration.Duration): Stream + (duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, duration: Duration.DurationInput): Stream } = internal.haltAfter /** @@ -1826,8 +1829,8 @@ export const intersperseAffixes: { * @category utils */ export const interruptAfter: { - (duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, duration: Duration.Duration): Stream + (duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, duration: Duration.DurationInput): Stream } = internal.interruptAfter /** @@ -3635,7 +3638,7 @@ export const throttle: { options: { readonly cost: (chunk: Chunk.Chunk) => number readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -3645,7 +3648,7 @@ export const throttle: { options: { readonly cost: (chunk: Chunk.Chunk) => number readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -3673,7 +3676,7 @@ export const throttleEffect: { options: { readonly cost: (chunk: Chunk.Chunk) => Effect.Effect readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -3683,7 +3686,7 @@ export const throttleEffect: { options: { readonly cost: (chunk: Chunk.Chunk) => Effect.Effect readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -3696,7 +3699,7 @@ export const throttleEffect: { * @since 1.0.0 * @category constructors */ -export const tick: (interval: Duration.Duration) => Stream = internal.tick +export const tick: (interval: Duration.DurationInput) => Stream = internal.tick /** * Ends the stream if it does not produce a value after the specified duration. @@ -3705,8 +3708,8 @@ export const tick: (interval: Duration.Duration) => Stream = * @category utils */ export const timeout: { - (duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, duration: Duration.Duration): Stream + (duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, duration: Duration.DurationInput): Stream } = internal.timeout /** @@ -3717,8 +3720,8 @@ export const timeout: { * @category utils */ export const timeoutFail: { - (error: LazyArg, duration: Duration.Duration): (self: Stream) => Stream - (self: Stream, error: LazyArg, duration: Duration.Duration): Stream + (error: LazyArg, duration: Duration.DurationInput): (self: Stream) => Stream + (self: Stream, error: LazyArg, duration: Duration.DurationInput): Stream } = internal.timeoutFail /** @@ -3731,12 +3734,12 @@ export const timeoutFail: { export const timeoutFailCause: { ( cause: LazyArg>, - duration: Duration.Duration + duration: Duration.DurationInput ): (self: Stream) => Stream ( self: Stream, cause: LazyArg>, - duration: Duration.Duration + duration: Duration.DurationInput ): Stream } = internal.timeoutFailCause @@ -3749,12 +3752,12 @@ export const timeoutFailCause: { */ export const timeoutTo: { ( - duration: Duration.Duration, + duration: Duration.DurationInput, that: Stream ): (self: Stream) => Stream ( self: Stream, - duration: Duration.Duration, + duration: Duration.DurationInput, that: Stream ): Stream } = internal.timeoutTo diff --git a/src/internal/stream.ts b/src/internal/stream.ts index 5e2627a..fcccb0f 100644 --- a/src/internal/stream.ts +++ b/src/internal/stream.ts @@ -1608,9 +1608,9 @@ export const crossWith = dual< /** @internal */ export const debounce = dual< - (duration: Duration.Duration) => (self: Stream.Stream) => Stream.Stream, - (self: Stream.Stream, duration: Duration.Duration) => Stream.Stream ->(2, (self: Stream.Stream, duration: Duration.Duration): Stream.Stream => + (duration: Duration.DurationInput) => (self: Stream.Stream) => Stream.Stream, + (self: Stream.Stream, duration: Duration.DurationInput) => Stream.Stream +>(2, (self: Stream.Stream, duration: Duration.DurationInput): Stream.Stream => pipe( singleProducerAsyncInput.make, unknown>(), Effect.flatMap((input) => @@ -3221,19 +3221,19 @@ export const grouped = dual< export const groupedWithin = dual< ( chunkSize: number, - duration: Duration.Duration + duration: Duration.DurationInput ) => (self: Stream.Stream) => Stream.Stream>, ( self: Stream.Stream, chunkSize: number, - duration: Duration.Duration + duration: Duration.DurationInput ) => Stream.Stream> >( 3, ( self: Stream.Stream, chunkSize: number, - duration: Duration.Duration + duration: Duration.DurationInput ): Stream.Stream> => aggregateWithin(self, _sink.collectAllN(chunkSize), Schedule.spaced(duration)) ) @@ -3286,11 +3286,11 @@ export const haltWhen = dual< /** @internal */ export const haltAfter = dual< - (duration: Duration.Duration) => (self: Stream.Stream) => Stream.Stream, - (self: Stream.Stream, duration: Duration.Duration) => Stream.Stream + (duration: Duration.DurationInput) => (self: Stream.Stream) => Stream.Stream, + (self: Stream.Stream, duration: Duration.DurationInput) => Stream.Stream >( 2, - (self: Stream.Stream, duration: Duration.Duration): Stream.Stream => + (self: Stream.Stream, duration: Duration.DurationInput): Stream.Stream => pipe(self, haltWhen(Clock.sleep(duration))) ) @@ -3529,11 +3529,11 @@ export const intersperseAffixes = dual< /** @internal */ export const interruptAfter = dual< - (duration: Duration.Duration) => (self: Stream.Stream) => Stream.Stream, - (self: Stream.Stream, duration: Duration.Duration) => Stream.Stream + (duration: Duration.DurationInput) => (self: Stream.Stream) => Stream.Stream, + (self: Stream.Stream, duration: Duration.DurationInput) => Stream.Stream >( 2, - (self: Stream.Stream, duration: Duration.Duration): Stream.Stream => + (self: Stream.Stream, duration: Duration.DurationInput): Stream.Stream => pipe(self, interruptWhen(Clock.sleep(duration))) ) @@ -5020,12 +5020,7 @@ export const repeatWith = dual< } ) -/** - * Repeats the value using the provided schedule. - * - * @since 1.0.0 - * @category constructors - */ +/** @internal */ export const repeatWithSchedule = ( value: A, schedule: Schedule.Schedule @@ -6220,7 +6215,7 @@ export const throttle = dual< options: { readonly cost: (chunk: Chunk.Chunk) => number readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -6230,7 +6225,7 @@ export const throttle = dual< options: { readonly cost: (chunk: Chunk.Chunk) => number readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -6242,7 +6237,7 @@ export const throttle = dual< options: { readonly cost: (chunk: Chunk.Chunk) => number readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -6259,7 +6254,7 @@ export const throttleEffect = dual< options: { readonly cost: (chunk: Chunk.Chunk) => Effect.Effect readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -6269,7 +6264,7 @@ export const throttleEffect = dual< options: { readonly cost: (chunk: Chunk.Chunk) => Effect.Effect readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -6281,7 +6276,7 @@ export const throttleEffect = dual< options: { readonly cost: (chunk: Chunk.Chunk) => Effect.Effect readonly units: number - readonly duration: Duration.Duration + readonly duration: Duration.DurationInput readonly burst?: number readonly strategy?: "enforce" | "shape" } @@ -6297,7 +6292,7 @@ const throttleEnforceEffect = ( self: Stream.Stream, cost: (chunk: Chunk.Chunk) => Effect.Effect, units: number, - duration: Duration.Duration, + duration: Duration.DurationInput, burst: number ): Stream.Stream => { const loop = ( @@ -6340,7 +6335,7 @@ const throttleShapeEffect = ( self: Stream.Stream, costFn: (chunk: Chunk.Chunk) => Effect.Effect, units: number, - duration: Duration.Duration, + duration: Duration.DurationInput, burst: number ): Stream.Stream => { const loop = ( @@ -6387,14 +6382,14 @@ const throttleShapeEffect = ( } /** @internal */ -export const tick = (interval: Duration.Duration): Stream.Stream => +export const tick = (interval: Duration.DurationInput): Stream.Stream => repeatWithSchedule(void 0, Schedule.spaced(interval)) /** @internal */ export const timeout = dual< - (duration: Duration.Duration) => (self: Stream.Stream) => Stream.Stream, - (self: Stream.Stream, duration: Duration.Duration) => Stream.Stream ->(2, (self: Stream.Stream, duration: Duration.Duration): Stream.Stream => + (duration: Duration.DurationInput) => (self: Stream.Stream) => Stream.Stream, + (self: Stream.Stream, duration: Duration.DurationInput) => Stream.Stream +>(2, (self: Stream.Stream, duration: Duration.DurationInput): Stream.Stream => pipe( toPull(self), Effect.map(Effect.timeoutFail>({ @@ -6408,19 +6403,19 @@ export const timeout = dual< export const timeoutFail = dual< ( error: LazyArg, - duration: Duration.Duration + duration: Duration.DurationInput ) => (self: Stream.Stream) => Stream.Stream, ( self: Stream.Stream, error: LazyArg, - duration: Duration.Duration + duration: Duration.DurationInput ) => Stream.Stream >( 3, ( self: Stream.Stream, error: LazyArg, - duration: Duration.Duration + duration: Duration.DurationInput ): Stream.Stream => pipe(self, timeoutTo(duration, failSync(error))) ) @@ -6428,19 +6423,19 @@ export const timeoutFail = dual< export const timeoutFailCause = dual< ( cause: LazyArg>, - duration: Duration.Duration + duration: Duration.DurationInput ) => (self: Stream.Stream) => Stream.Stream, ( self: Stream.Stream, cause: LazyArg>, - duration: Duration.Duration + duration: Duration.DurationInput ) => Stream.Stream >( 3, ( self: Stream.Stream, cause: LazyArg>, - duration: Duration.Duration + duration: Duration.DurationInput ): Stream.Stream => pipe( toPull(self), @@ -6457,19 +6452,19 @@ export const timeoutFailCause = dual< /** @internal */ export const timeoutTo = dual< ( - duration: Duration.Duration, + duration: Duration.DurationInput, that: Stream.Stream ) => (self: Stream.Stream) => Stream.Stream, ( self: Stream.Stream, - duration: Duration.Duration, + duration: Duration.DurationInput, that: Stream.Stream ) => Stream.Stream >( 3, ( self: Stream.Stream, - duration: Duration.Duration, + duration: Duration.DurationInput, that: Stream.Stream ): Stream.Stream => { const StreamTimeout = Cause.RuntimeException("Stream Timeout") diff --git a/test/Stream/repeating.ts b/test/Stream/repeating.ts index 044308c..7bbfc3c 100644 --- a/test/Stream/repeating.ts +++ b/test/Stream/repeating.ts @@ -38,6 +38,19 @@ describe.concurrent("Stream", () => { assert.deepStrictEqual(Array.from(result), [1, 1, 1, 1, 1]) })) + it.effect("tick", () => + Effect.gen(function*($) { + const fiber = yield* $( + Stream.tick("10 millis"), + Stream.take(2), + Stream.runCollect, + Effect.fork + ) + yield* $(TestClock.adjust(Duration.millis(50))) + const result = yield* $(Fiber.join(fiber)) + assert.deepStrictEqual(Array.from(result), [undefined, undefined]) + })) + it.effect("repeat - short circuits", () => Effect.gen(function*($) { const ref = yield* $(Ref.make(Chunk.empty()))