Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
Stream: range now includes both endpoints (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
gcanti authored Sep 22, 2023
1 parent faa7d5b commit eab4bdb
Show file tree
Hide file tree
Showing 26 changed files with 92 additions and 82 deletions.
5 changes: 5 additions & 0 deletions .changeset/cyan-geese-unite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/stream": minor
---

Stream: range now includes both endpoints
3 changes: 1 addition & 2 deletions docs/modules/Stream.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -966,8 +966,7 @@ Added in v1.0.0
## range
Constructs a stream from a range of integers (lower bound included, upper
bound not included).
Constructs a stream from a range of integers, including both endpoints.
**Signature**
Expand Down
3 changes: 1 addition & 2 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2636,8 +2636,7 @@ export const provideSomeLayer: {
} = internal.provideSomeLayer

/**
* Constructs a stream from a range of integers (lower bound included, upper
* bound not included).
* Constructs a stream from a range of integers, including both endpoints.
*
* @since 1.0.0
* @category constructors
Expand Down
4 changes: 2 additions & 2 deletions src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4644,15 +4644,15 @@ export const provideSomeLayer = dual<
/** @internal */
export const range = (min: number, max: number, chunkSize = DefaultChunkSize): Stream.Stream<never, never, number> =>
suspend(() => {
if (min >= max) {
if (min > max) {
return empty as Stream.Stream<never, never, number>
}
const go = (
min: number,
max: number,
chunkSize: number
): Channel.Channel<never, unknown, unknown, unknown, never, Chunk.Chunk<number>, unknown> => {
const remaining = max - min
const remaining = max - min + 1
if (remaining > chunkSize) {
return pipe(
core.write(Chunk.range(min, min + chunkSize - 1)),
Expand Down
2 changes: 1 addition & 1 deletion test/Sink/collecting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe.concurrent("Sink", () => {
it.effect("collectAllToMap", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(0, 10),
Stream.range(0, 9),
Stream.run(Sink.collectAllToMap(
(n) => n % 3,
(x, y) => x + y
Expand Down
6 changes: 3 additions & 3 deletions test/Sink/filtering.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ describe.concurrent("Sink", () => {
it.effect("filterInput", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.collectAll<number>(), Sink.filterInput((n) => n % 2 === 0)))
)
assert.deepStrictEqual(Array.from(result), [2, 4, 6, 8])
Expand All @@ -18,7 +18,7 @@ describe.concurrent("Sink", () => {
it.effect("filterInputEffect - happy path", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(
Sink.collectAll<number>(),
Sink.filterInputEffect((n) => Effect.succeed(n % 2 === 0))
Expand All @@ -30,7 +30,7 @@ describe.concurrent("Sink", () => {
it.effect("filterInputEffect - error", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(
Sink.collectAll<number>(),
Sink.filterInputEffect(() => Effect.fail("fail"))
Expand Down
8 changes: 4 additions & 4 deletions test/Sink/folding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe.concurrent("Sink", () => {
it.effect("fold - termination in the middle", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(Sink.fold<number, number>(0, (n) => n <= 5, (x, y) => x + y))
)
assert.strictEqual(result, 6)
Expand All @@ -31,7 +31,7 @@ describe.concurrent("Sink", () => {
it.effect("fold - immediate termination", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(Sink.fold<number, number>(0, (n) => n <= -1, (x, y) => x + y))
)
assert.strictEqual(result, 0)
Expand All @@ -40,15 +40,15 @@ describe.concurrent("Sink", () => {
it.effect("fold - no termination", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(Sink.fold<number, number>(0, (n) => n <= 500, (x, y) => x + y))
)
assert.strictEqual(result, 45)
}))

it.effect("foldLeft equivalence with Chunk.reduce", () =>
Effect.gen(function*($) {
const stream = Stream.range(1, 10)
const stream = Stream.range(1, 9)
const result1 = yield* $(stream, Stream.run(Sink.foldLeft("", (s, n) => s + `${n}`)))
const result2 = yield* $(stream, Stream.runCollect, Effect.map(Chunk.reduce("", (s, n) => s + `${n}`)))
assert.strictEqual(result1, result2)
Expand Down
10 changes: 5 additions & 5 deletions test/Sink/mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe.concurrent("Sink", () => {
it.effect("as", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.succeed(1), Sink.as("as")))
)
assert.strictEqual(result, "as")
Expand Down Expand Up @@ -155,7 +155,7 @@ describe.concurrent("Sink", () => {
it.effect("map", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.succeed(1), Sink.map((n) => `${n}`)))
)
assert.strictEqual(result, "1")
Expand All @@ -164,7 +164,7 @@ describe.concurrent("Sink", () => {
it.effect("mapEffect - happy path", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.succeed(1), Sink.mapEffect((n) => Effect.succeed(n + 1))))
)
assert.strictEqual(result, 2)
Expand All @@ -173,7 +173,7 @@ describe.concurrent("Sink", () => {
it.effect("mapEffect - error", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.succeed(1), Sink.mapEffect(() => Effect.fail("fail")))),
Effect.flip
)
Expand All @@ -183,7 +183,7 @@ describe.concurrent("Sink", () => {
it.effect("mapError", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.fail("fail"), Sink.mapError((s) => s + "!"))),
Effect.either
)
Expand Down
4 changes: 2 additions & 2 deletions test/Sink/traversing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe.concurrent("Sink", () => {
[1, 3, 7, 20],
Effect.forEach((n) =>
pipe(
Stream.range(1, 100),
Stream.range(1, 99),
Stream.rechunk(n),
Stream.run(sink),
Effect.map((option) => Equal.equals(option, Option.some(Option.some(10))))
Expand Down Expand Up @@ -79,7 +79,7 @@ describe.concurrent("Sink", () => {
it.effect("forEachWhile - handles leftovers", () =>
Effect.gen(function*($) {
const [result, value] = yield* $(
Stream.range(1, 5),
Stream.range(1, 4),
Stream.run(pipe(
Sink.forEachWhile((n: number) => Effect.succeed(n <= 3)),
Sink.collectLeftover
Expand Down
2 changes: 1 addition & 1 deletion test/Stream/aggregation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ describe.concurrent("Stream", () => {
Effect.gen(function*($) {
const queue = yield* $(Queue.unbounded<number>())
yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.tap((n) =>
pipe(
Effect.fail("Boom"),
Expand Down
8 changes: 4 additions & 4 deletions test/Stream/broadcasting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ describe.concurrent("Stream", () => {
it.effect("broadcast - values", () =>
Effect.gen(function*($) {
const { result1, result2 } = yield* $(
Stream.range(0, 5),
Stream.range(0, 4),
Stream.broadcast(2, 12),
Effect.flatMap((streams) =>
Effect.all({
Expand All @@ -31,7 +31,7 @@ describe.concurrent("Stream", () => {
it.effect("broadcast - errors", () =>
Effect.gen(function*($) {
const { result1, result2 } = yield* $(
Stream.range(0, 1),
Stream.make(0),
Stream.concat(Stream.fail("boom")),
Stream.broadcast(2, 12),
Effect.flatMap((streams) =>
Expand All @@ -49,7 +49,7 @@ describe.concurrent("Stream", () => {
it.effect("broadcast - backpressure", () =>
Effect.gen(function*($) {
const { result1, result2 } = yield* $(
Stream.range(0, 5),
Stream.range(0, 4),
Stream.flatMap(Stream.succeed),
Stream.broadcast(2, 2),
Effect.flatMap((streams) =>
Expand Down Expand Up @@ -87,7 +87,7 @@ describe.concurrent("Stream", () => {
it.effect("broadcast - unsubscribe", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(0, 5),
Stream.range(0, 4),
Stream.broadcast(2, 2),
Effect.flatMap((streams) =>
pipe(
Expand Down
34 changes: 17 additions & 17 deletions test/Stream/buffering.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe.concurrent("Stream", () => {
Effect.gen(function*($) {
const error = Cause.RuntimeException("boom")
const result = yield* $(
Stream.range(0, 10),
Stream.range(0, 9),
Stream.concat(Stream.fail(error)),
Stream.buffer({ capacity: 2 }),
Stream.runCollect,
Expand All @@ -43,7 +43,7 @@ describe.concurrent("Stream", () => {
const ref = yield* $(Ref.make(Chunk.empty<number>()))
const latch = yield* $(Deferred.make<never, void>())
const stream = pipe(
Stream.range(1, 5),
Stream.range(1, 4),
Stream.tap((n) =>
pipe(
Ref.update(ref, Chunk.append(n)),
Expand Down Expand Up @@ -81,7 +81,7 @@ describe.concurrent("Stream", () => {
Effect.gen(function*($) {
const error = Cause.RuntimeException("boom")
const result = yield* $(
Stream.range(0, 10),
Stream.range(0, 9),
Stream.concat(Stream.fail(error)),
Stream.bufferChunks({ capacity: 2 }),
Stream.runCollect,
Expand All @@ -95,7 +95,7 @@ describe.concurrent("Stream", () => {
const ref = yield* $(Ref.make(Chunk.empty<number>()))
const latch = yield* $(Deferred.make<never, void>())
const stream = pipe(
Stream.range(1, 5),
Stream.range(1, 4),
Stream.tap((n) =>
pipe(
Ref.update(ref, Chunk.append(n)),
Expand All @@ -120,7 +120,7 @@ describe.concurrent("Stream", () => {
const result = yield* $(
Stream.range(1, 1_000),
Stream.concat(Stream.fail(error)),
Stream.concat(Stream.range(1_000, 2_000)),
Stream.concat(Stream.range(1_001, 2_000)),
Stream.bufferChunks({ capacity: 2, strategy: "dropping" }),
Stream.runCollect,
Effect.exit
Expand All @@ -142,7 +142,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch1)),
Stream.flatMap(() =>
pipe(
Stream.range(1, 17),
Stream.range(1, 16),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch2, void 0))
)
Expand All @@ -154,7 +154,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch3)),
Stream.flatMap(() =>
pipe(
Stream.range(17, 25),
Stream.range(17, 24),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch4, void 0))
)
Expand Down Expand Up @@ -211,7 +211,7 @@ describe.concurrent("Stream", () => {
const result = yield* $(
Stream.range(1, 1_000),
Stream.concat(Stream.fail(error)),
Stream.concat(Stream.range(1_000, 2_000)),
Stream.concat(Stream.range(1_001, 2_000)),
Stream.bufferChunks({ capacity: 2, strategy: "sliding" }),
Stream.runCollect,
Effect.exit
Expand All @@ -234,7 +234,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch1)),
Stream.flatMap(() =>
pipe(
Stream.range(1, 17),
Stream.range(1, 16),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch2, void 0))
)
Expand All @@ -246,7 +246,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch3)),
Stream.flatMap(() =>
pipe(
Stream.range(17, 26),
Stream.range(17, 25),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch4, void 0))
)
Expand Down Expand Up @@ -336,7 +336,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch3)),
Stream.flatMap(() =>
pipe(
Stream.range(17, 25),
Stream.range(17, 24),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch4, void 0))
)
Expand Down Expand Up @@ -393,7 +393,7 @@ describe.concurrent("Stream", () => {
const result = yield* $(
Stream.range(1, 1_000),
Stream.concat(Stream.fail(error)),
Stream.concat(Stream.range(1_000, 2_000)),
Stream.concat(Stream.range(1_001, 2_000)),
Stream.buffer({ capacity: 2, strategy: "sliding" }),
Stream.runCollect,
Effect.exit
Expand All @@ -415,7 +415,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch1)),
Stream.flatMap(() =>
pipe(
Stream.range(1, 17),
Stream.range(1, 16),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch2, void 0))
)
Expand All @@ -427,7 +427,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch3)),
Stream.flatMap(() =>
pipe(
Stream.range(17, 25),
Stream.range(17, 24),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch4, void 0))
)
Expand Down Expand Up @@ -500,7 +500,7 @@ describe.concurrent("Stream", () => {
Effect.gen(function*($) {
const error = Cause.RuntimeException("boom")
const result = yield* $(
Stream.range(0, 10),
Stream.range(0, 9),
Stream.concat(Stream.fail(error)),
Stream.buffer({ capacity: "unbounded" }),
Stream.runCollect,
Expand All @@ -514,14 +514,14 @@ describe.concurrent("Stream", () => {
const ref = yield* $(Ref.make(Chunk.empty<number>()))
const latch = yield* $(Deferred.make<never, void>())
const stream = pipe(
Stream.range(1, 1_000),
Stream.range(1, 999),
Stream.tap((n) =>
pipe(
Ref.update(ref, Chunk.append(n)),
Effect.zipRight(pipe(Deferred.succeed<never, void>(latch, void 0), Effect.when(() => n === 999)))
)
),
Stream.rechunk(1_000),
Stream.rechunk(999),
Stream.buffer({ capacity: "unbounded" })
)
const result1 = yield* $(stream, Stream.take(2), Stream.runCollect)
Expand Down
4 changes: 2 additions & 2 deletions test/Stream/changing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { assert, describe } from "vitest"
describe.concurrent("Stream", () => {
it.effect("changes", () =>
Effect.gen(function*($) {
const stream = Stream.range(0, 20)
const stream = Stream.range(0, 19)
const result = yield* $(
stream,
Stream.changes,
Expand All @@ -24,7 +24,7 @@ describe.concurrent("Stream", () => {

it.effect("changesWithEffect", () =>
Effect.gen(function*($) {
const stream = Stream.range(0, 20)
const stream = Stream.range(0, 19)
const result = yield* $(
stream,
Stream.changesWithEffect((left, right) => Effect.succeed(left === right)),
Expand Down
Loading

0 comments on commit eab4bdb

Please sign in to comment.