Skip to content

Commit

Permalink
CBG-4250 Add pv support to rosmar xdcr (#7230)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcfraser authored and bbrks committed Dec 5, 2024
1 parent 1a4559c commit 4fc9df0
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 45 deletions.
4 changes: 2 additions & 2 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err

var history []string
historyStr := rq.Properties[RevMessageHistory]
var incomingHLV HybridLogicalVector
var incomingHLV *HybridLogicalVector
// Build history/HLV
if !bh.useHLV() {
newDoc.RevID = rev
Expand All @@ -1073,7 +1073,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
base.InfofCtx(bh.loggingCtx, base.KeySync, "Error parsing hlv while processing rev for doc %v. HLV:%v Error: %v", base.UD(docID), versionVectorStr, err)
return base.HTTPErrorf(http.StatusUnprocessableEntity, "error extracting hlv from blip message")
}
newDoc.HLV = &incomingHLV
newDoc.HLV = incomingHLV
}

newDoc.UpdateBodyBytes(bodyBytes)
Expand Down
7 changes: 3 additions & 4 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
return newRevID, doc, err
}

func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) {
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV *HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) {
var matchRev string
if existingDoc != nil {
doc, unmarshalErr := db.unmarshalDocumentWithXattrs(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs, existingDoc.Cas, DocUnmarshalRev)
Expand Down Expand Up @@ -1229,8 +1229,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
// Conflict check here
// if doc has no HLV defined this is a new doc we haven't seen before, skip conflict check
if doc.HLV == nil {
newHLV := NewHybridLogicalVector()
doc.HLV = &newHLV
doc.HLV = NewHybridLogicalVector()
addNewerVersionsErr := doc.HLV.AddNewerVersions(newDocHLV)
if addNewerVersionsErr != nil {
return nil, nil, false, nil, addNewerVersionsErr
Expand All @@ -1240,7 +1239,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
base.DebugfCtx(ctx, base.KeyCRUD, "PutExistingCurrentVersion(%q): No new versions to add", base.UD(newDoc.ID))
return nil, nil, false, nil, base.ErrUpdateCancel // No new revisions to add
}
if newDocHLV.isDominating(*doc.HLV) {
if newDocHLV.isDominating(doc.HLV) {
// update hlv for all newer incoming source version pairs
addNewerVersionsErr := doc.HLV.AddNewerVersions(newDocHLV)
if addNewerVersionsErr != nil {
Expand Down
6 changes: 3 additions & 3 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1817,7 +1817,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {

// create a version larger than the allocated version above
incomingVersion := docUpdateVersionInt + 10
incomingHLV := HybridLogicalVector{
incomingHLV := &HybridLogicalVector{
SourceID: "test",
Version: incomingVersion,
PreviousVersions: pv,
Expand Down Expand Up @@ -1895,7 +1895,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
// create a new doc update to simulate a doc update arriving over replicator from, client
body = Body{"key1": "value2"}
newDoc := createTestDocument(key, "", body, false, 0)
incomingHLV := HybridLogicalVector{
incomingHLV := &HybridLogicalVector{
SourceID: "test",
Version: 1234,
}
Expand Down Expand Up @@ -1935,7 +1935,7 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
pv[bucketUUID] = uint64(2)
// create a version larger than the allocated version above
incomingVersion := uint64(2 + 10)
incomingHLV := HybridLogicalVector{
incomingHLV := &HybridLogicalVector{
SourceID: "test",
Version: incomingVersion,
PreviousVersions: pv,
Expand Down
28 changes: 14 additions & 14 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ type HybridLogicalVector struct {
}

// NewHybridLogicalVector returns an initialised HybridLogicalVector.
func NewHybridLogicalVector() HybridLogicalVector {
return HybridLogicalVector{
func NewHybridLogicalVector() *HybridLogicalVector {
return &HybridLogicalVector{
PreviousVersions: make(HLVVersions),
MergeVersions: make(HLVVersions),
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func (hlv *HybridLogicalVector) Remove(source string) error {
// If HLV A dominates CV of HLV B, it can be assumed to dominate the entire HLV, since
// CV dominates PV for a given HLV. Given this, it's sufficient to check whether HLV A
// has a version for HLV B's current source that's greater than or equal to HLV B's current version.
func (hlv *HybridLogicalVector) isDominating(otherVector HybridLogicalVector) bool {
func (hlv *HybridLogicalVector) isDominating(otherVector *HybridLogicalVector) bool {
return hlv.DominatesSource(Version{otherVector.SourceID, otherVector.Version})
}

Expand Down Expand Up @@ -296,7 +296,7 @@ func (hlv *HybridLogicalVector) GetValue(sourceID string) (uint64, bool) {

// AddNewerVersions will take a hlv and add any newer source/version pairs found across CV and PV found in the other HLV taken as parameter
// when both HLV
func (hlv *HybridLogicalVector) AddNewerVersions(otherVector HybridLogicalVector) error {
func (hlv *HybridLogicalVector) AddNewerVersions(otherVector *HybridLogicalVector) error {

// create current version for incoming vector and attempt to add it to the local HLV, AddVersion will handle if attempting to add older
// version than local HLVs CV pair
Expand Down Expand Up @@ -416,29 +416,29 @@ func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec,
// 3. cv, pv, and mv: cv;mv;pv
//
// TODO: CBG-3662 - Optimise once we've settled on and tested the format with CBL
func extractHLVFromBlipMessage(versionVectorStr string) (HybridLogicalVector, error) {
hlv := HybridLogicalVector{}
func extractHLVFromBlipMessage(versionVectorStr string) (*HybridLogicalVector, error) {
hlv := &HybridLogicalVector{}

vectorFields := strings.Split(versionVectorStr, ";")
vectorLength := len(vectorFields)
if (vectorLength == 1 && vectorFields[0] == "") || vectorLength > 3 {
return HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received")
return &HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received")
}

// add current version (should always be present)
cvStr := vectorFields[0]
version := strings.Split(cvStr, "@")
if len(version) < 2 {
return HybridLogicalVector{}, fmt.Errorf("invalid version in changes message received")
return &HybridLogicalVector{}, fmt.Errorf("invalid version in changes message received")
}

vrs, err := strconv.ParseUint(version[0], 16, 64)
if err != nil {
return HybridLogicalVector{}, err
return &HybridLogicalVector{}, err
}
err = hlv.AddVersion(Version{SourceID: version[1], Value: vrs})
if err != nil {
return HybridLogicalVector{}, err
return &HybridLogicalVector{}, err
}

switch vectorLength {
Expand All @@ -449,7 +449,7 @@ func extractHLVFromBlipMessage(versionVectorStr string) (HybridLogicalVector, er
// only cv and pv present
sourceVersionListPV, err := parseVectorValues(vectorFields[1])
if err != nil {
return HybridLogicalVector{}, err
return &HybridLogicalVector{}, err
}
hlv.PreviousVersions = make(HLVVersions)
for _, v := range sourceVersionListPV {
Expand All @@ -461,7 +461,7 @@ func extractHLVFromBlipMessage(versionVectorStr string) (HybridLogicalVector, er
sourceVersionListPV, err := parseVectorValues(vectorFields[2])
hlv.PreviousVersions = make(HLVVersions)
if err != nil {
return HybridLogicalVector{}, err
return &HybridLogicalVector{}, err
}
for _, pv := range sourceVersionListPV {
hlv.PreviousVersions[pv.SourceID] = pv.Value
Expand All @@ -470,14 +470,14 @@ func extractHLVFromBlipMessage(versionVectorStr string) (HybridLogicalVector, er
sourceVersionListMV, err := parseVectorValues(vectorFields[1])
hlv.MergeVersions = make(HLVVersions)
if err != nil {
return HybridLogicalVector{}, err
return &HybridLogicalVector{}, err
}
for _, mv := range sourceVersionListMV {
hlv.MergeVersions[mv.SourceID] = mv.Value
}
return hlv, nil
default:
return HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received")
return &HybridLogicalVector{}, fmt.Errorf("invalid hlv in changes message received")
}
}

Expand Down
8 changes: 4 additions & 4 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestConflictDetectionDominating(t *testing.T) {

// createHLVForTest is a helper function to create a HLV for use in a test. Takes a list of strings in the format of <sourceID@version> and assumes
// first entry is current version. For merge version entries you must specify 'm_' as a prefix to sourceID NOTE: it also sets cvCAS to the current version
func createHLVForTest(tb *testing.T, inputList []string) HybridLogicalVector {
func createHLVForTest(tb *testing.T, inputList []string) *HybridLogicalVector {
hlvOutput := NewHybridLogicalVector()

// first element will be current version and source pair
Expand Down Expand Up @@ -537,13 +537,13 @@ func TestInvalidHLVInBlipMessageForm(t *testing.T) {
hlv, err := extractHLVFromBlipMessage(hlvStr)
require.Error(t, err)
assert.ErrorContains(t, err, "invalid hlv in changes message received")
assert.Equal(t, HybridLogicalVector{}, hlv)
assert.Equal(t, &HybridLogicalVector{}, hlv)

hlvStr = ""
hlv, err = extractHLVFromBlipMessage(hlvStr)
require.Error(t, err)
assert.ErrorContains(t, err, "invalid hlv in changes message received")
assert.Equal(t, HybridLogicalVector{}, hlv)
assert.Equal(t, &HybridLogicalVector{}, hlv)
}

var extractHLVFromBlipMsgBMarkCases = []struct {
Expand Down Expand Up @@ -758,7 +758,7 @@ func TestVersionDeltaCalculation(t *testing.T) {
vvXattr, err = base.JSONMarshal(&hlv2)
require.NoError(t, err)
// convert the bytes back to an in memory format of hlv
memHLV = HybridLogicalVector{}
memHLV = &HybridLogicalVector{}
err = base.JSONUnmarshal(vvXattr, &memHLV)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion rest/utilities_testing_blip_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (btcr *BlipTesterCollectionClient) NewBlipTesterDoc(revID string, body []by
}
if btcr.UseHLV() {
doc.revMode = revModeHLV
doc.HLV = db.NewHybridLogicalVector()
doc.HLV = *db.NewHybridLogicalVector()
_ = doc.HLV.AddVersion(VersionFromRevID(revID))
} else {
doc.revMode = revModeRevTree
Expand Down
23 changes: 12 additions & 11 deletions xdcr/rosmar_xdcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket,

}

// processEvent processes a DCP event coming from a toBucket and replicates it to the target datastore.
// processEvent processes a DCP event coming from a source bucket and replicates it to the target datastore.
func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEvent) bool {
docID := string(event.Key)
base.TracefCtx(ctx, base.KeyVV, "Got event %s, opcode: %s", docID, event.Opcode)
Expand Down Expand Up @@ -352,24 +352,25 @@ func getHLVAndMou(xattrs map[string][]byte) (*db.HybridLogicalVector, *db.Metada

// updateHLV will update the xattrs on the target document considering the source's HLV, _mou, sourceID and cas.
func updateHLV(xattrs map[string][]byte, sourceHLV *db.HybridLogicalVector, sourceMou *db.MetadataOnlyUpdate, sourceID string, sourceCas uint64) error {
// TODO: read existing targetXattrs[base.VvXattrName] and update the pv CBG-4250. This will need to merge pv from sourceHLV and targetHLV.
var targetHLV *db.HybridLogicalVector
// if source vv.cvCas == cas, the _vv.cv, _vv.cvCAS from the source is correct and we can use it directly.

targetHLV := db.NewHybridLogicalVector()
if sourceHLV != nil {
targetHLV = sourceHLV
}

// If source vv.cvCas == cas, the _vv.cv, _vv.cvCAS from the source already includes the latest mutation and we can use it directly.
// Otherwise we need to add the current mutation (sourceID, sourceCas) to the HLV before writing to the target
sourcecvCASMatch := sourceHLV != nil && sourceHLV.CurrentVersionCAS == sourceCas
sourceWasImport := sourceMou != nil && sourceMou.CAS() == sourceCas
if sourceHLV != nil && (sourceWasImport || sourcecvCASMatch) {
targetHLV = sourceHLV
} else {
hlv := db.NewHybridLogicalVector()
err := hlv.AddVersion(db.Version{
if !(sourceWasImport || sourcecvCASMatch) {
err := targetHLV.AddVersion(db.Version{
SourceID: sourceID,
Value: sourceCas,
})
if err != nil {
return err
}
hlv.CurrentVersionCAS = sourceCas
targetHLV = &hlv
targetHLV.CurrentVersionCAS = sourceCas
}
var err error
xattrs[base.VvXattrName], err = json.Marshal(targetHLV)
Expand Down
106 changes: 100 additions & 6 deletions xdcr/xdcr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,93 @@ func TestReplicateXattrs(t *testing.T) {
}
}

// TestVVMultiActor verifies that updates by multiple actors (updates to different clusters/buckets) are properly
// reflected in the HLV (cv and pv).
func TestVVMultiActor(t *testing.T) {
fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
ctx := base.TestCtx(t)
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
require.NoError(t, err)
toBucketSourceID, err := GetSourceID(ctx, toBucket)
require.NoError(t, err)

// Create document on source
docID := "doc1"
ver1Body := `{"ver":1}`
fromCAS, err := fromDs.WriteCas(docID, 0, 0, []byte(ver1Body), 0)
require.NoError(t, err)

// start bidirectional XDCR
xdcrSource := startXDCR(t, fromBucket, toBucket, XDCROptions{Mobile: MobileOn})
xdcrTarget := startXDCR(t, toBucket, fromBucket, XDCROptions{Mobile: MobileOn})
defer func() {
assert.NoError(t, xdcrSource.Stop(ctx))
assert.NoError(t, xdcrTarget.Stop(ctx))
}()
requireWaitForXDCRDocsProcessed(t, xdcrSource, 1)

// Verify HLV on remote.
// expected HLV:
// cv: fromCAS@source
body, xattrs, destCas, err := toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName})
require.NoError(t, err)
require.Equal(t, fromCAS, destCas)
require.JSONEq(t, ver1Body, string(body))
requireCV(t, xattrs[base.VvXattrName], fromBucketSourceID, fromCAS)

// Update document on remote
toCAS, err := toDs.WriteCas(docID, 0, fromCAS, []byte(`{"ver":2}`), 0)
require.NoError(t, err)
requireWaitForXDCRDocsProcessed(t, xdcrTarget, 2)

// Verify HLV on source.
// expected HLV:
// cv: toCAS@remote
// pv: fromCAS@source
body, xattrs, destCas, err = fromDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName})
require.NoError(t, err)
require.Equal(t, toCAS, destCas)
require.JSONEq(t, `{"ver":2}`, string(body))
require.Contains(t, xattrs, base.VvXattrName)
requireCV(t, xattrs[base.VvXattrName], toBucketSourceID, toCAS)
requirePV(t, xattrs[base.VvXattrName], fromBucketSourceID, fromCAS)

// Update document on remote again. Verifies that another update to cv doesn't affect pv.
toCAS2, err := toDs.WriteCas(docID, 0, toCAS, []byte(`{"ver":3}`), 0)
require.NoError(t, err)
requireWaitForXDCRDocsProcessed(t, xdcrTarget, 3)

// Verify HLV on source bucket.
// expected HLV:
// cv: toCAS2@remote
// pv: fromCAS@source
body, xattrs, destCas, err = fromDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName})
require.NoError(t, err)
require.Equal(t, toCAS2, destCas)
require.JSONEq(t, `{"ver":3}`, string(body))
require.Contains(t, xattrs, base.VvXattrName)
requireCV(t, xattrs[base.VvXattrName], toBucketSourceID, toCAS2)
requirePV(t, xattrs[base.VvXattrName], fromBucketSourceID, fromCAS)

// Update document on source bucket. Verifies that local source is moved from pv to cv, target source from cv to pv.
fromCAS2, err := fromDs.WriteCas(docID, 0, toCAS2, []byte(`{"ver":4}`), 0)
require.NoError(t, err)
requireWaitForXDCRDocsProcessed(t, xdcrTarget, 4)

// Verify HLV on target
// expected HLV:
// cv: fromCAS2@source
// pv: toCAS2@remote
body, xattrs, destCas, err = toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName})
require.NoError(t, err)
require.Equal(t, fromCAS2, destCas)
require.JSONEq(t, `{"ver":4}`, string(body))
require.Contains(t, xattrs, base.VvXattrName)
requireCV(t, xattrs[base.VvXattrName], fromBucketSourceID, fromCAS2)
requirePV(t, xattrs[base.VvXattrName], toBucketSourceID, toCAS2)

}

// startXDCR will create a new XDCR manager and start it. This must be closed by the caller.
func startXDCR(t *testing.T, fromBucket base.Bucket, toBucket base.Bucket, opts XDCROptions) Manager {
ctx := base.TestCtx(t)
Expand All @@ -640,13 +727,20 @@ func requireWaitForXDCRDocsProcessed(t *testing.T, xdcr Manager, expectedDocsPro
}, time.Second*5, time.Millisecond*100)
}

// requireCV requires tests that a given hlv from server has a sourceID and cas matching the version. This is strict and will fail if _pv is populated (TODO: CBG-4250).
// requireCV requires tests that a given hlv from server has sourceID and cas matching the current version.
func requireCV(t *testing.T, vvBytes []byte, sourceID string, cas uint64) {
var vv *db.HybridLogicalVector
require.NoError(t, base.JSONUnmarshal(vvBytes, &vv))
require.Equal(t, &db.HybridLogicalVector{
CurrentVersionCAS: cas,
SourceID: sourceID,
Version: cas,
}, vv)
require.Equal(t, cas, vv.CurrentVersionCAS)
require.Equal(t, sourceID, vv.SourceID)
}

// requirePV requires tests that a given hlv from server has an entry in the PV with sourceID and cas matching the provided values.
func requirePV(t *testing.T, vvBytes []byte, sourceID string, cas uint64) {
var vv *db.HybridLogicalVector
require.NoError(t, base.JSONUnmarshal(vvBytes, &vv))
require.NotNil(t, vv.PreviousVersions)
pvValue, ok := vv.PreviousVersions[sourceID]
require.True(t, ok)
require.Equal(t, cas, pvValue)
}

0 comments on commit 4fc9df0

Please sign in to comment.