Skip to content

Commit

Permalink
move code to be more mutex efficicent
Browse files Browse the repository at this point in the history
  • Loading branch information
gregns1 committed Nov 5, 2024
1 parent abc68be commit 908fb6e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
5 changes: 2 additions & 3 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,9 +993,8 @@ func (c *changeCache) PushSkipped(ctx context.Context, startSeq uint64, endSeq u
if numSeqs > MinSequencesForRange {
c.skippedSeqs.PushSkippedSequenceEntry(NewSkippedSequenceRangeEntry(startSeq, endSeq))
} else {
for i := startSeq; i <= endSeq; i++ {
c.skippedSeqs.PushSkippedSequenceEntry(NewSingleSkippedSequenceEntry(i))
}
// push sequences as separate entries
c.skippedSeqs.PushSkippedSequenceEntries(startSeq, endSeq, int64(numSeqs))
}
}

Expand Down
29 changes: 29 additions & 0 deletions db/skipped_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,35 @@ func (s *SkippedSequenceSlice) _insert(index int, entry *SkippedSequenceListEntr
s.list[index] = entry
}

// PushSkippedSequenceEntries will push seq range to end of slice as separate single sequence entries unless the range
// being pushed in contiguous with end of the slice and that contiguous range is grater than the min range threshold
func (s *SkippedSequenceSlice) PushSkippedSequenceEntries(startSeq, endSeq uint64, numSeqsIncoming int64) {
s.lock.Lock()
defer s.lock.Unlock()

// update num current skipped sequences count + the cumulative count of skipped sequences
s.NumCurrentSkippedSequences += numSeqsIncoming
s.NumCumulativeSkippedSequences += numSeqsIncoming

// check if we should be extending the last entry on the slice
if len(s.list) != 0 {
index := len(s.list) - 1
lastEntryLastSeq := s.list[index].getLastSeq()
totalSeqs := s.list[index].getNumSequencesInEntry() + numSeqsIncoming
if (lastEntryLastSeq+1) == startSeq && totalSeqs > MinSequencesForRange {
// adding contiguous sequence, and we are above the min threshold to hold a range in list
// set last seq in the range to the new arriving sequence + alter timestamp to incoming entries timestamp
s.list[index].extendRange(endSeq, time.Now().Unix())
return
}
}

// push all items separate to end of slice
for i := startSeq; i <= endSeq; i++ {
s.list = append(s.list, NewSingleSkippedSequenceEntry(i))
}
}

// PushSkippedSequenceEntry will append a new skipped sequence entry to the end of the slice, if adding a contiguous
// sequence function will expand the last entry of the slice to reflect this
func (s *SkippedSequenceSlice) PushSkippedSequenceEntry(entry *SkippedSequenceListEntry) {
Expand Down

0 comments on commit 908fb6e

Please sign in to comment.