Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
consolidate GroupBy.evaluate & evaluateBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Sep 14, 2023
1 parent a6973e4 commit 23a57ec
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 40 deletions.
5 changes: 5 additions & 0 deletions .changeset/warm-walls-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/stream": minor
---

consolidate GroupBy.evaluate & evaluateBuffer
22 changes: 3 additions & 19 deletions src/GroupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,16 @@ export declare namespace GroupBy {
* @category destructors
*/
export const evaluate: {
<K, E, V, R2, E2, A>(
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>
): <R>(self: GroupBy<R, E, K, V>) => Stream.Stream<R2 | R, E | E2, A>
<R, K, E, V, R2, E2, A>(
self: GroupBy<R, E, K, V>,
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>
): Stream.Stream<R | R2, E | E2, A>
} = internal.evaluate

/**
* Like `evaluate`, but with a configurable `bufferSize` parameter.
*
* @since 1.0.0
* @category destructors
*/
export const evaluateBuffer: {
<K, E, V, R2, E2, A>(
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>,
bufferSize: number
options?: { readonly bufferSize?: number }
): <R>(self: GroupBy<R, E, K, V>) => Stream.Stream<R2 | R, E | E2, A>
<R, K, E, V, R2, E2, A>(
self: GroupBy<R, E, K, V>,
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>,
bufferSize: number
options?: { readonly bufferSize?: number }
): Stream.Stream<R | R2, E | E2, A>
} = internal.evaluateBuffer
} = internal.evaluate

/**
* Filter the groups to be processed.
Expand Down
25 changes: 4 additions & 21 deletions src/internal/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,43 +37,26 @@ const groupByVariance = {

/** @internal */
export const evaluate = dual<
<K, E, V, R2, E2, A>(
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>
) => <R>(self: GroupBy.GroupBy<R, E, K, V>) => Stream.Stream<R2 | R, E | E2, A>,
<R, K, E, V, R2, E2, A>(
self: GroupBy.GroupBy<R, E, K, V>,
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>
) => Stream.Stream<R2 | R, E | E2, A>
>(
2,
<R, K, E, V, R2, E2, A>(
self: GroupBy.GroupBy<R, E, K, V>,
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>
): Stream.Stream<R | R2, E | E2, A> => evaluateBuffer(self, f, 16)
)

/** @internal */
export const evaluateBuffer = dual<
<K, E, V, R2, E2, A>(
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>,
bufferSize: number
options?: { readonly bufferSize?: number }
) => <R>(self: GroupBy.GroupBy<R, E, K, V>) => Stream.Stream<R2 | R, E | E2, A>,
<R, K, E, V, R2, E2, A>(
self: GroupBy.GroupBy<R, E, K, V>,
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>,
bufferSize: number
options?: { readonly bufferSize?: number }
) => Stream.Stream<R2 | R, E | E2, A>
>(
3,
<R, K, E, V, R2, E2, A>(
self: GroupBy.GroupBy<R, E, K, V>,
f: (key: K, stream: Stream.Stream<never, E, V>) => Stream.Stream<R2, E2, A>,
bufferSize: number
options?: { readonly bufferSize?: number }
): Stream.Stream<R | R2, E | E2, A> =>
stream.flatMap(
self.grouped,
([key, queue]) => f(key, stream.flattenTake(stream.fromQueue(queue, { shutdown: true }))),
{ concurrency: "unbounded", bufferSize }
{ concurrency: "unbounded", bufferSize: options?.bufferSize ?? 16 }
)
)

Expand Down

0 comments on commit 23a57ec

Please sign in to comment.