Skip to content

Commit

Permalink
updates to address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gregns1 committed Oct 5, 2023
1 parent 7d73389 commit 6b688cb
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 69 deletions.
12 changes: 1 addition & 11 deletions db/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,12 +807,7 @@ func TestMigrateBodyAttachments(t *testing.T) {
},
"channels": {
"ABC": null
},
"_vv":{
"cvCas":"0x0",
"src":"abc",
"vrs":"0x0"
}
}
}`

var bodyVal map[string]interface{}
Expand Down Expand Up @@ -1099,11 +1094,6 @@ func TestMigrateBodyAttachmentsMerge(t *testing.T) {
"channels": {
"ABC": null
},
"_vv":{
"cvCas":"0x0",
"src":"abc",
"vrs":"0x0"
},
"attachments": {
"bye.txt": {
"digest": "sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc=",
Expand Down
40 changes: 12 additions & 28 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (

const (
kMaxRecentSequences = 20 // Maximum number of sequences stored in RecentSequences before pruning is triggered
// hlvExpandMacroCASValue causes the field to be populated by CAS value by macro expansion
hlvExpandMacroCASValue = math.MaxUint64
)

// ErrForbidden is returned when the user requests a document without a revision that they do not have access to.
Expand Down Expand Up @@ -871,29 +869,17 @@ func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context
return nil
}

// updateMutateInSpec returns the mutate-in spec needed for the document update,
// based on the outcome in updateHLV. A field that updateHLV set to
// hlvExpandMacroCASValue is a request to have the server populate that xattr
// path with the mutation's CAS via macro expansion.
func updateMutateInSpec(hlv HybridLogicalVector) []sgbucket.MacroExpansionSpec {
	var outputSpec []sgbucket.MacroExpansionSpec
	// Version was flagged for macro expansion (compare against the named
	// sentinel rather than the raw math.MaxUint64 literal).
	if hlv.Version == hlvExpandMacroCASValue {
		spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.SyncXattrName), sgbucket.MacroCas)
		outputSpec = append(outputSpec, spec)
	}
	// cvCAS was flagged for macro expansion.
	if hlv.CurrentVersionCAS == hlvExpandMacroCASValue {
		spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.SyncXattrName), sgbucket.MacroCas)
		outputSpec = append(outputSpec, spec)
	}
	return outputSpec
}

// updateHLV updates the HLV in the sync data appropriately based on what type of document update event we are encountering
func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocUpdateEvent) (*Document, error) {

switch docUpdateEvent.eventType {
func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent uint32) (*Document, error) {
if d.HLV == nil {
d.HLV = &HybridLogicalVector{}
}
switch docUpdateEvent {
case BlipWriteEvent:
// preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
case ImportEvent:
// VV should remain unchanged
// work to be done to decide if the VV needs updating here, pending CBG-3503
case SGWriteEvent:
// add a new entry to the version vector
newVVEntry := CurrentVersionVector{}
Expand All @@ -903,6 +889,8 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
if err != nil {
return nil, err
}
// update the cvCAS on the SGWrite event too
d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
}
return d, nil
}
Expand Down Expand Up @@ -946,9 +934,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
return "", nil, err
}

docUpdateEvent := DocUpdateEvent{
SGWriteEvent,
}
docUpdateEvent := SGWriteEvent
allowImport := db.UseXattrs()
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, expiry, nil, docUpdateEvent, nil, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
var isSgWrite bool
Expand Down Expand Up @@ -1074,9 +1060,7 @@ func (db *DatabaseCollectionWithUser) PutExistingRevWithConflictResolution(ctx c
return nil, "", base.HTTPErrorf(http.StatusBadRequest, "Invalid revision ID")
}

docUpdateEvent := DocUpdateEvent{
BlipWriteEvent,
}
docUpdateEvent := BlipWriteEvent
allowImport := db.UseXattrs()
doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// (Be careful: this block can be invoked multiple times if there are races!)
Expand Down Expand Up @@ -1878,7 +1862,7 @@ type updateAndReturnDocCallback func(*Document) (resultDoc *Document, resultAtta
// 1. Receive the updated document body in the response
// 2. Specify the existing document body/xattr/cas, to avoid initial retrieval of the doc in cases that the current contents are already known (e.g. import).
// On cas failure, the document will still be reloaded from the bucket as usual.
func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry uint32, opts *sgbucket.MutateInOptions, docUpdateEvent DocUpdateEvent, existingDoc *sgbucket.BucketDocument, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) {
func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry uint32, opts *sgbucket.MutateInOptions, docUpdateEvent uint32, existingDoc *sgbucket.BucketDocument, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) {

key := realDocID(docid)
if key == "" {
Expand Down Expand Up @@ -1958,7 +1942,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
return
}
// update the mutate in options based on the above logic
updatedSpec = updateMutateInSpec(doc.SyncData.HLV)
updatedSpec = doc.SyncData.HLV.computeMacroExpansions()

// Check whether Sync Data originated in body
if currentXattr == nil && doc.Sequence > 0 {
Expand Down
4 changes: 1 addition & 3 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ const (
BlipWriteEvent
)

type DocUpdateEvent struct {
eventType uint32
}

// NOTE(review): this package-level eventType replaces the deleted
// DocUpdateEvent struct, but no reference to it is visible after callers
// switched to plain uint32 event values — presumably dead; TODO confirm
// against the rest of the package and remove if unused.
var eventType uint32

const (
DefaultRevsLimitNoConflicts = 50
Expand Down
36 changes: 18 additions & 18 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,24 @@ type ChannelSetEntry struct {

// The sync-gateway metadata stored in the "_sync" property of a Couchbase document.
type SyncData struct {
CurrentRev string `json:"rev"`
NewestRev string `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev
Flags uint8 `json:"flags,omitempty"`
Sequence uint64 `json:"sequence,omitempty"`
UnusedSequences []uint64 `json:"unused_sequences,omitempty"` // unused sequences due to update conflicts/CAS retry
RecentSequences []uint64 `json:"recent_sequences,omitempty"` // recent sequences for this doc - used in server dedup handling
Channels channels.ChannelMap `json:"channels,omitempty"`
Access UserAccessMap `json:"access,omitempty"`
RoleAccess UserAccessMap `json:"role_access,omitempty"`
Expiry *time.Time `json:"exp,omitempty"` // Document expiry. Information only - actual expiry/delete handling is done by bucket storage. Needs to be pointer for omitempty to work (see https://github.com/golang/go/issues/4357)
Cas string `json:"cas"` // String representation of a cas value, populated via macro expansion
Crc32c string `json:"value_crc32c"` // String representation of crc32c hash of doc body, populated via macro expansion
Crc32cUserXattr string `json:"user_xattr_value_crc32c,omitempty"` // String representation of crc32c hash of user xattr
TombstonedAt int64 `json:"tombstoned_at,omitempty"` // Time the document was tombstoned. Used for view compaction
Attachments AttachmentsMeta `json:"attachments,omitempty"`
ChannelSet []ChannelSetEntry `json:"channel_set"`
ChannelSetHistory []ChannelSetEntry `json:"channel_set_history"`
HLV HybridLogicalVector `json:"_vv,omitempty"`
CurrentRev string `json:"rev"`
NewestRev string `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev
Flags uint8 `json:"flags,omitempty"`
Sequence uint64 `json:"sequence,omitempty"`
UnusedSequences []uint64 `json:"unused_sequences,omitempty"` // unused sequences due to update conflicts/CAS retry
RecentSequences []uint64 `json:"recent_sequences,omitempty"` // recent sequences for this doc - used in server dedup handling
Channels channels.ChannelMap `json:"channels,omitempty"`
Access UserAccessMap `json:"access,omitempty"`
RoleAccess UserAccessMap `json:"role_access,omitempty"`
Expiry *time.Time `json:"exp,omitempty"` // Document expiry. Information only - actual expiry/delete handling is done by bucket storage. Needs to be pointer for omitempty to work (see https://github.com/golang/go/issues/4357)
Cas string `json:"cas"` // String representation of a cas value, populated via macro expansion
Crc32c string `json:"value_crc32c"` // String representation of crc32c hash of doc body, populated via macro expansion
Crc32cUserXattr string `json:"user_xattr_value_crc32c,omitempty"` // String representation of crc32c hash of user xattr
TombstonedAt int64 `json:"tombstoned_at,omitempty"` // Time the document was tombstoned. Used for view compaction
Attachments AttachmentsMeta `json:"attachments,omitempty"`
ChannelSet []ChannelSetEntry `json:"channel_set"`
ChannelSetHistory []ChannelSetEntry `json:"channel_set_history"`
HLV *HybridLogicalVector `json:"_vv,omitempty"`

// Only used for performance metrics:
TimeSaved time.Time `json:"time_saved,omitempty"` // Timestamp of save.
Expand Down
19 changes: 19 additions & 0 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ package db

import (
"fmt"
"math"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
)

// hlvExpandMacroCASValue causes the field to be populated by CAS value by macro expansion
const hlvExpandMacroCASValue = math.MaxUint64

type HybridLogicalVector struct {
CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS at the time of replication
SourceID string // source bucket uuid of where this entry originated from
Expand Down Expand Up @@ -261,3 +266,17 @@ func convertMapToInMemoryFormat(persistedMap map[string]string) map[string]uint6
}
return returnedMap
}

// computeMacroExpansions returns the mutate in spec needed for the document update based off the outcome in updateHLV
func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansionSpec {
var outputSpec []sgbucket.MacroExpansionSpec
if hlv.Version == hlvExpandMacroCASValue {
spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.SyncXattrName), sgbucket.MacroCas)
outputSpec = append(outputSpec, spec)
}
if hlv.CurrentVersionCAS == hlvExpandMacroCASValue {
spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.SyncXattrName), sgbucket.MacroCas)
outputSpec = append(outputSpec, spec)
}
return outputSpec
}
4 changes: 1 addition & 3 deletions db/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin
existingDoc.Expiry = *expiry
}

docUpdateEvent := DocUpdateEvent{
ImportEvent,
}
docUpdateEvent := ImportEvent
docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, existingDoc.Expiry, mutationOptions, docUpdateEvent, existingDoc, func(doc *Document) (resultDocument *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
// Perform cas mismatch check first, as we want to identify cas mismatch before triggering migrate handling.
// If there's a cas mismatch, the doc has been updated since the version that triggered the import. Handling depends on import mode.
Expand Down
5 changes: 0 additions & 5 deletions db/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,6 @@ func TestImportWithCasFailureUpdate(t *testing.T) {
null
]
},
"_vv":{
"cvCas":"0x0",
"src":"abc",
"vrs":"0x0"
},
"cas": "",
"time_saved": "2017-11-29T12:46:13.456631-08:00"
}`
Expand Down
3 changes: 3 additions & 0 deletions rest/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2765,6 +2765,7 @@ func TestPutDocUpdateVersionVector(t *testing.T) {

assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)

// Put a new revision of this doc and assert that the version vector SourceID and Version is updated
resp = rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1?rev="+syncData.CurrentRev, `{"key1": "value1"}`)
Expand All @@ -2776,6 +2777,7 @@ func TestPutDocUpdateVersionVector(t *testing.T) {

assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)

// Delete doc and assert that the version vector SourceID and Version is updated
resp = rt.SendAdminRequest(http.MethodDelete, "/{{.keyspace}}/doc1?rev="+syncData.CurrentRev, "")
Expand All @@ -2787,6 +2789,7 @@ func TestPutDocUpdateVersionVector(t *testing.T) {

assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
}

func TestTombstoneCompactionAPI(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion rest/replicatortest/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8486,7 +8486,7 @@ func TestReplicatorUpdateHLVOnPut(t *testing.T) {

assert.Equal(t, activeBucketUUID, syncData.HLV.SourceID)
assert.Equal(t, uintCAS, syncData.HLV.Version)
assert.Equal(t, uint64(0), syncData.HLV.CurrentVersionCAS)
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)

// create the replication to push the doc to the passive node and wait for the doc to be replicated
activeRT.CreateReplication(rep, remoteURL, db.ActiveReplicatorTypePush, nil, false, db.ConflictResolverDefault)
Expand Down

0 comments on commit 6b688cb

Please sign in to comment.