backport upstream 16428
rvrangel authored and tanjinx committed Nov 18, 2024
1 parent 6042ea7 commit 9829839
Showing 27 changed files with 2,721 additions and 2,049 deletions.
35 changes: 23 additions & 12 deletions go/cmd/vtctldclient/command/backups.go
@@ -35,7 +35,7 @@ import (
var (
// Backup makes a Backup gRPC call to a vtctld.
Backup = &cobra.Command{
Use: "Backup [--concurrency <concurrency>] [--allow-primary] [--incremental-from-pos=<pos>|<backup-name>|auto] [--upgrade-safe] <tablet_alias>",
Use: "Backup [--concurrency <concurrency>] [--allow-primary] [--incremental-from-pos=<pos>|<backup-name>|auto] [--upgrade-safe] [--backup-engine=enginename] <tablet_alias>",
Short: "Uses the BackupStorage service on the given tablet to create and store a new backup.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
@@ -70,7 +70,7 @@ If no replica-type tablet can be found, the backup can be taken on the primary i
}
// RestoreFromBackup makes a RestoreFromBackup gRPC call to a vtctld.
RestoreFromBackup = &cobra.Command{
Use: "RestoreFromBackup [--backup-timestamp|-t <YYYY-mm-DD.HHMMSS>] [--restore-to-pos <pos>] [--dry-run] <tablet_alias>",
Use: "RestoreFromBackup [--backup-timestamp|-t <YYYY-mm-DD.HHMMSS>] [--restore-to-pos <pos>] [--allowed-backup-engines=enginename,] [--dry-run] <tablet_alias>",
Short: "Stops mysqld on the specified tablet and restores the data from either the latest backup or closest before `backup-timestamp`.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
@@ -80,6 +80,7 @@ If no replica-type tablet can be found, the backup can be taken on the primary i

var backupOptions = struct {
AllowPrimary bool
BackupEngine string
Concurrency int32
IncrementalFromPos string
UpgradeSafe bool
@@ -93,13 +94,19 @@ func commandBackup(cmd *cobra.Command, args []string) error {

cli.FinishedParsing(cmd)

stream, err := client.Backup(commandCtx, &vtctldatapb.BackupRequest{
req := &vtctldatapb.BackupRequest{
TabletAlias: tabletAlias,
AllowPrimary: backupOptions.AllowPrimary,
Concurrency: backupOptions.Concurrency,
IncrementalFromPos: backupOptions.IncrementalFromPos,
UpgradeSafe: backupOptions.UpgradeSafe,
})
}

if backupOptions.BackupEngine != "" {
req.BackupEngine = &backupOptions.BackupEngine
}

stream, err := client.Backup(commandCtx, req)
if err != nil {
return err
}
@@ -218,10 +225,11 @@ func commandRemoveBackup(cmd *cobra.Command, args []string) error {
}

var restoreFromBackupOptions = struct {
BackupTimestamp string
RestoreToPos string
RestoreToTimestamp string
DryRun bool
BackupTimestamp string
AllowedBackupEngines []string
RestoreToPos string
RestoreToTimestamp string
DryRun bool
}{}

func commandRestoreFromBackup(cmd *cobra.Command, args []string) error {
@@ -243,10 +251,11 @@ func commandRestoreFromBackup(cmd *cobra.Command, args []string) error {
}

req := &vtctldatapb.RestoreFromBackupRequest{
TabletAlias: alias,
RestoreToPos: restoreFromBackupOptions.RestoreToPos,
RestoreToTimestamp: protoutil.TimeToProto(restoreToTimestamp),
DryRun: restoreFromBackupOptions.DryRun,
TabletAlias: alias,
RestoreToPos: restoreFromBackupOptions.RestoreToPos,
RestoreToTimestamp: protoutil.TimeToProto(restoreToTimestamp),
DryRun: restoreFromBackupOptions.DryRun,
AllowedBackupEngines: restoreFromBackupOptions.AllowedBackupEngines,
}

if restoreFromBackupOptions.BackupTimestamp != "" {
@@ -282,6 +291,7 @@ func init() {
Backup.Flags().BoolVar(&backupOptions.AllowPrimary, "allow-primary", false, "Allow the primary of a shard to be used for the backup. WARNING: If using the builtin backup engine, this will shutdown mysqld on the primary and stop writes for the duration of the backup.")
Backup.Flags().Int32Var(&backupOptions.Concurrency, "concurrency", 4, "Specifies the number of compression/checksum jobs to run simultaneously.")
Backup.Flags().StringVar(&backupOptions.IncrementalFromPos, "incremental-from-pos", "", "Position, or name of backup from which to create an incremental backup. Default: empty. If given, then this backup becomes an incremental backup from given position or given backup. If value is 'auto', this backup will be taken from the last successful backup position.")
Backup.Flags().StringVar(&backupOptions.BackupEngine, "backup-engine", "", "Request a specific backup engine for this backup request. Defaults to the preferred backup engine of the target vttablet")

Backup.Flags().BoolVar(&backupOptions.UpgradeSafe, "upgrade-safe", false, "Whether to use innodb_fast_shutdown=0 for the backup so it is safe to use for MySQL upgrades.")
Root.AddCommand(Backup)
@@ -299,6 +309,7 @@ func init() {
Root.AddCommand(RemoveBackup)

RestoreFromBackup.Flags().StringVarP(&restoreFromBackupOptions.BackupTimestamp, "backup-timestamp", "t", "", "Use the backup taken at, or closest before, this timestamp. Omit to use the latest backup. Timestamp format is \"YYYY-mm-DD.HHMMSS\".")
RestoreFromBackup.Flags().StringSliceVar(&restoreFromBackupOptions.AllowedBackupEngines, "allowed-backup-engines", restoreFromBackupOptions.AllowedBackupEngines, "if set, only backups taken with the specified engines are eligible to be restored")
RestoreFromBackup.Flags().StringVar(&restoreFromBackupOptions.RestoreToPos, "restore-to-pos", "", "Run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups")
RestoreFromBackup.Flags().StringVar(&restoreFromBackupOptions.RestoreToTimestamp, "restore-to-timestamp", "", "Run a point in time recovery that restores up to, and excluding, given timestamp in RFC3339 format (`2006-01-02T15:04:05Z07:00`). This will attempt to use one full backup followed by zero or more incremental backups")
RestoreFromBackup.Flags().BoolVar(&restoreFromBackupOptions.DryRun, "dry-run", false, "Only validate restore steps, do not actually restore data")
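
To make the flow of the two new request fields easier to follow, here is a minimal sketch of how a caller could build the same requests that commandBackup and commandRestoreFromBackup construct above. It assumes the usual Vitess proto import paths; the helper names and field values are invented for illustration and are not part of this commit.

package example

import (
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

// buildBackupRequest mirrors commandBackup: the engine override is attached
// only when the caller asked for one, so a nil BackupEngine leaves the choice
// to the target vttablet's preferred engine.
func buildBackupRequest(alias *topodatapb.TabletAlias, engine string) *vtctldatapb.BackupRequest {
	req := &vtctldatapb.BackupRequest{
		TabletAlias:  alias,
		AllowPrimary: true,
		Concurrency:  4,
	}
	if engine != "" {
		req.BackupEngine = &engine // optional field: nil means "use the tablet default"
	}
	return req
}

// buildRestoreRequest mirrors commandRestoreFromBackup: AllowedBackupEngines
// filters which existing backups may be restored; it does not select a new engine.
func buildRestoreRequest(alias *topodatapb.TabletAlias, engines []string) *vtctldatapb.RestoreFromBackupRequest {
	return &vtctldatapb.RestoreFromBackupRequest{
		TabletAlias:          alias,
		AllowedBackupEngines: engines, // e.g. []string{"builtin"}
	}
}
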
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
@@ -313,6 +313,7 @@ Flags:
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-from-backup-allowed-engines strings (init restore parameter) if set, only backups taken with the specified engines are eligible to be restored
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
--restore-to-timestamp string (init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'
--restore_concurrency int (init restore parameter) how many concurrent files to restore at once (default 4)
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
@@ -304,6 +304,7 @@ Flags:
--relay_log_max_size int Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time. (default 250000)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--replication_connect_retry duration how long to wait in between replica reconnect attempts. Only precise to the second. (default 10s)
--restore-from-backup-allowed-engines strings (init restore parameter) if set, only backups taken with the specified engines are eligible to be restored
--restore-to-pos string (init incremental restore parameter) if set, run a point in time recovery that ends with the given position. This will attempt to use one full backup followed by zero or more incremental backups
--restore-to-timestamp string (init incremental restore parameter) if set, run a point in time recovery that restores up to the given timestamp, if possible. Given timestamp in RFC3339 format. Example: '2006-01-02T15:04:05Z07:00'
--restore_concurrency int (init restore parameter) how many concurrent files to restore at once (default 4)
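
The tablet-side registration of --restore-from-backup-allowed-engines lives in one of the changed files not shown in this excerpt. As a rough sketch of what such a registration looks like (variable and function names here are placeholders, not the ones used by the commit):

package example

import "github.com/spf13/pflag"

var restoreFromBackupAllowedEngines []string

// registerRestoreEngineFlag is a hypothetical helper; vttablet actually wires
// its flags through servenv, but the flag definition itself would be this shape.
func registerRestoreEngineFlag(fs *pflag.FlagSet) {
	fs.StringSliceVar(&restoreFromBackupAllowedEngines, "restore-from-backup-allowed-engines", nil,
		"(init restore parameter) if set, only backups taken with the specified engines are eligible to be restored")
}
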
3 changes: 3 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/backup_test.go
@@ -29,6 +29,7 @@ func TestBuiltinBackup(t *testing.T) {

func TestBuiltinBackupWithZstdCompression(t *testing.T) {
defer setDefaultCompressionFlag()
defer setDefaultCommonArgs()
cDetails := &CompressionDetails{
CompressorEngineName: "zstd",
ExternalCompressorCmd: "zstd",
@@ -41,6 +42,7 @@ func TestBuiltinBackupWithZstdCompression(t *testing.T) {

func TestBuiltinBackupWithExternalZstdCompression(t *testing.T) {
defer setDefaultCompressionFlag()
defer setDefaultCommonArgs()
cDetails := &CompressionDetails{
CompressorEngineName: "external",
ExternalCompressorCmd: "zstd",
@@ -53,6 +55,7 @@ func TestBuiltinBackupWithExternalZstdCompression(t *testing.T) {

func TestBuiltinBackupWithExternalZstdCompressionAndManifestedDecompressor(t *testing.T) {
defer setDefaultCompressionFlag()
defer setDefaultCommonArgs()
cDetails := &CompressionDetails{
CompressorEngineName: "external",
ExternalCompressorCmd: "zstd",
155 changes: 147 additions & 8 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -74,14 +74,7 @@ var (
shardKsName = fmt.Sprintf("%s/%s", keyspaceName, shardName)
dbCredentialFile string
shardName = "0"
commonTabletArg = []string{
"--vreplication_retry_delay", "1s",
"--degraded_threshold", "5s",
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--enable_replication_reporter",
"--serving_state_grace_period", "1s",
}
commonTabletArg = getDefaultCommonArgs()

vtInsertTest = `
create table vt_insert_test (
@@ -1456,3 +1449,149 @@ func verifyTabletRestoreStats(t *testing.T, vars map[string]any) {

require.Contains(t, bd, "BackupStorage.File.File:Read")
}

func getDefaultCommonArgs() []string {
return []string{
"--vreplication_retry_delay", "1s",
"--degraded_threshold", "5s",
"--lock_tables_timeout", "5s",
"--watch_replication_stream",
"--enable_replication_reporter",
"--serving_state_grace_period", "1s",
}
}

func setDefaultCommonArgs() { commonTabletArg = getDefaultCommonArgs() }

// fetch the backup engine used on the last backup triggered by the end-to-end tests.
func getBackupEngineOfLastBackup(t *testing.T) string {
lastBackup := getLastBackup(t)

manifest := readManifestFile(t, path.Join(localCluster.CurrentVTDATAROOT, "backups", keyspaceName, shardName, lastBackup))

return manifest.BackupMethod
}

func getLastBackup(t *testing.T) string {
backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)

return backups[len(backups)-1]
}

func TestBackupEngineSelector(t *testing.T) {
defer setDefaultCommonArgs()
defer cluster.PanicHandler(t)

// launch the cluster with xtrabackup as the default engine
code, err := LaunchCluster(XtraBackup, "xbstream", 0, &CompressionDetails{CompressorEngineName: "pgzip"})
require.Nilf(t, err, "setup failed with status code %d", code)

defer TearDownCluster()

localCluster.DisableVTOrcRecoveries(t)
defer func() {
localCluster.EnableVTOrcRecoveries(t)
}()
verifyInitialReplication(t)

t.Run("backup with backup-engine=builtin", func(t *testing.T) {
// first try to backup with an alternative engine (builtin)
err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", "--backup-engine=builtin", primary.Alias)
require.NoError(t, err)
engineUsed := getBackupEngineOfLastBackup(t)
require.Equal(t, "builtin", engineUsed)
})

t.Run("backup with backup-engine=xtrabackup", func(t *testing.T) {
// then try to backup specifying the xtrabackup engine
err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", "--backup-engine=xtrabackup", primary.Alias)
require.NoError(t, err)
engineUsed := getBackupEngineOfLastBackup(t)
require.Equal(t, "xtrabackup", engineUsed)
})

t.Run("backup without specifying backup-engine", func(t *testing.T) {
// check that by default we still use the xtrabackup engine if not specified
err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", primary.Alias)
require.NoError(t, err)
engineUsed := getBackupEngineOfLastBackup(t)
require.Equal(t, "xtrabackup", engineUsed)
})
}

func TestRestoreAllowedBackupEngines(t *testing.T) {
defer setDefaultCommonArgs()
defer cluster.PanicHandler(t)

backupMsg := "right after xtrabackup backup"

cDetails := &CompressionDetails{CompressorEngineName: "pgzip"}

// launch the cluster with xtrabackup as the default engine
code, err := LaunchCluster(XtraBackup, "xbstream", 0, cDetails)
require.Nilf(t, err, "setup failed with status code %d", code)

defer TearDownCluster()

localCluster.DisableVTOrcRecoveries(t)
defer func() {
localCluster.EnableVTOrcRecoveries(t)
}()
verifyInitialReplication(t)

t.Run("generate backups", func(t *testing.T) {
// lets take two backups, each using a different backup engine
err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", "--backup-engine=builtin", primary.Alias)
require.NoError(t, err)

err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", "--backup-engine=xtrabackup", primary.Alias)
require.NoError(t, err)
})

// insert more data on the primary
_, err = primary.VttabletProcess.QueryTablet(fmt.Sprintf("insert into vt_insert_test (msg) values ('%s')", backupMsg), keyspaceName, true)
require.NoError(t, err)

t.Run("restore replica and verify data", func(t *testing.T) {
// now bring up another replica, letting it restore from backup.
restoreWaitForBackup(t, "replica", cDetails, true)
err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout)
require.NoError(t, err)

// check the new replica has the data
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2)
result, err := replica2.VttabletProcess.QueryTablet(
fmt.Sprintf("select msg from vt_insert_test where msg='%s'", backupMsg), replica2.VttabletProcess.Keyspace, true)
require.NoError(t, err)
require.Equal(t, backupMsg, result.Named().Row().AsString("msg", ""))
})

t.Run("test broken restore", func(t *testing.T) {
// now lets break the last backup in the shard
err = os.Remove(path.Join(localCluster.CurrentVTDATAROOT,
"backups", keyspaceName, shardName,
getLastBackup(t), "backup.xbstream.gz"))
require.NoError(t, err)

// and try to restore from it
err = localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", replica2.Alias)
require.Error(t, err) // this should fail
})

t.Run("test older working backup", func(t *testing.T) {
// now we retry but with the first backup
err = localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", "--allowed-backup-engines=builtin", replica2.Alias)
require.NoError(t, err) // this should succeed

// make sure we are replicating after the restore is done
err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout)
require.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2)

result, err := replica2.VttabletProcess.QueryTablet(
fmt.Sprintf("select msg from vt_insert_test where msg='%s'", backupMsg), replica2.VttabletProcess.Keyspace, true)
require.NoError(t, err)
require.Equal(t, backupMsg, result.Named().Row().AsString("msg", ""))
})
}
17 changes: 17 additions & 0 deletions go/test/endtoend/backup/xtrabackup/select_engine_test.go
@@ -0,0 +1,17 @@
package vtctlbackup

import (
"testing"

backup "vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"
)

func TestBackupEngineSelector(t *testing.T) {
defer setDefaultCompressionFlag()
backup.TestBackupEngineSelector(t)
}

func TestRestoreAllowedBackupEngines(t *testing.T) {
defer setDefaultCompressionFlag()
backup.TestRestoreAllowedBackupEngines(t)
}
4 changes: 3 additions & 1 deletion go/vt/mysqlctl/backup.go
@@ -166,12 +166,14 @@ func Backup(ctx context.Context, params BackupParams) error {
// appropriate binlog files.
be = BackupRestoreEngineMap[builtinBackupEngineName]
} else {
be, err = GetBackupEngine()
be, err = GetBackupEngine(params.BackupEngine)
if err != nil {
return vterrors.Wrap(err, "failed to find backup engine")
}
}

params.Logger.Infof("Using backup engine %q", be.Name())

// Take the backup, and either AbortBackup or EndBackup.
backupResult, err := be.ExecuteBackup(ctx, beParams, bh)
logger := params.Logger
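
The new GetBackupEngine parameter is defined in a file outside this excerpt. Judging from the call site above and the BackupRestoreEngineMap registry used in this package, its behaviour is presumably along these lines (a self-contained sketch with stand-in types, not the actual implementation):

package example

import "fmt"

// BackupEngine stands in for mysqlctl.BackupEngine in this sketch.
type BackupEngine interface{ Name() string }

var (
	// stand-ins for the engine registry and configured default seen in the diff
	backupRestoreEngineMap     = map[string]BackupEngine{}
	backupEngineImplementation = "builtin"
)

// resolveBackupEngine sketches what GetBackupEngine(params.BackupEngine) is
// presumed to do: honour an explicit per-request engine, otherwise fall back
// to the configured default, and error out on an unknown name.
func resolveBackupEngine(requested string) (BackupEngine, error) {
	name := backupEngineImplementation
	if requested != "" {
		name = requested
	}
	be, ok := backupRestoreEngineMap[name]
	if !ok {
		return nil, fmt.Errorf("unknown backup engine: %q", name)
	}
	return be, nil
}
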
16 changes: 8 additions & 8 deletions go/vt/mysqlctl/backup_test.go
@@ -512,7 +512,7 @@ func TestRestoreManifestMySQLVersionValidation(t *testing.T) {

manifest := BackupManifest{
BackupTime: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
BackupMethod: "fake",
BackupMethod: fakeBackupEngineName,
Keyspace: "test",
Shard: "-",
MySQLVersion: tc.fromVersion,
@@ -613,7 +613,7 @@ func createFakeBackupRestoreEnv(t *testing.T) *fakeBackupRestoreEnv {

manifest := BackupManifest{
BackupTime: FormatRFC3339(time.Now().Add(-1 * time.Hour)),
BackupMethod: "fake",
BackupMethod: fakeBackupEngineName,
Keyspace: "test",
Shard: "-",
MySQLVersion: "8.0.32",
@@ -626,8 +626,8 @@ func createFakeBackupRestoreEnv(t *testing.T) *fakeBackupRestoreEnv {
testBackupEngine.ExecuteRestoreReturn = FakeBackupEngineExecuteRestoreReturn{&manifest, nil}

previousBackupEngineImplementation := backupEngineImplementation
BackupRestoreEngineMap["fake"] = &testBackupEngine
backupEngineImplementation = "fake"
BackupRestoreEngineMap[fakeBackupEngineName] = &testBackupEngine
backupEngineImplementation = fakeBackupEngineName

testBackupStorage := FakeBackupStorage{}
testBackupStorage.ListBackupsReturn = FakeBackupStorageListBackupsReturn{
@@ -642,9 +642,9 @@ func createFakeBackupRestoreEnv(t *testing.T) *fakeBackupRestoreEnv {
testBackupStorage.StartBackupReturn = FakeBackupStorageStartBackupReturn{&FakeBackupHandle{}, nil}
testBackupStorage.WithParamsReturn = &testBackupStorage

backupstorage.BackupStorageMap["fake"] = &testBackupStorage
backupstorage.BackupStorageMap[fakeBackupEngineName] = &testBackupStorage
previousBackupStorageImplementation := backupstorage.BackupStorageImplementation
backupstorage.BackupStorageImplementation = "fake"
backupstorage.BackupStorageImplementation = fakeBackupEngineName

// all restore integration tests must be leak checked
t.Cleanup(func() {
@@ -655,10 +655,10 @@ func createFakeBackupRestoreEnv(t *testing.T) *fakeBackupRestoreEnv {
backupstats.DeprecatedBackupDurationS.Reset()
backupstats.DeprecatedRestoreDurationS.Reset()

delete(BackupRestoreEngineMap, "fake")
delete(BackupRestoreEngineMap, fakeBackupEngineName)
backupEngineImplementation = previousBackupEngineImplementation

delete(backupstorage.BackupStorageMap, "fake")
delete(backupstorage.BackupStorageMap, fakeBackupEngineName)
backupstorage.BackupStorageImplementation = previousBackupStorageImplementation
mysqld.Close()
sqldb.Close()
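
The fakeBackupEngineName constant that replaces the scattered "fake" literals above is declared in a test file outside this excerpt; given the literals it replaces, it is presumably just:

// assumed declaration; value implied by the string literals it replaces
const fakeBackupEngineName = "fake"
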
(Diffs for the remaining 19 changed files were not loaded in this view.)
