From 50438755357f8dffc3ca2a96572050068a2421d1 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com> Date: Fri, 8 Nov 2024 15:42:16 +0000 Subject: [PATCH] CBG-4213: add attachment migration api (#7183) * CBG-4213: add attachment migration api * tidy up * fix api docs lint error * Update db/background_mgr_attachment_migration.go Co-authored-by: Ben Brooks * Update db/background_mgr_attachment_migration.go Co-authored-by: Ben Brooks --------- Co-authored-by: Ben Brooks --- db/background_mgr_attachment_migration.go | 7 +- docs/api/admin.yaml | 2 + docs/api/components/schemas.yaml | 35 +++ .../paths/admin/db-_attachment_migration.yaml | 73 +++++ rest/api.go | 46 ++++ .../attachment_migration_api_test.go | 255 ++++++++++++++++++ .../attachment_migration_test.go | 51 ++-- rest/attachmentmigrationtest/main_test.go | 25 ++ rest/routing.go | 4 + rest/utilities_testing_attachment.go | 15 ++ 10 files changed, 487 insertions(+), 26 deletions(-) create mode 100644 docs/api/paths/admin/db-_attachment_migration.yaml create mode 100644 rest/attachmentmigrationtest/attachment_migration_api_test.go rename rest/{ => attachmentmigrationtest}/attachment_migration_test.go (91%) create mode 100644 rest/attachmentmigrationtest/main_test.go diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go index 9e0fe05c8e..66db30ecb1 100644 --- a/db/background_mgr_attachment_migration.go +++ b/db/background_mgr_attachment_migration.go @@ -66,9 +66,14 @@ func (a *AttachmentMigrationManager) Init(ctx context.Context, options map[strin var statusDoc AttachmentMigrationManagerStatusDoc err := base.JSONUnmarshal(clusterStatus, &statusDoc) + reset, _ := options["reset"].(bool) + if reset { + base.InfofCtx(ctx, base.KeyAll, "Attachment Migration: Resetting migration process. Will not resume any partially completed process") + } + // If the previous run completed, or there was an error during unmarshalling the status we will start the // process from scratch with a new migration ID. Otherwise, we should resume with the migration ID, stats specified in the doc. - if statusDoc.State == BackgroundProcessStateCompleted || err != nil { + if statusDoc.State == BackgroundProcessStateCompleted || err != nil || reset { return newRunInit() } a.MigrationID = statusDoc.MigrationID diff --git a/docs/api/admin.yaml b/docs/api/admin.yaml index c70e006d5b..84d2266baf 100644 --- a/docs/api/admin.yaml +++ b/docs/api/admin.yaml @@ -124,6 +124,8 @@ paths: $ref: ./paths/admin/_all_dbs.yaml '/{db}/_compact': $ref: './paths/admin/db-_compact.yaml' + '/{db}/_attachment_migration': + $ref: './paths/admin/db-_attachment_migration.yaml' '/{db}/': $ref: './paths/admin/db-.yaml' '/{keyspace}/': diff --git a/docs/api/components/schemas.yaml b/docs/api/components/schemas.yaml index 0c06f0bff4..20fc301a4e 100644 --- a/docs/api/components/schemas.yaml +++ b/docs/api/components/schemas.yaml @@ -1988,6 +1988,41 @@ Resync-status: - docs_changed - docs_processed title: Resync-status +Attachment-Migration-status: + description: The status of a attachment migration operation + type: object + properties: + status: + description: The status of the current attachment migration operation. + type: string + enum: + - running + - completed + - stopping + - stopped + - error + start_time: + description: The ISO-8601 date and time the attachment migration operation was started. + type: string + last_error: + description: The last error that occurred in the attachment migration operation (if any). + type: string + migration_id: + description: The UUID given to the attachment migration operation. + type: string + docs_changed: + description: The amount of documents that have had attachment metadata migrated as a result of attachment migration operation. + type: integer + docs_processed: + description: The amount of docs that have been processed through the attachment migration operation. + type: integer + required: + - status + - start_time + - last_error + - docs_changed + - docs_processed + title: Attachment-Migration-status Compact-status: description: The status returned from a compaction. type: object diff --git a/docs/api/paths/admin/db-_attachment_migration.yaml b/docs/api/paths/admin/db-_attachment_migration.yaml new file mode 100644 index 0000000000..8d9ab0300c --- /dev/null +++ b/docs/api/paths/admin/db-_attachment_migration.yaml @@ -0,0 +1,73 @@ +# Copyright 2024-Present Couchbase, Inc. +# +# Use of this software is governed by the Business Source License included +# in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +# in that file, in accordance with the Business Source License, use of this +# software will be governed by the Apache License, Version 2.0, included in +# the file licenses/APL2.txt. +parameters: + - $ref: ../../components/parameters.yaml#/db +post: + summary: Manage a attachment migration operation + description: |- + This allows a new attachment migration operation to be done on the database, or to stop an existing running attachment migration operation. + + Attachment Migration is a single node process and can only one node can be running it at one point. + + Required Sync Gateway RBAC roles: + + * Sync Gateway Architect + parameters: + - name: action + in: query + description: Defines whether the an attachment migration operation is being started or stopped. + schema: + type: string + default: start + enum: + - start + - stop + - name: reset + in: query + description: |- + This forces a fresh attachment migration start instead of trying to resume the previous failed migration operation. + schema: + type: boolean + responses: + '200': + description: Started or stopped compact operation successfully + '400': + $ref: ../../components/responses.yaml#/request-problem + '404': + $ref: ../../components/responses.yaml#/Not-found + '503': + description: Cannot start attachment migration due to another migration operation still running. + content: + application/json: + schema: + $ref: ../../components/schemas.yaml#/HTTP-Error + tags: + - Database Management + operationId: post_db-_attachment_migration +get: + summary: Get the status of the most recent attachment migration operation + description: |- + This will retrieve the current status of the most recent attachment migration operation. + + Required Sync Gateway RBAC roles: + + * Sync Gateway Architect + responses: + '200': + description: Attachment migration status retrieved successfully + content: + application/json: + schema: + $ref: ../../components/schemas.yaml#/Attachment-Migration-status + '400': + $ref: ../../components/responses.yaml#/request-problem + '404': + $ref: ../../components/responses.yaml#/Not-found + tags: + - Database Management + operationId: get_db-_attachment_migration diff --git a/rest/api.go b/rest/api.go index 3bf3b698d0..874da08890 100644 --- a/rest/api.go +++ b/rest/api.go @@ -130,6 +130,52 @@ func (h *handler) handleGetCompact() error { return nil } +func (h *handler) handleAttachmentMigration() error { + action := h.getQuery("action") + if action == "" { + action = string(db.BackgroundProcessActionStart) + } + reset := h.getBoolQuery("reset") + + if action != string(db.BackgroundProcessActionStart) && action != string(db.BackgroundProcessActionStop) { + return base.HTTPErrorf(http.StatusBadRequest, "Unknown parameter for 'action'. Must be start or stop") + } + + if action == string(db.BackgroundProcessActionStart) { + err := h.db.AttachmentMigrationManager.Start(h.ctx(), map[string]interface{}{ + "reset": reset, + }) + if err != nil { + return err + } + status, err := h.db.AttachmentMigrationManager.GetStatus(h.ctx()) + if err != nil { + return err + } + h.writeRawJSON(status) + } else if action == string(db.BackgroundProcessActionStop) { + err := h.db.AttachmentMigrationManager.Stop() + if err != nil { + return err + } + status, err := h.db.AttachmentMigrationManager.GetStatus(h.ctx()) + if err != nil { + return err + } + h.writeRawJSON(status) + } + return nil +} + +func (h *handler) handleGetAttachmentMigration() error { + status, err := h.db.AttachmentMigrationManager.GetStatus(h.ctx()) + if err != nil { + return err + } + h.writeRawJSON(status) + return nil +} + func (h *handler) handleCompact() error { action := h.getQuery("action") if action == "" { diff --git a/rest/attachmentmigrationtest/attachment_migration_api_test.go b/rest/attachmentmigrationtest/attachment_migration_api_test.go new file mode 100644 index 0000000000..ede191b03f --- /dev/null +++ b/rest/attachmentmigrationtest/attachment_migration_api_test.go @@ -0,0 +1,255 @@ +/* +Copyright 2024-Present Couchbase, Inc. + +Use of this software is governed by the Business Source License included in +the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that +file, in accordance with the Business Source License, use of this software will +be governed by the Apache License, Version 2.0, included in the file +licenses/APL2.txt. +*/ + +package attachmentmigrationtest + +import ( + "context" + "fmt" + "net/http" + "testing" + + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" + "github.com/couchbase/sync_gateway/rest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAttachmentMigrationAPI(t *testing.T) { + if base.UnitTestUrlIsWalrus() { + t.Skip("rosmar does not support DCP client, pending CBG-4249") + } + + rt := rest.NewRestTester(t, &rest.RestTesterConfig{ + DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ + AutoImport: false, // turn off import feed to stop the feed migrating attachments + }}, + }) + defer rt.Close() + collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser() + + // Perform GET as automatic migration kicks in upon db start + resp := rt.SendAdminRequest("GET", "/{{.db}}/_attachment_migration", "") + rest.RequireStatus(t, resp, http.StatusOK) + + var migrationStatus db.AttachmentMigrationManagerResponse + err := base.JSONUnmarshal(resp.BodyBytes(), &migrationStatus) + require.NoError(t, err) + require.Equal(t, db.BackgroundProcessStateRunning, migrationStatus.State) + assert.Equal(t, int64(0), migrationStatus.DocsChanged) + assert.Equal(t, int64(0), migrationStatus.DocsProcessed) + assert.Empty(t, migrationStatus.LastErrorMessage) + + // Wait for run on startup to complete + _ = rt.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) + + // add some docs for migration + addDocsForMigrationProcess(t, ctx, collection) + + // kick off migration + resp = rt.SendAdminRequest("POST", "/{{.db}}/_attachment_migration", "") + rest.RequireStatus(t, resp, http.StatusOK) + + // attempt to kick off again, should error + resp = rt.SendAdminRequest("POST", "/{{.db}}/_attachment_migration", "") + rest.RequireStatus(t, resp, http.StatusServiceUnavailable) + + // Wait for run to complete + _ = rt.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) + + // Perform GET after migration has been ran, ensure it starts in valid 'stopped' state + resp = rt.SendAdminRequest("GET", "/{{.db}}/_attachment_migration", "") + rest.RequireStatus(t, resp, http.StatusOK) + + migrationStatus = db.AttachmentMigrationManagerResponse{} + err = base.JSONUnmarshal(resp.BodyBytes(), &migrationStatus) + require.NoError(t, err) + require.Equal(t, db.BackgroundProcessStateCompleted, migrationStatus.State) + assert.Equal(t, int64(5), migrationStatus.DocsChanged) + assert.Equal(t, int64(10), migrationStatus.DocsProcessed) + assert.Empty(t, migrationStatus.LastErrorMessage) +} + +func TestAttachmentMigrationAbort(t *testing.T) { + if base.UnitTestUrlIsWalrus() { + t.Skip("rosmar does not support DCP client, pending CBG-4249") + } + + rt := rest.NewRestTester(t, &rest.RestTesterConfig{ + DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ + AutoImport: false, // turn off import feed to stop the feed migrating attachments + }}, + }) + defer rt.Close() + collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser() + + // Wait for run on startup to complete + _ = rt.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) + + // add some docs to arrive over dcp + for i := 0; i < 20; i++ { + key := fmt.Sprintf("%s_%d", t.Name(), i) + docBody := db.Body{ + "value": 1234, + } + _, _, err := collection.Put(ctx, key, docBody) + require.NoError(t, err) + } + + // start migration + resp := rt.SendAdminRequest("POST", "/{{.db}}/_attachment_migration", "") + rest.RequireStatus(t, resp, http.StatusOK) + + // stop the migration job + resp = rt.SendAdminRequest("POST", "/{{.db}}/_attachment_migration?action=stop", "") + rest.RequireStatus(t, resp, http.StatusOK) + + status := rt.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateStopped) + assert.Equal(t, int64(0), status.DocsChanged) +} + +func TestAttachmentMigrationReset(t *testing.T) { + if base.UnitTestUrlIsWalrus() { + t.Skip("rosmar does not support DCP client, pending CBG-4249") + } + + rt := rest.NewRestTester(t, &rest.RestTesterConfig{ + DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ + AutoImport: false, // turn off import feed to stop the feed migrating attachments + }}, + }) + defer rt.Close() + collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser() + + // Wait for run on startup to complete + _ = rt.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) + + // add some docs for migration + addDocsForMigrationProcess(t, ctx, collection) + + // start migration + resp := rt.SendAdminRequest("POST", "/{{.db}}/_attachment_migration", "") + rest.RequireStatus(t, resp, http.StatusOK) + status := rt.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateRunning) + migrationID := status.MigrationID + + // Stop migration + resp = rt.SendAdminRequest("POST", "/{{.db}}/_attachment_migration?action=stop", "") + rest.RequireStatus(t, resp, http.StatusOK) + status = rt.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateStopped) + + // make sure status is stopped + resp = rt.SendAdminRequest("GET", "/{{.db}}/_attachment_migration", "") + rest.RequireStatus(t, resp, http.StatusOK) + var migrationStatus db.AttachmentManagerResponse + err := base.JSONUnmarshal(resp.BodyBytes(), &migrationStatus) + assert.NoError(t, err) + assert.Equal(t, db.BackgroundProcessStateStopped, migrationStatus.State) + + // reset migration run + resp = rt.SendAdminRequest("POST", "/{{.db}}/_attachment_migration?reset=true", "") + rest.RequireStatus(t, resp, http.StatusOK) + status = rt.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateRunning) + assert.NotEqual(t, migrationID, status.MigrationID) + + // wait to complete + status = rt.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) + // assert all 10 docs are processed again + assert.Equal(t, int64(10), status.DocsProcessed) +} + +func TestAttachmentMigrationMultiNode(t *testing.T) { + if base.UnitTestUrlIsWalrus() { + t.Skip("rosmar does not support DCP client, pending CBG-4249") + } + tb := base.GetTestBucket(t) + noCloseTB := tb.NoCloseClone() + + rt1 := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: noCloseTB, + }) + rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: tb, + }) + defer rt2.Close() + defer rt1.Close() + collection, ctx := rt1.GetSingleTestDatabaseCollectionWithUser() + + // Wait for startup run to complete, assert completed status is on both nodes + _ = rt1.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) + _ = rt2.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) + + // add some docs for migration + addDocsForMigrationProcess(t, ctx, collection) + + // kick off migration on node 1 + resp := rt1.SendAdminRequest("POST", "/{{.db}}/_attachment_migration", "") + rest.RequireStatus(t, resp, http.StatusOK) + status := rt1.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateRunning) + migrationID := status.MigrationID + + // stop migration + resp = rt1.SendAdminRequest("POST", "/{{.db}}/_attachment_migration?action=stop", "") + rest.RequireStatus(t, resp, http.StatusOK) + _ = rt1.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateStopped) + + // assert that node 2 also has stopped status + var rt2MigrationStatus db.AttachmentMigrationManagerResponse + resp = rt2.SendAdminRequest("GET", "/{{.db}}/_attachment_migration", "") + rest.RequireStatus(t, resp, http.StatusOK) + err := base.JSONUnmarshal(resp.BodyBytes(), &rt2MigrationStatus) + assert.NoError(t, err) + assert.Equal(t, db.BackgroundProcessStateStopped, rt2MigrationStatus.State) + + // kick off migration run again on node 2. Should resume and have same migration id + resp = rt2.SendAdminRequest("POST", "/{{.db}}/_attachment_migration?action=start", "") + rest.RequireStatus(t, resp, http.StatusOK) + _ = rt2.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateRunning) + + // assert starting on another node when already running should error + resp = rt1.SendAdminRequest("POST", "/{{.db}}/_attachment_migration?action=start", "") + rest.RequireStatus(t, resp, http.StatusServiceUnavailable) + + // Wait for run to be marked as complete on both nodes + status = rt1.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) + assert.Equal(t, migrationID, status.MigrationID) + _ = rt2.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) +} + +func addDocsForMigrationProcess(t *testing.T, ctx context.Context, collection *db.DatabaseCollectionWithUser) { + for i := 0; i < 10; i++ { + docBody := db.Body{ + "value": 1234, + db.BodyAttachments: map[string]interface{}{"myatt": map[string]interface{}{"content_type": "text/plain", "data": "SGVsbG8gV29ybGQh"}}, + } + key := fmt.Sprintf("%s_%d", t.Name(), i) + _, doc, err := collection.Put(ctx, key, docBody) + require.NoError(t, err) + assert.NotNil(t, doc.SyncData.Attachments) + } + + // Move some subset of the documents attachment metadata from global sync to sync data + for j := 0; j < 5; j++ { + key := fmt.Sprintf("%s_%d", t.Name(), j) + value, xattrs, cas, err := collection.GetCollectionDatastore().GetWithXattrs(ctx, key, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + syncXattr, ok := xattrs[base.SyncXattrName] + assert.True(t, ok) + globalXattr, ok := xattrs[base.GlobalXattrName] + assert.True(t, ok) + + var attachs db.GlobalSyncData + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + + db.MoveAttachmentXattrFromGlobalToSync(t, ctx, key, cas, value, syncXattr, attachs.GlobalAttachments, true, collection.GetCollectionDatastore()) + } +} diff --git a/rest/attachment_migration_test.go b/rest/attachmentmigrationtest/attachment_migration_test.go similarity index 91% rename from rest/attachment_migration_test.go rename to rest/attachmentmigrationtest/attachment_migration_test.go index cb82431cb0..bf352d0932 100644 --- a/rest/attachment_migration_test.go +++ b/rest/attachmentmigrationtest/attachment_migration_test.go @@ -6,7 +6,7 @@ // software will be governed by the Apache License, Version 2.0, included in // the file licenses/APL2.txt. -package rest +package attachmentmigrationtest import ( "fmt" @@ -16,6 +16,7 @@ import ( sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" "github.com/couchbase/sync_gateway/db" + "github.com/couchbase/sync_gateway/rest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -28,7 +29,7 @@ func TestMigrationJobStartOnDbStart(t *testing.T) { if base.UnitTestUrlIsWalrus() { t.Skip("rosmar does not support DCP client, pending CBG-4249") } - rt := NewRestTesterPersistentConfig(t) + rt := rest.NewRestTesterPersistentConfig(t) defer rt.Close() ctx := rt.Context() @@ -62,12 +63,12 @@ func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) { base.LongRunningTest(t) base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) tb := base.GetTestBucket(t) - rtConfig := &RestTesterConfig{ + rtConfig := &rest.RestTesterConfig{ CustomTestBucket: tb, PersistentConfig: true, } - rt := NewRestTesterMultipleCollections(t, rtConfig, 2) + rt := rest.NewRestTesterMultipleCollections(t, rtConfig, 2) defer rt.Close() ctx := rt.Context() _ = rt.Bucket() @@ -98,14 +99,14 @@ func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) { require.NoError(t, writeErr) } - scopesConfigC1Only := GetCollectionsConfig(t, tb, 2) - dataStoreNames := GetDataStoreNamesFromScopesConfig(scopesConfigC1Only) + scopesConfigC1Only := rest.GetCollectionsConfig(t, tb, 2) + dataStoreNames := rest.GetDataStoreNamesFromScopesConfig(scopesConfigC1Only) scope := dataStoreNames[0].ScopeName() collection1 := dataStoreNames[0].CollectionName() collection2 := dataStoreNames[1].CollectionName() delete(scopesConfigC1Only[scope].Collections, collection2) - scopesConfigBothCollection := GetCollectionsConfig(t, tb, 2) + scopesConfigBothCollection := rest.GetCollectionsConfig(t, tb, 2) // Create a db1 with one collection initially dbConfig := rt.NewDbConfig() @@ -115,7 +116,7 @@ func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) { dbConfig.Scopes = scopesConfigC1Only resp := rt.CreateDatabase(dbName, dbConfig) - RequireStatus(t, resp, http.StatusCreated) + rest.RequireStatus(t, resp, http.StatusCreated) dbCtx := rt.GetDatabase() mgr := dbCtx.AttachmentMigrationManager @@ -129,7 +130,7 @@ func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) { dbConfig.AutoImport = false dbConfig.Scopes = scopesConfigBothCollection resp = rt.UpsertDbConfig(dbName, dbConfig) - RequireStatus(t, resp, http.StatusCreated) + rest.RequireStatus(t, resp, http.StatusCreated) // wait for attachment migration job to start and finish dbCtx = rt.GetDatabase() @@ -170,12 +171,12 @@ func TestMigrationNewCollectionToDbNoRestart(t *testing.T) { base.RequireNumTestDataStores(t, 2) base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) tb := base.GetTestBucket(t) - rtConfig := &RestTesterConfig{ + rtConfig := &rest.RestTesterConfig{ CustomTestBucket: tb, PersistentConfig: true, } - rt := NewRestTesterMultipleCollections(t, rtConfig, 2) + rt := rest.NewRestTesterMultipleCollections(t, rtConfig, 2) defer rt.Close() ctx := rt.Context() _ = rt.Bucket() @@ -206,8 +207,8 @@ func TestMigrationNewCollectionToDbNoRestart(t *testing.T) { require.NoError(t, writeErr) } - scopesConfigC1Only := GetCollectionsConfig(t, tb, 2) - dataStoreNames := GetDataStoreNamesFromScopesConfig(scopesConfigC1Only) + scopesConfigC1Only := rest.GetCollectionsConfig(t, tb, 2) + dataStoreNames := rest.GetDataStoreNamesFromScopesConfig(scopesConfigC1Only) scope := dataStoreNames[0].ScopeName() collection2 := dataStoreNames[1].CollectionName() delete(scopesConfigC1Only[scope].Collections, collection2) @@ -219,7 +220,7 @@ func TestMigrationNewCollectionToDbNoRestart(t *testing.T) { dbConfig.AutoImport = false dbConfig.Scopes = scopesConfigC1Only resp := rt.CreateDatabase(dbName, dbConfig) - RequireStatus(t, resp, http.StatusCreated) + rest.RequireStatus(t, resp, http.StatusCreated) dbCtx := rt.GetDatabase() mgr := dbCtx.AttachmentMigrationManager @@ -239,12 +240,12 @@ func TestMigrationNewCollectionToDbNoRestart(t *testing.T) { // create db with second collection, background job should only run on new collection added given // existent of sync info meta version on collection 1 - scopesConfigBothCollection := GetCollectionsConfig(t, tb, 2) + scopesConfigBothCollection := rest.GetCollectionsConfig(t, tb, 2) dbConfig = rt.NewDbConfig() dbConfig.AutoImport = false dbConfig.Scopes = scopesConfigBothCollection resp = rt.UpsertDbConfig(dbName, dbConfig) - RequireStatus(t, resp, http.StatusCreated) + rest.RequireStatus(t, resp, http.StatusCreated) dbCtx = rt.GetDatabase() mgr = dbCtx.AttachmentMigrationManager @@ -279,12 +280,12 @@ func TestMigrationNoReRunStartStopDb(t *testing.T) { base.RequireNumTestDataStores(t, 2) base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) tb := base.GetTestBucket(t) - rtConfig := &RestTesterConfig{ + rtConfig := &rest.RestTesterConfig{ CustomTestBucket: tb, PersistentConfig: true, } - rt := NewRestTesterMultipleCollections(t, rtConfig, 2) + rt := rest.NewRestTesterMultipleCollections(t, rtConfig, 2) defer rt.Close() ctx := rt.Context() _ = rt.Bucket() @@ -314,14 +315,14 @@ func TestMigrationNoReRunStartStopDb(t *testing.T) { require.NoError(t, writeErr) } - scopesConfigBothCollection := GetCollectionsConfig(t, tb, 2) + scopesConfigBothCollection := rest.GetCollectionsConfig(t, tb, 2) dbConfig := rt.NewDbConfig() // ensure import is off to stop the docs we add from being imported by sync gateway, this could cause extra overhead // on the migration job (more doc writes going to bucket). We want to avoid for purpose of this test dbConfig.AutoImport = false dbConfig.Scopes = scopesConfigBothCollection resp := rt.CreateDatabase(dbName, dbConfig) - RequireStatus(t, resp, http.StatusCreated) + rest.RequireStatus(t, resp, http.StatusCreated) dbCtx := rt.GetDatabase() assert.Len(t, dbCtx.RequireAttachmentMigration, 2) @@ -345,7 +346,7 @@ func TestMigrationNoReRunStartStopDb(t *testing.T) { dbConfig.AutoImport = true dbConfig.Scopes = scopesConfigBothCollection resp = rt.UpsertDbConfig(dbName, dbConfig) - RequireStatus(t, resp, http.StatusCreated) + rest.RequireStatus(t, resp, http.StatusCreated) dbCtx = rt.GetDatabase() mgr = dbCtx.AttachmentMigrationManager @@ -371,12 +372,12 @@ func TestStartMigrationAlreadyRunningProcess(t *testing.T) { base.RequireNumTestDataStores(t, 1) base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll) tb := base.GetTestBucket(t) - rtConfig := &RestTesterConfig{ + rtConfig := &rest.RestTesterConfig{ CustomTestBucket: tb, PersistentConfig: true, } - rt := NewRestTester(t, rtConfig) + rt := rest.NewRestTester(t, rtConfig) defer rt.Close() ctx := rt.Context() _ = rt.Bucket() @@ -402,14 +403,14 @@ func TestStartMigrationAlreadyRunningProcess(t *testing.T) { require.NoError(t, writeErr) } - scopesConfig := GetCollectionsConfig(t, tb, 1) + scopesConfig := rest.GetCollectionsConfig(t, tb, 1) dbConfig := rt.NewDbConfig() // ensure import is off to stop the docs we add from being imported by sync gateway, this could cause extra overhead // on the migration job (more doc writes going to bucket). We want to avoid for purpose of this test dbConfig.AutoImport = false dbConfig.Scopes = scopesConfig resp := rt.CreateDatabase(dbName, dbConfig) - RequireStatus(t, resp, http.StatusCreated) + rest.RequireStatus(t, resp, http.StatusCreated) dbCtx := rt.GetDatabase() nodeMgr := dbCtx.AttachmentMigrationManager // wait for migration job to start diff --git a/rest/attachmentmigrationtest/main_test.go b/rest/attachmentmigrationtest/main_test.go new file mode 100644 index 0000000000..347a1ae26a --- /dev/null +++ b/rest/attachmentmigrationtest/main_test.go @@ -0,0 +1,25 @@ +/* +Copyright 2024-Present Couchbase, Inc. + +Use of this software is governed by the Business Source License included in +the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that +file, in accordance with the Business Source License, use of this software will +be governed by the Apache License, Version 2.0, included in the file +licenses/APL2.txt. +*/ + +package attachmentmigrationtest + +import ( + "context" + "testing" + + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" +) + +func TestMain(m *testing.M) { + ctx := context.Background() // start of test process + tbpOptions := base.TestBucketPoolOptions{MemWatermarkThresholdMB: 8192} + db.TestBucketPoolWithIndexes(ctx, m, tbpOptions) +} diff --git a/rest/routing.go b/rest/routing.go index 37765078e4..9a22b6c62d 100644 --- a/rest/routing.go +++ b/rest/routing.go @@ -164,6 +164,10 @@ func CreateAdminRouter(sc *ServerContext) *mux.Router { makeHandler(sc, adminPrivs, []Permission{PermUpdateDb}, nil, (*handler).handleCompact)).Methods("POST") dbr.Handle("/_compact", makeHandler(sc, adminPrivs, []Permission{PermUpdateDb}, nil, (*handler).handleGetCompact)).Methods("GET") + dbr.Handle("/_attachment_migration", + makeHandler(sc, adminPrivs, []Permission{PermUpdateDb}, nil, (*handler).handleAttachmentMigration)).Methods("POST") + dbr.Handle("/_attachment_migration", + makeHandler(sc, adminPrivs, []Permission{PermUpdateDb}, nil, (*handler).handleGetAttachmentMigration)).Methods("GET") dbr.Handle("/_session", makeHandler(sc, adminPrivs, []Permission{PermWritePrincipal}, nil, (*handler).createUserSession)).Methods("POST") dbr.Handle("/_session/{sessionid}", diff --git a/rest/utilities_testing_attachment.go b/rest/utilities_testing_attachment.go index 4f02b3135b..e3dc9986ea 100644 --- a/rest/utilities_testing_attachment.go +++ b/rest/utilities_testing_attachment.go @@ -12,6 +12,7 @@ import ( "context" "net/http" "testing" + "time" sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" @@ -85,3 +86,17 @@ func CreateLegacyAttachmentDoc(t *testing.T, ctx context.Context, collection *db return attDocID } + +func (rt *RestTester) WaitForAttachmentMigrationStatus(t *testing.T, state db.BackgroundProcessState) db.AttachmentMigrationManagerResponse { + var response db.AttachmentMigrationManagerResponse + require.EventuallyWithT(t, func(c *assert.CollectT) { + resp := rt.SendAdminRequest("GET", "/{{.db}}/_attachment_migration", "") + require.Equal(c, http.StatusOK, resp.Code) + + err := base.JSONUnmarshal(resp.BodyBytes(), &response) + require.NoError(c, err) + assert.Equal(c, state, response.State) + }, time.Second*20, time.Millisecond*100) + + return response +}