From eab4bdb66e2583f8871aa11005aa298a1f01d770 Mon Sep 17 00:00:00 2001 From: Giulio Canti Date: Fri, 22 Sep 2023 14:31:52 +0200 Subject: [PATCH] Stream: range now includes both endpoints (#181) --- .changeset/cyan-geese-unite.md | 5 +++++ docs/modules/Stream.ts.md | 3 +-- src/Stream.ts | 3 +-- src/internal/stream.ts | 4 ++-- test/Sink/collecting.ts | 2 +- test/Sink/filtering.ts | 6 +++--- test/Sink/folding.ts | 8 ++++---- test/Sink/mapping.ts | 10 +++++----- test/Sink/traversing.ts | 4 ++-- test/Stream/aggregation.ts | 2 +- test/Stream/broadcasting.ts | 8 ++++---- test/Stream/buffering.ts | 34 +++++++++++++++++----------------- test/Stream/changing.ts | 4 ++-- test/Stream/constructors.ts | 28 ++++++++++++++++++---------- test/Stream/draining.ts | 2 +- test/Stream/environment.ts | 1 - test/Stream/error-handling.ts | 2 +- test/Stream/grouping.ts | 2 +- test/Stream/mapping.ts | 4 ++-- test/Stream/partitioning.ts | 6 +++--- test/Stream/scheduling.ts | 2 +- test/Stream/sequencing.ts | 8 ++++---- test/Stream/sliding.ts | 4 ++-- test/Stream/splitting.ts | 4 ++-- test/Stream/taking.ts | 10 +++++----- test/Stream/timeouts.ts | 8 ++++---- 26 files changed, 92 insertions(+), 82 deletions(-) create mode 100644 .changeset/cyan-geese-unite.md diff --git a/.changeset/cyan-geese-unite.md b/.changeset/cyan-geese-unite.md new file mode 100644 index 0000000..ec6f97f --- /dev/null +++ b/.changeset/cyan-geese-unite.md @@ -0,0 +1,5 @@ +--- +"@effect/stream": minor +--- + +Stream: range now includes both endpoints diff --git a/docs/modules/Stream.ts.md b/docs/modules/Stream.ts.md index 8c1b5ca..72bed9f 100644 --- a/docs/modules/Stream.ts.md +++ b/docs/modules/Stream.ts.md @@ -966,8 +966,7 @@ Added in v1.0.0 ## range -Constructs a stream from a range of integers (lower bound included, upper -bound not included). +Constructs a stream from a range of integers, including both endpoints. **Signature** diff --git a/src/Stream.ts b/src/Stream.ts index c7bc1b4..897b760 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -2636,8 +2636,7 @@ export const provideSomeLayer: { } = internal.provideSomeLayer /** - * Constructs a stream from a range of integers (lower bound included, upper - * bound not included). + * Constructs a stream from a range of integers, including both endpoints. * * @since 1.0.0 * @category constructors diff --git a/src/internal/stream.ts b/src/internal/stream.ts index fcccb0f..3eedf2d 100644 --- a/src/internal/stream.ts +++ b/src/internal/stream.ts @@ -4644,7 +4644,7 @@ export const provideSomeLayer = dual< /** @internal */ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): Stream.Stream => suspend(() => { - if (min >= max) { + if (min > max) { return empty as Stream.Stream } const go = ( @@ -4652,7 +4652,7 @@ export const range = (min: number, max: number, chunkSize = DefaultChunkSize): S max: number, chunkSize: number ): Channel.Channel, unknown> => { - const remaining = max - min + const remaining = max - min + 1 if (remaining > chunkSize) { return pipe( core.write(Chunk.range(min, min + chunkSize - 1)), diff --git a/test/Sink/collecting.ts b/test/Sink/collecting.ts index 293060c..5081afd 100644 --- a/test/Sink/collecting.ts +++ b/test/Sink/collecting.ts @@ -83,7 +83,7 @@ describe.concurrent("Sink", () => { it.effect("collectAllToMap", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(0, 10), + Stream.range(0, 9), Stream.run(Sink.collectAllToMap( (n) => n % 3, (x, y) => x + y diff --git a/test/Sink/filtering.ts b/test/Sink/filtering.ts index cec2734..f25ad89 100644 --- a/test/Sink/filtering.ts +++ b/test/Sink/filtering.ts @@ -9,7 +9,7 @@ describe.concurrent("Sink", () => { it.effect("filterInput", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(pipe(Sink.collectAll(), Sink.filterInput((n) => n % 2 === 0))) ) assert.deepStrictEqual(Array.from(result), [2, 4, 6, 8]) @@ -18,7 +18,7 @@ describe.concurrent("Sink", () => { it.effect("filterInputEffect - happy path", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(pipe( Sink.collectAll(), Sink.filterInputEffect((n) => Effect.succeed(n % 2 === 0)) @@ -30,7 +30,7 @@ describe.concurrent("Sink", () => { it.effect("filterInputEffect - error", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(pipe( Sink.collectAll(), Sink.filterInputEffect(() => Effect.fail("fail")) diff --git a/test/Sink/folding.ts b/test/Sink/folding.ts index 6a00870..f947253 100644 --- a/test/Sink/folding.ts +++ b/test/Sink/folding.ts @@ -22,7 +22,7 @@ describe.concurrent("Sink", () => { it.effect("fold - termination in the middle", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(Sink.fold(0, (n) => n <= 5, (x, y) => x + y)) ) assert.strictEqual(result, 6) @@ -31,7 +31,7 @@ describe.concurrent("Sink", () => { it.effect("fold - immediate termination", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(Sink.fold(0, (n) => n <= -1, (x, y) => x + y)) ) assert.strictEqual(result, 0) @@ -40,7 +40,7 @@ describe.concurrent("Sink", () => { it.effect("fold - no termination", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(Sink.fold(0, (n) => n <= 500, (x, y) => x + y)) ) assert.strictEqual(result, 45) @@ -48,7 +48,7 @@ describe.concurrent("Sink", () => { it.effect("foldLeft equivalence with Chunk.reduce", () => Effect.gen(function*($) { - const stream = Stream.range(1, 10) + const stream = Stream.range(1, 9) const result1 = yield* $(stream, Stream.run(Sink.foldLeft("", (s, n) => s + `${n}`))) const result2 = yield* $(stream, Stream.runCollect, Effect.map(Chunk.reduce("", (s, n) => s + `${n}`))) assert.strictEqual(result1, result2) diff --git a/test/Sink/mapping.ts b/test/Sink/mapping.ts index 223ea35..e32073d 100644 --- a/test/Sink/mapping.ts +++ b/test/Sink/mapping.ts @@ -12,7 +12,7 @@ describe.concurrent("Sink", () => { it.effect("as", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(pipe(Sink.succeed(1), Sink.as("as"))) ) assert.strictEqual(result, "as") @@ -155,7 +155,7 @@ describe.concurrent("Sink", () => { it.effect("map", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(pipe(Sink.succeed(1), Sink.map((n) => `${n}`))) ) assert.strictEqual(result, "1") @@ -164,7 +164,7 @@ describe.concurrent("Sink", () => { it.effect("mapEffect - happy path", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(pipe(Sink.succeed(1), Sink.mapEffect((n) => Effect.succeed(n + 1)))) ) assert.strictEqual(result, 2) @@ -173,7 +173,7 @@ describe.concurrent("Sink", () => { it.effect("mapEffect - error", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(pipe(Sink.succeed(1), Sink.mapEffect(() => Effect.fail("fail")))), Effect.flip ) @@ -183,7 +183,7 @@ describe.concurrent("Sink", () => { it.effect("mapError", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.run(pipe(Sink.fail("fail"), Sink.mapError((s) => s + "!"))), Effect.either ) diff --git a/test/Sink/traversing.ts b/test/Sink/traversing.ts index 64cae86..8c0e25a 100644 --- a/test/Sink/traversing.ts +++ b/test/Sink/traversing.ts @@ -22,7 +22,7 @@ describe.concurrent("Sink", () => { [1, 3, 7, 20], Effect.forEach((n) => pipe( - Stream.range(1, 100), + Stream.range(1, 99), Stream.rechunk(n), Stream.run(sink), Effect.map((option) => Equal.equals(option, Option.some(Option.some(10)))) @@ -79,7 +79,7 @@ describe.concurrent("Sink", () => { it.effect("forEachWhile - handles leftovers", () => Effect.gen(function*($) { const [result, value] = yield* $( - Stream.range(1, 5), + Stream.range(1, 4), Stream.run(pipe( Sink.forEachWhile((n: number) => Effect.succeed(n <= 3)), Sink.collectLeftover diff --git a/test/Stream/aggregation.ts b/test/Stream/aggregation.ts index 1d6cbe9..70703a2 100644 --- a/test/Stream/aggregation.ts +++ b/test/Stream/aggregation.ts @@ -231,7 +231,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const queue = yield* $(Queue.unbounded()) yield* $( - Stream.range(1, 10), + Stream.range(1, 9), Stream.tap((n) => pipe( Effect.fail("Boom"), diff --git a/test/Stream/broadcasting.ts b/test/Stream/broadcasting.ts index ff6b337..9f62100 100644 --- a/test/Stream/broadcasting.ts +++ b/test/Stream/broadcasting.ts @@ -13,7 +13,7 @@ describe.concurrent("Stream", () => { it.effect("broadcast - values", () => Effect.gen(function*($) { const { result1, result2 } = yield* $( - Stream.range(0, 5), + Stream.range(0, 4), Stream.broadcast(2, 12), Effect.flatMap((streams) => Effect.all({ @@ -31,7 +31,7 @@ describe.concurrent("Stream", () => { it.effect("broadcast - errors", () => Effect.gen(function*($) { const { result1, result2 } = yield* $( - Stream.range(0, 1), + Stream.make(0), Stream.concat(Stream.fail("boom")), Stream.broadcast(2, 12), Effect.flatMap((streams) => @@ -49,7 +49,7 @@ describe.concurrent("Stream", () => { it.effect("broadcast - backpressure", () => Effect.gen(function*($) { const { result1, result2 } = yield* $( - Stream.range(0, 5), + Stream.range(0, 4), Stream.flatMap(Stream.succeed), Stream.broadcast(2, 2), Effect.flatMap((streams) => @@ -87,7 +87,7 @@ describe.concurrent("Stream", () => { it.effect("broadcast - unsubscribe", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(0, 5), + Stream.range(0, 4), Stream.broadcast(2, 2), Effect.flatMap((streams) => pipe( diff --git a/test/Stream/buffering.ts b/test/Stream/buffering.ts index b2e87c3..c8c218b 100644 --- a/test/Stream/buffering.ts +++ b/test/Stream/buffering.ts @@ -29,7 +29,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const error = Cause.RuntimeException("boom") const result = yield* $( - Stream.range(0, 10), + Stream.range(0, 9), Stream.concat(Stream.fail(error)), Stream.buffer({ capacity: 2 }), Stream.runCollect, @@ -43,7 +43,7 @@ describe.concurrent("Stream", () => { const ref = yield* $(Ref.make(Chunk.empty())) const latch = yield* $(Deferred.make()) const stream = pipe( - Stream.range(1, 5), + Stream.range(1, 4), Stream.tap((n) => pipe( Ref.update(ref, Chunk.append(n)), @@ -81,7 +81,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const error = Cause.RuntimeException("boom") const result = yield* $( - Stream.range(0, 10), + Stream.range(0, 9), Stream.concat(Stream.fail(error)), Stream.bufferChunks({ capacity: 2 }), Stream.runCollect, @@ -95,7 +95,7 @@ describe.concurrent("Stream", () => { const ref = yield* $(Ref.make(Chunk.empty())) const latch = yield* $(Deferred.make()) const stream = pipe( - Stream.range(1, 5), + Stream.range(1, 4), Stream.tap((n) => pipe( Ref.update(ref, Chunk.append(n)), @@ -120,7 +120,7 @@ describe.concurrent("Stream", () => { const result = yield* $( Stream.range(1, 1_000), Stream.concat(Stream.fail(error)), - Stream.concat(Stream.range(1_000, 2_000)), + Stream.concat(Stream.range(1_001, 2_000)), Stream.bufferChunks({ capacity: 2, strategy: "dropping" }), Stream.runCollect, Effect.exit @@ -142,7 +142,7 @@ describe.concurrent("Stream", () => { Stream.fromEffect(Deferred.await(latch1)), Stream.flatMap(() => pipe( - Stream.range(1, 17), + Stream.range(1, 16), Stream.rechunk(1), Stream.ensuring(Deferred.succeed(latch2, void 0)) ) @@ -154,7 +154,7 @@ describe.concurrent("Stream", () => { Stream.fromEffect(Deferred.await(latch3)), Stream.flatMap(() => pipe( - Stream.range(17, 25), + Stream.range(17, 24), Stream.rechunk(1), Stream.ensuring(Deferred.succeed(latch4, void 0)) ) @@ -211,7 +211,7 @@ describe.concurrent("Stream", () => { const result = yield* $( Stream.range(1, 1_000), Stream.concat(Stream.fail(error)), - Stream.concat(Stream.range(1_000, 2_000)), + Stream.concat(Stream.range(1_001, 2_000)), Stream.bufferChunks({ capacity: 2, strategy: "sliding" }), Stream.runCollect, Effect.exit @@ -234,7 +234,7 @@ describe.concurrent("Stream", () => { Stream.fromEffect(Deferred.await(latch1)), Stream.flatMap(() => pipe( - Stream.range(1, 17), + Stream.range(1, 16), Stream.rechunk(1), Stream.ensuring(Deferred.succeed(latch2, void 0)) ) @@ -246,7 +246,7 @@ describe.concurrent("Stream", () => { Stream.fromEffect(Deferred.await(latch3)), Stream.flatMap(() => pipe( - Stream.range(17, 26), + Stream.range(17, 25), Stream.rechunk(1), Stream.ensuring(Deferred.succeed(latch4, void 0)) ) @@ -336,7 +336,7 @@ describe.concurrent("Stream", () => { Stream.fromEffect(Deferred.await(latch3)), Stream.flatMap(() => pipe( - Stream.range(17, 25), + Stream.range(17, 24), Stream.rechunk(1), Stream.ensuring(Deferred.succeed(latch4, void 0)) ) @@ -393,7 +393,7 @@ describe.concurrent("Stream", () => { const result = yield* $( Stream.range(1, 1_000), Stream.concat(Stream.fail(error)), - Stream.concat(Stream.range(1_000, 2_000)), + Stream.concat(Stream.range(1_001, 2_000)), Stream.buffer({ capacity: 2, strategy: "sliding" }), Stream.runCollect, Effect.exit @@ -415,7 +415,7 @@ describe.concurrent("Stream", () => { Stream.fromEffect(Deferred.await(latch1)), Stream.flatMap(() => pipe( - Stream.range(1, 17), + Stream.range(1, 16), Stream.rechunk(1), Stream.ensuring(Deferred.succeed(latch2, void 0)) ) @@ -427,7 +427,7 @@ describe.concurrent("Stream", () => { Stream.fromEffect(Deferred.await(latch3)), Stream.flatMap(() => pipe( - Stream.range(17, 25), + Stream.range(17, 24), Stream.rechunk(1), Stream.ensuring(Deferred.succeed(latch4, void 0)) ) @@ -500,7 +500,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const error = Cause.RuntimeException("boom") const result = yield* $( - Stream.range(0, 10), + Stream.range(0, 9), Stream.concat(Stream.fail(error)), Stream.buffer({ capacity: "unbounded" }), Stream.runCollect, @@ -514,14 +514,14 @@ describe.concurrent("Stream", () => { const ref = yield* $(Ref.make(Chunk.empty())) const latch = yield* $(Deferred.make()) const stream = pipe( - Stream.range(1, 1_000), + Stream.range(1, 999), Stream.tap((n) => pipe( Ref.update(ref, Chunk.append(n)), Effect.zipRight(pipe(Deferred.succeed(latch, void 0), Effect.when(() => n === 999))) ) ), - Stream.rechunk(1_000), + Stream.rechunk(999), Stream.buffer({ capacity: "unbounded" }) ) const result1 = yield* $(stream, Stream.take(2), Stream.runCollect) diff --git a/test/Stream/changing.ts b/test/Stream/changing.ts index 362a67c..86e8f77 100644 --- a/test/Stream/changing.ts +++ b/test/Stream/changing.ts @@ -7,7 +7,7 @@ import { assert, describe } from "vitest" describe.concurrent("Stream", () => { it.effect("changes", () => Effect.gen(function*($) { - const stream = Stream.range(0, 20) + const stream = Stream.range(0, 19) const result = yield* $( stream, Stream.changes, @@ -24,7 +24,7 @@ describe.concurrent("Stream", () => { it.effect("changesWithEffect", () => Effect.gen(function*($) { - const stream = Stream.range(0, 20) + const stream = Stream.range(0, 19) const result = yield* $( stream, Stream.changesWithEffect((left, right) => Effect.succeed(left === right)), diff --git a/test/Stream/constructors.ts b/test/Stream/constructors.ts index 04dc5bb..2c6244f 100644 --- a/test/Stream/constructors.ts +++ b/test/Stream/constructors.ts @@ -250,46 +250,54 @@ describe.concurrent("Stream", () => { assert.deepStrictEqual(Array.from(result), Array.from(Chunk.range(1, 10))) })) - it.effect("range - includes min value and excludes max value", () => + it.effect("range - includes both endpoints", () => Effect.gen(function*($) { const result = yield* $(Stream.runCollect(Stream.range(1, 2))) - assert.deepStrictEqual(Array.from(result), [1]) + assert.deepStrictEqual(Array.from(result), [1, 2]) })) it.effect("range - two large ranges can be concatenated", () => Effect.gen(function*($) { const result = yield* $( Stream.range(1, 1_000), - Stream.concat(Stream.range(1_000, 2_000)), + Stream.concat(Stream.range(1_001, 2_000)), Stream.runCollect ) - assert.deepStrictEqual(Array.from(result), Array.from(Chunk.range(1, 1_999))) + assert.deepStrictEqual(Array.from(result), Array.from(Chunk.range(1, 2000))) })) it.effect("range - two small ranges can be concatenated", () => Effect.gen(function*($) { const result = yield* $( Stream.range(1, 10), - Stream.concat(Stream.range(10, 20)), + Stream.concat(Stream.range(11, 20)), Stream.runCollect ) - assert.deepStrictEqual(Array.from(result), Array.from(Chunk.range(1, 19))) + assert.deepStrictEqual(Array.from(result), Array.from(Chunk.range(1, 20))) })) - it.effect("range - emits no values when start >= end", () => + it.effect("range - emits no values when start > end", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 1), - Stream.concat(Stream.range(2, 1)), + Stream.range(2, 1), Stream.runCollect ) assert.deepStrictEqual(Array.from(result), []) })) + it.effect("range - emits 1 value when start === end", () => + Effect.gen(function*($) { + const result = yield* $( + Stream.range(1, 1), + Stream.runCollect + ) + assert.deepStrictEqual(Array.from(result), [1]) + })) + it.effect("range - emits values in chunks of chunkSize", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 10, 2), + Stream.range(1, 9, 2), Stream.mapChunks((chunk) => Chunk.make(pipe(chunk, Chunk.reduce(0, (x, y) => x + y)))), Stream.runCollect ) diff --git a/test/Stream/draining.ts b/test/Stream/draining.ts index 721e1a5..0a435b0 100644 --- a/test/Stream/draining.ts +++ b/test/Stream/draining.ts @@ -15,7 +15,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const ref = yield* $(Ref.make(Chunk.empty())) yield* $( - Stream.range(0, 10), + Stream.range(0, 9), Stream.mapEffect((n) => Ref.update(ref, Chunk.append(n))), Stream.drain, Stream.runDrain diff --git a/test/Stream/environment.ts b/test/Stream/environment.ts index 0a2ca19..c70d7bc 100644 --- a/test/Stream/environment.ts +++ b/test/Stream/environment.ts @@ -168,7 +168,6 @@ describe.concurrent("Stream", () => { Stream.provideLayer(Layer.succeed(StringService, { string: "test" })), Stream.runCollect ) - console.log("serviceWithStream") assert.deepStrictEqual(Array.from(result), ["test"]) })) diff --git a/test/Stream/error-handling.ts b/test/Stream/error-handling.ts index 1fe59df..c71aaa3 100644 --- a/test/Stream/error-handling.ts +++ b/test/Stream/error-handling.ts @@ -333,7 +333,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const result = yield* $( Stream.empty, - Stream.orElseIfEmptyStream(() => Stream.range(0, 5)), + Stream.orElseIfEmptyStream(() => Stream.range(0, 4)), Stream.runCollect ) assert.deepStrictEqual(Array.from(result), [0, 1, 2, 3, 4]) diff --git a/test/Stream/grouping.ts b/test/Stream/grouping.ts index cc6e5cb..c31e1da 100644 --- a/test/Stream/grouping.ts +++ b/test/Stream/grouping.ts @@ -121,7 +121,7 @@ describe.concurrent("Stream", () => { it.effect("grouped - group size is correct", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(0, 100), + Stream.range(0, 99), Stream.grouped(10), Stream.map(Chunk.size), Stream.runCollect diff --git a/test/Stream/mapping.ts b/test/Stream/mapping.ts index a6d622c..d4e5b4d 100644 --- a/test/Stream/mapping.ts +++ b/test/Stream/mapping.ts @@ -209,7 +209,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const queue = yield* $(Queue.unbounded()) yield* $( - Stream.range(0, 9), + Stream.range(0, 8), Stream.mapEffect((n) => pipe(Queue.offer(queue, n)), { concurrency: 1 }), Stream.runDrain ) @@ -313,7 +313,7 @@ describe.concurrent("Stream", () => { it.effect("mapEffectPar - propagates the error of the original stream", () => Effect.gen(function*($) { const fiber = yield* $( - Stream.range(1, 11), + Stream.range(1, 10), Stream.concat(Stream.fail(Cause.RuntimeException("boom"))), Stream.mapEffect(() => Effect.sleep(Duration.seconds(1)), { concurrency: 2 }), Stream.runDrain, diff --git a/test/Stream/partitioning.ts b/test/Stream/partitioning.ts index ceb0de8..659c425 100644 --- a/test/Stream/partitioning.ts +++ b/test/Stream/partitioning.ts @@ -29,7 +29,7 @@ describe.concurrent("Stream", () => { it.effect("partition - values", () => Effect.gen(function*($) { const { result1, result2 } = yield* $( - Stream.range(0, 6), + Stream.range(0, 5), Stream.partition((n) => n % 2 === 0), Effect.flatMap(([evens, odds]) => Effect.all({ @@ -46,7 +46,7 @@ describe.concurrent("Stream", () => { it.effect("partition - errors", () => Effect.gen(function*($) { const { result1, result2 } = yield* $( - Stream.range(0, 1), + Stream.make(0), Stream.concat(Stream.fail("boom")), Stream.partition((n) => n % 2 === 0), Effect.flatMap(([evens, odds]) => @@ -64,7 +64,7 @@ describe.concurrent("Stream", () => { it.effect("partition - backpressure", () => Effect.gen(function*($) { const { result1, result2, result3 } = yield* $( - Stream.range(0, 6), + Stream.range(0, 5), Stream.partition((n) => (n % 2 === 0), { bufferSize: 1 }), Effect.flatMap(([evens, odds]) => Effect.gen(function*($) { diff --git a/test/Stream/scheduling.ts b/test/Stream/scheduling.ts index c1eb8e3..36e25f7 100644 --- a/test/Stream/scheduling.ts +++ b/test/Stream/scheduling.ts @@ -14,7 +14,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const start = yield* $(Clock.currentTimeMillis) const fiber = yield* $( - Stream.range(1, 9), + Stream.range(1, 8), Stream.schedule(Schedule.fixed(Duration.millis(100))), Stream.mapEffect((n) => pipe( diff --git a/test/Stream/sequencing.ts b/test/Stream/sequencing.ts index e801da6..2bbe1d2 100644 --- a/test/Stream/sequencing.ts +++ b/test/Stream/sequencing.ts @@ -104,7 +104,7 @@ describe.concurrent("Stream", () => { it.effect("flatMap - associativity", () => Effect.gen(function*($) { - const stream = Stream.range(0, 5) + const stream = Stream.range(0, 4) const f = (n: number) => Stream.succeed(n * 2) const g = (n: number) => Stream.succeed(String(n)) const { result1, result2 } = yield* $(Effect.all({ @@ -483,7 +483,7 @@ describe.concurrent("Stream", () => { const ref = yield* $(Ref.make(0)) const semaphore = yield* $(Effect.makeSemaphore(4)) yield* $( - Stream.range(1, 13), + Stream.range(1, 12), Stream.flatMap((n) => { if (n > 8) { return pipe( @@ -694,7 +694,7 @@ describe.concurrent("Stream", () => { it.effect("flattenExitOption - happy path", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(0, 10), + Stream.range(0, 9), Stream.toQueue({ capacity: 1 }), Effect.flatMap((queue) => pipe( @@ -716,7 +716,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const error = Cause.RuntimeException("boom") const result = yield* $( - Stream.range(0, 10), + Stream.range(0, 9), Stream.concat(Stream.fail(error)), Stream.toQueue({ capacity: 1 }), Effect.flatMap((queue) => diff --git a/test/Stream/sliding.ts b/test/Stream/sliding.ts index 11f73cb..cd0c748 100644 --- a/test/Stream/sliding.ts +++ b/test/Stream/sliding.ts @@ -71,7 +71,7 @@ describe.concurrent("Stream", () => { it.effect("sliding - returns all elements if chunkSize is greater than the size of the stream", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(1, 6), + Stream.range(1, 5), Stream.sliding(6), Stream.runCollect ) @@ -80,7 +80,7 @@ describe.concurrent("Stream", () => { it.effect("sliding - is mostly equivalent to ZStream#grouped when stepSize and chunkSize are equal", () => Effect.gen(function*($) { - const stream = Stream.range(1, 6) + const stream = Stream.range(1, 5) const { result1, result2 } = yield* $(Effect.all({ result1: pipe(stream, Stream.slidingSize(3, 3), Stream.runCollect), result2: pipe(stream, Stream.grouped(3), Stream.runCollect) diff --git a/test/Stream/splitting.ts b/test/Stream/splitting.ts index 319d823..095d9ac 100644 --- a/test/Stream/splitting.ts +++ b/test/Stream/splitting.ts @@ -40,7 +40,7 @@ describe.concurrent("Stream", () => { ) const { result1, result2 } = yield* $(Effect.all({ result1: pipe( - Stream.range(0, 10), + Stream.range(0, 9), Stream.split((n) => n % 4 === 0), Stream.runCollect ), @@ -62,7 +62,7 @@ describe.concurrent("Stream", () => { it.effect("split - is equivalent to identity when the predicate is not satisfied", () => Effect.gen(function*($) { - const stream = Stream.range(1, 11) + const stream = Stream.range(1, 10) const { result1, result2 } = yield* $(Effect.all({ result1: pipe(stream, Stream.split((n) => n % 11 === 0), Stream.runCollect), result2: pipe( diff --git a/test/Stream/taking.ts b/test/Stream/taking.ts index b235f25..bb78228 100644 --- a/test/Stream/taking.ts +++ b/test/Stream/taking.ts @@ -11,7 +11,7 @@ describe.concurrent("Stream", () => { it.effect("take", () => Effect.gen(function*($) { const take = 3 - const stream = Stream.range(1, 6) + const stream = Stream.range(1, 5) const { result1, result2 } = yield* $(Effect.all({ result1: pipe(stream, Stream.take(take), Stream.runCollect), result2: pipe(Stream.runCollect(stream), Effect.map(Chunk.take(take))) @@ -52,7 +52,7 @@ describe.concurrent("Stream", () => { it.effect("takeRight", () => Effect.gen(function*($) { const take = 3 - const stream = Stream.range(1, 6) + const stream = Stream.range(1, 5) const { result1, result2 } = yield* $(Effect.all({ result1: pipe(stream, Stream.takeRight(take), Stream.runCollect), result2: pipe(Stream.runCollect(stream), Effect.map(Chunk.takeRight(take))) @@ -62,7 +62,7 @@ describe.concurrent("Stream", () => { it.effect("takeUntil", () => Effect.gen(function*($) { - const stream = Stream.range(1, 6) + const stream = Stream.range(1, 5) const f = (n: number) => n % 3 === 0 const { result1, result2 } = yield* $(Effect.all({ result1: pipe(stream, Stream.takeUntil(f), Stream.runCollect), @@ -82,7 +82,7 @@ describe.concurrent("Stream", () => { it.effect("takeUntilEffect", () => Effect.gen(function*($) { - const stream = Stream.range(1, 6) + const stream = Stream.range(1, 5) const f = (n: number) => Effect.succeed(n % 3 === 0) const { result1, result2 } = yield* $(Effect.all({ result1: pipe(stream, Stream.takeUntilEffect(f), Stream.runCollect), @@ -126,7 +126,7 @@ describe.concurrent("Stream", () => { it.effect("takeWhile", () => Effect.gen(function*($) { - const stream = Stream.range(1, 6) + const stream = Stream.range(1, 5) const f = (n: number) => n <= 3 const { result1, result2 } = yield* $(Effect.all({ result1: pipe(stream, Stream.takeWhile(f), Stream.runCollect), diff --git a/test/Stream/timeouts.ts b/test/Stream/timeouts.ts index 2283ed8..ce53c81 100644 --- a/test/Stream/timeouts.ts +++ b/test/Stream/timeouts.ts @@ -27,7 +27,7 @@ describe.concurrent("Stream", () => { it.effect("timeout - should end the stream", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(0, 5), + Stream.range(0, 4), Stream.tap(() => Effect.sleep(Duration.infinity)), Stream.timeout(Duration.zero), Stream.runCollect @@ -38,7 +38,7 @@ describe.concurrent("Stream", () => { it.effect("timeoutFail - succeed", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(0, 5), + Stream.range(0, 4), Stream.tap(() => Effect.sleep(Duration.infinity)), Stream.timeoutFail(() => false, Duration.zero), Stream.runDrain, @@ -63,7 +63,7 @@ describe.concurrent("Stream", () => { Effect.gen(function*($) { const error = Cause.RuntimeException("boom") const result = yield* $( - Stream.range(0, 5), + Stream.range(0, 4), Stream.tap(() => Effect.sleep(Duration.infinity)), Stream.timeoutFailCause(() => Cause.die(error), Duration.zero), Stream.runDrain, @@ -77,7 +77,7 @@ describe.concurrent("Stream", () => { it.effect("timeoutTo - succeed", () => Effect.gen(function*($) { const result = yield* $( - Stream.range(0, 5), + Stream.range(0, 4), Stream.timeoutTo(Duration.infinity, Stream.succeed(-1)), Stream.runCollect )