From 0da6692ddc15724c8fe98f2e104761ed9e1f5465 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com> Date: Tue, 17 Dec 2024 17:27:39 +0000 Subject: [PATCH] CBG-4420: fix legacy rev handling on processRev (#7248) --- db/blip_handler.go | 5 +- db/blip_sync_context.go | 4 +- db/crud.go | 46 ++- db/crud_test.go | 10 +- db/hybrid_logical_vector.go | 65 ++-- db/hybrid_logical_vector_test.go | 8 +- rest/blip_legacy_revid_test.go | 416 ++++++++++++++++++++++++++ rest/utilities_testing_blip_client.go | 4 +- 8 files changed, 520 insertions(+), 38 deletions(-) diff --git a/db/blip_handler.go b/db/blip_handler.go index 11634a3ac5..5684bcdf0d 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -1068,6 +1068,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err historyStr := rq.Properties[RevMessageHistory] var incomingHLV *HybridLogicalVector // Build history/HLV + var legacyRevList []string changeIsVector := strings.Contains(rev, "@") if !bh.useHLV() || !changeIsVector { newDoc.RevID = rev @@ -1080,7 +1081,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err if historyStr != "" { versionVectorStr += ";" + historyStr } - incomingHLV, err = ExtractHLVFromBlipMessage(versionVectorStr) + incomingHLV, legacyRevList, err = ExtractHLVFromBlipMessage(versionVectorStr) if err != nil { base.InfofCtx(bh.loggingCtx, base.KeySync, "Error parsing hlv while processing rev for doc %v. HLV:%v Error: %v", base.UD(docID), versionVectorStr, err) return base.HTTPErrorf(http.StatusUnprocessableEntity, "error extracting hlv from blip message") @@ -1298,7 +1299,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err // bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2 forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2) if bh.useHLV() && changeIsVector { - _, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc) + _, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, newDoc, incomingHLV, rawBucketDoc, legacyRevList) } else if bh.conflictResolver != nil { _, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV) } else { diff --git a/db/blip_sync_context.go b/db/blip_sync_context.go index fa365c3dc8..5767174d93 100644 --- a/db/blip_sync_context.go +++ b/db/blip_sync_context.go @@ -361,7 +361,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b if bsc.useDeltas && len(knownRevsArray) > 0 { if revID, ok := knownRevsArray[0].(string); ok { if versionVectorProtocol { - msgHLV, err := ExtractHLVFromBlipMessage(revID) + msgHLV, _, err := ExtractHLVFromBlipMessage(revID) if err != nil { base.DebugfCtx(ctx, base.KeySync, "Invalid known rev format for hlv on doc: %s falling back to full body replication.", base.UD(docID)) deltaSrcRevID = "" // will force falling back to full body replication below @@ -376,7 +376,7 @@ func (bsc *BlipSyncContext) handleChangesResponse(ctx context.Context, sender *b for _, rev := range knownRevsArray { if revID, ok := rev.(string); ok { - msgHLV, err := ExtractHLVFromBlipMessage(revID) + msgHLV, _, err := ExtractHLVFromBlipMessage(revID) if err != nil { // assume we have received legacy rev if we cannot parse hlv from known revs, and we are in vv replication if versionVectorProtocol { diff --git a/db/crud.go b/db/crud.go index 09aa12beab..98dcee3752 100644 --- a/db/crud.go +++ b/db/crud.go @@ -1176,7 +1176,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod return newRevID, doc, err } -func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV *HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) { +func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV *HybridLogicalVector, existingDoc *sgbucket.BucketDocument, revTreeHistory []string) (doc *Document, cv *Version, newRevID string, err error) { var matchRev string if existingDoc != nil { doc, unmarshalErr := db.unmarshalDocumentWithXattrs(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs, existingDoc.Cas, DocUnmarshalRev) @@ -1216,9 +1216,18 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont } // set up revTreeID for backward compatibility - previousRevTreeID := doc.CurrentRev - prevGeneration, _ := ParseRevID(ctx, previousRevTreeID) - newGeneration := prevGeneration + 1 + var previousRevTreeID string + var prevGeneration int + var newGeneration int + if len(revTreeHistory) == 0 { + previousRevTreeID = doc.CurrentRev + prevGeneration, _ = ParseRevID(ctx, previousRevTreeID) + newGeneration = prevGeneration + 1 + } else { + previousRevTreeID = revTreeHistory[0] + prevGeneration, _ = ParseRevID(ctx, previousRevTreeID) + newGeneration = prevGeneration + 1 + } // Conflict check here // if doc has no HLV defined this is a new doc we haven't seen before, skip conflict check @@ -1249,6 +1258,35 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont if newDocHLV.MergeVersions != nil { doc.HLV.MergeVersions = newDocHLV.MergeVersions } + // rev tree conflict check if we have rev tree history to check against + currentRevIndex := len(revTreeHistory) + parent := "" + if currentRevIndex > 0 { + for i, revid := range revTreeHistory { + if doc.History.contains(revid) { + currentRevIndex = i + parent = revid + break + } + } + // conflict check on rev tree history + if db.IsIllegalConflict(ctx, doc, parent, newDoc.Deleted, true, revTreeHistory) { + return nil, nil, false, nil, base.HTTPErrorf(http.StatusConflict, "Document revision conflict") + } + } + // Add all the new revisions to the rev tree: + for i := currentRevIndex - 1; i >= 0; i-- { + err := doc.History.addRevision(newDoc.ID, + RevInfo{ + ID: revTreeHistory[i], + Parent: parent, + Deleted: i == 0 && newDoc.Deleted}) + + if err != nil { + return nil, nil, false, nil, err + } + parent = revTreeHistory[i] + } // Process the attachments, replacing bodies with digests. newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, newGeneration, previousRevTreeID, nil) diff --git a/db/crud_test.go b/db/crud_test.go index 3938cf7f06..503c5aa572 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -1824,7 +1824,7 @@ func TestPutExistingCurrentVersion(t *testing.T) { PreviousVersions: pv, } - doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil) + doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil) assertHTTPError(t, err, 409) require.Nil(t, doc) require.Nil(t, cv) @@ -1834,7 +1834,7 @@ func TestPutExistingCurrentVersion(t *testing.T) { // TODO: because currentRev isn't being updated, storeOldBodyInRevTreeAndUpdateCurrent isn't // updating the document body. Need to review whether it makes sense to keep using // storeOldBodyInRevTreeAndUpdateCurrent, or if this needs a larger overhaul to support VV - doc, cv, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil) + doc, cv, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil) require.NoError(t, err) assert.Equal(t, "test", cv.SourceID) assert.Equal(t, incomingVersion, cv.Value) @@ -1856,7 +1856,7 @@ func TestPutExistingCurrentVersion(t *testing.T) { // Attempt to push the same client update, validate server rejects as an already known version and cancels the update. // This case doesn't return error, verify that SyncData hasn't been changed. - _, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil) + _, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil) require.NoError(t, err) syncData2, err := collection.GetDocSyncData(ctx, "doc1") require.NoError(t, err) @@ -1902,7 +1902,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) { } // assert that a conflict is correctly identified and the doc and cv are nil - doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil) + doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, nil, nil) assertHTTPError(t, err, 409) require.Nil(t, doc) require.Nil(t, cv) @@ -1942,7 +1942,7 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) { PreviousVersions: pv, } // call PutExistingCurrentVersion with empty existing doc - doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{}) + doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{}, nil) require.NoError(t, err) assert.NotNil(t, doc) // assert on returned CV value diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index b1b911accb..d6460e0056 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -420,75 +420,76 @@ func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec, // 2. cv and pv: cv;pv // 3. cv, pv, and mv: cv;mv;pv // +// Function will return list of revIDs if legacy rev ID was found in the HLV history section (PV) // TODO: CBG-3662 - Optimise once we've settled on and tested the format with CBL -func ExtractHLVFromBlipMessage(versionVectorStr string) (*HybridLogicalVector, error) { +func ExtractHLVFromBlipMessage(versionVectorStr string) (*HybridLogicalVector, []string, error) { hlv := &HybridLogicalVector{} vectorFields := strings.Split(versionVectorStr, ";") vectorLength := len(vectorFields) if (vectorLength == 1 && vectorFields[0] == "") || vectorLength > 3 { - return &HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received") + return &HybridLogicalVector{}, nil, fmt.Errorf("invalid hlv in changes message received") } // add current version (should always be present) cvStr := vectorFields[0] version := strings.Split(cvStr, "@") if len(version) < 2 { - return &HybridLogicalVector{}, fmt.Errorf("invalid version in changes message received") + return &HybridLogicalVector{}, nil, fmt.Errorf("invalid version in changes message received") } vrs, err := strconv.ParseUint(version[0], 16, 64) if err != nil { - return &HybridLogicalVector{}, err + return &HybridLogicalVector{}, nil, err } err = hlv.AddVersion(Version{SourceID: version[1], Value: vrs}) if err != nil { - return &HybridLogicalVector{}, err + return &HybridLogicalVector{}, nil, err } switch vectorLength { case 1: // cv only - return hlv, nil + return hlv, nil, nil case 2: // only cv and pv present - sourceVersionListPV, err := parseVectorValues(vectorFields[1]) + sourceVersionListPV, legacyRev, err := parseVectorValues(vectorFields[1]) if err != nil { - return &HybridLogicalVector{}, err + return &HybridLogicalVector{}, nil, err } hlv.PreviousVersions = make(HLVVersions) for _, v := range sourceVersionListPV { hlv.PreviousVersions[v.SourceID] = v.Value } - return hlv, nil + return hlv, legacyRev, nil case 3: // cv, mv and pv present - sourceVersionListPV, err := parseVectorValues(vectorFields[2]) + sourceVersionListPV, legacyRev, err := parseVectorValues(vectorFields[2]) hlv.PreviousVersions = make(HLVVersions) if err != nil { - return &HybridLogicalVector{}, err + return &HybridLogicalVector{}, nil, err } for _, pv := range sourceVersionListPV { hlv.PreviousVersions[pv.SourceID] = pv.Value } - sourceVersionListMV, err := parseVectorValues(vectorFields[1]) + sourceVersionListMV, _, err := parseVectorValues(vectorFields[1]) hlv.MergeVersions = make(HLVVersions) if err != nil { - return &HybridLogicalVector{}, err + return &HybridLogicalVector{}, nil, err } for _, mv := range sourceVersionListMV { hlv.MergeVersions[mv.SourceID] = mv.Value } - return hlv, nil + return hlv, legacyRev, nil default: - return &HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received") + return &HybridLogicalVector{}, nil, fmt.Errorf("invalid hlv in changes message received") } } // parseVectorValues takes an HLV section (cv, pv or mv) in string form and splits into -// source and version pairs -func parseVectorValues(vectorStr string) (versions []Version, err error) { +// source and version pairs. Also returns legacyRev list if legacy revID's are found in the input string. +func parseVectorValues(vectorStr string) (versions []Version, legacyRevList []string, err error) { versionsStr := strings.Split(vectorStr, ",") versions = make([]Version, 0, len(versionsStr)) @@ -500,12 +501,38 @@ func parseVectorValues(vectorStr string) (versions []Version, err error) { } version, err := ParseVersion(v) if err != nil { - return nil, err + // If v is a legacy rev ID, ignore when constructing the HLV. + if isLegacyRev(v) { + legacyRevList = append(legacyRevList, v) + continue + } + return nil, nil, err } versions = append(versions, version) } - return versions, nil + return versions, legacyRevList, nil +} + +// isLegacyRev returns true if the given string is a revID, false otherwise. Has the same functionality as ParseRevID +// but doesn't warn for malformed revIDs +func isLegacyRev(rev string) bool { + if rev == "" { + return false + } + + idx := strings.Index(rev, "-") + if idx == -1 { + return false + } + + gen, err := strconv.Atoi(rev[:idx]) + if err != nil { + return false + } else if gen < 1 { + return false + } + return true } // Helper functions for version source and value encoding diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index db9439f208..5ffe9ae613 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -534,13 +534,13 @@ func TestHLVMapToCBLString(t *testing.T) { func TestInvalidHLVInBlipMessageForm(t *testing.T) { hlvStr := "25@def; 22@def,21@eff; 20@abc,18@hij; 222@hiowdwdew, 5555@dhsajidfgd" - hlv, err := ExtractHLVFromBlipMessage(hlvStr) + hlv, _, err := ExtractHLVFromBlipMessage(hlvStr) require.Error(t, err) assert.ErrorContains(t, err, "invalid hlv in changes message received") assert.Equal(t, &HybridLogicalVector{}, hlv) hlvStr = "" - hlv, err = ExtractHLVFromBlipMessage(hlvStr) + hlv, _, err = ExtractHLVFromBlipMessage(hlvStr) require.Error(t, err) assert.ErrorContains(t, err, "invalid hlv in changes message received") assert.Equal(t, &HybridLogicalVector{}, hlv) @@ -615,7 +615,7 @@ func TestExtractHLVFromChangesMessage(t *testing.T) { // TODO: When CBG-3662 is done, should be able to simplify base64 handling to treat source as a string // that may represent a base64 encoding base64EncodedHlvString := EncodeTestHistory(test.hlvString) - hlv, err := ExtractHLVFromBlipMessage(base64EncodedHlvString) + hlv, _, err := ExtractHLVFromBlipMessage(base64EncodedHlvString) require.NoError(t, err) assert.Equal(t, expectedVector.SourceID, hlv.SourceID) @@ -634,7 +634,7 @@ func BenchmarkExtractHLVFromBlipMessage(b *testing.B) { for _, bm := range extractHLVFromBlipMsgBMarkCases { b.Run(bm.name, func(b *testing.B) { for i := 0; i < b.N; i++ { - _, _ = ExtractHLVFromBlipMessage(bm.hlvString) + _, _, _ = ExtractHLVFromBlipMessage(bm.hlvString) } }) } diff --git a/rest/blip_legacy_revid_test.go b/rest/blip_legacy_revid_test.go index bb23180cf4..6f78300e26 100644 --- a/rest/blip_legacy_revid_test.go +++ b/rest/blip_legacy_revid_test.go @@ -223,6 +223,196 @@ func TestProcessLegacyRev(t *testing.T) { assert.NotEqual(t, uint64(0), docVrs) } +// TestProcessRevWithLegacyHistory: +// - 1. CBL sends rev=1010@CBL1, history=1-abc when SGW has current rev 1-abc (document underwent an update before being pushed to SGW) +// - 2. CBL sends rev=1010@CBL1, history=1000@CBL2,1-abc when SGW has current rev 1-abc (document underwent multiple p2p updates before being pushed to SGW) +// - 3. CBL sends rev=1010@CBL1, history=1000@CBL2,2-abc,1-abc when SGW has current rev 1-abc (document underwent multiple legacy and p2p updates before being pushed to SGW) +// - 4. CBL sends rev=1010@CBL1, history=1-abc when SGW does not have the doc (document underwent multiple legacy and p2p updates before being pushed to SGW) +// - Assert that the bucket doc resulting on each operation is as expected +func TestProcessRevWithLegacyHistory(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + ds := rt.GetSingleDataStore() + collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser() + const ( + docID = "doc1" + docID2 = "doc2" + docID3 = "doc3" + docID4 = "doc4" + ) + + // 1. CBL sends rev=1010@CBL1, history=1-abc when SGW has current rev 1-abc (document underwent an update before being pushed to SGW) + docVersion := rt.PutDocDirectly(docID, db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + // remove hlv here to simulate a legacy rev + require.NoError(t, ds.RemoveXattrs(ctx, docID, []string{base.VvXattrName}, docVersion.CV.Value)) + rt.GetDatabase().FlushRevisionCacheForTest() + + // Have CBL send an update to that doc, with history in revTreeID format + history := []string{rev1ID} + sent, _, _, err := bt.SendRevWithHistory(docID, "1000@CBL1", history, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + require.NoError(t, err) + + // assert that the bucket doc is as expected + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalAll) + require.NoError(t, err) + assert.Equal(t, "1000@CBL1", bucketDoc.HLV.GetCurrentVersionString()) + assert.NotNil(t, bucketDoc.History[rev1ID]) + + // 2. CBL sends rev=1010@CBL1, history=1000@CBL2,1-abc when SGW has current rev 1-abc (document underwent multiple p2p updates before being pushed to SGW) + docVersion = rt.PutDocDirectly(docID2, db.Body{"test": "doc"}) + rev1ID = docVersion.RevTreeID + + // remove hlv here to simulate a legacy rev + require.NoError(t, ds.RemoveXattrs(ctx, docID2, []string{base.VvXattrName}, docVersion.CV.Value)) + rt.GetDatabase().FlushRevisionCacheForTest() + + // Have CBL send an update to that doc, with history in HLV + revTreeID format + history = []string{"1000@CBL2", rev1ID} + sent, _, _, err = bt.SendRevWithHistory(docID2, "1001@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) + assert.True(t, sent) + require.NoError(t, err) + + // assert that the bucket doc is as expected + bucketDoc, _, err = collection.GetDocWithXattrs(ctx, docID2, db.DocUnmarshalAll) + require.NoError(t, err) + assert.Equal(t, "1001@CBL1", bucketDoc.HLV.GetCurrentVersionString()) + assert.Equal(t, uint64(4096), bucketDoc.HLV.PreviousVersions["CBL2"]) + assert.NotNil(t, bucketDoc.History[rev1ID]) + + // 3. CBL sends rev=1010@CBL1, history=1000@CBL2,2-abc,1-abc when SGW has current rev 1-abc (document underwent multiple legacy and p2p updates before being pushed to SGW) + docVersion = rt.PutDocDirectly(docID3, db.Body{"test": "doc"}) + rev1ID = docVersion.RevTreeID + + // remove hlv here to simulate a legacy rev + require.NoError(t, ds.RemoveXattrs(ctx, docID3, []string{base.VvXattrName}, docVersion.CV.Value)) + rt.GetDatabase().FlushRevisionCacheForTest() + + history = []string{"1000@CBL2", "2-abc", rev1ID} + sent, _, _, err = bt.SendRevWithHistory(docID3, "1010@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) + assert.True(t, sent) + require.NoError(t, err) + + // assert that the bucket doc is as expected + bucketDoc, _, err = collection.GetDocWithXattrs(ctx, docID3, db.DocUnmarshalAll) + require.NoError(t, err) + assert.Equal(t, "1010@CBL1", bucketDoc.HLV.GetCurrentVersionString()) + assert.Equal(t, uint64(4096), bucketDoc.HLV.PreviousVersions["CBL2"]) + assert.NotNil(t, bucketDoc.History[rev1ID]) + assert.NotNil(t, bucketDoc.History["2-abc"]) + + // 4. CBL sends rev=1010@CBL1, history=1-abc when SGW does not have the doc (document underwent multiple legacy and p2p updates before being pushed to SGW) + history = []string{"1000@CBL2", "1-abc"} + sent, _, _, err = bt.SendRevWithHistory(docID4, "1010@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) + assert.True(t, sent) + require.NoError(t, err) + + // assert that the bucket doc is as expected + bucketDoc, _, err = collection.GetDocWithXattrs(ctx, docID4, db.DocUnmarshalAll) + require.NoError(t, err) + assert.Equal(t, "1010@CBL1", bucketDoc.HLV.GetCurrentVersionString()) + assert.Equal(t, uint64(4096), bucketDoc.HLV.PreviousVersions["CBL2"]) + assert.NotNil(t, bucketDoc.History["1-abc"]) +} + +// TestProcessRevWithLegacyHistoryConflict: +// - 1. conflicting changes with legacy rev on both sides of communication (no upgrade of doc at all) +// - 2. conflicting changes with legacy rev on client side and HLV on SGW side +// - 3. CBL sends rev=1010@CBL1, history=1000@CBL2,1-abc when SGW has current rev 2-abc (document underwent multiple p2p updates before being pushed to SGW) +// - 4. CBL sends rev=1010@CBL1, history=2-abc and SGW has 1000@CBL2, 2-abc +func TestProcessRevWithLegacyHistoryConflict(t *testing.T) { + base.SetUpTestLogging(t, base.LevelTrace, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyCRUD, base.KeyChanges, base.KeyImport) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + ds := rt.GetSingleDataStore() + const ( + docID = "doc1" + docID2 = "doc2" + docID3 = "doc3" + docID4 = "doc4" + ) + + // 1. conflicting changes with legacy rev on both sides of communication (no upgrade of doc at all) + docVersion := rt.PutDocDirectly(docID, db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly(docID, docVersion, db.Body{"some": "update"}) + rev2ID := docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly(docID, docVersion, db.Body{"some": "update2"}) + + // remove hlv here to simulate a legacy rev + require.NoError(t, ds.RemoveXattrs(base.TestCtx(t), docID, []string{base.VvXattrName}, docVersion.CV.Value)) + rt.GetDatabase().FlushRevisionCacheForTest() + + history := []string{rev2ID, rev1ID} + sent, _, _, err := bt.SendRevWithHistory(docID, "3-abc", history, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + require.ErrorContains(t, err, "Document revision conflict") + + // 2. same as above but not having the rev be legacy on SGW side (don't remove the hlv) + docVersion = rt.PutDocDirectly(docID2, db.Body{"test": "doc"}) + rev1ID = docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly(docID2, docVersion, db.Body{"some": "update"}) + rev2ID = docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly(docID2, docVersion, db.Body{"some": "update2"}) + + history = []string{rev2ID, rev1ID} + sent, _, _, err = bt.SendRevWithHistory(docID2, "3-abc", history, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + require.ErrorContains(t, err, "Document revision conflict") + + // 3. CBL sends rev=1010@CBL1, history=1000@CBL2,1-abc when SGW has current rev 2-abc (document underwent multiple p2p updates before being pushed to SGW) + docVersion = rt.PutDocDirectly(docID3, db.Body{"test": "doc"}) + rev1ID = docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly(docID3, docVersion, db.Body{"some": "update"}) + + // remove hlv here to simulate a legacy rev + require.NoError(t, ds.RemoveXattrs(base.TestCtx(t), docID3, []string{base.VvXattrName}, docVersion.CV.Value)) + rt.GetDatabase().FlushRevisionCacheForTest() + + history = []string{"1000@CBL2", rev1ID} + sent, _, _, err = bt.SendRevWithHistory(docID3, "1010@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) + assert.True(t, sent) + require.ErrorContains(t, err, "Document revision conflict") + + // 4. CBL sends rev=1010@CBL1, history=2-abc and SGW has 1000@CBL2, 2-abc + docVersion = rt.PutDocDirectly(docID4, db.Body{"test": "doc"}) + + docVersion = rt.UpdateDocDirectly(docID4, docVersion, db.Body{"some": "update"}) + version := docVersion.CV.Value + rev2ID = docVersion.RevTreeID + pushedRev := db.Version{ + Value: version + 1000, + SourceID: "CBL1", + } + + history = []string{rev2ID} + sent, _, _, err = bt.SendRevWithHistory(docID4, pushedRev.String(), history, []byte(`{"some": "update"}`), blip.Properties{}) + assert.True(t, sent) + require.ErrorContains(t, err, "Document revision conflict") +} + // TestChangesResponseLegacyRev: // - Create doc // - Update doc through SGW, creating a new revision @@ -432,3 +622,229 @@ func TestChangesResponseWithHLVInHistory(t *testing.T) { timeoutErr = WaitWithTimeout(&revsFinishedWg, time.Second*10) require.NoError(t, timeoutErr, "Timed out waiting") } + +// TestCBLHasPreUpgradeMutationThatHasNotBeenReplicated: +// - Test case 2 of non conflict plan from design doc +func TestCBLHasPreUpgradeMutationThatHasNotBeenReplicated(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + collection, ctx := rt.GetSingleTestDatabaseCollection() + ds := rt.GetSingleDataStore() + + docVersion := rt.PutDocDirectly("doc1", db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + // remove hlv here to simulate a legacy rev + require.NoError(t, ds.RemoveXattrs(ctx, "doc1", []string{base.VvXattrName}, docVersion.CV.Value)) + rt.GetDatabase().FlushRevisionCacheForTest() + + history := []string{rev1ID} + sent, _, _, err := bt.SendRevWithHistory("doc1", "2-abc", history, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + require.NoError(t, err) + + // assert that the bucket doc is as expected + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) + require.NoError(t, err) + // assert a cv was assigned + assert.NotEqual(t, "", bucketDoc.HLV.GetCurrentVersionString()) + assert.NotNil(t, bucketDoc.History[rev1ID]) + assert.Equal(t, "2-abc", bucketDoc.CurrentRev) +} + +// TestCBLHasOfPreUpgradeMutationThatSGWAlreadyKnows: +// - Test case 3 of non conflict plan from design doc +func TestCBLHasOfPreUpgradeMutationThatSGWAlreadyKnows(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + collection, ctx := rt.GetSingleTestDatabaseCollection() + ds := rt.GetSingleDataStore() + + docVersion := rt.PutDocDirectly("doc1", db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly("doc1", docVersion, db.Body{"test": "update"}) + rev2ID := docVersion.RevTreeID + + // remove hlv here to simulate a legacy rev + require.NoError(t, ds.RemoveXattrs(ctx, "doc1", []string{base.VvXattrName}, docVersion.CV.Value)) + rt.GetDatabase().FlushRevisionCacheForTest() + + history := []string{rev1ID} + sent, _, _, err := bt.SendRevWithHistory("doc1", rev2ID, history, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + require.NoError(t, err) + + // assert that the bucket doc is as expected + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) + require.NoError(t, err) + assert.Equal(t, rev2ID, bucketDoc.CurrentRev) + assert.NotNil(t, bucketDoc.History[rev1ID]) + assert.NotNil(t, bucketDoc.History[rev2ID]) +} + +// TestPushOfPostUpgradeMutationThatHasCommonAncestorToSGWVersion: +// - Test case 6 of non conflict plan from design doc +func TestPushOfPostUpgradeMutationThatHasCommonAncestorToSGWVersion(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + collection, ctx := rt.GetSingleTestDatabaseCollection() + ds := rt.GetSingleDataStore() + + docVersion := rt.PutDocDirectly("doc1", db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly("doc1", docVersion, db.Body{"test": "update"}) + rev2ID := docVersion.RevTreeID + + // remove hlv here to simulate a legacy rev + require.NoError(t, ds.RemoveXattrs(ctx, "doc1", []string{base.VvXattrName}, docVersion.CV.Value)) + rt.GetDatabase().FlushRevisionCacheForTest() + + // send 100@CBL1 + sent, _, _, err := bt.SendRevWithHistory("doc1", "100@CBL1", nil, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + require.NoError(t, err) + + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) + require.NoError(t, err) + assert.NotEqual(t, rev2ID, bucketDoc.CurrentRev) + assert.NotNil(t, bucketDoc.History[rev1ID]) + assert.NotNil(t, bucketDoc.History[rev2ID]) + assert.Equal(t, "100@CBL1", bucketDoc.HLV.GetCurrentVersionString()) +} + +// TestPushDocConflictBetweenPreUpgradeCBLMutationAndPreUpgradeSGWMutation: +// - Test case 1 of conflict test plan from design doc +func TestPushDocConflictBetweenPreUpgradeCBLMutationAndPreUpgradeSGWMutation(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + collection, ctx := rt.GetSingleTestDatabaseCollection() + ds := rt.GetSingleDataStore() + + docVersion := rt.PutDocDirectly("doc1", db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly("doc1", docVersion, db.Body{"test": "update"}) + rev2ID := docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly("doc1", docVersion, db.Body{"test": "update1"}) + rev3ID := docVersion.RevTreeID + + // remove hlv here to simulate a legacy rev + require.NoError(t, ds.RemoveXattrs(ctx, "doc1", []string{base.VvXattrName}, docVersion.CV.Value)) + rt.GetDatabase().FlushRevisionCacheForTest() + + // send rev 3-def + history := []string{rev2ID, rev1ID} + sent, _, _, err := bt.SendRevWithHistory("doc1", "3-def", history, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + require.ErrorContains(t, err, "Document revision conflict") + + // assert that the bucket doc is as expected + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) + require.NoError(t, err) + assert.Equal(t, rev3ID, bucketDoc.CurrentRev) + assert.NotNil(t, bucketDoc.History[rev1ID]) + assert.NotNil(t, bucketDoc.History[rev2ID]) +} + +// TestPushDocConflictBetweenPreUpgradeCBLMutationAndPostUpgradeSGWMutation: +// - Test case 3 of conflict test plan from design doc +func TestPushDocConflictBetweenPreUpgradeCBLMutationAndPostUpgradeSGWMutation(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + collection, ctx := rt.GetSingleTestDatabaseCollection() + + docVersion := rt.PutDocDirectly("doc1", db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly("doc1", docVersion, db.Body{"test": "update"}) + rev2ID := docVersion.RevTreeID + + docVersion = rt.UpdateDocDirectly("doc1", docVersion, db.Body{"test": "update1"}) + rev3ID := docVersion.RevTreeID + + // send rev 3-def + history := []string{rev2ID, rev1ID} + sent, _, _, err := bt.SendRevWithHistory("doc1", "3-def", history, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + require.ErrorContains(t, err, "Document revision conflict") + + // assert that the bucket doc is as expected + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) + require.NoError(t, err) + assert.Equal(t, rev3ID, bucketDoc.CurrentRev) + assert.NotNil(t, bucketDoc.History[rev1ID]) + assert.NotNil(t, bucketDoc.History[rev2ID]) +} + +// TestConflictBetweenPostUpgradeCBLMutationAndPostUpgradeSGWMutation: +// - Test case 6 of conflict test plan from design doc +func TestConflictBetweenPostUpgradeCBLMutationAndPostUpgradeSGWMutation(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges) + + bt, err := NewBlipTesterFromSpec(t, BlipTesterSpec{ + noConflictsMode: true, + GuestEnabled: true, + blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + }) + assert.NoError(t, err, "Error creating BlipTester") + defer bt.Close() + rt := bt.restTester + collection, ctx := rt.GetSingleTestDatabaseCollection() + + docVersion := rt.PutDocDirectly("doc1", db.Body{"test": "doc"}) + rev1ID := docVersion.RevTreeID + + history := []string{rev1ID} + sent, _, _, err := bt.SendRevWithHistory("doc1", "100@CBL1", history, []byte(`{"key": "val"}`), blip.Properties{}) + assert.True(t, sent) + require.ErrorContains(t, err, "Document revision conflict") + + // assert that the bucket doc is as expected + bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) + require.NoError(t, err) + assert.Equal(t, rev1ID, bucketDoc.CurrentRev) + assert.Equal(t, docVersion.CV.String(), bucketDoc.HLV.GetCurrentVersionString()) +} diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index 12732dbeb2..7b86f2ab52 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -513,7 +513,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { var hlv db.HybridLogicalVector if btc.UseHLV() { if revHistory != "" { - existingVersion, err := db.ExtractHLVFromBlipMessage(revHistory) + existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory) if err != nil { require.FailNowf(btr.TB(), "error extracting HLV", "error extracting HLV %q: %v", revHistory, err) } @@ -747,7 +747,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { var hlv db.HybridLogicalVector if btc.UseHLV() { if revHistory != "" { - existingVersion, err := db.ExtractHLVFromBlipMessage(revHistory) + existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory) if err != nil { require.FailNowf(btr.TB(), "error extracting HLV", "error extracting HLV %q: %v", revHistory, err) }