From 081ac839dc6692ab6faafaabe4f6a3b76d493d65 Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 26 Sep 2023 13:57:52 +1300 Subject: [PATCH] consolidate GroupBy.evaluate & evaluateBuffer (#177) --- .changeset/warm-walls-accept.md | 5 +++++ docs/modules/GroupBy.ts.md | 25 ++----------------------- src/GroupBy.ts | 22 +++------------------- src/internal/groupBy.ts | 25 ++++--------------------- 4 files changed, 14 insertions(+), 63 deletions(-) create mode 100644 .changeset/warm-walls-accept.md diff --git a/.changeset/warm-walls-accept.md b/.changeset/warm-walls-accept.md new file mode 100644 index 0000000..b646ab1 --- /dev/null +++ b/.changeset/warm-walls-accept.md @@ -0,0 +1,5 @@ +--- +"@effect/stream": minor +--- + +consolidate GroupBy.evaluate & evaluateBuffer diff --git a/docs/modules/GroupBy.ts.md b/docs/modules/GroupBy.ts.md index b7a668f..61a59ab 100644 --- a/docs/modules/GroupBy.ts.md +++ b/docs/modules/GroupBy.ts.md @@ -16,7 +16,6 @@ Added in v1.0.0 - [make](#make) - [destructors](#destructors) - [evaluate](#evaluate) - - [evaluateBuffer](#evaluatebuffer) - [models](#models) - [GroupBy (interface)](#groupby-interface) - [symbols](#symbols) @@ -57,34 +56,14 @@ arbitrary order. ```ts export declare const evaluate: { - (f: (key: K, stream: Stream.Stream) => Stream.Stream): ( - self: GroupBy - ) => Stream.Stream - ( - self: GroupBy, - f: (key: K, stream: Stream.Stream) => Stream.Stream - ): Stream.Stream -} -``` - -Added in v1.0.0 - -## evaluateBuffer - -Like `evaluate`, but with a configurable `bufferSize` parameter. - -**Signature** - -```ts -export declare const evaluateBuffer: { ( f: (key: K, stream: Stream.Stream) => Stream.Stream, - bufferSize: number + options?: { readonly bufferSize?: number } ): (self: GroupBy) => Stream.Stream ( self: GroupBy, f: (key: K, stream: Stream.Stream) => Stream.Stream, - bufferSize: number + options?: { readonly bufferSize?: number } ): Stream.Stream } ``` diff --git a/src/GroupBy.ts b/src/GroupBy.ts index d71178d..58c1670 100644 --- a/src/GroupBy.ts +++ b/src/GroupBy.ts @@ -58,32 +58,16 @@ export declare namespace GroupBy { * @category destructors */ export const evaluate: { - ( - f: (key: K, stream: Stream.Stream) => Stream.Stream - ): (self: GroupBy) => Stream.Stream - ( - self: GroupBy, - f: (key: K, stream: Stream.Stream) => Stream.Stream - ): Stream.Stream -} = internal.evaluate - -/** - * Like `evaluate`, but with a configurable `bufferSize` parameter. - * - * @since 1.0.0 - * @category destructors - */ -export const evaluateBuffer: { ( f: (key: K, stream: Stream.Stream) => Stream.Stream, - bufferSize: number + options?: { readonly bufferSize?: number } ): (self: GroupBy) => Stream.Stream ( self: GroupBy, f: (key: K, stream: Stream.Stream) => Stream.Stream, - bufferSize: number + options?: { readonly bufferSize?: number } ): Stream.Stream -} = internal.evaluateBuffer +} = internal.evaluate /** * Filter the groups to be processed. diff --git a/src/internal/groupBy.ts b/src/internal/groupBy.ts index 016bba4..390fec8 100644 --- a/src/internal/groupBy.ts +++ b/src/internal/groupBy.ts @@ -37,43 +37,26 @@ const groupByVariance = { /** @internal */ export const evaluate = dual< - ( - f: (key: K, stream: Stream.Stream) => Stream.Stream - ) => (self: GroupBy.GroupBy) => Stream.Stream, - ( - self: GroupBy.GroupBy, - f: (key: K, stream: Stream.Stream) => Stream.Stream - ) => Stream.Stream ->( - 2, - ( - self: GroupBy.GroupBy, - f: (key: K, stream: Stream.Stream) => Stream.Stream - ): Stream.Stream => evaluateBuffer(self, f, 16) -) - -/** @internal */ -export const evaluateBuffer = dual< ( f: (key: K, stream: Stream.Stream) => Stream.Stream, - bufferSize: number + options?: { readonly bufferSize?: number } ) => (self: GroupBy.GroupBy) => Stream.Stream, ( self: GroupBy.GroupBy, f: (key: K, stream: Stream.Stream) => Stream.Stream, - bufferSize: number + options?: { readonly bufferSize?: number } ) => Stream.Stream >( 3, ( self: GroupBy.GroupBy, f: (key: K, stream: Stream.Stream) => Stream.Stream, - bufferSize: number + options?: { readonly bufferSize?: number } ): Stream.Stream => stream.flatMap( self.grouped, ([key, queue]) => f(key, stream.flattenTake(stream.fromQueue(queue, { shutdown: true }))), - { concurrency: "unbounded", bufferSize } + { concurrency: "unbounded", bufferSize: options?.bufferSize ?? 16 } ) )