diff --git a/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml b/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml new file mode 100644 index 00000000000..50b65ecc27d --- /dev/null +++ b/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml @@ -0,0 +1,170 @@ +# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" + +name: Cluster (vreplication_foreign_key_stress) +on: [push, pull_request] +concurrency: + group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vreplication_foreign_key_stress)') + cancel-in-progress: true + +permissions: read-all + +env: + LAUNCHABLE_ORGANIZATION: "vitess" + LAUNCHABLE_WORKSPACE: "vitess-app" + GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}" + +jobs: + build: + name: Run endtoend tests on Cluster (vreplication_foreign_key_stress) + runs-on: gh-hosted-runners-4cores-1 + + steps: + - name: Skip CI + run: | + if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then + echo "skipping CI due to the 'Skip CI' label" + exit 1 + fi + + - name: Check if workflow needs to be skipped + id: skip-workflow + run: | + skip='false' + if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! 
"${{github.ref}}" =~ "refs/tags/.*" ]]; then + skip='true' + fi + echo Skip ${skip} + echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT + + PR_DATA=$(curl \ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github.v3+json" \ + "https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}") + draft=$(echo "$PR_DATA" | jq .draft -r) + echo "is_draft=${draft}" >> $GITHUB_OUTPUT + + - name: Check out code + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: actions/checkout@v3 + + - name: Check for changes in relevant files + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: frouioui/paths-filter@main + id: changes + with: + token: '' + filters: | + end_to_end: + - 'go/**/*.go' + - 'test.go' + - 'Makefile' + - 'build.env' + - 'go.sum' + - 'go.mod' + - 'proto/*.proto' + - 'tools/**' + - 'config/**' + - 'bootstrap.sh' + - '.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml' + + - name: Set up Go + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-go@v4 + with: + go-version: 1.21.3 + + - name: Set up python + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-python@v4 + + - name: Tune the OS + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + # Limit local port range to not use ports that overlap with server side + # ports that we listen on. + sudo sysctl -w net.ipv4.ip_local_port_range="22768 65535" + # Increase the asynchronous non-blocking I/O. 
More information at https://dev.mysql.com/doc/refman/5.7/en/innodb-parameters.html#sysvar_innodb_use_native_aio + echo "fs.aio-max-nr = 1048576" | sudo tee -a /etc/sysctl.conf + sudo sysctl -p /etc/sysctl.conf + + - name: Get dependencies + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + + # Get key to latest MySQL repo + sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 467B942D3A79BD29 + # Setup MySQL 8.0 + wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.24-1_all.deb + echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections + sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config* + sudo apt-get update + # Install everything else we need, and configure + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata xz-utils libncurses5 + + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + # install JUnit report formatter + go install github.com/vitessio/go-junit-report@HEAD + + - name: Setup launchable dependencies + if: steps.skip-workflow.outputs.is_draft == 'false' && steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' + run: | + # Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up + pip3 install --user launchable~=1.0 > /dev/null + + # verify that launchable setup is all correct. + launchable verify || true + + # Tell Launchable about the build you are producing and testing + launchable record build --name "$GITHUB_RUN_ID" --no-commit-collection --source . 
+ + - name: Run cluster endtoend test + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + timeout-minutes: 45 + run: | + # We set the VTDATAROOT to the /tmp folder to reduce the file path of mysql.sock file + # which musn't be more than 107 characters long. + export VTDATAROOT="/tmp/" + source build.env + + set -exo pipefail + + # Increase our open file descriptor limit as we could hit this + ulimit -n 65536 + cat <<-EOF>>./config/mycnf/mysql80.cnf + innodb_buffer_pool_dump_at_shutdown=OFF + innodb_buffer_pool_in_core_file=OFF + innodb_buffer_pool_load_at_startup=OFF + innodb_buffer_pool_size=64M + innodb_doublewrite=OFF + innodb_flush_log_at_trx_commit=0 + innodb_flush_method=O_DIRECT + innodb_numa_interleave=ON + innodb_adaptive_hash_index=OFF + sync_binlog=0 + sync_relay_log=0 + performance_schema=OFF + slow-query-log=OFF + EOF + + cat <<-EOF>>./config/mycnf/mysql80.cnf + binlog-transaction-compression=ON + EOF + + # run the tests however you normally do, then produce a JUnit XML file + eatmydata -- go run test.go -docker=false -follow -shard vreplication_foreign_key_stress | tee -a output.txt | go-junit-report -set-exit-code > report.xml + + - name: Print test output and Record test result in launchable if PR is not a draft + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always() + run: | + if [[ "${{steps.skip-workflow.outputs.is_draft}}" == "false" ]]; then + # send recorded tests to launchable + launchable record tests --build "$GITHUB_RUN_ID" go-test . 
|| true + fi + + # print test output + cat output.txt diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index ed6a97a15c2..f51e1838a5b 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -79,6 +79,7 @@ type VttabletProcess struct { DbFlavor string Charset string ConsolidationsURL string + IsPrimary bool // Extra Args to be set before starting the vttablet process ExtraArgs []string @@ -460,6 +461,29 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD return executeQuery(conn, query) } +// QueryTabletMultiple lets you execute multiple queries -- without any +// results -- against the tablet. +func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error { + if !useDb { + keyspace = "" + } + dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace) + conn, err := vttablet.conn(&dbParams) + if err != nil { + return err + } + defer conn.Close() + + for _, query := range queries { + log.Infof("Executing query %s (on %s)", query, vttablet.Name) + _, err := executeQuery(conn, query) + if err != nil { + return err + } + } + return nil +} + func (vttablet *VttabletProcess) defaultConn(dbname string) (*mysql.Conn, error) { dbParams := mysql.ConnParams{ Uname: "vt_dba", diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 4735e94560f..89cebc7d0b1 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -544,6 +544,7 @@ func (vc *VitessCluster) AddShards(t *testing.T, cells []*Cell, keyspace *Keyspa tablets = append(tablets, primary) dbProcesses = append(dbProcesses, proc) primaryTabletUID = primary.Vttablet.TabletUID + primary.Vttablet.IsPrimary = true } for i := 0; i < numReplicas; i++ { @@ -795,13 +796,13 @@ 
func (vc *VitessCluster) getPrimaryTablet(t *testing.T, ksName, shardName string continue } for _, tablet := range shard.Tablets { - if tablet.Vttablet.GetTabletStatus() == "SERVING" { + if tablet.Vttablet.IsPrimary { return tablet.Vttablet } } } } - require.FailNow(t, "no primary found for %s:%s", ksName, shardName) + require.FailNow(t, "no primary found", "keyspace %s, shard %s", ksName, shardName) return nil } diff --git a/go/test/endtoend/vreplication/fk_ext_load_generator_test.go b/go/test/endtoend/vreplication/fk_ext_load_generator_test.go new file mode 100644 index 00000000000..12b5871781f --- /dev/null +++ b/go/test/endtoend/vreplication/fk_ext_load_generator_test.go @@ -0,0 +1,503 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "context" + "fmt" + "math/rand" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +const ( + // Only used when debugging tests. 
+ queryLog = "queries.txt" + + LoadGeneratorStateLoading = "loading" + LoadGeneratorStateRunning = "running" + LoadGeneratorStateStopped = "stopped" + + dataLoadTimeout = 1 * time.Minute + tickInterval = 1 * time.Second + queryTimeout = 1 * time.Minute + + getRandomIdQuery = "SELECT id FROM %s.parent ORDER BY RAND() LIMIT 1" + insertQuery = "INSERT INTO %s.parent (id, name) VALUES (%d, 'name-%d')" + updateQuery = "UPDATE %s.parent SET name = 'rename-%d' WHERE id = %d" + deleteQuery = "DELETE FROM %s.parent WHERE id = %d" + insertChildQuery = "INSERT INTO %s.child (id, parent_id) VALUES (%d, %d)" + insertChildQueryOverrideConstraints = "INSERT /*+ SET_VAR(foreign_key_checks=0) */ INTO %s.child (id, parent_id) VALUES (%d, %d)" +) + +// ILoadGenerator is an interface for load generators that we will use to simulate different types of loads. +type ILoadGenerator interface { + Init(ctx context.Context, vc *VitessCluster) // name & description only for logging. + Teardown() + + // "direct", use direct db connection to primary, only for unsharded keyspace. + // or "vtgate" to use vtgate routing. + // Stop() before calling SetDBStrategy(). + SetDBStrategy(direct, keyspace string) + SetOverrideConstraints(allow bool) // true if load generator can insert rows without FK constraints. + + Keyspace() string + DBStrategy() string // direct or vtgate + State() string // state of load generator (stopped, running) + OverrideConstraints() bool // true if load generator can insert rows without FK constraints. + + Load() error // initial load of data. + Start() error // start populating additional data. + Stop() error // stop populating additional data. + + // Implementation will decide which table to wait for extra rows on. + WaitForAdditionalRows(count int) error + // table == "", implementation will decide which table to get rows from, same table as in WaitForAdditionalRows(). 
+	GetRowCount(table string) (int, error)
+}
+
+var lg ILoadGenerator
+
+var _ ILoadGenerator = (*SimpleLoadGenerator)(nil)
+
+// LoadGenerator holds the state that is common to load generator implementations.
+type LoadGenerator struct {
+	ctx                 context.Context // parent context handed to Init
+	vc                  *VitessCluster
+	state               string // one of the LoadGeneratorState* constants
+	dbStrategy          string // "direct" or "vtgate", see exec()
+	overrideConstraints bool
+	keyspace            string
+	tables              []string
+}
+
+// SimpleLoadGenerator, which has a single parent table and a single child table for which different types
+// of DMLs are run.
+type SimpleLoadGenerator struct {
+	LoadGenerator
+	currentParentId int       // last parent id handed out by insert()
+	currentChildId  int       // last child id handed out by insert()
+	ch              chan bool // the run goroutine sends on this when it exits; Stop() receives
+	runCtx          context.Context
+	runCtxCancel    context.CancelFunc
+}
+
+// SetOverrideConstraints controls whether insert() may add child rows with
+// foreign key checks disabled.
+func (lg *SimpleLoadGenerator) SetOverrideConstraints(allow bool) {
+	lg.overrideConstraints = allow
+}
+
+// OverrideConstraints reports the value set by SetOverrideConstraints.
+func (lg *SimpleLoadGenerator) OverrideConstraints() bool {
+	return lg.overrideConstraints
+}
+
+// GetRowCount opens a short-lived vtgate connection and returns the row count
+// of the given table.
+func (lg *SimpleLoadGenerator) GetRowCount(table string) (int, error) {
+	vtgateConn, err := lg.getVtgateConn(context.Background())
+	if err != nil {
+		return 0, err
+	}
+	defer vtgateConn.Close()
+	return lg.getNumRows(vtgateConn, table), nil
+}
+
+// getVtgateConn connects to the cluster's vtgate MySQL port as vt_dba.
+func (lg *SimpleLoadGenerator) getVtgateConn(ctx context.Context) (*mysql.Conn, error) {
+	vtParams := mysql.ConnParams{
+		Host:  lg.vc.ClusterConfig.hostname,
+		Port:  lg.vc.ClusterConfig.vtgateMySQLPort,
+		Uname: "vt_dba",
+	}
+	return mysql.Connect(ctx, &vtParams)
+}
+
+// getNumRows returns the row count of table using the supplied connection.
+func (lg *SimpleLoadGenerator) getNumRows(vtgateConn *mysql.Conn, table string) int {
+	return getRowCount(lg.vc.t, vtgateConn, table)
+}
+
+// WaitForAdditionalRows polls the "parent" table every tickInterval until it
+// has grown by at least count rows. The test is failed if that does not
+// happen within dataLoadTimeout.
+func (lg *SimpleLoadGenerator) WaitForAdditionalRows(count int) error {
+	t := lg.vc.t
+	// Use the cluster this generator was initialized with rather than the
+	// package-level vc, so that the generator is self-contained.
+	vtgateConn := getConnection(t, lg.vc.ClusterConfig.hostname, lg.vc.ClusterConfig.vtgateMySQLPort)
+	defer vtgateConn.Close()
+	numRowsStart := lg.getNumRows(vtgateConn, "parent")
+	shortCtx, cancel := context.WithTimeout(context.Background(), dataLoadTimeout)
+	defer cancel()
+	for {
+		select {
+		case <-shortCtx.Done():
+			t.Fatalf("Timed out waiting for additional rows in %q table", "parent")
+		default:
+			numRows := lg.getNumRows(vtgateConn, "parent")
+			if numRows >= numRowsStart+count {
+				return nil
+			}
+			time.Sleep(tickInterval)
+		}
+	}
+}
+
+// exec runs query using the configured DB strategy: "direct" goes straight to
+// the primary tablet of shard "0", "vtgate" goes through vtgate with retries.
+func (lg *SimpleLoadGenerator) exec(query string) (*sqltypes.Result, error) {
+	switch lg.dbStrategy {
+	case "direct":
+		// direct is expected to be used only for unsharded keyspaces to simulate an unmanaged keyspace
+		// that proxies to an external database.
+		primary := lg.vc.getPrimaryTablet(lg.vc.t, lg.keyspace, "0")
+		qr, err := primary.QueryTablet(query, lg.keyspace, true)
+		require.NoError(lg.vc.t, err)
+		return qr, err
+	case "vtgate":
+		return lg.execQueryWithRetry(query)
+	default:
+		err := fmt.Errorf("invalid dbStrategy: %v", lg.dbStrategy)
+		return nil, err
+	}
+}
+
+// When a workflow switches traffic it is possible to get transient errors from vtgate while executing queries
+// due to cluster-level changes. isQueryRetryable() checks for such errors so that tests can wait for such changes
+// to complete before proceeding.
+func isQueryRetryable(err error) bool {
+	if err == nil {
+		return false
+	}
+	retryableErrorStrings := []string{
+		"retry",
+		"resharded",
+		"VT13001",
+		"Lock wait timeout exceeded",
+		"errno 2003",
+	}
+	msg := err.Error()
+	for _, e := range retryableErrorStrings {
+		if strings.Contains(msg, e) {
+			return true
+		}
+	}
+	return false
+}
+
+// execQueryWithRetry executes query through vtgate, retrying every
+// tickInterval for as long as the failures are retryable (see
+// isQueryRetryable). It gives up when queryTimeout elapses or when the load
+// generator's run context has been cancelled; the latter is reported with the
+// "load generator stopped" error that isQueryCancelled() recognizes.
+func (lg *SimpleLoadGenerator) execQueryWithRetry(query string) (*sqltypes.Result, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), queryTimeout)
+	defer cancel()
+	errCh := make(chan error)
+	qrCh := make(chan *sqltypes.Result)
+	go func() {
+		var qr *sqltypes.Result
+		var err error
+		var vtgateConn *mysql.Conn
+		retry := false
+		for {
+			select {
+			case <-ctx.Done():
+				errCh <- fmt.Errorf("query %q did not succeed before the timeout of %s", query, queryTimeout)
+				return
+			default:
+			}
+			// Read the field once: the run goroutine resets it to nil when it
+			// exits, so checking and dereferencing it separately could panic.
+			if runCtx := lg.runCtx; runCtx != nil && runCtx.Err() != nil {
+				log.Infof("Load generator run context done, query never completed: %q", query)
+				errCh <- fmt.Errorf("load generator stopped")
+				return
+			}
+			if retry {
+				time.Sleep(tickInterval)
+			}
+			// A new connection is opened for every attempt. Both connection and
+			// execution errors are retried only if they look transient.
+			vtgateConn, err = lg.getVtgateConn(ctx)
+			if err != nil {
+				if !isQueryRetryable(err) {
+					errCh <- err
+					return
+				}
+				time.Sleep(tickInterval)
+				continue
+			}
+			qr, err = vtgateConn.ExecuteFetch(query, 1000, false)
+			vtgateConn.Close()
+			if err == nil {
+				qrCh <- qr
+				return
+			}
+			if !isQueryRetryable(err) {
+				errCh <- err
+				return
+			}
+			retry = true
+		}
+	}()
+	// The goroutine always sends exactly one value on one of the two
+	// channels, so this receive cannot block forever.
+	select {
+	case qr := <-qrCh:
+		return qr, nil
+	case err := <-errCh:
+		log.Infof("query %q failed with error %v", query, err)
+		return nil, err
+	}
+}
+
+// Load inserts the initial parent and child rows. Failures fail the test via
+// require, so the returned error is always nil.
+func (lg *SimpleLoadGenerator) Load() error {
+	lg.state = LoadGeneratorStateLoading
+	defer func() { lg.state = LoadGeneratorStateStopped }()
+	log.Infof("Inserting initial FK data")
+	var queries = []string{
+		"insert into parent values(1, 'parent1'), (2, 'parent2');",
+		"insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');",
+	}
+	for _, query := range queries {
+		_, err := lg.exec(query)
+		require.NoError(lg.vc.t, err)
+	}
+	log.Infof("Done inserting initial FK data")
+	return nil
+}
+
+// Start launches the background goroutine that keeps issuing a random mix of
+// inserts, updates and deletes until Stop() is called or the context passed to
+// Init is done. It is a no-op if the generator is already running.
+//
+// NOTE(review): state, runCtx and runCtxCancel are shared with the run
+// goroutine without synchronization, as in the original code.
+func (lg *SimpleLoadGenerator) Start() error {
+	if lg.state == LoadGeneratorStateRunning {
+		log.Infof("Load generator already running")
+		return nil
+	}
+	lg.state = LoadGeneratorStateRunning
+	// Create the run context before launching the goroutine so that a Stop()
+	// issued right after Start() always finds a context to cancel.
+	runCtx, runCtxCancel := context.WithCancel(lg.ctx)
+	lg.runCtx, lg.runCtxCancel = runCtx, runCtxCancel
+	go func() {
+		defer func() {
+			lg.state = LoadGeneratorStateStopped
+			log.Infof("Load generator stopped")
+		}()
+		defer func() {
+			// Only clear the fields if they still belong to this run, so a
+			// quick restart is not clobbered by the previous run's cleanup.
+			if lg.runCtx == runCtx {
+				lg.runCtx = nil
+				lg.runCtxCancel = nil
+			}
+		}()
+		// Release the context's resources when the parent context ended the run.
+		defer runCtxCancel()
+		log.Infof("Load generator starting")
+		for i := 0; ; i++ {
+			if i%1000 == 0 {
+				// Log occasionally to show that the test is still running.
+				log.Infof("Load simulation iteration %d", i)
+			}
+			select {
+			case <-lg.ctx.Done():
+				log.Infof("Load generator context done")
+				lg.ch <- true
+				return
+			case <-runCtx.Done():
+				log.Infof("Load generator run context done")
+				lg.ch <- true
+				return
+			default:
+			}
+			op := rand.Intn(100)
+			switch {
+			case op < 50: // 50% chance to insert
+				lg.insert()
+			case op < 80: // 30% chance to update
+				lg.update()
+			default: // 20% chance to delete
+				lg.delete()
+			}
+			time.Sleep(1 * time.Millisecond)
+		}
+	}()
+	return nil
+}
+
+// Stop cancels the run context and waits for the run goroutine to signal its
+// exit on lg.ch, returning an error if that does not happen within
+// vdiffTimeout. It is a no-op if the generator is already stopped.
+func (lg *SimpleLoadGenerator) Stop() error {
+	if lg.state == LoadGeneratorStateStopped {
+		log.Infof("Load generator already stopped")
+		return nil
+	}
+	if lg.runCtx != nil && lg.runCtxCancel != nil {
+		log.Infof("Canceling load generator")
+		lg.runCtxCancel()
+	}
+	// Wait for the run goroutine to signal on ch or for the timeout to hit.
+	timeout := vdiffTimeout
+	select {
+	case <-lg.ch:
+		log.Infof("Load generator stopped")
+		lg.state = LoadGeneratorStateStopped
+		return nil
+	case <-time.After(timeout):
+		log.Infof("Timed out waiting for load generator to stop")
+		return fmt.Errorf("timed out waiting for load generator to stop")
+	}
+}
+
+// Init resets the generator to its stopped state and seeds the id counters.
+func (lg *SimpleLoadGenerator) Init(ctx context.Context, vc *VitessCluster) {
+	lg.ctx = ctx
+	lg.vc = vc
+	lg.state = LoadGeneratorStateStopped
+	lg.currentParentId = 100
+	lg.currentChildId = 100
+	lg.ch = make(chan bool)
+	lg.tables = []string{"parent", "child"}
+}
+
+func (lg *SimpleLoadGenerator) Teardown() {
+	// noop
+}
+
+// SetDBStrategy selects how queries are executed ("direct" or "vtgate") and
+// the keyspace they target.
+func (lg *SimpleLoadGenerator) SetDBStrategy(strategy, keyspace string) {
+	lg.dbStrategy = strategy
+	lg.keyspace = keyspace
+}
+
+func (lg *SimpleLoadGenerator) Keyspace() string {
+	return lg.keyspace
+}
+
+func (lg *SimpleLoadGenerator) DBStrategy() string {
+	return lg.dbStrategy
+}
+
+func (lg *SimpleLoadGenerator) State() string {
+	return lg.state
+}
+
+// isQueryCancelled reports whether err is the error that execQueryWithRetry
+// returns when the load generator was stopped while a query was pending.
+func isQueryCancelled(err error) bool {
+	if err == nil {
+		return false
+	}
+	return strings.Contains(err.Error(), "load generator stopped")
+}
+
+// insert adds one parent row followed by one to four child rows. When
+// constraint overriding is enabled, a fourth child (if any) is inserted with
+// foreign key checks disabled and a parent id that does not exist.
+func (lg *SimpleLoadGenerator) insert() {
+	t := lg.vc.t
+	// Use the generator's own counters, which Init seeds, instead of bare
+	// package-level identifiers.
+	lg.currentParentId++
+	parentID := lg.currentParentId
+	query := fmt.Sprintf(insertQuery, lg.keyspace, parentID, parentID)
+	qr, err := lg.exec(query)
+	if isQueryCancelled(err) {
+		return
+	}
+	require.NoError(t, err)
+	require.NotNil(t, qr)
+	// Insert one or more children, some with valid foreign keys, some without.
+	// The count is drawn once; drawing it in the loop condition would re-roll
+	// the bound on every iteration and skew towards fewer children.
+	numChildren := rand.Intn(4) + 1
+	for i := 0; i < numChildren; i++ {
+		lg.currentChildId++
+		if i == 3 && lg.overrideConstraints {
+			query = fmt.Sprintf(insertChildQueryOverrideConstraints, lg.keyspace, lg.currentChildId, parentID+1000000)
+		} else {
+			query = fmt.Sprintf(insertChildQuery, lg.keyspace, lg.currentChildId, parentID)
+		}
+		// Child inserts are best effort: their result is intentionally not checked.
+		_, _ = lg.exec(query)
+	}
+}
+
+// getRandomId returns the id of a random parent row, or 0 if the table is
+// empty or the query was cancelled because the generator is stopping.
+func (lg *SimpleLoadGenerator) getRandomId() int64 {
+	t := lg.vc.t
+	qr, err := lg.exec(fmt.Sprintf(getRandomIdQuery, lg.keyspace))
+	if isQueryCancelled(err) {
+		return 0
+	}
+	require.NoError(t, err)
+	require.NotNil(t, qr)
+	if len(qr.Rows) == 0 {
+		return 0
+	}
+	id, err := qr.Rows[0][0].ToInt64()
+	require.NoError(t, err)
+	return id
+}
+
+// update renames a random parent row, if there is one.
+func (lg *SimpleLoadGenerator) update() {
+	id := lg.getRandomId()
+	if id == 0 {
+		return
+	}
+	query := fmt.Sprintf(updateQuery, lg.keyspace, id, id)
+	_, err := lg.exec(query)
+	if isQueryCancelled(err) {
+		return
+	}
+	require.NoError(lg.vc.t, err)
+}
+
+// delete removes a random parent row, if there is one.
+func (lg *SimpleLoadGenerator) delete() {
+	id := lg.getRandomId()
+	if id == 0 {
+		return
+	}
+	query := fmt.Sprintf(deleteQuery, lg.keyspace, id)
+	_, err := lg.exec(query)
+	if isQueryCancelled(err) {
+		return
+	}
+	require.NoError(lg.vc.t, err)
+}
+
+// FIXME: following three functions are copied over from vtgate test utility functions in
+// `go/test/endtoend/utils/utils.go`.
+// We will refactor and then reuse the same functionality from vtgate tests, in the near future.
+
+// convertToMap returns input as a JSON-style object. It returns a nil map when
+// input is nil or not an object, so that lookups on the result are safe and
+// simply yield nothing.
+func convertToMap(input interface{}) map[string]interface{} {
+	output, _ := input.(map[string]interface{})
+	return output
+}
+
+// getTableT2Map walks res along keyspaces -> ks -> tables -> tbl and returns
+// the table's definition. The result is nil if any level is missing, which is
+// expected while the schema tracker has not caught up yet.
+func getTableT2Map(res *interface{}, ks, tbl string) map[string]interface{} {
+	if res == nil {
+		return nil
+	}
+	step1 := convertToMap(*res)["keyspaces"]
+	step2 := convertToMap(step1)[ks]
+	step3 := convertToMap(step2)["tables"]
+	tblMap := convertToMap(step3)[tbl]
+	return convertToMap(tblMap)
+}
+
+// vschemaTableHasColumn reports whether the table definition has an
+// authoritative column list that contains col.
+func vschemaTableHasColumn(t2Map map[string]interface{}, col string) bool {
+	authoritative, isBool := t2Map["column_list_authoritative"].(bool)
+	if !isBool || !authoritative {
+		return false
+	}
+	colList, isSlice := t2Map["columns"].([]interface{})
+	if !isSlice {
+		return false
+	}
+	for _, c := range colList {
+		colDef, isMap := c.(map[string]interface{})
+		if !isMap {
+			// Same as the original: an unexpected entry ends the scan.
+			return false
+		}
+		if colName, exists := colDef["name"]; exists && colName == col {
+			return true
+		}
+	}
+	return false
+}
+
+// waitForColumn waits for a table's column to be present in the vschema because vtgate's foreign key managed mode
+// expects the column to be present in the vschema before it can be used in a foreign key constraint.
+// The vschema is polled every defaultTick; an error is returned if the column
+// has not shown up after defaultTimeout.
+func waitForColumn(t *testing.T, vtgateProcess *cluster.VtgateProcess, ks, tbl, col string) error {
+	timeout := time.After(defaultTimeout)
+	for {
+		select {
+		case <-timeout:
+			return fmt.Errorf("schema tracking did not find column '%s' in table '%s'", col, tbl)
+		default:
+			time.Sleep(defaultTick)
+			res, err := vtgateProcess.ReadVSchema()
+			require.NoError(t, err, res)
+			if vschemaTableHasColumn(getTableT2Map(res, ks, tbl), col) {
+				log.Infof("Found column '%s' in table '%s' for keyspace '%s'", col, tbl, ks)
+				return nil
+			}
+		}
+	}
+}
diff --git a/go/test/endtoend/vreplication/fk_ext_test.go b/go/test/endtoend/vreplication/fk_ext_test.go
new file mode 100644
index 00000000000..665f8052486
--- /dev/null
+++ b/go/test/endtoend/vreplication/fk_ext_test.go
@@ -0,0 +1,450 @@
+/*
+Copyright 2023 The Vitess Authors.
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "context" + _ "embed" + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +const ( + shardStatusWaitTimeout = 30 * time.Second +) + +var ( + //go:embed schema/fkext/source_schema.sql + FKExtSourceSchema string + //go:embed schema/fkext/source_vschema.json + FKExtSourceVSchema string + //go:embed schema/fkext/target1_vschema.json + FKExtTarget1VSchema string + //go:embed schema/fkext/target2_vschema.json + FKExtTarget2VSchema string + //go:embed schema/fkext/materialize_schema.sql + FKExtMaterializeSchema string +) + +type fkextConfigType struct { + *ClusterConfig + sourceKeyspaceName string + target1KeyspaceName string + target2KeyspaceName string + cell string +} + +var fkextConfig *fkextConfigType + +func initFKExtConfig(t *testing.T) { + fkextConfig = &fkextConfigType{ + ClusterConfig: mainClusterConfig, + sourceKeyspaceName: "source", + target1KeyspaceName: "target1", + target2KeyspaceName: "target2", + cell: "zone1", + } +} + +/* +TestFKExt is an end-to-end test for validating the foreign key implementation with respect to, both vreplication +flows and vtgate processing of DMLs for tables with foreign key constraints. 
It currently: +* Sets up a source keyspace, to simulate the external database, with a parent and child table with a foreign key constraint. +* Creates a target keyspace with two shards, the Vitess keyspace, into which the source data is imported. +* Imports the data using MoveTables. This uses the atomic copy flow +to test that we can import data with foreign key constraints and that data is kept consistent even after the copy phase +since the tables continue to have the FK Constraints. +* Creates a new keyspace with two shards, the Vitess keyspace, into which the data is migrated using MoveTables. +* Materializes the parent and child tables into a different keyspace. +* Reshards the keyspace from 2 shards to 3 shards. +* Reshards the keyspace from 3 shards to 1 shard. + +We drop constraints from the tables from some replicas to simulate a replica that is not doing cascades in +innodb, to confirm that vtgate's fkmanaged mode is working properly. +*/ + +func TestFKExt(t *testing.T) { + setSidecarDBName("_vt") + + // Ensure that there are multiple copy phase cycles per table. 
+ extraVTTabletArgs = append(extraVTTabletArgs, "--vstream_packet_size=256", "--queryserver-config-schema-change-signal") + extraVTGateArgs = append(extraVTGateArgs, "--schema_change_signal=true", "--planner-version", "Gen4") + defer func() { extraVTTabletArgs = nil }() + initFKExtConfig(t) + + cellName := fkextConfig.cell + cells := []string{cellName} + vc = NewVitessCluster(t, t.Name(), cells, fkextConfig.ClusterConfig) + + require.NotNil(t, vc) + allCellNames = cellName + defaultCellName := cellName + defaultCell = vc.Cells[defaultCellName] + cell := vc.Cells[cellName] + + defer vc.TearDown(t) + + sourceKeyspace := fkextConfig.sourceKeyspaceName + vc.AddKeyspace(t, []*Cell{cell}, sourceKeyspace, "0", FKExtSourceVSchema, FKExtSourceSchema, 0, 0, 100, nil) + + vtgate = cell.Vtgates[0] + require.NotNil(t, vtgate) + err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKeyspace, "0") + require.NoError(t, err) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKeyspace, "0"), 1, shardStatusWaitTimeout)) + + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t, vc) + + lg = &SimpleLoadGenerator{} + lg.Init(context.Background(), vc) + lg.SetDBStrategy("vtgate", fkextConfig.sourceKeyspaceName) + if lg.Load() != nil { + t.Fatal("Load failed") + } + if lg.Start() != nil { + t.Fatal("Start failed") + } + t.Run("Import from external db", func(t *testing.T) { + // Import data into vitess from sourceKeyspace to target1Keyspace, both unsharded. + importIntoVitess(t) + }) + + t.Run("MoveTables from unsharded to sharded keyspace", func(t *testing.T) { + // Migrate data from target1Keyspace to the sharded target2Keyspace. Drops constraints from + // replica to simulate a replica that is not doing cascades in innodb to test vtgate's fkmanaged mode. 
+ // The replica with dropped constraints is used as source for the next workflow called in materializeTables(). + moveKeyspace(t) + }) + + t.Run("Materialize parent and copy tables without constraints", func(t *testing.T) { + // Materialize the tables from target2Keyspace to target1Keyspace. Stream only from replicas, one + // shard with constraints dropped. + materializeTables(t) + }) + lg.SetDBStrategy("vtgate", fkextConfig.target2KeyspaceName) + if lg.Start() != nil { + t.Fatal("Start failed") + } + threeShards := "-40,40-c0,c0-" + keyspaceName := fkextConfig.target2KeyspaceName + ks := vc.Cells[fkextConfig.cell].Keyspaces[keyspaceName] + numReplicas := 1 + + t.Run("Reshard keyspace from 2 shards to 3 shards", func(t *testing.T) { + tabletID := 500 + require.NoError(t, vc.AddShards(t, []*Cell{defaultCell}, ks, threeShards, numReplicas, 0, tabletID, nil)) + tablets := make(map[string]*cluster.VttabletProcess) + for i, shard := range strings.Split(threeShards, ",") { + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspaceName, shard), numReplicas, shardStatusWaitTimeout)) + tablets[shard] = vc.Cells[cellName].Keyspaces[keyspaceName].Shards[shard].Tablets[fmt.Sprintf("%s-%d", cellName, tabletID+i*100)].Vttablet + } + sqls := strings.Split(FKExtSourceSchema, "\n") + for _, sql := range sqls { + output, err := vc.VtctlClient.ExecuteCommandWithOutput("ApplySchema", "--", + "--ddl_strategy=direct", "--sql", sql, keyspaceName) + require.NoErrorf(t, err, output) + } + doReshard(t, fkextConfig.target2KeyspaceName, "reshard2to3", "-80,80-", threeShards, tablets) + }) + t.Run("Reshard keyspace from 3 shards to 1 shard", func(t *testing.T) { + tabletID := 800 + shard := "0" + require.NoError(t, vc.AddShards(t, []*Cell{defaultCell}, ks, shard, numReplicas, 0, tabletID, nil)) + tablets := make(map[string]*cluster.VttabletProcess) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspaceName, shard), 
numReplicas, shardStatusWaitTimeout))
+		tablets[shard] = vc.Cells[cellName].Keyspaces[keyspaceName].Shards[shard].Tablets[fmt.Sprintf("%s-%d", cellName, tabletID)].Vttablet
+		sqls := strings.Split(FKExtSourceSchema, "\n")
+		for _, sql := range sqls {
+			output, err := vc.VtctlClient.ExecuteCommandWithOutput("ApplySchema", "--",
+				"--ddl_strategy=direct", "--sql", sql, keyspaceName)
+			require.NoErrorf(t, err, output)
+		}
+		doReshard(t, fkextConfig.target2KeyspaceName, "reshard3to1", threeShards, "0", tablets)
+	})
+	lg.Stop()
+	waitForLowLag(t, fkextConfig.target1KeyspaceName, "mat")
+	t.Run("Validate materialize counts at end of test", func(t *testing.T) {
+		validateMaterializeRowCounts(t)
+	})
+
+}
+
+// compareRowCounts compares the row counts for the parent and child tables in the source and target shards. In addition to vdiffs,
+// it is another check to ensure that both tables have the same number of rows in the source and target shards after load generation
+// has stopped.
+// The load generator is stopped for the duration of the comparison and started again on return. An error is returned
+// if the generator does not stop, if a count query fails, or if the totals differ.
+func compareRowCounts(t *testing.T, keyspace string, sourceShards, targetShards []string) error {
+	log.Infof("Comparing row counts for keyspace %s, source shards: %v, target shards: %v", keyspace, sourceShards, targetShards)
+	// Stop's error is not checked here: the wait below verifies that the
+	// generator actually reached the stopped state.
+	lg.Stop()
+	defer lg.Start()
+	if err := waitForCondition("load generator to stop", func() bool { return lg.State() == LoadGeneratorStateStopped }, 10*time.Second); err != nil {
+		return err
+	}
+
+	sourceTabs := make(map[string]*cluster.VttabletProcess)
+	targetTabs := make(map[string]*cluster.VttabletProcess)
+	for _, shard := range sourceShards {
+		sourceTabs[shard] = vc.getPrimaryTablet(t, keyspace, shard)
+	}
+	for _, shard := range targetShards {
+		targetTabs[shard] = vc.getPrimaryTablet(t, keyspace, shard)
+	}
+
+	// getCount returns the number of rows of table on a single tablet.
+	getCount := func(tab *cluster.VttabletProcess, table string) (int64, error) {
+		qr, err := tab.QueryTablet(fmt.Sprintf("select count(*) from %s", table), keyspace, true)
+		if err != nil {
+			return 0, err
+		}
+		if qr == nil || len(qr.Rows) == 0 || len(qr.Rows[0]) == 0 {
+			return 0, fmt.Errorf("no row count returned for table %s on tablet %s", table, tab.Name)
+		}
+		return qr.Rows[0][0].ToInt64()
+	}
+	// sumCounts adds up the row counts of table across all given tablets.
+	// Query errors are propagated: treating a failed query as zero rows could
+	// make two failing sides compare as equal.
+	sumCounts := func(tabs map[string]*cluster.VttabletProcess, table string) (int64, error) {
+		var total int64
+		for _, tab := range tabs {
+			count, err := getCount(tab, table)
+			if err != nil {
+				return 0, err
+			}
+			total += count
+		}
+		return total, nil
+	}
+
+	sourceParentCount, err := sumCounts(sourceTabs, "parent")
+	if err != nil {
+		return err
+	}
+	sourceChildCount, err := sumCounts(sourceTabs, "child")
+	if err != nil {
+		return err
+	}
+	targetParentCount, err := sumCounts(targetTabs, "parent")
+	if err != nil {
+		return err
+	}
+	targetChildCount, err := sumCounts(targetTabs, "child")
+	if err != nil {
+		return err
+	}
+	log.Infof("Source parent count: %d, child count: %d, target parent count: %d, child count: %d.",
+		sourceParentCount, sourceChildCount, targetParentCount, targetChildCount)
+	if sourceParentCount != targetParentCount || sourceChildCount != targetChildCount {
+		return fmt.Errorf("source and target row counts do not match; source parent count: %d, target parent count: %d, source child count: %d, target child count: %d",
+			sourceParentCount, targetParentCount, sourceChildCount, targetChildCount)
+	}
+	return nil
+}
+
+// doReshard runs a Reshard workflow on keyspace from sourceShards to targetShards (comma separated shard lists).
+// It switches traffic, reverses it and switches it again before completing the workflow, comparing row counts and
+// running a vdiff after each of the first two switches.
+func doReshard(t *testing.T, keyspace, workflowName, sourceShards, targetShards string, targetTabs map[string]*cluster.VttabletProcess) {
+	rs := newReshard(vc, &reshardWorkflow{
+		workflowInfo: &workflowInfo{
+			vc:             vc,
+			workflowName:   workflowName,
+			targetKeyspace: keyspace,
+		},
+		sourceShards:   sourceShards,
+		targetShards:   targetShards,
+		skipSchemaCopy: true,
+	}, workflowFlavorVtctl)
+	rs.Create()
+	waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
+	for _, targetTab := range targetTabs {
+		catchup(t, targetTab, workflowName, "Reshard")
+	}
+	vdiff(t, keyspace, workflowName, fkextConfig.cell, false, true, nil)
+	rs.SwitchReadsAndWrites()
+	//if lg.WaitForAdditionalRows(100) != nil {
+	//	t.Fatal("WaitForAdditionalRows failed")
+	//}
+	waitForLowLag(t, keyspace, workflowName+"_reverse")
+	// Report the underlying error: the comparison can also fail because the
+	// load generator did not stop or a count query failed.
+	if err := compareRowCounts(t, keyspace, strings.Split(sourceShards, ","), strings.Split(targetShards, ",")); err != nil {
+		t.Fatalf("Row count comparison failed: %v", err)
+	}
+	vdiff(t, keyspace, workflowName+"_reverse", fkextConfig.cell, true, false, nil)
+
+	rs.ReverseReadsAndWrites()
+	//if lg.WaitForAdditionalRows(100) != nil {
+	//	t.Fatal("WaitForAdditionalRows failed")
+	//}
+	waitForLowLag(t, keyspace, workflowName)
+	if err := compareRowCounts(t, keyspace, strings.Split(targetShards, ","), strings.Split(sourceShards, ",")); err != nil {
+		t.Fatalf("Row count comparison failed: %v", err)
+	}
+	vdiff(t, keyspace, workflowName, fkextConfig.cell, false, true, nil)
+	lg.Stop()
+
+	rs.SwitchReadsAndWrites()
+	rs.Complete()
+}
+
+// areRowCountsEqual reports whether the materialized copies in target1 have the same number of rows as the
+// parent and child tables in target2.
+func areRowCountsEqual(t *testing.T) bool {
+	// Use a local connection: assigning to the package-level vtgateConn would
+	// leave it pointing at a connection that is closed when this returns.
+	conn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
+	defer conn.Close()
+	parentRowCount := getRowCount(t, conn, "target2.parent")
+	childRowCount := getRowCount(t, conn, "target2.child")
+	parentCopyRowCount := getRowCount(t, conn, "target1.parent_copy")
+	childCopyRowCount := getRowCount(t, conn, "target1.child_copy")
+	log.Infof("Post-materialize row counts are parent: %d, child: %d, parent_copy: %d, child_copy: %d",
+		parentRowCount, childRowCount, parentCopyRowCount, childCopyRowCount)
+	return parentRowCount == parentCopyRowCount && childRowCount == childCopyRowCount
+}
+
+// validateMaterializeRowCounts expects the Load generator to be stopped before calling it.
+func validateMaterializeRowCounts(t *testing.T) { + if lg.State() != LoadGeneratorStateStopped { + t.Fatal("Load generator was unexpectedly still running when validateMaterializeRowCounts was called -- this will produce unreliable results.") + } + areRowCountsEqual2 := func() bool { + return areRowCountsEqual(t) + } + require.NoError(t, waitForCondition("row counts to be equal", areRowCountsEqual2, defaultTimeout)) +} + +const fkExtMaterializeSpec = ` +{"workflow": "%s", "source_keyspace": "%s", "target_keyspace": "%s", +"table_settings": [ {"target_table": "parent_copy", "source_expression": "select * from parent" },{"target_table": "child_copy", "source_expression": "select * from child" }], +"tablet_types": "replica"}` + +func materializeTables(t *testing.T) { + wfName := "mat" + err := vc.VtctlClient.ExecuteCommand("ApplySchema", "--", "--ddl_strategy=direct", + "--sql", FKExtMaterializeSchema, fkextConfig.target1KeyspaceName) + require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err)) + materializeSpec := fmt.Sprintf(fkExtMaterializeSpec, "mat", fkextConfig.target2KeyspaceName, fkextConfig.target1KeyspaceName) + err = vc.VtctlClient.ExecuteCommand("Materialize", materializeSpec) + require.NoError(t, err, "Materialize") + tab := vc.getPrimaryTablet(t, fkextConfig.target1KeyspaceName, "0") + catchup(t, tab, wfName, "Materialize") + validateMaterializeRowCounts(t) +} + +func moveKeyspace(t *testing.T) { + targetTabs := newKeyspace(t, fkextConfig.target2KeyspaceName, "-80,80-", FKExtTarget2VSchema, FKExtSourceSchema, 300, 1) + shard := "-80" + tabletId := fmt.Sprintf("%s-%d", fkextConfig.cell, 301) + replicaTab := vc.Cells[fkextConfig.cell].Keyspaces[fkextConfig.target2KeyspaceName].Shards[shard].Tablets[tabletId].Vttablet + dropReplicaConstraints(t, fkextConfig.target2KeyspaceName, replicaTab) + doMoveTables(t, fkextConfig.target1KeyspaceName, fkextConfig.target2KeyspaceName, "move", "replica", targetTabs, false) +} + +func newKeyspace(t *testing.T, 
keyspaceName, shards, vschema, schema string, tabletId, numReplicas int) map[string]*cluster.VttabletProcess { + tablets := make(map[string]*cluster.VttabletProcess) + cellName := fkextConfig.cell + cell := vc.Cells[fkextConfig.cell] + vc.AddKeyspace(t, []*Cell{cell}, keyspaceName, shards, vschema, schema, numReplicas, 0, tabletId, nil) + for i, shard := range strings.Split(shards, ",") { + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspaceName, shard), 1, shardStatusWaitTimeout)) + tablets[shard] = vc.Cells[cellName].Keyspaces[keyspaceName].Shards[shard].Tablets[fmt.Sprintf("%s-%d", cellName, tabletId+i*100)].Vttablet + } + err := vc.VtctldClient.ExecuteCommand("RebuildVSchemaGraph") + require.NoError(t, err) + require.NoError(t, waitForColumn(t, vtgate, keyspaceName, "parent", "id")) + require.NoError(t, waitForColumn(t, vtgate, keyspaceName, "child", "parent_id")) + return tablets +} + +func doMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName, tabletTypes string, targetTabs map[string]*cluster.VttabletProcess, atomicCopy bool) { + mt := newMoveTables(vc, &moveTablesWorkflow{ + workflowInfo: &workflowInfo{ + vc: vc, + workflowName: workflowName, + targetKeyspace: targetKeyspace, + tabletTypes: tabletTypes, + }, + sourceKeyspace: sourceKeyspace, + atomicCopy: atomicCopy, + }, workflowFlavorRandom) + mt.Create() + + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) + + for _, targetTab := range targetTabs { + catchup(t, targetTab, workflowName, "MoveTables") + } + vdiff(t, targetKeyspace, workflowName, fkextConfig.cell, false, true, nil) + lg.Stop() + lg.SetDBStrategy("vtgate", targetKeyspace) + if lg.Start() != nil { + t.Fatal("Start failed") + } + + mt.SwitchReadsAndWrites() + + if lg.WaitForAdditionalRows(100) != nil { + t.Fatal("WaitForAdditionalRows failed") + } + + waitForLowLag(t, sourceKeyspace, 
workflowName+"_reverse") + vdiff(t, sourceKeyspace, workflowName+"_reverse", fkextConfig.cell, false, true, nil) + if lg.WaitForAdditionalRows(100) != nil { + t.Fatal("WaitForAdditionalRows failed") + } + + mt.ReverseReadsAndWrites() + if lg.WaitForAdditionalRows(100) != nil { + t.Fatal("WaitForAdditionalRows failed") + } + waitForLowLag(t, targetKeyspace, workflowName) + time.Sleep(5 * time.Second) + vdiff(t, targetKeyspace, workflowName, fkextConfig.cell, false, true, nil) + lg.Stop() + mt.SwitchReadsAndWrites() + mt.Complete() + if err := vc.VtctldClient.ExecuteCommand("ApplyRoutingRules", "--rules={}"); err != nil { + t.Fatal(err) + } +} + +func importIntoVitess(t *testing.T) { + targetTabs := newKeyspace(t, fkextConfig.target1KeyspaceName, "0", FKExtTarget1VSchema, FKExtSourceSchema, 200, 1) + doMoveTables(t, fkextConfig.sourceKeyspaceName, fkextConfig.target1KeyspaceName, "import", "primary", targetTabs, true) +} + +const getConstraintsQuery = ` +SELECT CONSTRAINT_NAME, TABLE_NAME +FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE +WHERE TABLE_SCHEMA = '%s' AND REFERENCED_TABLE_NAME IS NOT NULL; +` + +// dropReplicaConstraints drops all foreign key constraints on replica tables for a given keyspace/shard. +// We do this so that we can replay binlogs from a replica which is not doing cascades but just replaying +// the binlogs created by the primary. This will confirm that vtgate is doing the cascades correctly. 
+func dropReplicaConstraints(t *testing.T, keyspaceName string, tablet *cluster.VttabletProcess) { + var dropConstraints []string + require.Equal(t, "replica", strings.ToLower(tablet.TabletType)) + dbName := "vt_" + keyspaceName + qr, err := tablet.QueryTablet(fmt.Sprintf(getConstraintsQuery, dbName), keyspaceName, true) + if err != nil { + t.Fatal(err) + } + for _, row := range qr.Rows { + constraintName := row[0].ToString() + tableName := row[1].ToString() + dropConstraints = append(dropConstraints, fmt.Sprintf("ALTER TABLE `%s`.`%s` DROP FOREIGN KEY `%s`", + dbName, tableName, constraintName)) + } + prefixQueries := []string{ + "set sql_log_bin=0", + "SET @@global.super_read_only=0", + } + suffixQueries := []string{ + "SET @@global.super_read_only=1", + "set sql_log_bin=1", + } + queries := append(prefixQueries, dropConstraints...) + queries = append(queries, suffixQueries...) + require.NoError(t, tablet.QueryTabletMultiple(queries, keyspaceName, true)) +} diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 31886864f11..06e94e0222d 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -94,12 +94,15 @@ func TestFKWorkflow(t *testing.T) { workflowName := "fk" ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName) - mt := newMoveTables(vc, &moveTables{ - workflowName: workflowName, - targetKeyspace: targetKeyspace, + mt := newMoveTables(vc, &moveTablesWorkflow{ + workflowInfo: &workflowInfo{ + vc: vc, + workflowName: workflowName, + targetKeyspace: targetKeyspace, + }, sourceKeyspace: sourceKeyspace, atomicCopy: true, - }, moveTablesFlavorRandom) + }, workflowFlavorRandom) mt.Create() waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index d2154f13f1f..07c12caf194 100644 --- 
a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "net/http" + "os" "os/exec" "regexp" "sort" @@ -56,6 +57,11 @@ const ( workflowStateTimeout = 90 * time.Second ) +func setSidecarDBName(dbName string) { + sidecarDBName = dbName + sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName)) +} + func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines string) { queries := strings.Split(lines, "\n") for _, query := range queries { @@ -65,8 +71,35 @@ func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines execVtgateQuery(t, conn, database, string(query)) } } + +func execQueryWithRetry(t *testing.T, conn *mysql.Conn, query string, timeout time.Duration) *sqltypes.Result { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(defaultTick) + defer ticker.Stop() + + var qr *sqltypes.Result + var err error + for { + qr, err = conn.ExecuteFetch(query, 1000, false) + if err == nil { + return qr + } + select { + case <-ctx.Done(): + require.FailNow(t, fmt.Sprintf("query %q did not succeed before the timeout of %s; last seen result: %v", + query, timeout, qr.Rows)) + case <-ticker.C: + log.Infof("query %q failed with error %v, retrying in %ds", query, err, defaultTick) + } + } +} + func execQuery(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { qr, err := conn.ExecuteFetch(query, 1000, false) + if err != nil { + log.Errorf("Error executing query: %s: %v", query, err) + } require.NoError(t, err) return qr } @@ -79,7 +112,7 @@ func getConnection(t *testing.T, hostname string, port int) *mysql.Conn { } ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) + require.NoErrorf(t, err, "error connecting to vtgate on %s:%d", hostname, port) return conn } @@ -96,6 +129,19 @@ func execVtgateQuery(t *testing.T, conn 
*mysql.Conn, database string, query stri return qr } +func execVtgateQueryWithRetry(t *testing.T, conn *mysql.Conn, database string, query string, timeout time.Duration) *sqltypes.Result { + if strings.TrimSpace(query) == "" { + return nil + } + if database != "" { + execQuery(t, conn, "use `"+database+"`;") + } + execQuery(t, conn, "begin") + qr := execQueryWithRetry(t, conn, query, timeout) + execQuery(t, conn, "commit") + return qr +} + func checkHealth(t *testing.T, url string) bool { resp, err := http.Get(url) require.NoError(t, err) @@ -703,6 +749,13 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool return mode == "noblob" } +func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int { + query := fmt.Sprintf("select count(*) from %s", table) + qr := execVtgateQuery(t, vtgateConn, "", query) + numRows, _ := qr.Rows[0][0].ToInt() + return numRows +} + const ( loadTestBufferingWindowDurationStr = "30s" loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr @@ -820,3 +873,37 @@ func (lg *loadGenerator) waitForCount(want int64) { } } } + +// appendToQueryLog is useful when debugging tests. 
+func appendToQueryLog(msg string) { + file, err := os.OpenFile(queryLog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Errorf("Error opening query log file: %v", err) + return + } + defer file.Close() + if _, err := file.WriteString(msg + "\n"); err != nil { + log.Errorf("Error writing to query log file: %v", err) + } +} + +func waitForCondition(name string, condition func() bool, timeout time.Duration) error { + if condition() { + return nil + } + + ticker := time.NewTicker(tickInterval) + defer ticker.Stop() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for { + select { + case <-ticker.C: + if condition() { + return nil + } + case <-ctx.Done(): + return fmt.Errorf("%s: waiting for %s", ctx.Err(), name) + } + } +} diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index 4e4b7cada97..113587a1669 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -21,7 +21,7 @@ func TestMoveTablesBuffering(t *testing.T) { setupMinimalCustomerKeyspace(t) tables := "loadtest" err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs, - tables, workflowActionCreate, "", "", "", false) + tables, workflowActionCreate, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) diff --git a/go/test/endtoend/vreplication/partial_movetables_seq_test.go b/go/test/endtoend/vreplication/partial_movetables_seq_test.go index 6a1ed92cb9c..f8dc440b62d 100644 --- a/go/test/endtoend/vreplication/partial_movetables_seq_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_seq_test.go @@ -239,7 +239,7 @@ func (wf *workflow) create() { currentWorkflowType = wrangler.MoveTablesWorkflow sourceShards := strings.Join(wf.options.sourceShards, ",") err 
= tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace, - strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", false) + strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", defaultWorkflowExecOptions) case "reshard": currentWorkflowType = wrangler.ReshardWorkflow sourceShards := strings.Join(wf.options.sourceShards, ",") @@ -248,7 +248,7 @@ func (wf *workflow) create() { targetShards = sourceShards } err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace, - strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, targetShards, false) + strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, targetShards, defaultWorkflowExecOptions) default: panic(fmt.Sprintf("unknown workflow type: %s", wf.typ)) } @@ -266,15 +266,15 @@ func (wf *workflow) create() { } func (wf *workflow) switchTraffic() { - require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionSwitchTraffic, "", "", "", false)) + require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions)) } func (wf *workflow) reverseTraffic() { - require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionReverseTraffic, "", "", "", false)) + require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionReverseTraffic, "", "", "", defaultWorkflowExecOptions)) } func (wf *workflow) complete() { - require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, wf.toKeyspace, "", workflowActionComplete, "", "", "", false)) + require.NoError(wf.tc.t, tstWorkflowExec(wf.tc.t, wf.tc.defaultCellName, wf.name, wf.fromKeyspace, 
wf.toKeyspace, "", workflowActionComplete, "", "", "", defaultWorkflowExecOptions)) } // TestPartialMoveTablesWithSequences enhances TestPartialMoveTables by adding an unsharded keyspace which has a @@ -505,7 +505,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { // We switched traffic, so it's the reverse workflow we want to cancel. reverseWf := wf + "_reverse" reverseKs := sourceKs // customer - err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", false) + err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) output, err := tc.vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show") diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 5583232fbdc..d9573b50e4a 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -44,13 +44,16 @@ func testCancel(t *testing.T) { table := "customer2" shard := "80-" // start the partial movetables for 80- - mt := newMoveTables(vc, &moveTables{ - workflowName: workflowName, - targetKeyspace: targetKeyspace, + mt := newMoveTables(vc, &moveTablesWorkflow{ + workflowInfo: &workflowInfo{ + vc: vc, + workflowName: workflowName, + targetKeyspace: targetKeyspace, + }, sourceKeyspace: sourceKeyspace, tables: table, sourceShards: shard, - }, moveTablesFlavorRandom) + }, workflowFlavorRandom) mt.Create() checkDenyList := func(keyspace string, expected bool) { @@ -141,7 +144,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // start the partial movetables for 80- err := tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs, - "customer,loadtest", workflowActionCreate, "", shard, "", false) + "customer,loadtest", workflowActionCreate, "", shard, "", defaultWorkflowExecOptions) 
require.NoError(t, err) var lg *loadGenerator if runWithLoad { // start load after routing rules are set, otherwise we end up with ambiguous tables @@ -214,7 +217,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { require.Contains(t, err.Error(), "target: customer.-80.primary", "Query was routed to the target before any SwitchTraffic") // Switch all traffic for the shard - require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", false)) + require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions)) expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", targetKs, wfName, shard, shard) require.Equal(t, expectedSwitchOutput, lastOutput) @@ -272,7 +275,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // We cannot Complete a partial move tables at the moment because // it will find that all traffic has (obviously) not been switched. 
- err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", false) + err = tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionComplete, "", "", "", defaultWorkflowExecOptions) require.Error(t, err) // Confirm global routing rules: -80 should still be be routed to customer @@ -285,14 +288,14 @@ func TestPartialMoveTablesBasic(t *testing.T) { ksWf = fmt.Sprintf("%s.%s", targetKs, wfName) // Start the partial movetables for -80, 80- has already been switched err = tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs, - "customer,loadtest", workflowActionCreate, "", shard, "", false) + "customer,loadtest", workflowActionCreate, "", shard, "", defaultWorkflowExecOptions) require.NoError(t, err) targetTab2 := vc.getPrimaryTablet(t, targetKs, shard) catchup(t, targetTab2, wfName, "Partial MoveTables Customer to Customer2: -80") vdiffSideBySide(t, ksWf, "") // Switch all traffic for the shard - require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", false)) + require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions)) expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", targetKs, wfName) require.Equal(t, expectedSwitchOutput, lastOutput) @@ -313,7 +316,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // We switched traffic, so it's the reverse workflow we want to cancel. 
// workflowExecOptions carries the optional settings that tstWorkflowExec turns into command flags.
type workflowExecOptions struct {
	// deferSecondaryKeys adds --defer-secondary-keys for the workflow types handled in
	// tstWorkflowExec; it is only applied when atomicCopy is not set.
	deferSecondaryKeys bool
	// atomicCopy adds --atomic-copy to the command.
	atomicCopy bool
}

// defaultWorkflowExecOptions is the option set used by callers that need no special behavior:
// secondary keys are deferred and atomic copy is off. It is a single shared pointer, so callers
// should treat it as read-only.
var defaultWorkflowExecOptions = &workflowExecOptions{
	deferSecondaryKeys: true,
}
[]*cluster.VttabletProcess{targetTab1}, targetKs, tables) @@ -88,10 +97,12 @@ func createMoveTablesWorkflow(t *testing.T, tables string) { } func tstWorkflowAction(t *testing.T, action, tabletTypes, cells string) error { - return tstWorkflowExec(t, cells, workflowName, sourceKs, targetKs, tablesToMove, action, tabletTypes, "", "", false) + return tstWorkflowExec(t, cells, workflowName, sourceKs, targetKs, tablesToMove, action, tabletTypes, "", "", defaultWorkflowExecOptions) } -func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, atomicCopy bool) error { +func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, + sourceShards, targetShards string, options *workflowExecOptions) error { + var args []string if currentWorkflowType == wrangler.MoveTablesWorkflow { args = append(args, "MoveTables") @@ -104,7 +115,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, if BypassLagCheck { args = append(args, "--max_replication_lag_allowed=2542087h") } - if atomicCopy { + if options.atomicCopy { args = append(args, "--atomic-copy") } switch action { @@ -125,7 +136,8 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, // Test new experimental --defer-secondary-keys flag switch currentWorkflowType { case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow: - if !atomicCopy { + + if !options.atomicCopy && options.deferSecondaryKeys { args = append(args, "--defer-secondary-keys") } args = append(args, "--initialize-target-sequences") // Only used for MoveTables @@ -317,17 +329,17 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { // use MoveTables to move customer2 from product to customer using currentWorkflowType = wrangler.MoveTablesWorkflow err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, - "customer2", workflowActionCreate, "", 
"", "", false) + "customer2", workflowActionCreate, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) waitForWorkflowState(t, vc, "customer.wf2", binlogdatapb.VReplicationWorkflowState_Running.String()) waitForLowLag(t, "customer", "wf2") err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, - "", workflowActionSwitchTraffic, "", "", "", false) + "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, - "", workflowActionComplete, "", "", "", false) + "", workflowActionComplete, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) // sanity check @@ -352,16 +364,16 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { // use MoveTables to move customer2 back to product. Note that now the table has an associated sequence err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs, - "customer2", workflowActionCreate, "", "", "", false) + "customer2", workflowActionCreate, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) waitForWorkflowState(t, vc, "product.wf3", binlogdatapb.VReplicationWorkflowState_Running.String()) waitForLowLag(t, "product", "wf3") err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs, - "", workflowActionSwitchTraffic, "", "", "", false) + "", workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs, - "", workflowActionComplete, "", "", "", false) + "", workflowActionComplete, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) // sanity check diff --git a/go/test/endtoend/vreplication/schema/fkext/materialize_schema.sql b/go/test/endtoend/vreplication/schema/fkext/materialize_schema.sql new file mode 100644 index 00000000000..6af8ca99b94 --- /dev/null +++ b/go/test/endtoend/vreplication/schema/fkext/materialize_schema.sql @@ -0,0 +1,2 @@ 
+create table parent_copy(id int, name varchar(128), primary key(id)) engine=innodb; +create table child_copy(id int, parent_id int, name varchar(128), primary key(id)) engine=innodb; \ No newline at end of file diff --git a/go/test/endtoend/vreplication/schema/fkext/source_schema.sql b/go/test/endtoend/vreplication/schema/fkext/source_schema.sql new file mode 100644 index 00000000000..01b788338b6 --- /dev/null +++ b/go/test/endtoend/vreplication/schema/fkext/source_schema.sql @@ -0,0 +1,2 @@ +create table if not exists parent(id int, name varchar(128), primary key(id)) engine=innodb; +create table if not exists child(id int, parent_id int, name varchar(128), primary key(id), foreign key(parent_id) references parent(id) on delete cascade) engine=innodb; \ No newline at end of file diff --git a/go/test/endtoend/vreplication/schema/fkext/source_vschema.json b/go/test/endtoend/vreplication/schema/fkext/source_vschema.json new file mode 100644 index 00000000000..01cde0d643d --- /dev/null +++ b/go/test/endtoend/vreplication/schema/fkext/source_vschema.json @@ -0,0 +1,6 @@ +{ + "tables": { + "parent": {}, + "child": {} + } +} diff --git a/go/test/endtoend/vreplication/schema/fkext/target1_vschema.json b/go/test/endtoend/vreplication/schema/fkext/target1_vschema.json new file mode 100644 index 00000000000..dc89232fbbb --- /dev/null +++ b/go/test/endtoend/vreplication/schema/fkext/target1_vschema.json @@ -0,0 +1,28 @@ +{ + "sharded": false, + "foreignKeyMode": "managed", + "vindexes": { + "reverse_bits": { + "type": "reverse_bits" + } + }, + "tables": { + "parent": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, + "child": { + "column_vindexes": [ + { + "column": "parent_id", + "name": "reverse_bits" + } + ] + } + + } +} diff --git a/go/test/endtoend/vreplication/schema/fkext/target2_vschema.json b/go/test/endtoend/vreplication/schema/fkext/target2_vschema.json new file mode 100644 index 00000000000..06e851a9007 --- /dev/null +++ 
b/go/test/endtoend/vreplication/schema/fkext/target2_vschema.json @@ -0,0 +1,43 @@ +{ + "sharded": true, + "foreignKeyMode": "managed", + "vindexes": { + "reverse_bits": { + "type": "reverse_bits" + } + }, + "tables": { + "parent": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, + "child": { + "column_vindexes": [ + { + "column": "parent_id", + "name": "reverse_bits" + } + ] + }, + "parent_copy": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, + "child_copy": { + "column_vindexes": [ + { + "column": "parent_id", + "name": "reverse_bits" + } + ] + } + } +} diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 72b09e8fede..eb96985af57 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -323,6 +323,7 @@ func testResume(t *testing.T, tc *testCase, cells string) { // expected number of rows in total (original run and resume) _, _ = performVDiff2Action(t, false, ksWorkflow, cells, "resume", uuid, false) info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, ogTime) + require.NotNil(t, info) require.False(t, info.HasMismatch) require.Equal(t, expectedRows, info.RowsCompared) }) @@ -375,6 +376,7 @@ func testAutoRetryError(t *testing.T, tc *testCase, cells string) { // confirm that the VDiff was retried, able to complete, and we compared the expected // number of rows in total (original run and retry) info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, ogTime) + require.NotNil(t, info) require.False(t, info.HasMismatch) require.Equal(t, expectedRows, info.RowsCompared) }) diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 38ae9273a42..7dbc675886b 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -17,6 +17,7 @@ 
limitations under the License. package vreplication import ( + "context" "fmt" "strings" "testing" @@ -31,7 +32,10 @@ import ( ) const ( - vdiffTimeout = time.Second * 90 // we can leverage auto retry on error with this longer-than-usual timeout + vdiffTimeout = 90 * time.Second // we can leverage auto retry on error with this longer-than-usual timeout + vdiffRetryTimeout = 30 * time.Second + vdiffStatusCheckInterval = 1 * time.Second + vdiffRetryInterval = 5 * time.Second ) var ( @@ -66,6 +70,7 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex // update-table-stats is needed in order to test progress reports. uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry", "--update-table-stats") info := waitForVDiff2ToComplete(t, true, ksWorkflow, cells, uuid, time.Time{}) + require.NotNil(t, info) require.Equal(t, workflow, info.Workflow) require.Equal(t, keyspace, info.Keyspace) if want != nil { @@ -90,9 +95,10 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell ch := make(chan bool) go func() { for { - time.Sleep(1 * time.Second) + time.Sleep(vdiffStatusCheckInterval) _, jsonStr := performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false) info = getVDiffInfo(jsonStr) + require.NotNil(t, info) if info.State == "completed" { if !completedAtMin.IsZero() { ca := info.CompletedAt @@ -136,6 +142,7 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell case <-ch: return info case <-time.After(vdiffTimeout): + log.Errorf("VDiff never completed for UUID %s", uuid) require.FailNow(t, fmt.Sprintf("VDiff never completed for UUID %s", uuid)) return nil } @@ -153,7 +160,7 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e // update-table-stats is needed in order to test progress reports. 
uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, "--auto-retry", "--update-table-stats") info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, time.Time{}) - + require.NotNil(t, info) require.Equal(t, workflow, info.Workflow) require.Equal(t, keyspace, info.Keyspace) if want != nil { @@ -186,7 +193,7 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a args = append(args, extraFlags...) } args = append(args, ksWorkflow, action, actionArg) - output, err = vc.VtctlClient.ExecuteCommandWithOutput(args...) + output, err = execVDiffWithRetry(t, expectError, false, args) log.Infof("vdiff output: %+v (err: %+v)", output, err) if !expectError { require.Nil(t, err) @@ -211,7 +218,8 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a if actionArg != "" { args = append(args, actionArg) } - output, err = vc.VtctldClient.ExecuteCommandWithOutput(args...) + + output, err = execVDiffWithRetry(t, expectError, true, args) log.Infof("vdiff output: %+v (err: %+v)", output, err) if !expectError { require.NoError(t, err) @@ -226,6 +234,79 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a return uuid, output } +// During SwitchTraffic, due to changes in the cluster, vdiff can return transient errors. isVDiffRetryable() is used to +// ignore such errors and retry vdiff expecting the condition to be resolved. +func isVDiffRetryable(str string) bool { + for _, s := range []string{"Error while dialing", "failed to connect"} { + if strings.Contains(str, s) { + return true + } + } + return false +} + +type vdiffResult struct { + output string + err error +} + +// execVDiffWithRetry will ignore transient errors that can occur during workflow state changes. 
+func execVDiffWithRetry(t *testing.T, expectError bool, useVtctldClient bool, args []string) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), vdiffRetryTimeout) // overall budget for the command and all of its retries + defer cancel() + vdiffResultCh := make(chan vdiffResult, 1) // buffered: the worker sends at most once, so it can never block (and leak) if we already returned on timeout + go func() { + var output string + var err error + retry := false + for { + select { + case <-ctx.Done(): // budget exhausted or caller returned: stop retrying + return + default: + } + if retry { + time.Sleep(vdiffRetryInterval) // back off before re-running the command + } + retry = false + if useVtctldClient { + output, err = vc.VtctldClient.ExecuteCommandWithOutput(args...) + } else { + output, err = vc.VtctlClient.ExecuteCommandWithOutput(args...) + } + if err != nil { + if expectError { + result := vdiffResult{output: output, err: err} + vdiffResultCh <- result + return + } + log.Infof("vdiff error: %s", err) + if isVDiffRetryable(err.Error()) { + retry = true + } else { + result := vdiffResult{output: output, err: err} + vdiffResultCh <- result + return + } + } + if isVDiffRetryable(output) { // a transient failure can also be reported only in the command output + retry = true + } + if !retry { + result := vdiffResult{output: output, err: nil} + vdiffResultCh <- result + return + } + } + }() + select { + case <-ctx.Done(): + return "", fmt.Errorf("timed out waiting for vdiff to complete") + case result := <-vdiffResultCh: + return result.output, result.err + } +} + type vdiffInfo struct { Workflow, Keyspace string State, Shards string diff --git a/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go b/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go index 0f6a9f668d0..434ea6db3e0 100644 --- a/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go +++ b/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go @@ -93,12 +93,16 @@ func TestMultipleConcurrentVDiffs(t *testing.T) { time.Sleep(15 * time.Second) // wait for some rows to be inserted.
createWorkflow := func(workflowName, tables string) { - mt := newMoveTables(vc, &moveTables{ - workflowName: workflowName, - targetKeyspace: targetKeyspace, + mt := newMoveTables(vc, &moveTablesWorkflow{ + workflowInfo: &workflowInfo{ + vc: vc, + workflowName: workflowName, + targetKeyspace: targetKeyspace, + tabletTypes: "primary", + }, sourceKeyspace: sourceKeyspace, tables: tables, - }, moveTablesFlavorVtctld) + }, workflowFlavorVtctld) mt.Create() waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) catchup(t, targetTab, workflowName, "MoveTables") diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 6bd0bbb19d8..76a4e627b1b 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -20,28 +20,49 @@ import ( "math/rand" "strconv" + "vitess.io/vitess/go/vt/wrangler" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" ) -type moveTablesFlavor int +type iWorkflow interface { + Create() + Show() + SwitchReads() + SwitchWrites() + SwitchReadsAndWrites() + ReverseReadsAndWrites() + Cancel() + Complete() + Flavor() string +} + +type workflowFlavor int const ( - moveTablesFlavorRandom moveTablesFlavor = iota - moveTablesFlavorVtctl - moveTablesFlavorVtctld + workflowFlavorRandom workflowFlavor = iota + workflowFlavorVtctl + workflowFlavorVtctld ) -var moveTablesFlavors = []moveTablesFlavor{ - moveTablesFlavorVtctl, - moveTablesFlavorVtctld, +var workflowFlavors = []workflowFlavor{ + workflowFlavorVtctl, + workflowFlavorVtctld, } -type moveTables struct { +type workflowInfo struct { vc *VitessCluster workflowName string targetKeyspace string + tabletTypes string +} + +// MoveTables wrappers + +type moveTablesWorkflow struct { + *workflowInfo sourceKeyspace string tables string atomicCopy bool @@ -49,27 +70,19 @@ type moveTables struct { } type 
iMoveTables interface { - Create() - Show() - SwitchReads() - SwitchWrites() - SwitchReadsAndWrites() - ReverseReadsAndWrites() - Cancel() - Complete() - Flavor() string + iWorkflow } -func newMoveTables(vc *VitessCluster, mt *moveTables, flavor moveTablesFlavor) iMoveTables { +func newMoveTables(vc *VitessCluster, mt *moveTablesWorkflow, flavor workflowFlavor) iMoveTables { mt.vc = vc var mt2 iMoveTables - if flavor == moveTablesFlavorRandom { - flavor = moveTablesFlavors[rand.Intn(len(moveTablesFlavors))] + if flavor == workflowFlavorRandom { + flavor = workflowFlavors[rand.Intn(len(workflowFlavors))] } switch flavor { - case moveTablesFlavorVtctl: + case workflowFlavorVtctl: mt2 = newVtctlMoveTables(mt) - case moveTablesFlavorVtctld: + case workflowFlavorVtctld: mt2 = newVtctldMoveTables(mt) default: panic("unreachable") @@ -79,33 +92,31 @@ func newMoveTables(vc *VitessCluster, mt *moveTables, flavor moveTablesFlavor) i } type VtctlMoveTables struct { - *moveTables + *moveTablesWorkflow } func (vmt *VtctlMoveTables) Flavor() string { return "vtctl" } -func newVtctlMoveTables(mt *moveTables) *VtctlMoveTables { +func newVtctlMoveTables(mt *moveTablesWorkflow) *VtctlMoveTables { return &VtctlMoveTables{mt} } func (vmt *VtctlMoveTables) Create() { - log.Infof("vmt is %+v", vmt.vc, vmt.tables) - err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionCreate, "", vmt.sourceShards, "", vmt.atomicCopy) - require.NoError(vmt.vc.t, err) + currentWorkflowType = wrangler.MoveTablesWorkflow + vmt.exec(workflowActionCreate) } func (vmt *VtctlMoveTables) SwitchReadsAndWrites() { err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionSwitchTraffic, "", "", "", vmt.atomicCopy) + vmt.tables, workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions) require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) ReverseReadsAndWrites() { 
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy) + vmt.tables, workflowActionReverseTraffic, "", "", "", defaultWorkflowExecOptions) require.NoError(vmt.vc.t, err) } @@ -114,6 +125,15 @@ func (vmt *VtctlMoveTables) Show() { panic("implement me") } +func (vmt *VtctlMoveTables) exec(action string) { + options := &workflowExecOptions{ + deferSecondaryKeys: false, + atomicCopy: vmt.atomicCopy, + } + err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, action, vmt.tabletTypes, vmt.sourceShards, "", options) + require.NoError(vmt.vc.t, err) +} func (vmt *VtctlMoveTables) SwitchReads() { //TODO implement me panic("implement me") @@ -125,23 +145,20 @@ func (vmt *VtctlMoveTables) SwitchWrites() { } func (vmt *VtctlMoveTables) Cancel() { - err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionCancel, "", "", "", vmt.atomicCopy) - require.NoError(vmt.vc.t, err) + vmt.exec(workflowActionCancel) } func (vmt *VtctlMoveTables) Complete() { - //TODO implement me - panic("implement me") + vmt.exec(workflowActionComplete) } var _ iMoveTables = (*VtctldMoveTables)(nil) type VtctldMoveTables struct { - *moveTables + *moveTablesWorkflow } -func newVtctldMoveTables(mt *moveTables) *VtctldMoveTables { +func newVtctldMoveTables(mt *moveTablesWorkflow) *VtctldMoveTables { return &VtctldMoveTables{mt} } @@ -201,6 +218,158 @@ func (v VtctldMoveTables) Cancel() { } func (v VtctldMoveTables) Complete() { + v.exec("Complete") +} + +// Reshard wrappers + +type reshardWorkflow struct { + *workflowInfo + sourceShards string + targetShards string + skipSchemaCopy bool +} + +type iReshard interface { + iWorkflow +} + +func newReshard(vc *VitessCluster, rs *reshardWorkflow, flavor workflowFlavor) iReshard { + rs.vc = vc + var rs2 iReshard + if flavor == 
workflowFlavorRandom { + flavor = workflowFlavors[rand.Intn(len(workflowFlavors))] + } + switch flavor { + case workflowFlavorVtctl: + rs2 = newVtctlReshard(rs) + case workflowFlavorVtctld: + rs2 = newVtctldReshard(rs) + default: + panic("unreachable") + } + log.Infof("Using reshard flavor: %s", rs2.Flavor()) + return rs2 +} + +type VtctlReshard struct { + *reshardWorkflow +} + +func (vrs *VtctlReshard) Flavor() string { + return "vtctl" +} + +func newVtctlReshard(rs *reshardWorkflow) *VtctlReshard { + return &VtctlReshard{rs} +} + +func (vrs *VtctlReshard) Create() { + currentWorkflowType = wrangler.ReshardWorkflow + vrs.exec(workflowActionCreate) +} + +func (vrs *VtctlReshard) SwitchReadsAndWrites() { + vrs.exec(workflowActionSwitchTraffic) +} + +func (vrs *VtctlReshard) ReverseReadsAndWrites() { + vrs.exec(workflowActionReverseTraffic) +} + +func (vrs *VtctlReshard) Show() { + //TODO implement me + panic("implement me") +} + +func (vrs *VtctlReshard) exec(action string) { + options := &workflowExecOptions{} + err := tstWorkflowExec(vrs.vc.t, "", vrs.workflowName, "", vrs.targetKeyspace, + "", action, vrs.tabletTypes, vrs.sourceShards, vrs.targetShards, options) + require.NoError(vrs.vc.t, err) +} + +func (vrs *VtctlReshard) SwitchReads() { + //TODO implement me + panic("implement me") +} + +func (vrs *VtctlReshard) SwitchWrites() { + //TODO implement me + panic("implement me") +} + +func (vrs *VtctlReshard) Cancel() { + vrs.exec(workflowActionCancel) +} + +func (vrs *VtctlReshard) Complete() { + vrs.exec(workflowActionComplete) +} + +var _ iReshard = (*VtctldReshard)(nil) + +type VtctldReshard struct { + *reshardWorkflow +} + +func newVtctldReshard(rs *reshardWorkflow) *VtctldReshard { + return &VtctldReshard{rs} +} + +func (v VtctldReshard) Flavor() string { + return "vtctld" +} + +func (v VtctldReshard) exec(args ...string) { + args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} + args2 = append(args2, args...) 
+ if err := vc.VtctldClient.ExecuteCommand(args2...); err != nil { + v.vc.t.Fatalf("Reshard command failed (args: %v): %v", args2, err) // exec serves every Reshard action, so report what was actually run + } +} + +func (v VtctldReshard) Create() { + args := []string{"Create"} + if v.sourceShards != "" { + args = append(args, "--source-shards="+v.sourceShards) + } + if v.targetShards != "" { + args = append(args, "--target-shards="+v.targetShards) + } + if v.skipSchemaCopy { + args = append(args, "--skip-schema-copy="+strconv.FormatBool(v.skipSchemaCopy)) + } + v.exec(args...) +} + +func (v VtctldReshard) SwitchReadsAndWrites() { + v.exec("SwitchTraffic") +} + +func (v VtctldReshard) ReverseReadsAndWrites() { + v.exec("ReverseTraffic") +} + +func (v VtctldReshard) Show() { + //TODO implement me + panic("implement me") +} + +func (v VtctldReshard) SwitchReads() { //TODO implement me panic("implement me") } + +func (v VtctldReshard) SwitchWrites() { + //TODO implement me + panic("implement me") +} + +func (v VtctldReshard) Cancel() { + v.exec("Cancel") +} + +func (v VtctldReshard) Complete() { + v.exec("Complete") +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 3abd24aa0e6..be8876f26d2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -152,8 +152,21 @@ func (vp *vplayer) play(ctx context.Context) error { // The foreign_key_checks value for a transaction is determined by the 2nd bit (least significant) of the flags: // - If set (1), foreign key checks are disabled. // - If unset (0), foreign key checks are enabled. -// updateFKCheck also updates the state for the first row event that this vplayer and hence the connection sees. +// updateFKCheck also updates the state for the first row event that this vplayer, and hence the db connection, sees.
func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error { + mustUpdate := false + if vp.vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy) { + // If this is an atomic copy, we must update the foreign_key_checks state even when the vplayer runs during + // the copy phase, i.e., for catchup and fastforward. + mustUpdate = true + } else if vp.vr.state == binlogdatapb.VReplicationWorkflowState_Running { + // If the vreplication workflow is in Running state, we must update the foreign_key_checks + // state for all workflow types. + mustUpdate = true + } + if !mustUpdate { + return nil + } dbForeignKeyChecksEnabled := true if flags2&NoForeignKeyCheckFlagBitmask == NoForeignKeyCheckFlagBitmask { dbForeignKeyChecksEnabled = false diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index ccbcee9c3b0..e7700e08079 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -1391,8 +1391,8 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error Filter: filter, }) } - log.Infof("Creating reverse workflow vreplication stream on tablet %s: workflow %s, startPos %s", - source.GetPrimary().Alias, ts.ReverseWorkflowName(), target.Position) + log.Infof("Creating reverse workflow vreplication stream on tablet %s: workflow %s, startPos %s for target %s:%s, uid %d", + source.GetPrimary().Alias, ts.ReverseWorkflowName(), target.Position, ts.TargetKeyspaceName(), target.GetShard().ShardName(), uid) _, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, binlogplayer.CreateVReplicationState(ts.ReverseWorkflowName(), reverseBls, target.Position, binlogdatapb.VReplicationWorkflowState_Stopped, source.GetPrimary().DbName(), ts.workflowType, ts.workflowSubType)) diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index 5a3031d7307..1bc76a2868a 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -122,6 +122,7 @@ var ( 
"vreplication_v2", "vreplication_partial_movetables_basic", "vreplication_partial_movetables_sequences", + "vreplication_foreign_key_stress", "schemadiff_vrepl", "topo_connection_cache", "vtgate_partial_keyspace", diff --git a/test/config.json b/test/config.json index 66657b4f37e..7aafcaf1a80 100644 --- a/test/config.json +++ b/test/config.json @@ -1220,6 +1220,15 @@ "RetryMax": 1, "Tags": [] }, + "vreplication_foreign_key_stress": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestFKExt"], + "Command": [], + "Manual": false, + "Shard": "vreplication_foreign_key_stress", + "RetryMax": 1, + "Tags": [] + }, "vreplication_across_db_versions": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestV2WorkflowsAcrossDBVersions", "-timeout", "20m"],