Skip to content

Commit

Permalink
Move mapper and pred packages under stream package; refactor stream.R…
Browse files Browse the repository at this point in the history
…educe
  • Loading branch information
jpfourny committed Jan 11, 2024
1 parent a73df8c commit e12d2a7
Show file tree
Hide file tree
Showing 35 changed files with 196 additions and 189 deletions.
12 changes: 6 additions & 6 deletions examples/stream/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package stream

import (
"fmt"
mapper2 "github.com/jpfourny/papaya/pkg/stream/mapper"
"testing"

"github.com/jpfourny/papaya/pkg/mapper"
"github.com/jpfourny/papaya/pkg/pair"
"github.com/jpfourny/papaya/pkg/stream"
)
Expand All @@ -13,7 +13,7 @@ func TestMapIntToString(t *testing.T) {
// Map stream of int to string.
s := stream.Map(
stream.Of(1, 2, 3),
mapper.Sprintf[int]("%d"),
mapper2.Sprintf[int]("%d"),
)
stream.ForEach(s, func(s string) {
fmt.Println(s)
Expand All @@ -28,7 +28,7 @@ func TestMapStringToInt(t *testing.T) {
// Map stream of string to int; default to 0 if parse fails.
s := stream.Map(
stream.Of("1", "2", "3", "foo"),
mapper.ParseIntOr[string, int](10, 64, -1),
mapper2.ParseIntOr[string, int](10, 64, -1),
)
stream.ForEach(s, func(i int) {
fmt.Println(i)
Expand All @@ -41,7 +41,7 @@ func TestMapStringToInt(t *testing.T) {

s = stream.MapOrDiscard(
stream.Of("1", "2", "3", "foo"),
mapper.TryParseInt[string, int](10, 64),
mapper2.TryParseInt[string, int](10, 64),
)
stream.ForEach(s, func(i int) {
fmt.Println(i)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestMapNumberToBool(t *testing.T) {
// Given stream of numbers, map each number to whether it is even.
s := stream.Map(
stream.Of(0, 2, 0),
mapper.NumberToBool[int, bool](),
mapper2.NumberToBool[int, bool](),
)
stream.ForEach(s, func(b bool) {
fmt.Println(b)
Expand All @@ -122,7 +122,7 @@ func TestMapBoolNumber(t *testing.T) {
// Given stream of bools, map each bool to 0 if false and 1 if true.
s := stream.Map(
stream.Of(false, true, false),
mapper.BoolToNumber[bool, int](),
mapper2.BoolToNumber[bool, int](),
)
stream.ForEach(s, func(i int) {
fmt.Println(i)
Expand Down
219 changes: 87 additions & 132 deletions pkg/stream/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,14 @@ import (
"github.com/jpfourny/papaya/pkg/cmp"
"github.com/jpfourny/papaya/pkg/constraint"
"github.com/jpfourny/papaya/pkg/optional"
"github.com/jpfourny/papaya/pkg/stream/mapper"
"github.com/jpfourny/papaya/pkg/stream/reducer"
)

// Reducer represents a function that takes two inputs of type E and returns an output of type E.
// The Reducer is commonly used in the `Reduce` function to combine elements of a stream into a single result.
type Reducer[E any] func(e1, e2 E) (result E)

// Accumulator represents a function that takes an accumulated value of type A and an element of type E,
// and returns the updated accumulated value of type A.
// The Accumulator is commonly used in the `Aggregate` function to combine elements of a stream into a single result.
type Accumulator[A, E any] func(a A, e E) (result A)

// Finisher represents a function that takes an accumulated value of type A and returns the finished result of type F.
// The Finisher is commonly used in the `Aggregate` function to compute the final result after all elements have been accumulated.
type Finisher[A, F any] func(a A) (result F)

// Reduce combines the elements of the stream into a single value using the given reducer function.
// If the stream is empty, then an empty optional.Optional is returned.
// The stream is fully consumed.
Expand All @@ -38,131 +31,48 @@ type Finisher[A, F any] func(a A) (result F)
// return a + e
// },
// ) // None()
func Reduce[E any](s Stream[E], reduce Reducer[E]) (result optional.Optional[E]) {
result = optional.Empty[E]()
func Reduce[E any](s Stream[E], reduce Reducer[E]) optional.Optional[E] {
var accum E
var ok bool
s(func(e E) bool {
if result.Present() {
result = optional.Of(reduce(result.Get(), e))
if ok {
accum = reduce(accum, e)
} else {
result = optional.Of(e)
accum = e
ok = true
}
return true
})
return
}

// Aggregate combines the elements of the stream into a single value using the given identity value, accumulator function and finisher function.
// The accumulated value is initialized to the identity value.
// The accumulator function is used to combine each element with the accumulated value.
// The finisher function is used to compute the final result after all elements have been accumulated.
// The stream is fully consumed.
//
// Example usage:
//
// s := stream.Aggregate(
// stream.Of(1, 2, 3),
// 0, // Initial value
// func(a, e int) int {
// return a + e // Accumulate with addition
// },
// func(a int) int {
// return a * 2 // Finish with multiplication by 2
// },
// ) // (1+2+3) * 2 = 12
func Aggregate[E, A, F any](s Stream[E], identity A, accumulate Accumulator[A, E], finish Finisher[A, F]) F {
a := identity
s(func(e E) bool {
a = accumulate(a, e)
return true
})
return finish(a)
return optional.Maybe(accum, ok)
}

// Sum computes the sum of all elements in the stream of any number type E and returns the result as type E.
// The result of an empty stream is the zero value of type E.
// Sum computes the sum of all elements in the stream of any real-number type E and returns the result as real-number type F.
// The result of an empty stream is the zero value of type F.
// The stream is fully consumed.
//
// Example usage:
//
// n := stream.Sum(stream.Of(1, 2, 3)) // 6 (int)
func Sum[E constraint.Numeric](s Stream[E]) E {
// n1 := stream.Sum[int](stream.Of(1, 2, 3)) // 6 (int)
// n2 := stream.Sum[float64](stream.Of(1, 2, 3)) // 6.0 (float64)
func Sum[R, E constraint.RealNumber](s Stream[E]) R {
return Reduce(
s,
func(a E, e E) E { return a + e },
).OrElse(E(0))
Map(s, mapper.NumberToNumber[E, R]()),
reducer.Sum[R](),
).OrElseZero()
}

// SumInteger computes the sum of all elements in the stream of any signed-integer type E and returns the result as type int64.
// The result of an empty stream is the zero value of type int64.
// SumComplex computes the sum of all elements in the stream of any complex-number type E and returns the result as complex-number type F.
// The result of an empty stream is the zero value of type F.
// The stream is fully consumed.
//
// Example usage:
//
// n := stream.SumInteger(stream.Of(1, 2, 3)) // 6 (int64)
func SumInteger[E constraint.SignedInteger](s Stream[E]) int64 {
return Aggregate(
s,
int64(0),
func(a int64, e E) int64 { return a + int64(e) },
func(a int64) int64 { return a },
)
}

// SumUnsignedInteger computes the sum of all elements in the stream of any unsigned-integer type E and returns the result as type uint64.
// The result of an empty stream is the zero value of type uint64.
// The stream is fully consumed.
//
// Example usage:
//
// n := stream.SumUnsignedInteger(stream.Of[uint](1, 2, 3)) // 6 (uint64)
func SumUnsignedInteger[E constraint.UnsignedInteger](s Stream[E]) uint64 {
return Aggregate(
s,
uint64(0),
func(a uint64, e E) uint64 { return a + uint64(e) },
func(a uint64) uint64 { return a },
)
}

// SumFloat computes the sum of all elements in the stream of any floating-point type E and returns the result as type float64.
// The result of an empty stream is the zero value of type float64.
// The stream is fully consumed.
//
// Example usage:
//
// n := stream.SumFloat(stream.Of(1.0, 2.0, 3.0)) // 6.0 (float64)
func SumFloat[E constraint.RealNumber](s Stream[E]) float64 {
return Aggregate(
s,
float64(0),
func(a float64, e E) float64 { return a + float64(e) },
func(a float64) float64 { return a },
)
}

// Average computes the average of all elements in the stream of any number type E and returns the result as type float64.
// The result of an empty stream is the zero value of type float64.
// The stream is fully consumed.
//
// Example usage:
//
// n := stream.Average(stream.Of(1, 2, 3)) // 2.0 (float64)
func Average[E constraint.RealNumber](s Stream[E]) float64 {
var count uint64
return Aggregate(
s,
float64(0),
func(a float64, e E) float64 {
count++
return a + float64(e)
},
func(a float64) float64 {
if count == 0 {
return 0
}
return a / float64(count)
},
)
// n := stream.SumComplex[complex128](stream.Of(1+i, 2+i, 3+i)) // 6+3i (complex128)
func SumComplex[R, E constraint.Complex](s Stream[E]) R {
return Reduce(
Map(s, mapper.ComplexToComplex[E, R]()),
reducer.Sum[R](),
).OrElseZero()
}

// Min returns the minimum element in the stream, or the zero value of the type parameter E if the stream is empty.
Expand All @@ -174,7 +84,7 @@ func Average[E constraint.RealNumber](s Stream[E]) float64 {
// min := stream.Min(stream.Of(3, 1, 2)) // Some(1)
// min = stream.Min(stream.Empty[int]()) // None()
func Min[E constraint.Ordered](s Stream[E]) (min optional.Optional[E]) {
return MinBy(s, cmp.Natural[E]())
return Reduce(s, reducer.Min[E]())
}

// MinBy returns the minimum element in the stream.
Expand All @@ -186,15 +96,7 @@ func Min[E constraint.Ordered](s Stream[E]) (min optional.Optional[E]) {
// min := stream.MinBy(stream.Of(3, 1, 2), cmp.Natural[int]()) // Some(1)
// min = stream.MinBy(stream.Empty[int](), cmp.Natural[int]()) // None()
func MinBy[E any](s Stream[E], compare cmp.Comparer[E]) (min optional.Optional[E]) {
return Reduce(
s,
func(a, e E) E {
if compare.LessThan(e, a) {
return e
}
return a
},
)
return Reduce(s, reducer.MinBy(compare))
}

// Max returns the maximum element in the stream.
Expand All @@ -206,7 +108,7 @@ func MinBy[E any](s Stream[E], compare cmp.Comparer[E]) (min optional.Optional[E
// max := stream.Max(stream.Of(3, 1, 2)) // Some(3)
// max = stream.Max(stream.Empty[int]()) // None()
func Max[E constraint.Ordered](s Stream[E]) (max optional.Optional[E]) {
return MaxBy(s, cmp.Natural[E]())
return Reduce(s, reducer.Max[E]())
}

// MaxBy returns the maximum element in the stream, or the zero value of the type parameter E if the stream is empty.
Expand All @@ -218,13 +120,66 @@ func Max[E constraint.Ordered](s Stream[E]) (max optional.Optional[E]) {
// max := stream.MaxBy(stream.Of(3, 1, 2), cmp.Natural[int]()) // Some(3)
// max = stream.MaxBy(stream.Empty[int](), cmp.Natural[int]()) // None()
func MaxBy[E any](s Stream[E], compare cmp.Comparer[E]) (max optional.Optional[E]) {
return Reduce(
return Reduce(s, reducer.MaxBy(compare))
}

// Accumulator represents a function that takes an accumulated value of type A and an element of type E,
// and returns the updated accumulated value of type A.
// The Accumulator is commonly used in the `Aggregate` function to combine elements of a stream into a single result.
type Accumulator[A, E any] func(a A, e E) (result A)

// Finisher represents a function that takes an accumulated value of type A and returns the finished result of type F.
// The Finisher is commonly used in the `Aggregate` function to compute the final result after all elements have been accumulated.
type Finisher[A, F any] func(a A) (result F)

// Aggregate combines the elements of the stream into a single value using the given identity value, accumulator function and finisher function.
// The accumulated value is initialized to the identity value.
// The accumulator function is used to combine each element with the accumulated value.
// The finisher function is used to compute the final result after all elements have been accumulated.
// The stream is fully consumed.
//
// Example usage:
//
// s := stream.Aggregate(
// stream.Of(1, 2, 3),
// 0, // Initial value
// func(a, e int) int {
// return a + e // Accumulate with addition
// },
// func(a int) int {
// return a * 2 // Finish with multiplication by 2
// },
// ) // (1+2+3) * 2 = 12
func Aggregate[E, A, F any](s Stream[E], identity A, accumulate Accumulator[A, E], finish Finisher[A, F]) F {
a := identity
s(func(e E) bool {
a = accumulate(a, e)
return true
})
return finish(a)
}

// Average computes the average of all elements in the stream of any number type E and returns the result as type float64.
// The result of an empty stream is the zero value of type float64.
// The stream is fully consumed.
//
// Example usage:
//
// n := stream.Average(stream.Of(1, 2, 3)) // 2.0 (float64)
func Average[E constraint.RealNumber](s Stream[E]) float64 {
var count uint64
return Aggregate(
s,
func(a, e E) E {
if compare.GreaterThan(e, a) {
return e
float64(0),
func(a float64, e E) float64 {
count++
return a + float64(e)
},
func(a float64) float64 {
if count == 0 {
return 0
}
return a
return a / float64(count)
},
)
}
Expand Down
Loading

0 comments on commit e12d2a7

Please sign in to comment.