Skip to content

Commit

Permalink
Refactor and add more e2e testing
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Dec 3, 2023
1 parent 87ec1db commit 673e334
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 88 deletions.
1 change: 1 addition & 0 deletions examples/common/scripts/vttablet-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ vttablet \
--heartbeat_enable \
--heartbeat_interval=250ms \
--heartbeat_on_demand_duration=5s \
--vreplication_experimental_flags=7 \
> $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &

# Block waiting for the tablet to be listening
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
Expand Down Expand Up @@ -184,6 +185,9 @@ func TestMain(m *testing.M) {
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
Expand Down Expand Up @@ -436,6 +437,9 @@ func TestMain(m *testing.M) {
"--migration_check_interval", "5s",
"--vstream_packet_size", "4096", // Keep this value small and below 10k to ensure multilple vstream iterations
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
10 changes: 8 additions & 2 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vttablet"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand All @@ -38,8 +39,13 @@ import (
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
// i.e. with foreign_key_checks=0.
func TestFKWorkflow(t *testing.T) {
// ensure that there are multiple copy phase cycles per table
extraVTTabletArgs = []string{"--vstream_packet_size=256"}
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
defer func() { extraVTTabletArgs = nil }()

cellName := "zone"
Expand Down
13 changes: 7 additions & 6 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func TestVreplicationCopyThrottling(t *testing.T) {
func TestBasicVreplicationWorkflow(t *testing.T) {
ogflags := extraVTTabletArgs
defer func() { extraVTTabletArgs = ogflags }()
// Test VPlayer batching mode.
extraVTTabletArgs = append(extraVTTabletArgs, fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching))
sourceKsOpts["DBTypeVersion"] = "mysql-8.0"
Expand Down Expand Up @@ -627,10 +628,11 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
mainClusterConfig.vreplicationCompressGTID = true
oldVTTabletExtraArgs := extraVTTabletArgs
// Enable the bulk delete vplayer optimization in this test, which is disabled by default, to confirm that we
// don't have a regression due to the bulk delete functionality of this functionality.
extraVTTabletArgs = append(extraVTTabletArgs, fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching))
extraVTTabletArgs = append(extraVTTabletArgs,
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
)
defer func() {
mainClusterConfig.vreplicationCompressGTID = false
extraVTTabletArgs = oldVTTabletExtraArgs
Expand Down Expand Up @@ -787,7 +789,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
}
require.Equal(t, true, dec80Replicated)

// insert multiple rows in the loadtest table and immediately delete them to confirm that bulk delete
// Insert multiple rows in the loadtest table and immediately delete them to confirm that bulk delete
// works the same way with the vplayer optimization enabled and disabled. Currently this optimization
// is disabled by default, but enabled in TestCellAliasVreplicationWorkflow.
execVtgateQuery(t, vtgateConn, sourceKs, "insert into loadtest(id, name) values(10001, 'tempCustomer'), (10002, 'tempCustomer2'), (10003, 'tempCustomer3'), (10004, 'tempCustomer4')")
Expand Down Expand Up @@ -941,7 +943,6 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
assertQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2)

execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'")

waitForRowCountInTablet(t, customerTab1, "customer", "customer", 1)
waitForRowCountInTablet(t, customerTab2, "customer", "customer", 2)
waitForRowCount(t, vtgateConn, "customer", "customer.customer", 3)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ var (

// BlplQuery is the key for the stats map.
BlplQuery = "Query"
// BlplMultiQuery is the key for the stats map.
BlplMultiQuery = "MultiQuery"
// BlplTransaction is the key for the stats map.
BlplTransaction = "Transaction"
// BlplBatchTransaction is the key for the stats map.
BlplBatchTransaction = "BatchTransaction"
)

// Stats is the internal stats of a player. It is a different
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// ReplicatorPlan is the execution plan for the replicator. It contains
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ func (vbc *vcopierCopyWorker) insertRows(ctx context.Context, rows []*querypb.Ro
&vbc.sqlbuffer,
rows,
func(sql string) (*sqltypes.Result, error) {
return vbc.vdbClient.ExecuteWithRetry(ctx, sql, -1)
return vbc.vdbClient.ExecuteWithRetry(ctx, sql)
},
)
}
Expand Down
56 changes: 29 additions & 27 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ type vdbClient struct {
InTransaction bool
startTime time.Time
queries []string
queriesPos int64
batchSize int64
maxBatchSize int64
}

func newVDBClient(dbclient binlogplayer.DBClient, stats *binlogplayer.Stats) *vdbClient {
Expand All @@ -55,6 +57,10 @@ func (vc *vdbClient) Begin() error {
if err := vc.DBClient.Begin(); err != nil {
return err
}
// If we're batching, we only batch the contents of the
// transaction, which starts with the begin and ends with
// the commit.
vc.queriesPos = int64(len(vc.queries))
vc.queries = append(vc.queries, "begin")
vc.InTransaction = true
vc.startTime = time.Now()
Expand All @@ -72,27 +78,22 @@ func (vc *vdbClient) Commit() error {
return nil
}

func (vc *vdbClient) CommitQueriesInBatch(ctx context.Context) error {
log.Errorf("DEBUG: CommitQueriesInBatch: %s", strings.Join(vc.queries, ";"))
// CommitQueryBatch sends the current transaction's query batch -- which
// is often the full contents of the transaction, unless we've crossed
// the maxBatchSize one ore more times -- down the wire to the database,
// including the final commit.
func (vc *vdbClient) CommitQueryBatch(ctx context.Context) error {
log.Errorf("DEBUG: CommitQueryBatch: %s", strings.Join(vc.queries[vc.queriesPos:], ";"))
vc.queries = append(vc.queries, "commit")
queries := strings.Join(vc.queries, ";")
queries := strings.Join(vc.queries[vc.queriesPos:], ";")
for _, err := vc.DBClient.ExecuteFetchMulti(queries, -1); err != nil; {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout {
log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay)
time.Sleep(dbLockRetryDelay)
select {
case <-ctx.Done():
return io.EOF
default:
}
continue
}
return err
}
vc.InTransaction = false
vc.queries = nil
vc.queriesPos = 0
vc.batchSize = 0
vc.stats.Timings.Record(binlogplayer.BlplTransaction, vc.startTime)
vc.stats.Timings.Record(binlogplayer.BlplBatchTransaction, vc.startTime)
return nil
}

Expand All @@ -119,18 +120,21 @@ func (vc *vdbClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result,
return vc.DBClient.ExecuteFetch(query, maxrows)
}

func (vc *vdbClient) AddBatchQuery(query string, maxBatchSize int64) error {
// AddBatchQuery adds the query to the current transaction's query
// batch. If this new query would cause the current batch to exceed
// the maxBatchSize, then the current unsent batch is sent down the
// wire and this query will be included in the next batch.
func (vc *vdbClient) AddBatchQuery(query string) error {
if !vc.InTransaction {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "cannot batch query outside of a transaction: %s", query)
}

addedSize := int64(len(query)) + 1 // plus 1 for the semicolon
if vc.batchSize+addedSize > maxBatchSize {
log.Errorf("DEBUG: AddBatchQuery: %s ; but over maxBatchSize of %d", query, maxBatchSize)
if vc.batchSize+addedSize > vc.maxBatchSize {
log.Errorf("DEBUG: AddBatchQuery: %s ; but over maxBatchSize of %d", query, vc.maxBatchSize)
if _, err := vc.ExecuteQueryBatch(); err != nil {
return err
}
return vc.Begin()
}
log.Errorf("DEBUG: AddBatchQuery: %s", query)
vc.queries = append(vc.queries, query)
Expand All @@ -139,15 +143,17 @@ func (vc *vdbClient) AddBatchQuery(query string, maxBatchSize int64) error {
return nil
}

// ExecuteQueryBatch sends the transaction's current batch of queries
// down the wire to the database.
func (vc *vdbClient) ExecuteQueryBatch() ([]*sqltypes.Result, error) {
defer vc.stats.Timings.Record(binlogplayer.BlplQuery, time.Now())
defer vc.stats.Timings.Record(binlogplayer.BlplMultiQuery, time.Now())

log.Errorf("DEBUG: ExecuteQueryBatch: %s", strings.Join(vc.queries, ";"))
qrs, err := vc.DBClient.ExecuteFetchMulti(strings.Join(vc.queries, ";"), -1)
log.Errorf("DEBUG: ExecuteQueryBatch: %s", strings.Join(vc.queries[vc.queriesPos:], ";"))
qrs, err := vc.DBClient.ExecuteFetchMulti(strings.Join(vc.queries[vc.queriesPos:], ";"), -1)
if err != nil {
return nil, err
}
vc.queries = nil
vc.queriesPos += int64(len(vc.queries[vc.queriesPos:]))
vc.batchSize = 0

return qrs, nil
Expand All @@ -159,11 +165,7 @@ func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) {
return vc.ExecuteFetch(query, relayLogMaxItems)
}

func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string, maxBatchSize int64) (*sqltypes.Result, error) {
if maxBatchSize > 0 {
return nil, vc.AddBatchQuery(query, maxBatchSize)
}

func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) {
qr, err := vc.Execute(query)
for err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout {
Expand Down
Loading

0 comments on commit 673e334

Please sign in to comment.