Skip to content

Commit

Permalink
Renamed Zip to Combine
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph Fourny committed Dec 17, 2023
1 parent a1da688 commit 3e91a11
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
60 changes: 30 additions & 30 deletions pkg/stream/mux.go → pkg/stream/combine.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,49 @@
package stream

import (
"github.com/jpfourny/papaya/pkg/mapper"
"github.com/jpfourny/papaya/pkg/optional"
"github.com/jpfourny/papaya/pkg/pair"
"github.com/jpfourny/papaya/pkg/pred"
)

// Combiner represents a function that combines two elements of type E1 and E2 into an element of type F.
// It is used in the Multiplex operation.
// It is used in the Combine operation.
type Combiner[E1, E2, F any] func(E1, E2) F

// Multiplex combines the elements of two streams into a single stream using the given Combiner function.
// Combine combines the elements of two streams into a single stream using the given Combiner function.
// The resulting stream will have the same number of elements as the shorter of the two input streams.
//
// Example usage:
//
// s := stream.Multiplex(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) string {
// s := stream.Combine(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) string {
// return fmt.Sprintf("%s%d", s, i)
// })
// out := stream.DebugString(s) // "<foo1, bar2>"
func Multiplex[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine Combiner[E1, E2, F]) Stream[F] {
return MultiplexOrDiscard(s1, s2, func(e1 E1, e2 E2) optional.Optional[F] {
func Combine[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine Combiner[E1, E2, F]) Stream[F] {
return CombineOrDiscard(s1, s2, func(e1 E1, e2 E2) optional.Optional[F] {
return optional.Of(combine(e1, e2))
})
}

// OptionalCombiner represents a function that combines two elements of type E1 and E2 into an optional element of type F.
// If the elements cannot be combined, the function must return an empty optional.
// It is used in the MultiplexOrDiscard operation.
// It is used in the CombineOrDiscard operation.
type OptionalCombiner[E1, E2, F any] func(E1, E2) optional.Optional[F]

// MultiplexOrDiscard combines the elements of two streams into a single stream using the given OptionalCombiner function or discards them, if the combiner returns an empty optional.
// The resulting stream will have, at most, the same number of elements as the shorter of the two input streams.
// CombineOrDiscard combines the elements of two streams into a single stream using the given OptionalCombiner function or discards them, if the combiner returns an empty optional.
// The resulting stream will have at most the same number of elements as the shorter of the two input streams.
//
// Example usage:
//
// s := stream.MultiplexOrDiscard(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) optional.Optional[string] {
// s := stream.CombineOrDiscard(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) optional.Optional[string] {
// if i == 2 {
// return optional.Empty[string]()
// }
// return optional.Of(fmt.Sprintf("%s%d", s, i))
// })
// out := stream.DebugString(s) // "<foo1>"
func MultiplexOrDiscard[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine OptionalCombiner[E1, E2, F]) Stream[F] {
func CombineOrDiscard[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine OptionalCombiner[E1, E2, F]) Stream[F] {
return func(yield Consumer[F]) bool {
done := make(chan struct{})
defer close(done)
Expand Down Expand Up @@ -96,7 +98,7 @@ func MultiplexOrDiscard[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine Opt
// s := stream.Zip(stream.Of(1, 2, 3), stream.Of("foo", "bar"))
// out := stream.DebugString(s) // "<(1, foo), (2, bar)>"
func Zip[E, F any](s1 Stream[E], s2 Stream[F]) Stream[pair.Pair[E, F]] {
return Multiplex(s1, s2, func(e E, f F) pair.Pair[E, F] {
return Combine(s1, s2, func(e E, f F) pair.Pair[E, F] {
return pair.Of(e, f)
})
}
Expand All @@ -109,13 +111,13 @@ func Zip[E, F any](s1 Stream[E], s2 Stream[F]) Stream[pair.Pair[E, F]] {
// s := stream.ZipWithIndexInt(stream.Of("foo", "bar"), 1)
// out := stream.DebugString(s) // "<(foo, 1), (bar, 2)>"
func ZipWithIndexInt[E any](s Stream[E], offset int) Stream[pair.Pair[E, int]] {
return func(yield Consumer[pair.Pair[E, int]]) bool {
i := offset - 1
return s(func(e E) bool {
i++
return yield(pair.Of(e, i))
})
}
return Combine(
s,
Range(offset, pred.True[int](), mapper.Increment(1)),
func(e E, i int) pair.Pair[E, int] {
return pair.Of(e, i)
},
)
}

// ZipWithIndexInt64 returns a stream that pairs each element in the input stream with its index, starting at the given offset.
Expand All @@ -126,13 +128,13 @@ func ZipWithIndexInt[E any](s Stream[E], offset int) Stream[pair.Pair[E, int]] {
// s := stream.ZipWithIndexInt64(stream.Of("foo", "bar"), 1)
// out := stream.DebugString(s) // "<(foo, 1), (bar, 2)>"
func ZipWithIndexInt64[E any](s Stream[E], offset int64) Stream[pair.Pair[E, int64]] {
return func(yield Consumer[pair.Pair[E, int64]]) bool {
i := offset - 1
return s(func(e E) bool {
i++
return yield(pair.Of(e, i))
})
}
return Combine(
s,
Range(offset, pred.True[int64](), mapper.Increment(int64(1))),
func(e E, i int64) pair.Pair[E, int64] {
return pair.Of(e, i)
},
)
}

// ZipWithKey returns a stream that pairs each element in the input stream with the key extracted from the element using the given key extractor.
Expand All @@ -145,9 +147,7 @@ func ZipWithIndexInt64[E any](s Stream[E], offset int64) Stream[pair.Pair[E, int
// })
// out := stream.DebugString(s) // "<(FOO, foo), (BAR, bar)>"
func ZipWithKey[E any, K any](s Stream[E], ke KeyExtractor[E, K]) Stream[pair.Pair[K, E]] {
return func(yield Consumer[pair.Pair[K, E]]) bool {
return s(func(e E) bool {
return yield(pair.Of(ke(e), e))
})
}
return Map(s, func(e E) pair.Pair[K, E] {
return pair.Of(ke(e), e)
})
}
16 changes: 8 additions & 8 deletions pkg/stream/mux_test.go → pkg/stream/combine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"github.com/jpfourny/papaya/pkg/pair"
)

func TestMultiplex(t *testing.T) {
func TestCombine(t *testing.T) {
t.Run("empty", func(t *testing.T) {
s := Multiplex(Empty[int](), Empty[string](), func(i int, s string) string {
s := Combine(Empty[int](), Empty[string](), func(i int, s string) string {
return fmt.Sprintf("%s%d", s, i)
})
got := CollectSlice(s)
Expand All @@ -20,7 +20,7 @@ func TestMultiplex(t *testing.T) {
})

t.Run("non-empty", func(t *testing.T) {
s := Multiplex(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) string {
s := Combine(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) string {
return fmt.Sprintf("%s%d", s, i)
})
got := CollectSlice(s)
Expand All @@ -29,7 +29,7 @@ func TestMultiplex(t *testing.T) {
})

t.Run("limited", func(t *testing.T) {
s := Multiplex(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) string {
s := Combine(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) string {
return fmt.Sprintf("%s%d", s, i)
})
got := CollectSlice(Limit(s, 1)) // Stops stream after 1 element.
Expand All @@ -38,9 +38,9 @@ func TestMultiplex(t *testing.T) {
})
}

func TestMultiplexOrDiscard(t *testing.T) {
func TestCombineOrDiscard(t *testing.T) {
t.Run("empty", func(t *testing.T) {
s := MultiplexOrDiscard(Empty[int](), Empty[string](), func(i int, s string) optional.Optional[string] {
s := CombineOrDiscard(Empty[int](), Empty[string](), func(i int, s string) optional.Optional[string] {
return optional.Of(fmt.Sprintf("%s%d", s, i))
})
got := CollectSlice(s)
Expand All @@ -49,7 +49,7 @@ func TestMultiplexOrDiscard(t *testing.T) {
})

t.Run("non-empty", func(t *testing.T) {
s := MultiplexOrDiscard(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) optional.Optional[string] {
s := CombineOrDiscard(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) optional.Optional[string] {
if i == 2 {
return optional.Empty[string]()
}
Expand All @@ -61,7 +61,7 @@ func TestMultiplexOrDiscard(t *testing.T) {
})

t.Run("limited", func(t *testing.T) {
s := MultiplexOrDiscard(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) optional.Optional[string] {
s := CombineOrDiscard(Of(1, 2, 3), Of("foo", "bar"), func(i int, s string) optional.Optional[string] {
if i == 2 {
return optional.Empty[string]()
}
Expand Down

0 comments on commit 3e91a11

Please sign in to comment.