diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 69f8b8a63be..09692930c5c 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -222,7 +222,7 @@ func (ls *fkLoadSimulator) simulateLoad() { func (ls *fkLoadSimulator) getNumRowsParent(vtgateConn *mysql.Conn) int { t := ls.t - qr := execQueryWithDatabase(t, vtgateConn, "fksource", "SELECT COUNT(*) FROM parent") + qr := execVtgateQuery(t, vtgateConn, "fksource", "SELECT COUNT(*) FROM parent") require.NotNil(t, qr) numRows, err := strconv.Atoi(qr.Rows[0][0].ToString()) require.NoError(t, err) @@ -296,7 +296,7 @@ func (ls *fkLoadSimulator) exec(query string) *sqltypes.Result { t := ls.t vtgateConn, closeConn := getVTGateConn() defer closeConn() - qr := execQueryWithDatabase(t, vtgateConn, "fksource", query) + qr := execVtgateQuery(t, vtgateConn, "fksource", query) require.NotNil(t, qr) return qr } diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 8126b41fa76..eca4c312ae7 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -72,7 +72,7 @@ func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines if strings.HasPrefix(query, "--") { continue } - execQueryWithDatabase(t, conn, database, string(query)) + execVtgateQuery(t, conn, database, string(query)) } } @@ -134,7 +134,7 @@ func getConnection(t *testing.T, hostname string, port int) *mysql.Conn { return conn } -func execQueryWithDatabase(t *testing.T, conn *mysql.Conn, database string, query string) *sqltypes.Result { +func execVtgateQuery(t *testing.T, conn *mysql.Conn, database string, query string) *sqltypes.Result { if strings.TrimSpace(query) == "" { return nil } @@ -158,7 +158,7 @@ func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query s timer := time.NewTimer(defaultTimeout) defer timer.Stop() for { - qr := execQueryWithDatabase(t, conn, database, query) + qr := execVtgateQuery(t, conn, database, query) require.NotNil(t, qr) if want == fmt.Sprintf("%v", qr.Rows) { return @@ -232,7 +232,7 @@ func waitForNoWorkflowLag(t *testing.T, vc *VitessCluster, keyspace, worfklow st // verifyNoInternalTables can e.g. be used to confirm that no internal tables were // copied from a source to a target during a MoveTables or Reshard operation. func verifyNoInternalTables(t *testing.T, conn *mysql.Conn, keyspaceShard string) { - qr := execQueryWithDatabase(t, conn, keyspaceShard, "show tables") + qr := execVtgateQuery(t, conn, keyspaceShard, "show tables") require.NotNil(t, qr) require.NotNil(t, qr.Rows) for _, row := range qr.Rows { @@ -247,7 +247,7 @@ func waitForRowCount(t *testing.T, conn *mysql.Conn, database string, table stri timer := time.NewTimer(defaultTimeout) defer timer.Stop() for { - qr := execQueryWithDatabase(t, conn, database, query) + qr := execVtgateQuery(t, conn, database, query) require.NotNil(t, qr) if wantRes == fmt.Sprintf("%v", qr.Rows) { return @@ -319,7 +319,7 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro count0, body0 := getQueryCount(t, queryStatsURL, matchQuery) - qr := execQueryWithDatabase(t, conn, ksName, query) + qr := execVtgateQuery(t, conn, ksName, query) require.NotNil(t, qr) count1, body1 := getQueryCount(t, queryStatsURL, matchQuery) @@ -595,15 +595,10 @@ func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, wo // confirmAllStreamsRunning confirms that all of the workflow's streams are // in the running state. -func confirmAllStreamsRunning(t *testing.T, keyspace, shard string) { +func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database string) { query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where state != '%s'", sidecarDBIdentifier, binlogdatapb.VReplicationWorkflowState_Running.String()).Query - tablet := vc.getPrimaryTablet(t, keyspace, shard) - // Query the tablet's mysqld directly as the target may have denied table entries. - dbc, err := tablet.TabletConn(keyspace, true) - require.NoError(t, err) - defer dbc.Close() - waitForQueryResult(t, dbc, sidecarDBName, query, `[[INT64(0)]]`) + waitForQueryResult(t, vtgateConn, database, query, `[[INT64(0)]]`) } func printShardPositions(vc *VitessCluster, ksShards []string) { @@ -1005,7 +1000,7 @@ func vexplain(t *testing.T, database, query string) *VExplainPlan { vtgateConn := vc.GetVTGateConn(t) defer vtgateConn.Close() - qr := execQueryWithDatabase(t, vtgateConn, database, fmt.Sprintf("vexplain %s", query)) + qr := execVtgateQuery(t, vtgateConn, database, fmt.Sprintf("vexplain %s", query)) require.NotNil(t, qr) require.Equal(t, 1, len(qr.Rows)) json := qr.Rows[0][0].ToString() diff --git a/go/test/endtoend/vreplication/initial_data_test.go b/go/test/endtoend/vreplication/initial_data_test.go index 22cd9c50c33..ea34ef7fddf 100644 --- a/go/test/endtoend/vreplication/initial_data_test.go +++ b/go/test/endtoend/vreplication/initial_data_test.go @@ -32,9 +32,9 @@ func insertInitialData(t *testing.T) { log.Infof("Inserting initial data") lines, _ := os.ReadFile("unsharded_init_data.sql") execMultipleQueries(t, vtgateConn, "product:0", string(lines)) - execQueryWithDatabase(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);") - execQueryWithDatabase(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);") - execQueryWithDatabase(t, vtgateConn, "product:0", "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);") + execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);") + execVtgateQuery(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);") + execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);") log.Infof("Done inserting initial data") waitForRowCount(t, vtgateConn, "product:0", "product", 2) @@ -52,12 +52,12 @@ func insertJSONValues(t *testing.T) { // insert null value combinations vtgateConn, closeConn := getVTGateConn() defer closeConn() - execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(1, \"{}\")") - execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")") - execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")") - execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')") - execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))") - execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(6, '{}')") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(1, \"{}\")") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))") + execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(6, '{}')") id := 8 // 6 inserted above and one after copy phase is done @@ -68,7 +68,7 @@ func insertJSONValues(t *testing.T) { j1 := rand.IntN(numJsonValues) j2 := rand.IntN(numJsonValues) query := fmt.Sprintf(q, id, jsonValues[j1], jsonValues[j2]) - execQueryWithDatabase(t, vtgateConn, "product:0", query) + execVtgateQuery(t, vtgateConn, "product:0", query) } } @@ -97,28 +97,28 @@ func insertMoreCustomers(t *testing.T, numCustomers int) { } cid++ } - execQueryWithDatabase(t, vtgateConn, "customer", sql) + execVtgateQuery(t, vtgateConn, "customer", sql) } func insertMoreProducts(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() sql := "insert into product(pid, description) values(3, 'cpu'),(4, 'camera'),(5, 'mouse');" - execQueryWithDatabase(t, vtgateConn, "product", sql) + execVtgateQuery(t, vtgateConn, "product", sql) } func insertMoreProductsForSourceThrottler(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() sql := "insert into product(pid, description) values(103, 'new-cpu'),(104, 'new-camera'),(105, 'new-mouse');" - execQueryWithDatabase(t, vtgateConn, "product", sql) + execVtgateQuery(t, vtgateConn, "product", sql) } func insertMoreProductsForTargetThrottler(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() sql := "insert into product(pid, description) values(203, 'new-cpu'),(204, 'new-camera'),(205, 'new-mouse');" - execQueryWithDatabase(t, vtgateConn, "product", sql) + execVtgateQuery(t, vtgateConn, "product", sql) } var blobTableQueries = []string{ @@ -137,6 +137,6 @@ func insertIntoBlobTable(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() for _, query := range blobTableQueries { - execQueryWithDatabase(t, vtgateConn, "product:0", query) + execVtgateQuery(t, vtgateConn, "product:0", query) } } diff --git a/go/test/endtoend/vreplication/materialize_test.go b/go/test/endtoend/vreplication/materialize_test.go index 3297d83acd7..486692a58ba 100644 --- a/go/test/endtoend/vreplication/materialize_test.go +++ b/go/test/endtoend/vreplication/materialize_test.go @@ -206,7 +206,7 @@ func testMaterialize(t *testing.T, useVtctldClient bool) { waitForQueryResult(t, vtgateConn, targetKs, "select id, val, ts, day, month, x from mat2", want) // insert data to test the replication phase - execQueryWithDatabase(t, vtgateConn, sourceKs, "insert into mat(id, val, ts) values (3, 'ghi', '2021-12-11 16:17:36')") + execVtgateQuery(t, vtgateConn, sourceKs, "insert into mat(id, val, ts) values (3, 'ghi', '2021-12-11 16:17:36')") // validate data after the replication phase waitForQueryResult(t, vtgateConn, targetKs, "select count(*) from mat2", "[[INT64(3)]]") diff --git a/go/test/endtoend/vreplication/migrate_test.go b/go/test/endtoend/vreplication/migrate_test.go index f2d6b62024c..1f365c47600 100644 --- a/go/test/endtoend/vreplication/migrate_test.go +++ b/go/test/endtoend/vreplication/migrate_test.go @@ -31,11 +31,11 @@ import ( func insertInitialDataIntoExternalCluster(t *testing.T, conn *mysql.Conn) { t.Run("insertInitialData", func(t *testing.T) { fmt.Printf("Inserting initial data\n") - execQueryWithDatabase(t, conn, "rating:0", "insert into review(rid, pid, review) values(1, 1, 'review1');") - execQueryWithDatabase(t, conn, "rating:0", "insert into review(rid, pid, review) values(2, 1, 'review2');") - execQueryWithDatabase(t, conn, "rating:0", "insert into review(rid, pid, review) values(3, 2, 'review3');") - execQueryWithDatabase(t, conn, "rating:0", "insert into rating(gid, pid, rating) values(1, 1, 4);") - execQueryWithDatabase(t, conn, "rating:0", "insert into rating(gid, pid, rating) values(2, 2, 5);") + execVtgateQuery(t, conn, "rating:0", "insert into review(rid, pid, review) values(1, 1, 'review1');") + execVtgateQuery(t, conn, "rating:0", "insert into review(rid, pid, review) values(2, 1, 'review2');") + execVtgateQuery(t, conn, "rating:0", "insert into review(rid, pid, review) values(3, 2, 'review3');") + execVtgateQuery(t, conn, "rating:0", "insert into rating(gid, pid, rating) values(1, 1, 4);") + execVtgateQuery(t, conn, "rating:0", "insert into rating(gid, pid, rating) values(2, 2, 5);") }) } @@ -109,8 +109,8 @@ func TestVtctlMigrate(t *testing.T) { expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1) waitForRowCount(t, vtgateConn, "product:0", "rating", 2) waitForRowCount(t, vtgateConn, "product:0", "review", 3) - execQueryWithDatabase(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');") - execQueryWithDatabase(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);") + execVtgateQuery(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');") + execVtgateQuery(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);") waitForRowCount(t, vtgateConn, "product:0", "rating", 3) waitForRowCount(t, vtgateConn, "product:0", "review", 4) vdiffSideBySide(t, ksWorkflow, "extcell1") @@ -122,7 +122,7 @@ func TestVtctlMigrate(t *testing.T) { expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 0) }) t.Run("cancel migrate workflow", func(t *testing.T) { - execQueryWithDatabase(t, vtgateConn, "product", "drop table review,rating") + execVtgateQuery(t, vtgateConn, "product", "drop table review,rating") if output, err = vc.VtctlClient.ExecuteCommandWithOutput("Migrate", "--", "--all", "--auto_start=false", "--cells=extcell1", "--source=ext1.rating", "create", ksWorkflow); err != nil { @@ -234,8 +234,8 @@ func TestVtctldMigrate(t *testing.T) { expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1) waitForRowCount(t, vtgateConn, "product:0", "rating", 2) waitForRowCount(t, vtgateConn, "product:0", "review", 3) - execQueryWithDatabase(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');") - execQueryWithDatabase(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);") + execVtgateQuery(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');") + execVtgateQuery(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);") waitForRowCount(t, vtgateConn, "product:0", "rating", 3) waitForRowCount(t, vtgateConn, "product:0", "review", 4) vdiffSideBySide(t, ksWorkflow, "extcell1") @@ -261,7 +261,7 @@ func TestVtctldMigrate(t *testing.T) { expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 0) }) t.Run("cancel migrate workflow", func(t *testing.T) { - execQueryWithDatabase(t, vtgateConn, "product", "drop table review,rating") + execVtgateQuery(t, vtgateConn, "product", "drop table review,rating") output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate", "--target-keyspace", "product", "--workflow", "e1", "Create", "--source-keyspace", "rating", "--mount-name", "ext1", "--all-tables", "--auto-start=false", "--cells=extcell1") diff --git a/go/test/endtoend/vreplication/partial_movetables_seq_test.go b/go/test/endtoend/vreplication/partial_movetables_seq_test.go index d57e18f4bf1..eec304e0a4d 100644 --- a/go/test/endtoend/vreplication/partial_movetables_seq_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_seq_test.go @@ -532,7 +532,7 @@ var lastCustomerId int64 func getCustomerCount(t *testing.T, msg string) int64 { vtgateConn, closeConn := getVTGateConn() defer closeConn() - qr := execQueryWithDatabase(t, vtgateConn, "", "select count(*) from customer") + qr := execVtgateQuery(t, vtgateConn, "", "select count(*) from customer") require.NotNil(t, qr) count, err := qr.Rows[0][0].ToInt64() require.NoError(t, err) @@ -542,7 +542,7 @@ func getCustomerCount(t *testing.T, msg string) int64 { func confirmLastCustomerIdHasIncreased(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() - qr := execQueryWithDatabase(t, vtgateConn, "", "select cid from customer order by cid desc limit 1") + qr := execVtgateQuery(t, vtgateConn, "", "select cid from customer order by cid desc limit 1") require.NotNil(t, qr) currentCustomerId, err := qr.Rows[0][0].ToInt64() require.NoError(t, err) @@ -554,7 +554,7 @@ func insertCustomers(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() for i := int64(1); i < newCustomerCount+1; i++ { - execQueryWithDatabase(t, vtgateConn, "customer@primary", fmt.Sprintf("insert into customer(name) values ('name-%d')", currentCustomerCount+i)) + execVtgateQuery(t, vtgateConn, "customer@primary", fmt.Sprintf("insert into customer(name) values ('name-%d')", currentCustomerCount+i)) } customerCount = getCustomerCount(t, "") require.Equal(t, currentCustomerCount+newCustomerCount, customerCount) diff --git a/go/test/endtoend/vreplication/reference_test.go b/go/test/endtoend/vreplication/reference_test.go index 969e0bd2c88..8ff77de8708 100644 --- a/go/test/endtoend/vreplication/reference_test.go +++ b/go/test/endtoend/vreplication/reference_test.go @@ -146,7 +146,7 @@ func TestReferenceTableMaterializationAndRouting(t *testing.T) { execRefQuery(t, "update sks.mfg2 set name = concat(name, '-updated') where id = 4") waitForRowCount(t, vtgateConn, uks, "mfg", 8) - qr := execQueryWithDatabase(t, vtgateConn, "uks", "select count(*) from uks.mfg where name like '%updated%'") + qr := execVtgateQuery(t, vtgateConn, "uks", "select count(*) from uks.mfg where name like '%updated%'") require.NotNil(t, qr) require.Equal(t, "4", qr.Rows[0][0].ToString()) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 862ab31cb4d..82c859acb40 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -359,7 +359,7 @@ func validateWritesRouteToSource(t *testing.T) { insertQuery := "insert into customer(name, cid) values('tempCustomer2', 200)" matchInsertQuery := "insert into customer(`name`, cid) values" assertQueryExecutesOnTablet(t, vtgateConn, sourceTab, "customer", insertQuery, matchInsertQuery) - execQueryWithDatabase(t, vtgateConn, "customer", "delete from customer where cid = 200") + execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid = 200") } func validateWritesRouteToTarget(t *testing.T) { @@ -370,7 +370,7 @@ func validateWritesRouteToTarget(t *testing.T) { assertQueryExecutesOnTablet(t, vtgateConn, targetTab2, "customer", insertQuery, matchInsertQuery) insertQuery = "insert into customer(name, cid) values('tempCustomer3', 102)" assertQueryExecutesOnTablet(t, vtgateConn, targetTab1, "customer", insertQuery, matchInsertQuery) - execQueryWithDatabase(t, vtgateConn, "customer", "delete from customer where cid in (101, 102)") + execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid in (101, 102)") } func revert(t *testing.T, workflowType string) { @@ -470,7 +470,7 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { // ensure sequence is available to vtgate num := 5 for i := 0; i < num; i++ { - execQueryWithDatabase(t, vtgateConn, "customer", "insert into customer2(name) values('a')") + execVtgateQuery(t, vtgateConn, "customer", "insert into customer2(name) values('a')") } waitForRowCount(t, vtgateConn, "customer", "customer2", 3+num) want := fmt.Sprintf("[[INT32(%d)]]", 100+num-1) @@ -502,10 +502,10 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { // ensure sequence is available to vtgate for i := 0; i < num; i++ { - execQueryWithDatabase(t, vtgateConn, "product", "insert into customer2(name) values('a')") + execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')") } waitForRowCount(t, vtgateConn, "product", "customer2", 3+num+num) - res := execQueryWithDatabase(t, vtgateConn, "product", "select max(cid) from customer2") + res := execVtgateQuery(t, vtgateConn, "product", "select max(cid) from customer2") cid, err := res.Rows[0][0].ToInt() require.NoError(t, err) require.GreaterOrEqual(t, cid, 100+num+num-1) @@ -527,10 +527,10 @@ func testReplicatingWithPKEnumCols(t *testing.T) { // typ is an enum, with soho having a stored and binlogged value of 2 deleteQuery := "delete from customer where cid = 2 and typ = 'soho'" insertQuery := "insert into customer(cid, name, typ, sport, meta) values(2, 'PaĆ¼l','soho','cricket',convert(x'7b7d' using utf8mb4))" - execQueryWithDatabase(t, vtgateConn, sourceKs, deleteQuery) + execVtgateQuery(t, vtgateConn, sourceKs, deleteQuery) waitForNoWorkflowLag(t, vc, targetKs, workflowName) vdiffSideBySide(t, ksWorkflow, "") - execQueryWithDatabase(t, vtgateConn, sourceKs, insertQuery) + execVtgateQuery(t, vtgateConn, sourceKs, insertQuery) waitForNoWorkflowLag(t, vc, targetKs, workflowName) vdiffSideBySide(t, ksWorkflow, "") } @@ -559,7 +559,7 @@ func testReshardV2Workflow(t *testing.T) { return default: // Use a random customer type for each record. - _ = execQueryWithDatabase(t, dataGenConn, "customer", fmt.Sprintf("insert into customer (cid, name, typ) values (%d, 'tempCustomer%d', %s)", + _ = execVtgateQuery(t, dataGenConn, "customer", fmt.Sprintf("insert into customer (cid, name, typ) values (%d, 'tempCustomer%d', %s)", id, id, customerTypes[rand.IntN(len(customerTypes))])) } time.Sleep(1 * time.Millisecond) @@ -590,17 +590,17 @@ func testReshardV2Workflow(t *testing.T) { // Confirm that we lost no customer related writes during the Reshard. dataGenCancel() dataGenWg.Wait() - cres := execQueryWithDatabase(t, dataGenConn, "customer", "select count(*) from customer") + cres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer") require.Len(t, cres.Rows, 1) waitForNoWorkflowLag(t, vc, "customer", "customer_name") - cnres := execQueryWithDatabase(t, dataGenConn, "customer", "select count(*) from customer_name") + cnres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_name") require.Len(t, cnres.Rows, 1) require.EqualValues(t, cres.Rows, cnres.Rows) if debugMode { // We expect the row count to differ in enterprise_customer because it is // using a `where typ='enterprise'` filter. So the count is only for debug // info. - ecres := execQueryWithDatabase(t, dataGenConn, "customer", "select count(*) from enterprise_customer") + ecres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from enterprise_customer") t.Logf("Done inserting customer data. Record counts in customer: %s, customer_name: %s, enterprise_customer: %s", cres.Rows[0][0].ToString(), cnres.Rows[0][0].ToString(), ecres.Rows[0][0].ToString()) } diff --git a/go/test/endtoend/vreplication/time_zone_test.go b/go/test/endtoend/vreplication/time_zone_test.go index cb271e04c8a..2c0a9a4f5a5 100644 --- a/go/test/endtoend/vreplication/time_zone_test.go +++ b/go/test/endtoend/vreplication/time_zone_test.go @@ -190,7 +190,7 @@ func TestMoveTablesTZ(t *testing.T) { } // inserts to test date conversions in reverse replication - execQueryWithDatabase(t, vtgateConn, "customer", "insert into datze(id, dt2) values (13, '2022-01-01 18:20:30')") - execQueryWithDatabase(t, vtgateConn, "customer", "insert into datze(id, dt2) values (14, '2022-04-01 12:06:07')") + execVtgateQuery(t, vtgateConn, "customer", "insert into datze(id, dt2) values (13, '2022-01-01 18:20:30')") + execVtgateQuery(t, vtgateConn, "customer", "insert into datze(id, dt2) values (14, '2022-04-01 12:06:07')") vdiffSideBySide(t, ksReverseWorkflow, "") } diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 7fb37430768..08f5bb8926d 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -164,13 +164,13 @@ func TestVDiff2(t *testing.T) { // Insert null and empty enum values for testing vdiff comparisons for those values. // If we add this to the initial data list, the counts in several other tests will need to change query := `insert into customer(cid, name, typ, sport) values(1001, null, 'soho','')` - execQueryWithDatabase(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query) + execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query) generateMoreCustomers(t, sourceKs, 1000) // Create rows in the nopk table using the customer names and random ages between 20 and 100. query = "insert into nopk(name, age) select name, floor(rand()*80)+20 from customer" - execQueryWithDatabase(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query) + execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:%s", sourceKs, sourceShards[0]), query) // The primary tablet is only added in the first cell. // We ONLY add primary tablets in this test. @@ -502,7 +502,7 @@ func testResume(t *testing.T, tc *testCase, cells string) { expectedNewRows := int64(0) if tc.resumeInsert != "" { - res := execQueryWithDatabase(t, vtgateConn, tc.sourceKs, tc.resumeInsert) + res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.resumeInsert) expectedNewRows = int64(res.RowsAffected) } expectedRows := rowsCompared + expectedNewRows @@ -549,7 +549,7 @@ func testAutoRetryError(t *testing.T, tc *testCase, cells string) { // compared is cumulative. expectedNewRows := int64(0) if tc.retryInsert != "" { - res := execQueryWithDatabase(t, vtgateConn, tc.sourceKs, tc.retryInsert) + res := execVtgateQuery(t, vtgateConn, tc.sourceKs, tc.retryInsert) expectedNewRows = int64(res.RowsAffected) } expectedRows := rowsCompared + expectedNewRows diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index a2a967df3f1..53e19e56731 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -354,7 +354,7 @@ func generateMoreCustomers(t *testing.T, keyspace string, numCustomers int64) { vtgateConn, closeConn := getVTGateConn() defer closeConn() log.Infof("Generating more test data with an additional %d customers", numCustomers) - res := execQueryWithDatabase(t, vtgateConn, keyspace, "select max(cid) from customer") + res := execVtgateQuery(t, vtgateConn, keyspace, "select max(cid) from customer") startingID, _ := res.Rows[0][0].ToInt64() insert := strings.Builder{} insert.WriteString("insert into customer(cid, name, typ) values ") @@ -366,5 +366,5 @@ func generateMoreCustomers(t *testing.T, keyspace string, numCustomers int64) { insert.WriteString(", ") } } - execQueryWithDatabase(t, vtgateConn, keyspace, insert.String()) + execVtgateQuery(t, vtgateConn, keyspace, insert.String()) } diff --git a/go/test/endtoend/vreplication/vdiff_online_ddl_test.go b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go index 1650ea50e0f..92977111294 100644 --- a/go/test/endtoend/vreplication/vdiff_online_ddl_test.go +++ b/go/test/endtoend/vreplication/vdiff_online_ddl_test.go @@ -132,7 +132,7 @@ func waitForAdditionalRows(t *testing.T, keyspace, table string, count int) { } func getNumRows(t *testing.T, vtgateConn *mysql.Conn, keyspace, table string) int { - qr := execQueryWithDatabase(t, vtgateConn, keyspace, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)) + qr := execVtgateQuery(t, vtgateConn, keyspace, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)) require.NotNil(t, qr) numRows, err := strconv.Atoi(qr.Rows[0][0].ToString()) require.NoError(t, err) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 726f91cfbd0..1f298992d90 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -340,7 +340,7 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string // the Lead and Lead-1 tables tested a specific case with binary sharding keys. Drop it now so that we don't // have to update the rest of the tests - execQueryWithDatabase(t, vtgateConn, "customer", "drop table `Lead`,`Lead-1`") + execVtgateQuery(t, vtgateConn, "customer", "drop table `Lead`,`Lead-1`") validateRollupReplicates(t) shardOrders(t) shardMerchant(t) @@ -360,13 +360,14 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string insertMoreCustomers(t, 16) reshardCustomer2to4Split(t, nil, "") - confirmAllStreamsRunning(t, "customer", "-40") + confirmAllStreamsRunning(t, vtgateConn, "customer:-40") expectNumberOfStreams(t, vtgateConn, "Customer2to4", "sales", "product:0", 4) reshardCustomer3to2SplitMerge(t) - confirmAllStreamsRunning(t, "customer", "-60") + confirmAllStreamsRunning(t, vtgateConn, "customer:-60") expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3) reshardCustomer3to1Merge(t) - confirmAllStreamsRunning(t, "customer", "0") + confirmAllStreamsRunning(t, vtgateConn, "customer:0") + expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1) t.Run("Verify CopyState Is Optimized Afterwards", func(t *testing.T) { @@ -717,10 +718,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl defer vtgateConn.Close() // Confirm that the 0 scale decimal field, dec80, is replicated correctly dec80Replicated := false - execQueryWithDatabase(t, vtgateConn, sourceKs, "update customer set dec80 = 0") - execQueryWithDatabase(t, vtgateConn, sourceKs, "update customer set blb = \"new blob data\" where cid=3") - execQueryWithDatabase(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'") - execQueryWithDatabase(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')") + execVtgateQuery(t, vtgateConn, sourceKs, "update customer set dec80 = 0") + execVtgateQuery(t, vtgateConn, sourceKs, "update customer set blb = \"new blob data\" where cid=3") + execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'") + execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')") waitForNoWorkflowLag(t, vc, targetKs, workflow) for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} { // Query the tablet's mysqld directly as the target may have denied table entries. @@ -737,8 +738,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl // Insert multiple rows in the loadtest table and immediately delete them to confirm that bulk delete // works the same way with the vplayer optimization enabled and disabled. Currently this optimization // is disabled by default, but enabled in TestCellAliasVreplicationWorkflow. - execQueryWithDatabase(t, vtgateConn, sourceKs, "insert into loadtest(id, name) values(10001, 'tempCustomer'), (10002, 'tempCustomer2'), (10003, 'tempCustomer3'), (10004, 'tempCustomer4')") - execQueryWithDatabase(t, vtgateConn, sourceKs, "delete from loadtest where id > 10000") + execVtgateQuery(t, vtgateConn, sourceKs, "insert into loadtest(id, name) values(10001, 'tempCustomer'), (10002, 'tempCustomer2'), (10003, 'tempCustomer3'), (10004, 'tempCustomer4')") + execVtgateQuery(t, vtgateConn, sourceKs, "delete from loadtest where id > 10000") // Confirm that all partial query metrics get updated when we are testing the noblob mode. t.Run("validate partial query counts", func(t *testing.T) { @@ -782,7 +783,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl if err != nil { require.FailNow(t, output) } - execQueryWithDatabase(t, vtgateConn, "product", fmt.Sprintf("update `%s` set name='xyz'", tbl)) + execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("update `%s` set name='xyz'", tbl)) } } vdiffSideBySide(t, ksWorkflow, "") @@ -803,7 +804,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl // The original unsharded customer data included an insert with the // vindex column (cid) of 999999, so the backing sequence table should // now have a next_id of 1000000 after SwitchTraffic. - res := execQueryWithDatabase(t, vtgateConn, sourceKs, "select next_id from customer_seq where id = 0") + res := execVtgateQuery(t, vtgateConn, sourceKs, "select next_id from customer_seq where id = 0") require.Equal(t, "1000000", res.Rows[0][0].ToString()) if withOpenTx && commit != nil { @@ -814,7 +815,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl vdiffSideBySide(t, "product.p2c_reverse", "") if withOpenTx { - execQueryWithDatabase(t, vtgateConn, "", deleteOpenTxQuery) + execVtgateQuery(t, vtgateConn, "", deleteOpenTxQuery) } ksShards := []string{"product/0", "customer/-80", "customer/80-"} @@ -829,7 +830,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl insertQuery2 = "insert into customer(name, cid) values('tempCustomer4', 102)" // ID 102, hence due to reverse_bits in shard -80 assertQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2) - execQueryWithDatabase(t, vtgateConn, "customer", "update customer set meta = convert(x'7b7d' using utf8mb4) where cid = 1") + execVtgateQuery(t, vtgateConn, "customer", "update customer set meta = convert(x'7b7d' using utf8mb4) where cid = 1") if testReverse { // Reverse Replicate switchReads(t, workflowType, cellNames, ksWorkflow, true) @@ -888,13 +889,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl insertQuery2 = "insert into customer(name, cid) values('tempCustomer9', 105)" // ID 104, hence due to reverse_bits in shard 80- assertQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2) - execQueryWithDatabase(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'") + execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'") waitForRowCountInTablet(t, customerTab1, "customer", "customer", 1) waitForRowCountInTablet(t, customerTab2, "customer", "customer", 2) waitForRowCount(t, vtgateConn, "customer", "customer.customer", 3) query = "insert into customer (name, cid) values('george', 5)" - execQueryWithDatabase(t, vtgateConn, "customer", query) + execVtgateQuery(t, vtgateConn, "customer", query) waitForRowCountInTablet(t, customerTab1, "customer", "customer", 1) waitForRowCountInTablet(t, customerTab2, "customer", "customer", 3) waitForRowCount(t, vtgateConn, "customer", "customer.customer", 4) @@ -923,7 +924,7 @@ func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias str 600, counts, nil, nil, cells, sourceCellOrAlias, 1) waitForRowCount(t, vtgateConn, ksName, "customer", 20) query := "insert into customer (name) values('yoko')" - execQueryWithDatabase(t, vtgateConn, ksName, query) + execVtgateQuery(t, vtgateConn, ksName, query) waitForRowCount(t, vtgateConn, ksName, "customer", 21) }) } @@ -938,7 +939,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) { 1600, counts, dryRunResultsSwitchReadM2m3, dryRunResultsSwitchWritesM2m3, nil, "", 1) waitForRowCount(t, vtgateConn, ksName, "merchant", 2) query := "insert into merchant (mname, category) values('amazon', 'electronics')" - execQueryWithDatabase(t, vtgateConn, ksName, query) + execVtgateQuery(t, vtgateConn, ksName, query) waitForRowCount(t, vtgateConn, ksName, "merchant", 3) var output string @@ -987,7 +988,7 @@ func reshardMerchant3to1Merge(t *testing.T) { 2000, counts, nil, nil, nil, "", 1) waitForRowCount(t, vtgateConn, ksName, "merchant", 3) query := "insert into merchant (mname, category) values('flipkart', 'electronics')" - execQueryWithDatabase(t, vtgateConn, ksName, query) + execVtgateQuery(t, vtgateConn, ksName, query) waitForRowCount(t, vtgateConn, ksName, "merchant", 4) }) } diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index b92dbeb9937..e13c3e24e80 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -95,7 +95,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { } insertMu.Lock() id++ - execQueryWithDatabase(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) + execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) insertMu.Unlock() } }() @@ -164,7 +164,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { } } - qr := execQueryWithDatabase(t, vtgateConn, "product", "select count(*) from customer") + qr := execVtgateQuery(t, vtgateConn, "product", "select count(*) from customer") require.NotNil(t, qr) // total number of row events found by the VStream API should match the rows inserted insertedRows, err := qr.Rows[0][0].ToCastInt64() @@ -507,7 +507,7 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven // because the keyspace remains unsharded and the number of rows in the customer_seq table is always 1. // We believe that checking the number of row events for the unsharded keyspace, which should always be greater than 0 before and after resharding, // is sufficient to confirm that the resharding of one keyspace does not affect another keyspace, while keeping the test straightforward. - customerResult := execQueryWithDatabase(t, vtgateConn, "sharded", "select count(*) from customer") + customerResult := execVtgateQuery(t, vtgateConn, "sharded", "select count(*) from customer") insertedCustomerRows, err := customerResult.Rows[0][0].ToCastInt64() require.NoError(t, err) require.Equal(t, insertedCustomerRows, ne.numDash80Events+ne.num80DashEvents+ne.numDash40Events+ne.num40DashEvents) @@ -698,7 +698,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { require.NotZero(t, newShardRowEvents) // The number of row events streamed by the VStream API should match the number of rows inserted. - customerResult := execQueryWithDatabase(t, vtgateConn, ks, "select count(*) from customer") + customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer") customerCount, err := customerResult.Rows[0][0].ToInt64() require.NoError(t, err) require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents))