Skip to content

Commit

Permalink
CBG-3503 Update HLV on import (#6572)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcfraser authored and gregns1 committed Dec 1, 2023
1 parent 998d6bc commit 89af84e
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 82 deletions.
26 changes: 22 additions & 4 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,19 +876,37 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
case ExistingVersion:
// preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
case Import:
// work to be done to decide if the VV needs updating here, pending CBG-3503
if d.HLV.CurrentVersionCAS == d.Cas {
// if cvCAS = document CAS, the HLV has already been updated for this mutation by another HLV-aware peer.
// Set ImportCAS to the previous document CAS, but don't otherwise modify HLV
d.HLV.ImportCAS = d.Cas
} else {
// Otherwise this is an SDK mutation made by the local cluster that should be added to HLV.
newVVEntry := SourceAndVersion{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.Version = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = d.Cas
}

case NewVersion, ExistingVersionWithUpdateToHLV:
// add a new entry to the version vector
newVVEntry := CurrentVersionVector{}
newVVEntry := SourceAndVersion{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.VersionCAS = hlvExpandMacroCASValue
newVVEntry.Version = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
}
// update the cvCAS on the SGWrite event too
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
d.HLV.ImportCAS = 0 // remove importCAS for non-imports to save space
}
return d, nil
}
Expand Down Expand Up @@ -2059,7 +2077,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
Expiry: doc.Expiry,
Deleted: doc.History[newRevID].Deleted,
_shallowCopyBody: storedDoc.Body(ctx),
CV: &CurrentVersionVector{VersionCAS: doc.HLV.Version, SourceID: doc.HLV.SourceID},
CV: &SourceAndVersion{Version: doc.HLV.Version, SourceID: doc.HLV.SourceID},
}

if createNewRevIDSkipped {
Expand Down
4 changes: 2 additions & 2 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,14 +1239,14 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) {
}

// HasCurrentVersion Compares the specified CV with the fetched documents CV, returns error on mismatch between the two
func (d *Document) HasCurrentVersion(cv CurrentVersionVector) error {
func (d *Document) HasCurrentVersion(cv SourceAndVersion) error {
if d.HLV == nil {
return base.RedactErrorf("no HLV present in fetched doc %s", base.UD(d.ID))
}

// fetch the current version for the loaded doc and compare against the CV specified in the IDandCV key
fetchedDocSource, fetchedDocVersion := d.HLV.GetCurrentVersion()
if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.VersionCAS {
if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.Version {
return base.RedactErrorf("mismatch between specified current version and fetched document current version for doc %s", base.UD(d.ID))
}
return nil
Expand Down
41 changes: 29 additions & 12 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@ const hlvExpandMacroCASValue = math.MaxUint64

type HybridLogicalVector struct {
CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS at the time of replication
ImportCAS uint64 // Set when an import modifies the document CAS but preserves the HLV (import of a version replicated by XDCR)
SourceID string // source bucket uuid of where this entry originated from
Version uint64 // current cas of the current version on the version vector
MergeVersions map[string]uint64 // map of merge versions for fast efficient lookup
PreviousVersions map[string]uint64 // map of previous versions for fast efficient lookup
}

// CurrentVersionVector is a structure used to add a new sourceID:CAS entry to a HLV
type CurrentVersionVector struct {
VersionCAS uint64
SourceID string
// SourceAndVersion is a structure used to add a new entry to a HLV
type SourceAndVersion struct {
SourceID string
Version uint64
}

func CreateVersion(source string, version uint64) SourceAndVersion {
return SourceAndVersion{
SourceID: source,
Version: version,
}
}

type PersistedHybridLogicalVector struct {
CurrentVersionCAS string `json:"cvCas,omitempty"`
SourceID string `json:"src,omitempty"`
Version string `json:"vrs,omitempty"`
ImportCAS string `json:"importCAS,omitempty"`
SourceID string `json:"src"`
Version string `json:"vrs"`
MergeVersions map[string]string `json:"mv,omitempty"`
PreviousVersions map[string]string `json:"pv,omitempty"`
}
Expand Down Expand Up @@ -66,27 +75,27 @@ func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bo

// AddVersion adds a version vector to the in memory representation of a HLV and moves current version vector to
// previous versions on the HLV if needed
func (hlv *HybridLogicalVector) AddVersion(newVersion CurrentVersionVector) error {
if newVersion.VersionCAS < hlv.Version {
return fmt.Errorf("attempting to add new verison vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.VersionCAS)
func (hlv *HybridLogicalVector) AddVersion(newVersion SourceAndVersion) error {
if newVersion.Version < hlv.Version {
return fmt.Errorf("attempting to add new verison vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.Version)
}
// check if this is the first time we're adding a source - version pair
if hlv.SourceID == "" {
hlv.Version = newVersion.VersionCAS
hlv.Version = newVersion.Version
hlv.SourceID = newVersion.SourceID
return nil
}
// if new entry has the same source we simple just update the version
if newVersion.SourceID == hlv.SourceID {
hlv.Version = newVersion.VersionCAS
hlv.Version = newVersion.Version
return nil
}
// if we get here this is a new version from a different sourceID thus need to move current sourceID to previous versions and update current version
if hlv.PreviousVersions == nil {
hlv.PreviousVersions = make(map[string]uint64)
}
hlv.PreviousVersions[hlv.SourceID] = hlv.Version
hlv.Version = newVersion.VersionCAS
hlv.Version = newVersion.Version
hlv.SourceID = newVersion.SourceID
return nil
}
Expand Down Expand Up @@ -204,10 +213,14 @@ func (hlv *HybridLogicalVector) UnmarshalJSON(inputjson []byte) error {
func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridLogicalVector, error) {
persistedHLV := PersistedHybridLogicalVector{}
var cvCasByteArray []byte
var importCASBytes []byte
var vrsCasByteArray []byte
if hlv.CurrentVersionCAS != 0 {
cvCasByteArray = base.Uint64CASToLittleEndianHex(hlv.CurrentVersionCAS)
}
if hlv.ImportCAS != 0 {
importCASBytes = base.Uint64CASToLittleEndianHex(hlv.ImportCAS)
}
if hlv.Version != 0 {
vrsCasByteArray = base.Uint64CASToLittleEndianHex(hlv.Version)
}
Expand All @@ -222,6 +235,7 @@ func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridL
}

persistedHLV.CurrentVersionCAS = string(cvCasByteArray)
persistedHLV.ImportCAS = string(importCASBytes)
persistedHLV.SourceID = hlv.SourceID
persistedHLV.Version = string(vrsCasByteArray)
persistedHLV.PreviousVersions = pvPersistedFormat
Expand All @@ -231,6 +245,9 @@ func (hlv *HybridLogicalVector) convertHLVToPersistedFormat() (*PersistedHybridL

func (hlv *HybridLogicalVector) convertPersistedHLVToInMemoryHLV(persistedJSON PersistedHybridLogicalVector) {
hlv.CurrentVersionCAS = base.HexCasToUint64(persistedJSON.CurrentVersionCAS)
if persistedJSON.ImportCAS != "" {
hlv.ImportCAS = base.HexCasToUint64(persistedJSON.ImportCAS)
}
hlv.SourceID = persistedJSON.SourceID
// convert the hex cas to uint64 cas
hlv.Version = base.HexCasToUint64(persistedJSON.Version)
Expand Down
105 changes: 99 additions & 6 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
package db

import (
"context"
"reflect"
"strconv"
"strings"
"testing"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -34,14 +37,14 @@ func TestInternalHLVFunctions(t *testing.T) {
const newSource = "s_testsource"

// create a new version vector entry that will error method AddVersion
badNewVector := CurrentVersionVector{
VersionCAS: 123345,
SourceID: currSourceId,
badNewVector := SourceAndVersion{
Version: 123345,
SourceID: currSourceId,
}
// create a new version vector entry that should be added to HLV successfully
newVersionVector := CurrentVersionVector{
VersionCAS: newCAS,
SourceID: currSourceId,
newVersionVector := SourceAndVersion{
Version: newCAS,
SourceID: currSourceId,
}

// Get current version vector, sourceID and CAS pair
Expand Down Expand Up @@ -229,3 +232,93 @@ func TestHybridLogicalVectorPersistence(t *testing.T) {
assert.Equal(t, inMemoryHLV.PreviousVersions, hlvFromPersistance.PreviousVersions)
assert.Equal(t, inMemoryHLV.MergeVersions, hlvFromPersistance.MergeVersions)
}

// Tests import of server-side mutations made by HLV-aware and non-HLV-aware peers
func TestHLVImport(t *testing.T) {

base.SetUpTestLogging(t, base.LevelInfo, base.KeyMigrate, base.KeyImport)

db, ctx := setupTestDB(t)
defer db.Close(ctx)

collection := GetSingleDatabaseCollectionWithUser(t, db)
localSource := collection.dbCtx.BucketUUID

// 1. Test standard import of an SDK write
standardImportKey := "standardImport_" + t.Name()
standardImportBody := []byte(`{"prop":"value"}`)
cas, err := collection.dataStore.WriteCas(standardImportKey, 0, 0, 0, standardImportBody, sgbucket.Raw)
require.NoError(t, err, "write error")
_, err = collection.ImportDocRaw(ctx, standardImportKey, standardImportBody, nil, nil, false, cas, nil, ImportFromFeed)
require.NoError(t, err, "import error")

importedDoc, _, err := collection.GetDocWithXattr(ctx, standardImportKey, DocUnmarshalAll)
require.NoError(t, err)
importedHLV := importedDoc.HLV
require.Equal(t, cas, importedHLV.ImportCAS)
require.Equal(t, importedDoc.Cas, importedHLV.CurrentVersionCAS)
require.Equal(t, importedDoc.Cas, importedHLV.Version)
require.Equal(t, localSource, importedHLV.SourceID)

// 2. Test import of write by HLV-aware peer (HLV is already updated, sync metadata is not).
otherSource := "otherSource"
hlvHelper := NewHLVAgent(t, collection.dataStore, otherSource, "_sync")
existingHLVKey := "existingHLV_" + t.Name()
_ = hlvHelper.insertWithHLV(ctx, existingHLVKey)

var existingBody, existingXattr []byte
cas, err = collection.dataStore.GetWithXattr(ctx, existingHLVKey, "_sync", "", &existingBody, &existingXattr, nil)
require.NoError(t, err)

_, err = collection.ImportDocRaw(ctx, existingHLVKey, existingBody, existingXattr, nil, false, cas, nil, ImportFromFeed)
require.NoError(t, err, "import error")

importedDoc, _, err = collection.GetDocWithXattr(ctx, existingHLVKey, DocUnmarshalAll)
require.NoError(t, err)
importedHLV = importedDoc.HLV
// cas in the HLV's current version and cvCAS should not have changed, and should match importCAS
require.Equal(t, cas, importedHLV.ImportCAS)
require.Equal(t, cas, importedHLV.CurrentVersionCAS)
require.Equal(t, cas, importedHLV.Version)
require.Equal(t, otherSource, importedHLV.SourceID)
}

// HLVAgent performs HLV updates directly (not via SG) for simulating/testing interaction with non-SG HLV agents
type HLVAgent struct {
t *testing.T
datastore base.DataStore
source string // All writes by the HLVHelper are done as this source
xattrName string // xattr name to store the HLV
}

var defaultHelperBody = map[string]interface{}{"version": 1}

func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrName string) *HLVAgent {
return &HLVAgent{
t: t,
datastore: datastore,
source: source, // all writes by the HLVHelper are done as this source
xattrName: xattrName,
}
}

// insertWithHLV inserts a new document into the bucket with a populated HLV (matching a write from
// a different HLV-aware peer)
func (h *HLVAgent) insertWithHLV(ctx context.Context, key string) (casOut uint64) {
hlv := &HybridLogicalVector{}
err := hlv.AddVersion(CreateVersion(h.source, hlvExpandMacroCASValue))
require.NoError(h.t, err)
hlv.CurrentVersionCAS = hlvExpandMacroCASValue

syncData := &SyncData{HLV: hlv}
syncDataBytes, err := base.JSONMarshal(syncData)
require.NoError(h.t, err)

mutateInOpts := &sgbucket.MutateInOptions{
MacroExpansion: hlv.computeMacroExpansions(),
}

cas, err := h.datastore.WriteCasWithXattr(ctx, key, h.xattrName, 0, 0, defaultHelperBody, syncDataBytes, mutateInOpts)
require.NoError(h.t, err)
return cas
}
4 changes: 2 additions & 2 deletions db/revision_cache_bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (rc *BypassRevisionCache) GetWithRev(ctx context.Context, docID, revID stri
}

// GetWithCV fetches the Current Version for the given docID and CV immediately from the bucket.
func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (docRev DocumentRevision, err error) {
func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (docRev DocumentRevision, err error) {

unmarshalLevel := DocUnmarshalSync
if includeBody {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (rc *BypassRevisionCache) RemoveWithRev(docID, revID string) {
// nop
}

func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *CurrentVersionVector) {
func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *SourceAndVersion) {
// nop
}

Expand Down
20 changes: 10 additions & 10 deletions db/revision_cache_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type RevisionCache interface {
// GetWithCV returns the given revision by CV, and stores if not already cached.
// When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body.
// When includeDelta=true, the returned DocumentRevision will include delta - requires additional locking during retrieval.
GetWithCV(ctx context.Context, docID string, cv *CurrentVersionVector, includeBody, includeDelta bool) (DocumentRevision, error)
GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (DocumentRevision, error)

// GetActive returns the current revision for the given doc ID, and stores if not already cached.
// When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body.
Expand All @@ -55,7 +55,7 @@ type RevisionCache interface {
RemoveWithRev(docID, revID string)

// RemoveWithCV evicts a revision from the cache using its current version.
RemoveWithCV(docID string, cv *CurrentVersionVector)
RemoveWithCV(docID string, cv *SourceAndVersion)

// UpdateDelta stores the given toDelta value in the given rev if cached
UpdateDelta(ctx context.Context, docID, revID string, toDelta RevisionDelta)
Expand Down Expand Up @@ -128,7 +128,7 @@ type DocumentRevision struct {
Delta *RevisionDelta
Deleted bool
Removed bool // True if the revision is a removal.
CV *CurrentVersionVector
CV *SourceAndVersion

_shallowCopyBody Body // an unmarshalled body that can produce shallow copies
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func newRevCacheDelta(deltaBytes []byte, fromRevID string, toRevision DocumentRe

// This is the RevisionCacheLoaderFunc callback for the context's RevisionCache.
// Its job is to load a revision from the bucket when there's a cache miss.
func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *CurrentVersionVector, err error) {
func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *SourceAndVersion, err error) {
var doc *Document
unmarshalLevel := DocUnmarshalSync
if unmarshalBody {
Expand All @@ -278,9 +278,9 @@ func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore,
// revCacheLoaderForCv will load a document from the bucket using the CV, comapre the fetched doc and the CV specified in the function,
// and will still return revid for purpose of populating the Rev ID lookup map on the cache
func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingStore, id IDandCV, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
cv := CurrentVersionVector{
VersionCAS: id.Version,
SourceID: id.Source,
cv := SourceAndVersion{
Version: id.Version,
SourceID: id.Source,
}
var doc *Document
unmarshalLevel := DocUnmarshalSync
Expand All @@ -295,7 +295,7 @@ func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingS
}

// Common revCacheLoader functionality used either during a cache miss (from revCacheLoader), or directly when retrieving current rev from cache
func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *CurrentVersionVector, err error) {
func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *SourceAndVersion, err error) {
if bodyBytes, body, attachments, err = backingStore.getRevision(ctx, doc, revid); err != nil {
// If we can't find the revision (either as active or conflicted body from the document, or as old revision body backup), check whether
// the revision was a channel removal. If so, we want to store as removal in the revision cache
Expand All @@ -320,14 +320,14 @@ func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBa
history = encodeRevisions(ctx, doc.ID, validatedHistory)
channels = doc.History[revid].Channels
if doc.HLV != nil {
fetchedCV = &CurrentVersionVector{SourceID: doc.HLV.SourceID, VersionCAS: doc.HLV.Version}
fetchedCV = &SourceAndVersion{SourceID: doc.HLV.SourceID, Version: doc.HLV.Version}
}

return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, fetchedCV, err
}

// revCacheLoaderForDocumentCV used either during cache miss (from revCacheLoaderForCv), or used directly when getting current active CV from cache
func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv CurrentVersionVector) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv SourceAndVersion) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
if bodyBytes, body, attachments, err = backingStore.getCurrentVersion(ctx, doc); err != nil {
// we need implementation of IsChannelRemoval for CV here.
// pending CBG-3213 support of channel removal for CV
Expand Down
Loading

0 comments on commit 89af84e

Please sign in to comment.