Skip to content

Commit

Permalink
CBG-3211: Add PutExistingRev for HLV (#6515)
Browse files Browse the repository at this point in the history
* CBG-3210: Updating HLV on Put And PutExistingRev (#6366)

* CBG-3209: Add cv index and retrieval for revision cache (#6491)

* CBG-3209: changes for retreival of a doc from the rev cache via CV with backwards compatability in mind

* fix failing test, add commnets

* fix lint

* updated to address comments

* rebase chnages needed

* updated to tests that call Get on revision cache

* updates based of new direction with PR + addressing comments

* updated to fix panic

* updated to fix another panic

* address comments

* updates based off commnets

* remove commnented out line

* updates to skip test relying on import and update PutExistingRev doc update type to update HLV

* updates to remove code adding rev id to value inside addToRevMapPostLoad. Added code to assign this inside value.store

* remove redundent code

* Add support for PutExistingCurrentVersion

* updated to remove function not used anymore

* remove duplicated code from dev time

* fix linter errors + add assertions on body of doc update

* address commnets

* updates to add further test cases for AddNewerVersions function + fix some incorrect logic

* updates to chnage helper function for creation of doc for tests. Also adress further comments

* lint error

* address comments, add new merge function for merge versions when hlv is in conflict.

* updates to remove test case and test

* remove unused function

* rebase

* missed current version name change

* more missing updates to name changes
  • Loading branch information
gregns1 committed Dec 1, 2023
1 parent 89af84e commit 071c677
Show file tree
Hide file tree
Showing 5 changed files with 397 additions and 5 deletions.
97 changes: 97 additions & 0 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,103 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
return newRevID, doc, err
}

func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *SourceAndVersion, newRevID string, err error) {
var matchRev string
if existingDoc != nil {
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattr, existingDoc.UserXattr, existingDoc.Cas, DocUnmarshalRev)
if unmarshalErr != nil {
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
}
matchRev = doc.CurrentRev
}
generation, _ := ParseRevID(ctx, matchRev)
if generation < 0 {
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Invalid revision ID")
}
generation++

docUpdateEvent := ExistingVersion
allowImport := db.UseXattrs()
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// (Be careful: this block can be invoked multiple times if there are races!)

var isSgWrite bool
var crc32Match bool

// Is this doc an sgWrite?
if doc != nil {
isSgWrite, crc32Match, _ = doc.IsSGWrite(ctx, nil)
if crc32Match {
db.dbStats().Database().Crc32MatchCount.Add(1)
}
}

// If the existing doc isn't an SG write, import prior to updating
if doc != nil && !isSgWrite && db.UseXattrs() {
err := db.OnDemandImportForWrite(ctx, newDoc.ID, doc, newDoc.Deleted)
if err != nil {
return nil, nil, false, nil, err
}
}

// Conflict check here
// if doc has no HLV defined this is a new doc we haven't seen before, skip conflict check
if doc.HLV == nil {
doc.HLV = &HybridLogicalVector{}
addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV)
if addNewerVersionsErr != nil {
return nil, nil, false, nil, addNewerVersionsErr
}
} else {
if !docHLV.IsInConflict(*doc.HLV) {
// update hlv for all newer incoming source version pairs
addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV)
if addNewerVersionsErr != nil {
return nil, nil, false, nil, addNewerVersionsErr
}
} else {
base.InfofCtx(ctx, base.KeyCRUD, "conflict detected between the two HLV's for doc %s", base.UD(doc.ID))
// cancel rest of update, HLV needs to be sent back to client with merge versions populated
return nil, nil, false, nil, base.HTTPErrorf(http.StatusConflict, "Document revision conflict")
}
}

// Process the attachments, replacing bodies with digests.
newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, generation, matchRev, nil)
if err != nil {
return nil, nil, false, nil, err
}

// generate rev id for new arriving doc
strippedBody, _ := stripInternalProperties(newDoc._body)
encoding, err := base.JSONMarshalCanonical(strippedBody)
if err != nil {
return nil, nil, false, nil, err
}
newRev := CreateRevIDWithBytes(generation, matchRev, encoding)

if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: matchRev, Deleted: newDoc.Deleted}); err != nil {
base.InfofCtx(ctx, base.KeyCRUD, "Failed to add revision ID: %s, for doc: %s, error: %v", newRev, base.UD(newDoc.ID), err)
return nil, nil, false, nil, base.ErrRevTreeAddRevFailure
}

newDoc.RevID = newRev

return newDoc, newAttachments, false, nil, nil
})

if doc != nil && doc.HLV != nil {
if cv == nil {
cv = &SourceAndVersion{}
}
source, version := doc.HLV.GetCurrentVersion()
cv.SourceID = source
cv.Version = version
}

return doc, cv, newRevID, err
}

// Adds an existing revision to a document along with its history (list of rev IDs.)
func (db *DatabaseCollectionWithUser) PutExistingRev(ctx context.Context, newDoc *Document, docHistory []string, noConflicts bool, forceAllConflicts bool, existingDoc *sgbucket.BucketDocument, docUpdateEvent DocUpdateType) (doc *Document, newRevID string, err error) {
return db.PutExistingRevWithConflictResolution(ctx, newDoc, docHistory, noConflicts, nil, forceAllConflicts, existingDoc, docUpdateEvent)
Expand Down
192 changes: 192 additions & 0 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"encoding/json"
"log"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -1675,3 +1676,194 @@ func TestAssignSequenceReleaseLoop(t *testing.T) {
releasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value() - startReleasedSequenceCount
assert.Equal(t, int64(expectedReleasedSequenceCount), releasedSequenceCount)
}

// TestPutExistingCurrentVersion:
// - Put a document in a db
// - Assert on the update to HLV after that PUT
// - Construct a HLV to represent the doc created locally being updated on a client
// - Call PutExistingCurrentVersion simulating doc update arriving over replicator
// - Assert that the doc's HLV in the bucket has been updated correctly with the CV, PV and cvCAS
func TestPutExistingCurrentVersion(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

bucketUUID := db.BucketUUID
collection := GetSingleDatabaseCollectionWithUser(t, db)

// create a new doc
key := "doc1"
body := Body{"key1": "value1"}

rev, _, err := collection.Put(ctx, key, body)
require.NoError(t, err)

// assert on HLV on that above PUT
syncData, err := collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)

// store the cas version allocated to the above doc creation for creation of incoming HLV later in test
originalDocVersion := syncData.HLV.Version

// PUT an update to the above doc
body = Body{"key1": "value11"}
body[BodyRev] = rev
_, _, err = collection.Put(ctx, key, body)
require.NoError(t, err)

// grab the new version for the above update to assert against later in test
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
docUpdateVersion := syncData.HLV.Version

// construct a mock doc update coming over a replicator
body = Body{"key1": "value2"}
newDoc := createTestDocument(key, "", body, false, 0)

// construct a HLV that simulates a doc update happening on a client
// this means moving the current source version pair to PV and adding new sourceID and version pair to CV
pv := make(map[string]uint64)
pv[bucketUUID] = originalDocVersion
// create a version larger than the allocated version above
incomingVersion := docUpdateVersion + 10
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: incomingVersion,
PreviousVersions: pv,
}

// grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function for the above simulation of
// doc update arriving over replicator
_, rawDoc, err := collection.GetDocumentWithRaw(ctx, key, DocUnmarshalSync)
require.NoError(t, err)

doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc)
require.NoError(t, err)
// assert on returned CV
assert.Equal(t, "test", cv.SourceID)
assert.Equal(t, incomingVersion, cv.Version)
assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody)

// assert on the sync data from the above update to the doc
// CV should be equal to CV of update on client but the cvCAS should be updated with the new update and
// PV should contain the old CV pair
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS = base.HexCasToUint64(syncData.Cas)

assert.Equal(t, "test", syncData.HLV.SourceID)
assert.Equal(t, incomingVersion, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
// update the pv map so we can assert we have correct pv map in HLV
pv[bucketUUID] = docUpdateVersion
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
assert.Equal(t, "3-60b024c44c283b369116c2c2570e8088", syncData.CurrentRev)
}

// TestPutExistingCurrentVersionWithConflict:
// - Put a document in a db
// - Assert on the update to HLV after that PUT
// - Construct a HLV to represent the doc created locally being updated on a client
// - Call PutExistingCurrentVersion simulating doc update arriving over replicator
// - Assert conflict between the local HLV for the doc and the incoming mutation is correctly identified
// - Assert that the doc's HLV in the bucket hasn't been updated
func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

bucketUUID := db.BucketUUID
collection := GetSingleDatabaseCollectionWithUser(t, db)

// create a new doc
key := "doc1"
body := Body{"key1": "value1"}

_, _, err := collection.Put(ctx, key, body)
require.NoError(t, err)

// assert on the HLV values after the above creation of the doc
syncData, err := collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)

// create a new doc update to simulate a doc update arriving over replicator from, client
body = Body{"key1": "value2"}
newDoc := createTestDocument(key, "", body, false, 0)
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: 1234,
}

// grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function
_, rawDoc, err := collection.GetDocumentWithRaw(ctx, key, DocUnmarshalSync)
require.NoError(t, err)

// assert that a conflict is correctly identified and the resulting doc and cv are nil
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc)
require.Error(t, err)
assert.ErrorContains(t, err, "Document revision conflict")
assert.Nil(t, cv)
assert.Nil(t, doc)

// assert persisted doc hlv hasn't been updated
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
}

// TestPutExistingCurrentVersionWithNoExistingDoc:
// - Purpose of this test is to test PutExistingRevWithBody code pathway where an
// existing doc is not provided from the bucket into the function simulating a new, not seen
// before doc entering this code path
func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

bucketUUID := db.BucketUUID
collection := GetSingleDatabaseCollectionWithUser(t, db)

// construct a mock doc update coming over a replicator
body := Body{"key1": "value2"}
newDoc := createTestDocument("doc2", "", body, false, 0)

// construct a HLV that simulates a doc update happening on a client
// this means moving the current source version pair to PV and adding new sourceID and version pair to CV
pv := make(map[string]uint64)
pv[bucketUUID] = 2
// create a version larger than the allocated version above
incomingVersion := uint64(2 + 10)
incomingHLV := HybridLogicalVector{
SourceID: "test",
Version: incomingVersion,
PreviousVersions: pv,
}
// call PutExistingCurrentVersion with empty existing doc
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{})
require.NoError(t, err)
assert.NotNil(t, doc)
// assert on returned CV value
assert.Equal(t, "test", cv.SourceID)
assert.Equal(t, incomingVersion, cv.Version)
assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody)

// assert on the sync data from the above update to the doc
// CV should be equal to CV of update on client but the cvCAS should be updated with the new update and
// PV should contain the old CV pair
syncData, err := collection.GetDocSyncData(ctx, "doc2")
assert.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)
assert.Equal(t, "test", syncData.HLV.SourceID)
assert.Equal(t, incomingVersion, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
// update the pv map so we can assert we have correct pv map in HLV
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
}
Loading

0 comments on commit 071c677

Please sign in to comment.