Skip to content

Commit

Permalink
Backup: --incremental-from-pos supports backup name (#14923)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
Co-authored-by: Deepthi Sigireddi <[email protected]>
  • Loading branch information
3 people authored Jan 29, 2024
1 parent 38573f0 commit b977e51
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 73 deletions.
2 changes: 1 addition & 1 deletion go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func init() {
Main.Flags().StringVar(&initKeyspace, "init_keyspace", initKeyspace, "(init parameter) keyspace to use for this tablet")
Main.Flags().StringVar(&initShard, "init_shard", initShard, "(init parameter) shard to use for this tablet")
Main.Flags().IntVar(&concurrency, "concurrency", concurrency, "(init restore parameter) how many concurrent files to restore at once")
Main.Flags().StringVar(&incrementalFromPos, "incremental_from_pos", incrementalFromPos, "Position of previous backup. Default: empty. If given, then this backup becomes an incremental backup from given position. If value is 'auto', backup taken from last successful backup position")
Main.Flags().StringVar(&incrementalFromPos, "incremental_from_pos", incrementalFromPos, "Position, or name of backup from which to create an incremental backup. Default: empty. If given, then this backup becomes an incremental backup from given position or given backup. If value is 'auto', this backup will be taken from the last successful backup position.")

// mysqlctld-like flags
Main.Flags().IntVar(&mysqlPort, "mysql_port", mysqlPort, "mysql port")
Expand Down
8 changes: 4 additions & 4 deletions go/cmd/vtctldclient/command/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ import (
var (
// Backup makes a Backup gRPC call to a vtctld.
Backup = &cobra.Command{
Use: "Backup [--concurrency <concurrency>] [--allow-primary] [--incremental-from-pos=<pos>|auto] [--upgrade-safe] <tablet_alias>",
Use: "Backup [--concurrency <concurrency>] [--allow-primary] [--incremental-from-pos=<pos>|<backup-name>|auto] [--upgrade-safe] <tablet_alias>",
Short: "Uses the BackupStorage service on the given tablet to create and store a new backup.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
RunE: commandBackup,
}
// BackupShard makes a BackupShard gRPC call to a vtctld.
BackupShard = &cobra.Command{
Use: "BackupShard [--concurrency <concurrency>] [--allow-primary] [--incremental-from-pos=<pos>|auto] [--upgrade-safe] <keyspace/shard>",
Use: "BackupShard [--concurrency <concurrency>] [--allow-primary] [--incremental-from-pos=<pos>|<backup-name>|auto] [--upgrade-safe] <keyspace/shard>",
Short: "Finds the most up-to-date REPLICA, RDONLY, or SPARE tablet in the given shard and uses the BackupStorage service on that tablet to create and store a new backup.",
Long: `Finds the most up-to-date REPLICA, RDONLY, or SPARE tablet in the given shard and uses the BackupStorage service on that tablet to create and store a new backup.
Expand Down Expand Up @@ -281,14 +281,14 @@ func commandRestoreFromBackup(cmd *cobra.Command, args []string) error {
func init() {
Backup.Flags().BoolVar(&backupOptions.AllowPrimary, "allow-primary", false, "Allow the primary of a shard to be used for the backup. WARNING: If using the builtin backup engine, this will shutdown mysqld on the primary and stop writes for the duration of the backup.")
Backup.Flags().Int32Var(&backupOptions.Concurrency, "concurrency", 4, "Specifies the number of compression/checksum jobs to run simultaneously.")
Backup.Flags().StringVar(&backupOptions.IncrementalFromPos, "incremental-from-pos", "", "Position of previous backup. Default: empty. If given, then this backup becomes an incremental backup from given position. If value is 'auto', backup taken from last successful backup position")
Backup.Flags().StringVar(&backupOptions.IncrementalFromPos, "incremental-from-pos", "", "Position, or name of backup from which to create an incremental backup. Default: empty. If given, then this backup becomes an incremental backup from given position or given backup. If value is 'auto', this backup will be taken from the last successful backup position.")

Backup.Flags().BoolVar(&backupOptions.UpgradeSafe, "upgrade-safe", false, "Whether to use innodb_fast_shutdown=0 for the backup so it is safe to use for MySQL upgrades.")
Root.AddCommand(Backup)

BackupShard.Flags().BoolVar(&backupShardOptions.AllowPrimary, "allow-primary", false, "Allow the primary of a shard to be used for the backup. WARNING: If using the builtin backup engine, this will shutdown mysqld on the primary and stop writes for the duration of the backup.")
BackupShard.Flags().Int32Var(&backupShardOptions.Concurrency, "concurrency", 4, "Specifies the number of compression/checksum jobs to run simultaneously.")
BackupShard.Flags().StringVar(&backupShardOptions.IncrementalFromPos, "incremental-from-pos", "", "Position of previous backup. Default: empty. If given, then this backup becomes an incremental backup from given position. If value is 'auto', backup taken from last successful backup position")
BackupShard.Flags().StringVar(&backupShardOptions.IncrementalFromPos, "incremental-from-pos", "", "Position, or name of backup from which to create an incremental backup. Default: empty. If given, then this backup becomes an incremental backup from given position or given backup. If value is 'auto', this backup will be taken from the last successful backup position.")
BackupShard.Flags().BoolVar(&backupOptions.UpgradeSafe, "upgrade-safe", false, "Whether to use innodb_fast_shutdown=0 for the backup so it is safe to use for MySQL upgrades.")
Root.AddCommand(BackupShard)

Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Flags:
--grpc_max_message_size int Maximum allowed RPC message size. Larger messages will be rejected by gRPC with the error 'exceeding the max size'. (default 16777216)
--grpc_prometheus Enable gRPC monitoring with Prometheus.
-h, --help help for vtbackup
--incremental_from_pos string Position of previous backup. Default: empty. If given, then this backup becomes an incremental backup from given position. If value is 'auto', backup taken from last successful backup position
--incremental_from_pos string Position, or name of backup from which to create an incremental backup. Default: empty. If given, then this backup becomes an incremental backup from given position or given backup. If value is 'auto', this backup will be taken from the last successful backup position.
--init_db_name_override string (init parameter) override the name of the db used by vttablet
--init_db_sql_file string path to .sql file to run after mysql_install_db
--init_keyspace string (init parameter) keyspace to use for this tablet
Expand Down
10 changes: 3 additions & 7 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,13 +1255,9 @@ func waitForNumBackups(t *testing.T, expectNumBackups int) []string {
}
}

func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos string, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
numBackups := len(waitForNumBackups(t, -1))
incrementalFromPosArg := "auto"
if !incrementalFromPos.IsZero() {
incrementalFromPosArg = replication.EncodePosition(incrementalFromPos)
}
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental-from-pos", incrementalFromPosArg, replica.Alias)
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental-from-pos", incrementalFromPos, replica.Alias)
if expectError != "" {
require.Errorf(t, err, "expected: %v", expectError)
require.Contains(t, output, expectError)
Expand All @@ -1278,7 +1274,7 @@ func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incre
return readManifestFile(t, backupLocation), backupName
}

func TestReplicaIncrementalBackup(t *testing.T, replicaIndex int, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
func TestReplicaIncrementalBackup(t *testing.T, replicaIndex int, incrementalFromPos string, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
replica := getReplica(t, replicaIndex)
return testReplicaIncrementalBackup(t, replica, incrementalFromPos, expectError)
}
Expand Down
123 changes: 83 additions & 40 deletions go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ const (
operationFlushAndPurge
)

type incrementalFromPosType int

const (
incrementalFromPosPosition incrementalFromPosType = iota
incrementalFromPosAuto
incrementalFromPosBackupName
)

type PITRTestCase struct {
Name string
SetupType int
Expand Down Expand Up @@ -106,6 +114,7 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
}

var fullBackupPos replication.Position
var lastBackupName string
t.Run("full backup", func(t *testing.T) {
InsertRowOnPrimary(t, "before-full-backup")
waitForReplica(t, 0)
Expand All @@ -118,6 +127,8 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
pos := replication.EncodePosition(fullBackupPos)
backupPositions = append(backupPositions, pos)
rowsPerPosition[pos] = len(msgs)

lastBackupName = manifest.BackupName
})

lastBackupPos := fullBackupPos
Expand All @@ -127,50 +138,57 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
name string
writeBeforeBackup bool
fromFullPosition bool
autoPosition bool
incrementalFrom incrementalFromPosType
expectError string
}{
{
name: "first incremental backup",
name: "first incremental backup",
incrementalFrom: incrementalFromPosPosition,
},
{
name: "fail1",
expectError: "no binary logs to backup",
name: "fail1",
incrementalFrom: incrementalFromPosPosition,
expectError: "no binary logs to backup",
},
{
name: "fail2",
expectError: "no binary logs to backup",
name: "fail2",
incrementalFrom: incrementalFromPosPosition,
expectError: "no binary logs to backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
incrementalFrom: incrementalFromPosPosition,
},
{
name: "fail, no binary logs to backup",
expectError: "no binary logs to backup",
name: "fail, no binary logs to backup",
incrementalFrom: incrementalFromPosPosition,
expectError: "no binary logs to backup",
},
{
name: "make writes again, succeed",
writeBeforeBackup: true,
incrementalFrom: incrementalFromPosBackupName,
},
{
name: "auto position, succeed",
writeBeforeBackup: true,
autoPosition: true,
incrementalFrom: incrementalFromPosAuto,
},
{
name: "fail auto position, no binary logs to backup",
autoPosition: true,
expectError: "no binary logs to backup",
name: "fail auto position, no binary logs to backup",
incrementalFrom: incrementalFromPosAuto,
expectError: "no binary logs to backup",
},
{
name: "auto position, make writes again, succeed",
writeBeforeBackup: true,
autoPosition: true,
incrementalFrom: incrementalFromPosAuto,
},
{
name: "from full backup position",
fromFullPosition: true,
incrementalFrom: incrementalFromPosPosition,
},
}
var fromFullPositionBackups []string
Expand All @@ -192,11 +210,16 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
// - auto
// - explicit last backup pos
// - back in history to the original full backup
var incrementalFromPos replication.Position
if !tc.autoPosition {
incrementalFromPos = lastBackupPos
var incrementalFromPos string
switch tc.incrementalFrom {
case incrementalFromPosAuto:
incrementalFromPos = mysqlctl.AutoIncrementalFromPos
case incrementalFromPosBackupName:
incrementalFromPos = lastBackupName
case incrementalFromPosPosition:
incrementalFromPos = replication.EncodePosition(lastBackupPos)
if tc.fromFullPosition {
incrementalFromPos = fullBackupPos
incrementalFromPos = replication.EncodePosition(fullBackupPos)
}
}
// always use same 1st replica
Expand All @@ -206,6 +229,7 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
}
defer func() {
lastBackupPos = manifest.Position
lastBackupName = manifest.BackupName
}()
if tc.fromFullPosition {
fromFullPositionBackups = append(fromFullPositionBackups, backupName)
Expand All @@ -219,8 +243,10 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet)

expectFromPosition := lastBackupPos.GTIDSet
if !incrementalFromPos.IsZero() {
expectFromPosition = incrementalFromPos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
if tc.incrementalFrom == incrementalFromPosPosition {
pos, err := replication.DecodePosition(incrementalFromPos)
assert.NoError(t, err)
expectFromPosition = pos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
}
require.Equalf(t, expectFromPosition, fromPositionIncludingPurged, "expected: %v, found: %v, gtid_purged: %v, manifest.Position: %v", expectFromPosition, fromPositionIncludingPurged, gtidPurgedPos, manifest.Position)
})
Expand Down Expand Up @@ -304,6 +330,7 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
testedBackups := []testedBackupTimestampInfo{}

var fullBackupPos replication.Position
var lastBackupName string
t.Run("full backup", func(t *testing.T) {
insertRowOnPrimary(t, "before-full-backup")
waitForReplica(t, 0)
Expand All @@ -314,6 +341,8 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
//
rows := ReadRowsFromReplica(t, 0)
testedBackups = append(testedBackups, testedBackupTimestampInfo{len(rows), time.Now()})

lastBackupName = manifest.BackupName
})

lastBackupPos := fullBackupPos
Expand All @@ -323,50 +352,57 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
name string
writeBeforeBackup bool
fromFullPosition bool
autoPosition bool
incrementalFrom incrementalFromPosType
expectError string
}{
{
name: "first incremental backup",
name: "first incremental backup",
incrementalFrom: incrementalFromPosPosition,
},
{
name: "fail1",
expectError: "no binary logs to backup",
name: "fail1",
incrementalFrom: incrementalFromPosPosition,
expectError: "no binary logs to backup",
},
{
name: "fail2",
expectError: "no binary logs to backup",
name: "fail2",
incrementalFrom: incrementalFromPosPosition,
expectError: "no binary logs to backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
incrementalFrom: incrementalFromPosPosition,
},
{
name: "fail, no binary logs to backup",
expectError: "no binary logs to backup",
name: "fail, no binary logs to backup",
incrementalFrom: incrementalFromPosPosition,
expectError: "no binary logs to backup",
},
{
name: "make writes again, succeed",
writeBeforeBackup: true,
incrementalFrom: incrementalFromPosBackupName,
},
{
name: "auto position, succeed",
writeBeforeBackup: true,
autoPosition: true,
incrementalFrom: incrementalFromPosAuto,
},
{
name: "fail auto position, no binary logs to backup",
autoPosition: true,
expectError: "no binary logs to backup",
name: "fail auto position, no binary logs to backup",
incrementalFrom: incrementalFromPosAuto,
expectError: "no binary logs to backup",
},
{
name: "auto position, make writes again, succeed",
writeBeforeBackup: true,
autoPosition: true,
incrementalFrom: incrementalFromPosAuto,
},
{
name: "from full backup position",
fromFullPosition: true,
incrementalFrom: incrementalFromPosPosition,
},
}
var fromFullPositionBackups []string
Expand All @@ -386,11 +422,16 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
// - auto
// - explicit last backup pos
// - back in history to the original full backup
var incrementalFromPos replication.Position
if !tc.autoPosition {
incrementalFromPos = lastBackupPos
var incrementalFromPos string
switch tc.incrementalFrom {
case incrementalFromPosAuto:
incrementalFromPos = mysqlctl.AutoIncrementalFromPos
case incrementalFromPosBackupName:
incrementalFromPos = lastBackupName
case incrementalFromPosPosition:
incrementalFromPos = replication.EncodePosition(lastBackupPos)
if tc.fromFullPosition {
incrementalFromPos = fullBackupPos
incrementalFromPos = replication.EncodePosition(fullBackupPos)
}
}
manifest, backupName := TestReplicaIncrementalBackup(t, 0, incrementalFromPos, tc.expectError)
Expand All @@ -405,6 +446,7 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
testedBackups = append(testedBackups, testedBackupTimestampInfo{len(rowsBeforeBackup), time.Now()})
defer func() {
lastBackupPos = manifest.Position
lastBackupName = manifest.BackupName
}()
if tc.fromFullPosition {
fromFullPositionBackups = append(fromFullPositionBackups, backupName)
Expand Down Expand Up @@ -434,8 +476,10 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet)

expectFromPosition := lastBackupPos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
if !incrementalFromPos.IsZero() {
expectFromPosition = incrementalFromPos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
if tc.incrementalFrom == incrementalFromPosPosition {
pos, err := replication.DecodePosition(incrementalFromPos)
assert.NoError(t, err)
expectFromPosition = pos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
}
require.Equalf(t, expectFromPosition, fromPositionIncludingPurged, "expected: %v, found: %v, gtid_purged: %v, manifest.Position: %v", expectFromPosition, fromPositionIncludingPurged, gtidPurgedPos, manifest.Position)
})
Expand Down Expand Up @@ -663,8 +707,7 @@ func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) {

lastBackupPos = fullBackupPos
case operationIncrementalBackup:
var incrementalFromPos replication.Position // keep zero, we will use "auto"
manifest, _ := TestReplicaIncrementalBackup(t, tc.replicaIndex, incrementalFromPos, tc.expectError)
manifest, _ := TestReplicaIncrementalBackup(t, tc.replicaIndex, "auto", tc.expectError)
if tc.expectError != "" {
return
}
Expand Down
Loading

0 comments on commit b977e51

Please sign in to comment.