From 270cd8fa6dc4bf6dfa394213e4f1635eed52b9c3 Mon Sep 17 00:00:00 2001
From: Sergey Grebenshchikov
Date: Thu, 29 Aug 2024 20:08:45 +0200
Subject: [PATCH] init

---
 .github/workflows/go.yml |  28 ++++
 README.md                |  40 +++++
 bucket.go                |  51 ++++++
 go.mod                   |   7 +
 go.sum                   |   4 +
 hash.go                  |  20 +++
 min_heap.go              | 162 +++++++++++++++++++
 sizeof.go                |  15 ++
 sketch.go                | 330 +++++++++++++++++++++++++++++++++++++++
 sketch_test.go           | 157 +++++++++++++++++++
 10 files changed, 814 insertions(+)
 create mode 100644 .github/workflows/go.yml
 create mode 100644 README.md
 create mode 100644 bucket.go
 create mode 100644 go.mod
 create mode 100644 go.sum
 create mode 100644 hash.go
 create mode 100644 min_heap.go
 create mode 100644 sizeof.go
 create mode 100644 sketch.go
 create mode 100644 sketch_test.go

diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
new file mode 100644
index 0000000..6092f4a
--- /dev/null
+++ b/.github/workflows/go.yml
@@ -0,0 +1,28 @@
+# This workflow will build a golang project
+# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
+
+name: Go
+
+on:
+  push:
+    branches: [ "main" ]
+  pull_request:
+    branches: [ "main" ]
+
+jobs:
+
+  build:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v4
+
+    - name: Set up Go
+      uses: actions/setup-go@v4
+      with:
+        go-version: '1.23'
+
+    - name: Build
+      run: go build -v ./...
+
+    - name: Test
+      run: go test -v ./...
diff --git a/README.md b/README.md
new file mode 100644
--- /dev/null
+++ b/README.md
@@ -0,0 +1,40 @@
+# sliding-topk
+
+Sliding HeavyKeeper, as described in ["A Sketch Framework for Approximate Data Stream Processing in Sliding Windows"](https://yangtonghome.github.io/uploads/SlidingSketch_TKDE2022_final.pdf)
+
+```go
+import (
+	"log"
+
+	topk "github.com/keilerkonzept/sliding-topk"
+)
+
+func main() {
+	// make a new sketch keeping track of k=3 items over a window of the last 60 ticks
+	// use width=1024 x depth=3 = 3072 buckets
+	sketch := topk.New(3, 60, topk.WithWidth(1024), topk.WithDepth(3))
+
+	log.Println("the sketch takes", sketch.SizeBytes(), "bytes in memory")
+
+	sketch.Incr("an item")            // count "an item" 1 time
+	sketch.Add("an item", 123)        // count "an item" 123 times
+	sketch.Tick()                     // advance time by one tick
+	sketch.Add("another item", 4)     // count "another item" 4 times
+	sketch.Ticks(2)                   // advance time by two ticks
+	sketch.Add("an item", 5)          // count "an item" 5 more times
+	sketch.Add("yet another item", 6) // count "yet another item" 6 times
+
+	if sketch.Query("an item") {
+		// "an item" is in the top K items observed within the last 60 ticks
+	}
+
+	_ = sketch.Count("another item") // return the estimated count for "another item"
+
+	// TopK() returns all top K items as a slice of {Item,Count} structs
+	for _, entry := range sketch.TopK() {
+		log.Println(entry.Item, "counted", entry.Count, "times")
+	}
+
+	sketch.Reset() // reset to New() state
+}
+```
diff --git a/bucket.go b/bucket.go
new file mode 100644
--- /dev/null
+++ b/bucket.go
@@ -0,0 +1,51 @@
+package topk
+
+// Bucket is a single sketch cell: the fingerprint of the item that currently
+// owns it, plus that item's counts over the bucket's history slots.
+type Bucket struct {
+	Fingerprint uint32
+
+	// Counts is a circular buffer (with its first entry at .First)
+	Counts []uint32
+	First  uint32
+	// CountsSum is the current sum of Counts
+	CountsSum uint32
+}
+
+// tick rotates the circular buffer by one slot: the slot just before First is
+// zeroed, its count is subtracted from CountsSum, and it becomes the new First.
+// Buckets whose CountsSum is zero are left untouched.
+func (me *Bucket) tick() {
+	if me.CountsSum == 0 {
+		return
+	}
+
+	last := me.First
+	if last == 0 {
+		last = uint32(len(me.Counts))
+	}
+	last--
+	me.CountsSum -= me.Counts[last]
+	me.Counts[last] = 0
+	me.First = last
+}
+
+// findNonzeroMinimumCount returns the index of the smallest nonzero entry of
+// Counts, scanning circularly from First so that ties go to the first such
+// entry in that order. It returns 0 if every entry is zero.
+func (me *Bucket) findNonzeroMinimumCount() int {
+	n := uint32(len(me.Counts))
+	var minIdx, minCount uint32
+	found := false
+	i := me.First
+	for range n {
+		if i == n {
+			i = 0
+		}
+		if c := me.Counts[i]; c != 0 && (!found || c < minCount) {
+			minCount, minIdx, found = c, i, true
+		}
+		i++
+	}
+	return int(minIdx)
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..51f053c
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,7 @@
+module github.com/keilerkonzept/sliding-topk
+
+go 1.23.0
+
+require github.com/google/go-cmp v0.6.0
+
+require github.com/OneOfOne/xxhash v1.2.8
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..3ee4bb9
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,4 @@
+github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
+github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
diff --git a/hash.go b/hash.go
new file mode 100644
--- /dev/null
+++ b/hash.go
@@ -0,0 +1,20 @@
+package topk
+
+import "github.com/OneOfOne/xxhash"
+
+// hashSeed is the xxhash seed used for item fingerprints.
+const hashSeed = 4848280
+
+// Fingerprint returns the 32-bit fingerprint stored in buckets to identify item.
+func Fingerprint(item string) uint32 {
+	return xxhash.ChecksumString32S(item, hashSeed)
+}
+
+// bucketIndex returns the index into the flat bucket slice of item's bucket in
+// the given row. The row number doubles as the hash seed.
+func bucketIndex(item string, row, width int) int {
+	// Reduce in uint32: converting the hash to int first would yield negative
+	// columns on platforms where int is 32 bits wide.
+	column := int(xxhash.ChecksumString32S(item, uint32(row)) % uint32(width))
+	return row*width + column
+}
diff --git a/min_heap.go b/min_heap.go
new file mode 100644
--- /dev/null
+++ b/min_heap.go
@@ -0,0 +1,162 @@
+package topk
+
+import "container/heap"
+
+// HeapItem is a tracked item with its fingerprint and estimated count.
+type HeapItem struct {
+	Fingerprint uint32
+	Item        string
+	Count       uint32
+}
+
+// MinHeap is a bounded min-heap of items ordered by count, then by item.
+// Index maps every stored item to its position in Items; StoredKeysBytes is
+// the total length of the stored item strings.
+type MinHeap struct {
+	Items           []HeapItem
+	Index           map[string]int
+	StoredKeysBytes int
+}
+
+// NewMinHeap returns an empty heap with room for k items.
+func NewMinHeap(k int) *MinHeap {
+	return &MinHeap{
+		// Length 0, capacity k: Full compares len against cap, and zero-valued
+		// placeholder entries would register the empty string in Index.
+		Items: make([]HeapItem, 0, k),
+		Index: make(map[string]int, k),
+	}
+}
+
+var _ heap.Interface = &MinHeap{}
+
+// SizeBytes returns the estimated memory footprint of the heap in bytes.
+func (me MinHeap) SizeBytes() int {
+	structSize := sizeofBucketMinHeapStruct
+	bucketsSize := cap(me.Items)*sizeofHeapBucket + me.StoredKeysBytes
+	indexSize := sizeofStringIntMap + (sizeofInt+sizeofString)*len(me.Index)
+	return structSize + bucketsSize + indexSize
+}
+
+// Reinit restores the heap order after counts were modified in place and
+// removes all items whose count has dropped to zero.
+func (me *MinHeap) Reinit() {
+	heap.Init(me)
+	for me.Len() > 0 && me.Items[0].Count == 0 {
+		me.StoredKeysBytes -= len(me.Items[0].Item)
+		heap.Pop(me) // Pop also removes the item from Index
+	}
+}
+
+// Full reports whether the heap holds as many items as it has capacity for.
+func (me MinHeap) Full() bool { return len(me.Items) == cap(me.Items) }
+
+// Len implements heap.Interface.
+func (me MinHeap) Len() int { return len(me.Items) }
+
+// Less implements heap.Interface: lower counts first, ties broken by item.
+func (me MinHeap) Less(i, j int) bool {
+	ic := me.Items[i].Count
+	jc := me.Items[j].Count
+	if ic == jc {
+		return me.Items[i].Item < me.Items[j].Item
+	}
+	return ic < jc
+}
+
+// Swap implements heap.Interface and keeps Index in sync.
+func (me MinHeap) Swap(i, j int) {
+	itemi := me.Items[i].Item
+	itemj := me.Items[j].Item
+	me.Items[i], me.Items[j] = me.Items[j], me.Items[i]
+	me.Index[itemi] = j
+	me.Index[itemj] = i
+}
+
+// Push implements heap.Interface; x must be a HeapItem. Use heap.Push (or
+// Update) rather than calling it directly, so the heap order is maintained.
+func (me *MinHeap) Push(x any) {
+	b := x.(HeapItem)
+	me.Items = append(me.Items, b)
+	me.Index[b.Item] = len(me.Items) - 1
+}
+
+// Pop implements heap.Interface: it removes and returns the last element.
+func (me *MinHeap) Pop() any {
+	old := me.Items
+	n := len(old)
+	x := old[n-1]
+	old[n-1] = HeapItem{} // drop the string reference held by the backing array
+	me.Items = old[0 : n-1]
+	delete(me.Index, x.Item)
+	return x
+}
+
+// Min returns the minimum count in the heap or 0 if the heap is empty.
+func (me MinHeap) Min() uint32 {
+	if len(me.Items) == 0 {
+		return 0
+	}
+	return me.Items[0].Count
+}
+
+// Find returns the position of item in Items, or -1 if it is not stored.
+func (me MinHeap) Find(item string) (i int) {
+	if i, ok := me.Index[item]; ok {
+		return i
+	}
+	return -1
+}
+
+// Contains reports whether item is stored in the heap.
+func (me MinHeap) Contains(item string) bool {
+	_, ok := me.Index[item]
+	return ok
+}
+
+// Get returns a pointer to the stored entry for item, or nil if absent.
+func (me MinHeap) Get(item string) *HeapItem {
+	if i, ok := me.Index[item]; ok {
+		return &me.Items[i]
+	}
+	return nil
+}
+
+// Update sets the count of item, inserting it if there is room or if it
+// displaces the current minimum. A full heap ignores counts below its minimum.
+func (me *MinHeap) Update(item string, fingerprint uint32, count uint32) {
+	if count < me.Min() && me.Full() { // not in top k: ignore
+		return
+	}
+
+	if i := me.Find(item); i >= 0 { // already in heap: update count
+		me.Items[i].Count = count
+		heap.Fix(me, i)
+		return
+	}
+
+	if cap(me.Items) == 0 { // nothing can be stored or replaced
+		return
+	}
+
+	me.StoredKeysBytes += len(item)
+	entry := HeapItem{
+		Count:       count,
+		Fingerprint: fingerprint,
+		Item:        item,
+	}
+
+	if !me.Full() { // heap not full: add to heap
+		// heap.Push, unlike a bare Push, sifts the new entry into position.
+		heap.Push(me, entry)
+		return
+	}
+
+	// replace min on heap
+	minItem := me.Items[0].Item
+	me.StoredKeysBytes -= len(minItem)
+	delete(me.Index, minItem)
+	me.Items[0] = entry
+	me.Index[item] = 0
+	heap.Fix(me, 0)
+}
diff --git a/sizeof.go b/sizeof.go
new file mode 100644
index 0000000..48d7a46
--- /dev/null
+++ b/sizeof.go
@@ -0,0 +1,15 @@
+package topk
+
+import "unsafe"
+
+const (
+	sizeofSketchStruct        = int(unsafe.Sizeof(Sketch{}))
+	sizeofBucketStruct        = int(unsafe.Sizeof(Bucket{}))
+	sizeofBucketMinHeapStruct = int(unsafe.Sizeof(MinHeap{}))
+	sizeofHeapBucket          = int(unsafe.Sizeof(HeapItem{}))
+	sizeofStringIntMap        = int(unsafe.Sizeof(map[string]int{}))
+	sizeofString              = int(unsafe.Sizeof(""))
+	sizeofInt                 = int(unsafe.Sizeof(int(0)))
+	sizeofUInt32              = int(unsafe.Sizeof(uint32(0)))
+	sizeofFloat32             = int(unsafe.Sizeof(float32(0)))
+)
diff --git a/sketch.go b/sketch.go
new file mode 100644
--- /dev/null
+++ b/sketch.go
@@ -0,0 +1,330 @@
+// Package topk implements a sliding HeavyKeeper, as described in "A Sketch Framework for Approximate Data Stream Processing in Sliding Windows" [1]
+// [1] https://yangtonghome.github.io/uploads/SlidingSketch_TKDE2022_final.pdf
+package topk
+
+import (
+	"math"
+	"math/rand/v2"
+	"sort"
+)
+
+// Sketch tracks the approximate top K items over a sliding window of ticks.
+type Sketch struct {
+	K                   int
+	Width               int
+	Depth               int
+	WindowSize          int // N: window size in ticks
+	BucketHistoryLength int // d
+
+	Decay            float32
+	DecayLookupTable []float32
+
+	NextBucketToExpireIndex int // Index of the next bucket to expire.
+
+	Buckets []Bucket
+	Heap    *MinHeap
+}
+
+// Option configures a Sketch created by New.
+type Option func(*Sketch)
+
+// WithDepth sets the number of bucket rows.
+func WithDepth(depth int) Option { return func(s *Sketch) { s.Depth = depth } }
+
+// WithWidth sets the number of buckets per row.
+func WithWidth(width int) Option { return func(s *Sketch) { s.Width = width } }
+
+// WithDecay sets the base of the decay probability (decay^count).
+func WithDecay(decay float32) Option { return func(s *Sketch) { s.Decay = decay } }
+
+// WithDecayLookupTableSize sets how many powers of the decay are precomputed.
+// Non-positive sizes select the default table size.
+func WithDecayLookupTableSize(n int) Option {
+	return func(s *Sketch) { s.DecayLookupTable = make([]float32, intMax(n, 0)) }
+}
+
+// WithBucketHistoryLength sets the number of history slots per bucket (d).
+// New clamps the value to the range [1, window size].
+func WithBucketHistoryLength(n int) Option {
+	return func(s *Sketch) { s.BucketHistoryLength = n }
+}
+
+// New returns a sketch tracking the top k items over the last windowSize
+// ticks. Non-positive values of k, windowSize, width and depth are raised
+// to 1, since they would otherwise make Add or Ticks panic.
+func New(k, windowSize int, opts ...Option) *Sketch {
+	k = intMax(k, 1)
+	logK := int(math.Ceil(math.Log(float64(k))))
+
+	// default settings
+	out := Sketch{
+		K:                   k,
+		Width:               intMax(256, k*logK),
+		Depth:               intMax(3, logK),
+		WindowSize:          windowSize,
+		BucketHistoryLength: windowSize,
+		Decay:               0.9,
+	}
+
+	for _, o := range opts {
+		o(&out)
+	}
+
+	out.K = intMax(out.K, 1)
+	out.Width = intMax(out.Width, 1)
+	out.Depth = intMax(out.Depth, 1)
+	out.WindowSize = intMax(out.WindowSize, 1)
+
+	if len(out.DecayLookupTable) == 0 {
+		// if not specified, default to 256
+		out.DecayLookupTable = make([]float32, 256)
+	}
+
+	if out.BucketHistoryLength < 1 {
+		out.BucketHistoryLength = 1
+	}
+	if out.BucketHistoryLength >= out.WindowSize {
+		out.BucketHistoryLength = out.WindowSize
+	}
+
+	out.Heap = NewMinHeap(out.K)
+	out.initBuckets()
+	out.initLookupTable()
+
+	return &out
+}
+
+// initLookupTable fills DecayLookupTable[i] with Decay^i.
+func (me *Sketch) initLookupTable() {
+	for i := range me.DecayLookupTable {
+		me.DecayLookupTable[i] = float32(math.Pow(float64(me.Decay), float64(i)))
+	}
+}
+
+// initBuckets allocates Width*Depth buckets with BucketHistoryLength slots each.
+func (me *Sketch) initBuckets() {
+	me.Buckets = make([]Bucket, me.Width*me.Depth)
+	for i := range me.Buckets {
+		me.Buckets[i].Counts = make([]uint32, me.BucketHistoryLength)
+	}
+}
+
+// SizeBytes returns the current size of the sketch in bytes.
+func (me *Sketch) SizeBytes() int {
+	bucketsSize := (sizeofBucketStruct + sizeofUInt32*me.BucketHistoryLength) * len(me.Buckets)
+	heapSize := me.Heap.SizeBytes()
+	decayTableSize := len(me.DecayLookupTable) * sizeofFloat32
+	return sizeofSketchStruct +
+		bucketsSize +
+		heapSize +
+		decayTableSize
+}
+
+// Tick advances time by one unit (of the N units in a window)
+func (me *Sketch) Tick() { me.Ticks(1) }
+
+// Ticks advances time by n units (of the N units in a window).
+// It does nothing if n is not positive.
+func (me *Sketch) Ticks(n int) {
+	m, d, N := len(me.Buckets), me.BucketHistoryLength, me.WindowSize
+	if n <= 0 || m == 0 || N <= 0 {
+		return
+	}
+	tick := me.NextBucketToExpireIndex
+	bucketsToAge := (n * d * m) / N
+	if bucketsToAge < 1 {
+		bucketsToAge = 1
+	}
+	for i := 0; i < bucketsToAge; i++ {
+		me.Buckets[tick].tick()
+		tick++
+		if tick == m {
+			tick = 0
+		}
+	}
+	me.NextBucketToExpireIndex = tick
+	me.recountHeapItems()
+}
+
+// Count returns the estimated count of the given item.
+func (me *Sketch) Count(item string) uint32 {
+	if i := me.Heap.Find(item); i >= 0 {
+		b := me.Heap.Items[i]
+		if b.Item == item {
+			return b.Count
+		}
+	}
+	return me.maxCountsSum(item, Fingerprint(item))
+}
+
+// maxCountsSum returns the largest CountsSum among the buckets of item (one
+// per row) that currently hold the given fingerprint, or 0 if none does.
+func (me *Sketch) maxCountsSum(item string, fingerprint uint32) uint32 {
+	var maxSum uint32
+	for row := range me.Depth {
+		b := &me.Buckets[bucketIndex(item, row, me.Width)]
+		if b.Fingerprint == fingerprint {
+			maxSum = uint32Max(maxSum, b.CountsSum)
+		}
+	}
+	return maxSum
+}
+
+func (me *Sketch) recountHeapItems() {
+	// recompute each heap item's count from its buckets,
+	// then re-initialize the heap.
+	//
+	// O(k * depth)
+	for i := range me.Heap.Items {
+		hb := &me.Heap.Items[i]
+		if hb.Count == 0 {
+			continue
+		}
+		hb.Count = me.maxCountsSum(hb.Item, hb.Fingerprint)
+	}
+
+	// O(k)
+	me.Heap.Reinit()
+}
+
+// Incr counts a single instance of the given item.
+func (me *Sketch) Incr(item string) {
+	me.Add(item, 1)
+}
+
+// Add increments the given item's count by the given increment.
+func (me *Sketch) Add(item string, increment uint32) {
+	var maxCount uint32
+	fingerprint := Fingerprint(item)
+
+	width := me.Width
+	for i := range me.Depth {
+		k := bucketIndex(item, i, width)
+		b := &me.Buckets[k]
+		count := b.CountsSum
+		switch {
+		// empty bucket (zero count)
+		case count == 0:
+			b.Fingerprint = fingerprint
+			clear(b.Counts)
+			b.Counts[b.First] = increment
+			count = increment
+
+		// this flow's bucket (equal fingerprint)
+		case b.Fingerprint == fingerprint:
+			b.Counts[b.First] += increment
+			count += increment
+
+		// another flow's bucket (nonequal fingerprint)
+		default:
+			// can't be inlined, so not factored out
+			var decay float32
+			lookupTableSize := uint32(len(me.DecayLookupTable))
+			for incrementRemaining := increment; incrementRemaining > 0; incrementRemaining-- {
+				if count < lookupTableSize {
+					decay = me.DecayLookupTable[count]
+				} else if lookupTableSize < 2 {
+					// a table of size 1 only holds decay^0, and the
+					// decomposition below would divide by zero
+					decay = float32(math.Pow(float64(me.Decay), float64(count)))
+				} else {
+					decay = float32(math.Pow(
+						float64(me.DecayLookupTable[lookupTableSize-1]),
+						float64(count/(lookupTableSize-1)))) * me.DecayLookupTable[count%(lookupTableSize-1)]
+				}
+				if rand.Float32() < decay {
+					countsMinIdx := b.findNonzeroMinimumCount()
+					b.Counts[countsMinIdx]--
+					count--
+					if count == 0 {
+						// the previous owner is fully decayed: take over the
+						// bucket, keeping Counts consistent with CountsSum
+						b.Fingerprint = fingerprint
+						b.Counts[b.First] = incrementRemaining
+						count = incrementRemaining
+						break
+					}
+				}
+			}
+		}
+
+		b.CountsSum = count
+		maxCount = uint32Max(maxCount, count)
+	}
+
+	me.Heap.Update(item, fingerprint, maxCount)
+}
+
+// Query returns whether the given item is in the top K items by count.
+func (me *Sketch) Query(item string) bool {
+	return me.Heap.Contains(item)
+}
+
+// TopK returns the top K items as a slice, ordered by descending count;
+// items with equal counts are ordered by ascending item.
+func (me *Sketch) TopK() []ItemWithCount {
+	out := make([]ItemWithCount, 0, len(me.Heap.Items))
+	for _, b := range me.Heap.Items {
+		if b.Count == 0 {
+			continue
+		}
+		out = append(out, ItemWithCount{
+			Item:  b.Item,
+			Count: b.Count,
+		})
+	}
+	sort.Stable(sort.Reverse(byCount(out)))
+	return out
+}
+
+// Reset resets the sketch to an empty state.
+func (me *Sketch) Reset() {
+	me.NextBucketToExpireIndex = 0
+	// Reset the buckets field by field: clearing the bucket slice itself
+	// would also drop every Counts buffer and make the next Add panic.
+	for i := range me.Buckets {
+		b := &me.Buckets[i]
+		b.Fingerprint = 0
+		b.First = 0
+		b.CountsSum = 0
+		clear(b.Counts)
+	}
+	clear(me.Heap.Items) // release the item strings
+	me.Heap.Items = me.Heap.Items[:0]
+	clear(me.Heap.Index)
+	me.Heap.StoredKeysBytes = 0
+}
+
+// ItemWithCount is an item together with its estimated count.
+type ItemWithCount struct {
+	Item  string
+	Count uint32
+}
+
+// byCount sorts by ascending count, ties broken by descending item.
+type byCount []ItemWithCount
+
+func (a byCount) Len() int      { return len(a) }
+func (a byCount) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a byCount) Less(i, j int) bool {
+	ic := a[i].Count
+	jc := a[j].Count
+	if ic == jc {
+		return a[i].Item > a[j].Item
+	}
+	return ic < jc
+}
+
+func uint32Max(a, b uint32) uint32 {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+func intMax(a, b int) int {
+	if a > b {
+		return a
+	}
+	return b
+}
diff --git a/sketch_test.go b/sketch_test.go
new file mode 100644
--- /dev/null
+++ b/sketch_test.go
@@ -0,0 +1,157 @@
+package topk_test
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	topk "github.com/keilerkonzept/sliding-topk"
+)
+
+func TestSketch(t *testing.T) {
+	sketch := topk.New(2, 2, topk.WithWidth(10), topk.WithDepth(2), topk.WithBucketHistoryLength(2), topk.WithDecayLookupTableSize(1))
+
+	//t 0
+	//
+	//X 3
+	//Y 2
+	//Z 1
+	//  [ _ _ ] {x:3,y:2}
+	sketch.Add("X", 3)
+	sketch.Add("Y", 2)
+	sketch.Add("Z", 1)
+	{
+		expected := []topk.ItemWithCount{
+			{"X", 3},
+			{"Y", 2},
+		}
+		actual := sketch.TopK()
+		if diff := cmp.Diff(expected, actual); diff != "" {
+			t.Error(diff)
+		}
+	}
+	sketch.Tick()
+
+	//t 0 1
+	//
+	//X 3 2
+	//Y 2 2
+	//Z 1 1
+	//  [ _ _ ] {x:5,y:4}
+	sketch.Add("X", 2)
+	sketch.Add("Y", 2)
+	sketch.Add("Z", 1)
+	{
+		expected := []topk.ItemWithCount{
+			{"X", 5},
+			{"Y", 4},
+		}
+		actual := sketch.TopK()
+		if diff := cmp.Diff(expected, actual); diff != "" {
+			t.Error(diff)
+		}
+	}
+	sketch.Tick()
+
+	//t 0 1 2
+	//
+	//X 3 2 0
+	//Y 2 2 1
+	//Z 1 1 3
+	//  [ _ _ ] {x:5,y:4}
+	//    [ _ _ ] {z:4,y:3}
+	sketch.Add("Y", 1)
+	sketch.Add("Z", 3)
+	{
+		expected := []topk.ItemWithCount{
+			{"Z", 4},
+			{"Y", 3},
+		}
+		actual := sketch.TopK()
+		if diff := cmp.Diff(expected, actual); diff != "" {
+			t.Error(diff)
+		}
+	}
+	sketch.Tick()
+
+	//t 0 1 2 3
+	//
+	//X 3 2 0 0
+	//Y 2 2 1 1
+	//Z 1 1 3 3
+	//  [ _ _ ] {x:5,y:4}
+	//    [ _ _ ] {z:4,y:3}
+	//      [ _ _ ] {z:6,y:2}
+	sketch.Add("Y", 1)
+	sketch.Add("Z", 3)
+	{
+		expected := []topk.ItemWithCount{
+			{"Z", 6},
+			{"Y", 2},
+		}
+		actual := sketch.TopK()
+		if diff := cmp.Diff(expected, actual); diff != "" {
+			t.Error(diff)
+		}
+	}
+
+	sketch.Tick()
+	//t 0 1 2 3 4
+	//
+	//X 3 2 0 0 0
+	//Y 2 2 1 1 0
+	//Z 1 1 3 3 0
+	//  [ _ _ ] {x:5,y:4}
+	//    [ _ _ ] {z:4,y:3}
+	//      [ _ _ ] {z:6,y:2}
+	//        [ _ _ ] {z:3,y:1}
+	{
+		expected := []topk.ItemWithCount{
+			{"Z", 3},
+			{"Y", 1},
+		}
+		actual := sketch.TopK()
+		if diff := cmp.Diff(expected, actual); diff != "" {
+			t.Error(diff)
+		}
+	}
+
+	sketch.Tick()
+	sketch.Add("X", 1)
+	//t 0 1 2 3 4 5
+	//
+	//X 3 2 0 0 0 1
+	//Y 2 2 1 1 0 0
+	//Z 1 1 3 3 0 0
+	//  [ _ _ ] {x:5,y:4}
+	//    [ _ _ ] {z:4,y:3}
+	//      [ _ _ ] {z:6,y:2}
+	//        [ _ _ ] {z:3,y:1}
+	//          [ _ _ ] {x:1}
+	{
+		expected := []topk.ItemWithCount{
+			{"X", 1},
+		}
+		actual := sketch.TopK()
+		if diff := cmp.Diff(expected, actual); diff != "" {
+			t.Error(diff)
+		}
+	}
+}
+
+// TestSketchReset checks that a sketch is still usable after Reset.
+func TestSketchReset(t *testing.T) {
+	sketch := topk.New(2, 2, topk.WithWidth(10), topk.WithDepth(2))
+	sketch.Add("X", 3)
+	sketch.Reset()
+	if actual := sketch.TopK(); len(actual) != 0 {
+		t.Errorf("expected no items after Reset, got %v", actual)
+	}
+
+	sketch.Add("Y", 2) // must not panic
+	expected := []topk.ItemWithCount{
+		{"Y", 2},
+	}
+	if diff := cmp.Diff(expected, sketch.TopK()); diff != "" {
+		t.Error(diff)
+	}
+}