diff --git a/sinks/web/main.go b/sinks/web/main.go index d947b7a..35fca8d 100644 --- a/sinks/web/main.go +++ b/sinks/web/main.go @@ -3,7 +3,6 @@ package web import ( "encoding/json" "embed" - "errors" "fmt" "io/fs" "net/http" @@ -25,13 +24,15 @@ type WebDataSink struct { bind string Metadata inertia.SystemMetadata - States []inertia.Snapshot + States SnapshotRing } -func New(bind string) *WebDataSink { +func New(bind string, bufferlength int) *WebDataSink { - wv := &WebDataSink { bind: bind } + buffer := NewSnapshotRing(bufferlength) + + wv := &WebDataSink { bind: bind, States: buffer } return wv } @@ -53,7 +54,7 @@ func (wv *WebDataSink) Init(meta inertia.SystemMetadata) error { } func (wv *WebDataSink) Update(state inertia.Snapshot) { - wv.States = append(wv.States, state) + wv.States.Push(state) } func serveMetadata(wv *WebDataSink) http.HandlerFunc { @@ -91,7 +92,7 @@ func serveInertiaData(wv *WebDataSink) http.HandlerFunc { return } - state, err := getNewer(wv.States, latest) + state, err := wv.States.FirstAfter(latest) if err != nil { w.WriteHeader(NoNewData) fmt.Fprintln(w, "No new data") @@ -126,20 +127,6 @@ func parseTime(timestamp string) (time.Time, error) { } -func getNewer(states []inertia.Snapshot, latest time.Time) (inertia.Snapshot, error) { - - for _, state := range states { - - if state.Time.After(latest) { - return state, nil - } - - } - - return inertia.Snapshot {}, errors.New("No newer states avaialable") - -} - // TODO: Just define appropriate methods in inertia/internal? func jsonify_meta(meta inertia.SystemMetadata) ([]byte, error) { diff --git a/sinks/web/ringbuffer.go b/sinks/web/ringbuffer.go new file mode 100644 index 0000000..8a87516 --- /dev/null +++ b/sinks/web/ringbuffer.go @@ -0,0 +1,79 @@ +package web + +import ( + "errors" + "time" + + "github.com/G-PST/inertia" +) + +// Important assumptions: +// Snapshots are always added in chronological order +// Elements cannot be removed, only added (such that the ring is always full +// if ring.first > 0) +type SnapshotRing struct { + buffer []inertia.Snapshot + first int + last int + length int +} + +func NewSnapshotRing(length int) SnapshotRing { + + buffer := make([]inertia.Snapshot, length, length) + return SnapshotRing { buffer, -1, -1, length } + +} + +func (ring *SnapshotRing) Push(state inertia.Snapshot) { + + isEmpty := ring.first < 0 + isFull := ring.first > 0 || (ring.last + 1) == ring.length + + if (isFull || isEmpty) { + + ring.first += 1 + ring.first %= ring.length + + } + + ring.last += 1 + ring.last %= ring.length + + ring.buffer[ring.last] = state + +} + +func (ring SnapshotRing) FirstAfter(t time.Time) (inertia.Snapshot, error) { + + empty_err := errors.New("No newer states available") + + i := ring.first + + if i < 0 { + return inertia.Snapshot {}, empty_err + } + + for { + + state := ring.buffer[i] + + if state.Time.After(t) { + return state, nil + } + + i += 1 + + if (i == ring.length) { + i = 0 + } + + if (i == ring.first) { + break + } + + } + + return inertia.Snapshot {}, empty_err + +} diff --git a/sinks/web/ringbuffer_test.go b/sinks/web/ringbuffer_test.go new file mode 100644 index 0000000..4da4cb4 --- /dev/null +++ b/sinks/web/ringbuffer_test.go @@ -0,0 +1,102 @@ +package web + +import ( + "testing" + "time" + + "github.com/G-PST/inertia" +) + +func TestRingBuffer(t *testing.T) { + + ring := NewSnapshotRing(2) + t0 := time.Time {} + + _, err := ring.FirstAfter(t0) + if err == nil { + t.Errorf("Query should report no result when ring is empty") + } + + state1 := inertia.Snapshot { Time: time.Now() } + ring.Push(state1) + + state_result, err := ring.FirstAfter(t0) + if state_result.Time != state1.Time { + t.Errorf("Result should match only contained state") + } + + _, err = ring.FirstAfter(state1.Time) + if err == nil { + t.Errorf("Query should report no result when states are older") + } + + state2 := inertia.Snapshot { Time: state1.Time.Add(5000000000) } + ring.Push(state2) + + state_result, err = ring.FirstAfter(t0) + if state_result.Time != state1.Time { + t.Errorf("Result should match oldest contained state") + } + + state_result, err = ring.FirstAfter(state1.Time) + if state_result.Time != state2.Time { + t.Errorf("Result should match newest contained state") + } + + _, err = ring.FirstAfter(state2.Time) + if err == nil { + t.Errorf("Query should report no result when states are older") + } + + state3 := inertia.Snapshot { Time: state2.Time.Add(5000000000) } + ring.Push(state3) + + state_result, err = ring.FirstAfter(t0) + if state_result.Time != state2.Time { + t.Errorf("Result should match oldest contained state") + } + + state_result, err = ring.FirstAfter(state1.Time) + if state_result.Time != state2.Time { + t.Errorf("Result should match oldest contained state") + } + + state_result, err = ring.FirstAfter(state2.Time) + if state_result.Time != state3.Time { + t.Errorf("Result should match newest contained state") + } + + _, err = ring.FirstAfter(state3.Time) + if err == nil { + t.Errorf("Query should report no result when states are older") + } + + state4 := inertia.Snapshot { Time: state3.Time.Add(5000000000) } + ring.Push(state4) + + state_result, err = ring.FirstAfter(t0) + if state_result.Time != state3.Time { + t.Errorf("Result should match oldest contained state") + } + + state_result, err = ring.FirstAfter(state1.Time) + if state_result.Time != state3.Time { + t.Errorf("Result should match oldest contained state") + } + + state_result, err = ring.FirstAfter(state2.Time) + if state_result.Time != state3.Time { + t.Errorf("Result should match newest contained state") + } + + state_result, err = ring.FirstAfter(state3.Time) + if state_result.Time != state4.Time { + t.Errorf("Result should match newest contained state") + } + + _, err = ring.FirstAfter(state4.Time) + if err == nil { + t.Errorf("Query should report no result when states are older") + } + +} diff --git a/sinks/web/test/main.go b/sinks/web/test/main.go index 178ed3f..40c8169 100644 --- a/sinks/web/test/main.go +++ b/sinks/web/test/main.go @@ -11,7 +11,7 @@ import ( func main() { datasource := d.New(10 * time.Second) - sinks := []inertia.DataSink { web.New(":8181") } + sinks := []inertia.DataSink { web.New(":8080", 4) } inertia.Run(datasource, sinks, 500 * time.Millisecond, time.Second)