Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into lookupvindex_cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 23, 2024
2 parents 5bd4b7e + 95f2e3e commit 2b0d2f3
Show file tree
Hide file tree
Showing 77 changed files with 7,990 additions and 4,796 deletions.
50 changes: 24 additions & 26 deletions go/cmd/vtctldclient/command/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,44 @@ import (
"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
querypb "vitess.io/vitess/go/vt/proto/query"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var (
DistributedTransaction = &cobra.Command{
Use: "DistributedTransaction <cmd>",
Use: "DistributedTransaction [command] [command-flags]",
Short: "Perform commands on distributed transaction",
Args: cobra.MinimumNArgs(2),
Args: cobra.ExactArgs(1),

DisableFlagsInUseLine: true,
}

unresolvedTransactionsOptions = struct {
Keyspace string
AbandonAge int64 // in seconds
}{}

// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "list <keyspace>",
Use: "unresolved-list --keyspace <keyspace> --abandon-age <abandon_time_seconds>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Aliases: []string{"List"},
Args: cobra.ExactArgs(1),
Args: cobra.NoArgs,
RunE: commandGetUnresolvedTransactions,

DisableFlagsInUseLine: true,
}

concludeTransactionOptions = struct {
Dtid string
}{}

// ConcludeTransaction makes a ConcludeTransaction gRPC call to a vtctld.
ConcludeTransaction = &cobra.Command{
Use: "conclude <dtid> [<keyspace/shard> ...]",
Use: "conclude --dtid <dtid>",
Short: "Concludes the unresolved transaction by rolling back the prepared transaction on each participating shard and removing the transaction metadata record.",
Aliases: []string{"Conclude"},
Args: cobra.MinimumNArgs(1),
Args: cobra.NoArgs,
RunE: commandConcludeTransaction,

DisableFlagsInUseLine: true,
Expand All @@ -72,10 +80,10 @@ const (
func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
Keyspace: unresolvedTransactionsOptions.Keyspace,
AbandonAge: unresolvedTransactionsOptions.AbandonAge,
})
if err != nil {
return err
Expand All @@ -89,31 +97,17 @@ func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
return nil
}

func commandConcludeTransaction(cmd *cobra.Command, args []string) error {
allArgs := cmd.Flags().Args()
shards, err := cli.ParseKeyspaceShards(allArgs[1:])
if err != nil {
return err
}
func commandConcludeTransaction(cmd *cobra.Command, args []string) (err error) {
cli.FinishedParsing(cmd)

dtid := allArgs[0]
var participants []*querypb.Target
for _, shard := range shards {
participants = append(participants, &querypb.Target{
Keyspace: shard.Keyspace,
Shard: shard.Name,
})
}
output := ConcludeTransactionOutput{
Dtid: dtid,
Dtid: concludeTransactionOptions.Dtid,
Message: concludeSuccess,
}

_, err = client.ConcludeTransaction(commandCtx,
&vtctldatapb.ConcludeTransactionRequest{
Dtid: dtid,
Participants: participants,
Dtid: concludeTransactionOptions.Dtid,
})
if err != nil {
output.Message = concludeFailure
Expand All @@ -127,7 +121,11 @@ func commandConcludeTransaction(cmd *cobra.Command, args []string) error {
}

func init() {
GetUnresolvedTransactions.Flags().StringVarP(&unresolvedTransactionsOptions.Keyspace, "keyspace", "k", "", "unresolved transactions list for the given keyspace.")
GetUnresolvedTransactions.Flags().Int64VarP(&unresolvedTransactionsOptions.AbandonAge, "abandon-age", "a", 0, "unresolved transactions list which are older than the specified age(in seconds).")
DistributedTransaction.AddCommand(GetUnresolvedTransactions)

ConcludeTransaction.Flags().StringVarP(&concludeTransactionOptions.Dtid, "dtid", "d", "", "conclude transaction for the given distributed transaction ID.")
DistributedTransaction.AddCommand(ConcludeTransaction)

Root.AddCommand(DistributedTransaction)
Expand Down
11 changes: 11 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,17 @@ func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string {
return row.AsString("gtid_purged", "")
}

func ReconnectReplicaToPrimary(t *testing.T, replicaIndex int) {
query := fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='localhost', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", primary.MySQLPort)
replica := getReplica(t, replicaIndex)
_, err := replica.VttabletProcess.QueryTablet("stop replica", keyspaceName, true)
require.NoError(t, err)
_, err = replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.NoError(t, err)
_, err = replica.VttabletProcess.QueryTablet("start replica", keyspaceName, true)
require.NoError(t, err)
}

func InsertRowOnPrimary(t *testing.T, hint string) {
if hint == "" {
hint = textutil.RandomHash()[:12]
Expand Down
23 changes: 19 additions & 4 deletions go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,24 @@ type testedBackupTimestampInfo struct {
postTimestamp time.Time
}

func waitForReplica(t *testing.T, replicaIndex int) {
// waitForReplica waits for the replica to have same row set as on primary.
func waitForReplica(t *testing.T, replicaIndex int) int {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
pMsgs := ReadRowsFromPrimary(t)
for {
rMsgs := ReadRowsFromReplica(t, replicaIndex)
if len(pMsgs) == len(rMsgs) {
// success
return
return len(pMsgs)
}
select {
case <-ctx.Done():
assert.FailNow(t, "timeout waiting for replica to catch up")
return
case <-time.After(time.Second):
return 0
case <-ticker.C:
//
}
}
Expand Down Expand Up @@ -289,6 +292,12 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
if sampleTestedBackupPos == "" {
sampleTestedBackupPos = pos
}
t.Run("post-pitr, wait for replica to catch up", func(t *testing.T) {
// Replica is DRAINED and does not have replication configuration.
// We now connect the replica to the primary and validate it's able to catch up.
ReconnectReplicaToPrimary(t, 0)
waitForReplica(t, 0)
})
})
}
}
Expand Down Expand Up @@ -539,6 +548,12 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
if sampleTestedBackupIndex < 0 {
sampleTestedBackupIndex = backupIndex
}
t.Run("post-pitr, wait for replica to catch up", func(t *testing.T) {
// Replica is DRAINED and does not have replication configuration.
// We now connect the replica to the primary and validate it's able to catch up.
ReconnectReplicaToPrimary(t, 0)
waitForReplica(t, 0)
})
} else {
numFailedRestores++
}
Expand Down
17 changes: 10 additions & 7 deletions go/test/endtoend/tabletmanager/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,22 @@ func TestTabletCommands(t *testing.T) {
})

t.Run("GetUnresolvedTransactions", func(t *testing.T) {
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "list", keyspaceName)
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "unresolved-list",
"--keyspace", keyspaceName)
require.NoError(t, err)
})
t.Run("ConcludeTransaction", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
t.Run("GetUnresolvedTransactions with age threshold", func(t *testing.T) {
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "unresolved-list",
"--keyspace", keyspaceName,
"--abandon-age", "32")
require.NoError(t, err)
})
t.Run("ConcludeTransaction with participants", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234", "ks/0")
t.Run("ConcludeTransaction", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "--dtid", "ks:0:1234")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
})

// check Ping / RefreshState / RefreshStateByShard
err = clusterInstance.VtctldClientProcess.ExecuteCommand("PingTablet", primaryTablet.Alias)
require.Nil(t, err, "error should be Nil")
Expand Down
89 changes: 88 additions & 1 deletion go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package vreplication

import (
"fmt"
"strings"
"testing"

"github.com/tidwall/gjson"

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -165,11 +168,18 @@ func TestVtctlMigrate(t *testing.T) {
// However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and
// hence the VTDATAROOT env variable gets overwritten.
// Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT
func TestVtctldMigrate(t *testing.T) {
func TestVtctldMigrateUnsharded(t *testing.T) {
vc = NewVitessCluster(t, nil)

oldDefaultReplicas := defaultReplicas
oldDefaultRdonly := defaultRdonly
defaultReplicas = 0
defaultRdonly = 0
defer func() {
defaultReplicas = oldDefaultReplicas
defaultRdonly = oldDefaultRdonly
}()

defer vc.TearDown()

defaultCell := vc.Cells[vc.CellNames[0]]
Expand Down Expand Up @@ -299,3 +309,80 @@ func TestVtctldMigrate(t *testing.T) {
require.Errorf(t, err, "there is no vitess cluster named ext1")
})
}

// TestVtctldMigrate adds a test for a sharded cluster to validate a fix for a bug where the target keyspace name
// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external"
// cluster with keyspace rating.
func TestVtctldMigrateSharded(t *testing.T) {
oldDefaultReplicas := defaultReplicas
oldDefaultRdonly := defaultRdonly
defaultReplicas = 1
defaultRdonly = 1
defer func() {
defaultReplicas = oldDefaultReplicas
defaultRdonly = oldDefaultRdonly
}()

setSidecarDBName("_vt")
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
vc = setupCluster(t)
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
defer vc.TearDown()
setupCustomerKeyspace(t)
createMoveTablesWorkflow(t, "customer,Lead,datze,customer2")
tstWorkflowSwitchReadsAndWrites(t)
tstWorkflowComplete(t)

var err error
// create external cluster
extCell := "extcell1"
extCells := []string{extCell}
extVc := NewVitessCluster(t, &clusterOptions{
cells: extCells,
clusterConfig: externalClusterConfig,
})
defer extVc.TearDown()

setupExtKeyspace(t, extVc, "rating", extCell)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "-80")
require.NoError(t, err)
err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "80-")
require.NoError(t, err)
verifyClusterHealth(t, extVc)
extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort)
defer extVtgateConn.Close()

currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate
var output string
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Mount", "register", "--name=external", "--topo-type=etcd2",
fmt.Sprintf("--topo-server=localhost:%d", vc.ClusterConfig.topoPort), "--topo-root=/vitess/global"); err != nil {
require.FailNow(t, "Mount command failed with %+v : %s\n", err, output)
}
ksWorkflow := "rating.e1"
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "rating", "--workflow", "e1",
"create", "--source-keyspace", "customer", "--mount-name", "external", "--all-tables", "--cells=zone1",
"--tablet-types=primary,replica"); err != nil {
require.FailNow(t, "Migrate command failed with %+v : %s\n", err, output)
}
waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
// this is because currently doVtctldclientVDiff is using the global vc :-( and we want to run a diff on the extVc cluster
vc = extVc
doVtctldclientVDiff(t, "rating", "e1", "zone1", nil)
}

func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) {
numReplicas := 1
shards := []string{"-80", "80-"}
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells[cellName]}, ksName, strings.Join(shards, ","),
customerVSchema, customerSchema, numReplicas, 0, 1200, nil); err != nil {
t.Fatal(err)
}
vtgate := vc.Cells[cellName].Vtgates[0]
for _, shard := range shards {
err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard)
require.NoError(t, err)
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), numReplicas, waitTimeout))
}
}
3 changes: 3 additions & 0 deletions go/test/endtoend/vtgate/queries/timeout/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestQueryTimeoutWithDual(t *testing.T) {
_, err = utils.ExecAllowError(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=15 */ sleep(0.001) from dual")
assert.NoError(t, err)
// infinite query timeout overriding all defaults
utils.SkipIfBinaryIsBelowVersion(t, 21, "vttablet")
_, err = utils.ExecAllowError(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=0 */ sleep(5) from dual")
assert.NoError(t, err)
}
Expand Down Expand Up @@ -131,6 +132,7 @@ func TestQueryTimeoutWithShardTargeting(t *testing.T) {
}

func TestQueryTimeoutWithoutVTGateDefault(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 21, "vttablet")
// disable query timeout
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--query-timeout", "0")
Expand Down Expand Up @@ -184,6 +186,7 @@ func TestQueryTimeoutWithoutVTGateDefault(t *testing.T) {
// and not just individual routes.
func TestOverallQueryTimeout(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 21, "vtgate")
utils.SkipIfBinaryIsBelowVersion(t, 21, "vttablet")
mcmp, closer := start(t)
defer closer()

Expand Down
Loading

0 comments on commit 2b0d2f3

Please sign in to comment.