Skip to content

Commit

Permalink
CBG-3775: [3.1.4 Backport] Fix for memeory issues seen in blipcollect…
Browse files Browse the repository at this point in the history
…ion contexts (#6679)

* CBG-3775: Fix for memeory issues seen in blipcollection contexts

* remove repro test

* remove comment
  • Loading branch information
gregns1 authored Feb 12, 2024
1 parent d3b758c commit d0d5060
Showing 1 changed file with 17 additions and 36 deletions.
53 changes: 17 additions & 36 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type blipHandler struct {
*BlipSyncContext
db *Database // Handler-specific copy of the BlipSyncContext's blipContextDb
collection *DatabaseCollectionWithUser // Handler-specific copy of the BlipSyncContext's collection specific DB
collectionCtx *blipSyncCollectionContext // Sync-specific data for this collection
collectionIdx *int // index into BlipSyncContext.collectionMapping for the collection
loggingCtx context.Context // inherited from BlipSyncContext.loggingCtx with additional handler-only information (like keyspace)
serialNumber uint64 // This blip handler's serial number to differentiate logs w/ other handlers
Expand Down Expand Up @@ -166,7 +167,11 @@ func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc {
if err != nil {
return err
}
bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection))
bh.collectionCtx, err = bh.collections.get(nil)
if err != nil {
bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.loggingCtx, bh.collection.DatabaseCollection))
bh.collectionCtx, _ = bh.collections.get(nil)
}
return next(bh, bm)
}
if !bh.collections.hasNamedCollections() {
Expand All @@ -179,12 +184,12 @@ func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc {
}

bh.collectionIdx = &collectionIndex
collectionCtx, err := bh.collections.get(&collectionIndex)
bh.collectionCtx, err = bh.collections.get(&collectionIndex)
if err != nil {
return base.HTTPErrorf(http.StatusBadRequest, fmt.Sprintf("%s", err))
}
bh.collection = &DatabaseCollectionWithUser{
DatabaseCollection: collectionCtx.dbCollection,
DatabaseCollection: bh.collectionCtx.dbCollection,
user: bh.db.user,
}
bh.loggingCtx = base.CollectionLogCtx(bh.BlipSyncContext.loggingCtx, bh.collection.Name)
Expand Down Expand Up @@ -260,10 +265,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
}

// Ensure that only _one_ subChanges subscription can be open on this blip connection at any given time. SG #3222.
collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return base.HTTPErrorf(http.StatusBadRequest, fmt.Sprintf("%s", err))
}
collectionCtx := bh.collectionCtx
collectionCtx.changesCtxLock.Lock()
defer collectionCtx.changesCtxLock.Unlock()
if !collectionCtx.activeSubChanges.CASRetry(false, true) {
Expand Down Expand Up @@ -359,10 +361,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
}

func (bh *blipHandler) handleUnsubChanges(rq *blip.Message) error {
collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}
collectionCtx := bh.collectionCtx
collectionCtx.changesCtxLock.Lock()
defer collectionCtx.changesCtxLock.Unlock()
collectionCtx.changesCtxCancel()
Expand Down Expand Up @@ -632,10 +631,7 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
return err
}

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}
collectionCtx := bh.collectionCtx

bh.logEndpointEntry(rq.Profile(), fmt.Sprintf("#Changes:%d", len(changeList)))
if len(changeList) == 0 {
Expand Down Expand Up @@ -889,17 +885,12 @@ func (bh *blipHandler) handleNoRev(rq *blip.Message) error {
base.InfofCtx(bh.loggingCtx, base.KeySyncMsg, "%s: norev for doc %q / %q seq:%q - error: %q - reason: %q",
rq.String(), base.UD(docID), revID, seqStr, rq.Properties[NorevMessageError], rq.Properties[NorevMessageReason])

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

if collectionCtx.sgr2PullProcessedSeqCallback != nil {
if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
seq, err := ParseJSONSequenceID(seqStr)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from norev message: %v - not tracking for checkpointing", seqStr, err)
} else {
collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
}
}

Expand Down Expand Up @@ -971,19 +962,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
return err
}

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

stats.docsPurgedCount.Add(1)
if collectionCtx.sgr2PullProcessedSeqCallback != nil {
if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
seqStr := rq.Properties[RevMessageSequence]
seq, err := ParseJSONSequenceID(seqStr)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqStr, err)
} else {
collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
}
}
return nil
Expand Down Expand Up @@ -1193,18 +1179,13 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
return err
}

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

if collectionCtx.sgr2PullProcessedSeqCallback != nil {
if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
seqProperty := rq.Properties[RevMessageSequence]
seq, err := ParseJSONSequenceID(seqProperty)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqProperty, err)
} else {
collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
}
}

Expand Down

0 comments on commit d0d5060

Please sign in to comment.