Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
sgreben committed Aug 29, 2024
0 parents commit 270cd8f
Show file tree
Hide file tree
Showing 10 changed files with 738 additions and 0 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Go

# Run on pushes to main and on pull requests targeting main.
on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

jobs:

  build:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Set up Go
      uses: actions/setup-go@v4
      with:
        go-version: '1.23'

    # Compile every package in the module.
    - name: Build
      run: go build -v ./...

    # Run the tests of every package in the module.
    - name: Test
      run: go test -v ./...
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# sliding-topk

Sliding HeavyKeeper, as described in ["A Sketch Framework for Approximate Data Stream Processing in Sliding Windows"](https://yangtonghome.github.io/uploads/SlidingSketch_TKDE2022_final.pdf)

```go
import (
topk "github.com/keilerkonzept/sliding-topk"
)

func main() {
// make a new sketch keeping track of k=3 items over a window of the last 60 ticks
// use width=1024 x depth=3 = 3072 buckets
sketch := topk.New(3, 60, topk.WithWidth(1024),topk.WithDepth(3))

log.Println("the sketch takes", sketch.SizeBytes(), "bytes in memory")

sketch.Incr("an item") // count "an item" 1 time
sketch.Add("an item", 123) // count "an item" 123 times
sketch.Tick(1) // advance time by one tick
sketch.Add("another item", 4) // count "another item" 4 times
sketch.Tick(2) // advance time by two ticks
sketch.Add("an item", 5) // count "an item" 5 more times
sketch.Add("yet another item", 6) // count "yet another item" 6 times

if sketch.Query("an item") {
// "an item" is in the top K items observed within the last 60 ticks
}

_ = sketch.Count("another item") // return the estimated count for "another item"

for _, entry := range sketch.TopK() { // TopK() returns all top K items as a slice of {Item,Count} structs
log.Println(entry.Item, "counted", entry.Count, "times")
}

sketch.Reset() // reset to New() state
}
```
51 changes: 51 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package topk

// Bucket pairs a fingerprint with a circular buffer of counts and the running
// sum of those counts.
type Bucket struct {
	// Fingerprint is the 32-bit fingerprint stored in this bucket.
	// NOTE(review): presumably the Fingerprint() hash of the item currently
	// owning the bucket — confirm against the sketch code.
	Fingerprint uint32

	// Counts is a circular buffer (with its first entry at .First)
	Counts []uint32
	// First is the index into Counts of the circular buffer's first entry.
	First uint32
	// CountsSum is the current sum of Counts
	CountsSum uint32
}

// tick rotates the circular buffer by one slot: the slot just before First
// (wrapping around from index 0 to the end) is subtracted from CountsSum,
// zeroed, and becomes the new First. A bucket whose CountsSum is zero is left
// untouched.
func (me *Bucket) tick() {
	if me.CountsSum == 0 {
		// Every slot is already zero, so there is nothing to clear.
		return
	}

	// Step one slot backwards, wrapping from index 0 to the last index.
	slot := me.First - 1
	if me.First == 0 {
		slot = uint32(len(me.Counts) - 1)
	}

	expired := me.Counts[slot]
	me.Counts[slot] = 0
	me.CountsSum -= expired
	me.First = slot
}

// findNonzeroMinimumCount walks the circular buffer once, starting at First
// and wrapping around, and returns the index of the smallest non-zero count.
// On ties the slot visited first wins. If every count is zero (or Counts is
// empty) it returns 0.
func (me *Bucket) findNonzeroMinimumCount() int {
	size := uint32(len(me.Counts))

	var (
		best    uint32
		bestIdx uint32
		found   bool
	)

	for step, idx := uint32(0), me.First; step < size; step, idx = step+1, idx+1 {
		if idx == size {
			idx = 0 // wrap around to the start of the buffer
		}
		count := me.Counts[idx]
		if count == 0 {
			continue
		}
		if !found || count < best {
			best, bestIdx, found = count, idx, true
		}
	}
	return int(bestIdx)
}
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/keilerkonzept/sliding-topk

go 1.23.0

require github.com/google/go-cmp v0.6.0

require github.com/OneOfOne/xxhash v1.2.8
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
14 changes: 14 additions & 0 deletions hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package topk

import "github.com/OneOfOne/xxhash"

// hashSeed is the fixed seed used when computing item fingerprints.
const hashSeed = 4848280

// Fingerprint returns the 32-bit xxhash checksum of item, seeded with
// hashSeed.
func Fingerprint(item string) uint32 {
	sum := xxhash.ChecksumString32S(item, hashSeed)
	return sum
}

// bucketIndex maps item to a position in a flat, row-major bucket array that
// has width columns per row. The column is the item's 32-bit xxhash checksum,
// seeded with the row number, reduced modulo width.
//
// The reduction is done in unsigned 64-bit arithmetic. Converting the 32-bit
// checksum to int first would produce a negative value, and therefore a
// negative index, on platforms where int is only 32 bits wide.
//
// width must be positive; a zero width panics with a division by zero.
func bucketIndex(item string, row, width int) int {
	sum := xxhash.ChecksumString32S(item, uint32(row))
	column := int(uint64(sum) % uint64(width))
	return row*width + column
}
135 changes: 135 additions & 0 deletions min_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package topk

import "container/heap"

// HeapItem is a single entry of a MinHeap: an item, its fingerprint and its
// count.
type HeapItem struct {
	// Fingerprint is the 32-bit fingerprint stored alongside Item.
	Fingerprint uint32
	// Item is the item's key; MinHeap.Index is keyed by this string.
	Item string
	// Count is the value the heap is ordered by.
	Count uint32
}

// MinHeap is a min-heap of HeapItems ordered by Count (ties broken by Item)
// with an index from item key to heap position. It implements
// container/heap's heap.Interface.
type MinHeap struct {
	// Items is the heap's backing slice.
	Items []HeapItem
	// Index maps an item key to its current position in Items; Swap, Push and
	// Pop keep it up to date.
	Index map[string]int
	// StoredKeysBytes is the running total of len(item) for keys added through
	// Update; it is included in SizeBytes.
	StoredKeysBytes int
}

// NewMinHeap allocates a MinHeap for k items. Items is created with length
// and capacity k, so it starts out holding k zero-valued entries, and Index is
// an empty map pre-sized for k keys.
//
// NOTE(review): because len(Items) == cap(Items) from the start, Full()
// reports true on a fresh heap — confirm this pre-filled layout is intended.
func NewMinHeap(k int) *MinHeap {
	h := new(MinHeap)
	h.Items = make([]HeapItem, k)
	h.Index = make(map[string]int, k)
	return h
}

// Compile-time check that *MinHeap implements heap.Interface.
var _ heap.Interface = &MinHeap{}

// SizeBytes estimates the heap's memory footprint in bytes. It adds up the
// MinHeap struct itself, one HeapItem per entry in Items, the bytes of the
// stored keys, the map value, and one (string, int) pair per Index entry.
func (me MinHeap) SizeBytes() int {
	total := sizeofBucketMinHeapStruct
	total += len(me.Items) * sizeofHeapBucket
	total += me.StoredKeysBytes
	total += sizeofStringIntMap
	total += len(me.Index) * (sizeofInt + sizeofString)
	return total
}

// Reinit re-establishes the heap ordering over Items and then removes every
// entry whose Count is zero.
//
// Each removed entry's key length is subtracted from StoredKeysBytes so that
// SizeBytes stops counting keys that are no longer stored.
func (me *MinHeap) Reinit() {
	heap.Init(me)
	// Zero is the smallest possible count, so zero-count entries surface at
	// the root one after another.
	for me.Len() > 0 && me.Items[0].Count == 0 {
		evicted := heap.Pop(me).(HeapItem)
		me.StoredKeysBytes -= len(evicted.Item)
		delete(me.Index, evicted.Item)
	}
}

// Full reports whether Items has used up its entire capacity.
func (me MinHeap) Full() bool {
	return cap(me.Items) == len(me.Items)
}

// Len returns the number of entries in Items (part of heap.Interface).
func (me MinHeap) Len() int {
	return len(me.Items)
}

// Less orders entries by Count and breaks ties by comparing Item
// lexicographically (part of heap.Interface).
func (me MinHeap) Less(i, j int) bool {
	a, b := &me.Items[i], &me.Items[j]
	if a.Count != b.Count {
		return a.Count < b.Count
	}
	return a.Item < b.Item
}
// Swap exchanges the entries at positions i and j and records their new
// positions in Index (part of heap.Interface).
func (me MinHeap) Swap(i, j int) {
	items := me.Items
	items[i], items[j] = items[j], items[i]
	// After the exchange, slot j holds the entry that used to be at i and
	// slot i holds the one that used to be at j.
	me.Index[items[j].Item] = j
	me.Index[items[i].Item] = i
}

// Push appends x, which must be a HeapItem, to the end of Items and records
// its position in Index (part of heap.Interface). It does not restore heap
// order by itself.
func (me *MinHeap) Push(x interface{}) {
	entry := x.(HeapItem)
	position := len(me.Items)
	me.Items = append(me.Items, entry)
	me.Index[entry.Item] = position
}

// Pop removes the last entry of Items, deletes its key from Index, and
// returns it as a HeapItem (part of heap.Interface).
func (me *MinHeap) Pop() interface{} {
	last := len(me.Items) - 1
	popped := me.Items[last]
	me.Items = me.Items[:last]
	delete(me.Index, popped.Item)
	return popped
}

// Min reports the Count of the root entry, which is the smallest count while
// the heap ordering holds, or 0 when the heap has no entries.
func (me MinHeap) Min() uint32 {
	if len(me.Items) > 0 {
		return me.Items[0].Count
	}
	return 0
}

// Find returns the position recorded in Index for item, or -1 if item has no
// Index entry.
func (me MinHeap) Find(item string) (i int) {
	pos, found := me.Index[item]
	if !found {
		return -1
	}
	return pos
}

// Contains reports whether item has an entry in Index.
func (me MinHeap) Contains(item string) bool {
	if _, present := me.Index[item]; present {
		return true
	}
	return false
}

// Get returns a pointer to the entry in Items that Index records for item, or
// nil if item has no Index entry. The pointer refers to the slice element
// itself, so writes through it modify the heap's entry.
func (me MinHeap) Get(item string) *HeapItem {
	pos, found := me.Index[item]
	if !found {
		return nil
	}
	return &me.Items[pos]
}

// Update records count for item in the heap.
//
//   - If the heap is full and count is below the current minimum, the call is
//     ignored.
//   - If item is already in the heap, its count is overwritten and the heap
//     order is repaired.
//   - Otherwise item is inserted: appended if there is spare capacity, or
//     written over the current minimum entry if the heap is full.
//
// StoredKeysBytes is adjusted by the key lengths of inserted and evicted
// items.
func (me *MinHeap) Update(item string, fingerprint uint32, count uint32) {
	if count < me.Min() && me.Full() { // not in top k: ignore
		return
	}

	if i := me.Find(item); i >= 0 { // already in heap: update count
		me.Items[i].Count = count
		heap.Fix(me, i)
		return
	}

	entry := HeapItem{
		Count:       count,
		Fingerprint: fingerprint,
		Item:        item,
	}
	me.StoredKeysBytes += len(item)

	if !me.Full() { // heap not full: add to heap
		// heap.Push (unlike calling me.Push directly) sifts the new entry up,
		// so the heap invariant holds and Min() stays correct.
		heap.Push(me, entry)
		return
	}

	// replace min on heap
	evicted := me.Items[0].Item
	me.StoredKeysBytes -= len(evicted)
	delete(me.Index, evicted)
	me.Items[0] = entry
	me.Index[item] = 0
	heap.Fix(me, 0)
}
15 changes: 15 additions & 0 deletions sizeof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package topk

import "unsafe"

// Sizes in bytes, as reported by unsafe.Sizeof, of the types that appear in
// the memory estimates. unsafe.Sizeof measures only the value itself, not any
// memory it references (slice backing arrays, string data, map contents).
const (
	sizeofSketchStruct        = int(unsafe.Sizeof(Sketch{}))
	sizeofBucketStruct        = int(unsafe.Sizeof(Bucket{}))
	sizeofBucketMinHeapStruct = int(unsafe.Sizeof(MinHeap{}))
	sizeofHeapBucket          = int(unsafe.Sizeof(HeapItem{}))
	// Size of a map value itself, excluding its entries.
	sizeofStringIntMap = int(unsafe.Sizeof(map[string]int{}))
	// Size of a string header, excluding the string's bytes.
	sizeofString  = int(unsafe.Sizeof(""))
	sizeofInt     = int(unsafe.Sizeof(int(0)))
	sizeofUInt32  = int(unsafe.Sizeof(uint32(0)))
	sizeofFloat32 = int(unsafe.Sizeof(float32(0)))
)
Loading

0 comments on commit 270cd8f

Please sign in to comment.