From c6e6860cdb9da8f23451e2d341d001c6c565499d Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com> Date: Fri, 10 Nov 2023 17:08:41 +0000 Subject: [PATCH] CBG-3576: BlipTestClient support for HLV and rev tree modes (#6567) * CBG-3210: Updating HLV on Put And PutExistingRev (#6366) * CBG-3209: Add cv index and retrieval for revision cache (#6491) * CBG-3209: changes for retreival of a doc from the rev cache via CV with backwards compatability in mind * fix failing test, add commnets * fix lint * updated to address comments * rebase chnages needed * updated to tests that call Get on revision cache * updates based of new direction with PR + addressing comments * updated to fix panic * updated to fix another panic * address comments * updates based off commnets * remove commnented out line * updates to skip test relying on import and update PutExistingRev doc update type to update HLV * updates to remove code adding rev id to value inside addToRevMapPostLoad. Added code to assign this inside value.store * remove redundent code * CBG-3210: Updating HLV on Put And PutExistingRev (#6366) * CBG-3209: Add cv index and retrieval for revision cache (#6491) * CBG-3209: changes for retreival of a doc from the rev cache via CV with backwards compatability in mind * fix failing test, add commnets * fix lint * updated to address comments * rebase chnages needed * updated to tests that call Get on revision cache * updates based of new direction with PR + addressing comments * updated to fix panic * updated to fix another panic * address comments * updates based off commnets * remove commnented out line * updates to skip test relying on import and update PutExistingRev doc update type to update HLV * updates to remove code adding rev id to value inside addToRevMapPostLoad. Added code to assign this inside value.store * remove redundent code * CBG-3576: changes to BlipTesterClient to run with version vector subprotocol and non version vector subprotocol * updates to work with rebase changes * changes to remove repeated code, assuming from the rebase? * refactoring based off discussion with Ben * lint error fix --- db/access_test.go | 8 +- db/attachment_test.go | 28 +- db/blip_handler.go | 4 +- db/change_cache.go | 2 +- db/changes_test.go | 4 +- db/database.go | 16 ++ db/database_test.go | 216 +++++++------- db/document.go | 82 ++++-- db/document_test.go | 101 +++++++ db/query_test.go | 30 +- db/revision_cache_bypass.go | 40 ++- db/revision_cache_interface.go | 100 ++++++- db/revision_cache_lru.go | 286 ++++++++++++++++--- db/revision_cache_test.go | 381 +++++++++++++++++++++++-- db/revision_test.go | 2 +- rest/api_test.go | 91 ++++++ rest/bulk_api.go | 2 +- rest/doc_api.go | 4 +- rest/importtest/import_test.go | 3 + rest/replicatortest/replicator_test.go | 43 +++ 20 files changed, 1183 insertions(+), 260 deletions(-) diff --git a/db/access_test.go b/db/access_test.go index 9b23710cb5..48ee595fc7 100644 --- a/db/access_test.go +++ b/db/access_test.go @@ -44,7 +44,7 @@ func TestDynamicChannelGrant(t *testing.T) { // Create a document in channel chan1 doc1Body := Body{"channel": "chan1", "greeting": "hello"} - _, _, err = dbCollection.PutExistingRevWithBody(ctx, "doc1", doc1Body, []string{"1-a"}, false) + _, _, err = dbCollection.PutExistingRevWithBody(ctx, "doc1", doc1Body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err) // Verify user cannot access document @@ -54,7 +54,7 @@ func TestDynamicChannelGrant(t *testing.T) { // Write access granting document grantingBody := Body{"type": "setaccess", "owner": "user1", "channel": "chan1"} - _, _, err = dbCollection.PutExistingRevWithBody(ctx, "grant1", grantingBody, []string{"1-a"}, false) + _, _, err = dbCollection.PutExistingRevWithBody(ctx, "grant1", grantingBody, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err) // Verify reloaded user can access document @@ -66,12 +66,12 @@ func TestDynamicChannelGrant(t *testing.T) { // Create a document in channel chan2 doc2Body := Body{"channel": "chan2", "greeting": "hello"} - _, _, err = dbCollection.PutExistingRevWithBody(ctx, "doc2", doc2Body, []string{"1-a"}, false) + _, _, err = dbCollection.PutExistingRevWithBody(ctx, "doc2", doc2Body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err) // Write access granting document for chan2 (tests invalidation when channels/inval_seq exists) grantingBody = Body{"type": "setaccess", "owner": "user1", "channel": "chan2"} - _, _, err = dbCollection.PutExistingRevWithBody(ctx, "grant2", grantingBody, []string{"1-a"}, false) + _, _, err = dbCollection.PutExistingRevWithBody(ctx, "grant2", grantingBody, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err) // Verify user can now access both documents diff --git a/db/attachment_test.go b/db/attachment_test.go index 33dd298782..15752002ed 100644 --- a/db/attachment_test.go +++ b/db/attachment_test.go @@ -68,7 +68,7 @@ func TestBackupOldRevisionWithAttachments(t *testing.T) { var rev2Body Body rev2Data := `{"test": true, "updated": true, "_attachments": {"hello.txt": {"stub": true, "revpos": 1}}}` require.NoError(t, base.JSONUnmarshal([]byte(rev2Data), &rev2Body)) - _, _, err = collection.PutExistingRevWithBody(ctx, docID, rev2Body, []string{"2-abc", rev1ID}, true) + _, _, err = collection.PutExistingRevWithBody(ctx, docID, rev2Body, []string{"2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV) require.NoError(t, err) rev2ID := "2-abc" @@ -196,7 +196,7 @@ func TestAttachments(t *testing.T) { rev2Bstr := `{"_attachments": {"bye.txt": {"stub":true,"revpos":1,"digest":"sha1-gwwPApfQR9bzBKpqoEYwFmKp98A="}}, "_rev": "2-f000"}` var body2B Body assert.NoError(t, base.JSONUnmarshal([]byte(rev2Bstr), &body2B)) - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body2B, []string{"2-f000", rev1id}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body2B, []string{"2-f000", rev1id}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't update document") } @@ -280,7 +280,7 @@ func TestAttachmentCASRetryAfterNewAttachment(t *testing.T) { rev2Data := `{"prop1":"value2", "_attachments": {"hello.txt": {"data":"aGVsbG8gd29ybGQ="}}}` require.NoError(t, base.JSONUnmarshal([]byte(rev2Data), &rev2Body)) collection := GetSingleDatabaseCollectionWithUser(t, db) - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", rev2Body, []string{"2-abc", rev1ID}, true) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", rev2Body, []string{"2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV) require.NoError(t, err) log.Printf("Done creating rev 2 for key %s", key) @@ -311,7 +311,7 @@ func TestAttachmentCASRetryAfterNewAttachment(t *testing.T) { var rev3Body Body rev3Data := `{"prop1":"value3", "_attachments": {"hello.txt": {"revpos":2,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}` require.NoError(t, base.JSONUnmarshal([]byte(rev3Data), &rev3Body)) - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", rev3Body, []string{"3-abc", "2-abc", rev1ID}, true) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", rev3Body, []string{"3-abc", "2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV) require.NoError(t, err) log.Printf("rev 3 done") @@ -343,7 +343,7 @@ func TestAttachmentCASRetryDuringNewAttachment(t *testing.T) { rev2Data := `{"prop1":"value2"}` require.NoError(t, base.JSONUnmarshal([]byte(rev2Data), &rev2Body)) collection := GetSingleDatabaseCollectionWithUser(t, db) - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", rev2Body, []string{"2-abc", rev1ID}, true) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", rev2Body, []string{"2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV) require.NoError(t, err) log.Printf("Done creating rev 2 for key %s", key) @@ -374,7 +374,7 @@ func TestAttachmentCASRetryDuringNewAttachment(t *testing.T) { var rev3Body Body rev3Data := `{"prop1":"value3", "_attachments": {"hello.txt": {"data":"aGVsbG8gd29ybGQ="}}}` require.NoError(t, base.JSONUnmarshal([]byte(rev3Data), &rev3Body)) - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", rev3Body, []string{"3-abc", "2-abc", rev1ID}, true) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", rev3Body, []string{"3-abc", "2-abc", rev1ID}, true, ExistingVersionWithUpdateToHLV) require.NoError(t, err) log.Printf("rev 3 done") @@ -563,7 +563,7 @@ func TestRetrieveAncestorAttachments(t *testing.T) { // Create document (rev 1) text := `{"key": "value", "version": "1a"}` assert.NoError(t, base.JSONUnmarshal([]byte(text), &body)) - doc, revID, err := collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-a"}, false) + doc, revID, err := collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't create document") log.Printf("doc: %v", doc) @@ -571,49 +571,49 @@ func TestRetrieveAncestorAttachments(t *testing.T) { text = `{"key": "value", "version": "2a", "_attachments": {"att1.txt": {"data": "YXR0MS50eHQ="}}}` assert.NoError(t, base.JSONUnmarshal([]byte(text), &body)) body[BodyRev] = revID - doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false) + doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't create document") log.Printf("doc: %v", doc) text = `{"key": "value", "version": "3a", "_attachments": {"att1.txt": {"stub":true,"revpos":2,"digest":"sha1-gwwPApfQR9bzBKpqoEYwFmKp98A="}}}` assert.NoError(t, base.JSONUnmarshal([]byte(text), &body)) body[BodyRev] = revID - doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"3-a", "2-a"}, false) + doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"3-a", "2-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't create document") log.Printf("doc: %v", doc) text = `{"key": "value", "version": "4a", "_attachments": {"att1.txt": {"stub":true,"revpos":2,"digest":"sha1-gwwPApfQR9bzBKpqoEYwFmKp98A="}}}` assert.NoError(t, base.JSONUnmarshal([]byte(text), &body)) body[BodyRev] = revID - doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"4-a", "3-a"}, false) + doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"4-a", "3-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't create document") log.Printf("doc: %v", doc) text = `{"key": "value", "version": "5a", "_attachments": {"att1.txt": {"stub":true,"revpos":2,"digest":"sha1-gwwPApfQR9bzBKpqoEYwFmKp98A="}}}` assert.NoError(t, base.JSONUnmarshal([]byte(text), &body)) body[BodyRev] = revID - doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"5-a", "4-a"}, false) + doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"5-a", "4-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't create document") log.Printf("doc: %v", doc) text = `{"key": "value", "version": "6a", "_attachments": {"att1.txt": {"stub":true,"revpos":2,"digest":"sha1-gwwPApfQR9bzBKpqoEYwFmKp98A="}}}` assert.NoError(t, base.JSONUnmarshal([]byte(text), &body)) body[BodyRev] = revID - doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"6-a", "5-a"}, false) + doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"6-a", "5-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't create document") log.Printf("doc: %v", doc) text = `{"key": "value", "version": "3b", "type": "pruned"}` assert.NoError(t, base.JSONUnmarshal([]byte(text), &body)) body[BodyRev] = revID - doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"3-b", "2-a"}, false) + doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"3-b", "2-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't create document") log.Printf("doc: %v", doc) text = `{"key": "value", "version": "3b", "_attachments": {"att1.txt": {"stub":true,"revpos":2,"digest":"sha1-gwwPApfQR9bzBKpqoEYwFmKp98A="}}}` assert.NoError(t, base.JSONUnmarshal([]byte(text), &body)) body[BodyRev] = revID - doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"3-b", "2-a"}, false) + doc, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"3-b", "2-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't create document") log.Printf("doc: %v", doc) } diff --git a/db/blip_handler.go b/db/blip_handler.go index 85262db4ce..2a743386d9 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -1210,9 +1210,9 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err // bh.conflictResolver != nil represents an active SGR2 and BLIPClientTypeSGR2 represents a passive SGR2 forceAllowConflictingTombstone := newDoc.Deleted && (bh.conflictResolver != nil || bh.clientType == BLIPClientTypeSGR2) if bh.conflictResolver != nil { - _, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc) + _, _, err = bh.collection.PutExistingRevWithConflictResolution(bh.loggingCtx, newDoc, history, true, bh.conflictResolver, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV) } else { - _, _, err = bh.collection.PutExistingRev(bh.loggingCtx, newDoc, history, revNoConflicts, forceAllowConflictingTombstone, rawBucketDoc) + _, _, err = bh.collection.PutExistingRev(bh.loggingCtx, newDoc, history, revNoConflicts, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV) } if err != nil { return err diff --git a/db/change_cache.go b/db/change_cache.go index 7a46206656..38dafa94f9 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -497,7 +497,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) { // Now add the entry for the new doc revision: if len(rawUserXattr) > 0 { - collection.revisionCache.Remove(docID, syncData.CurrentRev) + collection.revisionCache.RemoveWithRev(docID, syncData.CurrentRev) } change := &LogEntry{ Sequence: syncData.Sequence, diff --git a/db/changes_test.go b/db/changes_test.go index 4e9fe4db10..3deaeea019 100644 --- a/db/changes_test.go +++ b/db/changes_test.go @@ -478,14 +478,14 @@ func BenchmarkChangesFeedDocUnmarshalling(b *testing.B) { // Create child rev 1 docBody["child"] = "A" - _, _, err = collection.PutExistingRevWithBody(ctx, docid, docBody, []string{"2-A", revId}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, docid, docBody, []string{"2-A", revId}, false, ExistingVersionWithUpdateToHLV) if err != nil { b.Fatalf("Error creating child1 rev: %v", err) } // Create child rev 2 docBody["child"] = "B" - _, _, err = collection.PutExistingRevWithBody(ctx, docid, docBody, []string{"2-B", revId}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, docid, docBody, []string{"2-B", revId}, false, ExistingVersionWithUpdateToHLV) if err != nil { b.Fatalf("Error creating child2 rev: %v", err) } diff --git a/db/database.go b/db/database.go index 6326592fe5..a3a94e5c6d 100644 --- a/db/database.go +++ b/db/database.go @@ -48,6 +48,15 @@ const ( DBCompactRunning ) +const ( + Import DocUpdateType = iota + NewVersion + ExistingVersion + ExistingVersionWithUpdateToHLV +) + +type DocUpdateType uint32 + const ( DefaultRevsLimitNoConflicts = 50 DefaultRevsLimitConflicts = 100 @@ -88,6 +97,7 @@ type DatabaseContext struct { MetadataStore base.DataStore // Storage for database metadata (anything that isn't an end-user's/customer's documents) Bucket base.Bucket // Storage BucketSpec base.BucketSpec // The BucketSpec + BucketUUID string // The bucket UUID for the bucket the database is created against BucketLock sync.RWMutex // Control Access to the underlying bucket object mutationListener changeListener // Caching feed listener ImportListener *importListener // Import feed listener @@ -398,6 +408,11 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket, metadataStore = bucket.DefaultDataStore() } + bucketUUID, err := bucket.UUID() + if err != nil { + return nil, err + } + // Register the cbgt pindex type for the configGroup RegisterImportPindexImpl(ctx, options.GroupID) @@ -406,6 +421,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket, UUID: cbgt.NewUUID(), MetadataStore: metadataStore, Bucket: bucket, + BucketUUID: bucketUUID, StartTime: time.Now(), autoImport: autoImport, Options: options, diff --git a/db/database_test.go b/db/database_test.go index 97c9330678..54bcfa528c 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -294,7 +294,7 @@ func TestDatabase(t *testing.T) { body["key2"] = int64(4444) history := []string{"4-four", "3-three", "2-488724414d0ed6b398d6d2aeb228d797", "1-cb0c9a22be0e5a1b01084ec019defa81"} - doc, newRev, err := collection.PutExistingRevWithBody(ctx, "doc1", body, history, false) + doc, newRev, err := collection.PutExistingRevWithBody(ctx, "doc1", body, history, false, ExistingVersionWithUpdateToHLV) body[BodyId] = doc.ID body[BodyRev] = newRev assert.NoError(t, err, "PutExistingRev failed") @@ -1020,18 +1020,18 @@ func TestRepeatedConflict(t *testing.T) { // Create rev 1 of "doc": body := Body{"n": 1, "channels": []string{"all", "1"}} - _, _, err := collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-a") // Create two conflicting changes: body["n"] = 2 body["channels"] = []string{"all", "2b"} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-b") body["n"] = 3 body["channels"] = []string{"all", "2a"} - _, newRev, err := collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false) + _, newRev, err := collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-a") // Get the _rev that was set in the body by PutExistingRevWithBody() and make assertions on it @@ -1040,7 +1040,7 @@ func TestRepeatedConflict(t *testing.T) { // Remove the _rev key from the body, and call PutExistingRevWithBody() again, which should re-add it delete(body, BodyRev) - _, newRev, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false) + _, newRev, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err) // The _rev should pass the same assertions as before, since PutExistingRevWithBody() should re-add it @@ -1068,7 +1068,7 @@ func TestConflicts(t *testing.T) { // Create rev 1 of "doc": body := Body{"n": 1, "channels": []string{"all", "1"}} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-a") // Wait for rev to be cached @@ -1081,11 +1081,11 @@ func TestConflicts(t *testing.T) { // Create two conflicting changes: body["n"] = 2 body["channels"] = []string{"all", "2b"} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-b") body["n"] = 3 body["channels"] = []string{"all", "2a"} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-a") cacheWaiter.Add(2) @@ -1213,55 +1213,55 @@ func TestNoConflictsMode(t *testing.T) { // Create revs 1 and 2 of "doc": body := Body{"n": 1, "channels": []string{"all", "1"}} - _, _, err := collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-a") body["n"] = 2 - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-a") // Try to create a conflict branching from rev 1: - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assertHTTPError(t, err, 409) // Try to create a conflict with no common ancestor: - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-c", "1-c"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-c", "1-c"}, false, ExistingVersionWithUpdateToHLV) assertHTTPError(t, err, 409) // Try to create a conflict with a longer history: - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"4-d", "3-d", "2-d", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"4-d", "3-d", "2-d", "1-a"}, false, ExistingVersionWithUpdateToHLV) assertHTTPError(t, err, 409) // Try to create a conflict with no history: - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-e"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-e"}, false, ExistingVersionWithUpdateToHLV) assertHTTPError(t, err, 409) // Create a non-conflict with a longer history, ending in a deletion: body[BodyDeleted] = true - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"4-a", "3-a", "2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"4-a", "3-a", "2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 4-a") delete(body, BodyDeleted) // Try to resurrect the document with a conflicting branch - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"4-f", "3-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"4-f", "3-a"}, false, ExistingVersionWithUpdateToHLV) assertHTTPError(t, err, 409) // Resurrect the tombstoned document with a disconnected branch): - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-f"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"1-f"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-f") // Tombstone the resurrected branch body[BodyDeleted] = true - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-f", "1-f"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"2-f", "1-f"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-f") delete(body, BodyDeleted) // Resurrect the tombstoned document with a valid history (descendents of leaf) - _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"5-f", "4-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc", body, []string{"5-f", "4-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 5-f") delete(body, BodyDeleted) // Create a new document with a longer history: - _, _, err = collection.PutExistingRevWithBody(ctx, "COD", body, []string{"4-a", "3-a", "2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "COD", body, []string{"4-a", "3-a", "2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add COD") delete(body, BodyDeleted) @@ -1289,34 +1289,34 @@ func TestAllowConflictsFalseTombstoneExistingConflict(t *testing.T) { // Create documents with multiple non-deleted branches log.Printf("Creating docs") body := Body{"n": 1} - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-a") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-a") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-a") // Create two conflicting changes: body["n"] = 2 - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-b") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-b") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-b") body["n"] = 3 - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-a") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-a") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-a") // Set AllowConflicts to false db.Options.AllowConflicts = base.BoolPtr(false) // Attempt to tombstone a non-leaf node of a conflicted document - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-c", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-c", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.True(t, err != nil, "expected error tombstoning non-leaf") // Tombstone the non-winning branch of a conflicted document @@ -1366,27 +1366,27 @@ func TestAllowConflictsFalseTombstoneExistingConflictNewEditsFalse(t *testing.T) // Create documents with multiple non-deleted branches log.Printf("Creating docs") body := Body{"n": 1} - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-a") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-a") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 1-a") // Create two conflicting changes: body["n"] = 2 - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-b") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-b") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-b") body["n"] = 3 - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-a") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-a") - _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 2-a") // Set AllowConflicts to false @@ -1395,12 +1395,12 @@ func TestAllowConflictsFalseTombstoneExistingConflictNewEditsFalse(t *testing.T) // Attempt to tombstone a non-leaf node of a conflicted document body[BodyDeleted] = true - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-c", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-c", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.True(t, err != nil, "expected error tombstoning non-leaf") // Tombstone the non-winning branch of a conflicted document body[BodyDeleted] = true - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-a", "2-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-a", "2-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 3-a (tombstone)") doc, err := collection.GetDocument(ctx, "doc1", DocUnmarshalAll) assert.NoError(t, err, "Retrieve doc post-tombstone") @@ -1408,7 +1408,7 @@ func TestAllowConflictsFalseTombstoneExistingConflictNewEditsFalse(t *testing.T) // Tombstone the winning branch of a conflicted document body[BodyDeleted] = true - _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"3-b", "2-b"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc2", body, []string{"3-b", "2-b"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 3-b (tombstone)") doc, err = collection.GetDocument(ctx, "doc2", DocUnmarshalAll) assert.NoError(t, err, "Retrieve doc post-tombstone") @@ -1417,7 +1417,7 @@ func TestAllowConflictsFalseTombstoneExistingConflictNewEditsFalse(t *testing.T) // Set revs_limit=1, then tombstone non-winning branch of a conflicted document. Validate retrieval still works. body[BodyDeleted] = true db.RevsLimit = uint32(1) - _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"3-a", "2-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc3", body, []string{"3-a", "2-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "add 3-a (tombstone)") doc, err = collection.GetDocument(ctx, "doc3", DocUnmarshalAll) assert.NoError(t, err, "Retrieve doc post-tombstone") @@ -1453,7 +1453,7 @@ func TestSyncFnOnPush(t *testing.T) { body["channels"] = "clibup" history := []string{"4-four", "3-three", "2-488724414d0ed6b398d6d2aeb228d797", rev1id} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, history, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, history, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "PutExistingRev failed") // Check that the doc has the correct channel (test for issue #300) @@ -2145,7 +2145,7 @@ func TestConcurrentPushSameNewNonWinningRevision(t *testing.T) { enableCallback = false body := Body{"name": "Emily", "age": 20} collection := GetSingleDatabaseCollectionWithUser(t, db) - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-b", "2-b", "1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-b", "2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 3-b") } } @@ -2160,29 +2160,29 @@ func TestConcurrentPushSameNewNonWinningRevision(t *testing.T) { collection := GetSingleDatabaseCollectionWithUser(t, db) body := Body{"name": "Olivia", "age": 80} - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 1-a") body = Body{"name": "Harry", "age": 40} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 2-a") body = Body{"name": "Amelia", "age": 20} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-a", "2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-a", "2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 3-a") body = Body{"name": "Charlie", "age": 10} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"4-a", "3-a", "2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"4-a", "3-a", "2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 4-a") body = Body{"name": "Noah", "age": 40} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 2-b") enableCallback = true body = Body{"name": "Emily", "age": 20} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-b", "2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-b", "2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 3-b") doc, err := collection.GetDocument(ctx, "doc1", DocUnmarshalAll) @@ -2203,7 +2203,7 @@ func TestConcurrentPushSameTombstoneWinningRevision(t *testing.T) { enableCallback = false body := Body{"name": "Charlie", "age": 10, BodyDeleted: true} collection := GetSingleDatabaseCollectionWithUser(t, db) - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"4-a", "3-a", "2-a", "1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"4-a", "3-a", "2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't add revision 4-a (tombstone)") } } @@ -2218,19 +2218,19 @@ func TestConcurrentPushSameTombstoneWinningRevision(t *testing.T) { collection := GetSingleDatabaseCollectionWithUser(t, db) body := Body{"name": "Olivia", "age": 80} - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 1-a") body = Body{"name": "Harry", "age": 40} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 2-a") body = Body{"name": "Amelia", "age": 20} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-a", "2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-a", "2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 3-a") body = Body{"name": "Noah", "age": 40} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 2-b") doc, err := collection.GetDocument(ctx, "doc1", DocUnmarshalAll) @@ -2240,7 +2240,7 @@ func TestConcurrentPushSameTombstoneWinningRevision(t *testing.T) { enableCallback = true body = Body{"name": "Charlie", "age": 10, BodyDeleted: true} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"4-a", "3-a", "2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"4-a", "3-a", "2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't add revision 4-a (tombstone)") doc, err = collection.GetDocument(ctx, "doc1", DocUnmarshalAll) @@ -2261,7 +2261,7 @@ func TestConcurrentPushDifferentUpdateNonWinningRevision(t *testing.T) { enableCallback = false body := Body{"name": "Joshua", "age": 11} collection := GetSingleDatabaseCollectionWithUser(t, db) - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-b1", "2-b", "1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-b1", "2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't add revision 3-b1") } } @@ -2276,29 +2276,29 @@ func TestConcurrentPushDifferentUpdateNonWinningRevision(t *testing.T) { collection := GetSingleDatabaseCollectionWithUser(t, db) body := Body{"name": "Olivia", "age": 80} - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 1-a") body = Body{"name": "Harry", "age": 40} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 2-a") body = Body{"name": "Amelia", "age": 20} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-a", "2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-a", "2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 3-a") body = Body{"name": "Charlie", "age": 10} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"4-a", "3-a", "2-a", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"4-a", "3-a", "2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 4-a") body = Body{"name": "Noah", "age": 40} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Adding revision 2-b") enableCallback = true body = Body{"name": "Liam", "age": 12} - _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-b2", "2-b", "1-a"}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-b2", "2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Couldn't add revision 3-b2") doc, err := collection.GetDocument(ctx, "doc1", DocUnmarshalAll) @@ -2332,7 +2332,7 @@ func TestIncreasingRecentSequences(t *testing.T) { enableCallback = false // Write a doc collection := GetSingleDatabaseCollectionWithUser(t, db) - _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-abc", revid}, true) + _, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"2-abc", revid}, true, ExistingVersionWithUpdateToHLV) assert.NoError(t, err) } } @@ -2349,7 +2349,7 @@ func TestIncreasingRecentSequences(t *testing.T) { assert.NoError(t, err) enableCallback = true - doc, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-abc", "2-abc", revid}, true) + doc, _, err := collection.PutExistingRevWithBody(ctx, "doc1", body, []string{"3-abc", "2-abc", revid}, true, ExistingVersionWithUpdateToHLV) assert.NoError(t, err) assert.True(t, sort.IsSorted(base.SortedUint64Slice(doc.SyncData.RecentSequences))) @@ -2799,72 +2799,62 @@ func Test_invalidateAllPrincipalsCache(t *testing.T) { } func Test_resyncDocument(t *testing.T) { - testCases := []struct { - useXattr bool - }{ - {useXattr: true}, - {useXattr: false}, + if !base.TestUseXattrs() { + t.Skip("Walrus doesn't support xattr") } + db, ctx := setupTestDB(t) + defer db.Close(ctx) - for _, testCase := range testCases { - t.Run(fmt.Sprintf("Test_resyncDocument with useXattr: %t", testCase.useXattr), func(t *testing.T) { - if !base.TestUseXattrs() && testCase.useXattr { - t.Skip("Don't run xattr tests on non xattr tests") - } - db, ctx := setupTestDB(t) - defer db.Close(ctx) - - db.Options.EnableXattr = testCase.useXattr - db.Options.QueryPaginationLimit = 100 - collection := GetSingleDatabaseCollectionWithUser(t, db) + db.Options.EnableXattr = true + db.Options.QueryPaginationLimit = 100 + collection := GetSingleDatabaseCollectionWithUser(t, db) - syncFn := ` + syncFn := ` function sync(doc, oldDoc){ channel("channel." + "ABC"); } ` - _, err := collection.UpdateSyncFun(ctx, syncFn) - require.NoError(t, err) + _, err := collection.UpdateSyncFun(ctx, syncFn) + require.NoError(t, err) - docID := uuid.NewString() + docID := uuid.NewString() - updateBody := make(map[string]interface{}) - updateBody["val"] = "value" - _, doc, err := collection.Put(ctx, docID, updateBody) - require.NoError(t, err) - assert.NotNil(t, doc) + updateBody := make(map[string]interface{}) + updateBody["val"] = "value" + _, doc, err := collection.Put(ctx, docID, updateBody) + require.NoError(t, err) + assert.NotNil(t, doc) - syncFn = ` + syncFn = ` function sync(doc, oldDoc){ channel("channel." + "ABC12332423234"); } ` - _, err = collection.UpdateSyncFun(ctx, syncFn) - require.NoError(t, err) - - _, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10}) - require.NoError(t, err) - err = collection.WaitForPendingChanges(ctx) - require.NoError(t, err) + _, err = collection.UpdateSyncFun(ctx, syncFn) + require.NoError(t, err) - syncData, err := collection.GetDocSyncData(ctx, docID) - assert.NoError(t, err) + _, _, err = collection.resyncDocument(ctx, docID, realDocID(docID), false, []uint64{10}) + require.NoError(t, err) + err = collection.WaitForPendingChanges(ctx) + require.NoError(t, err) - assert.Len(t, syncData.ChannelSet, 2) - assert.Len(t, syncData.Channels, 2) - found := false + syncData, err := collection.GetDocSyncData(ctx, docID) + assert.NoError(t, err) - for _, chSet := range syncData.ChannelSet { - if chSet.Name == "channel.ABC12332423234" { - found = true - break - } - } + assert.Len(t, syncData.ChannelSet, 2) + assert.Len(t, syncData.Channels, 2) + found := false - assert.True(t, found) - assert.Equal(t, 2, int(db.DbStats.Database().SyncFunctionCount.Value())) - }) + for _, chSet := range syncData.ChannelSet { + if chSet.Name == "channel.ABC12332423234" { + found = true + break + } } + + assert.True(t, found) + assert.Equal(t, 2, int(db.DbStats.Database().SyncFunctionCount.Value())) + } func Test_getUpdatedDocument(t *testing.T) { diff --git a/db/document.go b/db/document.go index f8b20913db..4ec8ae9c49 100644 --- a/db/document.go +++ b/db/document.go @@ -41,6 +41,7 @@ const ( DocUnmarshalHistory // Unmarshals history + rev + CAS only DocUnmarshalRev // Unmarshals rev + CAS only DocUnmarshalCAS // Unmarshals CAS (for import check) only + DocUnmarshalVV // Unmarshals Version Vector only DocUnmarshalNone // No unmarshalling (skips import/upgrade check) ) @@ -65,23 +66,24 @@ type ChannelSetEntry struct { // The sync-gateway metadata stored in the "_sync" property of a Couchbase document. type SyncData struct { - CurrentRev string `json:"rev"` - NewestRev string `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev - Flags uint8 `json:"flags,omitempty"` - Sequence uint64 `json:"sequence,omitempty"` - UnusedSequences []uint64 `json:"unused_sequences,omitempty"` // unused sequences due to update conflicts/CAS retry - RecentSequences []uint64 `json:"recent_sequences,omitempty"` // recent sequences for this doc - used in server dedup handling - Channels channels.ChannelMap `json:"channels,omitempty"` - Access UserAccessMap `json:"access,omitempty"` - RoleAccess UserAccessMap `json:"role_access,omitempty"` - Expiry *time.Time `json:"exp,omitempty"` // Document expiry. Information only - actual expiry/delete handling is done by bucket storage. Needs to be pointer for omitempty to work (see https://github.com/golang/go/issues/4357) - Cas string `json:"cas"` // String representation of a cas value, populated via macro expansion - Crc32c string `json:"value_crc32c"` // String representation of crc32c hash of doc body, populated via macro expansion - Crc32cUserXattr string `json:"user_xattr_value_crc32c,omitempty"` // String representation of crc32c hash of user xattr - TombstonedAt int64 `json:"tombstoned_at,omitempty"` // Time the document was tombstoned. Used for view compaction - Attachments AttachmentsMeta `json:"attachments,omitempty"` - ChannelSet []ChannelSetEntry `json:"channel_set"` - ChannelSetHistory []ChannelSetEntry `json:"channel_set_history"` + CurrentRev string `json:"rev"` + NewestRev string `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev + Flags uint8 `json:"flags,omitempty"` + Sequence uint64 `json:"sequence,omitempty"` + UnusedSequences []uint64 `json:"unused_sequences,omitempty"` // unused sequences due to update conflicts/CAS retry + RecentSequences []uint64 `json:"recent_sequences,omitempty"` // recent sequences for this doc - used in server dedup handling + Channels channels.ChannelMap `json:"channels,omitempty"` + Access UserAccessMap `json:"access,omitempty"` + RoleAccess UserAccessMap `json:"role_access,omitempty"` + Expiry *time.Time `json:"exp,omitempty"` // Document expiry. Information only - actual expiry/delete handling is done by bucket storage. Needs to be pointer for omitempty to work (see https://github.com/golang/go/issues/4357) + Cas string `json:"cas"` // String representation of a cas value, populated via macro expansion + Crc32c string `json:"value_crc32c"` // String representation of crc32c hash of doc body, populated via macro expansion + Crc32cUserXattr string `json:"user_xattr_value_crc32c,omitempty"` // String representation of crc32c hash of user xattr + TombstonedAt int64 `json:"tombstoned_at,omitempty"` // Time the document was tombstoned. Used for view compaction + Attachments AttachmentsMeta `json:"attachments,omitempty"` + ChannelSet []ChannelSetEntry `json:"channel_set"` + ChannelSetHistory []ChannelSetEntry `json:"channel_set_history"` + HLV *HybridLogicalVector `json:"_vv,omitempty"` // Only used for performance metrics: TimeSaved time.Time `json:"time_saved,omitempty"` // Timestamp of save. @@ -176,11 +178,12 @@ type Document struct { Cas uint64 // Document cas rawUserXattr []byte // Raw user xattr as retrieved from the bucket - Deleted bool - DocExpiry uint32 - RevID string - DocAttachments AttachmentsMeta - inlineSyncData bool + Deleted bool + DocExpiry uint32 + RevID string + DocAttachments AttachmentsMeta + inlineSyncData bool + currentRevChannels base.Set // A base.Set of the current revision's channels (determined by SyncData.Channels at UnmarshalJSON time) } type historyOnlySyncData struct { @@ -969,6 +972,7 @@ func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) ( doc.updateChannelHistory(channel, doc.Sequence, true) } } + doc.currentRevChannels = newChannels if changed != nil { base.InfofCtx(ctx, base.KeyCRUD, "\tDoc %q / %q in channels %q", base.UD(doc.ID), doc.CurrentRev, base.UD(newChannels)) changedChannels, err = channels.SetFromArray(changed, channels.KeepStar) @@ -1078,6 +1082,17 @@ func (doc *Document) UnmarshalJSON(data []byte) error { doc.SyncData = *syncData.SyncData } + // determine current revision's channels and store in-memory (avoids doc.Channels iteration at access-check time) + if len(doc.Channels) > 0 { + ch := base.SetOf() + for channelName, channelRemoval := range doc.Channels { + if channelRemoval == nil || channelRemoval.Seq == 0 { + ch.Add(channelName) + } + } + doc.currentRevChannels = ch + } + // Unmarshal the rest of the doc body as map[string]interface{} if err := doc._body.Unmarshal(data); err != nil { return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalJSON() doc with id: %s. Error: %v", base.UD(doc.ID), err)) @@ -1132,7 +1147,6 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata if unmarshalLevel == DocUnmarshalAll && len(data) > 0 { return doc._body.Unmarshal(data) } - case DocUnmarshalNoHistory: // Unmarshal sync metadata only, excluding history doc.SyncData = SyncData{} @@ -1176,6 +1190,14 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata Cas: casOnlyMeta.Cas, } doc._rawBody = data + case DocUnmarshalVV: + tmpData := SyncData{} + unmarshalErr := base.JSONUnmarshal(xdata, &tmpData) + if unmarshalErr != nil { + return base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalVV). Error: %w", base.UD(doc.ID), unmarshalErr) + } + doc.SyncData.HLV = tmpData.HLV + doc._rawBody = data } // If there's no body, but there is an xattr, set deleted flag and initialize an empty body @@ -1217,3 +1239,17 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) { return data, xdata, nil } + +// HasCurrentVersion Compares the specified CV with the fetched documents CV, returns error on mismatch between the two +func (d *Document) HasCurrentVersion(cv CurrentVersionVector) error { + if d.HLV == nil { + return base.RedactErrorf("no HLV present in fetched doc %s", base.UD(d.ID)) + } + + // fetch the current version for the loaded doc and compare against the CV specified in the IDandCV key + fetchedDocSource, fetchedDocVersion := d.HLV.GetCurrentVersion() + if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.VersionCAS { + return base.RedactErrorf("mismatch between specified current version and fetched document current version for doc %s", base.UD(d.ID)) + } + return nil +} diff --git a/db/document_test.go b/db/document_test.go index 16fbd97ff4..6301e99ec3 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -14,6 +14,7 @@ import ( "bytes" "encoding/binary" "log" + "reflect" "testing" "github.com/couchbase/sync_gateway/base" @@ -190,6 +191,106 @@ func BenchmarkUnmarshalBody(b *testing.B) { } } +const doc_meta_with_vv = `{ + "rev": "3-89758294abc63157354c2b08547c2d21", + "sequence": 7, + "recent_sequences": [ + 5, + 6, + 7 + ], + "history": { + "revs": [ + "1-fc591a068c153d6c3d26023d0d93dcc1", + "2-0eab03571bc55510c8fc4bfac9fe4412", + "3-89758294abc63157354c2b08547c2d21" + ], + "parents": [ + -1, + 0, + 1 + ], + "channels": [ + [ + "ABC", + "DEF" + ], + [ + "ABC", + "DEF", + "GHI" + ], + [ + "ABC", + "GHI" + ] + ] + }, + "channels": { + "ABC": null, + "DEF": { + "seq": 7, + "rev": "3-89758294abc63157354c2b08547c2d21" + }, + "GHI": null + }, + "_vv":{ + "cvCas":"0x40e2010000000000", + "src":"cb06dc003846116d9b66d2ab23887a96", + "vrs":"0x40e2010000000000", + "mv":{ + "s_LhRPsa7CpjEvP5zeXTXEBA":"c0ff05d7ac059a16", + "s_NqiIe0LekFPLeX4JvTO6Iw":"1c008cd6ac059a16" + }, + "pv":{ + "s_YZvBpEaztom9z5V/hDoeIw":"f0ff44d6ac059a16" + } + }, + "cas": "", + "time_saved": "2017-10-25T12:45:29.622450174-07:00" + }` + +func TestParseVersionVectorSyncData(t *testing.T) { + mv := make(map[string]uint64) + pv := make(map[string]uint64) + mv["s_LhRPsa7CpjEvP5zeXTXEBA"] = 1628620455147864000 + mv["s_NqiIe0LekFPLeX4JvTO6Iw"] = 1628620455139868700 + pv["s_YZvBpEaztom9z5V/hDoeIw"] = 1628620455135215600 + + ctx := base.TestCtx(t) + + doc_meta := []byte(doc_meta_with_vv) + doc, err := unmarshalDocumentWithXattr(ctx, "doc_1k", nil, doc_meta, nil, 1, DocUnmarshalVV) + require.NoError(t, err) + + // assert on doc version vector values + assert.Equal(t, uint64(123456), doc.SyncData.HLV.CurrentVersionCAS) + assert.Equal(t, uint64(123456), doc.SyncData.HLV.Version) + assert.Equal(t, "cb06dc003846116d9b66d2ab23887a96", doc.SyncData.HLV.SourceID) + assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) + assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) + + doc, err = unmarshalDocumentWithXattr(ctx, "doc1", nil, doc_meta, nil, 1, DocUnmarshalAll) + require.NoError(t, err) + + // assert on doc version vector values + assert.Equal(t, uint64(123456), doc.SyncData.HLV.CurrentVersionCAS) + assert.Equal(t, uint64(123456), doc.SyncData.HLV.Version) + assert.Equal(t, "cb06dc003846116d9b66d2ab23887a96", doc.SyncData.HLV.SourceID) + assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) + assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) + + doc, err = unmarshalDocumentWithXattr(ctx, "doc1", nil, doc_meta, nil, 1, DocUnmarshalNoHistory) + require.NoError(t, err) + + // assert on doc version vector values + assert.Equal(t, uint64(123456), doc.SyncData.HLV.CurrentVersionCAS) + assert.Equal(t, uint64(123456), doc.SyncData.HLV.Version) + assert.Equal(t, "cb06dc003846116d9b66d2ab23887a96", doc.SyncData.HLV.SourceID) + assert.True(t, reflect.DeepEqual(mv, doc.SyncData.HLV.MergeVersions)) + assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) +} + func TestParseXattr(t *testing.T) { zeroByte := byte(0) // Build payload for single xattr pair and body diff --git a/db/query_test.go b/db/query_test.go index 2ef43c0f82..81d262c96f 100644 --- a/db/query_test.go +++ b/db/query_test.go @@ -372,7 +372,7 @@ func TestQueryChannelsActiveOnlyWithLimit(t *testing.T) { // Create 10 added documents for i := 1; i <= 10; i++ { id := "created" + strconv.Itoa(i) - doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false) + doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create document") require.Equal(t, "1-a", revId) docIdFlagMap[doc.ID] = uint8(0x0) @@ -385,12 +385,12 @@ func TestQueryChannelsActiveOnlyWithLimit(t *testing.T) { // Create 10 deleted documents for i := 1; i <= 10; i++ { id := "deleted" + strconv.Itoa(i) - _, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false) + _, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create document") require.Equal(t, "1-a", revId) body[BodyDeleted] = true - doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"2-a", "1-a"}, false) + doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create document") require.Equal(t, "2-a", revId, "Couldn't create tombstone revision") @@ -402,22 +402,22 @@ func TestQueryChannelsActiveOnlyWithLimit(t *testing.T) { for i := 1; i <= 10; i++ { body["sound"] = "meow" id := "branched" + strconv.Itoa(i) - _, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false) + _, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create document revision 1-a") require.Equal(t, "1-a", revId) body["sound"] = "bark" - _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-b", "1-a"}, false) + _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create revision 2-b") require.Equal(t, "2-b", revId) body["sound"] = "bleat" - _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-a", "1-a"}, false) + _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create revision 2-a") require.Equal(t, "2-a", revId) body[BodyDeleted] = true - doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"3-a", "2-a"}, false) + doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"3-a", "2-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create document") require.Equal(t, "3-a", revId, "Couldn't create tombstone revision") @@ -429,27 +429,27 @@ func TestQueryChannelsActiveOnlyWithLimit(t *testing.T) { for i := 1; i <= 10; i++ { body["sound"] = "meow" id := "branched|deleted" + strconv.Itoa(i) - _, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false) + _, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create document revision 1-a") require.Equal(t, "1-a", revId) body["sound"] = "bark" - _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-b", "1-a"}, false) + _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create revision 2-b") require.Equal(t, "2-b", revId) body["sound"] = "bleat" - _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-a", "1-a"}, false) + _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create revision 2-a") require.Equal(t, "2-a", revId) body[BodyDeleted] = true - _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"3-a", "2-a"}, false) + _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"3-a", "2-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create document") require.Equal(t, "3-a", revId, "Couldn't create tombstone revision") body[BodyDeleted] = true - doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"3-b", "2-b"}, false) + doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"3-b", "2-b"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create document") require.Equal(t, "3-b", revId, "Couldn't create tombstone revision") @@ -461,17 +461,17 @@ func TestQueryChannelsActiveOnlyWithLimit(t *testing.T) { for i := 1; i <= 10; i++ { body["sound"] = "meow" id := "branched|conflict" + strconv.Itoa(i) - _, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false) + _, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create document revision 1-a") require.Equal(t, "1-a", revId) body["sound"] = "bark" - _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-b", "1-a"}, false) + _, revId, err = collection.PutExistingRevWithBody(ctx, id, body, []string{"2-b", "1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create revision 2-b") require.Equal(t, "2-b", revId) body["sound"] = "bleat" - doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"2-a", "1-a"}, false) + doc, revId, err := collection.PutExistingRevWithBody(ctx, id, body, []string{"2-a", "1-a"}, false, ExistingVersionWithUpdateToHLV) require.NoError(t, err, "Couldn't create revision 2-a") require.Equal(t, "2-a", revId) diff --git a/db/revision_cache_bypass.go b/db/revision_cache_bypass.go index 049faeb937..1b05788870 100644 --- a/db/revision_cache_bypass.go +++ b/db/revision_cache_bypass.go @@ -30,8 +30,8 @@ func NewBypassRevisionCache(backingStore RevisionCacheBackingStore, bypassStat * } } -// Get fetches the revision for the given docID and revID immediately from the bucket. -func (rc *BypassRevisionCache) Get(ctx context.Context, docID, revID string, includeBody bool, includeDelta bool) (docRev DocumentRevision, err error) { +// GetWithRev fetches the revision for the given docID and revID immediately from the bucket. +func (rc *BypassRevisionCache) GetWithRev(ctx context.Context, docID, revID string, includeBody, includeDelta bool) (docRev DocumentRevision, err error) { unmarshalLevel := DocUnmarshalSync if includeBody { @@ -45,7 +45,33 @@ func (rc *BypassRevisionCache) Get(ctx context.Context, docID, revID string, inc docRev = DocumentRevision{ RevID: revID, } - docRev.BodyBytes, docRev._shallowCopyBody, docRev.History, docRev.Channels, docRev.Removed, docRev.Attachments, docRev.Deleted, docRev.Expiry, err = revCacheLoaderForDocument(ctx, rc.backingStore, doc, revID) + docRev.BodyBytes, docRev._shallowCopyBody, docRev.History, docRev.Channels, docRev.Removed, docRev.Attachments, docRev.Deleted, docRev.Expiry, docRev.CV, err = revCacheLoaderForDocument(ctx, rc.backingStore, doc, revID) + if err != nil { + return DocumentRevision{}, err + } + + rc.bypassStat.Add(1) + + return docRev, nil +} + +// GetWithCV fetches the Current Version for the given docID and CV immediately from the bucket. +func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (docRev DocumentRevision, err error) { + + unmarshalLevel := DocUnmarshalSync + if includeBody { + unmarshalLevel = DocUnmarshalAll + } + docRev = DocumentRevision{ + CV: cv, + } + + doc, err := rc.backingStore.GetDocument(ctx, docID, unmarshalLevel) + if err != nil { + return DocumentRevision{}, err + } + + docRev.BodyBytes, docRev._shallowCopyBody, docRev.History, docRev.Channels, docRev.Removed, docRev.Attachments, docRev.Deleted, docRev.Expiry, docRev.RevID, err = revCacheLoaderForDocumentCV(ctx, rc.backingStore, doc, *cv) if err != nil { return DocumentRevision{}, err } @@ -71,7 +97,7 @@ func (rc *BypassRevisionCache) GetActive(ctx context.Context, docID string, incl RevID: doc.CurrentRev, } - docRev.BodyBytes, docRev._shallowCopyBody, docRev.History, docRev.Channels, docRev.Removed, docRev.Attachments, docRev.Deleted, docRev.Expiry, err = revCacheLoaderForDocument(ctx, rc.backingStore, doc, doc.SyncData.CurrentRev) + docRev.BodyBytes, docRev._shallowCopyBody, docRev.History, docRev.Channels, docRev.Removed, docRev.Attachments, docRev.Deleted, docRev.Expiry, docRev.CV, err = revCacheLoaderForDocument(ctx, rc.backingStore, doc, doc.SyncData.CurrentRev) if err != nil { return DocumentRevision{}, err } @@ -96,7 +122,11 @@ func (rc *BypassRevisionCache) Upsert(ctx context.Context, docRev DocumentRevisi // no-op } -func (rc *BypassRevisionCache) Remove(docID, revID string) { +func (rc *BypassRevisionCache) RemoveWithRev(docID, revID string) { + // nop +} + +func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *CurrentVersionVector) { // nop } diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index cd8ba32b39..e50ba72f98 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -28,10 +28,15 @@ const ( // RevisionCache is an interface that can be used to fetch a DocumentRevision for a Doc ID and Rev ID pair. type RevisionCache interface { - // Get returns the given revision, and stores if not already cached. + // GetWithRev returns the given revision, and stores if not already cached. // When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body. // When includeDelta=true, the returned DocumentRevision will include delta - requires additional locking during retrieval. - Get(ctx context.Context, docID, revID string, includeBody bool, includeDelta bool) (DocumentRevision, error) + GetWithRev(ctx context.Context, docID, revID string, includeBody, includeDelta bool) (DocumentRevision, error) + + // GetWithCV returns the given revision by CV, and stores if not already cached. + // When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body. + // When includeDelta=true, the returned DocumentRevision will include delta - requires additional locking during retrieval. + GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (DocumentRevision, error) // GetActive returns the current revision for the given doc ID, and stores if not already cached. // When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body. @@ -46,8 +51,11 @@ type RevisionCache interface { // Update will remove existing value and re-create new one Upsert(ctx context.Context, docRev DocumentRevision) - // Remove eliminates a revision in the cache. - Remove(docID, revID string) + // RemoveWithRev evicts a revision from the cache using its revID. + RemoveWithRev(docID, revID string) + + // RemoveWithCV evicts a revision from the cache using its current version. + RemoveWithCV(docID string, cv *CurrentVersionVector) // UpdateDelta stores the given toDelta value in the given rev if cached UpdateDelta(ctx context.Context, docID, revID string, toDelta RevisionDelta) @@ -104,6 +112,7 @@ func DefaultRevisionCacheOptions() *RevisionCacheOptions { type RevisionCacheBackingStore interface { GetDocument(ctx context.Context, docid string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) getRevision(ctx context.Context, doc *Document, revid string) ([]byte, Body, AttachmentsMeta, error) + getCurrentVersion(ctx context.Context, doc *Document) ([]byte, Body, AttachmentsMeta, error) } // DocumentRevision stored and returned by the rev cache @@ -119,6 +128,7 @@ type DocumentRevision struct { Delta *RevisionDelta Deleted bool Removed bool // True if the revision is a removal. + CV *CurrentVersionVector _shallowCopyBody Body // an unmarshalled body that can produce shallow copies } @@ -223,6 +233,12 @@ type IDAndRev struct { RevID string } +type IDandCV struct { + DocID string + Version uint64 + Source string +} + // RevisionDelta stores data about a delta between a revision and ToRevID. type RevisionDelta struct { ToRevID string // Target revID for the delta @@ -246,44 +262,104 @@ func newRevCacheDelta(deltaBytes []byte, fromRevID string, toRevision DocumentRe // This is the RevisionCacheLoaderFunc callback for the context's RevisionCache. // Its job is to load a revision from the bucket when there's a cache miss. -func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, err error) { +func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *CurrentVersionVector, err error) { var doc *Document unmarshalLevel := DocUnmarshalSync if unmarshalBody { unmarshalLevel = DocUnmarshalAll } if doc, err = backingStore.GetDocument(ctx, id.DocID, unmarshalLevel); doc == nil { - return bodyBytes, body, history, channels, removed, attachments, deleted, expiry, err + return bodyBytes, body, history, channels, removed, attachments, deleted, expiry, fetchedCV, err } return revCacheLoaderForDocument(ctx, backingStore, doc, id.RevID) } +// revCacheLoaderForCv will load a document from the bucket using the CV, comapre the fetched doc and the CV specified in the function, +// and will still return revid for purpose of populating the Rev ID lookup map on the cache +func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingStore, id IDandCV, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) { + cv := CurrentVersionVector{ + VersionCAS: id.Version, + SourceID: id.Source, + } + var doc *Document + unmarshalLevel := DocUnmarshalSync + if unmarshalBody { + unmarshalLevel = DocUnmarshalAll + } + if doc, err = backingStore.GetDocument(ctx, id.DocID, unmarshalLevel); doc == nil { + return bodyBytes, body, history, channels, removed, attachments, deleted, expiry, revid, err + } + + return revCacheLoaderForDocumentCV(ctx, backingStore, doc, cv) +} + // Common revCacheLoader functionality used either during a cache miss (from revCacheLoader), or directly when retrieving current rev from cache -func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, err error) { +func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *CurrentVersionVector, err error) { if bodyBytes, body, attachments, err = backingStore.getRevision(ctx, doc, revid); err != nil { // If we can't find the revision (either as active or conflicted body from the document, or as old revision body backup), check whether // the revision was a channel removal. If so, we want to store as removal in the revision cache removalBodyBytes, removalHistory, activeChannels, isRemoval, isDelete, isRemovalErr := doc.IsChannelRemoval(ctx, revid) if isRemovalErr != nil { - return bodyBytes, body, history, channels, isRemoval, nil, isDelete, nil, isRemovalErr + return bodyBytes, body, history, channels, isRemoval, nil, isDelete, nil, fetchedCV, isRemovalErr } if isRemoval { - return removalBodyBytes, body, removalHistory, activeChannels, isRemoval, nil, isDelete, nil, nil + return removalBodyBytes, body, removalHistory, activeChannels, isRemoval, nil, isDelete, nil, fetchedCV, nil } else { // If this wasn't a removal, return the original error from getRevision - return bodyBytes, body, history, channels, removed, nil, isDelete, nil, err + return bodyBytes, body, history, channels, removed, nil, isDelete, nil, fetchedCV, err } } deleted = doc.History[revid].Deleted validatedHistory, getHistoryErr := doc.History.getHistory(revid) if getHistoryErr != nil { - return bodyBytes, body, history, channels, removed, nil, deleted, nil, getHistoryErr + return bodyBytes, body, history, channels, removed, nil, deleted, nil, fetchedCV, getHistoryErr } history = encodeRevisions(ctx, doc.ID, validatedHistory) channels = doc.History[revid].Channels + if doc.HLV != nil { + fetchedCV = &CurrentVersionVector{SourceID: doc.HLV.SourceID, VersionCAS: doc.HLV.Version} + } + + return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, fetchedCV, err +} + +// revCacheLoaderForDocumentCV used either during cache miss (from revCacheLoaderForCv), or used directly when getting current active CV from cache +func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv CurrentVersionVector) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) { + if bodyBytes, body, attachments, err = backingStore.getCurrentVersion(ctx, doc); err != nil { + // we need implementation of IsChannelRemoval for CV here. + // pending CBG-3213 support of channel removal for CV + } - return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, err + if err = doc.HasCurrentVersion(cv); err != nil { + return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err + } + channels = doc.currentRevChannels + revid = doc.CurrentRev + + return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err +} + +func (c *DatabaseCollection) getCurrentVersion(ctx context.Context, doc *Document) (bodyBytes []byte, body Body, attachments AttachmentsMeta, err error) { + bodyBytes, err = doc.BodyBytes(ctx) + if err != nil { + base.WarnfCtx(ctx, "Marshal error when retrieving active current version body: %v", err) + return nil, nil, nil, err + } + + body = doc._body + attachments = doc.Attachments + + // handle backup revision inline attachments, or pre-2.5 meta + if inlineAtts, cleanBodyBytes, cleanBody, err := extractInlineAttachments(bodyBytes); err != nil { + return nil, nil, nil, err + } else if len(inlineAtts) > 0 { + // we found some inline attachments, so merge them with attachments, and update the bodies + attachments = mergeAttachments(inlineAtts, attachments) + bodyBytes = cleanBodyBytes + body = cleanBody + } + return bodyBytes, body, attachments, err } diff --git a/db/revision_cache_lru.go b/db/revision_cache_lru.go index 575c7c6811..32d78d7613 100644 --- a/db/revision_cache_lru.go +++ b/db/revision_cache_lru.go @@ -45,8 +45,12 @@ func (sc *ShardedLRURevisionCache) getShard(docID string) *LRURevisionCache { return sc.caches[sgbucket.VBHash(docID, sc.numShards)] } -func (sc *ShardedLRURevisionCache) Get(ctx context.Context, docID, revID string, includeBody bool, includeDelta bool) (docRev DocumentRevision, err error) { - return sc.getShard(docID).Get(ctx, docID, revID, includeBody, includeDelta) +func (sc *ShardedLRURevisionCache) GetWithRev(ctx context.Context, docID, revID string, includeBody, includeDelta bool) (docRev DocumentRevision, err error) { + return sc.getShard(docID).GetWithRev(ctx, docID, revID, includeBody, includeDelta) +} + +func (sc *ShardedLRURevisionCache) GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (docRev DocumentRevision, err error) { + return sc.getShard(docID).GetWithCV(ctx, docID, cv, includeBody, includeDelta) } func (sc *ShardedLRURevisionCache) Peek(ctx context.Context, docID, revID string) (docRev DocumentRevision, found bool) { @@ -69,14 +73,19 @@ func (sc *ShardedLRURevisionCache) Upsert(ctx context.Context, docRev DocumentRe sc.getShard(docRev.DocID).Upsert(ctx, docRev) } -func (sc *ShardedLRURevisionCache) Remove(docID, revID string) { - sc.getShard(docID).Remove(docID, revID) +func (sc *ShardedLRURevisionCache) RemoveWithRev(docID, revID string) { + sc.getShard(docID).RemoveWithRev(docID, revID) +} + +func (sc *ShardedLRURevisionCache) RemoveWithCV(docID string, cv *CurrentVersionVector) { + sc.getShard(docID).RemoveWithCV(docID, cv) } // An LRU cache of document revision bodies, together with their channel access. type LRURevisionCache struct { backingStore RevisionCacheBackingStore cache map[IDAndRev]*list.Element + hlvCache map[IDandCV]*list.Element lruList *list.List cacheHits *base.SgwIntStat cacheMisses *base.SgwIntStat @@ -93,7 +102,9 @@ type revCacheValue struct { attachments AttachmentsMeta delta *RevisionDelta body Body - key IDAndRev + id string + cv CurrentVersionVector + revID string bodyBytes []byte lock sync.RWMutex deleted bool @@ -105,6 +116,7 @@ func NewLRURevisionCache(capacity uint32, backingStore RevisionCacheBackingStore return &LRURevisionCache{ cache: map[IDAndRev]*list.Element{}, + hlvCache: map[IDandCV]*list.Element{}, lruList: list.New(), capacity: capacity, backingStore: backingStore, @@ -117,14 +129,18 @@ func NewLRURevisionCache(capacity uint32, backingStore RevisionCacheBackingStore // Returns the body of the revision, its history, and the set of channels it's in. // If the cache has a loaderFunction, it will be called if the revision isn't in the cache; // any error returned by the loaderFunction will be returned from Get. -func (rc *LRURevisionCache) Get(ctx context.Context, docID, revID string, includeBody bool, includeDelta bool) (DocumentRevision, error) { - return rc.getFromCache(ctx, docID, revID, true, includeBody, includeDelta) +func (rc *LRURevisionCache) GetWithRev(ctx context.Context, docID, revID string, includeBody, includeDelta bool) (DocumentRevision, error) { + return rc.getFromCacheByRev(ctx, docID, revID, true, includeBody, includeDelta) +} + +func (rc *LRURevisionCache) GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (DocumentRevision, error) { + return rc.getFromCacheByCV(ctx, docID, cv, true, includeBody, includeDelta) } // Looks up a revision from the cache only. Will not fall back to loader function if not // present in the cache. func (rc *LRURevisionCache) Peek(ctx context.Context, docID, revID string) (docRev DocumentRevision, found bool) { - docRev, err := rc.getFromCache(ctx, docID, revID, false, RevCacheOmitBody, RevCacheOmitDelta) + docRev, err := rc.getFromCacheByRev(ctx, docID, revID, false, RevCacheOmitBody, RevCacheOmitDelta) if err != nil { return DocumentRevision{}, false } @@ -140,18 +156,42 @@ func (rc *LRURevisionCache) UpdateDelta(ctx context.Context, docID, revID string } } -func (rc *LRURevisionCache) getFromCache(ctx context.Context, docID, revID string, loadOnCacheMiss bool, includeBody bool, includeDelta bool) (DocumentRevision, error) { +func (rc *LRURevisionCache) getFromCacheByRev(ctx context.Context, docID, revID string, loadOnCacheMiss bool, includeBody bool, includeDelta bool) (DocumentRevision, error) { value := rc.getValue(docID, revID, loadOnCacheMiss) if value == nil { return DocumentRevision{}, nil } - docRev, statEvent, err := value.load(ctx, rc.backingStore, includeBody, includeDelta) - rc.statsRecorderFunc(statEvent) + docRev, cacheHit, err := value.load(ctx, rc.backingStore, includeBody, includeDelta) + rc.statsRecorderFunc(cacheHit) + + if err != nil { + rc.removeValue(value) // don't keep failed loads in the cache + } + if !cacheHit { + rc.addToHLVMapPostLoad(docID, docRev.RevID, docRev.CV) + } + + return docRev, err +} + +func (rc *LRURevisionCache) getFromCacheByCV(ctx context.Context, docID string, cv *CurrentVersionVector, loadCacheOnMiss bool, includeBody bool, includeDelta bool) (DocumentRevision, error) { + value := rc.getValueByCV(docID, cv, loadCacheOnMiss) + if value == nil { + return DocumentRevision{}, nil + } + + docRev, cacheHit, err := value.load(ctx, rc.backingStore, includeBody, includeDelta) + rc.statsRecorderFunc(cacheHit) if err != nil { rc.removeValue(value) // don't keep failed loads in the cache } + + if !cacheHit { + rc.addToRevMapPostLoad(docID, docRev.RevID, docRev.CV) + } + return docRev, err } @@ -162,15 +202,16 @@ func (rc *LRURevisionCache) LoadInvalidRevFromBackingStore(ctx context.Context, var docRevBody Body value := revCacheValue{ - key: key, + id: key.DocID, + revID: key.RevID, } // If doc has been passed in use this to grab values. Otherwise run revCacheLoader which will grab the Document // first if doc != nil { - value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, value.err = revCacheLoaderForDocument(ctx, rc.backingStore, doc, key.RevID) + value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, _, value.err = revCacheLoaderForDocument(ctx, rc.backingStore, doc, key.RevID) } else { - value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, value.err = revCacheLoader(ctx, rc.backingStore, key, includeBody) + value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, _, value.err = revCacheLoader(ctx, rc.backingStore, key, includeBody) } if includeDelta { @@ -210,12 +251,15 @@ func (rc *LRURevisionCache) GetActive(ctx context.Context, docID string, include // Retrieve from or add to rev cache value := rc.getValue(docID, bucketDoc.CurrentRev, true) - docRev, statEvent, err := value.loadForDoc(ctx, rc.backingStore, bucketDoc, includeBody) - rc.statsRecorderFunc(statEvent) + docRev, cacheHit, err := value.loadForDoc(ctx, rc.backingStore, bucketDoc, includeBody) + rc.statsRecorderFunc(cacheHit) if err != nil { rc.removeValue(value) // don't keep failed loads in the cache } + // add successfully fetched value to cv lookup map too + rc.addToHLVMapPostLoad(docID, docRev.RevID, docRev.CV) + return docRev, err } @@ -234,30 +278,43 @@ func (rc *LRURevisionCache) Put(ctx context.Context, docRev DocumentRevision) { // TODO: CBG-1948 panic("Missing history for RevisionCache.Put") } - value := rc.getValue(docRev.DocID, docRev.RevID, true) + // doc should always have a cv present in a PUT operation on the cache (update HLV is called before hand in doc update process) + // thus we can call getValueByCV directly the update the rev lookup post this + value := rc.getValueByCV(docRev.DocID, docRev.CV, true) + // store the created value value.store(docRev) + + // add new doc version to the rev id lookup map + rc.addToRevMapPostLoad(docRev.DocID, docRev.RevID, docRev.CV) } // Upsert a revision in the cache. func (rc *LRURevisionCache) Upsert(ctx context.Context, docRev DocumentRevision) { - key := IDAndRev{DocID: docRev.DocID, RevID: docRev.RevID} + var value *revCacheValue + // similar to PUT operation we should have the CV defined by this point (updateHLV is called before calling this) + key := IDandCV{DocID: docRev.DocID, Source: docRev.CV.SourceID, Version: docRev.CV.VersionCAS} + legacyKey := IDAndRev{DocID: docRev.DocID, RevID: docRev.RevID} rc.lock.Lock() - // If element exists remove from lrulist - if elem := rc.cache[key]; elem != nil { + // lookup for element in hlv lookup map, if not found for some reason try rev lookup map + if elem := rc.hlvCache[key]; elem != nil { + rc.lruList.Remove(elem) + } else if elem = rc.cache[legacyKey]; elem != nil { rc.lruList.Remove(elem) } // Add new value and overwrite existing cache key, pushing to front to maintain order - value := &revCacheValue{key: key} - rc.cache[key] = rc.lruList.PushFront(value) + // also ensure we add to rev id lookup map too + value = &revCacheValue{id: docRev.DocID, cv: *docRev.CV} + elem := rc.lruList.PushFront(value) + rc.hlvCache[key] = elem + rc.cache[legacyKey] = elem - // Purge oldest item if required - for len(rc.cache) > int(rc.capacity) { + for rc.lruList.Len() > int(rc.capacity) { rc.purgeOldest_() } rc.lock.Unlock() - + // store upsert value value.store(docRev) } @@ -272,9 +329,32 @@ func (rc *LRURevisionCache) getValue(docID, revID string, create bool) (value *r rc.lruList.MoveToFront(elem) value = elem.Value.(*revCacheValue) } else if create { - value = &revCacheValue{key: key} + value = &revCacheValue{id: docID, revID: revID} rc.cache[key] = rc.lruList.PushFront(value) - for len(rc.cache) > int(rc.capacity) { + for rc.lruList.Len() > int(rc.capacity) { + rc.purgeOldest_() + } + } + rc.lock.Unlock() + return +} + +// getValueByCV gets a value from rev cache by CV, if not found and create is true, will add the value to cache and both lookup maps +func (rc *LRURevisionCache) getValueByCV(docID string, cv *CurrentVersionVector, create bool) (value *revCacheValue) { + if docID == "" || cv == nil { + return nil + } + + key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.VersionCAS} + rc.lock.Lock() + if elem := rc.hlvCache[key]; elem != nil { + rc.lruList.MoveToFront(elem) + value = elem.Value.(*revCacheValue) + } else if create { + value = &revCacheValue{id: docID, cv: *cv} + newElem := rc.lruList.PushFront(value) + rc.hlvCache[key] = newElem + for rc.lruList.Len() > int(rc.capacity) { rc.purgeOldest_() } } @@ -282,8 +362,93 @@ func (rc *LRURevisionCache) getValue(docID, revID string, create bool) (value *r return } +// addToRevMapPostLoad will generate and entry in the Rev lookup map for a new document entering the cache +func (rc *LRURevisionCache) addToRevMapPostLoad(docID, revID string, cv *CurrentVersionVector) { + legacyKey := IDAndRev{DocID: docID, RevID: revID} + key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.VersionCAS} + + rc.lock.Lock() + defer rc.lock.Unlock() + // check for existing value in rev cache map (due to concurrent fetch by rev ID) + cvElem, cvFound := rc.hlvCache[key] + revElem, revFound := rc.cache[legacyKey] + if !cvFound { + // its possible the element has been evicted if we don't find the element above (high churn on rev cache) + // need to return doc revision to caller still but no need repopulate the cache + return + } + // Check if another goroutine has already updated the rev map + if revFound { + if cvElem == revElem { + // already match, return + return + } + // if CV map and rev map are targeting different list elements, update to have both use the cv map element + rc.cache[legacyKey] = cvElem + rc.lruList.Remove(revElem) + } else { + // if not found we need to add the element to the rev lookup (for PUT code path) + rc.cache[legacyKey] = cvElem + } +} + +// addToHLVMapPostLoad will generate and entry in the CV lookup map for a new document entering the cache +func (rc *LRURevisionCache) addToHLVMapPostLoad(docID, revID string, cv *CurrentVersionVector) { + legacyKey := IDAndRev{DocID: docID, RevID: revID} + key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.VersionCAS} + + rc.lock.Lock() + defer rc.lock.Unlock() + // check for existing value in rev cache map (due to concurrent fetch by rev ID) + cvElem, cvFound := rc.hlvCache[key] + revElem, revFound := rc.cache[legacyKey] + if !revFound { + // its possible the element has been evicted if we don't find the element above (high churn on rev cache) + // need to return doc revision to caller still but no need repopulate the cache + return + } + // Check if another goroutine has already updated the cv map + if cvFound { + if cvElem == revElem { + // already match, return + return + } + // if CV map and rev map are targeting different list elements, update to have both use the cv map element + rc.cache[legacyKey] = cvElem + rc.lruList.Remove(revElem) + } +} + // Remove removes a value from the revision cache, if present. -func (rc *LRURevisionCache) Remove(docID, revID string) { +func (rc *LRURevisionCache) RemoveWithRev(docID, revID string) { + rc.removeFromCacheByRev(docID, revID) +} + +// RemoveWithCV removes a value from rev cache by CV reference if present +func (rc *LRURevisionCache) RemoveWithCV(docID string, cv *CurrentVersionVector) { + rc.removeFromCacheByCV(docID, cv) +} + +// removeFromCacheByCV removes an entry from rev cache by CV +func (rc *LRURevisionCache) removeFromCacheByCV(docID string, cv *CurrentVersionVector) { + key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.VersionCAS} + rc.lock.Lock() + defer rc.lock.Unlock() + element, ok := rc.hlvCache[key] + if !ok { + return + } + // grab the revid key from the value to enable us to remove the reference from the rev lookup map too + elem := element.Value.(*revCacheValue) + legacyKey := IDAndRev{DocID: docID, RevID: elem.revID} + rc.lruList.Remove(element) + delete(rc.hlvCache, key) + // remove from rev lookup map too + delete(rc.cache, legacyKey) +} + +// removeFromCacheByRev removes an entry from rev cache by revID +func (rc *LRURevisionCache) removeFromCacheByRev(docID, revID string) { key := IDAndRev{DocID: docID, RevID: revID} rc.lock.Lock() defer rc.lock.Unlock() @@ -291,23 +456,38 @@ func (rc *LRURevisionCache) Remove(docID, revID string) { if !ok { return } + // grab the cv key key from the value to enable us to remove the reference from the rev lookup map too + elem := element.Value.(*revCacheValue) + hlvKey := IDandCV{DocID: docID, Source: elem.cv.SourceID, Version: elem.cv.VersionCAS} rc.lruList.Remove(element) delete(rc.cache, key) + // remove from CV lookup map too + delete(rc.hlvCache, hlvKey) } // removeValue removes a value from the revision cache, if present and the value matches the the value. If there's an item in the revision cache with a matching docID and revID but the document is different, this item will not be removed from the rev cache. func (rc *LRURevisionCache) removeValue(value *revCacheValue) { rc.lock.Lock() - if element := rc.cache[value.key]; element != nil && element.Value == value { + defer rc.lock.Unlock() + revKey := IDAndRev{DocID: value.id, RevID: value.revID} + if element := rc.cache[revKey]; element != nil && element.Value == value { rc.lruList.Remove(element) - delete(rc.cache, value.key) + delete(rc.cache, revKey) + } + // need to also check hlv lookup cache map + hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.VersionCAS} + if element := rc.hlvCache[hlvKey]; element != nil && element.Value == value { + rc.lruList.Remove(element) + delete(rc.hlvCache, hlvKey) } - rc.lock.Unlock() } func (rc *LRURevisionCache) purgeOldest_() { value := rc.lruList.Remove(rc.lruList.Back()).(*revCacheValue) - delete(rc.cache, value.key) + revKey := IDAndRev{DocID: value.id, RevID: value.revID} + hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.VersionCAS} + delete(rc.cache, revKey) + delete(rc.hlvCache, hlvKey) } // Gets the body etc. out of a revCacheValue. If they aren't present already, the loader func @@ -319,6 +499,8 @@ func (value *revCacheValue) load(ctx context.Context, backingStore RevisionCache // to reduce locking when includeDelta=false var delta *RevisionDelta var docRevBody Body + var fetchedCV *CurrentVersionVector + var revid string // Attempt to read cached value. value.lock.RLock() @@ -349,12 +531,24 @@ func (value *revCacheValue) load(ctx context.Context, backingStore RevisionCache // If body is requested and not already present in cache, populate value.body from value.BodyBytes if includeBody && value.body == nil && value.err == nil { if err := value.body.Unmarshal(value.bodyBytes); err != nil { - base.WarnfCtx(ctx, "Unable to marshal BodyBytes in revcache for %s %s", base.UD(value.key.DocID), value.key.RevID) + base.WarnfCtx(ctx, "Unable to marshal BodyBytes in revcache for %s %s", base.UD(value.id), value.revID) } } } else { cacheHit = false - value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, value.err = revCacheLoader(ctx, backingStore, value.key, includeBody) + if value.revID == "" { + hlvKey := IDandCV{DocID: value.id, Source: value.cv.SourceID, Version: value.cv.VersionCAS} + value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, revid, value.err = revCacheLoaderForCv(ctx, backingStore, hlvKey, includeBody) + // based off the current value load we need to populate the revid key with what has been fetched from the bucket (for use of populating the opposite lookup map) + value.revID = revid + } else { + revKey := IDAndRev{DocID: value.id, RevID: value.revID} + value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, fetchedCV, value.err = revCacheLoader(ctx, backingStore, revKey, includeBody) + // based off the revision load we need to populate the hlv key with what has been fetched from the bucket (for use of populating the opposite lookup map) + if fetchedCV != nil { + value.cv = *fetchedCV + } + } } if includeDelta { @@ -374,7 +568,7 @@ func (value *revCacheValue) updateBody(ctx context.Context) (err error) { var body Body if err := body.Unmarshal(value.bodyBytes); err != nil { // On unmarshal error, warn return docRev without body - base.WarnfCtx(ctx, "Unable to marshal BodyBytes in revcache for %s %s", base.UD(value.key.DocID), value.key.RevID) + base.WarnfCtx(ctx, "Unable to marshal BodyBytes in revcache for %s %s", base.UD(value.id), value.revID) return err } @@ -391,8 +585,8 @@ func (value *revCacheValue) updateBody(ctx context.Context) (err error) { func (value *revCacheValue) asDocumentRevision(body Body, delta *RevisionDelta) (DocumentRevision, error) { docRev := DocumentRevision{ - DocID: value.key.DocID, - RevID: value.key.RevID, + DocID: value.id, + RevID: value.revID, BodyBytes: value.bodyBytes, History: value.history, Channels: value.channels, @@ -400,6 +594,7 @@ func (value *revCacheValue) asDocumentRevision(body Body, delta *RevisionDelta) Attachments: value.attachments.ShallowCopy(), // Avoid caller mutating the stored attachments Deleted: value.deleted, Removed: value.removed, + CV: &CurrentVersionVector{VersionCAS: value.cv.VersionCAS, SourceID: value.cv.SourceID}, } if body != nil { docRev._shallowCopyBody = body.ShallowCopy() @@ -414,6 +609,8 @@ func (value *revCacheValue) asDocumentRevision(body Body, delta *RevisionDelta) func (value *revCacheValue) loadForDoc(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, includeBody bool) (docRev DocumentRevision, cacheHit bool, err error) { var docRevBody Body + var fetchedCV *CurrentVersionVector + var revid string value.lock.RLock() if value.bodyBytes != nil || value.err != nil { if includeBody { @@ -443,13 +640,22 @@ func (value *revCacheValue) loadForDoc(ctx context.Context, backingStore Revisio // If body is requested and not already present in cache, attempt to generate from bytes and insert into cache if includeBody && value.body == nil { if err := value.body.Unmarshal(value.bodyBytes); err != nil { - base.WarnfCtx(ctx, "Unable to marshal BodyBytes in revcache for %s %s", base.UD(value.key.DocID), value.key.RevID) + base.WarnfCtx(ctx, "Unable to marshal BodyBytes in revcache for %s %s", base.UD(value.id), value.revID) } } } else { cacheHit = false - value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, value.err = revCacheLoaderForDocument(ctx, backingStore, doc, value.key.RevID) + if value.revID == "" { + value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, revid, value.err = revCacheLoaderForDocumentCV(ctx, backingStore, doc, value.cv) + value.revID = revid + } else { + value.bodyBytes, value.body, value.history, value.channels, value.removed, value.attachments, value.deleted, value.expiry, fetchedCV, value.err = revCacheLoaderForDocument(ctx, backingStore, doc, value.revID) + if fetchedCV != nil { + value.cv = *fetchedCV + } + } } + if includeBody { docRevBody = value.body } @@ -462,7 +668,7 @@ func (value *revCacheValue) loadForDoc(ctx context.Context, backingStore Revisio func (value *revCacheValue) store(docRev DocumentRevision) { value.lock.Lock() if value.bodyBytes == nil { - // value already has doc id/rev id in key + value.revID = docRev.RevID value.bodyBytes = docRev.BodyBytes value.history = docRev.History value.channels = docRev.Channels diff --git a/db/revision_cache_test.go b/db/revision_cache_test.go index 1451d353d9..d5abbe6b97 100644 --- a/db/revision_cache_test.go +++ b/db/revision_cache_test.go @@ -50,6 +50,13 @@ func (t *testBackingStore) GetDocument(ctx context.Context, docid string, unmars Channels: base.SetOf("*"), }, } + doc.currentRevChannels = base.SetOf("*") + + doc.HLV = &HybridLogicalVector{ + SourceID: "test", + Version: 123, + } + return doc, nil } @@ -66,6 +73,19 @@ func (t *testBackingStore) getRevision(ctx context.Context, doc *Document, revid return bodyBytes, b, nil, err } +func (t *testBackingStore) getCurrentVersion(ctx context.Context, doc *Document) ([]byte, Body, AttachmentsMeta, error) { + t.getRevisionCounter.Add(1) + + b := Body{ + "testing": true, + BodyId: doc.ID, + BodyRev: doc.CurrentRev, + "current_version": &CurrentVersionVector{VersionCAS: doc.HLV.Version, SourceID: doc.HLV.SourceID}, + } + bodyBytes, err := base.JSONMarshal(b) + return bodyBytes, b, nil, err +} + type noopBackingStore struct{} func (*noopBackingStore) GetDocument(ctx context.Context, docid string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { @@ -76,6 +96,10 @@ func (*noopBackingStore) getRevision(ctx context.Context, doc *Document, revid s return nil, nil, nil, nil } +func (*noopBackingStore) getCurrentVersion(ctx context.Context, doc *Document) ([]byte, Body, AttachmentsMeta, error) { + return nil, nil, nil, nil +} + // Tests the eviction from the LRURevisionCache func TestLRURevisionCacheEviction(t *testing.T) { cacheHitCounter, cacheMissCounter := base.SgwIntStat{}, base.SgwIntStat{} @@ -86,13 +110,13 @@ func TestLRURevisionCacheEviction(t *testing.T) { // Fill up the rev cache with the first 10 docs for docID := 0; docID < 10; docID++ { id := strconv.Itoa(docID) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) } // Get them back out for i := 0; i < 10; i++ { docID := strconv.Itoa(i) - docRev, err := cache.Get(ctx, docID, "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + docRev, err := cache.GetWithRev(ctx, docID, "1-abc", RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err) assert.NotNil(t, docRev.BodyBytes, "nil body for %s", docID) assert.Equal(t, docID, docRev.DocID) @@ -103,7 +127,7 @@ func TestLRURevisionCacheEviction(t *testing.T) { // Add 3 more docs to the now full revcache for i := 10; i < 13; i++ { docID := strconv.Itoa(i) - cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: docID, RevID: "1-abc", History: Revisions{"start": 1}}) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: docID, RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(i), SourceID: "test"}, History: Revisions{"start": 1}}) } // Check that the first 3 docs were evicted @@ -120,7 +144,68 @@ func TestLRURevisionCacheEviction(t *testing.T) { // and check we can Get up to and including the last 3 we put in for i := 0; i < 10; i++ { id := strconv.Itoa(i + 3) - docRev, err := cache.Get(ctx, id, "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + docRev, err := cache.GetWithRev(ctx, id, "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + assert.NoError(t, err) + assert.NotNil(t, docRev.BodyBytes, "nil body for %s", id) + assert.Equal(t, id, docRev.DocID) + assert.Equal(t, int64(0), cacheMissCounter.Value()) + assert.Equal(t, prevCacheHitCount+int64(i)+1, cacheHitCounter.Value()) + } +} + +// TestLRURevisionCacheEvictionMixedRevAndCV: +// - Add 10 docs to the cache +// - Assert that the cache list and relevant lookup maps have correct lengths +// - Add 3 more docs +// - Assert that lookup maps and the cache list still only have 10 elements in +// - Perform a Get with CV specified on all 10 elements in the cache and assert we get a hit for each element and no misses, +// testing the eviction worked correct +// - Then do the same but for rev lookup +func TestLRURevisionCacheEvictionMixedRevAndCV(t *testing.T) { + cacheHitCounter, cacheMissCounter := base.SgwIntStat{}, base.SgwIntStat{} + cache := NewLRURevisionCache(10, &noopBackingStore{}, &cacheHitCounter, &cacheMissCounter) + + ctx := base.TestCtx(t) + + // Fill up the rev cache with the first 10 docs + for docID := 0; docID < 10; docID++ { + id := strconv.Itoa(docID) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) + } + + // assert that the list has 10 elements along with both lookup maps + assert.Equal(t, 10, len(cache.hlvCache)) + assert.Equal(t, 10, len(cache.cache)) + assert.Equal(t, 10, cache.lruList.Len()) + + // Add 3 more docs to the now full rev cache to trigger eviction + for docID := 10; docID < 13; docID++ { + id := strconv.Itoa(docID) + cache.Put(ctx, DocumentRevision{BodyBytes: []byte(`{}`), DocID: id, RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(docID), SourceID: "test"}, History: Revisions{"start": 1}}) + } + // assert the cache and associated lookup maps only have 10 items in them (i.e.e is eviction working?) + assert.Equal(t, 10, len(cache.hlvCache)) + assert.Equal(t, 10, len(cache.cache)) + assert.Equal(t, 10, cache.lruList.Len()) + + // assert we can get a hit on all 10 elements in the cache by CV lookup + prevCacheHitCount := cacheHitCounter.Value() + for i := 0; i < 10; i++ { + id := strconv.Itoa(i + 3) + cv := CurrentVersionVector{VersionCAS: uint64(i + 3), SourceID: "test"} + docRev, err := cache.GetWithCV(ctx, id, &cv, RevCacheOmitBody, RevCacheOmitDelta) + assert.NoError(t, err) + assert.NotNil(t, docRev.BodyBytes, "nil body for %s", id) + assert.Equal(t, id, docRev.DocID) + assert.Equal(t, int64(0), cacheMissCounter.Value()) + assert.Equal(t, prevCacheHitCount+int64(i)+1, cacheHitCounter.Value()) + } + + // now do same but for rev lookup + prevCacheHitCount = cacheHitCounter.Value() + for i := 0; i < 10; i++ { + id := strconv.Itoa(i + 3) + docRev, err := cache.GetWithRev(ctx, id, "1-abc", RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err) assert.NotNil(t, docRev.BodyBytes, "nil body for %s", id) assert.Equal(t, id, docRev.DocID) @@ -135,7 +220,7 @@ func TestBackingStore(t *testing.T) { cache := NewLRURevisionCache(10, &testBackingStore{[]string{"Peter"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) // Get Rev for the first time - miss cache, but fetch the doc and revision to store - docRev, err := cache.Get(base.TestCtx(t), "Jens", "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + docRev, err := cache.GetWithRev(base.TestCtx(t), "Jens", "1-abc", RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err) assert.Equal(t, "Jens", docRev.DocID) assert.NotNil(t, docRev.History) @@ -146,7 +231,7 @@ func TestBackingStore(t *testing.T) { assert.Equal(t, int64(1), getRevisionCounter.Value()) // Doc doesn't exist, so miss the cache, and fail when getting the doc - docRev, err = cache.Get(base.TestCtx(t), "Peter", "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + docRev, err = cache.GetWithRev(base.TestCtx(t), "Peter", "1-abc", RevCacheOmitBody, RevCacheOmitDelta) assertHTTPError(t, err, 404) assert.Nil(t, docRev.BodyBytes) assert.Equal(t, int64(0), cacheHitCounter.Value()) @@ -155,7 +240,7 @@ func TestBackingStore(t *testing.T) { assert.Equal(t, int64(1), getRevisionCounter.Value()) // Rev is already resident, but still issue GetDocument to check for later revisions - docRev, err = cache.Get(base.TestCtx(t), "Jens", "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + docRev, err = cache.GetWithRev(base.TestCtx(t), "Jens", "1-abc", RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err) assert.Equal(t, "Jens", docRev.DocID) assert.NotNil(t, docRev.History) @@ -166,7 +251,60 @@ func TestBackingStore(t *testing.T) { assert.Equal(t, int64(1), getRevisionCounter.Value()) // Rev still doesn't exist, make sure it wasn't cached - docRev, err = cache.Get(base.TestCtx(t), "Peter", "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + docRev, err = cache.GetWithRev(base.TestCtx(t), "Peter", "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + assertHTTPError(t, err, 404) + assert.Nil(t, docRev.BodyBytes) + assert.Equal(t, int64(1), cacheHitCounter.Value()) + assert.Equal(t, int64(3), cacheMissCounter.Value()) + assert.Equal(t, int64(3), getDocumentCounter.Value()) + assert.Equal(t, int64(1), getRevisionCounter.Value()) +} + +// TestBackingStoreCV: +// - Perform a Get on a doc by cv that is not currently in the rev cache, assert we get cache miss +// - Perform a Get again on the same doc and assert we get cache hit +// - Perform a Get on doc that doesn't exist, so misses cache and will fail on retrieving doc from bucket +// - Try a Get again on the same doc and assert it wasn't loaded into the cache as it doesn't exist +func TestBackingStoreCV(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + cache := NewLRURevisionCache(10, &testBackingStore{[]string{"not_found"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) + + // Get Rev for the first time - miss cache, but fetch the doc and revision to store + cv := CurrentVersionVector{SourceID: "test", VersionCAS: 123} + docRev, err := cache.GetWithCV(base.TestCtx(t), "doc1", &cv, RevCacheOmitBody, RevCacheOmitDelta) + assert.NoError(t, err) + assert.Equal(t, "doc1", docRev.DocID) + assert.NotNil(t, docRev.Channels) + assert.Equal(t, "test", docRev.CV.SourceID) + assert.Equal(t, uint64(123), docRev.CV.VersionCAS) + assert.Equal(t, int64(0), cacheHitCounter.Value()) + assert.Equal(t, int64(1), cacheMissCounter.Value()) + assert.Equal(t, int64(1), getDocumentCounter.Value()) + assert.Equal(t, int64(1), getRevisionCounter.Value()) + + // Perform a get on the same doc as above, check that we get cache hit + docRev, err = cache.GetWithCV(base.TestCtx(t), "doc1", &cv, RevCacheOmitBody, RevCacheOmitDelta) + assert.NoError(t, err) + assert.Equal(t, "doc1", docRev.DocID) + assert.Equal(t, "test", docRev.CV.SourceID) + assert.Equal(t, uint64(123), docRev.CV.VersionCAS) + assert.Equal(t, int64(1), cacheHitCounter.Value()) + assert.Equal(t, int64(1), cacheMissCounter.Value()) + assert.Equal(t, int64(1), getDocumentCounter.Value()) + assert.Equal(t, int64(1), getRevisionCounter.Value()) + + // Doc doesn't exist, so miss the cache, and fail when getting the doc + cv = CurrentVersionVector{SourceID: "test11", VersionCAS: 100} + docRev, err = cache.GetWithCV(base.TestCtx(t), "not_found", &cv, RevCacheOmitBody, RevCacheOmitDelta) + assertHTTPError(t, err, 404) + assert.Nil(t, docRev.BodyBytes) + assert.Equal(t, int64(1), cacheHitCounter.Value()) + assert.Equal(t, int64(2), cacheMissCounter.Value()) + assert.Equal(t, int64(2), getDocumentCounter.Value()) + assert.Equal(t, int64(1), getRevisionCounter.Value()) + + // Rev still doesn't exist, make sure it wasn't cached + docRev, err = cache.GetWithCV(base.TestCtx(t), "not_found", &cv, RevCacheOmitBody, RevCacheOmitDelta) assertHTTPError(t, err, 404) assert.Nil(t, docRev.BodyBytes) assert.Equal(t, int64(1), cacheHitCounter.Value()) @@ -255,15 +393,15 @@ func TestBypassRevisionCache(t *testing.T) { assert.False(t, ok) // Get non-existing doc - _, err = rc.Get(base.TestCtx(t), "invalid", rev1, RevCacheOmitBody, RevCacheOmitDelta) + _, err = rc.GetWithRev(base.TestCtx(t), "invalid", rev1, RevCacheOmitBody, RevCacheOmitDelta) assert.True(t, base.IsDocNotFoundError(err)) // Get non-existing revision - _, err = rc.Get(base.TestCtx(t), key, "3-abc", RevCacheOmitBody, RevCacheOmitDelta) + _, err = rc.GetWithRev(base.TestCtx(t), key, "3-abc", RevCacheOmitBody, RevCacheOmitDelta) assertHTTPError(t, err, 404) // Get specific revision - doc, err := rc.Get(base.TestCtx(t), key, rev1, RevCacheOmitBody, RevCacheOmitDelta) + doc, err := rc.GetWithRev(base.TestCtx(t), key, rev1, RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err) require.NotNil(t, doc) assert.Equal(t, `{"value":1234}`, string(doc.BodyBytes)) @@ -350,7 +488,7 @@ func TestPutExistingRevRevisionCacheAttachmentProperty(t *testing.T) { "value": 1235, BodyAttachments: map[string]interface{}{"myatt": map[string]interface{}{"content_type": "text/plain", "data": "SGVsbG8gV29ybGQh"}}, } - _, _, err = collection.PutExistingRevWithBody(ctx, docKey, rev2body, []string{rev2id, rev1id}, false) + _, _, err = collection.PutExistingRevWithBody(ctx, docKey, rev2body, []string{rev2id, rev1id}, false, ExistingVersionWithUpdateToHLV) assert.NoError(t, err, "Unexpected error calling collection.PutExistingRev") // Get the raw document directly from the bucket, validate _attachments property isn't found @@ -361,7 +499,7 @@ func TestPutExistingRevRevisionCacheAttachmentProperty(t *testing.T) { assert.False(t, ok, "_attachments property still present in document body retrieved from bucket: %#v", bucketBody) // Get the raw document directly from the revcache, validate _attachments property isn't found - docRevision, err := collection.revisionCache.Get(base.TestCtx(t), docKey, rev2id, RevCacheOmitBody, RevCacheOmitDelta) + docRevision, err := collection.revisionCache.GetWithRev(base.TestCtx(t), docKey, rev2id, RevCacheOmitBody, RevCacheOmitDelta) assert.NoError(t, err, "Unexpected error calling collection.revisionCache.Get") assert.NotContains(t, docRevision.BodyBytes, BodyAttachments, "_attachments property still present in document body retrieved from rev cache: %#v", bucketBody) _, ok = docRevision.Attachments["myatt"] @@ -388,12 +526,12 @@ func TestRevisionImmutableDelta(t *testing.T) { secondDelta := []byte("modified delta") // Trigger load into cache - _, err := cache.Get(base.TestCtx(t), "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) + _, err := cache.GetWithRev(base.TestCtx(t), "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) assert.NoError(t, err, "Error adding to cache") cache.UpdateDelta(base.TestCtx(t), "doc1", "1-abc", RevisionDelta{ToRevID: "rev2", DeltaBytes: firstDelta}) // Retrieve from cache - retrievedRev, err := cache.Get(base.TestCtx(t), "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) + retrievedRev, err := cache.GetWithRev(base.TestCtx(t), "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) assert.NoError(t, err, "Error retrieving from cache") assert.Equal(t, "rev2", retrievedRev.Delta.ToRevID) assert.Equal(t, firstDelta, retrievedRev.Delta.DeltaBytes) @@ -404,7 +542,7 @@ func TestRevisionImmutableDelta(t *testing.T) { assert.Equal(t, firstDelta, retrievedRev.Delta.DeltaBytes) // Retrieve again, validate delta is correct - updatedRev, err := cache.Get(base.TestCtx(t), "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) + updatedRev, err := cache.GetWithRev(base.TestCtx(t), "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) assert.NoError(t, err, "Error retrieving from cache") assert.Equal(t, "rev3", updatedRev.Delta.ToRevID) assert.Equal(t, secondDelta, updatedRev.Delta.DeltaBytes) @@ -419,8 +557,8 @@ func TestSingleLoad(t *testing.T) { cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} cache := NewLRURevisionCache(10, &testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) - cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc123", RevID: "1-abc", History: Revisions{"start": 1}}) - _, err := cache.Get(base.TestCtx(t), "doc123", "1-abc", true, false) + cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc123", RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(123), SourceID: "test"}, History: Revisions{"start": 1}}) + _, err := cache.GetWithRev(base.TestCtx(t), "doc123", "1-abc", true, false) assert.NoError(t, err) } @@ -429,14 +567,14 @@ func TestConcurrentLoad(t *testing.T) { cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} cache := NewLRURevisionCache(10, &testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) - cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc1", RevID: "1-abc", History: Revisions{"start": 1}}) + cache.Put(base.TestCtx(t), DocumentRevision{BodyBytes: []byte(`{"test":"1234"}`), DocID: "doc1", RevID: "1-abc", CV: &CurrentVersionVector{VersionCAS: uint64(1234), SourceID: "test"}, History: Revisions{"start": 1}}) // Trigger load into cache var wg sync.WaitGroup wg.Add(20) for i := 0; i < 20; i++ { go func() { - _, err := cache.Get(base.TestCtx(t), "doc1", "1-abc", true, false) + _, err := cache.GetWithRev(base.TestCtx(t), "doc1", "1-abc", true, false) assert.NoError(t, err) wg.Done() }() @@ -454,14 +592,14 @@ func TestRevisionCacheRemove(t *testing.T) { rev1id, _, err := collection.Put(ctx, "doc", Body{"val": 123}) assert.NoError(t, err) - docRev, err := collection.revisionCache.Get(base.TestCtx(t), "doc", rev1id, true, true) + docRev, err := collection.revisionCache.GetWithRev(base.TestCtx(t), "doc", rev1id, true, true) assert.NoError(t, err) assert.Equal(t, rev1id, docRev.RevID) assert.Equal(t, int64(0), db.DbStats.Cache().RevisionCacheMisses.Value()) - collection.revisionCache.Remove("doc", rev1id) + collection.revisionCache.RemoveWithRev("doc", rev1id) - docRev, err = collection.revisionCache.Get(base.TestCtx(t), "doc", rev1id, true, true) + docRev, err = collection.revisionCache.GetWithRev(base.TestCtx(t), "doc", rev1id, true, true) assert.NoError(t, err) assert.Equal(t, rev1id, docRev.RevID) assert.Equal(t, int64(1), db.DbStats.Cache().RevisionCacheMisses.Value()) @@ -482,6 +620,59 @@ func TestRevisionCacheRemove(t *testing.T) { assert.Equal(t, int64(1), db.DbStats.Cache().RevisionCacheMisses.Value()) } +// TestRevCacheOperationsCV: +// - Create doc revision, put the revision into the cache +// - Perform a get on that doc by cv and assert that it has correctly been handled +// - Updated doc revision and upsert the cache +// - Get the updated doc by cv and assert iot has been correctly handled +// - Peek the doc by cv and assert it has been found +// - Peek the rev id cache for the same doc and assert that doc also has been updated in that lookup cache +// - Remove the doc by cv, and asser that the doc is gone +func TestRevCacheOperationsCV(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + cache := NewLRURevisionCache(10, &testBackingStore{[]string{"test_doc"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) + + cv := CurrentVersionVector{SourceID: "test", VersionCAS: 123} + documentRevision := DocumentRevision{ + DocID: "doc1", + RevID: "1-abc", + BodyBytes: []byte(`{"test":"1234"}`), + Channels: base.SetOf("chan1"), + History: Revisions{"start": 1}, + CV: &cv, + } + cache.Put(base.TestCtx(t), documentRevision) + + docRev, err := cache.GetWithCV(base.TestCtx(t), "doc1", &cv, RevCacheOmitBody, RevCacheOmitDelta) + require.NoError(t, err) + assert.Equal(t, "doc1", docRev.DocID) + assert.Equal(t, base.SetOf("chan1"), docRev.Channels) + assert.Equal(t, "test", docRev.CV.SourceID) + assert.Equal(t, uint64(123), docRev.CV.VersionCAS) + assert.Equal(t, int64(1), cacheHitCounter.Value()) + assert.Equal(t, int64(0), cacheMissCounter.Value()) + + documentRevision.BodyBytes = []byte(`{"test":"12345"}`) + + cache.Upsert(base.TestCtx(t), documentRevision) + + docRev, err = cache.GetWithCV(base.TestCtx(t), "doc1", &cv, RevCacheOmitBody, RevCacheOmitDelta) + require.NoError(t, err) + assert.Equal(t, "doc1", docRev.DocID) + assert.Equal(t, base.SetOf("chan1"), docRev.Channels) + assert.Equal(t, "test", docRev.CV.SourceID) + assert.Equal(t, uint64(123), docRev.CV.VersionCAS) + assert.Equal(t, []byte(`{"test":"12345"}`), docRev.BodyBytes) + assert.Equal(t, int64(2), cacheHitCounter.Value()) + assert.Equal(t, int64(0), cacheMissCounter.Value()) + + // remove the doc rev from the cache and assert that the document is no longer present in cache + cache.RemoveWithCV("doc1", &cv) + assert.Equal(t, 0, len(cache.cache)) + assert.Equal(t, 0, len(cache.hlvCache)) + assert.Equal(t, 0, cache.lruList.Len()) +} + func BenchmarkRevisionCacheRead(b *testing.B) { base.SetUpBenchmarkLogging(b, base.LevelDebug, base.KeyAll) @@ -492,7 +683,7 @@ func BenchmarkRevisionCacheRead(b *testing.B) { // trigger load into cache for i := 0; i < 5000; i++ { - _, _ = cache.Get(ctx, fmt.Sprintf("doc%d", i), "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + _, _ = cache.GetWithRev(ctx, fmt.Sprintf("doc%d", i), "1-abc", RevCacheOmitBody, RevCacheOmitDelta) } b.ResetTimer() @@ -500,7 +691,147 @@ func BenchmarkRevisionCacheRead(b *testing.B) { // GET the document until test run has completed for pb.Next() { docId := fmt.Sprintf("doc%d", rand.Intn(5000)) - _, _ = cache.Get(ctx, docId, "1-abc", RevCacheOmitBody, RevCacheOmitDelta) + _, _ = cache.GetWithRev(ctx, docId, "1-abc", RevCacheOmitBody, RevCacheOmitDelta) } }) } + +// TestLoaderMismatchInCV: +// - Get doc that is not in cache by CV to trigger a load from bucket +// - Ensure the CV passed into teh GET operation won't match the doc in teh bucket +// - Assert we get error and the value is not loaded into the cache +func TestLoaderMismatchInCV(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + cache := NewLRURevisionCache(10, &testBackingStore{[]string{"test_doc"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) + + // create cv with incorrect version to the one stored in backing store + cv := CurrentVersionVector{SourceID: "test", VersionCAS: 1234} + + _, err := cache.GetWithCV(base.TestCtx(t), "doc1", &cv, RevCacheOmitBody, RevCacheOmitDelta) + require.Error(t, err) + assert.ErrorContains(t, err, "mismatch between specified current version and fetched document current version for doc") + assert.Equal(t, int64(0), cacheHitCounter.Value()) + assert.Equal(t, int64(1), cacheMissCounter.Value()) + assert.Equal(t, 0, cache.lruList.Len()) + assert.Equal(t, 0, len(cache.hlvCache)) + assert.Equal(t, 0, len(cache.cache)) +} + +// TestConcurrentLoadByCVAndRevOnCache: +// - Create cache +// - Now perform two concurrent Gets, one by CV and one by revid on a document that doesn't exist in the cache +// - This will trigger two concurrent loads from bucket in the CV code path and revid code path +// - In doing so we will have two processes trying to update lookup maps at the same time and a race condition will appear +// - In doing so will cause us to potentially have two of teh same elements the cache, one with nothing referencing it +// - Assert after both gets are processed, that the cache only has one element in it and that both lookup maps have only one +// element +// - Grab the single element in the list and assert that both maps point to that element in the cache list +func TestConcurrentLoadByCVAndRevOnCache(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + cache := NewLRURevisionCache(10, &testBackingStore{[]string{"test_doc"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) + + ctx := base.TestCtx(t) + + wg := sync.WaitGroup{} + wg.Add(2) + + cv := CurrentVersionVector{SourceID: "test", VersionCAS: 123} + go func() { + _, err := cache.GetWithRev(ctx, "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) + require.NoError(t, err) + wg.Done() + }() + + go func() { + _, err := cache.GetWithCV(ctx, "doc1", &cv, RevCacheOmitBody, RevCacheIncludeDelta) + require.NoError(t, err) + wg.Done() + }() + + wg.Wait() + + revElement := cache.cache[IDAndRev{RevID: "1-abc", DocID: "doc1"}] + cvElement := cache.hlvCache[IDandCV{DocID: "doc1", Source: "test", Version: 123}] + assert.Equal(t, 1, cache.lruList.Len()) + assert.Equal(t, 1, len(cache.cache)) + assert.Equal(t, 1, len(cache.hlvCache)) + // grab the single elem in the cache list + cacheElem := cache.lruList.Front() + // assert that both maps point to the same element in cache list + assert.Equal(t, cacheElem, cvElement) + assert.Equal(t, cacheElem, revElement) +} + +// TestGetActive: +// - Create db, create a doc on the db +// - Call GetActive pn the rev cache and assert that the rev and cv are correct +func TestGetActive(t *testing.T) { + db, ctx := setupTestDB(t) + defer db.Close(ctx) + collection := GetSingleDatabaseCollectionWithUser(t, db) + + rev1id, doc, err := collection.Put(ctx, "doc", Body{"val": 123}) + require.NoError(t, err) + + expectedCV := CurrentVersionVector{ + SourceID: db.BucketUUID, + VersionCAS: doc.Cas, + } + + // remove the entry form the rev cache to force teh cache to not have the active version in it + collection.revisionCache.RemoveWithCV("doc", &expectedCV) + + // call get active to get teh active version from the bucket + docRev, err := collection.revisionCache.GetActive(base.TestCtx(t), "doc", true) + assert.NoError(t, err) + assert.Equal(t, rev1id, docRev.RevID) + assert.Equal(t, expectedCV, *docRev.CV) +} + +// TestConcurrentPutAndGetOnRevCache: +// - Perform a Get with rev on the cache for a doc not in the cache +// - Concurrently perform a PUT on the cache with doc revision the same as the GET +// - Assert we get consistent cache with only 1 entry in lookup maps and the cache itself +func TestConcurrentPutAndGetOnRevCache(t *testing.T) { + cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{} + cache := NewLRURevisionCache(10, &testBackingStore{[]string{"test_doc"}, &getDocumentCounter, &getRevisionCounter}, &cacheHitCounter, &cacheMissCounter) + + ctx := base.TestCtx(t) + + wg := sync.WaitGroup{} + wg.Add(2) + + cv := CurrentVersionVector{SourceID: "test", VersionCAS: 123} + docRev := DocumentRevision{ + DocID: "doc1", + RevID: "1-abc", + BodyBytes: []byte(`{"test":"1234"}`), + Channels: base.SetOf("chan1"), + History: Revisions{"start": 1}, + CV: &cv, + } + + go func() { + _, err := cache.GetWithRev(ctx, "doc1", "1-abc", RevCacheOmitBody, RevCacheIncludeDelta) + require.NoError(t, err) + wg.Done() + }() + + go func() { + cache.Put(ctx, docRev) + wg.Done() + }() + + wg.Wait() + + revElement := cache.cache[IDAndRev{RevID: "1-abc", DocID: "doc1"}] + cvElement := cache.hlvCache[IDandCV{DocID: "doc1", Source: "test", Version: 123}] + + assert.Equal(t, 1, cache.lruList.Len()) + assert.Equal(t, 1, len(cache.cache)) + assert.Equal(t, 1, len(cache.hlvCache)) + cacheElem := cache.lruList.Front() + // assert that both maps point to the same element in cache list + assert.Equal(t, cacheElem, cvElement) + assert.Equal(t, cacheElem, revElement) +} diff --git a/db/revision_test.go b/db/revision_test.go index 683e477a4d..5601dd4eda 100644 --- a/db/revision_test.go +++ b/db/revision_test.go @@ -131,7 +131,7 @@ func TestBackupOldRevision(t *testing.T) { // create rev 2 and check backups for both revs rev2ID := "2-abc" - _, _, err = collection.PutExistingRevWithBody(ctx, docID, Body{"test": true, "updated": true}, []string{rev2ID, rev1ID}, true) + _, _, err = collection.PutExistingRevWithBody(ctx, docID, Body{"test": true, "updated": true}, []string{rev2ID, rev1ID}, true, ExistingVersionWithUpdateToHLV) require.NoError(t, err) // now in all cases we'll have rev 1 backed up (for at least 5 minutes) diff --git a/rest/api_test.go b/rest/api_test.go index 5a0a0d71ac..c8c53ccf7a 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -2746,6 +2746,97 @@ func TestNullDocHandlingForMutable1xBody(t *testing.T) { assert.Contains(t, err.Error(), "null doc body for doc") } +// TestPutDocUpdateVersionVector: +// - Put a doc and assert that the versions and the source for the hlv is correctly updated +// - Update that doc and assert HLV has also been updated +// - Delete the doc and assert that the HLV has been updated in deletion event +func TestPutDocUpdateVersionVector(t *testing.T) { + rt := NewRestTester(t, nil) + defer rt.Close() + + bucketUUID, err := rt.GetDatabase().Bucket.UUID() + require.NoError(t, err) + + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"key": "value"}`) + RequireStatus(t, resp, http.StatusCreated) + + syncData, err := rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") + assert.NoError(t, err) + uintCAS := base.HexCasToUint64(syncData.Cas) + + assert.Equal(t, bucketUUID, syncData.HLV.SourceID) + assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + + // Put a new revision of this doc and assert that the version vector SourceID and Version is updated + resp = rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1?rev="+syncData.CurrentRev, `{"key1": "value1"}`) + RequireStatus(t, resp, http.StatusCreated) + + syncData, err = rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") + assert.NoError(t, err) + uintCAS = base.HexCasToUint64(syncData.Cas) + + assert.Equal(t, bucketUUID, syncData.HLV.SourceID) + assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + + // Delete doc and assert that the version vector SourceID and Version is updated + resp = rt.SendAdminRequest(http.MethodDelete, "/{{.keyspace}}/doc1?rev="+syncData.CurrentRev, "") + RequireStatus(t, resp, http.StatusOK) + + syncData, err = rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") + assert.NoError(t, err) + uintCAS = base.HexCasToUint64(syncData.Cas) + + assert.Equal(t, bucketUUID, syncData.HLV.SourceID) + assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) +} + +// TestHLVOnPutWithImportRejection: +// - Put a doc successfully and assert the HLV is updated correctly +// - Put a doc that will be rejected by the custom import filter +// - Assert that the HLV values on the sync data are still correctly updated/preserved +func TestHLVOnPutWithImportRejection(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyImport) + importFilter := `function (doc) { return doc.type == "mobile"}` + rtConfig := RestTesterConfig{ + DatabaseConfig: &DatabaseConfig{DbConfig: DbConfig{ + AutoImport: false, + ImportFilter: &importFilter, + }}, + } + rt := NewRestTester(t, &rtConfig) + defer rt.Close() + + bucketUUID, err := rt.GetDatabase().Bucket.UUID() + require.NoError(t, err) + + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"type": "mobile"}`) + RequireStatus(t, resp, http.StatusCreated) + + syncData, err := rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") + assert.NoError(t, err) + uintCAS := base.HexCasToUint64(syncData.Cas) + + assert.Equal(t, bucketUUID, syncData.HLV.SourceID) + assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + + // Put a doc that will be rejected by the import filter on the attempt to perform on demand import for write + resp = rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc2", `{"type": "not-mobile"}`) + RequireStatus(t, resp, http.StatusCreated) + + // assert that the hlv is correctly updated and in tact after the import was cancelled on the doc + syncData, err = rt.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc2") + assert.NoError(t, err) + uintCAS = base.HexCasToUint64(syncData.Cas) + + assert.Equal(t, bucketUUID, syncData.HLV.SourceID) + assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) +} + func TestTombstoneCompactionAPI(t *testing.T) { rt := NewRestTester(t, nil) defer rt.Close() diff --git a/rest/bulk_api.go b/rest/bulk_api.go index 83358e7c49..15f6b731a6 100644 --- a/rest/bulk_api.go +++ b/rest/bulk_api.go @@ -511,7 +511,7 @@ func (h *handler) handleBulkDocs() error { err = base.HTTPErrorf(http.StatusBadRequest, "Bad _revisions") } else { revid = revisions[0] - _, _, err = h.collection.PutExistingRevWithBody(h.ctx(), docid, doc, revisions, false) + _, _, err = h.collection.PutExistingRevWithBody(h.ctx(), docid, doc, revisions, false, db.ExistingVersionWithUpdateToHLV) } } diff --git a/rest/doc_api.go b/rest/doc_api.go index 4c278e8f0c..d7ca12924e 100644 --- a/rest/doc_api.go +++ b/rest/doc_api.go @@ -471,7 +471,7 @@ func (h *handler) handlePutDoc() error { if revisions == nil { return base.HTTPErrorf(http.StatusBadRequest, "Bad _revisions") } - doc, newRev, err = h.collection.PutExistingRevWithBody(h.ctx(), docid, body, revisions, false) + doc, newRev, err = h.collection.PutExistingRevWithBody(h.ctx(), docid, body, revisions, false, db.ExistingVersionWithUpdateToHLV) if err != nil { return err } @@ -548,7 +548,7 @@ func (h *handler) handlePutDocReplicator2(docid string, roundTrip bool) (err err newDoc.UpdateBody(body) } - doc, rev, err := h.collection.PutExistingRev(h.ctx(), newDoc, history, true, false, nil) + doc, rev, err := h.collection.PutExistingRev(h.ctx(), newDoc, history, true, false, nil, db.ExistingVersionWithUpdateToHLV) if err != nil { return err diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index d76dcde275..a2ea063a8b 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -424,6 +424,9 @@ func TestXattrDoubleDelete(t *testing.T) { } func TestViewQueryTombstoneRetrieval(t *testing.T) { + t.Skip("Disabled pending CBG-3503") + base.SkipImportTestsIfNotEnabled(t) + if !base.TestsDisableGSI() { t.Skip("views tests are not applicable under GSI") } diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index 13592893e9..5660e773f7 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -8309,3 +8309,46 @@ func requireBodyEqual(t *testing.T, expected string, doc *db.Document) { require.NoError(t, base.JSONUnmarshal([]byte(expected), &expectedBody)) require.Equal(t, expectedBody, doc.Body(base.TestCtx(t))) } + +// TestReplicatorUpdateHLVOnPut: +// - For purpose of testing the PutExistingRev code path +// - Put a doc on a active rest tester +// - Create replication and wait for the doc to be replicated to passive node +// - Assert on the HLV in the metadata of the replicated document +func TestReplicatorUpdateHLVOnPut(t *testing.T) { + + activeRT, passiveRT, remoteURL, teardown := rest.SetupSGRPeers(t) + defer teardown() + + // Grab the bucket UUIDs for both rest testers + activeBucketUUID, err := activeRT.GetDatabase().Bucket.UUID() + require.NoError(t, err) + + const rep = "replication" + + // Put a doc and assert on the HLV update in the sync data + resp := activeRT.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"source": "activeRT"}`) + rest.RequireStatus(t, resp, http.StatusCreated) + + syncData, err := activeRT.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") + assert.NoError(t, err) + uintCAS := base.HexCasToUint64(syncData.Cas) + + assert.Equal(t, activeBucketUUID, syncData.HLV.SourceID) + assert.Equal(t, uintCAS, syncData.HLV.Version) + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) + + // create the replication to push the doc to the passive node and wait for the doc to be replicated + activeRT.CreateReplication(rep, remoteURL, db.ActiveReplicatorTypePush, nil, false, db.ConflictResolverDefault) + + _, err = passiveRT.WaitForChanges(1, "/{{.keyspace}}/_changes", "", true) + require.NoError(t, err) + + // assert on the HLV update on the passive node + syncData, err = passiveRT.GetSingleTestDatabaseCollection().GetDocSyncData(base.TestCtx(t), "doc1") + assert.NoError(t, err) + uintCAS = base.HexCasToUint64(syncData.Cas) + + // TODO: assert that the SourceID and Verison pair are preserved correctly pending CBG-3211 + assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS) +}