Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Stream: range now includes both endpoints #181

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cyan-geese-unite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/stream": minor
---

Stream: range now includes both endpoints
3 changes: 1 addition & 2 deletions docs/modules/Stream.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -966,8 +966,7 @@ Added in v1.0.0

## range

Constructs a stream from a range of integers (lower bound included, upper
bound not included).
Constructs a stream from a range of integers, including both endpoints.

**Signature**

Expand Down
3 changes: 1 addition & 2 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2636,8 +2636,7 @@ export const provideSomeLayer: {
} = internal.provideSomeLayer

/**
* Constructs a stream from a range of integers (lower bound included, upper
* bound not included).
* Constructs a stream from a range of integers, including both endpoints.
*
* @since 1.0.0
* @category constructors
Expand Down
4 changes: 2 additions & 2 deletions src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4644,15 +4644,15 @@ export const provideSomeLayer = dual<
/** @internal */
export const range = (min: number, max: number, chunkSize = DefaultChunkSize): Stream.Stream<never, never, number> =>
suspend(() => {
if (min >= max) {
if (min > max) {
return empty as Stream.Stream<never, never, number>
}
const go = (
min: number,
max: number,
chunkSize: number
): Channel.Channel<never, unknown, unknown, unknown, never, Chunk.Chunk<number>, unknown> => {
const remaining = max - min
const remaining = max - min + 1
if (remaining > chunkSize) {
return pipe(
core.write(Chunk.range(min, min + chunkSize - 1)),
Expand Down
2 changes: 1 addition & 1 deletion test/Sink/collecting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe.concurrent("Sink", () => {
it.effect("collectAllToMap", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(0, 10),
Stream.range(0, 9),
Stream.run(Sink.collectAllToMap(
(n) => n % 3,
(x, y) => x + y
Expand Down
6 changes: 3 additions & 3 deletions test/Sink/filtering.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ describe.concurrent("Sink", () => {
it.effect("filterInput", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.collectAll<number>(), Sink.filterInput((n) => n % 2 === 0)))
)
assert.deepStrictEqual(Array.from(result), [2, 4, 6, 8])
Expand All @@ -18,7 +18,7 @@ describe.concurrent("Sink", () => {
it.effect("filterInputEffect - happy path", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(
Sink.collectAll<number>(),
Sink.filterInputEffect((n) => Effect.succeed(n % 2 === 0))
Expand All @@ -30,7 +30,7 @@ describe.concurrent("Sink", () => {
it.effect("filterInputEffect - error", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(
Sink.collectAll<number>(),
Sink.filterInputEffect(() => Effect.fail("fail"))
Expand Down
8 changes: 4 additions & 4 deletions test/Sink/folding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe.concurrent("Sink", () => {
it.effect("fold - termination in the middle", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(Sink.fold<number, number>(0, (n) => n <= 5, (x, y) => x + y))
)
assert.strictEqual(result, 6)
Expand All @@ -31,7 +31,7 @@ describe.concurrent("Sink", () => {
it.effect("fold - immediate termination", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(Sink.fold<number, number>(0, (n) => n <= -1, (x, y) => x + y))
)
assert.strictEqual(result, 0)
Expand All @@ -40,15 +40,15 @@ describe.concurrent("Sink", () => {
it.effect("fold - no termination", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(Sink.fold<number, number>(0, (n) => n <= 500, (x, y) => x + y))
)
assert.strictEqual(result, 45)
}))

it.effect("foldLeft equivalence with Chunk.reduce", () =>
Effect.gen(function*($) {
const stream = Stream.range(1, 10)
const stream = Stream.range(1, 9)
const result1 = yield* $(stream, Stream.run(Sink.foldLeft("", (s, n) => s + `${n}`)))
const result2 = yield* $(stream, Stream.runCollect, Effect.map(Chunk.reduce("", (s, n) => s + `${n}`)))
assert.strictEqual(result1, result2)
Expand Down
10 changes: 5 additions & 5 deletions test/Sink/mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe.concurrent("Sink", () => {
it.effect("as", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.succeed(1), Sink.as("as")))
)
assert.strictEqual(result, "as")
Expand Down Expand Up @@ -155,7 +155,7 @@ describe.concurrent("Sink", () => {
it.effect("map", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.succeed(1), Sink.map((n) => `${n}`)))
)
assert.strictEqual(result, "1")
Expand All @@ -164,7 +164,7 @@ describe.concurrent("Sink", () => {
it.effect("mapEffect - happy path", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.succeed(1), Sink.mapEffect((n) => Effect.succeed(n + 1))))
)
assert.strictEqual(result, 2)
Expand All @@ -173,7 +173,7 @@ describe.concurrent("Sink", () => {
it.effect("mapEffect - error", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.succeed(1), Sink.mapEffect(() => Effect.fail("fail")))),
Effect.flip
)
Expand All @@ -183,7 +183,7 @@ describe.concurrent("Sink", () => {
it.effect("mapError", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.run(pipe(Sink.fail("fail"), Sink.mapError((s) => s + "!"))),
Effect.either
)
Expand Down
4 changes: 2 additions & 2 deletions test/Sink/traversing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe.concurrent("Sink", () => {
[1, 3, 7, 20],
Effect.forEach((n) =>
pipe(
Stream.range(1, 100),
Stream.range(1, 99),
Stream.rechunk(n),
Stream.run(sink),
Effect.map((option) => Equal.equals(option, Option.some(Option.some(10))))
Expand Down Expand Up @@ -79,7 +79,7 @@ describe.concurrent("Sink", () => {
it.effect("forEachWhile - handles leftovers", () =>
Effect.gen(function*($) {
const [result, value] = yield* $(
Stream.range(1, 5),
Stream.range(1, 4),
Stream.run(pipe(
Sink.forEachWhile((n: number) => Effect.succeed(n <= 3)),
Sink.collectLeftover
Expand Down
2 changes: 1 addition & 1 deletion test/Stream/aggregation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ describe.concurrent("Stream", () => {
Effect.gen(function*($) {
const queue = yield* $(Queue.unbounded<number>())
yield* $(
Stream.range(1, 10),
Stream.range(1, 9),
Stream.tap((n) =>
pipe(
Effect.fail("Boom"),
Expand Down
8 changes: 4 additions & 4 deletions test/Stream/broadcasting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ describe.concurrent("Stream", () => {
it.effect("broadcast - values", () =>
Effect.gen(function*($) {
const { result1, result2 } = yield* $(
Stream.range(0, 5),
Stream.range(0, 4),
Stream.broadcast(2, 12),
Effect.flatMap((streams) =>
Effect.all({
Expand All @@ -31,7 +31,7 @@ describe.concurrent("Stream", () => {
it.effect("broadcast - errors", () =>
Effect.gen(function*($) {
const { result1, result2 } = yield* $(
Stream.range(0, 1),
Stream.make(0),
Stream.concat(Stream.fail("boom")),
Stream.broadcast(2, 12),
Effect.flatMap((streams) =>
Expand All @@ -49,7 +49,7 @@ describe.concurrent("Stream", () => {
it.effect("broadcast - backpressure", () =>
Effect.gen(function*($) {
const { result1, result2 } = yield* $(
Stream.range(0, 5),
Stream.range(0, 4),
Stream.flatMap(Stream.succeed),
Stream.broadcast(2, 2),
Effect.flatMap((streams) =>
Expand Down Expand Up @@ -87,7 +87,7 @@ describe.concurrent("Stream", () => {
it.effect("broadcast - unsubscribe", () =>
Effect.gen(function*($) {
const result = yield* $(
Stream.range(0, 5),
Stream.range(0, 4),
Stream.broadcast(2, 2),
Effect.flatMap((streams) =>
pipe(
Expand Down
34 changes: 17 additions & 17 deletions test/Stream/buffering.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe.concurrent("Stream", () => {
Effect.gen(function*($) {
const error = Cause.RuntimeException("boom")
const result = yield* $(
Stream.range(0, 10),
Stream.range(0, 9),
Stream.concat(Stream.fail(error)),
Stream.buffer({ capacity: 2 }),
Stream.runCollect,
Expand All @@ -43,7 +43,7 @@ describe.concurrent("Stream", () => {
const ref = yield* $(Ref.make(Chunk.empty<number>()))
const latch = yield* $(Deferred.make<never, void>())
const stream = pipe(
Stream.range(1, 5),
Stream.range(1, 4),
Stream.tap((n) =>
pipe(
Ref.update(ref, Chunk.append(n)),
Expand Down Expand Up @@ -81,7 +81,7 @@ describe.concurrent("Stream", () => {
Effect.gen(function*($) {
const error = Cause.RuntimeException("boom")
const result = yield* $(
Stream.range(0, 10),
Stream.range(0, 9),
Stream.concat(Stream.fail(error)),
Stream.bufferChunks({ capacity: 2 }),
Stream.runCollect,
Expand All @@ -95,7 +95,7 @@ describe.concurrent("Stream", () => {
const ref = yield* $(Ref.make(Chunk.empty<number>()))
const latch = yield* $(Deferred.make<never, void>())
const stream = pipe(
Stream.range(1, 5),
Stream.range(1, 4),
Stream.tap((n) =>
pipe(
Ref.update(ref, Chunk.append(n)),
Expand All @@ -120,7 +120,7 @@ describe.concurrent("Stream", () => {
const result = yield* $(
Stream.range(1, 1_000),
Stream.concat(Stream.fail(error)),
Stream.concat(Stream.range(1_000, 2_000)),
Stream.concat(Stream.range(1_001, 2_000)),
Stream.bufferChunks({ capacity: 2, strategy: "dropping" }),
Stream.runCollect,
Effect.exit
Expand All @@ -142,7 +142,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch1)),
Stream.flatMap(() =>
pipe(
Stream.range(1, 17),
Stream.range(1, 16),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch2, void 0))
)
Expand All @@ -154,7 +154,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch3)),
Stream.flatMap(() =>
pipe(
Stream.range(17, 25),
Stream.range(17, 24),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch4, void 0))
)
Expand Down Expand Up @@ -211,7 +211,7 @@ describe.concurrent("Stream", () => {
const result = yield* $(
Stream.range(1, 1_000),
Stream.concat(Stream.fail(error)),
Stream.concat(Stream.range(1_000, 2_000)),
Stream.concat(Stream.range(1_001, 2_000)),
Stream.bufferChunks({ capacity: 2, strategy: "sliding" }),
Stream.runCollect,
Effect.exit
Expand All @@ -234,7 +234,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch1)),
Stream.flatMap(() =>
pipe(
Stream.range(1, 17),
Stream.range(1, 16),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch2, void 0))
)
Expand All @@ -246,7 +246,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch3)),
Stream.flatMap(() =>
pipe(
Stream.range(17, 26),
Stream.range(17, 25),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch4, void 0))
)
Expand Down Expand Up @@ -336,7 +336,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch3)),
Stream.flatMap(() =>
pipe(
Stream.range(17, 25),
Stream.range(17, 24),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch4, void 0))
)
Expand Down Expand Up @@ -393,7 +393,7 @@ describe.concurrent("Stream", () => {
const result = yield* $(
Stream.range(1, 1_000),
Stream.concat(Stream.fail(error)),
Stream.concat(Stream.range(1_000, 2_000)),
Stream.concat(Stream.range(1_001, 2_000)),
Stream.buffer({ capacity: 2, strategy: "sliding" }),
Stream.runCollect,
Effect.exit
Expand All @@ -415,7 +415,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch1)),
Stream.flatMap(() =>
pipe(
Stream.range(1, 17),
Stream.range(1, 16),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch2, void 0))
)
Expand All @@ -427,7 +427,7 @@ describe.concurrent("Stream", () => {
Stream.fromEffect(Deferred.await(latch3)),
Stream.flatMap(() =>
pipe(
Stream.range(17, 25),
Stream.range(17, 24),
Stream.rechunk(1),
Stream.ensuring(Deferred.succeed<never, void>(latch4, void 0))
)
Expand Down Expand Up @@ -500,7 +500,7 @@ describe.concurrent("Stream", () => {
Effect.gen(function*($) {
const error = Cause.RuntimeException("boom")
const result = yield* $(
Stream.range(0, 10),
Stream.range(0, 9),
Stream.concat(Stream.fail(error)),
Stream.buffer({ capacity: "unbounded" }),
Stream.runCollect,
Expand All @@ -514,14 +514,14 @@ describe.concurrent("Stream", () => {
const ref = yield* $(Ref.make(Chunk.empty<number>()))
const latch = yield* $(Deferred.make<never, void>())
const stream = pipe(
Stream.range(1, 1_000),
Stream.range(1, 999),
Stream.tap((n) =>
pipe(
Ref.update(ref, Chunk.append(n)),
Effect.zipRight(pipe(Deferred.succeed<never, void>(latch, void 0), Effect.when(() => n === 999)))
)
),
Stream.rechunk(1_000),
Stream.rechunk(999),
Stream.buffer({ capacity: "unbounded" })
)
const result1 = yield* $(stream, Stream.take(2), Stream.runCollect)
Expand Down
4 changes: 2 additions & 2 deletions test/Stream/changing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { assert, describe } from "vitest"
describe.concurrent("Stream", () => {
it.effect("changes", () =>
Effect.gen(function*($) {
const stream = Stream.range(0, 20)
const stream = Stream.range(0, 19)
const result = yield* $(
stream,
Stream.changes,
Expand All @@ -24,7 +24,7 @@ describe.concurrent("Stream", () => {

it.effect("changesWithEffect", () =>
Effect.gen(function*($) {
const stream = Stream.range(0, 20)
const stream = Stream.range(0, 19)
const result = yield* $(
stream,
Stream.changesWithEffect((left, right) => Effect.succeed(left === right)),
Expand Down
Loading
Loading