From 071c6772906f81baee00b40924cd52dc46490702 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com> Date: Thu, 16 Nov 2023 22:11:17 +0000 Subject: [PATCH] CBG-3211: Add PutExistingRev for HLV (#6515) * CBG-3210: Updating HLV on Put And PutExistingRev (#6366) * CBG-3209: Add cv index and retrieval for revision cache (#6491) * CBG-3209: changes for retreival of a doc from the rev cache via CV with backwards compatability in mind * fix failing test, add commnets * fix lint * updated to address comments * rebase chnages needed * updated to tests that call Get on revision cache * updates based of new direction with PR + addressing comments * updated to fix panic * updated to fix another panic * address comments * updates based off commnets * remove commnented out line * updates to skip test relying on import and update PutExistingRev doc update type to update HLV * updates to remove code adding rev id to value inside addToRevMapPostLoad. Added code to assign this inside value.store * remove redundent code * Add support for PutExistingCurrentVersion * updated to remove function not used anymore * remove duplicated code from dev time * fix linter errors + add assertions on body of doc update * address commnets * updates to add further test cases for AddNewerVersions function + fix some incorrect logic * updates to chnage helper function for creation of doc for tests. Also adress further comments * lint error * address comments, add new merge function for merge versions when hlv is in conflict. * updates to remove test case and test * remove unused function * rebase * missed current version name change * more missing updates to name changes --- db/crud.go | 97 ++++++++++++++++ db/crud_test.go | 192 +++++++++++++++++++++++++++++++ db/hybrid_logical_vector.go | 66 ++++++++++- db/hybrid_logical_vector_test.go | 36 ++++++ db/util_testing.go | 11 ++ 5 files changed, 397 insertions(+), 5 deletions(-) diff --git a/db/crud.go b/db/crud.go index dc4430753b..6542f68269 100644 --- a/db/crud.go +++ b/db/crud.go @@ -1059,6 +1059,103 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod return newRevID, doc, err } +func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *SourceAndVersion, newRevID string, err error) { + var matchRev string + if existingDoc != nil { + doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattr, existingDoc.UserXattr, existingDoc.Cas, DocUnmarshalRev) + if unmarshalErr != nil { + return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc") + } + matchRev = doc.CurrentRev + } + generation, _ := ParseRevID(ctx, matchRev) + if generation < 0 { + return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Invalid revision ID") + } + generation++ + + docUpdateEvent := ExistingVersion + allowImport := db.UseXattrs() + doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) { + // (Be careful: this block can be invoked multiple times if there are races!) + + var isSgWrite bool + var crc32Match bool + + // Is this doc an sgWrite? + if doc != nil { + isSgWrite, crc32Match, _ = doc.IsSGWrite(ctx, nil) + if crc32Match { + db.dbStats().Database().Crc32MatchCount.Add(1) + } + } + + // If the existing doc isn't an SG write, import prior to updating + if doc != nil && !isSgWrite && db.UseXattrs() { + err := db.OnDemandImportForWrite(ctx, newDoc.ID, doc, newDoc.Deleted) + if err != nil { + return nil, nil, false, nil, err + } + } + + // Conflict check here + // if doc has no HLV defined this is a new doc we haven't seen before, skip conflict check + if doc.HLV == nil { + doc.HLV = &HybridLogicalVector{} + addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV) + if addNewerVersionsErr != nil { + return nil, nil, false, nil, addNewerVersionsErr + } + } else { + if !docHLV.IsInConflict(*doc.HLV) { + // update hlv for all newer incoming source version pairs + addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV) + if addNewerVersionsErr != nil { + return nil, nil, false, nil, addNewerVersionsErr + } + } else { + base.InfofCtx(ctx, base.KeyCRUD, "conflict detected between the two HLV's for doc %s", base.UD(doc.ID)) + // cancel rest of update, HLV needs to be sent back to client with merge versions populated + return nil, nil, false, nil, base.HTTPErrorf(http.StatusConflict, "Document revision conflict") + } + } + + // Process the attachments, replacing bodies with digests. + newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, generation, matchRev, nil) + if err != nil { + return nil, nil, false, nil, err + } + + // generate rev id for new arriving doc + strippedBody, _ := stripInternalProperties(newDoc._body) + encoding, err := base.JSONMarshalCanonical(strippedBody) + if err != nil { + return nil, nil, false, nil, err + } + newRev := CreateRevIDWithBytes(generation, matchRev, encoding) + + if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: matchRev, Deleted: newDoc.Deleted}); err != nil { + base.InfofCtx(ctx, base.KeyCRUD, "Failed to add revision ID: %s, for doc: %s, error: %v", newRev, base.UD(newDoc.ID), err) + return nil, nil, false, nil, base.ErrRevTreeAddRevFailure + } + + newDoc.RevID = newRev + + return newDoc, newAttachments, false, nil, nil + }) + + if doc != nil && doc.HLV != nil { + if cv == nil { + cv = &SourceAndVersion{} + } + source, version := doc.HLV.GetCurrentVersion() + cv.SourceID = source + cv.Version = version + } + + return doc, cv, newRevID, err +} + // Adds an existing revision to a document along with its history (list of rev IDs.) func (db *DatabaseCollectionWithUser) PutExistingRev(ctx context.Context, newDoc *Document, docHistory []string, noConflicts bool, forceAllConflicts bool, existingDoc *sgbucket.BucketDocument, docUpdateEvent DocUpdateType) (doc *Document, newRevID string, err error) { return db.PutExistingRevWithConflictResolution(ctx, newDoc, docHistory, noConflicts, nil, forceAllConflicts, existingDoc, docUpdateEvent) diff --git a/db/crud_test.go b/db/crud_test.go index 38ca667b3d..f38f117899 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -14,6 +14,7 @@ import ( "context" "encoding/json" "log" + "reflect" "testing" "time" @@ -1675,3 +1676,194 @@ func TestAssignSequenceReleaseLoop(t *testing.T) { releasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value() - startReleasedSequenceCount assert.Equal(t, int64(expectedReleasedSequenceCount), releasedSequenceCount) } + +// TestPutExistingCurrentVersion: +// - Put a document in a db +// - Assert on the update to HLV after that PUT +// - Construct a HLV to represent the doc created locally being updated on a client +// - Call PutExistingCurrentVersion simulating doc update arriving over replicator +// - Assert that the doc's HLV in the bucket has been updated correctly with the CV, PV and cvCAS +func TestPutExistingCurrentVersion(t *testing.T) { + db, ctx := setupTestDB(t) + defer db.Close(ctx) + + bucketUUID := db.BucketUUID + collection := GetSingleDatabaseCollectionWithUser(t, db) + + // create a new doc + key := "doc1" + body := Body{"key1": "value1"} + + rev, _, err := collection.Put(ctx, key, body) + require.NoError(t, err) + + // assert on HLV on that above PUT + syncData, err := collection.GetDocSyncData(ctx, "doc1") + assert.NoError(t, err) + uintCAS := base.HexCasToUint64(syncData.Cas) + assert.Equal(t, bucketUUID, syncData.HLV.SourceID) + assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + + // store the cas version allocated to the above doc creation for creation of incoming HLV later in test + originalDocVersion := syncData.HLV.Version + + // PUT an update to the above doc + body = Body{"key1": "value11"} + body[BodyRev] = rev + _, _, err = collection.Put(ctx, key, body) + require.NoError(t, err) + + // grab the new version for the above update to assert against later in test + syncData, err = collection.GetDocSyncData(ctx, "doc1") + assert.NoError(t, err) + docUpdateVersion := syncData.HLV.Version + + // construct a mock doc update coming over a replicator + body = Body{"key1": "value2"} + newDoc := createTestDocument(key, "", body, false, 0) + + // construct a HLV that simulates a doc update happening on a client + // this means moving the current source version pair to PV and adding new sourceID and version pair to CV + pv := make(map[string]uint64) + pv[bucketUUID] = originalDocVersion + // create a version larger than the allocated version above + incomingVersion := docUpdateVersion + 10 + incomingHLV := HybridLogicalVector{ + SourceID: "test", + Version: incomingVersion, + PreviousVersions: pv, + } + + // grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function for the above simulation of + // doc update arriving over replicator + _, rawDoc, err := collection.GetDocumentWithRaw(ctx, key, DocUnmarshalSync) + require.NoError(t, err) + + doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc) + require.NoError(t, err) + // assert on returned CV + assert.Equal(t, "test", cv.SourceID) + assert.Equal(t, incomingVersion, cv.Version) + assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody) + + // assert on the sync data from the above update to the doc + // CV should be equal to CV of update on client but the cvCAS should be updated with the new update and + // PV should contain the old CV pair + syncData, err = collection.GetDocSyncData(ctx, "doc1") + assert.NoError(t, err) + uintCAS = base.HexCasToUint64(syncData.Cas) + + assert.Equal(t, "test", syncData.HLV.SourceID) + assert.Equal(t, incomingVersion, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + // update the pv map so we can assert we have correct pv map in HLV + pv[bucketUUID] = docUpdateVersion + assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv)) + assert.Equal(t, "3-60b024c44c283b369116c2c2570e8088", syncData.CurrentRev) +} + +// TestPutExistingCurrentVersionWithConflict: +// - Put a document in a db +// - Assert on the update to HLV after that PUT +// - Construct a HLV to represent the doc created locally being updated on a client +// - Call PutExistingCurrentVersion simulating doc update arriving over replicator +// - Assert conflict between the local HLV for the doc and the incoming mutation is correctly identified +// - Assert that the doc's HLV in the bucket hasn't been updated +func TestPutExistingCurrentVersionWithConflict(t *testing.T) { + db, ctx := setupTestDB(t) + defer db.Close(ctx) + + bucketUUID := db.BucketUUID + collection := GetSingleDatabaseCollectionWithUser(t, db) + + // create a new doc + key := "doc1" + body := Body{"key1": "value1"} + + _, _, err := collection.Put(ctx, key, body) + require.NoError(t, err) + + // assert on the HLV values after the above creation of the doc + syncData, err := collection.GetDocSyncData(ctx, "doc1") + assert.NoError(t, err) + uintCAS := base.HexCasToUint64(syncData.Cas) + assert.Equal(t, bucketUUID, syncData.HLV.SourceID) + assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + + // create a new doc update to simulate a doc update arriving over replicator from, client + body = Body{"key1": "value2"} + newDoc := createTestDocument(key, "", body, false, 0) + incomingHLV := HybridLogicalVector{ + SourceID: "test", + Version: 1234, + } + + // grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function + _, rawDoc, err := collection.GetDocumentWithRaw(ctx, key, DocUnmarshalSync) + require.NoError(t, err) + + // assert that a conflict is correctly identified and the resulting doc and cv are nil + doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc) + require.Error(t, err) + assert.ErrorContains(t, err, "Document revision conflict") + assert.Nil(t, cv) + assert.Nil(t, doc) + + // assert persisted doc hlv hasn't been updated + syncData, err = collection.GetDocSyncData(ctx, "doc1") + assert.NoError(t, err) + assert.Equal(t, bucketUUID, syncData.HLV.SourceID) + assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) +} + +// TestPutExistingCurrentVersionWithNoExistingDoc: +// - Purpose of this test is to test PutExistingRevWithBody code pathway where an +// existing doc is not provided from the bucket into the function simulating a new, not seen +// before doc entering this code path +func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) { + db, ctx := setupTestDB(t) + defer db.Close(ctx) + + bucketUUID := db.BucketUUID + collection := GetSingleDatabaseCollectionWithUser(t, db) + + // construct a mock doc update coming over a replicator + body := Body{"key1": "value2"} + newDoc := createTestDocument("doc2", "", body, false, 0) + + // construct a HLV that simulates a doc update happening on a client + // this means moving the current source version pair to PV and adding new sourceID and version pair to CV + pv := make(map[string]uint64) + pv[bucketUUID] = 2 + // create a version larger than the allocated version above + incomingVersion := uint64(2 + 10) + incomingHLV := HybridLogicalVector{ + SourceID: "test", + Version: incomingVersion, + PreviousVersions: pv, + } + // call PutExistingCurrentVersion with empty existing doc + doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{}) + require.NoError(t, err) + assert.NotNil(t, doc) + // assert on returned CV value + assert.Equal(t, "test", cv.SourceID) + assert.Equal(t, incomingVersion, cv.Version) + assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody) + + // assert on the sync data from the above update to the doc + // CV should be equal to CV of update on client but the cvCAS should be updated with the new update and + // PV should contain the old CV pair + syncData, err := collection.GetDocSyncData(ctx, "doc2") + assert.NoError(t, err) + uintCAS := base.HexCasToUint64(syncData.Cas) + assert.Equal(t, "test", syncData.HLV.SourceID) + assert.Equal(t, incomingVersion, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + // update the pv map so we can assert we have correct pv map in HLV + assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv)) + assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev) +} diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 39c168e87a..d548e5655f 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -94,7 +94,19 @@ func (hlv *HybridLogicalVector) AddVersion(newVersion SourceAndVersion) error { if hlv.PreviousVersions == nil { hlv.PreviousVersions = make(map[string]uint64) } - hlv.PreviousVersions[hlv.SourceID] = hlv.Version + // we need to check if source ID already exists in PV, if so we need to ensure we are only updating with the + // sourceID-version pair if incoming version is greater than version already there + if currPVVersion, ok := hlv.PreviousVersions[hlv.SourceID]; ok { + // if we get here source ID exists in PV, only replace version if it is less than the incoming version + if currPVVersion < hlv.Version { + hlv.PreviousVersions[hlv.SourceID] = hlv.Version + } else { + return fmt.Errorf("local hlv has current source in previous versiosn with version greater than current version. Current CAS: %d, PV CAS %d", hlv.Version, currPVVersion) + } + } else { + // source doesn't exist in PV so add + hlv.PreviousVersions[hlv.SourceID] = hlv.Version + } hlv.Version = newVersion.Version hlv.SourceID = newVersion.SourceID return nil @@ -119,7 +131,7 @@ func (hlv *HybridLogicalVector) isDominating(otherVector HybridLogicalVector) bo // Grab the latest CAS version for HLV(A)'s sourceID in HLV(B), if HLV(A) version CAS is > HLV(B)'s then it is dominating // If 0 CAS is returned then the sourceID does not exist on HLV(B) - if latestCAS := otherVector.GetVersion(hlv.SourceID); latestCAS != 0 && hlv.Version > latestCAS { + if latestCAS, found := otherVector.GetVersion(hlv.SourceID); found && hlv.Version > latestCAS { return true } // HLV A is not dominating over HLV B @@ -174,8 +186,12 @@ func (hlv *HybridLogicalVector) equalPreviousVectors(otherVector HybridLogicalVe return true } -// GetVersion returns the latest CAS value in the HLV for a given sourceID, if the sourceID is not present in the HLV it will return 0 CAS value -func (hlv *HybridLogicalVector) GetVersion(sourceID string) uint64 { +// GetVersion returns the latest CAS value in the HLV for a given sourceID along with boolean value to +// indicate if sourceID is found in the HLV, if the sourceID is not present in the HLV it will return 0 CAS value and false +func (hlv *HybridLogicalVector) GetVersion(sourceID string) (uint64, bool) { + if sourceID == "" { + return 0, false + } var latestVersion uint64 if sourceID == hlv.SourceID { latestVersion = hlv.Version @@ -186,7 +202,39 @@ func (hlv *HybridLogicalVector) GetVersion(sourceID string) uint64 { if mvEntry := hlv.MergeVersions[sourceID]; mvEntry > latestVersion { latestVersion = mvEntry } - return latestVersion + // if we have 0 cas value, there is no entry for this source ID in the HLV + if latestVersion == 0 { + return latestVersion, false + } + return latestVersion, true +} + +// AddNewerVersions will take a hlv and add any newer source/version pairs found across CV and PV found in the other HLV taken as parameter +// when both HLV +func (hlv *HybridLogicalVector) AddNewerVersions(otherVector HybridLogicalVector) error { + + // create current version for incoming vector and attempt to add it to the local HLV, AddVersion will handle if attempting to add older + // version than local HLVs CV pair + otherVectorCV := SourceAndVersion{SourceID: otherVector.SourceID, Version: otherVector.Version} + err := hlv.AddVersion(otherVectorCV) + if err != nil { + return err + } + + if otherVector.PreviousVersions != nil || len(otherVector.PreviousVersions) != 0 { + // Iterate through incoming vector previous versions, update with the version from other vector + // for source if the local version for that source is lower + for i, v := range otherVector.PreviousVersions { + if hlv.PreviousVersions[i] == 0 || hlv.PreviousVersions[i] < v { + hlv.setPreviousVersion(i, v) + } + } + } + // if current source exists in PV, delete it. + if _, ok := hlv.PreviousVersions[hlv.SourceID]; ok { + delete(hlv.PreviousVersions, hlv.SourceID) + } + return nil } func (hlv HybridLogicalVector) MarshalJSON() ([]byte, error) { @@ -300,3 +348,11 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi } return outputSpec } + +// setPreviousVersion will take a source/version pair and add it to the HLV previous versions map +func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) { + if hlv.PreviousVersions == nil { + hlv.PreviousVersions = make(map[string]uint64) + } + hlv.PreviousVersions[source] = version +} diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index 029baef512..e6e62184ba 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -233,6 +233,42 @@ func TestHybridLogicalVectorPersistence(t *testing.T) { assert.Equal(t, inMemoryHLV.MergeVersions, hlvFromPersistance.MergeVersions) } +func TestAddNewerVersionsBetweenTwoVectorsWhenNotInConflict(t *testing.T) { + testCases := []struct { + name string + localInput []string + incomingInput []string + expected []string + }{ + { + name: "testcase1", + localInput: []string{"abc@15"}, + incomingInput: []string{"def@25", "abc@20"}, + expected: []string{"def@25", "abc@20"}, + }, + { + name: "testcase2", + localInput: []string{"abc@15", "def@30"}, + incomingInput: []string{"def@35", "abc@15"}, + expected: []string{"def@35", "abc@15"}, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + localHLV := createHLVForTest(t, test.localInput) + incomingHLV := createHLVForTest(t, test.incomingInput) + expectedHLV := createHLVForTest(t, test.expected) + + _ = localHLV.AddNewerVersions(incomingHLV) + // assert on expected values + assert.Equal(t, expectedHLV.SourceID, localHLV.SourceID) + assert.Equal(t, expectedHLV.Version, localHLV.Version) + assert.True(t, reflect.DeepEqual(expectedHLV.PreviousVersions, localHLV.PreviousVersions)) + }) + } +} + // Tests import of server-side mutations made by HLV-aware and non-HLV-aware peers func TestHLVImport(t *testing.T) { diff --git a/db/util_testing.go b/db/util_testing.go index 337e52e6f4..30e42b62cb 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -644,3 +644,14 @@ func DefaultMutateInOpts() *sgbucket.MutateInOptions { MacroExpansion: macroExpandSpec(base.SyncXattrName), } } + +func createTestDocument(docID string, revID string, body Body, deleted bool, expiry uint32) (newDoc *Document) { + newDoc = &Document{ + ID: docID, + Deleted: deleted, + DocExpiry: expiry, + RevID: revID, + _body: body, + } + return newDoc +}