diff --git a/db/attachment_test.go b/db/attachment_test.go
index 9f4d830609..6db8c1bf51 100644
--- a/db/attachment_test.go
+++ b/db/attachment_test.go
@@ -807,12 +807,7 @@ func TestMigrateBodyAttachments(t *testing.T) {
 		},
 		"channels": {
 			"ABC": null
-		},
-		"_vv":{
-			"cvCas":"0x0",
-			"src":"abc",
-			"vrs":"0x0"
-		}
+		}
 	}`
 
 	var bodyVal map[string]interface{}
@@ -1099,11 +1094,6 @@ func TestMigrateBodyAttachmentsMerge(t *testing.T) {
 		"channels": {
 			"ABC": null
 		},
-		"_vv":{
-			"cvCas":"0x0",
-			"src":"abc",
-			"vrs":"0x0"
-		},
 		"attachments": {
 			"bye.txt": {
 				"digest": "sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc=",
diff --git a/db/crud.go b/db/crud.go
index 498a7c336a..6a6156266b 100644
--- a/db/crud.go
+++ b/db/crud.go
@@ -26,8 +26,6 @@ import (
 
 const (
 	kMaxRecentSequences = 20 // Maximum number of sequences stored in RecentSequences before pruning is triggered
-	// hlvExpandMacroCASValue causes the field to be populated by CAS value by macro expansion
-	hlvExpandMacroCASValue = math.MaxUint64
 )
 
 // ErrForbidden is returned when the user requests a document without a revision that they do not have access to.
@@ -871,29 +869,17 @@ func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context
 	return nil
 }
 
-// updateMutateInSpec returns the mutate in spec needed for the document update based off the outcome in updateHLV
-func updateMutateInSpec(hlv HybridLogicalVector) []sgbucket.MacroExpansionSpec {
-	var outputSpec []sgbucket.MacroExpansionSpec
-	if hlv.Version == math.MaxUint64 {
-		spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.SyncXattrName), sgbucket.MacroCas)
-		outputSpec = append(outputSpec, spec)
-	}
-	if hlv.CurrentVersionCAS == math.MaxUint64 {
-		spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.SyncXattrName), sgbucket.MacroCas)
-		outputSpec = append(outputSpec, spec)
-	}
-	return outputSpec
-}
-
 // updateHLV updates the HLV in the sync data appropriately based on what type of document update event we are encountering
-func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocUpdateEvent) (*Document, error) {
-
-	switch docUpdateEvent.eventType {
+func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent uint32) (*Document, error) {
+	if d.HLV == nil {
+		d.HLV = &HybridLogicalVector{}
+	}
+	switch docUpdateEvent {
 	case BlipWriteEvent:
 		// preserve any other logic on the HLV that has been done by the client, only update to cvCAS will be needed
 		d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
 	case ImportEvent:
-		// VV should remain unchanged
+		// work to be done to decide if the VV needs updating here, pending CBG-3503
 	case SGWriteEvent:
 		// add a new entry to the version vector
 		newVVEntry := CurrentVersionVector{}
@@ -903,6 +889,8 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
 		if err != nil {
 			return nil, err
 		}
+		// update the cvCAS on the SGWrite event too
+		d.HLV.CurrentVersionCAS = hlvExpandMacroCASValue
 	}
 	return d, nil
 }
@@ -946,9 +934,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
 		return "", nil, err
 	}
 
-	docUpdateEvent := DocUpdateEvent{
-		SGWriteEvent,
-	}
+	docUpdateEvent := SGWriteEvent
 	allowImport := db.UseXattrs()
 	doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, expiry, nil, docUpdateEvent, nil, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
 		var isSgWrite bool
@@ -1074,9 +1060,7 @@ func (db *DatabaseCollectionWithUser) PutExistingRevWithConflictResolution(ctx c
 		return nil, "", base.HTTPErrorf(http.StatusBadRequest, "Invalid revision ID")
 	}
 
-	docUpdateEvent := DocUpdateEvent{
-		BlipWriteEvent,
-	}
+	docUpdateEvent := BlipWriteEvent
 	allowImport := db.UseXattrs()
 	doc, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
 		// (Be careful: this block can be invoked multiple times if there are races!)
@@ -1878,7 +1862,7 @@ type updateAndReturnDocCallback func(*Document) (resultDoc *Document, resultAtta
 //  1. Receive the updated document body in the response
 //  2. Specify the existing document body/xattr/cas, to avoid initial retrieval of the doc in cases that the current contents are already known (e.g. import).
 //     On cas failure, the document will still be reloaded from the bucket as usual.
-func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry uint32, opts *sgbucket.MutateInOptions, docUpdateEvent DocUpdateEvent, existingDoc *sgbucket.BucketDocument, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) {
+func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, docid string, allowImport bool, expiry uint32, opts *sgbucket.MutateInOptions, docUpdateEvent uint32, existingDoc *sgbucket.BucketDocument, callback updateAndReturnDocCallback) (doc *Document, newRevID string, err error) {
 
 	key := realDocID(docid)
 	if key == "" {
@@ -1958,7 +1942,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
 			return
 		}
 		// update the mutate in options based on the above logic
-		updatedSpec = updateMutateInSpec(doc.SyncData.HLV)
+		updatedSpec = doc.SyncData.HLV.computeMacroExpansions()
 
 		// Check whether Sync Data originated in body
 		if currentXattr == nil && doc.Sequence > 0 {
diff --git a/db/database.go b/db/database.go
index 6ae7f288da..28cf5ca143 100644
--- a/db/database.go
+++ b/db/database.go
@@ -54,9 +54,7 @@ const (
 	BlipWriteEvent
 )
 
-type DocUpdateEvent struct {
-	eventType uint32
-}
+var eventType uint32
 
 const (
 	DefaultRevsLimitNoConflicts = 50
diff --git a/db/document.go b/db/document.go
index 7b367770c4..c9803d06f6 100644
--- a/db/document.go
+++ b/db/document.go
@@ -65,24 +65,24 @@ type ChannelSetEntry struct {
 
 // The sync-gateway metadata stored in the "_sync" property of a Couchbase document.
 type SyncData struct {
-	CurrentRev        string              `json:"rev"`
-	NewestRev         string              `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev
-	Flags             uint8               `json:"flags,omitempty"`
-	Sequence          uint64              `json:"sequence,omitempty"`
-	UnusedSequences   []uint64            `json:"unused_sequences,omitempty"` // unused sequences due to update conflicts/CAS retry
-	RecentSequences   []uint64            `json:"recent_sequences,omitempty"` // recent sequences for this doc - used in server dedup handling
-	Channels          channels.ChannelMap `json:"channels,omitempty"`
-	Access            UserAccessMap       `json:"access,omitempty"`
-	RoleAccess        UserAccessMap       `json:"role_access,omitempty"`
-	Expiry            *time.Time          `json:"exp,omitempty"` // Document expiry. Information only - actual expiry/delete handling is done by bucket storage. Needs to be pointer for omitempty to work (see https://github.com/golang/go/issues/4357)
-	Cas               string              `json:"cas"` // String representation of a cas value, populated via macro expansion
-	Crc32c            string              `json:"value_crc32c"` // String representation of crc32c hash of doc body, populated via macro expansion
-	Crc32cUserXattr   string              `json:"user_xattr_value_crc32c,omitempty"` // String representation of crc32c hash of user xattr
-	TombstonedAt      int64               `json:"tombstoned_at,omitempty"` // Time the document was tombstoned. Used for view compaction
-	Attachments       AttachmentsMeta     `json:"attachments,omitempty"`
-	ChannelSet        []ChannelSetEntry   `json:"channel_set"`
-	ChannelSetHistory []ChannelSetEntry   `json:"channel_set_history"`
-	HLV               HybridLogicalVector `json:"_vv,omitempty"`
+	CurrentRev        string               `json:"rev"`
+	NewestRev         string               `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev
+	Flags             uint8                `json:"flags,omitempty"`
+	Sequence          uint64               `json:"sequence,omitempty"`
+	UnusedSequences   []uint64             `json:"unused_sequences,omitempty"` // unused sequences due to update conflicts/CAS retry
+	RecentSequences   []uint64             `json:"recent_sequences,omitempty"` // recent sequences for this doc - used in server dedup handling
+	Channels          channels.ChannelMap  `json:"channels,omitempty"`
+	Access            UserAccessMap        `json:"access,omitempty"`
+	RoleAccess        UserAccessMap        `json:"role_access,omitempty"`
+	Expiry            *time.Time           `json:"exp,omitempty"` // Document expiry. Information only - actual expiry/delete handling is done by bucket storage. Needs to be pointer for omitempty to work (see https://github.com/golang/go/issues/4357)
+	Cas               string               `json:"cas"` // String representation of a cas value, populated via macro expansion
+	Crc32c            string               `json:"value_crc32c"` // String representation of crc32c hash of doc body, populated via macro expansion
+	Crc32cUserXattr   string               `json:"user_xattr_value_crc32c,omitempty"` // String representation of crc32c hash of user xattr
+	TombstonedAt      int64                `json:"tombstoned_at,omitempty"` // Time the document was tombstoned. Used for view compaction
+	Attachments       AttachmentsMeta      `json:"attachments,omitempty"`
+	ChannelSet        []ChannelSetEntry    `json:"channel_set"`
+	ChannelSetHistory []ChannelSetEntry    `json:"channel_set_history"`
+	HLV               *HybridLogicalVector `json:"_vv,omitempty"`
 
 	// Only used for performance metrics:
 	TimeSaved time.Time `json:"time_saved,omitempty"` // Timestamp of save.
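For context on the db/document.go change above: moving SyncData.HLV from HybridLogicalVector to *HybridLogicalVector is what allows the "_vv" blocks to be dropped from the test fixtures earlier in this patch, because encoding/json never applies omitempty to a non-pointer struct value but does omit a nil pointer. The following standalone sketch uses simplified stand-in types (not the real SyncData or HybridLogicalVector) to illustrate the difference:

package main

import (
	"encoding/json"
	"fmt"
)

// hlv is an illustrative stand-in for HybridLogicalVector, only here to show the omitempty behaviour.
type hlv struct {
	CurrentVersionCAS uint64 `json:"cvCas,omitempty"`
	SourceID          string `json:"src,omitempty"`
	Version           uint64 `json:"vrs,omitempty"`
}

// syncByValue mirrors the old shape: a struct value is never "empty", so "_vv" is always emitted.
type syncByValue struct {
	CurrentRev string `json:"rev"`
	HLV        hlv    `json:"_vv,omitempty"`
}

// syncByPointer mirrors the new shape: a nil pointer is omitted entirely.
type syncByPointer struct {
	CurrentRev string `json:"rev"`
	HLV        *hlv   `json:"_vv,omitempty"`
}

func main() {
	v, _ := json.Marshal(syncByValue{CurrentRev: "1-abc"})
	p, _ := json.Marshal(syncByPointer{CurrentRev: "1-abc"})
	fmt.Println(string(v)) // {"rev":"1-abc","_vv":{}}
	fmt.Println(string(p)) // {"rev":"1-abc"}
}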
diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go
index 03cdc0f669..5f4623469e 100644
--- a/db/hybrid_logical_vector.go
+++ b/db/hybrid_logical_vector.go
@@ -10,10 +10,15 @@ package db
 
 import (
 	"fmt"
+	"math"
+
 	sgbucket "github.com/couchbase/sg-bucket"
 	"github.com/couchbase/sync_gateway/base"
 )
 
+// hlvExpandMacroCASValue causes the field to be populated by CAS value by macro expansion
+const hlvExpandMacroCASValue = math.MaxUint64
+
 type HybridLogicalVector struct {
 	CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS at the time of replication
 	SourceID          string // source bucket uuid of where this entry originated from
@@ -261,3 +266,17 @@ func convertMapToInMemoryFormat(persistedMap map[string]string) map[string]uint6
 	}
 	return returnedMap
 }
+
+// computeMacroExpansions returns the mutate in spec needed for the document update based off the outcome in updateHLV
+func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansionSpec {
+	var outputSpec []sgbucket.MacroExpansionSpec
+	if hlv.Version == hlvExpandMacroCASValue {
+		spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.SyncXattrName), sgbucket.MacroCas)
+		outputSpec = append(outputSpec, spec)
+	}
+	if hlv.CurrentVersionCAS == hlvExpandMacroCASValue {
+		spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.SyncXattrName), sgbucket.MacroCas)
+		outputSpec = append(outputSpec, spec)
+	}
+	return outputSpec
+}
diff --git a/db/import.go b/db/import.go
index 0bc64a52eb..9ca1a6e886 100644
--- a/db/import.go
+++ b/db/import.go
@@ -139,9 +139,7 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin
 		existingDoc.Expiry = *expiry
 	}
 
-	docUpdateEvent := DocUpdateEvent{
-		ImportEvent,
-	}
+	docUpdateEvent := ImportEvent
 	docOut, _, err = db.updateAndReturnDoc(ctx, newDoc.ID, true, existingDoc.Expiry, mutationOptions, docUpdateEvent, existingDoc, func(doc *Document) (resultDocument *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
 		// Perform cas mismatch check first, as we want to identify cas mismatch before triggering migrate handling.
 		// If there's a cas mismatch, the doc has been updated since the version that triggered the import. Handling depends on import mode.
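To show how the new computeMacroExpansions method is intended to drive subdoc macro expansion from updateAndReturnDoc, here is a hedged, self-contained sketch. The spec type and xattr paths below are simplified stand-ins for sgbucket.MacroExpansionSpec, xattrCurrentVersionPath/xattrCurrentVersionCASPath, and base.SyncXattrName; only the sentinel-driven logic mirrors the patch:

package main

import (
	"fmt"
	"math"
)

// Sentinel meaning "populate this field via CAS macro expansion on write",
// mirroring hlvExpandMacroCASValue in the patch.
const expandMacroCASValue = math.MaxUint64

// macroExpansionSpec is an illustrative stand-in for sgbucket.MacroExpansionSpec.
type macroExpansionSpec struct {
	Path string
}

type hybridLogicalVector struct {
	CurrentVersionCAS uint64
	SourceID          string
	Version           uint64
}

// computeMacroExpansions returns one spec per HLV field that was left at the sentinel
// value, so only those fields get rewritten by the server at mutation time.
func (hlv *hybridLogicalVector) computeMacroExpansions() []macroExpansionSpec {
	var specs []macroExpansionSpec
	if hlv.Version == expandMacroCASValue {
		specs = append(specs, macroExpansionSpec{Path: "_sync.ver"}) // placeholder xattr path
	}
	if hlv.CurrentVersionCAS == expandMacroCASValue {
		specs = append(specs, macroExpansionSpec{Path: "_sync.cvCas"}) // placeholder xattr path
	}
	return specs
}

func main() {
	// An SG write marks both the version and cvCAS for expansion, so two specs are produced.
	hlv := &hybridLogicalVector{
		Version:           expandMacroCASValue,
		CurrentVersionCAS: expandMacroCASValue,
	}
	for _, spec := range hlv.computeMacroExpansions() {
		fmt.Println(spec.Path)
	}
}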
diff --git a/db/import_test.go b/db/import_test.go
index 8852577825..1d514d33b5 100644
--- a/db/import_test.go
+++ b/db/import_test.go
@@ -283,11 +283,6 @@ func TestImportWithCasFailureUpdate(t *testing.T) {
 				null
 			]
 		},
-		"_vv":{
-			"cvCas":"0x0",
-			"src":"abc",
-			"vrs":"0x0"
-		},
 		"cas": "",
 		"time_saved": "2017-11-29T12:46:13.456631-08:00"
 	}`
diff --git a/rest/api_test.go b/rest/api_test.go
index 9752cf7059..56115184f3 100644
--- a/rest/api_test.go
+++ b/rest/api_test.go
@@ -2765,6 +2765,7 @@ func TestPutDocUpdateVersionVector(t *testing.T) {
 
 	assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
 	assert.Equal(t, uintCAS, syncData.HLV.Version)
+	assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
 
 	// Put a new revision of this doc and assert that the version vector SourceID and Version is updated
 	resp = rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1?rev="+syncData.CurrentRev, `{"key1": "value1"}`)
@@ -2776,6 +2777,7 @@ func TestPutDocUpdateVersionVector(t *testing.T) {
 
 	assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
 	assert.Equal(t, uintCAS, syncData.HLV.Version)
+	assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
 
 	// Delete doc and assert that the version vector SourceID and Version is updated
 	resp = rt.SendAdminRequest(http.MethodDelete, "/{{.keyspace}}/doc1?rev="+syncData.CurrentRev, "")
@@ -2787,6 +2789,7 @@ func TestPutDocUpdateVersionVector(t *testing.T) {
 
 	assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
 	assert.Equal(t, uintCAS, syncData.HLV.Version)
+	assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
 }
 
 func TestTombstoneCompactionAPI(t *testing.T) {
diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go
index bf4ad5705c..26405804c0 100644
--- a/rest/replicatortest/replicator_test.go
+++ b/rest/replicatortest/replicator_test.go
@@ -8486,7 +8486,7 @@ func TestReplicatorUpdateHLVOnPut(t *testing.T) {
 
 	assert.Equal(t, activeBucketUUID, syncData.HLV.SourceID)
 	assert.Equal(t, uintCAS, syncData.HLV.Version)
-	assert.Equal(t, uint64(0), syncData.HLV.CurrentVersionCAS)
+	assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
 
 	// create the replication to push the doc to the passive node and wait for the doc to be replicated
 	activeRT.CreateReplication(rep, remoteURL, db.ActiveReplicatorTypePush, nil, false, db.ConflictResolverDefault)
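The test updates above (TestPutDocUpdateVersionVector, TestReplicatorUpdateHLVOnPut) now expect HLV.CurrentVersionCAS to equal the document CAS after a write, which follows from the reworked updateHLV switch: both SG writes and blip writes mark cvCAS for macro expansion, while imports are left untouched pending CBG-3503. The sketch below summarises that decision table with stand-in types and event constants; it is illustrative only, not the real Sync Gateway API:

package main

import (
	"fmt"
	"math"
)

// Stand-in event constants; the real ones live in db/database.go.
const (
	sgWriteEvent uint32 = iota
	importEvent
	blipWriteEvent
)

// Sentinel meaning "fill in with the mutation CAS via macro expansion".
const expandMacroCASValue = math.MaxUint64

type hlv struct {
	CurrentVersionCAS uint64
	SourceID          string
	Version           uint64
}

// updateHLV sketches the behaviour the patch gives the real method: SG writes stamp a new
// source/version pair and cvCAS, blip writes only touch cvCAS, imports are left as-is for now.
func updateHLV(v *hlv, event uint32, sourceID string) *hlv {
	if v == nil {
		v = &hlv{}
	}
	switch event {
	case blipWriteEvent:
		v.CurrentVersionCAS = expandMacroCASValue
	case importEvent:
		// version vector deliberately unchanged (pending CBG-3503)
	case sgWriteEvent:
		v.SourceID = sourceID
		v.Version = expandMacroCASValue
		v.CurrentVersionCAS = expandMacroCASValue
	}
	return v
}

func main() {
	v := updateHLV(nil, sgWriteEvent, "bucket-uuid")
	fmt.Println(v.Version == expandMacroCASValue, v.CurrentVersionCAS == expandMacroCASValue) // true true
}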