Skip to content

Commit

Permalink
CBG-3271 remove persistent rosmar buckets
Browse files Browse the repository at this point in the history
- rosmar now supports persisting in memory buckets, so keep the buckets
  in memory until the buckets are closed. This eliminates NoCloseClone
  in many cases. Another PR might be able to remove even more of these.
- this is prep for being able to run a bootstrap connection.
  • Loading branch information
torcolvin committed Nov 8, 2023
1 parent 1bef056 commit 90bdfab
Show file tree
Hide file tree
Showing 17 changed files with 54 additions and 131 deletions.
4 changes: 2 additions & 2 deletions base/leaky_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ func (b *LeakyBucket) SetIgnoreClose(value bool) {
b.config.IgnoreClose = value
}

func (b *LeakyBucket) CloseAndDelete() error {
func (b *LeakyBucket) CloseAndDelete(ctx context.Context) error {
if bucket, ok := b.bucket.(sgbucket.DeleteableStore); ok {
return bucket.CloseAndDelete()
return bucket.CloseAndDelete(ctx)
}
return nil
}
Expand Down
28 changes: 6 additions & 22 deletions base/main_test_bucket_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,29 +198,17 @@ func (tbp *TestBucketPool) GetWalrusTestBucket(t testing.TB, url string) (b Buck
require.NoError(t, err)

var walrusBucket *rosmar.Bucket
var typeName string
const typeName = "rosmar"
bucketName := tbpBucketNamePrefix + "rosmar_" + id
if url == "walrus:" || url == rosmar.InMemoryURL {
walrusBucket, err = rosmar.OpenBucket(url, rosmar.CreateOrOpen)
if err == nil {
err := walrusBucket.SetName(bucketName)
if err != nil {
tbp.Fatalf(testCtx, "Could not set name %s for rosmar bucket: %s", bucketName, err)
}
}
walrusBucket, err = rosmar.OpenBucket(url, bucketName, rosmar.CreateOrOpen)
} else {
walrusBucket, err = rosmar.OpenBucketIn(url, bucketName, rosmar.CreateOrOpen)
}
typeName = "rosmar"
if err != nil {
tbp.Fatalf(testCtx, "couldn't get %s bucket from <%s>: %v", typeName, url, err)
}

err = walrusBucket.SetName(bucketName)
if err != nil {
tbp.Fatalf(testCtx, "Could not set name %s for rosmar bucket: %s", bucketName, err)
}

// Wrap Walrus buckets with a leaky bucket to support vbucket IDs on feed.
b = &LeakyBucket{bucket: walrusBucket, config: &LeakyBucketConfig{TapFeedVbuckets: true}}

Expand Down Expand Up @@ -258,14 +246,10 @@ func (tbp *TestBucketPool) GetWalrusTestBucket(t testing.TB, url string) (b Buck
atomic.AddInt32(&tbp.stats.NumBucketsClosed, 1)
atomic.AddInt64(&tbp.stats.TotalInuseBucketNano, time.Since(openedStart).Nanoseconds())
tbp.markBucketClosed(t, b)
if url == kTestWalrusURL {
b.Close(ctx)
} else {
// Persisted buckets should call close and delete
closeErr := walrusBucket.CloseAndDelete()
if closeErr != nil {
tbp.Logf(ctx, "Unexpected error closing persistent %s bucket: %v", typeName, closeErr)
}
// Persisted buckets should call close and delete
closeErr := walrusBucket.CloseAndDelete(ctx)
if closeErr != nil {
tbp.Logf(ctx, "Unexpected error closing persistent %s bucket: %v", typeName, closeErr)
}

}
Expand Down
29 changes: 0 additions & 29 deletions base/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,6 @@ func GetTestBucket(t testing.TB) *TestBucket {
return getTestBucket(t, false)
}

// GetTestBucket returns a test bucket from a pool. If running with walrus buckets, will persist bucket data
// across bucket close.
func GetPersistentTestBucket(t testing.TB) *TestBucket {
return getTestBucket(t, true)
}

// getTestBucket returns a bucket from the bucket pool. Persistent flag determines behaviour for walrus
// buckets only - Couchbase bucket behaviour is defined by the bucket pool readier/init.
func getTestBucket(t testing.TB, persistent bool) *TestBucket {
Expand Down Expand Up @@ -210,29 +204,6 @@ func rosmarUriFromPath(path string) string {
return uri + strings.ReplaceAll(path, `\`, `/`)
}

// Gets a Walrus bucket which will be persisted to a temporary directory
// Returns both the test bucket which is persisted and a function which can be used to remove the created temporary
// directory once the test has finished with it.
func GetPersistentWalrusBucket(t testing.TB) (*TestBucket, func()) {
tempDir, err := os.MkdirTemp("", "walrustemp")
require.NoError(t, err)

bucket, spec, closeFn := GTestBucketPool.GetWalrusTestBucket(t, rosmarUriFromPath(tempDir))

// Return this separate to closeFn as we want to avoid this being removed on database close (/_offline handling)
removeFileFunc := func() {
err := os.RemoveAll(tempDir)
require.NoError(t, err)
}

return &TestBucket{
Bucket: bucket,
BucketSpec: spec,
closeFn: closeFn,
t: t,
}, removeFileFunc
}

// Should Sync Gateway use XATTRS functionality when running unit tests?
func TestUseXattrs() bool {
useXattrs, isSet := os.LookupEnv(TestEnvSyncGatewayUseXattrs)
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ require (
gopkg.in/square/go-jose.v2 v2.6.0
)

replace github.com/couchbaselabs/rosmar => ../rosmar

replace github.com/couchbase/sg-bucket => ../sg-bucket

require (
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/aws/aws-sdk-go v1.44.299 // indirect
Expand Down
23 changes: 10 additions & 13 deletions rest/adminapitest/admin_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,6 @@ func TestCorruptDbConfigHandling(t *testing.T) {
base.SetUpTestLogging(t, base.LevelInfo, base.KeyConfig)

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: base.GetPersistentTestBucket(t),
PersistentConfig: true,
MutateStartupConfig: func(config *rest.StartupConfig) {
// configure the interval time to pick up new configs from the bucket to every 1 seconds
Expand Down Expand Up @@ -1557,7 +1556,6 @@ func TestBadConfigInsertionToBucket(t *testing.T) {
base.TestsRequireBootstrapConnection(t)

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: base.GetPersistentTestBucket(t),
PersistentConfig: true,
MutateStartupConfig: func(config *rest.StartupConfig) {
// configure the interval time to pick up new configs from the bucket to every 1 seconds
Expand Down Expand Up @@ -1607,12 +1605,8 @@ func TestBadConfigInsertionToBucket(t *testing.T) {
func TestMismatchedBucketNameOnDbConfigUpdate(t *testing.T) {
base.TestsRequireBootstrapConnection(t)
base.RequireNumTestBuckets(t, 2)
ctx := base.TestCtx(t)
tb1 := base.GetPersistentTestBucket(t)
defer tb1.Close(ctx)

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: base.GetPersistentTestBucket(t),
PersistentConfig: true,
MutateStartupConfig: func(config *rest.StartupConfig) {
// configure the interval time to pick up new configs from the bucket to every 1 seconds
Expand All @@ -1628,6 +1622,9 @@ func TestMismatchedBucketNameOnDbConfigUpdate(t *testing.T) {

// wait for db to come online
require.NoError(t, rt.WaitForDBOnline())
ctx := base.TestCtx(t)
tb1 := base.GetTestBucket(t)
defer tb1.Close(ctx)
badName := tb1.GetName()
dbConfig.Bucket = &badName

Expand All @@ -1643,11 +1640,11 @@ func TestMultipleBucketWithBadDbConfigScenario1(t *testing.T) {
base.TestsRequireBootstrapConnection(t)
base.RequireNumTestBuckets(t, 3)
ctx := base.TestCtx(t)
tb1 := base.GetPersistentTestBucket(t)
tb1 := base.GetTestBucket(t)
defer tb1.Close(ctx)
tb2 := base.GetPersistentTestBucket(t)
tb2 := base.GetTestBucket(t)
defer tb2.Close(ctx)
tb3 := base.GetPersistentTestBucket(t)
tb3 := base.GetTestBucket(t)
defer tb3.Close(ctx)

const groupID = "60ce5544-c368-4b08-b0ed-4ca3b37973f9"
Expand Down Expand Up @@ -1722,9 +1719,9 @@ func TestMultipleBucketWithBadDbConfigScenario2(t *testing.T) {

base.RequireNumTestBuckets(t, 3)
ctx := base.TestCtx(t)
tb1 := base.GetPersistentTestBucket(t)
tb1 := base.GetTestBucket(t)
defer tb1.Close(ctx)
tb2 := base.GetPersistentTestBucket(t)
tb2 := base.GetTestBucket(t)
defer tb2.Close(ctx)

rt1 := rest.NewRestTester(t, &rest.RestTesterConfig{
Expand Down Expand Up @@ -1792,9 +1789,9 @@ func TestMultipleBucketWithBadDbConfigScenario3(t *testing.T) {
base.TestsRequireBootstrapConnection(t)

ctx := base.TestCtx(t)
tb1 := base.GetPersistentTestBucket(t)
tb1 := base.GetTestBucket(t)
defer tb1.Close(ctx)
tb2 := base.GetPersistentTestBucket(t)
tb2 := base.GetTestBucket(t)
defer tb2.Close(ctx)

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
Expand Down
1 change: 0 additions & 1 deletion rest/adminapitest/collections_admin_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ func TestRequireResync(t *testing.T) {
base.RequireNumTestDataStores(t, 2)
base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll)
rtConfig := &rest.RestTesterConfig{
CustomTestBucket: base.GetPersistentTestBucket(t),
PersistentConfig: true,
}

Expand Down
2 changes: 1 addition & 1 deletion rest/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (h *handler) handleFlush() error {
name := h.db.Name
config := h.server.GetDatabaseConfig(name)
h.server.RemoveDatabase(h.ctx(), name)
err := bucket.CloseAndDelete()
err := bucket.CloseAndDelete(h.ctx())
_, err2 := h.server.AddDatabaseFromConfig(h.ctx(), config.DatabaseConfig)
if err == nil {
err = err2
Expand Down
28 changes: 14 additions & 14 deletions rest/api_collections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func TestMultiCollectionChannelAccess(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)

ctx := base.TestCtx(t)
tb := base.GetPersistentTestBucket(t)
tb := base.GetTestBucket(t)
defer tb.Close(ctx)

scopesConfig := GetCollectionsConfig(t, tb, 2)
Expand All @@ -281,9 +281,8 @@ func TestMultiCollectionChannelAccess(t *testing.T) {
scopesConfig[scope].Collections[collection1] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfig[scope].Collections[collection2] = &CollectionConfig{SyncFn: &c1SyncFunction}

fmt.Println(scopesConfig)
rtConfig := &RestTesterConfig{
CustomTestBucket: tb.NoCloseClone(),
CustomTestBucket: tb,
DatabaseConfig: &DatabaseConfig{DbConfig: DbConfig{
Scopes: scopesConfig,
NumIndexReplicas: base.UintPtr(0),
Expand Down Expand Up @@ -337,16 +336,21 @@ func TestMultiCollectionChannelAccess(t *testing.T) {
RequireStatus(t, resp, http.StatusOK)

// Add a new collection and update the db config
scopesConfig = GetCollectionsConfig(t, tb, 3)
dataStoreNames = GetDataStoreNamesFromScopesConfig(scopesConfig)
scopesConfig3Collections := GetCollectionsConfig(t, tb, 3)
dataStoreNames = GetDataStoreNamesFromScopesConfig(scopesConfig3Collections)

collection3 := dataStoreNames[2].CollectionName()
scopesConfig[scope].Collections[collection1] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfig[scope].Collections[collection2] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfig[scope].Collections[collection3] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfigString, err := json.Marshal(scopesConfig)
scopesConfig3Collections[scope].Collections[collection1] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfig3Collections[scope].Collections[collection2] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfig3Collections[scope].Collections[collection3] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfigString, err := json.Marshal(scopesConfig3Collections)
require.NoError(t, err)

scopesConfig2Collections := GetCollectionsConfig(t, tb, 2)

scopesConfig2Collections[scope].Collections[collection1] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfig2Collections[scope].Collections[collection2] = &CollectionConfig{SyncFn: &c1SyncFunction}

resp = rt.SendAdminRequest("PUT", "/db/_config", fmt.Sprintf(
`{"bucket": "%s", "num_index_replicas": 0, "enable_shared_bucket_access": %t, "scopes":%s}`,
tb.GetName(), base.TestUseXattrs(), string(scopesConfigString)))
Expand Down Expand Up @@ -378,11 +382,7 @@ func TestMultiCollectionChannelAccess(t *testing.T) {
RequireStatus(t, resp, http.StatusOK)

// Remove collection and update the db config
scopesConfig = GetCollectionsConfig(t, tb, 2)

scopesConfig[scope].Collections[collection1] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfig[scope].Collections[collection2] = &CollectionConfig{SyncFn: &c1SyncFunction}
scopesConfigString, err = json.Marshal(scopesConfig)
scopesConfigString, err = json.Marshal(scopesConfig2Collections)
require.NoError(t, err)

resp = rt.SendAdminRequest("PUT", "/db/_config", fmt.Sprintf(
Expand Down
6 changes: 3 additions & 3 deletions rest/importtest/collections_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMultiCollectionImportFilter(t *testing.T) {
base.RequireNumTestDataStores(t, 3)

ctx := base.TestCtx(t)
testBucket := base.GetPersistentTestBucket(t)
testBucket := base.GetTestBucket(t)
defer testBucket.Close(ctx)

scopesConfig := rest.GetCollectionsConfig(t, testBucket, 2)
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestMultiCollectionImportDynamicAddCollection(t *testing.T) {
base.RequireNumTestDataStores(t, 2)

ctx := base.TestCtx(t)
testBucket := base.GetPersistentTestBucket(t)
testBucket := base.GetTestBucket(t)
defer testBucket.Close(ctx)

rtConfig := &rest.RestTesterConfig{
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestMultiCollectionImportRemoveCollection(t *testing.T) {
base.RequireNumTestDataStores(t, numCollections)

ctx := base.TestCtx(t)
testBucket := base.GetPersistentTestBucket(t)
testBucket := base.GetTestBucket(t)
defer testBucket.Close(ctx)

rtConfig := &rest.RestTesterConfig{
Expand Down
12 changes: 3 additions & 9 deletions rest/importtest/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2039,7 +2039,7 @@ func assertDocProperty(t *testing.T, getDocResponse *rest.TestResponse, property
err := base.JSONUnmarshal(getDocResponse.Body.Bytes(), &responseBody)
assert.NoError(t, err, "Error unmarshalling document response")
value, ok := responseBody[propertyName]
assert.True(t, ok, fmt.Sprintf("Expected property %s not found in response %s", propertyName, getDocResponse.Body.Bytes()))
require.True(t, ok, fmt.Sprintf("Expected property %s not found in response %s", propertyName, getDocResponse.Body.Bytes()))
assert.Equal(t, expectedPropertyValue, value)
}

Expand Down Expand Up @@ -2734,12 +2734,7 @@ func TestImportRollback(t *testing.T) {

base.SetUpTestLogging(t, base.LevelDebug, base.KeyImport, base.KeyDCP)

ctx := base.TestCtx(t)
bucket := base.GetPersistentTestBucket(t)
defer bucket.Close(ctx)

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: bucket.NoCloseClone(),
PersistentConfig: false,
})

Expand Down Expand Up @@ -2768,9 +2763,9 @@ func TestImportRollback(t *testing.T) {
}()

// fetch the checkpoint for the document's vbucket, modify the checkpoint values to a higher sequence
vbNo, err := base.GetVbucketForKey(bucket, key)
vbNo, err := base.GetVbucketForKey(rt.TestBucket, key)
require.NoError(t, err)
metaStore := bucket.GetMetadataStore()
metaStore := rt.TestBucket.GetMetadataStore()
checkpointKey := fmt.Sprintf("%s%d", checkpointPrefix, vbNo)
var checkpointData dcpMetaData
checkpointBytes, _, err := metaStore.GetRaw(checkpointKey)
Expand All @@ -2789,7 +2784,6 @@ func TestImportRollback(t *testing.T) {

// Reopen the db, expect DCP rollback
rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: bucket.NoCloseClone(),
PersistentConfig: false,
})
defer rt2.Close()
Expand Down
4 changes: 0 additions & 4 deletions rest/server_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,11 +840,7 @@ func TestOfflineDatabaseStartup(t *testing.T) {

base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)

ctx := base.TestCtx(t)
bucket := base.GetPersistentTestBucket(t)
defer bucket.Close(ctx)
rt := NewRestTester(t, &RestTesterConfig{
CustomTestBucket: bucket.NoCloseClone(),
DatabaseConfig: &DatabaseConfig{
DbConfig: DbConfig{
StartOffline: base.BoolPtr(true),
Expand Down
13 changes: 1 addition & 12 deletions rest/sync_fn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,20 +898,9 @@ func TestResyncRegenerateSequences(t *testing.T) {

base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll)

var testBucket *base.TestBucket

if base.UnitTestUrlIsWalrus() {
var closeFn func()
testBucket, closeFn = base.GetPersistentWalrusBucket(t)
defer closeFn()
} else {
testBucket = base.GetTestBucket(t)
}

rt := NewRestTester(t,
&RestTesterConfig{
SyncFn: syncFn,
CustomTestBucket: testBucket,
SyncFn: syncFn,
},
)
defer rt.Close()
Expand Down
Loading

0 comments on commit 90bdfab

Please sign in to comment.