Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

select backup engine in Backup() and ignore engines in RestoreFromBackup() #16428

Merged
merged 20 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions go/cmd/vtctldclient/command/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
var (
// Backup makes a Backup gRPC call to a vtctld.
Backup = &cobra.Command{
Use: "Backup [--concurrency <concurrency>] [--allow-primary] [--incremental-from-pos=<pos>|<backup-name>|auto] [--upgrade-safe] <tablet_alias>",
Use: "Backup [--concurrency <concurrency>] [--allow-primary] [--incremental-from-pos=<pos>|<backup-name>|auto] [--upgrade-safe] [--backup-engine=enginename] <tablet_alias>",
Short: "Uses the BackupStorage service on the given tablet to create and store a new backup.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
Expand Down Expand Up @@ -70,7 +70,7 @@ If no replica-type tablet can be found, the backup can be taken on the primary i
}
// RestoreFromBackup makes a RestoreFromBackup gRPC call to a vtctld.
RestoreFromBackup = &cobra.Command{
Use: "RestoreFromBackup [--backup-timestamp|-t <YYYY-mm-DD.HHMMSS>] [--restore-to-pos <pos>] [--dry-run] <tablet_alias>",
Use: "RestoreFromBackup [--backup-timestamp|-t <YYYY-mm-DD.HHMMSS>] [--restore-to-pos <pos>] [--allowed-backup-engines=enginename,] [--dry-run] <tablet_alias>",
Short: "Stops mysqld on the specified tablet and restores the data from either the latest backup or closest before `backup-timestamp`.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
Expand All @@ -80,6 +80,7 @@ If no replica-type tablet can be found, the backup can be taken on the primary i

var backupOptions = struct {
AllowPrimary bool
BackupEngine string
Concurrency int32
IncrementalFromPos string
UpgradeSafe bool
Expand All @@ -93,13 +94,19 @@ func commandBackup(cmd *cobra.Command, args []string) error {

cli.FinishedParsing(cmd)

stream, err := client.Backup(commandCtx, &vtctldatapb.BackupRequest{
req := &vtctldatapb.BackupRequest{
TabletAlias: tabletAlias,
AllowPrimary: backupOptions.AllowPrimary,
Concurrency: backupOptions.Concurrency,
IncrementalFromPos: backupOptions.IncrementalFromPos,
UpgradeSafe: backupOptions.UpgradeSafe,
})
}

if backupOptions.BackupEngine != "" {
req.BackupEngine = &backupOptions.BackupEngine
}

stream, err := client.Backup(commandCtx, req)
if err != nil {
return err
}
Expand Down Expand Up @@ -218,10 +225,11 @@ func commandRemoveBackup(cmd *cobra.Command, args []string) error {
}

var restoreFromBackupOptions = struct {
BackupTimestamp string
RestoreToPos string
RestoreToTimestamp string
DryRun bool
BackupTimestamp string
AllowedBackupEngines []string
RestoreToPos string
RestoreToTimestamp string
DryRun bool
}{}

func commandRestoreFromBackup(cmd *cobra.Command, args []string) error {
Expand All @@ -243,10 +251,11 @@ func commandRestoreFromBackup(cmd *cobra.Command, args []string) error {
}

req := &vtctldatapb.RestoreFromBackupRequest{
TabletAlias: alias,
RestoreToPos: restoreFromBackupOptions.RestoreToPos,
RestoreToTimestamp: protoutil.TimeToProto(restoreToTimestamp),
DryRun: restoreFromBackupOptions.DryRun,
TabletAlias: alias,
RestoreToPos: restoreFromBackupOptions.RestoreToPos,
RestoreToTimestamp: protoutil.TimeToProto(restoreToTimestamp),
DryRun: restoreFromBackupOptions.DryRun,
AllowedBackupEngines: restoreFromBackupOptions.AllowedBackupEngines,
}

if restoreFromBackupOptions.BackupTimestamp != "" {
Expand Down Expand Up @@ -282,6 +291,7 @@ func init() {
Backup.Flags().BoolVar(&backupOptions.AllowPrimary, "allow-primary", false, "Allow the primary of a shard to be used for the backup. WARNING: If using the builtin backup engine, this will shutdown mysqld on the primary and stop writes for the duration of the backup.")
Backup.Flags().Int32Var(&backupOptions.Concurrency, "concurrency", 4, "Specifies the number of compression/checksum jobs to run simultaneously.")
Backup.Flags().StringVar(&backupOptions.IncrementalFromPos, "incremental-from-pos", "", "Position, or name of backup from which to create an incremental backup. Default: empty. If given, then this backup becomes an incremental backup from given position or given backup. If value is 'auto', this backup will be taken from the last successful backup position.")
Backup.Flags().StringVar(&backupOptions.BackupEngine, "backup-engine", "", "Request a specific backup engine for this backup request. Defaults to the preferred backup engine of the target vttablet")

Backup.Flags().BoolVar(&backupOptions.UpgradeSafe, "upgrade-safe", false, "Whether to use innodb_fast_shutdown=0 for the backup so it is safe to use for MySQL upgrades.")
Root.AddCommand(Backup)
Expand All @@ -299,6 +309,7 @@ func init() {
Root.AddCommand(RemoveBackup)

RestoreFromBackup.Flags().StringVarP(&restoreFromBackupOptions.BackupTimestamp, "backup-timestamp", "t", "", "Use the backup taken at, or closest before, this timestamp. Omit to use the latest backup. Timestamp format is \"YYYY-mm-DD.HHMMSS\".")
RestoreFromBackup.Flags().StringSliceVar(&restoreFromBackupOptions.AllowedBackupEngines, "allowed-backup-engines", restoreFromBackupOptions.AllowedBackupEngines, "if set, only backups taken with the specified engines are eligible to be restored")
RestoreFromBackup.Flags().StringVar(&restoreFromBackupOptions.RestoreToPos, "restore-to-pos", "", "Run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups")
RestoreFromBackup.Flags().StringVar(&restoreFromBackupOptions.RestoreToTimestamp, "restore-to-timestamp", "", "Run a point in time recovery that restores up to, and excluding, given timestamp in RFC3339 format (`2006-01-02T15:04:05Z07:00`). This will attempt to use one full backup followed by zero or more incremental backups")
RestoreFromBackup.Flags().BoolVar(&restoreFromBackupOptions.DryRun, "dry-run", false, "Only validate restore steps, do not actually restore data")
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ Flags:
--relay_log_max_size int Maximum buffer size (in bytes) for vreplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-from-backup-allowed-engines strings (init restore parameter) if set, only backups taken with the specified engines are eligible to be restored
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
--restore-to-timestamp string (init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'
--restore_concurrency int (init restore parameter) how many concurrent files to restore at once (default 4)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ Flags:
--relay_log_max_size int Maximum buffer size (in bytes) for vreplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-from-backup-allowed-engines strings (init restore parameter) if set, only backups taken with the specified engines are eligible to be restored
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
--restore-to-timestamp string (init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'
--restore_concurrency int (init restore parameter) how many concurrent files to restore at once (default 4)
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestBuiltinBackup(t *testing.T) {

func TestBuiltinBackupWithZstdCompression(t *testing.T) {
defer setDefaultCompressionFlag()
defer setDefaultCommonArgs()
cDetails := &CompressionDetails{
CompressorEngineName: "zstd",
ExternalCompressorCmd: "zstd",
Expand All @@ -41,6 +42,7 @@ func TestBuiltinBackupWithZstdCompression(t *testing.T) {

func TestBuiltinBackupWithExternalZstdCompression(t *testing.T) {
defer setDefaultCompressionFlag()
defer setDefaultCommonArgs()
cDetails := &CompressionDetails{
CompressorEngineName: "external",
ExternalCompressorCmd: "zstd",
Expand All @@ -53,6 +55,7 @@ func TestBuiltinBackupWithExternalZstdCompression(t *testing.T) {

func TestBuiltinBackupWithExternalZstdCompressionAndManifestedDecompressor(t *testing.T) {
defer setDefaultCompressionFlag()
defer setDefaultCommonArgs()
cDetails := &CompressionDetails{
CompressorEngineName: "external",
ExternalCompressorCmd: "zstd",
Expand Down
155 changes: 147 additions & 8 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,7 @@ var (
shardKsName = fmt.Sprintf("%s/%s", keyspaceName, shardName)
dbCredentialFile string
shardName = "0"
commonTabletArg = []string{
"--vreplication_retry_delay", "1s",
"--degraded_threshold", "5s",
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--enable_replication_reporter",
"--serving_state_grace_period", "1s",
}
commonTabletArg = getDefaultCommonArgs()

vtInsertTest = `
create table vt_insert_test (
Expand Down Expand Up @@ -1482,3 +1475,149 @@ func verifyTabletRestoreStats(t *testing.T, vars map[string]any) {

require.Contains(t, bd, "BackupStorage.File.File:Read")
}

func getDefaultCommonArgs() []string {
return []string{
"--vreplication_retry_delay", "1s",
"--degraded_threshold", "5s",
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--enable_replication_reporter",
"--serving_state_grace_period", "1s",
}
}

func setDefaultCommonArgs() { commonTabletArg = getDefaultCommonArgs() }

// fetch the backup engine used on the last backup triggered by the end-to-end tests.
func getBackupEngineOfLastBackup(t *testing.T) string {
lastBackup := getLastBackup(t)

manifest := readManifestFile(t, path.Join(localCluster.CurrentVTDATAROOT, "backups", keyspaceName, shardName, lastBackup))

return manifest.BackupMethod
}

func getLastBackup(t *testing.T) string {
backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)

return backups[len(backups)-1]
}

func TestBackupEngineSelector(t *testing.T) {
defer setDefaultCommonArgs()
defer cluster.PanicHandler(t)

// launch the custer with xtrabackup as the default engine
code, err := LaunchCluster(XtraBackup, "xbstream", 0, &CompressionDetails{CompressorEngineName: "pgzip"})
require.Nilf(t, err, "setup failed with status code %d", code)

defer TearDownCluster()

localCluster.DisableVTOrcRecoveries(t)
defer func() {
localCluster.EnableVTOrcRecoveries(t)
}()
verifyInitialReplication(t)

t.Run("backup with backup-engine=builtin", func(t *testing.T) {
// first try to backup with an alternative engine (builtin)
err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", "--backup-engine=builtin", primary.Alias)
require.NoError(t, err)
engineUsed := getBackupEngineOfLastBackup(t)
require.Equal(t, "builtin", engineUsed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

})

t.Run("backup with backup-engine=xtrabackup", func(t *testing.T) {
// then try to backup specifying the xtrabackup engine
err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", "--backup-engine=xtrabackup", primary.Alias)
require.NoError(t, err)
engineUsed := getBackupEngineOfLastBackup(t)
require.Equal(t, "xtrabackup", engineUsed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

})

t.Run("backup without specifying backup-engine", func(t *testing.T) {
// check that by default we still use the xtrabackup engine if not specified
err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", primary.Alias)
require.NoError(t, err)
engineUsed := getBackupEngineOfLastBackup(t)
require.Equal(t, "xtrabackup", engineUsed)
})
}

func TestRestoreAllowedBackupEngines(t *testing.T) {
defer setDefaultCommonArgs()
defer cluster.PanicHandler(t)

backupMsg := "right after xtrabackup backup"

cDetails := &CompressionDetails{CompressorEngineName: "pgzip"}

// launch the custer with xtrabackup as the default engine
code, err := LaunchCluster(XtraBackup, "xbstream", 0, cDetails)
require.Nilf(t, err, "setup failed with status code %d", code)

defer TearDownCluster()

localCluster.DisableVTOrcRecoveries(t)
defer func() {
localCluster.EnableVTOrcRecoveries(t)
}()
verifyInitialReplication(t)

t.Run("generate backups", func(t *testing.T) {
// lets take two backups, each using a different backup engine
err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", "--backup-engine=builtin", primary.Alias)
require.NoError(t, err)

err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", "--backup-engine=xtrabackup", primary.Alias)
require.NoError(t, err)
})

// insert more data on the primary
_, err = primary.VttabletProcess.QueryTablet(fmt.Sprintf("insert into vt_insert_test (msg) values ('%s')", backupMsg), keyspaceName, true)
require.NoError(t, err)

t.Run("restore replica and verify data", func(t *testing.T) {
// now bring up another replica, letting it restore from backup.
restoreWaitForBackup(t, "replica", cDetails, true)
err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout)
require.NoError(t, err)

// check the new replica has the data
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2)
result, err := replica2.VttabletProcess.QueryTablet(
fmt.Sprintf("select msg from vt_insert_test where msg='%s'", backupMsg), replica2.VttabletProcess.Keyspace, true)
require.NoError(t, err)
require.Equal(t, backupMsg, result.Named().Row().AsString("msg", ""))
})

t.Run("test broken restore", func(t *testing.T) {
// now lets break the last backup in the shard
err = os.Remove(path.Join(localCluster.CurrentVTDATAROOT,
"backups", keyspaceName, shardName,
getLastBackup(t), "backup.xbstream.gz"))
require.NoError(t, err)

// and try to restore from it
err = localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", replica2.Alias)
require.Error(t, err) // this should fail
})

t.Run("test older working backup", func(t *testing.T) {
// now we retry but with the first backup
err = localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", "--allowed-backup-engines=builtin", replica2.Alias)
require.NoError(t, err) // this should succeed

// make sure we are replicating after the restore is done
err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout)
require.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2)

result, err := replica2.VttabletProcess.QueryTablet(
fmt.Sprintf("select msg from vt_insert_test where msg='%s'", backupMsg), replica2.VttabletProcess.Keyspace, true)
require.NoError(t, err)
require.Equal(t, backupMsg, result.Named().Row().AsString("msg", ""))
})
}
17 changes: 17 additions & 0 deletions go/test/endtoend/backup/xtrabackup/select_engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package vtctlbackup

import (
"testing"

backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)

func TestBackupEngineSelector(t *testing.T) {
defer setDefaultCompressionFlag()
backup.TestBackupEngineSelector(t)
}

func TestRestoreAllowedBackupEngines(t *testing.T) {
defer setDefaultCompressionFlag()
backup.TestRestoreAllowedBackupEngines(t)
}
4 changes: 3 additions & 1 deletion go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,14 @@ func Backup(ctx context.Context, params BackupParams) error {
// appropriate binlog files.
be = BackupRestoreEngineMap[builtinBackupEngineName]
} else {
be, err = GetBackupEngine()
be, err = GetBackupEngine(params.BackupEngine)
if err != nil {
return vterrors.Wrap(err, "failed to find backup engine")
}
}

params.Logger.Infof("Using backup engine %q", be.Name())

// Take the backup, and either AbortBackup or EndBackup.
backupResult, err := be.ExecuteBackup(ctx, beParams, bh)
logger := params.Logger
Expand Down
16 changes: 8 additions & 8 deletions go/vt/mysqlctl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func TestRestoreManifestMySQLVersionValidation(t *testing.T) {

manifest := BackupManifest{
BackupTime: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
BackupMethod: "fake",
BackupMethod: fakeBackupEngineName,
Keyspace: "test",
Shard: "-",
MySQLVersion: tc.fromVersion,
Expand Down Expand Up @@ -598,7 +598,7 @@ func createFakeBackupRestoreEnv(t *testing.T) *fakeBackupRestoreEnv {

manifest := BackupManifest{
BackupTime: FormatRFC3339(time.Now().Add(-1 * time.Hour)),
BackupMethod: "fake",
BackupMethod: fakeBackupEngineName,
Keyspace: "test",
Shard: "-",
MySQLVersion: "8.0.32",
Expand All @@ -611,8 +611,8 @@ func createFakeBackupRestoreEnv(t *testing.T) *fakeBackupRestoreEnv {
testBackupEngine.ExecuteRestoreReturn = FakeBackupEngineExecuteRestoreReturn{&manifest, nil}

previousBackupEngineImplementation := backupEngineImplementation
BackupRestoreEngineMap["fake"] = &testBackupEngine
backupEngineImplementation = "fake"
BackupRestoreEngineMap[fakeBackupEngineName] = &testBackupEngine
backupEngineImplementation = fakeBackupEngineName

testBackupStorage := FakeBackupStorage{}
testBackupStorage.ListBackupsReturn = FakeBackupStorageListBackupsReturn{
Expand All @@ -627,9 +627,9 @@ func createFakeBackupRestoreEnv(t *testing.T) *fakeBackupRestoreEnv {
testBackupStorage.StartBackupReturn = FakeBackupStorageStartBackupReturn{&FakeBackupHandle{}, nil}
testBackupStorage.WithParamsReturn = &testBackupStorage

backupstorage.BackupStorageMap["fake"] = &testBackupStorage
backupstorage.BackupStorageMap[fakeBackupEngineName] = &testBackupStorage
previousBackupStorageImplementation := backupstorage.BackupStorageImplementation
backupstorage.BackupStorageImplementation = "fake"
backupstorage.BackupStorageImplementation = fakeBackupEngineName

// all restore integration tests must be leak checked
t.Cleanup(func() {
Expand All @@ -640,10 +640,10 @@ func createFakeBackupRestoreEnv(t *testing.T) *fakeBackupRestoreEnv {
backupstats.DeprecatedBackupDurationS.Reset()
backupstats.DeprecatedRestoreDurationS.Reset()

delete(BackupRestoreEngineMap, "fake")
delete(BackupRestoreEngineMap, fakeBackupEngineName)
backupEngineImplementation = previousBackupEngineImplementation

delete(backupstorage.BackupStorageMap, "fake")
delete(backupstorage.BackupStorageMap, fakeBackupEngineName)
backupstorage.BackupStorageImplementation = previousBackupStorageImplementation
mysqld.Close()
sqldb.Close()
Expand Down
Loading
Loading