From 629ceb8ff7365ad35e00e39f28b4d59892793eae Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Thu, 4 Apr 2024 19:39:23 -0400 Subject: [PATCH] Combine MutationStore2, Collection, DataStoreName into DataStore (#6752) * Combine MutationStore2, Collection, DataStoreName into DataStore This is cleanup from the introduction to rosmar. - rosmar.Collection and Collection already satisified the DataStoreName interface, so putting it as part of DataStore removes need for AsDataStoreName checks. - remove GetFeedType() checks since it will always return DCP - Remove StartTapFeed from leaky bucket because it was not being used. TestStopChangeCache was only running with xattrs disabled, but the test would pass even if the events arrived out of order, so the leaky bucket tap code was not being executed. - Reflect in documentation that DCP feed is always used. * Replace leaky bucket code for DCPMissingDocs --- base/bucket.go | 29 --- base/collection.go | 15 -- base/leaky_bucket.go | 213 +----------------- base/leaky_bucket_test.go | 70 ------ base/leaky_datastore.go | 14 +- base/main_test_bucket_pool.go | 2 +- db/change_cache_test.go | 19 +- db/change_listener.go | 59 +---- db/crud.go | 2 +- db/database.go | 9 +- db/database_collection.go | 12 +- db/database_test.go | 7 +- db/indextest/main_test.go | 8 +- db/util_testing.go | 16 +- docs/api/components/schemas.yaml | 3 +- go.mod | 4 +- go.sum | 4 + rest/adminapitest/admin_api_test.go | 5 +- .../collections_admin_api_test.go | 12 +- rest/api_collections_test.go | 8 +- rest/config.go | 6 +- rest/replicatortest/replicator_test.go | 4 +- rest/server_context.go | 18 +- rest/utilities_testing_test.go | 20 +- 24 files changed, 75 insertions(+), 484 deletions(-) delete mode 100644 base/leaky_bucket_test.go diff --git a/base/bucket.go b/base/bucket.go index dfe23ae88a..d49acb513d 100644 --- a/base/bucket.go +++ b/base/bucket.go @@ -30,11 +30,6 @@ import ( pkgerrors "github.com/pkg/errors" ) -const ( - TapFeedType = "tap" - DcpFeedType = "dcp" -) - const ( DefaultPool = "default" ) @@ -92,12 +87,6 @@ func GetBaseDataStore(ds DataStore) DataStore { return ds } -// AsDataStoreName is a temporary thing until DataStoreName is implemented on wrappers (pending further design work on FQName...) -func AsDataStoreName(ds DataStore) (sgbucket.DataStoreName, bool) { - dsn, ok := GetBaseDataStore(ds).(sgbucket.DataStoreName) - return dsn, ok -} - func init() { // Increase max memcached request size to 20M bytes, to support large docs (attachments!) // arriving in a tap feed. (see issues #210, #333, #342) @@ -372,24 +361,6 @@ func IsCasMismatch(err error) bool { return false } -// Returns mutation feed type for bucket. Will first return the feed type from the spec, when present. If not found, returns default feed type for bucket -// (DCP for any couchbase bucket, TAP otherwise) -func GetFeedType(bucket Bucket) (feedType string) { - switch typedBucket := bucket.(type) { - case *GocbV2Bucket: - return DcpFeedType - case sgbucket.MutationFeedStore2: - return string(typedBucket.GetFeedType()) - case *LeakyBucket: - return GetFeedType(typedBucket.bucket) - case *TestBucket: - return GetFeedType(typedBucket.Bucket) - default: - // unknown bucket type? - return TapFeedType - } -} - // Gets the bucket max TTL, or 0 if no TTL was set. Sync gateway should fail to bring the DB online if this is non-zero, // since it's not meant to operate against buckets that auto-delete data. func getMaxTTL(ctx context.Context, store CouchbaseBucketStore) (int, error) { diff --git a/base/collection.go b/base/collection.go index 994c3d0735..475b0e2ab3 100644 --- a/base/collection.go +++ b/base/collection.go @@ -289,10 +289,6 @@ func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArgum return StartGocbDCPFeed(ctx, b, b.Spec.BucketName, args, callback, dbStats, DCPMetadataStoreInMemory, groupID) } -func (b *GocbV2Bucket) StartTapFeed(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) { - return nil, errors.New("StartTapFeed not implemented") -} - func (b *GocbV2Bucket) GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error) { agent, agentErr := b.getGoCBAgent() @@ -721,14 +717,3 @@ func (b *GocbV2Bucket) ServerMetrics(ctx context.Context) (map[string]*dto.Metri return mf, nil } - -func GetCollectionID(dataStore DataStore) uint32 { - switch c := dataStore.(type) { - case WrappingDatastore: - return GetCollectionID(c.GetUnderlyingDataStore()) - case sgbucket.Collection: - return c.GetCollectionID() - default: - return DefaultCollectionID - } -} diff --git a/base/leaky_bucket.go b/base/leaky_bucket.go index 988f86be3b..f34eda63a8 100644 --- a/base/leaky_bucket.go +++ b/base/leaky_bucket.go @@ -13,8 +13,6 @@ import ( "context" "expvar" "fmt" - "math" - "time" sgbucket "github.com/couchbase/sg-bucket" ) @@ -123,12 +121,7 @@ type LeakyBucketConfig struct { DDocDeleteErrorCount int DDocGetErrorCount int - // Emulate TAP/DCP feed de-dupliation behavior, such that within a - // window of # of mutations or a timeout, mutations for a given document - // will be filtered such that only the _latest_ mutation will make it through. - TapFeedDeDuplication bool - TapFeedVbuckets bool // Emulate vbucket numbers on feed - TapFeedMissingDocs []string // Emulate entry not appearing on tap feed + DCPFeedMissingDocs []string // Emulate entry not appearing on DCP feed ForceErrorSetRawKeys []string // Issuing a SetRaw call with a specified key will return an error @@ -161,209 +154,17 @@ type LeakyBucketConfig struct { IgnoreClose bool } -func (b *LeakyBucket) StartTapFeed(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) { - - if b.config.TapFeedDeDuplication { - return b.wrapFeedForDeduplication(args, dbStats) - } else if len(b.config.TapFeedMissingDocs) > 0 { - callback := func(event *sgbucket.FeedEvent) bool { - for _, key := range b.config.TapFeedMissingDocs { +func (b *LeakyBucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error { + if len(b.config.DCPFeedMissingDocs) > 0 { + wrappedCallback := func(event sgbucket.FeedEvent) bool { + for _, key := range b.config.DCPFeedMissingDocs { if string(event.Key) == key { return false } } - return true - } - return b.wrapFeed(args, callback, dbStats) - } else if b.config.TapFeedVbuckets { - // kick off the wrapped sgbucket tap feed - walrusTapFeed, err := b.bucket.StartTapFeed(args, dbStats) - if err != nil { - return walrusTapFeed, err - } - // this is the sgbucket.MutationFeed impl we'll return to callers, which - // will add vbucket information - channel := make(chan sgbucket.FeedEvent, 10) - vbTapFeed := &wrappedTapFeedImpl{ - channel: channel, - wrappedTapFeed: walrusTapFeed, + return callback(event) } - go func() { - for event := range walrusTapFeed.Events() { - key := string(event.Key) - event.VbNo = uint16(sgbucket.VBHash(key, 1024)) - vbTapFeed.channel <- event - } - close(vbTapFeed.channel) - }() - return vbTapFeed, nil - - } else { - return b.bucket.StartTapFeed(args, dbStats) + return b.bucket.StartDCPFeed(ctx, args, wrappedCallback, dbStats) } - -} - -func (b *LeakyBucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error { return b.bucket.StartDCPFeed(ctx, args, callback, dbStats) } - -type EventUpdateFunc func(event *sgbucket.FeedEvent) bool - -func (b *LeakyBucket) wrapFeed(args sgbucket.FeedArguments, callback EventUpdateFunc, dbStats *expvar.Map) (sgbucket.MutationFeed, error) { - - // kick off the wrapped sgbucket tap feed - walrusTapFeed, err := b.bucket.StartTapFeed(args, dbStats) - if err != nil { - return walrusTapFeed, err - } - - // create an output channel - channel := make(chan sgbucket.FeedEvent, 10) - - // this is the sgbucket.MutationFeed impl we'll return to callers, which - // will have missing entries - wrapperFeed := &wrappedTapFeedImpl{ - channel: channel, - wrappedTapFeed: walrusTapFeed, - } - - go func() { - for event := range walrusTapFeed.Events() { - // Callback returns false if the event should be skipped - if callback(&event) { - wrapperFeed.channel <- event - } - } - close(wrapperFeed.channel) - }() - return wrapperFeed, nil -} - -func (b *LeakyBucket) wrapFeedForDeduplication(args sgbucket.FeedArguments, dbStats *expvar.Map) (sgbucket.MutationFeed, error) { - // create an output channel - // start a goroutine which reads off the sgbucket tap feed - // - de-duplicate certain events - // - puts them to output channel - - // the number of changes that it will buffer up before de-duplicating - deDuplicationWindowSize := 5 - - // the timeout window in milliseconds after which it will flush to output, even if - // the dedupe buffer has not filled up yet. - deDuplicationTimeoutMs := time.Millisecond * 1000 - - // kick off the wrapped sgbucket tap feed - walrusTapFeed, err := b.bucket.StartTapFeed(args, dbStats) - if err != nil { - return walrusTapFeed, err - } - - // create an output channel for de-duplicated events - channel := make(chan sgbucket.FeedEvent, 10) - - // this is the sgbucket.MutationFeed impl we'll return to callers, which - // will reead from the de-duplicated events channel - dupeTapFeed := &wrappedTapFeedImpl{ - channel: channel, - wrappedTapFeed: walrusTapFeed, - } - - go func() { - defer close(dupeTapFeed.channel) - // the buffer to hold tap events that are candidates for de-duplication - deDupeBuffer := []sgbucket.FeedEvent{} - - timer := time.NewTimer(math.MaxInt64) - for { - select { - case tapEvent, ok := <-walrusTapFeed.Events(): - if !ok { - // channel closed, goroutine is done - // dedupe and send what we currently have - dedupeAndForward(deDupeBuffer, channel) - return - } - deDupeBuffer = append(deDupeBuffer, tapEvent) - - // if we've collected enough, dedeupe and send what we have, - // and reset buffer. - if len(deDupeBuffer) >= deDuplicationWindowSize { - dedupeAndForward(deDupeBuffer, channel) - deDupeBuffer = []sgbucket.FeedEvent{} - } else { - _ = timer.Reset(deDuplicationTimeoutMs) - } - - case <-timer.C: - if len(deDupeBuffer) > 0 { - // give up on waiting for the buffer to fill up, - // de-dupe and send what we currently have - dedupeAndForward(deDupeBuffer, channel) - deDupeBuffer = []sgbucket.FeedEvent{} - } - } - } - - }() - return dupeTapFeed, nil -} - -// An implementation of a sgbucket tap feed that wraps -// tap events on the upstream tap feed to better emulate real world -// TAP/DCP behavior. -type wrappedTapFeedImpl struct { - channel chan sgbucket.FeedEvent - wrappedTapFeed sgbucket.MutationFeed -} - -func (feed *wrappedTapFeedImpl) Close() error { - return feed.wrappedTapFeed.Close() -} - -func (feed *wrappedTapFeedImpl) Events() <-chan sgbucket.FeedEvent { - return feed.channel -} - -func (feed *wrappedTapFeedImpl) WriteEvents() chan<- sgbucket.FeedEvent { - return feed.channel -} - -func dedupeAndForward(tapEvents []sgbucket.FeedEvent, destChannel chan<- sgbucket.FeedEvent) { - - deduped := dedupeTapEvents(tapEvents) - - for _, tapEvent := range deduped { - destChannel <- tapEvent - } - -} - -func dedupeTapEvents(tapEvents []sgbucket.FeedEvent) []sgbucket.FeedEvent { - - // For each document key, keep track of the latest seen tapEvent - // doc1 -> tapEvent with Seq=1 - // doc2 -> tapEvent with Seq=5 - // (if tapEvent with Seq=7 comes in for doc1, it will clobber existing) - latestTapEventPerKey := map[string]sgbucket.FeedEvent{} - - for _, tapEvent := range tapEvents { - key := string(tapEvent.Key) - latestTapEventPerKey[key] = tapEvent - } - - // Iterate over the original tapEvents, and only keep what - // is in latestTapEventPerKey, and discard all previous mutations - // of that doc. This will preserve the original - // sequence order as read off the feed. - deduped := []sgbucket.FeedEvent{} - for _, tapEvent := range tapEvents { - latestTapEventForKey := latestTapEventPerKey[string(tapEvent.Key)] - if tapEvent.Cas == latestTapEventForKey.Cas { - deduped = append(deduped, tapEvent) - } - } - - return deduped - -} diff --git a/base/leaky_bucket_test.go b/base/leaky_bucket_test.go deleted file mode 100644 index cf3519e94f..0000000000 --- a/base/leaky_bucket_test.go +++ /dev/null @@ -1,70 +0,0 @@ -/* -Copyright 2016-Present Couchbase, Inc. - -Use of this software is governed by the Business Source License included in -the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that -file, in accordance with the Business Source License, use of this software will -be governed by the Apache License, Version 2.0, included in the file -licenses/APL2.txt. -*/ - -package base - -import ( - "testing" - - sgbucket "github.com/couchbase/sg-bucket" - "github.com/stretchr/testify/assert" -) - -func TestDedupeTapEventsLaterSeqSameDoc(t *testing.T) { - - tapEvents := []sgbucket.FeedEvent{ - { - Opcode: sgbucket.FeedOpMutation, - Key: []byte("doc1"), - Value: []byte(`".."`), - Cas: 1, - }, - { - Opcode: sgbucket.FeedOpMutation, - Key: []byte("doc1"), - Value: []byte(`".."`), - Cas: 2, - }, - } - - deduped := dedupeTapEvents(tapEvents) - - // make sure that one was deduped - assert.Len(t, deduped, 1) - - // make sure the earlier event was deduped - dedupedEvent := deduped[0] - assert.True(t, dedupedEvent.Cas == 2) - -} - -func TestDedupeNoDedupeDifferentDocs(t *testing.T) { - - tapEvents := []sgbucket.FeedEvent{ - { - Opcode: sgbucket.FeedOpMutation, - Key: []byte("doc1"), - Value: []byte(`".."`), - Cas: 1, - }, - { - Opcode: sgbucket.FeedOpMutation, - Key: []byte("doc2"), - Value: []byte(`".."`), - Cas: 2, - }, - } - - deduped := dedupeTapEvents(tapEvents) - - // make sure that nothing was deduped - assert.True(t, len(deduped) == 2) - -} diff --git a/base/leaky_datastore.go b/base/leaky_datastore.go index ba1b581a43..b252d7cedf 100644 --- a/base/leaky_datastore.go +++ b/base/leaky_datastore.go @@ -67,12 +67,16 @@ func (lds *LeakyDataStore) GetName() string { return lds.dataStore.GetName() } +func (lds *LeakyDataStore) ScopeName() string { + return lds.dataStore.ScopeName() +} + +func (lds *LeakyDataStore) CollectionName() string { + return lds.dataStore.CollectionName() +} + func (lds *LeakyDataStore) GetCollectionID() uint32 { - if coll, ok := lds.dataStore.(sgbucket.Collection); ok { - return coll.GetCollectionID() - } else { - return DefaultCollectionID - } + return lds.dataStore.GetCollectionID() } func (lds *LeakyDataStore) Get(k string, rv interface{}) (cas uint64, err error) { diff --git a/base/main_test_bucket_pool.go b/base/main_test_bucket_pool.go index 8cab3f3c2d..5697f770ac 100644 --- a/base/main_test_bucket_pool.go +++ b/base/main_test_bucket_pool.go @@ -222,7 +222,7 @@ func (tbp *TestBucketPool) GetWalrusTestBucket(t testing.TB, url string) (b Buck } // Wrap Walrus buckets with a leaky bucket to support vbucket IDs on feed. - b = &LeakyBucket{bucket: walrusBucket, config: &LeakyBucketConfig{TapFeedVbuckets: true}} + b = &LeakyBucket{bucket: walrusBucket, config: &LeakyBucketConfig{}} ctx := bucketCtx(testCtx, b) tbp.Logf(ctx, "Creating new %s test bucket", typeName) diff --git a/db/change_cache_test.go b/db/change_cache_test.go index d1bfdf4be2..0d1afbfaf5 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -504,10 +504,6 @@ func WriteUserDirect(t *testing.T, db *Database, username string, sequence uint6 func WriteDirectWithKey(t *testing.T, db *Database, key string, channelArray []string, sequence uint64) { - if base.TestUseXattrs() { - panic(fmt.Sprintf("WriteDirectWithKey() cannot be used in tests that are xattr enabled")) - } - rev := "1-a" chanMap := make(map[string]*channels.ChannelRemoval, 10) @@ -521,8 +517,15 @@ func WriteDirectWithKey(t *testing.T, db *Database, key string, channelArray []s Channels: chanMap, TimeSaved: time.Now(), } + body := fmt.Sprintf(`{"key": "%s"}`, key) collection := GetSingleDatabaseCollectionWithUser(t, db) - _, _ = collection.dataStore.Add(key, 0, Body{base.SyncPropertyName: syncData, "key": key}) + if base.TestUseXattrs() { + _, err := collection.dataStore.WriteWithXattrs(base.TestCtx(t), key, 0, 0, []byte(body), map[string][]byte{base.SyncXattrName: base.MustJSONMarshal(t, syncData)}, nil) + require.NoError(t, err) + } else { + _, err := collection.dataStore.Add(key, 0, Body{base.SyncPropertyName: syncData, "key": key}) + require.NoError(t, err) + } } // Create a document directly to the bucket with specific _sync metadata - used for @@ -1304,10 +1307,6 @@ func TestStopChangeCache(t *testing.T) { base.SetUpTestLogging(t, base.LevelDebug, base.KeyChanges, base.KeyDCP) - if base.TestUseXattrs() { - t.Skip("This test does not work with XATTRs due to calling WriteDirect(). Skipping.") - } - // Setup short-wait cache to ensure cleanup goroutines fire often cacheOptions := DefaultCacheOptions() cacheOptions.CachePendingSeqMaxWait = 10 * time.Millisecond @@ -1316,7 +1315,7 @@ func TestStopChangeCache(t *testing.T) { // Use leaky bucket to have the tap feed 'lose' document 3 leakyConfig := base.LeakyBucketConfig{ - TapFeedMissingDocs: []string{"doc-3"}, + DCPFeedMissingDocs: []string{"doc-3"}, } db, ctx := setupTestLeakyDBWithCacheOptions(t, cacheOptions, leakyConfig) diff --git a/db/change_listener.go b/db/change_listener.go index a9041fb4ad..8316cdad54 100644 --- a/db/change_listener.go +++ b/db/change_listener.go @@ -13,7 +13,6 @@ package db import ( "context" "expvar" - "fmt" "math" "strings" "sync" @@ -77,18 +76,13 @@ func (listener *changeListener) Start(ctx context.Context, bucket base.Bucket, d // build the set of collections to be requested // Add the metadata collection first - metadataStoreName, ok := base.AsDataStoreName(metadataStore) - if !ok { - return fmt.Errorf("changeListener started with collections, but unable to retrieve metadata store name for %T", metadataStore) - } - metadataStoreFoundInScopes := false scopeArgs := make(map[string][]string) for scopeName, scope := range scopes { collections := make([]string, 0) for collectionName, _ := range scope.Collections { collections = append(collections, collectionName) - if scopeName == metadataStoreName.ScopeName() && collectionName == metadataStoreName.CollectionName() { + if scopeName == metadataStore.ScopeName() && collectionName == metadataStore.CollectionName() { metadataStoreFoundInScopes = true } } @@ -97,11 +91,11 @@ func (listener *changeListener) Start(ctx context.Context, bucket base.Bucket, d // If the metadataStore's collection isn't already present in the list of scopes, add it to the DCP scopes if !metadataStoreFoundInScopes { - _, ok = scopeArgs[metadataStoreName.ScopeName()] + _, ok := scopeArgs[metadataStore.ScopeName()] if !ok { - scopeArgs[metadataStoreName.ScopeName()] = []string{metadataStoreName.CollectionName()} + scopeArgs[metadataStore.ScopeName()] = []string{metadataStore.CollectionName()} } else { - scopeArgs[metadataStoreName.ScopeName()] = append(scopeArgs[metadataStoreName.ScopeName()], metadataStoreName.CollectionName()) + scopeArgs[metadataStore.ScopeName()] = append(scopeArgs[metadataStore.ScopeName()], metadataStore.CollectionName()) } } listener.FeedArgs.Scopes = scopeArgs @@ -118,38 +112,10 @@ func (listener *changeListener) StartMutationFeed(ctx context.Context, bucket ba } }() - // Uses DCP by default, unless TAP is explicitly specified - feedType := base.GetFeedType(bucket) - switch feedType { - case base.TapFeedType: - // TAP Feed - // TAP feed is a go-channel of Tap events served by the bucket. Start the feed, then - // start a goroutine to work the event channel, calling ProcessEvent for each event - var err error - listener.tapFeed, err = bucket.StartTapFeed(listener.FeedArgs, dbStats) - if err != nil { - return err - } - go func() { - defer func() { - if listener.FeedArgs.DoneChan != nil { - close(listener.FeedArgs.DoneChan) - } - }() - defer base.FatalPanicHandler() - defer listener.notifyStopping(ctx) - for event := range listener.tapFeed.Events() { - event.TimeReceived = time.Now() - listener.ProcessFeedEvent(event) - } - }() - return nil - default: - // DCP Feed - // DCP receiver isn't go-channel based - DCPReceiver calls ProcessEvent directly. - base.InfofCtx(ctx, base.KeyDCP, "Using DCP feed for bucket: %q (based on feed_type specified in config file)", base.MD(bucket.GetName())) - return bucket.StartDCPFeed(ctx, listener.FeedArgs, listener.ProcessFeedEvent, dbStats) - } + // DCP Feed + // DCP receiver isn't go-channel based - DCPReceiver calls ProcessEvent directly. + base.InfofCtx(ctx, base.KeyDCP, "Using DCP feed for bucket: %q (based on feed_type specified in config file)", base.MD(bucket.GetName())) + return bucket.StartDCPFeed(ctx, listener.FeedArgs, listener.ProcessFeedEvent, dbStats) } // ProcessFeedEvent is invoked for each mutate or delete event seen on the server's mutation feed (TAP or DCP). Uses document @@ -278,15 +244,6 @@ func (listener *changeListener) NotifyCheckForTermination(ctx context.Context, k listener.tapNotifier.L.Unlock() } -func (listener *changeListener) notifyStopping(ctx context.Context) { - listener.tapNotifier.L.Lock() - listener.counter = 0 - listener.keyCounts = map[string]uint64{} - base.DebugfCtx(ctx, base.KeyChanges, "Notifying that changeListener is stopping") - listener.tapNotifier.Broadcast() - listener.tapNotifier.L.Unlock() -} - // Waits until either the counter, or terminateCheckCounter exceeds the given value. Returns the new counters. func (listener *changeListener) Wait(ctx context.Context, keys []string, counter uint64, terminateCheckCounter uint64) (uint64, uint64) { listener.tapNotifier.L.Lock() diff --git a/db/crud.go b/db/crud.go index 57925d6f60..ac003a0dfb 100644 --- a/db/crud.go +++ b/db/crud.go @@ -1607,7 +1607,7 @@ func (db *DatabaseContext) assignSequence(ctx context.Context, docSequence uint6 doc.Sequence = docSequence doc.UnusedSequences = unusedSequences - // The server TAP/DCP feed will deduplicate multiple revisions for the same doc if they occur in + // The server DCP feed will deduplicate multiple revisions for the same doc if they occur in // the same mutation queue processing window. This results in missing sequences on the change listener. // To account for this, we track the recent sequence numbers for the document. if doc.RecentSequences == nil { diff --git a/db/database.go b/db/database.go index c6df802c70..8508e4bbf3 100644 --- a/db/database.go +++ b/db/database.go @@ -747,14 +747,7 @@ func (dbCtx *DatabaseContext) RemoveObsoleteIndexes(ctx context.Context, preview var errs *base.MultiError var removedIndexes []string for _, dataStore := range dbCtx.getDataStores() { - dsName, ok := base.AsDataStoreName(dataStore) - if !ok { - err := fmt.Sprintf("Cannot get datastore name from %s", dataStore) - base.WarnfCtx(ctx, err) - errs = errs.Append(errors.New(err)) - continue - } - collectionName := fmt.Sprintf("`%s`.`%s`", dsName.ScopeName(), dsName.CollectionName()) + collectionName := fmt.Sprintf("`%s`.`%s`", dataStore.ScopeName(), dataStore.CollectionName()) n1qlStore, ok := base.AsN1QLStore(dataStore) if !ok { err := fmt.Sprintf("Cannot remove obsolete indexes for non-gocb collection %s - skipping.", base.MD(collectionName)) diff --git a/db/database_collection.go b/db/database_collection.go index 75f4248ca2..f095495f9c 100644 --- a/db/database_collection.go +++ b/db/database_collection.go @@ -43,19 +43,14 @@ func newDatabaseCollection(ctx context.Context, dbContext *DatabaseContext, data dataStore: dataStore, dbCtx: dbContext, collectionStats: stats, + ScopeName: dataStore.ScopeName(), + Name: dataStore.CollectionName(), } dbCollection.revisionCache = NewRevisionCache( dbContext.Options.RevisionCacheOptions, dbCollection, dbContext.DbStats.Cache(), ) - if metadataStoreName, ok := base.AsDataStoreName(dataStore); ok { - dbCollection.ScopeName = metadataStoreName.ScopeName() - dbCollection.Name = metadataStoreName.CollectionName() - } else { - dbCollection.ScopeName = base.DefaultScope - dbCollection.Name = base.DefaultCollection - } return dbCollection, nil } @@ -141,8 +136,7 @@ func (c *DatabaseCollection) exitChanges() chan struct{} { // GetCollectionID returns a collectionID. If couchbase server does not return collections, it will return base.DefaultCollectionID, like the default collection for a Couchbase Server that does support collections. func (c *DatabaseCollection) GetCollectionID() uint32 { - ds := base.GetBaseDataStore(c.dataStore) - return base.GetCollectionID(ds) + return c.dataStore.GetCollectionID() } // GetRevisionCacheForTest allow accessing a copy of revision cache. diff --git a/db/database_test.go b/db/database_test.go index f64edb8e98..97c9330678 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -3025,9 +3025,6 @@ func TestGetDatabaseCollectionWithUserDefaultCollection(t *testing.T) { require.NoError(t, err) require.NotNil(t, ds) - dataStoreName, ok := base.AsDataStoreName(ds) - require.True(t, ok) - testCases := []struct { name string scope string @@ -3042,9 +3039,9 @@ func TestGetDatabaseCollectionWithUserDefaultCollection(t *testing.T) { err: false, options: DatabaseContextOptions{ Scopes: map[string]ScopeOptions{ - dataStoreName.ScopeName(): ScopeOptions{ + ds.ScopeName(): ScopeOptions{ Collections: map[string]CollectionOptions{ - dataStoreName.CollectionName(): {}, + ds.CollectionName(): {}, }, }, base.DefaultScope: ScopeOptions{ diff --git a/db/indextest/main_test.go b/db/indextest/main_test.go index dde193a570..0290ebc43b 100644 --- a/db/indextest/main_test.go +++ b/db/indextest/main_test.go @@ -81,12 +81,6 @@ var primaryIndexReadier base.TBPBucketReadierFunc = func(ctx context.Context, b if err != nil { return err } - dsName, ok := base.AsDataStoreName(dataStore) - if !ok { - err := fmt.Errorf("Could not determine datastore name from datastore: %+v", dataStore) - tbp.Logf(ctx, "%s", err) - return err - } tbp.Logf(ctx, "dropping existing bucket indexes") if err := db.EmptyPrimaryIndex(ctx, dataStore); err != nil { @@ -104,7 +98,7 @@ var primaryIndexReadier base.TBPBucketReadierFunc = func(ctx context.Context, b if len(indexes) != 1 && indexes[0] != base.PrimaryIndexName { return fmt.Errorf("expected only primary index to be present, found: %v", indexes) } - tbp.Logf(ctx, "waiting for empty bucket indexes %s.%s.%s", b.GetName(), dsName.ScopeName(), dsName.CollectionName()) + tbp.Logf(ctx, "waiting for empty bucket indexes %s.%s.%s", b.GetName(), dataStore.ScopeName(), dataStore.CollectionName()) // wait for primary index to be empty if err := db.WaitForPrimaryIndexEmpty(ctx, n1qlStore); err != nil { tbp.Logf(ctx, "waitForPrimaryIndexEmpty returned an error: %v", err) diff --git a/db/util_testing.go b/db/util_testing.go index 336943206e..d7a1fab4e3 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -301,12 +301,6 @@ var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Contex if err != nil { return err } - dsName, ok := base.AsDataStoreName(dataStore) - if !ok { - err := fmt.Errorf("Could not determine datastore name from datastore: %+v", dataStore) - tbp.Logf(ctx, "%s", err) - return err - } if _, err := emptyAllDocsIndex(ctx, dataStore, tbp); err != nil { return err } @@ -317,7 +311,7 @@ var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Contex if !ok { return errors.New("attempting to empty indexes with non-N1QL store") } - tbp.Logf(ctx, "waiting for empty bucket indexes %s.%s.%s", b.GetName(), dsName.ScopeName(), dsName.CollectionName()) + tbp.Logf(ctx, "waiting for empty bucket indexes %s.%s.%s", b.GetName(), dataStore.ScopeName(), dataStore.CollectionName()) // we can't init indexes concurrently, so we'll just wait for them to be empty after emptying instead of recreating. if err := WaitForPrimaryIndexEmpty(ctx, n1qlStore); err != nil { tbp.Logf(ctx, "waitForPrimaryIndexEmpty returned an error: %v", err) @@ -390,13 +384,7 @@ var viewsAndGSIBucketInit base.TBPBucketInitFunc = func(ctx context.Context, b b Serverless: false, MetadataIndexes: IndexesWithoutMetadata, } - dsName, ok := base.AsDataStoreName(dataStore) - if !ok { - err := fmt.Errorf("Could not determine datastore name from datastore: %+v", dataStore) - tbp.Logf(ctx, "%s", err) - return err - } - if base.IsDefaultCollection(dsName.ScopeName(), dsName.CollectionName()) { + if base.IsDefaultCollection(dataStore.ScopeName(), dataStore.CollectionName()) { options.MetadataIndexes = IndexesAll } if err := InitializeIndexes(ctx, n1qlStore, options); err != nil { diff --git a/docs/api/components/schemas.yaml b/docs/api/components/schemas.yaml index 644eecc46b..034430dce4 100644 --- a/docs/api/components/schemas.yaml +++ b/docs/api/components/schemas.yaml @@ -1273,12 +1273,11 @@ Database: db_state_changed: $ref: '#/Event-config' feed_type: - description: The type of feed to use to communicate with Couchbase Server. + description: The type of feed to use to communicate with Couchbase Server. This will use DCP regardless of specification. type: string default: DCP enum: - DCP - - TAP deprecated: true allow_empty_password: description: This controls whether users that are created can have an empty password or not. diff --git a/go.mod b/go.mod index 89a0bb7316..46921ec62f 100644 --- a/go.mod +++ b/go.mod @@ -13,10 +13,10 @@ require ( github.com/couchbase/gocbcore/v10 v10.3.1 github.com/couchbase/gomemcached v0.2.1 github.com/couchbase/goutils v0.1.2 - github.com/couchbase/sg-bucket v0.0.0-20240326230241-0b197e169b27 + github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8 github.com/couchbaselabs/go-fleecedelta v0.0.0-20220909152808-6d09efa7a338 github.com/couchbaselabs/gocbconnstr v1.0.5 - github.com/couchbaselabs/rosmar v0.0.0-20240326232309-04dfb3337b60 + github.com/couchbaselabs/rosmar v0.0.0-20240404180245-795e6df684f3 github.com/elastic/gosigar v0.14.3 github.com/felixge/fgprof v0.9.3 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index c2987f2e64..c59f8ba9ad 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ github.com/couchbase/goutils v0.1.2 h1:gWr8B6XNWPIhfalHNog3qQKfGiYyh4K4VhO3P2o9B github.com/couchbase/goutils v0.1.2/go.mod h1:h89Ek/tiOxxqjz30nPPlwZdQbdB8BwgnuBxeoUe/ViE= github.com/couchbase/sg-bucket v0.0.0-20240326230241-0b197e169b27 h1:FGNvJsAQk6JZzuVXvoLXcoSQzOnQxWkywzYJFQqzXEg= github.com/couchbase/sg-bucket v0.0.0-20240326230241-0b197e169b27/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE= +github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8 h1:kfWMYvUgSg2yIZJx+t63Ucl+zorvFqlYayXPkiXFtSE= +github.com/couchbase/sg-bucket v0.0.0-20240402154301-12625d8851a8/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE= github.com/couchbase/tools-common/cloud v1.0.0 h1:SQZIccXoedbrThehc/r9BJbpi/JhwJ8X00PDjZ2gEBE= github.com/couchbase/tools-common/cloud v1.0.0/go.mod h1:6KVlRpbcnDWrvickUJ+xpqCWx1vgYYlEli/zL4xmZAg= github.com/couchbase/tools-common/fs v1.0.0 h1:HFA4xCF/r3BtZShFJUxzVvGuXtDkqGnaPzYJP3Kp1mw= @@ -74,6 +76,8 @@ github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20230515165046-68b522a21131 h1:2E github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20230515165046-68b522a21131/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E= github.com/couchbaselabs/rosmar v0.0.0-20240326232309-04dfb3337b60 h1:w9E8CEvQia8BPA+2Ai6dJh64wYTmxNUrXNPkKhtPpGw= github.com/couchbaselabs/rosmar v0.0.0-20240326232309-04dfb3337b60/go.mod h1:MnlZ8BXE9Z7rUQEyb069P/6E9+YVkUxcqW5cmN23h0I= +github.com/couchbaselabs/rosmar v0.0.0-20240404180245-795e6df684f3 h1:AUvojYsPc2WgiO9xRalRQLyXzooRRWemdEkiGl+PZno= +github.com/couchbaselabs/rosmar v0.0.0-20240404180245-795e6df684f3/go.mod h1:SM0w4YHwXFMIyfqUbkpXZNWwAQKLwsUH91fsKUooMqw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/rest/adminapitest/admin_api_test.go b/rest/adminapitest/admin_api_test.go index 48b0a3a524..601e8a3012 100644 --- a/rest/adminapitest/admin_api_test.go +++ b/rest/adminapitest/admin_api_test.go @@ -984,15 +984,12 @@ func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) { dataStore1, err := rt.TestBucket.GetNamedDataStore(0) require.NoError(t, err) - dataStore1Name, ok := base.AsDataStoreName(dataStore1) - require.True(t, ok) - // Run resync for single collection // Request body {"scopes": "scopeName": ["collection1Name", "collection2Name"]}} body := fmt.Sprintf(`{ "scopes" :{ "%s": ["%s"] } - }`, dataStore1Name.ScopeName(), dataStore1Name.CollectionName()) + }`, dataStore1.ScopeName(), dataStore1.CollectionName()) resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", body) rest.RequireStatus(t, resp, http.StatusOK) resyncManagerStatus := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) diff --git a/rest/adminapitest/collections_admin_api_test.go b/rest/adminapitest/collections_admin_api_test.go index 2c08d48811..43d30ce4bc 100644 --- a/rest/adminapitest/collections_admin_api_test.go +++ b/rest/adminapitest/collections_admin_api_test.go @@ -46,14 +46,10 @@ func TestCollectionsSyncImportFunctions(t *testing.T) { dataStore1, err := tb.GetNamedDataStore(0) require.NoError(t, err) - dataStore1Name, ok := base.AsDataStoreName(dataStore1) - require.True(t, ok) dataStore2, err := tb.GetNamedDataStore(1) require.NoError(t, err) - dataStore2Name, ok := base.AsDataStoreName(dataStore2) - require.True(t, ok) - keyspace1 := fmt.Sprintf("%s.%s.%s", "db", dataStore1Name.ScopeName(), dataStore1Name.CollectionName()) - keyspace2 := fmt.Sprintf("%s.%s.%s", "db", dataStore2Name.ScopeName(), dataStore2Name.CollectionName()) + keyspace1 := fmt.Sprintf("%s.%s.%s", "db", dataStore1.ScopeName(), dataStore1.CollectionName()) + keyspace2 := fmt.Sprintf("%s.%s.%s", "db", dataStore2.ScopeName(), dataStore2.CollectionName()) bucketConfig := fmt.Sprintf( `{"bucket": "%s", @@ -74,9 +70,9 @@ func TestCollectionsSyncImportFunctions(t *testing.T) { "num_index_replicas": 0, "enable_shared_bucket_access": true, "use_views": false}`, - tb.GetName(), dataStore1Name.ScopeName(), dataStore1Name.CollectionName(), + tb.GetName(), dataStore1.ScopeName(), dataStore1.CollectionName(), importFilter1, - syncFunction1, dataStore2Name.CollectionName(), + syncFunction1, dataStore2.CollectionName(), importFilter2, syncFunction2, ) diff --git a/rest/api_collections_test.go b/rest/api_collections_test.go index 2666207527..fc7a3e34b8 100644 --- a/rest/api_collections_test.go +++ b/rest/api_collections_test.go @@ -45,8 +45,6 @@ func TestCollectionsPutDocInKeyspace(t *testing.T) { defer rt.Close() ds := rt.GetSingleDataStore() - dataStoreName, ok := base.AsDataStoreName(ds) - require.True(t, ok) tests := []struct { name string keyspace string @@ -60,17 +58,17 @@ func TestCollectionsPutDocInKeyspace(t *testing.T) { }, { name: "collection only", - keyspace: strings.Join([]string{dbName, dataStoreName.CollectionName()}, base.ScopeCollectionSeparator), + keyspace: strings.Join([]string{dbName, ds.CollectionName()}, base.ScopeCollectionSeparator), expectedStatus: http.StatusCreated, }, { name: "invalid collection", - keyspace: strings.Join([]string{dbName, dataStoreName.ScopeName(), "buzz"}, base.ScopeCollectionSeparator), + keyspace: strings.Join([]string{dbName, ds.ScopeName(), "buzz"}, base.ScopeCollectionSeparator), expectedStatus: http.StatusNotFound, }, { name: "invalid scope", - keyspace: strings.Join([]string{dbName, "buzz", dataStoreName.CollectionName()}, base.ScopeCollectionSeparator), + keyspace: strings.Join([]string{dbName, "buzz", ds.CollectionName()}, base.ScopeCollectionSeparator), expectedStatus: http.StatusNotFound, }, { diff --git a/rest/config.go b/rest/config.go index 33b8b998be..78ff575449 100644 --- a/rest/config.go +++ b/rest/config.go @@ -71,6 +71,8 @@ const ( DefaultUseTLSServer = true DefaultMinConfigFetchInterval = time.Second + + tapFeedType = "tap" ) // serverType indicates which type of HTTP server sync gateway is running @@ -152,7 +154,7 @@ type DbConfig struct { ImportFilter *string `json:"import_filter,omitempty"` // The import filter applied to import operations in the _default scope and collection ImportBackupOldRev *bool `json:"import_backup_old_rev,omitempty"` // Whether import should attempt to create a temporary backup of the previous revision body, when available. EventHandlers *EventHandlerConfig `json:"event_handlers,omitempty"` // Event handlers (webhook) - FeedType string `json:"feed_type,omitempty"` // Feed type - "DCP" or "TAP"; defaults based on Couchbase server version + FeedType string `json:"feed_type,omitempty"` // Feed type - "DCP" only, "TAP" is ignored AllowEmptyPassword *bool `json:"allow_empty_password,omitempty"` // Allow empty passwords? Defaults to false CacheConfig *CacheConfig `json:"cache,omitempty"` // Cache settings DeprecatedRevCacheSize *uint32 `json:"rev_cache_size,omitempty"` // Maximum number of revisions to store in the revision cache (deprecated, CBG-356) @@ -781,7 +783,7 @@ func (dbConfig *DbConfig) validateVersion(ctx context.Context, isEnterpriseEditi if err != nil { multiError = multiError.Append(err) } - if dbConfig.FeedType == base.TapFeedType && autoImportEnabled == true { + if dbConfig.FeedType == tapFeedType && autoImportEnabled == true { multiError = multiError.Append(fmt.Errorf("Invalid configuration for Sync Gw. TAP feed type can not be used with auto-import")) } diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index 8882b9f898..13592893e9 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -7931,9 +7931,7 @@ func TestGroupIDReplications(t *testing.T) { if !rt.GetDatabase().OnlyDefaultCollection() { dataStore, err = activeBucket.GetNamedDataStore(0) require.NoError(t, err) - dsName, ok := base.AsDataStoreName(dataStore) - require.True(t, ok) - keyspace = fmt.Sprintf("/db.%s.%s/", dsName.ScopeName(), dsName.CollectionName()) + keyspace = fmt.Sprintf("/db.%s.%s/", dataStore.ScopeName(), dataStore.CollectionName()) } for groupNum, group := range groupIDs { channel := "chan" + group diff --git a/rest/server_context.go b/rest/server_context.go index e604ce3150..915891a1e2 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -718,11 +718,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config MetadataIndexes: indexInfo.indexSet, UseXattrs: config.UseXattrs(), } - dsName, ok := base.AsDataStoreName(ds) - if !ok { - return nil, fmt.Errorf("Could not get datastore name from %s", base.MD(ds.GetName())) - } - ctx := base.KeyspaceLogCtx(ctx, bucket.GetName(), dsName.ScopeName(), dsName.CollectionName()) + ctx := base.KeyspaceLogCtx(ctx, bucket.GetName(), ds.ScopeName(), ds.CollectionName()) indexErr := db.InitializeIndexes(ctx, n1qlStore, options) if indexErr != nil { return nil, indexErr @@ -1334,15 +1330,11 @@ func validateMetadataStore(ctx context.Context, metadataStore base.DataStore) er if err == nil { return nil } - metadataStoreName, ok := base.AsDataStoreName(metadataStore) - if ok { - keyspace := strings.Join([]string{metadataStore.GetName(), metadataStoreName.ScopeName(), metadataStoreName.CollectionName()}, base.ScopeCollectionSeparator) - if base.IsDefaultCollection(metadataStoreName.ScopeName(), metadataStoreName.CollectionName()) { - base.WarnfCtx(ctx, "_default._default has been deleted from the server for bucket %s, to recover recreate the bucket", metadataStore.GetName()) - } - return fmt.Errorf("metadata store %s does not exist on couchbase server: %w", base.MD(keyspace), err) + keyspace := strings.Join([]string{metadataStore.GetName(), metadataStore.ScopeName(), metadataStore.CollectionName()}, base.ScopeCollectionSeparator) + if base.IsDefaultCollection(metadataStore.ScopeName(), metadataStore.CollectionName()) { + base.WarnfCtx(ctx, "_default._default has been deleted from the server for bucket %s, to recover recreate the bucket", metadataStore.GetName()) } - return fmt.Errorf("metadata store %s does not exist on couchbase server: %w", base.MD(metadataStore.GetName()), err) + return fmt.Errorf("metadata store %s does not exist on couchbase server: %w", base.MD(keyspace), err) } // validateEventConfigOptions returns errors for all invalid event type options. diff --git a/rest/utilities_testing_test.go b/rest/utilities_testing_test.go index c7b426aaf8..f14816e65c 100644 --- a/rest/utilities_testing_test.go +++ b/rest/utilities_testing_test.go @@ -207,12 +207,8 @@ func TestRestTesterTemplateMultipleDatabases(t *testing.T) { dbOne := "dbone" bucket1Datastore1, err := rt.TestBucket.GetNamedDataStore(0) require.NoError(t, err) - bucket1Datastore1Name, ok := base.AsDataStoreName(bucket1Datastore1) - require.True(t, ok) bucket1Datastore2, err := rt.TestBucket.GetNamedDataStore(1) require.NoError(t, err) - bucket1Datastore2Name, ok := base.AsDataStoreName(bucket1Datastore2) - require.True(t, ok) resp := rt.CreateDatabase(dbOne, dbConfig) RequireStatus(t, resp, http.StatusCreated) testCases = []struct { @@ -236,11 +232,11 @@ func TestRestTesterTemplateMultipleDatabases(t *testing.T) { }, { input: "/{{.keyspace1}}/", - output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore1Name.ScopeName(), bucket1Datastore1Name.CollectionName()), + output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore1.ScopeName(), bucket1Datastore1.CollectionName()), }, { input: "/{{.keyspace2}}/", - output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore2Name.ScopeName(), bucket1Datastore2Name.CollectionName()), + output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore2.ScopeName(), bucket1Datastore2.CollectionName()), }, } for _, test := range testCases { @@ -266,12 +262,8 @@ func TestRestTesterTemplateMultipleDatabases(t *testing.T) { dbTwo := "dbtwo" bucket2Datastore1, err := rt.TestBucket.GetNamedDataStore(0) require.NoError(t, err) - bucket2Datastore1Name, ok := base.AsDataStoreName(bucket2Datastore1) - require.True(t, ok) bucket2Datastore2, err := rt.TestBucket.GetNamedDataStore(1) require.NoError(t, err) - bucket2Datastore2Name, ok := base.AsDataStoreName(bucket2Datastore2) - require.True(t, ok) resp = rt.CreateDatabase(dbTwo, dbConfig) RequireStatus(t, resp, http.StatusCreated) testCases = []struct { @@ -305,20 +297,20 @@ func TestRestTesterTemplateMultipleDatabases(t *testing.T) { }, { input: "/{{.db1keyspace1}}/", - output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore1Name.ScopeName(), bucket2Datastore1Name.CollectionName()), + output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore1.ScopeName(), bucket2Datastore1.CollectionName()), }, { input: "/{{.db1keyspace2}}/", - output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore2Name.ScopeName(), bucket2Datastore2Name.CollectionName()), + output: fmt.Sprintf("/%s.%s.%s/", dbOne, bucket1Datastore2.ScopeName(), bucket2Datastore2.CollectionName()), }, { input: "/{{.db2keyspace1}}/", - output: fmt.Sprintf("/%s.%s.%s/", dbTwo, bucket2Datastore1Name.ScopeName(), bucket2Datastore1Name.CollectionName()), + output: fmt.Sprintf("/%s.%s.%s/", dbTwo, bucket2Datastore1.ScopeName(), bucket2Datastore1.CollectionName()), }, { input: "/{{.db2keyspace2}}/", - output: fmt.Sprintf("/%s.%s.%s/", dbTwo, bucket2Datastore2Name.ScopeName(), bucket2Datastore2Name.CollectionName()), + output: fmt.Sprintf("/%s.%s.%s/", dbTwo, bucket2Datastore2.ScopeName(), bucket2Datastore2.CollectionName()), }, } for _, test := range testCases {