Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check keyspace snapshot time for restore #105

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-go@v3
with:
go-version: 1.19.3
go-version: 1.18.7

- name: Set up python
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
Expand Down
11 changes: 7 additions & 4 deletions go/test/endtoend/recovery/recovery_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,21 @@ func VerifyQueriesUsingVtgate(t *testing.T, session *vtgateconn.VTGateSession, q
}

// RestoreTablet performs a PITR restore.
func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string) {
func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string, restoreTime time.Time) {
tablet.ValidateTabletRestart(t)
replicaTabletArgs := commonTabletArg

_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("GetKeyspace", restoreKSName)

if err != nil {
tm := time.Now().UTC()
tm.Format(time.RFC3339)
if restoreTime.IsZero() {
restoreTime = time.Now().UTC()
}
restoreTime.Format(time.RFC3339)

_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("CreateKeyspace", "--",
"--keyspace_type=SNAPSHOT", "--base_keyspace="+keyspaceName,
"--snapshot_time", tm.Format(time.RFC3339), restoreKSName)
"--snapshot_time", restoreTime.Format(time.RFC3339), restoreKSName)
require.Nil(t, err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -194,7 +195,7 @@ func TestUnShardedRecoveryAfterSharding(t *testing.T) {
require.NoError(t, err)

// now bring up the recovery keyspace and a tablet, letting it restore from backup.
recovery.RestoreTablet(t, localCluster, replica2, recoveryKS, "0", keyspaceName, commonTabletArg)
recovery.RestoreTablet(t, localCluster, replica2, recoveryKS, "0", keyspaceName, commonTabletArg, time.Time{})

// check the new replica does not have the data
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2)
Expand Down Expand Up @@ -395,8 +396,8 @@ func TestShardedRecovery(t *testing.T) {
require.NoError(t, err)

// now bring up the recovery keyspace and 2 tablets, letting it restore from backup.
recovery.RestoreTablet(t, localCluster, replica2, recoveryKS, "-80", keyspaceName, commonTabletArg)
recovery.RestoreTablet(t, localCluster, replica3, recoveryKS, "80-", keyspaceName, commonTabletArg)
recovery.RestoreTablet(t, localCluster, replica2, recoveryKS, "-80", keyspaceName, commonTabletArg, time.Time{})
recovery.RestoreTablet(t, localCluster, replica3, recoveryKS, "80-", keyspaceName, commonTabletArg, time.Time{})

// check the new replicas have the correct number of rows
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, shard0Count)
Expand Down
117 changes: 66 additions & 51 deletions go/test/endtoend/recovery/unshardedrecovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os/exec"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -112,7 +113,7 @@ SET GLOBAL old_alter_table = ON;
}

var mysqlProcs []*exec.Cmd
for i := 0; i < 4; i++ {
for i := 0; i < 5; i++ {
tabletType := "replica"
if i == 0 {
tabletType = "primary"
Expand Down Expand Up @@ -175,43 +176,47 @@ SET GLOBAL old_alter_table = ON;
}

// TestRecoveryImpl does following
// - create a shard with primary and replica1 only
// - run InitShardPrimary
// - insert some data
// - take a backup
// - insert more data on the primary
// - take another backup
// - create a recovery keyspace after first backup
// - bring up tablet_replica2 in the new keyspace
// - check that new tablet does not have data created after backup1
// - create second recovery keyspace after second backup
// - bring up tablet_replica3 in second keyspace
// - check that new tablet has data created after backup1 but not data created after backup2
// 1. create a shard with primary and replica1 only
// - run InitShardPrimary
// - insert some data
// 2. take a backup
// 3.create a recovery keyspace after first backup
// - bring up tablet_replica2 in the new keyspace
// - check that new tablet has data from backup1
// 4. insert more data on the primary
// 5. take another backup
// 6. create a recovery keyspace after second backup
// - bring up tablet_replica3 in the new keyspace
// - check that new tablet has data from backup2
// 7. insert more data on the primary
// 8. take another backup
// 9. create a recovery keyspace after second backup again
// - bring up tablet_replica4 in the new keyspace
// - check that new tablet has data from backup2 but not backup3
// - check that vtgate queries work correctly
func TestRecoveryImpl(t *testing.T) {
defer cluster.PanicHandler(t)
defer tabletsTeardown()
verifyInitialReplication(t)

// take first backup of value = test1
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.NoError(t, err)

backups := listBackups(t)
require.Equal(t, len(backups), 1)
assert.Contains(t, backups[0], replica1.Alias)

_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
assert.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2)

err = localCluster.VtctlclientProcess.ApplyVSchema(keyspaceName, vSchema)
assert.NoError(t, err)

output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", keyspaceName)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

recovery.RestoreTablet(t, localCluster, replica2, recoveryKS1, "0", keyspaceName, commonTabletArg)
// restore with latest backup
restoreTime := time.Now().UTC()
recovery.RestoreTablet(t, localCluster, replica2, recoveryKS1, "0", keyspaceName, commonTabletArg, restoreTime)

output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvVSchema", cell)
assert.NoError(t, err)
Expand All @@ -227,50 +232,67 @@ func TestRecoveryImpl(t *testing.T) {

cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 1)

//verify that restored replica has value = test1
qr, err := replica2.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "test1", qr.Rows[0][0].ToString())

cluster.VerifyLocalMetadata(t, replica2, recoveryKS1, shardName, cell)

// insert new row on primary
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
assert.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2)

// update the original row in primary
_, err = primary.VttabletProcess.QueryTablet("update vt_insert_test set msg = 'msgx1' where id = 1", keyspaceName, true)
assert.NoError(t, err)

//verify that primary has new value
qr, err := primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
qr, err = primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "msgx1", qr.Rows[0][0].ToString())

//verify that restored replica has old value
qr, err = replica2.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "test1", qr.Rows[0][0].ToString())
// check that replica1, used for the backup, has the new value
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.NoError(t, err)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true)
assert.NoError(t, err)
cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 3)
for {
qr, err = replica1.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
if qr.Rows[0][0].ToString() == "msgx1" {
break
}

recovery.RestoreTablet(t, localCluster, replica3, recoveryKS2, "0", keyspaceName, commonTabletArg)
select {
case <-ctx.Done():
t.Error("timeout waiting for new value to be replicated on replica 1")
break
case <-ticker.C:
}
}

output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS2)
// take second backup of value = msgx1
err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

cluster.VerifyRowsInTablet(t, replica3, keyspaceName, 2)
// restore to first backup
recovery.RestoreTablet(t, localCluster, replica3, recoveryKS2, "0", keyspaceName, commonTabletArg, restoreTime)

// update the original row in primary
_, err = primary.VttabletProcess.QueryTablet("update vt_insert_test set msg = 'msgx2' where id = 1", keyspaceName, true)
output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS2)
assert.NoError(t, err)
assert.Contains(t, output, "vt_insert_test")

//verify that primary has new value
qr, err = primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "msgx2", qr.Rows[0][0].ToString())
// only one row in first backup
cluster.VerifyRowsInTablet(t, replica3, keyspaceName, 1)

//verify that restored replica has old value
//verify that restored replica has value = test1
qr, err = replica3.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true)
assert.NoError(t, err)
assert.Equal(t, "msgx1", qr.Rows[0][0].ToString())
assert.Equal(t, "test1", qr.Rows[0][0].ToString())

vtgateInstance := localCluster.NewVtgateInstance()
vtgateInstance.TabletTypesToWait = "REPLICA"
Expand All @@ -295,26 +317,19 @@ func TestRecoveryImpl(t *testing.T) {
session := vtgateConn.Session("@replica", nil)

//check that vtgate doesn't route queries to new tablet
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(3)")
recovery.VerifyQueriesUsingVtgate(t, session, "select msg from vt_insert_test where id = 1", `VARCHAR("msgx2")`)
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, "select msg from vt_insert_test where id = 1", `VARCHAR("msgx1")`)
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS1), "INT64(1)")
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS1), `VARCHAR("test1")`)
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS2), "INT64(2)")
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS2), `VARCHAR("msgx1")`)
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS2), "INT64(1)")
recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS2), `VARCHAR("test1")`)

// check that new keyspace is accessible with 'use ks'
cluster.ExecuteQueriesUsingVtgate(t, session, "use "+recoveryKS1+"@replica")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")

cluster.ExecuteQueriesUsingVtgate(t, session, "use "+recoveryKS2+"@replica")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)")

// check that new tablet is accessible with use `ks:shard`
cluster.ExecuteQueriesUsingVtgate(t, session, "use `"+recoveryKS1+":0@replica`")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)")

cluster.ExecuteQueriesUsingVtgate(t, session, "use `"+recoveryKS2+":0@replica`")
recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)")
}

// verifyInitialReplication will create schema in primary, insert some data to primary and verify the same data in replica.
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
return err
}

// If backupTime not specified in restore_from_backup_ts, check to see if the keyspace has a SNAPSHOT time
// to restore from (for PITR)
if backupTime.IsZero() {
backupTime = logutil.ProtoToTime(keyspaceInfo.SnapshotTime)
}

// For a SNAPSHOT keyspace, we have to look for backups of BaseKeyspace
// so we will pass the BaseKeyspace in RestoreParams instead of tablet.Keyspace
if keyspaceInfo.KeyspaceType == topodatapb.KeyspaceType_SNAPSHOT {
Expand Down