From 3e91a114f8a761ae66813f3071c9eb5a3378b7e3 Mon Sep 17 00:00:00 2001 From: Joseph Fourny Date: Sat, 16 Dec 2023 23:36:20 -0500 Subject: [PATCH] Renamed Zip to Combine --- pkg/stream/{mux.go => combine.go} | 60 ++++++++++----------- pkg/stream/{mux_test.go => combine_test.go} | 16 +++--- 2 files changed, 38 insertions(+), 38 deletions(-) rename pkg/stream/{mux.go => combine.go} (69%) rename pkg/stream/{mux_test.go => combine_test.go} (89%) diff --git a/pkg/stream/mux.go b/pkg/stream/combine.go similarity index 69% rename from pkg/stream/mux.go rename to pkg/stream/combine.go index a202075..febb804 100644 --- a/pkg/stream/mux.go +++ b/pkg/stream/combine.go @@ -1,47 +1,49 @@ package stream import ( + "github.com/jpfourny/papaya/pkg/mapper" "github.com/jpfourny/papaya/pkg/optional" "github.com/jpfourny/papaya/pkg/pair" + "github.com/jpfourny/papaya/pkg/pred" ) // Combiner represents a function that combines two elements of type E1 and E2 into an element of type F. -// It is used in the Multiplex operation. +// It is used in the Combine operation. type Combiner[E1, E2, F any] func(E1, E2) F -// Multiplex combines the elements of two streams into a single stream using the given Combiner function. +// Combine combines the elements of two streams into a single stream using the given Combiner function. // The resulting stream will have the same number of elements as the shorter of the two input streams. // // Example usage: // -// s := stream.Multiplex(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) string { +// s := stream.Combine(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) string { // return fmt.Sprintf("%s%d", s, i) // }) // out := stream.DebugString(s) // "" -func Multiplex[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine Combiner[E1, E2, F]) Stream[F] { - return MultiplexOrDiscard(s1, s2, func(e1 E1, e2 E2) optional.Optional[F] { +func Combine[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine Combiner[E1, E2, F]) Stream[F] { + return CombineOrDiscard(s1, s2, func(e1 E1, e2 E2) optional.Optional[F] { return optional.Of(combine(e1, e2)) }) } // OptionalCombiner represents a function that combines two elements of type E1 and E2 into an optional element of type F. // If the elements cannot be combined, the function must return an empty optional. -// It is used in the MultiplexOrDiscard operation. +// It is used in the CombineOrDiscard operation. type OptionalCombiner[E1, E2, F any] func(E1, E2) optional.Optional[F] -// MultiplexOrDiscard combines the elements of two streams into a single stream using the given OptionalCombiner function or discards them, if the combiner returns an empty optional. -// The resulting stream will have, at most, the same number of elements as the shorter of the two input streams. +// CombineOrDiscard combines the elements of two streams into a single stream using the given OptionalCombiner function or discards them, if the combiner returns an empty optional. +// The resulting stream will have at most the same number of elements as the shorter of the two input streams. // // Example usage: // -// s := stream.MultiplexOrDiscard(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) optional.Optional[string] { +// s := stream.CombineOrDiscard(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) optional.Optional[string] { // if i == 2 { // return optional.Empty[string]() // } // return optional.Of(fmt.Sprintf("%s%d", s, i)) // }) // out := stream.DebugString(s) // "" -func MultiplexOrDiscard[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine OptionalCombiner[E1, E2, F]) Stream[F] { +func CombineOrDiscard[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine OptionalCombiner[E1, E2, F]) Stream[F] { return func(yield Consumer[F]) bool { done := make(chan struct{}) defer close(done) @@ -96,7 +98,7 @@ func MultiplexOrDiscard[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine Opt // s := stream.Zip(stream.Of(1, 2, 3), stream.Of("foo", "bar")) // out := stream.DebugString(s) // "<(1, foo), (2, bar)>" func Zip[E, F any](s1 Stream[E], s2 Stream[F]) Stream[pair.Pair[E, F]] { - return Multiplex(s1, s2, func(e E, f F) pair.Pair[E, F] { + return Combine(s1, s2, func(e E, f F) pair.Pair[E, F] { return pair.Of(e, f) }) } @@ -109,13 +111,13 @@ func Zip[E, F any](s1 Stream[E], s2 Stream[F]) Stream[pair.Pair[E, F]] { // s := stream.ZipWithIndexInt(stream.Of("foo", "bar"), 1) // out := stream.DebugString(s) // "<(foo, 1), (bar, 2)>" func ZipWithIndexInt[E any](s Stream[E], offset int) Stream[pair.Pair[E, int]] { - return func(yield Consumer[pair.Pair[E, int]]) bool { - i := offset - 1 - return s(func(e E) bool { - i++ - return yield(pair.Of(e, i)) - }) - } + return Combine( + s, + Range(offset, pred.True[int](), mapper.Increment(1)), + func(e E, i int) pair.Pair[E, int] { + return pair.Of(e, i) + }, + ) } // ZipWithIndexInt64 returns a stream that pairs each element in the input stream with its index, starting at the given offset. @@ -126,13 +128,13 @@ func ZipWithIndexInt[E any](s Stream[E], offset int) Stream[pair.Pair[E, int]] { // s := stream.ZipWithIndexInt64(stream.Of("foo", "bar"), 1) // out := stream.DebugString(s) // "<(foo, 1), (bar, 2)>" func ZipWithIndexInt64[E any](s Stream[E], offset int64) Stream[pair.Pair[E, int64]] { - return func(yield Consumer[pair.Pair[E, int64]]) bool { - i := offset - 1 - return s(func(e E) bool { - i++ - return yield(pair.Of(e, i)) - }) - } + return Combine( + s, + Range(offset, pred.True[int64](), mapper.Increment(int64(1))), + func(e E, i int64) pair.Pair[E, int64] { + return pair.Of(e, i) + }, + ) } // ZipWithKey returns a stream that pairs each element in the input stream with the key extracted from the element using the given key extractor. @@ -145,9 +147,7 @@ func ZipWithIndexInt64[E any](s Stream[E], offset int64) Stream[pair.Pair[E, int // }) // out := stream.DebugString(s) // "<(FOO, foo), (BAR, bar)>" func ZipWithKey[E any, K any](s Stream[E], ke KeyExtractor[E, K]) Stream[pair.Pair[K, E]] { - return func(yield Consumer[pair.Pair[K, E]]) bool { - return s(func(e E) bool { - return yield(pair.Of(ke(e), e)) - }) - } + return Map(s, func(e E) pair.Pair[K, E] { + return pair.Of(ke(e), e) + }) } diff --git a/pkg/stream/mux_test.go b/pkg/stream/combine_test.go similarity index 89% rename from pkg/stream/mux_test.go rename to pkg/stream/combine_test.go index 0d79729..fd0258e 100644 --- a/pkg/stream/mux_test.go +++ b/pkg/stream/combine_test.go @@ -9,9 +9,9 @@ import ( "github.com/jpfourny/papaya/pkg/pair" ) -func TestMultiplex(t *testing.T) { +func TestCombine(t *testing.T) { t.Run("empty", func(t *testing.T) { - s := Multiplex(Empty[int](), Empty[string](), func(i int, s string) string { + s := Combine(Empty[int](), Empty[string](), func(i int, s string) string { return fmt.Sprintf("%s%d", s, i) }) got := CollectSlice(s) @@ -20,7 +20,7 @@ func TestMultiplex(t *testing.T) { }) t.Run("non-empty", func(t *testing.T) { - s := Multiplex(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) string { + s := Combine(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) string { return fmt.Sprintf("%s%d", s, i) }) got := CollectSlice(s) @@ -29,7 +29,7 @@ func TestMultiplex(t *testing.T) { }) t.Run("limited", func(t *testing.T) { - s := Multiplex(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) string { + s := Combine(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) string { return fmt.Sprintf("%s%d", s, i) }) got := CollectSlice(Limit(s, 1)) // Stops stream after 1 element. @@ -38,9 +38,9 @@ func TestMultiplex(t *testing.T) { }) } -func TestMultiplexOrDiscard(t *testing.T) { +func TestCombineOrDiscard(t *testing.T) { t.Run("empty", func(t *testing.T) { - s := MultiplexOrDiscard(Empty[int](), Empty[string](), func(i int, s string) optional.Optional[string] { + s := CombineOrDiscard(Empty[int](), Empty[string](), func(i int, s string) optional.Optional[string] { return optional.Of(fmt.Sprintf("%s%d", s, i)) }) got := CollectSlice(s) @@ -49,7 +49,7 @@ func TestMultiplexOrDiscard(t *testing.T) { }) t.Run("non-empty", func(t *testing.T) { - s := MultiplexOrDiscard(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) optional.Optional[string] { + s := CombineOrDiscard(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) optional.Optional[string] { if i == 2 { return optional.Empty[string]() } @@ -61,7 +61,7 @@ func TestMultiplexOrDiscard(t *testing.T) { }) t.Run("limited", func(t *testing.T) { - s := MultiplexOrDiscard(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) optional.Optional[string] { + s := CombineOrDiscard(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) optional.Optional[string] { if i == 2 { return optional.Empty[string]() }