Skip to content

Commit

Permalink
feat: only flush as realtime if not lagging (#16336)
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra authored Jul 3, 2023
1 parent ae50265 commit 312cbd6
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,22 @@ export class SessionManager {
const bufferAgeInMemory = now() - this.buffer.createdAt
const bufferAgeFromReference = referenceNow - this.buffer.oldestKafkaTimestamp

// We will use whichever age is oldest (largest). This handles the fact that the reference now can get "stuck" if there is no new data in the partition
const bufferAge = Math.max(bufferAgeInMemory, bufferAgeFromReference)
const bufferAgeIsOverThreshold = bufferAgeFromReference >= flushThresholdMillis
// check the in-memory age against a larger value than the flush threshold,
// otherwise we'll flap between reasons for flushing when close to real-time processing
const sessionAgeIsOverThreshold = bufferAgeInMemory >= flushThresholdMillis * 2

logContext['bufferAge'] = bufferAge
logContext['bufferAgeInMemory'] = bufferAgeInMemory
logContext['bufferAgeFromReference'] = bufferAgeFromReference
logContext['bufferAgeIsOverThreshold'] = bufferAgeIsOverThreshold
logContext['sessionAgeIsOverThreshold'] = sessionAgeIsOverThreshold

if (bufferAge >= flushThresholdMillis) {
if (bufferAgeIsOverThreshold || sessionAgeIsOverThreshold) {
status.info('🚽', `blob_ingester_session_manager flushing buffer due to age`, {
...logContext,
})
// return the promise and let the caller decide whether to await
return this.flush(bufferAgeInMemory > bufferAgeFromReference ? 'buffer_age' : 'buffer_age_realtime')
return this.flush(bufferAgeIsOverThreshold ? 'buffer_age' : 'buffer_age_realtime')
} else {
status.info('🚽', `blob_ingester_session_manager not flushing buffer due to age`, {
...logContext,
Expand Down

0 comments on commit 312cbd6

Please sign in to comment.