Skip to content

Commit

Permalink
CBG-4420: fix legacy rev handling on processRev (#7248)
Browse files Browse the repository at this point in the history
  • Loading branch information
gregns1 authored Dec 17, 2024
1 parent 3d0adec commit 0da6692
Show file tree
Hide file tree
Showing 8 changed files with 520 additions and 38 deletions.
5 changes: 3 additions & 2 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
historyStr := rq.Properties[RevMessageHistory]
var incomingHLV *HybridLogicalVector
// Build history/HLV
var legacyRevList []string
changeIsVector := strings.Contains(rev, "@")
if !bh.useHLV() || !changeIsVector {
newDoc.RevID = rev
Expand All @@ -1080,7 +1081,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
if historyStr != "" {
versionVectorStr += ";" + historyStr
}
incomingHLV, err = ExtractHLVFromBlipMessage(versionVectorStr)
incomingHLV, legacyRevList, err = ExtractHLVFromBlipMessage(versionVectorStr)
if err != nil {
base.InfofCtx(bh.loggingCtx, base.KeySync, "Error parsing hlv while processing rev for doc %v. HLV:%v Error: %v", base.UD(docID), versionVectorStr, err)
return base.HTTPErrorf(http.StatusUnprocessableEntity, "error extracting hlv from blip message")
Expand Down Expand Up @@ -1298,7 +1299,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2
forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2)
if bh.useHLV() && changeIsVector {
_, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc)
_, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc, legacyRevList)
} else if bh.conflictResolver != nil {
_, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
} else {
Expand Down
4 changes: 2 additions & 2 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b
if bsc.useDeltas && len(knownRevsArray) > 0 {
if revID, ok := knownRevsArray[0].(string); ok {
if versionVectorProtocol {
msgHLV, err := ExtractHLVFromBlipMessage(revID)
msgHLV, _, err := ExtractHLVFromBlipMessage(revID)
if err != nil {
base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", base.UD(docID))
deltaSrcRevID = "" // will force falling back to full body replication below
Expand All @@ -376,7 +376,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b

for _, rev := range knownRevsArray {
if revID, ok := rev.(string); ok {
msgHLV, err := ExtractHLVFromBlipMessage(revID)
msgHLV, _, err := ExtractHLVFromBlipMessage(revID)
if err != nil {
// assume we have received legacy rev if we cannot parse hlv from known revs, and we are in vv replication
if versionVectorProtocol {
Expand Down
46 changes: 42 additions & 4 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
return newRevID, doc, err
}

func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV *HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) {
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV *HybridLogicalVector, existingDoc *sgbucket.BucketDocument, revTreeHistory []string) (doc *Document, cv *Version, newRevID string, err error) {
var matchRev string
if existingDoc != nil {
doc, unmarshalErr := db.unmarshalDocumentWithXattrs(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs, existingDoc.Cas, DocUnmarshalRev)
Expand Down Expand Up @@ -1216,9 +1216,18 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
}

// set up revTreeID for backward compatibility
previousRevTreeID := doc.CurrentRev
prevGeneration, _ := ParseRevID(ctx, previousRevTreeID)
newGeneration := prevGeneration + 1
var previousRevTreeID string
var prevGeneration int
var newGeneration int
if len(revTreeHistory) == 0 {
previousRevTreeID = doc.CurrentRev
prevGeneration, _ = ParseRevID(ctx, previousRevTreeID)
newGeneration = prevGeneration + 1
} else {
previousRevTreeID = revTreeHistory[0]
prevGeneration, _ = ParseRevID(ctx, previousRevTreeID)
newGeneration = prevGeneration + 1
}

// Conflict check here
// if doc has no HLV defined this is a new doc we haven't seen before, skip conflict check
Expand Down Expand Up @@ -1249,6 +1258,35 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
if newDocHLV.MergeVersions != nil {
doc.HLV.MergeVersions = newDocHLV.MergeVersions
}
// rev tree conflict check if we have rev tree history to check against
currentRevIndex := len(revTreeHistory)
parent := ""
if currentRevIndex > 0 {
for i, revid := range revTreeHistory {
if doc.History.contains(revid) {
currentRevIndex = i
parent = revid
break
}
}
// conflict check on rev tree history
if db.IsIllegalConflict(ctx, doc, parent, newDoc.Deleted, true, revTreeHistory) {
return nil, nil, false, nil, base.HTTPErrorf(http.StatusConflict, "Document revision conflict")
}
}
// Add all the new revisions to the rev tree:
for i := currentRevIndex - 1; i >= 0; i-- {
err := doc.History.addRevision(newDoc.ID,
RevInfo{
ID: revTreeHistory[i],
Parent: parent,
Deleted: i == 0 && newDoc.Deleted})

if err != nil {
return nil, nil, false, nil, err
}
parent = revTreeHistory[i]
}

// Process the attachments, replacing bodies with digests.
newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, newGeneration, previousRevTreeID, nil)
Expand Down
10 changes: 5 additions & 5 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1824,7 +1824,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
PreviousVersions: pv,
}

doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil)
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil)
assertHTTPError(t, err, 409)
require.Nil(t, doc)
require.Nil(t, cv)
Expand All @@ -1834,7 +1834,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
// TODO: because currentRev isn't being updated, storeOldBodyInRevTreeAndUpdateCurrent isn't
// updating the document body. Need to review whether it makes sense to keep using
// storeOldBodyInRevTreeAndUpdateCurrent, or if this needs a larger overhaul to support VV
doc, cv, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil)
doc, cv, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil)
require.NoError(t, err)
assert.Equal(t, "test", cv.SourceID)
assert.Equal(t, incomingVersion, cv.Value)
Expand All @@ -1856,7 +1856,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {

// Attempt to push the same client update, validate server rejects as an already known version and cancels the update.
// This case doesn't return error, verify that SyncData hasn't been changed.
_, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil)
_, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil)
require.NoError(t, err)
syncData2, err := collection.GetDocSyncData(ctx, "doc1")
require.NoError(t, err)
Expand Down Expand Up @@ -1902,7 +1902,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
}

// assert that a conflict is correctly identified and the doc and cv are nil
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil)
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil)
assertHTTPError(t, err, 409)
require.Nil(t, doc)
require.Nil(t, cv)
Expand Down Expand Up @@ -1942,7 +1942,7 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
PreviousVersions: pv,
}
// call PutExistingCurrentVersion with empty existing doc
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{})
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{}, nil)
require.NoError(t, err)
assert.NotNil(t, doc)
// assert on returned CV value
Expand Down
65 changes: 46 additions & 19 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,75 +420,76 @@ func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec,
// 2. cv and pv: cv;pv
// 3. cv, pv, and mv: cv;mv;pv
//
// Function will return list of revIDs if legacy rev ID was found in the HLV history section (PV)
// TODO: CBG-3662 - Optimise once we've settled on and tested the format with CBL
func ExtractHLVFromBlipMessage(versionVectorStr string) (*HybridLogicalVector, error) {
func ExtractHLVFromBlipMessage(versionVectorStr string) (*HybridLogicalVector, []string, error) {
hlv := &HybridLogicalVector{}

vectorFields := strings.Split(versionVectorStr, ";")
vectorLength := len(vectorFields)
if (vectorLength == 1 && vectorFields[0] == "") || vectorLength > 3 {
return &HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received")
return &HybridLogicalVector{}, nil, fmt.Errorf("invalid hlv in changes message received")
}

// add current version (should always be present)
cvStr := vectorFields[0]
version := strings.Split(cvStr, "@")
if len(version) < 2 {
return &HybridLogicalVector{}, fmt.Errorf("invalid version in changes message received")
return &HybridLogicalVector{}, nil, fmt.Errorf("invalid version in changes message received")
}

vrs, err := strconv.ParseUint(version[0], 16, 64)
if err != nil {
return &HybridLogicalVector{}, err
return &HybridLogicalVector{}, nil, err
}
err = hlv.AddVersion(Version{SourceID: version[1], Value: vrs})
if err != nil {
return &HybridLogicalVector{}, err
return &HybridLogicalVector{}, nil, err
}

switch vectorLength {
case 1:
// cv only
return hlv, nil
return hlv, nil, nil
case 2:
// only cv and pv present
sourceVersionListPV, err := parseVectorValues(vectorFields[1])
sourceVersionListPV, legacyRev, err := parseVectorValues(vectorFields[1])
if err != nil {
return &HybridLogicalVector{}, err
return &HybridLogicalVector{}, nil, err
}
hlv.PreviousVersions = make(HLVVersions)
for _, v := range sourceVersionListPV {
hlv.PreviousVersions[v.SourceID] = v.Value
}
return hlv, nil
return hlv, legacyRev, nil
case 3:
// cv, mv and pv present
sourceVersionListPV, err := parseVectorValues(vectorFields[2])
sourceVersionListPV, legacyRev, err := parseVectorValues(vectorFields[2])
hlv.PreviousVersions = make(HLVVersions)
if err != nil {
return &HybridLogicalVector{}, err
return &HybridLogicalVector{}, nil, err
}
for _, pv := range sourceVersionListPV {
hlv.PreviousVersions[pv.SourceID] = pv.Value
}

sourceVersionListMV, err := parseVectorValues(vectorFields[1])
sourceVersionListMV, _, err := parseVectorValues(vectorFields[1])
hlv.MergeVersions = make(HLVVersions)
if err != nil {
return &HybridLogicalVector{}, err
return &HybridLogicalVector{}, nil, err
}
for _, mv := range sourceVersionListMV {
hlv.MergeVersions[mv.SourceID] = mv.Value
}
return hlv, nil
return hlv, legacyRev, nil
default:
return &HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received")
return &HybridLogicalVector{}, nil, fmt.Errorf("invalid hlv in changes message received")
}
}

// parseVectorValues takes an HLV section (cv, pv or mv) in string form and splits into
// source and version pairs
func parseVectorValues(vectorStr string) (versions []Version, err error) {
// source and version pairs. Also returns legacyRev list if legacy revID's are found in the input string.
func parseVectorValues(vectorStr string) (versions []Version, legacyRevList []string, err error) {
versionsStr := strings.Split(vectorStr, ",")
versions = make([]Version, 0, len(versionsStr))

Expand All @@ -500,12 +501,38 @@ func parseVectorValues(vectorStr string) (versions []Version, err error) {
}
version, err := ParseVersion(v)
if err != nil {
return nil, err
// If v is a legacy rev ID, ignore when constructing the HLV.
if isLegacyRev(v) {
legacyRevList = append(legacyRevList, v)
continue
}
return nil, nil, err
}
versions = append(versions, version)
}

return versions, nil
return versions, legacyRevList, nil
}

// isLegacyRev returns true if the given string is a revID, false otherwise. Has the same functionality as ParseRevID
// but doesn't warn for malformed revIDs
func isLegacyRev(rev string) bool {
if rev == "" {
return false
}

idx := strings.Index(rev, "-")
if idx == -1 {
return false
}

gen, err := strconv.Atoi(rev[:idx])
if err != nil {
return false
} else if gen < 1 {
return false
}
return true
}

// Helper functions for version source and value encoding
Expand Down
8 changes: 4 additions & 4 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,13 @@ func TestHLVMapToCBLString(t *testing.T) {
func TestInvalidHLVInBlipMessageForm(t *testing.T) {
hlvStr := "25@def; 22@def,21@eff; 20@abc,18@hij; 222@hiowdwdew, 5555@dhsajidfgd"

hlv, err := ExtractHLVFromBlipMessage(hlvStr)
hlv, _, err := ExtractHLVFromBlipMessage(hlvStr)
require.Error(t, err)
assert.ErrorContains(t, err, "invalid hlv in changes message received")
assert.Equal(t, &HybridLogicalVector{}, hlv)

hlvStr = ""
hlv, err = ExtractHLVFromBlipMessage(hlvStr)
hlv, _, err = ExtractHLVFromBlipMessage(hlvStr)
require.Error(t, err)
assert.ErrorContains(t, err, "invalid hlv in changes message received")
assert.Equal(t, &HybridLogicalVector{}, hlv)
Expand Down Expand Up @@ -615,7 +615,7 @@ func TestExtractHLVFromChangesMessage(t *testing.T) {
// TODO: When CBG-3662 is done, should be able to simplify base64 handling to treat source as a string
// that may represent a base64 encoding
base64EncodedHlvString := EncodeTestHistory(test.hlvString)
hlv, err := ExtractHLVFromBlipMessage(base64EncodedHlvString)
hlv, _, err := ExtractHLVFromBlipMessage(base64EncodedHlvString)
require.NoError(t, err)

assert.Equal(t, expectedVector.SourceID, hlv.SourceID)
Expand All @@ -634,7 +634,7 @@ func BenchmarkExtractHLVFromBlipMessage(b *testing.B) {
for _, bm := range extractHLVFromBlipMsgBMarkCases {
b.Run(bm.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = ExtractHLVFromBlipMessage(bm.hlvString)
_, _, _ = ExtractHLVFromBlipMessage(bm.hlvString)
}
})
}
Expand Down
Loading

0 comments on commit 0da6692

Please sign in to comment.