Skip to content

Commit

Permalink
[YUNIKORN-1942] Null Batch API Response after buffer size change (#635)
Browse files Browse the repository at this point in the history
When the ring buffer size is changed the first ID in the buffer needs to
be updated. Normally the first ID is 0 but after resize this could be
any random number. That breaks id2pos and causes a nil return in the
REST call.

Closes: #635

Signed-off-by: Wilfred Spiegelenburg <[email protected]>
  • Loading branch information
pbacsko authored and wilfred-s committed Sep 8, 2023
1 parent f411c47 commit ea42210
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 56 deletions.
79 changes: 27 additions & 52 deletions pkg/events/event_ringbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,21 @@ type eventRange struct {

// eventRingBuffer A specialized circular buffer to store event objects.
//
// Unlike to regular circular buffers, existing entries are never directly removed and new entries can be added if the buffer is full.
// Unlike regular circular buffers, existing entries are never directly removed
// and new entries can be added if the buffer is full.
// In this case, the oldest entry is overwritten and can be collected by the GC.
// Each event has an ID, however, this mapping is not stored directly. If needed, we calculate the id
// of the event based on slice positions.
// Each event has an ID; however, this mapping is not stored directly.
// If needed, we calculate the id of the event based on slice positions.
//
// Retrieving the records can be achieved with GetEventsFromID and GetRecentEntries.
// Retrieving the records can be achieved with GetEventsFromID.
type eventRingBuffer struct {
events []*si.EventRecord
capacity uint64 // capacity of the buffer
head uint64 // position of the next element (no tail since we don't remove elements)
full bool // indicates whether the buffer if full - once it is, it stays full unless buffer is resized
id uint64 // unique id of an event record
lowestId uint64 // lowest id of an event record available in the buffer at any given time
events []*si.EventRecord
capacity uint64 // capacity of the buffer
head uint64 // position of the next element (no tail since we don't remove elements)
full bool // indicates whether the buffer is full - once it is, it stays full unless the buffer is resized
id uint64 // unique id of an event record
lowestId uint64 // lowest id of an event record available in the buffer at any given time
resizeOffset uint64 // used to aid the calculation of id->pos after resize (see id2pos)

sync.RWMutex
}
Expand All @@ -62,16 +64,13 @@ func (e *eventRingBuffer) Add(event *si.EventRecord) {
if !e.full {
e.full = e.head == e.capacity-1
} else {
// lowest event id updates when new event added to a full buffer
log.Log(log.Events).Debug("event buffer full, oldest event will be lost",
zap.String("id", strconv.FormatUint(e.lowestId, 10)))
e.lowestId++
}
e.head = (e.head + 1) % e.capacity
e.id++
}

// GetEventsFromID returns "count" number of event records from id if possible. The id can be determined from
// GetEventsFromID returns "count" number of event records from "id" if possible. The id can be determined from
// the first call of the method - if it returns nothing because the id is not in the buffer, the lowest valid
// identifier is returned which can be used to get the first batch.
// If the caller does not want to impose a limit on the number of events returned, "count" must be set to a high
Expand Down Expand Up @@ -162,36 +161,18 @@ func (e *eventRingBuffer) getEntriesFromRanges(r1, r2 *eventRange) []*si.EventRe
}

// id2pos translates the unique event ID to an index in the event slice.
// If the event is present the position will be returned and the found flag will be true.
// In the case that the event ID is not present the position returned is 0 and the flag false.
// If the event is present, the position will be returned and the found flag will be true.
// If the event ID is not present, the position returned is 0 and the flag is false.
func (e *eventRingBuffer) id2pos(id uint64) (uint64, bool) {
pos := id % e.capacity
var calculatedID uint64 // calculated ID based on index values
if pos > e.head {
diff := pos - e.head
calculatedID = e.getLowestID() + diff
} else {
pId := e.id - 1
idAtZero := pId - (pId % e.capacity) // unique id at slice position 0
calculatedID = idAtZero + pos
}

if !e.full {
if e.head == 0 {
// empty
return 0, false
}
if pos >= e.head {
// "pos" is not in the [0..head-1] range
return 0, false
}
// id out of range?
if id < e.lowestId || id >= e.id {
return 0, false
}

if calculatedID != id {
return calculatedID, false
}

return pos, true
// resizeOffset tells how many elements were "shifted out" after resizing the buffer
// e.g. a full buffer with 10 elements (IDs 0-9) gets resized to 6:
// the element at index 0 no longer has ID 0 or a multiple of 10, but 4, 10, 16, 22, etc.
return (id - e.resizeOffset) % e.capacity, true
}

// getLowestID returns the current lowest available id in the buffer.
Expand All @@ -206,15 +187,15 @@ func newEventRingBuffer(capacity uint64) *eventRingBuffer {
}
}

// called from Resize(), This functuin updates the lowest event id available in the buffer
// called from Resize(), this function updates the lowest event id available in the buffer
func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) {
// if buffer size is increasing, lowestId stays the same
if beginSize < endSize {
return
}

// bufferSize is shrinking
// if number of events is < newSize no change
// if the number of events is < newSize, then no change
if (e.id - e.getLowestID()) <= endSize {
return
}
Expand All @@ -223,22 +204,18 @@ func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) {
e.lowestId = e.id - endSize
}

// resize the existing ring buffer
// Resize resizes the existing ring buffer
// this method will be called upon configuration reload
func (e *eventRingBuffer) Resize(newSize uint64) {
e.Lock()
defer e.Unlock()

if newSize == e.capacity {
return // Nothing to do if the size is the same
return
}

initialSize := e.capacity

// Create a new buffer with the desired size
newEvents := make([]*si.EventRecord, newSize)

// Determine the number of events to copy
var numEventsToCopy uint64
if e.id-e.getLowestID() > newSize {
numEventsToCopy = newSize
Expand Down Expand Up @@ -270,11 +247,9 @@ func (e *eventRingBuffer) Resize(newSize uint64) {
copy(newEvents[e.capacity-startIndex:], e.events[:endIndex+1])
}

// Update the buffer's state
e.capacity = newSize
e.events = newEvents
e.head = numEventsToCopy % newSize

// Update e.full based on whether the buffer is full after resizing
e.resizeOffset = e.lowestId
e.full = numEventsToCopy == e.capacity
}
14 changes: 11 additions & 3 deletions pkg/events/event_ringbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,20 +192,23 @@ func TestResize(t *testing.T) {
assert.Equal(t, uint64(6), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 6, len(ringBuffer.events))
assert.Equal(t, uint64(0), ringBuffer.resizeOffset)

// Test case 2: Resize to a smaller size
lastEventIdBeforeResize = ringBuffer.GetLastEventID()
ringBuffer.Resize(2)
assert.Equal(t, uint64(2), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 2, len(ringBuffer.events))
assert.Equal(t, uint64(2), ringBuffer.resizeOffset)

// Test case 3: Resize to a larger size
lastEventIdBeforeResize = ringBuffer.GetLastEventID()
ringBuffer.Resize(20)
assert.Equal(t, uint64(20), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 20, len(ringBuffer.events))
assert.Equal(t, uint64(2), ringBuffer.resizeOffset)

// Test case 4: Resize when head is at the last element
ringBuffer = newEventRingBuffer(5)
Expand All @@ -215,6 +218,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, uint64(2), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 2, len(ringBuffer.events))
assert.Equal(t, uint64(2), ringBuffer.resizeOffset)

// Test case 5: Resize to events length when head is at the last element
ringBuffer = newEventRingBuffer(5)
Expand All @@ -225,7 +229,8 @@ func TestResize(t *testing.T) {
assert.Equal(t, uint64(4), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 4, len(ringBuffer.events))
assert.Equal(t, true, ringBuffer.full)
assert.Equal(t, uint64(0), ringBuffer.resizeOffset)
assert.Assert(t, ringBuffer.full)

// Test case 6: Resize when the buffer is full
ringBuffer = newEventRingBuffer(10)
Expand All @@ -237,6 +242,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, 6, len(ringBuffer.events))
assert.Equal(t, uint64(0), ringBuffer.head)
assert.Equal(t, true, ringBuffer.full)
assert.Equal(t, uint64(4), ringBuffer.resizeOffset)

// Test case 7: Resize when the buffer has overflowed (head is wrapped and position > 0)
ringBuffer = newEventRingBuffer(10)
Expand All @@ -248,7 +254,8 @@ func TestResize(t *testing.T) {
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 8, len(ringBuffer.events))
assert.Equal(t, uint64(0), ringBuffer.head)
assert.Equal(t, true, ringBuffer.full)
assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
assert.Assert(t, ringBuffer.full)

// Test case 8: Test event full : Resize to lower size, followed by resize to a large size
ringBuffer = newEventRingBuffer(10)
Expand All @@ -258,6 +265,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, true, ringBuffer.full)
ringBuffer.Resize(6)
assert.Equal(t, false, ringBuffer.full)
assert.Equal(t, uint64(7), ringBuffer.resizeOffset)

// Test case 9: Test resize to same size
lastEventIdBeforeResize = ringBuffer.GetLastEventID()
Expand All @@ -266,7 +274,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 6, len(ringBuffer.events))
assert.Equal(t, false, ringBuffer.full)

assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
}

func populate(buffer *eventRingBuffer, count int) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/webservice/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ var webRoutes = routes{
route{
"Scheduler",
"GET",
"/ws/v1/events/batch/",
"/ws/v1/events/batch",
getEvents,
},
// endpoint to retrieve CPU, Memory profiling data,
Expand Down

0 comments on commit ea42210

Please sign in to comment.