Skip to content

Commit

Permalink
[web] Use ring buffer for finite rolling snapshot history
Browse files Browse the repository at this point in the history
  • Loading branch information
GordStephen committed Nov 3, 2023
1 parent cbca66f commit 4a09217
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 21 deletions.
27 changes: 7 additions & 20 deletions sinks/web/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package web
import (
"encoding/json"
"embed"
"errors"
"fmt"
"io/fs"
"net/http"
Expand All @@ -25,13 +24,15 @@ type WebDataSink struct {
bind string

Metadata inertia.SystemMetadata
States []inertia.Snapshot
States SnapshotRing

}

func New(bind string) *WebDataSink {
func New(bind string, bufferlength int) *WebDataSink {

wv := &WebDataSink { bind: bind }
buffer := NewSnapshotRing(bufferlength)

wv := &WebDataSink { bind: bind, States: buffer }
return wv

}
Expand All @@ -53,7 +54,7 @@ func (wv *WebDataSink) Init(meta inertia.SystemMetadata) error {
}

func (wv *WebDataSink) Update(state inertia.Snapshot) {
wv.States = append(wv.States, state)
wv.States.Push(state)
}

func serveMetadata(wv *WebDataSink) http.HandlerFunc {
Expand Down Expand Up @@ -91,7 +92,7 @@ func serveInertiaData(wv *WebDataSink) http.HandlerFunc {
return
}

state, err := getNewer(wv.States, latest)
state, err := wv.States.FirstAfter(latest)
if err != nil {
w.WriteHeader(NoNewData)
fmt.Fprintln(w, "No new data")
Expand Down Expand Up @@ -126,20 +127,6 @@ func parseTime(timestamp string) (time.Time, error) {

}

func getNewer(states []inertia.Snapshot, latest time.Time) (inertia.Snapshot, error) {

for _, state := range states {

if state.Time.After(latest) {
return state, nil
}

}

return inertia.Snapshot {}, errors.New("No newer states avaialable")

}

// TODO: Just define appropriate methods in inertia/internal?
func jsonify_meta(meta inertia.SystemMetadata) ([]byte, error) {

Expand Down
79 changes: 79 additions & 0 deletions sinks/web/ringbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package web

import (
"errors"
"time"

"github.com/G-PST/inertia"
)

// Important assumptions:
// Snapshots are always added in chronological order
// Elements cannot be removed, only added (such that the ring is always full
// if ring.first > 0)
type SnapshotRing struct {
buffer []inertia.Snapshot
first int
last int
length int
}

func NewSnapshotRing(length int) SnapshotRing {

buffer := make([]inertia.Snapshot, length, length)
return SnapshotRing { buffer, -1, -1, length }

}

func (ring *SnapshotRing) Push(state inertia.Snapshot) {

isEmpty := ring.first < 0
isFull := ring.first > 0 || (ring.last + 1) == ring.length

if (isFull || isEmpty) {

ring.first += 1
ring.first %= ring.length

}

ring.last += 1
ring.last %= ring.length

ring.buffer[ring.last] = state

}

func (ring SnapshotRing) FirstAfter(t time.Time) (inertia.Snapshot, error) {

empty_err := errors.New("No newer states available")

i := ring.first

if i < 0 {
return inertia.Snapshot {}, empty_err
}

for {

state := ring.buffer[i]

if state.Time.After(t) {
return state, nil
}

i += 1

if (i == ring.length) {
i = 0
}

if (i == ring.first) {
break
}

}

return inertia.Snapshot {}, empty_err

}
102 changes: 102 additions & 0 deletions sinks/web/ringbuffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package web

import (
"testing"
"time"

"github.com/G-PST/inertia"
)

func TestRingBuffer(t *testing.T) {

ring := NewSnapshotRing(2)
t0 := time.Time {}

_, err := ring.FirstAfter(t0)
if err == nil {
t.Errorf("Query should report no result when ring is empty")
}

state1 := inertia.Snapshot { Time: time.Now() }
ring.Push(state1)

state_result, err := ring.FirstAfter(t0)
if state_result.Time != state1.Time {
t.Errorf("Result should match only contained state")
}

_, err = ring.FirstAfter(state1.Time)
if err == nil {
t.Errorf("Query should report no result when states are older")
}

state2 := inertia.Snapshot { Time: state1.Time.Add(5000000000) }
ring.Push(state2)

state_result, err = ring.FirstAfter(t0)
if state_result.Time != state1.Time {
t.Errorf("Result should match oldest contained state")
}

state_result, err = ring.FirstAfter(state1.Time)
if state_result.Time != state2.Time {
t.Errorf("Result should match newest contained state")
}

_, err = ring.FirstAfter(state2.Time)
if err == nil {
t.Errorf("Query should report no result when states are older")
}

state3 := inertia.Snapshot { Time: state2.Time.Add(5000000000) }
ring.Push(state3)

state_result, err = ring.FirstAfter(t0)
if state_result.Time != state2.Time {
t.Errorf("Result should match oldest contained state")
}

state_result, err = ring.FirstAfter(state1.Time)
if state_result.Time != state2.Time {
t.Errorf("Result should match oldest contained state")
}

state_result, err = ring.FirstAfter(state2.Time)
if state_result.Time != state3.Time {
t.Errorf("Result should match newest contained state")
}

_, err = ring.FirstAfter(state3.Time)
if err == nil {
t.Errorf("Query should report no result when states are older")
}

state4 := inertia.Snapshot { Time: state3.Time.Add(5000000000) }
ring.Push(state4)

state_result, err = ring.FirstAfter(t0)
if state_result.Time != state3.Time {
t.Errorf("Result should match oldest contained state")
}

state_result, err = ring.FirstAfter(state1.Time)
if state_result.Time != state3.Time {
t.Errorf("Result should match oldest contained state")
}

state_result, err = ring.FirstAfter(state2.Time)
if state_result.Time != state3.Time {
t.Errorf("Result should match newest contained state")
}

state_result, err = ring.FirstAfter(state3.Time)
if state_result.Time != state4.Time {
t.Errorf("Result should match newest contained state")
}

_, err = ring.FirstAfter(state4.Time)
if err == nil {
t.Errorf("Query should report no result when states are older")
}

}
2 changes: 1 addition & 1 deletion sinks/web/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func main() {

datasource := d.New(10 * time.Second)
sinks := []inertia.DataSink { web.New(":8181") }
sinks := []inertia.DataSink { web.New(":8080", 4) }

inertia.Run(datasource, sinks, 500 * time.Millisecond, time.Second)

Expand Down

0 comments on commit 4a09217

Please sign in to comment.