From f0a765b05aa84d8a2e70c9329fc2f630860114c7 Mon Sep 17 00:00:00 2001 From: jpfourny Date: Sat, 6 Jan 2024 15:09:26 -0500 Subject: [PATCH] Make kvstore an internal package; reformat comments. --- internal/kvstore/kvstore.go | 112 +++++++++++++++++ internal/kvstore/kvstore_test.go | 203 +++++++++++++++++++++++++++++++ pkg/stream/aggregate.go | 31 ++--- pkg/stream/collect.go | 4 +- pkg/stream/combine.go | 25 ++-- pkg/stream/filter.go | 15 +-- pkg/stream/from.go | 12 +- pkg/stream/group.go | 79 ++++++------ pkg/stream/iterate.go | 18 +-- pkg/stream/key_store.go | 103 ---------------- pkg/stream/key_store_test.go | 148 ---------------------- pkg/stream/misc.go | 2 +- pkg/stream/set.go | 29 +++-- pkg/stream/transform.go | 19 +-- 14 files changed, 442 insertions(+), 358 deletions(-) create mode 100644 internal/kvstore/kvstore.go create mode 100644 internal/kvstore/kvstore_test.go delete mode 100644 pkg/stream/key_store.go delete mode 100644 pkg/stream/key_store_test.go diff --git a/internal/kvstore/kvstore.go b/internal/kvstore/kvstore.go new file mode 100644 index 0000000..924ec35 --- /dev/null +++ b/internal/kvstore/kvstore.go @@ -0,0 +1,112 @@ +package kvstore + +import ( + "github.com/jpfourny/papaya/pkg/cmp" + "github.com/jpfourny/papaya/pkg/optional" + "slices" +) + +// Store represents a container for key-value pairs. +// Used internally for key-grouping and key-joining operations. +type Store[K, V any] interface { + Get(key K) optional.Optional[V] + Put(key K, value V) + ForEach(func(key K, value V) bool) bool +} + +// NewMapped creates a new Store backed by a map. +// The key type K must be comparable. +func NewMapped[K comparable, V any]() Store[K, V] { + return make(mappedStore[K, V]) +} + +// NewSorted creates a new Store of sorted keys, ordered by the given cmp.Comparer. +func NewSorted[K any, V any](compare cmp.Comparer[K]) Store[K, V] { + return &sortedStore[K, V]{ + compare: compare, + } +} + +// Maker is a factory function for creating a Store. 
+type Maker[K, V any] func() Store[K, V] + +// MappedMaker returns a Maker that calls NewMapped. +// The key type K must be comparable. +func MappedMaker[K comparable, V any]() Maker[K, V] { + return func() Store[K, V] { + return NewMapped[K, V]() + } +} + +// SortedMaker returns a Maker that calls NewSorted with the given cmp.Comparer. +func SortedMaker[K any, V any](compare cmp.Comparer[K]) Maker[K, V] { + return func() Store[K, V] { + return NewSorted[K, V](compare) + } +} + +// mappedStore provides an implementation of Store using the builtin map. +// The key type K must be comparable. +type mappedStore[K comparable, V any] map[K]V + +func (s mappedStore[K, V]) Get(key K) optional.Optional[V] { + if v, ok := s[key]; ok { + return optional.Of(v) + } + return optional.Empty[V]() +} + +func (s mappedStore[K, V]) Put(key K, value V) { + s[key] = value +} + +func (s mappedStore[K, V]) ForEach(yield func(K, V) bool) bool { + for k, v := range s { + if !yield(k, v) { + return false + } + } + return true +} + +// sortedStore provides an implementation of Store using sorted slices and binary-search. +// The keys are ordered using the given cmp.Comparer. +type sortedStore[K any, V any] struct { + compare cmp.Comparer[K] + keys []K + values []V +} + +func (s *sortedStore[K, V]) Get(key K) optional.Optional[V] { + if i, ok := s.indexOf(key); ok { + return optional.Of(s.values[i]) + } + return optional.Empty[V]() +} + +func (s *sortedStore[K, V]) Put(key K, value V) { + i, ok := s.indexOf(key) + if ok { + s.values[i] = value + } else { + s.keys = append(s.keys, key) + s.values = append(s.values, value) + copy(s.keys[i+1:], s.keys[i:]) // Shift keys. + copy(s.values[i+1:], s.values[i:]) // Shift values. + s.keys[i] = key // Insert key. + s.values[i] = value // Insert value. 
+ } +} + +func (s *sortedStore[K, V]) indexOf(key K) (int, bool) { + return slices.BinarySearchFunc(s.keys, key, s.compare) +} + +func (s *sortedStore[K, V]) ForEach(f func(K, V) bool) bool { + for i, k := range s.keys { + if !f(k, s.values[i]) { + return false + } + } + return true +} diff --git a/internal/kvstore/kvstore_test.go b/internal/kvstore/kvstore_test.go new file mode 100644 index 0000000..212f508 --- /dev/null +++ b/internal/kvstore/kvstore_test.go @@ -0,0 +1,203 @@ +package kvstore + +import ( + "github.com/jpfourny/papaya/internal/assert" + "github.com/jpfourny/papaya/pkg/cmp" + "github.com/jpfourny/papaya/pkg/optional" + "testing" +) + +func TestMappedMaker(t *testing.T) { + m := MappedMaker[int, string]() + if m == nil { + t.Fatalf("got %#v, want non-nil", m) + } + ks := m() + if ks == nil { + t.Fatalf("got %#v, want non-nil", ks) + } +} + +func TestOrderedMaker(t *testing.T) { + m := SortedMaker[int, string](cmp.Natural[int]()) + if m == nil { + t.Fatalf("got %#v, want non-nil", m) + } + ks := m() + if ks == nil { + t.Fatalf("got %#v, want non-nil", ks) + } +} + +func TestOrderedStore_Get(t *testing.T) { + t.Run("empty", func(t *testing.T) { + ks := NewSorted[int, string](cmp.Natural[int]()) + got := ks.Get(0) + want := optional.Empty[string]() + if got != want { + t.Fatalf("got %#v, want %#v", got, want) + } + }) + + t.Run("non-empty", func(t *testing.T) { + ks := NewSorted[int, string](cmp.Natural[int]()) + ks.Put(1, "one") + ks.Put(2, "two") + ks.Put(1, "uno") + ks.Put(3, "three") + ks.Put(2, "dos") + got := ks.Get(1) + want := optional.Of("uno") + if got != want { + t.Fatalf("got %#v, want %#v", got, want) + } + got = ks.Get(2) + want = optional.Of("dos") + if got != want { + t.Fatalf("got %#v, want %#v", got, want) + } + got = ks.Get(3) + want = optional.Of("three") + if got != want { + t.Fatalf("got %#v, want %#v", got, want) + } + got = ks.Get(4) + want = optional.Empty[string]() + if got != want { + t.Fatalf("got %#v, want %#v", got, want) 
+ } + }) +} + +func TestOrderedStore_ForEach(t *testing.T) { + t.Run("empty", func(t *testing.T) { + ks := NewSorted[int, string](cmp.Natural[int]()) + var got []int + ks.ForEach(func(key int, value string) bool { + got = append(got, key) + return true + }) + if len(got) != 0 { + t.Fatalf("got %#v, want %#v", got, []int{}) + } + }) + + t.Run("non-empty", func(t *testing.T) { + ks := NewSorted[int, string](cmp.Natural[int]()) + ks.Put(1, "one") + ks.Put(2, "two") + ks.Put(1, "uno") + ks.Put(3, "three") + ks.Put(2, "dos") + var got []int + ks.ForEach(func(key int, value string) bool { + got = append(got, key) + return true + }) + want := []int{1, 2, 3} + assert.ElementsMatch(t, got, want) + }) + + t.Run("stop", func(t *testing.T) { + ks := NewSorted[int, string](cmp.Natural[int]()) + ks.Put(1, "one") + ks.Put(2, "two") + ks.Put(1, "uno") + ks.Put(3, "three") + ks.Put(2, "dos") + var got []int + ks.ForEach(func(key int, value string) bool { + got = append(got, key) + return false + }) + want := []int{1} + assert.ElementsMatch(t, got, want) + }) +} + +func TestMappedStore_Get(t *testing.T) { + t.Run("empty", func(t *testing.T) { + ks := NewMapped[int, string]() + got := ks.Get(0) + want := optional.Empty[string]() + if got != want { + t.Fatalf("got %#v, want %#v", got, want) + } + }) + + t.Run("non-empty", func(t *testing.T) { + ks := NewMapped[int, string]() + ks.Put(1, "one") + ks.Put(2, "two") + ks.Put(1, "uno") + ks.Put(3, "three") + ks.Put(2, "dos") + got := ks.Get(1) + want := optional.Of("uno") + if got != want { + t.Fatalf("got %#v, want %#v", got, want) + } + got = ks.Get(2) + want = optional.Of("dos") + if got != want { + t.Fatalf("got %#v, want %#v", got, want) + } + got = ks.Get(3) + want = optional.Of("three") + if got != want { + t.Fatalf("got %#v, want %#v", got, want) + } + got = ks.Get(4) + want = optional.Empty[string]() + if got != want { + t.Fatalf("got %#v, want %#v", got, want) + } + }) +} + +func TestMappedStore_ForEach(t *testing.T) { + 
t.Run("empty", func(t *testing.T) { + ks := NewMapped[int, string]() + var got []int + ks.ForEach(func(key int, value string) bool { + got = append(got, key) + return true + }) + if len(got) != 0 { + t.Fatalf("got %#v, want %#v", got, []int{}) + } + }) + + t.Run("non-empty", func(t *testing.T) { + ks := NewMapped[int, string]() + ks.Put(1, "one") + ks.Put(2, "two") + ks.Put(1, "uno") + ks.Put(3, "three") + ks.Put(2, "dos") + var got []int + ks.ForEach(func(key int, value string) bool { + got = append(got, key) + return true + }) + want := []int{1, 2, 3} + assert.ElementsMatchAnyOrder(t, got, want) + }) + + t.Run("stop", func(t *testing.T) { + ks := NewMapped[int, string]() + ks.Put(1, "one") + ks.Put(2, "two") + ks.Put(1, "uno") + ks.Put(3, "three") + ks.Put(2, "dos") + var got []int + ks.ForEach(func(key int, value string) bool { + got = append(got, key) + return false + }) + if len(got) != 1 { + t.Fatalf("expected exactly 1 element, got %d", len(got)) + } + }) +} diff --git a/pkg/stream/aggregate.go b/pkg/stream/aggregate.go index 4eac120..625f31b 100644 --- a/pkg/stream/aggregate.go +++ b/pkg/stream/aggregate.go @@ -25,18 +25,19 @@ type Finisher[A, F any] func(a A) (result F) // // Example usage: // -// out := stream.Reduce( -// stream.Of(1, 2, 3), -// func(a, e int) int { // Reduce values by addition. -// return a + e -// }, -// ) // Some(6) -// out = stream.Reduce( -// stream.Empty[int](), -// func(a, e int) int { // Reduce values by addition. -// return a + e -// }, -// ) // None() +// out := stream.Reduce( +// stream.Of(1, 2, 3), +// func(a, e int) int { // Reduce values by addition. +// return a + e +// }, +// ) // Some(6) +// +// out = stream.Reduce( +// stream.Empty[int](), +// func(a, e int) int { // Reduce values by addition. 
+// return a + e +// }, +// ) // None() func Reduce[E any](s Stream[E], reduce Reducer[E]) (result optional.Optional[E]) { result = optional.Empty[E]() s(func(e E) bool { @@ -60,12 +61,12 @@ func Reduce[E any](s Stream[E], reduce Reducer[E]) (result optional.Optional[E]) // // s := stream.Aggregate( // stream.Of(1, 2, 3), -// 0, // Initial value +// 0, // Initial value // func(a, e int) int { -// return a + e // Accumulate with addition +// return a + e // Accumulate with addition // }, // func(a int) int { -// return a * 2 // Finish with multiplication by 2 +// return a * 2 // Finish with multiplication by 2 // }, // ) // (1+2+3) * 2 = 12 func Aggregate[E, A, F any](s Stream[E], identity A, accumulate Accumulator[A, E], finish Finisher[A, F]) F { diff --git a/pkg/stream/collect.go b/pkg/stream/collect.go index 92e3d12..523bb2e 100644 --- a/pkg/stream/collect.go +++ b/pkg/stream/collect.go @@ -105,7 +105,7 @@ func CollectChannelCtx[E any](ctx context.Context, s Stream[E], ch chan<- E) { // // ch := stream.CollectChannelAsync(stream.Of(1, 2, 3), 3) // for e := range ch { -// fmt.Println(e) +// fmt.Println(e) // } // // Output: @@ -133,7 +133,7 @@ func CollectChannelAsync[E any](s Stream[E], buf int) <-chan E { // ctx := context.Background() // ch := stream.CollectChannelAsyncCtx(ctx, stream.Of(1, 2, 3), 3) // for e := range ch { -// fmt.Println(e) +// fmt.Println(e) // } // // Output: diff --git a/pkg/stream/combine.go b/pkg/stream/combine.go index 832084f..905a9d6 100644 --- a/pkg/stream/combine.go +++ b/pkg/stream/combine.go @@ -17,9 +17,13 @@ type Combiner[E1, E2, F any] func(E1, E2) F // // Example usage: // -// s := stream.Combine(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) string { +// s := stream.Combine( +// stream.Of(1, 2, 3), +// stream.Of("foo", "bar"), +// func(i int, s string) string { // return fmt.Sprintf("%s%d", s, i) -// }) +// }, +// ) // out := stream.DebugString(s) // "<foo1, bar2>" func Combine[E1, E2, F any](s1 Stream[E1], s2 
Stream[E2], combine Combiner[E1, E2, F]) Stream[F] { return CombineOrDiscard(s1, s2, func(e1 E1, e2 E2) optional.Optional[F] { @@ -37,12 +41,16 @@ type OptionalCombiner[E1, E2, F any] func(E1, E2) optional.Optional[F] // // Example usage: // -// s := stream.CombineOrDiscard(stream.Of(1, 2, 3), stream.Of("foo", "bar"), func(i int, s string) optional.Optional[string] { +// s := stream.CombineOrDiscard( +// stream.Of(1, 2, 3), +// stream.Of("foo", "bar"), +// func(i int, s string) optional.Optional[string] { // if i == 2 { -// return optional.Empty[string]() +// return optional.Empty[string]() // } // return optional.Of(fmt.Sprintf("%s%d", s, i)) -// }) +// }, +// ) // out := stream.DebugString(s) // "<foo1>" func CombineOrDiscard[E1, E2, F any](s1 Stream[E1], s2 Stream[E2], combine OptionalCombiner[E1, E2, F]) Stream[F] { return func(yield Consumer[F]) bool { @@ -129,9 +137,12 @@ type KeyExtractor[E, K any] func(e E) K // // Example usage: // -// s := stream.ZipWithKey(stream.Of("foo", "bar"), func(s string) string { +// s := stream.ZipWithKey( +// stream.Of("foo", "bar"), +// func(s string) string { // return strings.ToUpper(s) -// }) +// }, +// ) // out := stream.DebugString(s) // "<(FOO, foo), (BAR, bar)>" func ZipWithKey[K, E any](s Stream[E], ke KeyExtractor[E, K]) Stream[pair.Pair[K, E]] { return Map(s, func(e E) pair.Pair[K, E] { diff --git a/pkg/stream/filter.go b/pkg/stream/filter.go index d3b3492..269aa66 100644 --- a/pkg/stream/filter.go +++ b/pkg/stream/filter.go @@ -1,6 +1,7 @@ package stream import ( + "github.com/jpfourny/papaya/internal/kvstore" "github.com/jpfourny/papaya/pkg/cmp" ) @@ -14,7 +15,7 @@ type Predicate[E any] func(e E) (pass bool) // Example usage: // // s := stream.Filter(stream.Of(1, 2, 3), func(e int) bool { -// return e % 2 == 0 +// return e % 2 == 0 // }) // out := stream.DebugString(s) // "<2>" func Filter[E any](s Stream[E], p Predicate[E]) Stream[E] { @@ -75,7 +76,7 @@ func Skip[E any](s Stream[E], n int64) Stream[E] { // s := 
stream.Distinct(stream.Of(1, 2, 2, 3)) // out := stream.DebugString(s) // "<1, 2, 3>" func Distinct[E comparable](s Stream[E]) Stream[E] { - return distinct(s, mapKeyStoreFactory[E, struct{}]()) + return distinct(s, kvstore.MappedMaker[E, struct{}]()) } // DistinctBy returns a stream that only contains distinct elements using the given comparer to compare elements. @@ -85,17 +86,17 @@ func Distinct[E comparable](s Stream[E]) Stream[E] { // s := stream.DistinctBy(stream.Of(1, 2, 2, 3), cmp.Natural[int]()) // out := stream.DebugString(s) // "<1, 2, 3>" func DistinctBy[E any](s Stream[E], compare cmp.Comparer[E]) Stream[E] { - return distinct(s, sortedKeyStoreFactory[E, struct{}](compare)) + return distinct(s, kvstore.SortedMaker[E, struct{}](compare)) } -func distinct[E any](s Stream[E], ksf keyStoreFactory[E, struct{}]) Stream[E] { +func distinct[E any](s Stream[E], kv kvstore.Maker[E, struct{}]) Stream[E] { return func(yield Consumer[E]) bool { - seen := ksf() + seen := kv() return s(func(e E) bool { - if seen.get(e).Present() { + if seen.Get(e).Present() { return true // Skip. 
} - seen.put(e, struct{}{}) + seen.Put(e, struct{}{}) return yield(e) }) } diff --git a/pkg/stream/from.go b/pkg/stream/from.go index 04bd824..66fbc69 100644 --- a/pkg/stream/from.go +++ b/pkg/stream/from.go @@ -107,9 +107,9 @@ func FromMapValues[K comparable, V any](m map[K]V) Stream[V] { // // ch := make(chan int) // go func() { -// ch <- 1 -// ch <- 2 -// close(ch) +// ch <- 1 +// ch <- 2 +// close(ch) // }() // s := stream.FromChannel(ch) // out := stream.DebugString(s) // "<1, 2>" @@ -132,9 +132,9 @@ func FromChannel[E any](ch <-chan E) Stream[E] { // // ch := make(chan int) // go func() { -// ch <- 1 -// ch <- 2 -// close(ch) +// ch <- 1 +// ch <- 2 +// close(ch) // }() // s := stream.FromChannelCtx(ctx, ch) // out := stream.DebugString(s) // "<1, 2>" diff --git a/pkg/stream/group.go b/pkg/stream/group.go index 9240d50..9da62d3 100644 --- a/pkg/stream/group.go +++ b/pkg/stream/group.go @@ -1,6 +1,7 @@ package stream import ( + "github.com/jpfourny/papaya/internal/kvstore" "github.com/jpfourny/papaya/pkg/cmp" "github.com/jpfourny/papaya/pkg/constraint" "github.com/jpfourny/papaya/pkg/pair" @@ -22,7 +23,7 @@ import ( // ) // out := stream.DebugString(s) // "<(foo, [1, 3]), (bar, [2])>" func GroupByKey[K comparable, V any](s Stream[pair.Pair[K, V]]) Stream[pair.Pair[K, []V]] { - return groupByKey(s, mapKeyStoreFactory[K, []V]()) + return groupByKey(s, kvstore.MappedMaker[K, []V]()) } // GroupBySortedKey returns a stream that values key-value pairs by key using the given cmp.Comparer to compare keys. 
@@ -41,19 +42,19 @@ func GroupByKey[K comparable, V any](s Stream[pair.Pair[K, V]]) Stream[pair.Pair // ) // out := stream.DebugString(s) // "<(bar, [2]), (foo, [1, 3])>" func GroupBySortedKey[K any, V any](s Stream[pair.Pair[K, V]], keyCompare cmp.Comparer[K]) Stream[pair.Pair[K, []V]] { - return groupByKey(s, sortedKeyStoreFactory[K, []V](keyCompare)) + return groupByKey(s, kvstore.SortedMaker[K, []V](keyCompare)) } -func groupByKey[K any, V any](s Stream[pair.Pair[K, V]], ksf keyStoreFactory[K, []V]) Stream[pair.Pair[K, []V]] { +func groupByKey[K any, V any](s Stream[pair.Pair[K, V]], kv kvstore.Maker[K, []V]) Stream[pair.Pair[K, []V]] { return func(yield Consumer[pair.Pair[K, []V]]) bool { - groups := ksf() + groups := kv() s(func(p pair.Pair[K, V]) bool { - g := groups.get(p.First()).OrElse(nil) + g := groups.Get(p.First()).OrElse(nil) g = append(g, p.Second()) - groups.put(p.First(), g) + groups.Put(p.First(), g) return true }) - return groups.forEach(func(k K, vs []V) bool { + return groups.ForEach(func(k K, vs []V) bool { return yield(pair.Of(k, vs)) }) } @@ -77,7 +78,7 @@ func groupByKey[K any, V any](s Stream[pair.Pair[K, V]], ksf keyStoreFactory[K, // ) // out := stream.DebugString(s) // "<("foo", 4), ("bar", 2)>" func ReduceByKey[K comparable, V any](s Stream[pair.Pair[K, V]], reduce Reducer[V]) Stream[pair.Pair[K, V]] { - return reduceByKey(s, mapKeyStoreFactory[K, V](), reduce) + return reduceByKey(s, kvstore.MappedMaker[K, V](), reduce) } // ReduceBySortedKey returns a stream that reduces key-value pairs by key using the given cmp.Comparer to compare keys and the given Reducer to reduce values. 
@@ -99,24 +100,24 @@ func ReduceByKey[K comparable, V any](s Stream[pair.Pair[K, V]], reduce Reducer[ // ) // out := stream.DebugString(s) // "<("bar", 2), ("foo", 4)>" func ReduceBySortedKey[K any, V any](s Stream[pair.Pair[K, V]], keyCompare cmp.Comparer[K], reduce Reducer[V]) Stream[pair.Pair[K, V]] { - return reduceByKey(s, sortedKeyStoreFactory[K, V](keyCompare), reduce) + return reduceByKey(s, kvstore.SortedMaker[K, V](keyCompare), reduce) } -func reduceByKey[K any, V any](s Stream[pair.Pair[K, V]], ksf keyStoreFactory[K, V], reduce Reducer[V]) Stream[pair.Pair[K, V]] { +func reduceByKey[K any, V any](s Stream[pair.Pair[K, V]], kv kvstore.Maker[K, V], reduce Reducer[V]) Stream[pair.Pair[K, V]] { return func(yield Consumer[pair.Pair[K, V]]) bool { - groups := ksf() + groups := kv() s(func(p pair.Pair[K, V]) bool { - groups.get(p.First()).IfPresentElse( + groups.Get(p.First()).IfPresentElse( func(v V) { // If present - groups.put(p.First(), reduce(v, p.Second())) + groups.Put(p.First(), reduce(v, p.Second())) }, func() { // Else - groups.put(p.First(), p.Second()) + groups.Put(p.First(), p.Second()) }, ) return true }) - return groups.forEach(func(k K, v V) bool { + return groups.ForEach(func(k K, v V) bool { return yield(pair.Of(k, v)) }) } @@ -147,7 +148,7 @@ func reduceByKey[K any, V any](s Stream[pair.Pair[K, V]], ksf keyStoreFactory[K, // ) // out := stream.DebugString(s) // "<("foo", "4"), ("bar", "2")>" func AggregateByKey[K comparable, V, A, F any](s Stream[pair.Pair[K, V]], identity A, accumulate Accumulator[A, V], finish Finisher[A, F]) Stream[pair.Pair[K, F]] { - return aggregateByKey(s, mapKeyStoreFactory[K, A](), identity, accumulate, finish) + return aggregateByKey(s, kvstore.MappedMaker[K, A](), identity, accumulate, finish) } // AggregateBySortedKey returns a stream that aggregates key-value pairs by key using the given cmp.Comparer to compare keys. 
@@ -176,24 +177,24 @@ func AggregateByKey[K comparable, V, A, F any](s Stream[pair.Pair[K, V]], identi // ) // out := stream.DebugString(s) // "<("bar", "2"), ("foo", "4")>" func AggregateBySortedKey[K any, V, A, F any](s Stream[pair.Pair[K, V]], keyCompare cmp.Comparer[K], identity A, accumulate Accumulator[A, V], finish Finisher[A, F]) Stream[pair.Pair[K, F]] { - return aggregateByKey(s, sortedKeyStoreFactory[K, A](keyCompare), identity, accumulate, finish) + return aggregateByKey(s, kvstore.SortedMaker[K, A](keyCompare), identity, accumulate, finish) } -func aggregateByKey[K any, V, A, F any](s Stream[pair.Pair[K, V]], ksf keyStoreFactory[K, A], identity A, accumulate Accumulator[A, V], finish Finisher[A, F]) Stream[pair.Pair[K, F]] { +func aggregateByKey[K any, V, A, F any](s Stream[pair.Pair[K, V]], kv kvstore.Maker[K, A], identity A, accumulate Accumulator[A, V], finish Finisher[A, F]) Stream[pair.Pair[K, F]] { return func(yield Consumer[pair.Pair[K, F]]) bool { - groups := ksf() + groups := kv() s(func(p pair.Pair[K, V]) bool { - groups.get(p.First()).IfPresentElse( + groups.Get(p.First()).IfPresentElse( func(a A) { // If present - groups.put(p.First(), accumulate(a, p.Second())) + groups.Put(p.First(), accumulate(a, p.Second())) }, func() { // Else - groups.put(p.First(), accumulate(identity, p.Second())) + groups.Put(p.First(), accumulate(identity, p.Second())) }, ) return true }) - groups.forEach(func(k K, a A) bool { + groups.ForEach(func(k K, a A) bool { return yield(pair.Of(k, finish(a))) }) return true @@ -303,14 +304,14 @@ func MinBySortedKey[K any, V constraint.Ordered](s Stream[pair.Pair[K, V]], keyC // // Example usage: // -// s := stream.MaxByKey( -// stream.Of( -// pair.Of("foo", 1), -// pair.Of("bar", 2), -// pair.Of("foo", 3), -// ), -// ) -// out := stream.DebugString(s) // "<("foo", 3), ("bar", 2)>" +// s := stream.MaxByKey( +// stream.Of( +// pair.Of("foo", 1), +// pair.Of("bar", 2), +// pair.Of("foo", 3), +// ), +// ) +// out := 
stream.DebugString(s) // "<("foo", 3), ("bar", 2)>" func MaxByKey[K comparable, V constraint.Ordered](s Stream[pair.Pair[K, V]]) Stream[pair.Pair[K, V]] { return ReduceByKey( s, @@ -350,14 +351,14 @@ func MaxBySortedKey[K any, V constraint.Ordered](s Stream[pair.Pair[K, V]], keyC // // Example usage: // -// s := stream.SumByKey( -// stream.Of( -// pair.Of("foo", 1), -// pair.Of("bar", 2), -// pair.Of("foo", 3), -// ), -// ) -// out := stream.DebugString(s) // "<("foo", 4), ("bar", 2)>" +// s := stream.SumByKey( +// stream.Of( +// pair.Of("foo", 1), +// pair.Of("bar", 2), +// pair.Of("foo", 3), +// ), +// ) +// out := stream.DebugString(s) // "<("foo", 4), ("bar", 2)>" func SumByKey[K comparable, V constraint.Numeric](s Stream[pair.Pair[K, V]]) Stream[pair.Pair[K, V]] { return ReduceByKey( s, diff --git a/pkg/stream/iterate.go b/pkg/stream/iterate.go index 1bed4f7..32f1c80 100644 --- a/pkg/stream/iterate.go +++ b/pkg/stream/iterate.go @@ -12,15 +12,15 @@ type Iterator[E any] func() optional.Optional[E] // // Example usage: // -// i := 0 -// s := stream.Iterate(func() optional.Optional[int] { -// if i < 3 { -// i++ -// return optional.Of(i) -// } -// return optional.Empty[int]() -// }) -// out := stream.DebugString(s) // "<1, 2, 3>" +// i := 0 +// s := stream.Iterate(func() optional.Optional[int] { +// if i < 3 { +// i++ +// return optional.Of(i) +// } +// return optional.Empty[int]() +// }) +// out := stream.DebugString(s) // "<1, 2, 3>" func Iterate[E any](next Iterator[E]) Stream[E] { return func(yield Consumer[E]) bool { for e := next(); e.Present(); e = next() { diff --git a/pkg/stream/key_store.go b/pkg/stream/key_store.go deleted file mode 100644 index 84bacba..0000000 --- a/pkg/stream/key_store.go +++ /dev/null @@ -1,103 +0,0 @@ -package stream - -import ( - "github.com/jpfourny/papaya/pkg/cmp" - "github.com/jpfourny/papaya/pkg/optional" - "slices" -) - -// keyStore represents a store of keys and their associated values. 
-// Used internally for various stream operations, such as distinct, groupBy, aggregateBy, etc. -type keyStore[K, G any] interface { - get(key K) optional.Optional[G] - put(key K, value G) - forEach(func(key K, value G) bool) bool -} - -// keyStoreFactory represents a factory function for creating a keyStore. -type keyStoreFactory[K, V any] func() keyStore[K, V] - -// mapKeyStoreFactory returns a keyStoreFactory for creating a map-based keyStore with O(1) access time. -// The key type K must be comparable. -func mapKeyStoreFactory[K comparable, V any]() keyStoreFactory[K, V] { - return func() keyStore[K, V] { - return make(mapKeyStore[K, V]) - } -} - -// sortedKeyStoreFactory returns a keyStoreFactory for creating a sorted keyStore with O(log n) access time. -// This is used when the key type is not comparable, ruling out the use of a map-based keyStore. -// The keys are sorted using the given cmp.Comparer. -func sortedKeyStoreFactory[K any, V any](compare cmp.Comparer[K]) keyStoreFactory[K, V] { - return func() keyStore[K, V] { - return &sortedKeyStore[K, V]{ - compare: compare, - } - } -} - -// mapKeyStore provides an implementation of keyStore using a map. -// The key type K must be comparable. -type mapKeyStore[K comparable, V any] map[K]V - -func (ks mapKeyStore[K, V]) get(key K) optional.Optional[V] { - if v, ok := ks[key]; ok { - return optional.Of(v) - } - return optional.Empty[V]() -} - -func (ks mapKeyStore[K, V]) put(key K, value V) { - ks[key] = value -} - -func (ks mapKeyStore[K, V]) forEach(yield func(K, V) bool) bool { - for k, v := range ks { - if !yield(k, v) { - return false - } - } - return true -} - -// sortedKeyStore provides an implementation of keyStore using sorted slices and binary search. -// The keys are sorted using the given cmp.Comparer. 
-type sortedKeyStore[K any, V any] struct { - compare cmp.Comparer[K] - keys []K - values []V -} - -func (ks *sortedKeyStore[K, V]) get(key K) optional.Optional[V] { - if i, ok := ks.indexOf(key); ok { - return optional.Of(ks.values[i]) - } - return optional.Empty[V]() -} - -func (ks *sortedKeyStore[K, V]) put(key K, value V) { - i, ok := ks.indexOf(key) - if ok { - ks.values[i] = value - } else { - ks.keys = append(ks.keys, key) - ks.values = append(ks.values, value) - copy(ks.keys[i+1:], ks.keys[i:]) // Shift keys. - copy(ks.values[i+1:], ks.values[i:]) // Shift values. - ks.keys[i] = key // Insert key. - ks.values[i] = value // Insert value. - } -} - -func (ks *sortedKeyStore[K, V]) indexOf(key K) (int, bool) { - return slices.BinarySearchFunc(ks.keys, key, ks.compare) -} - -func (ks *sortedKeyStore[K, V]) forEach(f func(K, V) bool) bool { - for i, k := range ks.keys { - if !f(k, ks.values[i]) { - return false - } - } - return true -} diff --git a/pkg/stream/key_store_test.go b/pkg/stream/key_store_test.go deleted file mode 100644 index a6449e2..0000000 --- a/pkg/stream/key_store_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package stream - -import ( - "github.com/jpfourny/papaya/internal/assert" - "github.com/jpfourny/papaya/pkg/cmp" - "github.com/jpfourny/papaya/pkg/optional" - "testing" -) - -func TestSortedKeyStore_get(t *testing.T) { - t.Run("empty", func(t *testing.T) { - ks := sortedKeyStoreFactory[int, string](cmp.Natural[int]())() - got := ks.get(0) - want := optional.Empty[string]() - if got != want { - t.Fatalf("got %#v, want %#v", got, want) - } - }) - - t.Run("non-empty", func(t *testing.T) { - ks := sortedKeyStoreFactory[int, string](cmp.Natural[int]())() - ks.put(1, "one") - ks.put(2, "two") - ks.put(1, "uno") - ks.put(3, "three") - ks.put(2, "dos") - got := ks.get(1) - want := optional.Of("uno") - if got != want { - t.Fatalf("got %#v, want %#v", got, want) - } - got = ks.get(2) - want = optional.Of("dos") - if got != want { - t.Fatalf("got %#v, want 
%#v", got, want) - } - got = ks.get(3) - want = optional.Of("three") - if got != want { - t.Fatalf("got %#v, want %#v", got, want) - } - got = ks.get(4) - want = optional.Empty[string]() - if got != want { - t.Fatalf("got %#v, want %#v", got, want) - } - }) -} - -func TestSortedKeyStore_forEach(t *testing.T) { - t.Run("empty", func(t *testing.T) { - ks := sortedKeyStoreFactory[int, string](cmp.Natural[int]())() - var got []int - ks.forEach(func(key int, value string) bool { - got = append(got, key) - return true - }) - if len(got) != 0 { - t.Fatalf("got %#v, want %#v", got, []int{}) - } - }) - - t.Run("non-empty", func(t *testing.T) { - ks := sortedKeyStoreFactory[int, string](cmp.Natural[int]())() - ks.put(1, "one") - ks.put(2, "two") - ks.put(1, "uno") - ks.put(3, "three") - ks.put(2, "dos") - var got []int - ks.forEach(func(key int, value string) bool { - got = append(got, key) - return true - }) - want := []int{1, 2, 3} - assert.ElementsMatch(t, got, want) - }) -} - -func TestMapKeyStore_get(t *testing.T) { - t.Run("empty", func(t *testing.T) { - ks := mapKeyStoreFactory[int, string]()() - got := ks.get(0) - want := optional.Empty[string]() - if got != want { - t.Fatalf("got %#v, want %#v", got, want) - } - }) - - t.Run("non-empty", func(t *testing.T) { - ks := mapKeyStoreFactory[int, string]()() - ks.put(1, "one") - ks.put(2, "two") - ks.put(1, "uno") - ks.put(3, "three") - ks.put(2, "dos") - got := ks.get(1) - want := optional.Of("uno") - if got != want { - t.Fatalf("got %#v, want %#v", got, want) - } - got = ks.get(2) - want = optional.Of("dos") - if got != want { - t.Fatalf("got %#v, want %#v", got, want) - } - got = ks.get(3) - want = optional.Of("three") - if got != want { - t.Fatalf("got %#v, want %#v", got, want) - } - got = ks.get(4) - want = optional.Empty[string]() - if got != want { - t.Fatalf("got %#v, want %#v", got, want) - } - }) -} - -func TestMapKeyStore_forEach(t *testing.T) { - t.Run("empty", func(t *testing.T) { - ks := 
mapKeyStoreFactory[int, string]()() - var got []int - ks.forEach(func(key int, value string) bool { - got = append(got, key) - return true - }) - if len(got) != 0 { - t.Fatalf("got %#v, want %#v", got, []int{}) - } - }) - - t.Run("non-empty", func(t *testing.T) { - ks := mapKeyStoreFactory[int, string]()() - ks.put(1, "one") - ks.put(2, "two") - ks.put(1, "uno") - ks.put(3, "three") - ks.put(2, "dos") - var got []int - ks.forEach(func(key int, value string) bool { - got = append(got, key) - return true - }) - want := []int{1, 2, 3} - assert.ElementsMatchAnyOrder(t, got, want) - }) -} diff --git a/pkg/stream/misc.go b/pkg/stream/misc.go index 98b0089..74f3347 100644 --- a/pkg/stream/misc.go +++ b/pkg/stream/misc.go @@ -78,7 +78,7 @@ func Last[E any](s Stream[E]) (last optional.Optional[E]) { // Example usage: // // s := stream.Peek(stream.Of(1, 2, 3), func(e int) { -// fmt.Println(e) +// fmt.Println(e) // }) // stream.Count(s) // Force the stream to materialize. // diff --git a/pkg/stream/set.go b/pkg/stream/set.go index 525955a..d9caa79 100644 --- a/pkg/stream/set.go +++ b/pkg/stream/set.go @@ -1,6 +1,9 @@ package stream -import "github.com/jpfourny/papaya/pkg/cmp" +import ( + "github.com/jpfourny/papaya/internal/kvstore" + "github.com/jpfourny/papaya/pkg/cmp" +) // Union combines multiple streams into a single stream (concatenation). // The length of the resulting stream is the sum of the lengths of the input streams. @@ -30,7 +33,7 @@ func Union[E any](ss ...Stream[E]) Stream[E] { // s := stream.Intersection(stream.Of(1, 2, 3, 4, 5), stream.Of(4, 5, 6)) // out := stream.DebugString(s) // "<4, 5>" func Intersection[E comparable](s1, s2 Stream[E]) Stream[E] { - return intersection(s1, s2, mapKeyStoreFactory[E, struct{}]()) + return intersection(s1, s2, kvstore.MappedMaker[E, struct{}]()) } // IntersectionAll returns a stream that contains elements that are in all the given streams. 
@@ -63,7 +66,7 @@ func IntersectionAll[E comparable](ss ...Stream[E]) Stream[E] { // s := stream.IntersectionBy(stream.Of(1, 2, 3, 4, 5), stream.Of(4, 5, 6), cmp.Natural[int]()) // out := stream.DebugString(s) // "<4, 5>" func IntersectionBy[E any](s1, s2 Stream[E], compare cmp.Comparer[E]) Stream[E] { - return intersection(s1, s2, sortedKeyStoreFactory[E, struct{}](compare)) + return intersection(s1, s2, kvstore.SortedMaker[E, struct{}](compare)) } // IntersectionAllBy returns a stream that contains elements that are in all the given streams, compared by the given cmp.Comparer. @@ -87,18 +90,18 @@ func IntersectionAllBy[E any](compare cmp.Comparer[E], ss ...Stream[E]) Stream[E return IntersectionBy(ss[0], ss[1], compare) } -func intersection[E any](s1, s2 Stream[E], ksf keyStoreFactory[E, struct{}]) Stream[E] { +func intersection[E any](s1, s2 Stream[E], kv kvstore.Maker[E, struct{}]) Stream[E] { // Exactly 2 streams; intersect ss[0] and ss[1]. return func(yield Consumer[E]) bool { // Index elements of the first stream into a set. - seen := ksf() + seen := kv() s1(func(e E) bool { - seen.put(e, struct{}{}) + seen.Put(e, struct{}{}) return true }) // Yield elements of the second stream that are in the set. return s2(func(e E) bool { - if seen.get(e).Present() { + if seen.Get(e).Present() { return yield(e) } return true @@ -115,7 +118,7 @@ func intersection[E any](s1, s2 Stream[E], ksf keyStoreFactory[E, struct{}]) Str // s := stream.Difference(stream.Of(1, 2, 3, 4, 5), stream.Of(4, 5, 6)) // out := stream.DebugString(s) // "<1, 2, 3>" func Difference[E comparable](s1, s2 Stream[E]) Stream[E] { - return difference(s1, s2, mapKeyStoreFactory[E, struct{}]()) + return difference(s1, s2, kvstore.MappedMaker[E, struct{}]()) } // DifferenceBy returns a stream that contains elements that are in the first stream but not in the second stream, compared by the given cmp.Comparer. 
@@ -126,20 +129,20 @@ func Difference[E comparable](s1, s2 Stream[E]) Stream[E] { // s := stream.DifferenceBy(stream.Of(1, 2, 3, 4, 5), stream.Of(4, 5, 6), cmp.Natural[int]()) // out := stream.DebugString(s) // "<1, 2, 3>" func DifferenceBy[E any](s1, s2 Stream[E], compare cmp.Comparer[E]) Stream[E] { - return difference(s1, s2, sortedKeyStoreFactory[E, struct{}](compare)) + return difference(s1, s2, kvstore.SortedMaker[E, struct{}](compare)) } -func difference[E any](s1, s2 Stream[E], ksf keyStoreFactory[E, struct{}]) Stream[E] { +func difference[E any](s1, s2 Stream[E], kv kvstore.Maker[E, struct{}]) Stream[E] { return func(yield Consumer[E]) bool { // Index elements of the second stream into a set. - seen := ksf() + seen := kv() s2(func(e E) bool { - seen.put(e, struct{}{}) + seen.Put(e, struct{}{}) return true }) // Yield elements of the first stream that are not in the set. return s1(func(e E) bool { - if !seen.get(e).Present() { + if !seen.Get(e).Present() { return yield(e) } return true diff --git a/pkg/stream/transform.go b/pkg/stream/transform.go index 89fb827..22657e4 100644 --- a/pkg/stream/transform.go +++ b/pkg/stream/transform.go @@ -63,10 +63,13 @@ type FlatMapper[E, F any] func(from E) (to Stream[F]) // // Example usage: // -// s := stream.FlatMap(stream.Of(1, 2, 3), func(e int) stream.Stream[string] { -// return stream.Map(stream.RangeInteger(0, e), mapper.Sprint) -// }) -// out := stream.DebugString(s) // "<0, 0, 1, 0, 1, 2>" +// s := stream.FlatMap( +// stream.Of(1, 2, 3), +// func(e int) stream.Stream[string] { // e -> <"e", "e"> +// return stream.Of(mapper.Sprint(e), mapper.Sprint(e)) +// }, +// ) +// out := stream.DebugString(s) // "<1, 1, 2, 2, 3, 3>" func FlatMap[E, F any](s Stream[E], fm FlatMapper[E, F]) Stream[F] { return func(yield Consumer[F]) bool { return s(func(e E) bool { @@ -117,11 +120,11 @@ func SortBy[E any](s Stream[E], compare cmp.Comparer[E]) Stream[E] { // // Example usage: // -// s := stream.Truncate(stream.Of("a", "b", 
"c""), 2, "...") -// out := stream.DebugString(s) // "" +// s := stream.Truncate(stream.Of("a", "b", "c""), 2, "...") +// out := stream.DebugString(s) // "" // -// s = stream.Truncate(stream.Of("a", "b", "c""), 3, "...") -// out = stream.DebugString(s) // "" +// s = stream.Truncate(stream.Of("a", "b", "c""), 3, "...") +// out = stream.DebugString(s) // "" func Truncate[E any](s Stream[E], length int, tail E) Stream[E] { return func(yield Consumer[E]) bool { i := 0