Skip to content

Commit

Permalink
VReplication: Enforce consistent order for table copies and diffs (#1…
Browse files Browse the repository at this point in the history
…5152)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Feb 9, 2024
1 parent 1f728ea commit a60297b
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 36 deletions.
60 changes: 60 additions & 0 deletions go/protoutil/binlogsource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package protoutil

import (
"slices"
"sort"
"strings"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// SortBinlogSourceTables sorts the table related contents of the
// BinlogSource struct lexicographically by table name in order to
// produce consistent results.
func SortBinlogSourceTables(bls *binlogdatapb.BinlogSource) {
if bls == nil {
return
}

// Sort the tables by name to ensure a consistent order.
slices.Sort(bls.Tables)

if bls.Filter == nil || len(bls.Filter.Rules) == 0 {
return
}
sort.Slice(bls.Filter.Rules, func(i, j int) bool {
// Exclude filters should logically be processed first.
if bls.Filter.Rules[i].Filter == "exclude" && bls.Filter.Rules[j].Filter != "exclude" {
return true
}
if bls.Filter.Rules[j].Filter == "exclude" && bls.Filter.Rules[i].Filter != "exclude" {
return false
}

// Remove preceding slash from the match string.
// That is used when the filter is a regular expression.
fi, _ := strings.CutPrefix(bls.Filter.Rules[i].Match, "/")
fj, _ := strings.CutPrefix(bls.Filter.Rules[j].Match, "/")
if fi != fj {
return fi < fj
}

return bls.Filter.Rules[i].Filter < bls.Filter.Rules[j].Filter
})
}
209 changes: 209 additions & 0 deletions go/protoutil/binlogsource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package protoutil

import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func TestSortBinlogSourceTables(t *testing.T) {
tests := []struct {
name string
inSource *binlogdatapb.BinlogSource
outSource *binlogdatapb.BinlogSource
}{
{
name: "Basic",
inSource: &binlogdatapb.BinlogSource{
Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"},
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "ztable2",
},
{
Match: "table3",
},
{
Match: "/wuts",
},
{
Match: "1table",
Filter: "a",
},
{
Match: "1table",
},
{
Match: "atable",
},
},
},
},
outSource: &binlogdatapb.BinlogSource{
Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"},
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "1table",
},
{
Match: "1table",
Filter: "a",
},
{
Match: "atable",
},
{
Match: "table3",
},
{
Match: "/wuts",
},
{
Match: "ztable2",
},
},
},
},
},
{
name: "With excludes",
inSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "./*",
},
{
Match: "no4",
Filter: "exclude",
},
{
Match: "no2",
Filter: "exclude",
},
{
Match: "ztable2",
},
{
Match: "atable2",
},
},
},
},
outSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "no2",
Filter: "exclude",
},
{
Match: "no4",
Filter: "exclude",
},
{
Match: "./*",
},
{
Match: "atable2",
},
{
Match: "ztable2",
},
},
},
},
},
{
name: "With excludes",
inSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "no4",
Filter: "exclude",
},
{
Match: "no2",
Filter: "exclude",
},
{
Match: "./*",
},
},
},
},
outSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "no2",
Filter: "exclude",
},
{
Match: "no4",
Filter: "exclude",
},
{
Match: "./*",
},
},
},
},
},
{
name: "Nil",
inSource: nil,
outSource: nil,
},
{
name: "No filter",
inSource: &binlogdatapb.BinlogSource{
Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"},
Filter: nil,
},
outSource: &binlogdatapb.BinlogSource{
Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"},
Filter: nil,
},
},
{
name: "No filter rules",
inSource: &binlogdatapb.BinlogSource{
Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"},
Filter: &binlogdatapb.Filter{},
},
outSource: &binlogdatapb.BinlogSource{
Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"},
Filter: &binlogdatapb.Filter{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
SortBinlogSourceTables(tt.inSource)
require.True(t, proto.Equal(tt.inSource, tt.outSource), "got: %s, want: %s", tt.inSource.String(), tt.outSource.String())
})
}
}
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex

func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cells, uuid string, completedAtMin time.Time) *vdiffInfo {
var info *vdiffInfo
var jsonStr string
first := true
previousProgress := vdiff2.ProgressReport{}
ch := make(chan bool)
go func() {
for {
time.Sleep(vdiffStatusCheckInterval)
_, jsonStr := performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false)
_, jsonStr = performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false)
info = getVDiffInfo(jsonStr)
require.NotNil(t, info)
if info.State == "completed" {
Expand Down Expand Up @@ -142,7 +143,7 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell
case <-ch:
return info
case <-time.After(vdiffTimeout):
log.Errorf("VDiff never completed for UUID %s", uuid)
log.Errorf("VDiff never completed for UUID %s. Latest output: %s", uuid, jsonStr)
require.FailNow(t, fmt.Sprintf("VDiff never completed for UUID %s", uuid))
return nil
}
Expand Down
9 changes: 5 additions & 4 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ import (
"time"

"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/history"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -606,6 +605,7 @@ func ReadVRSettings(dbClient DBClient, uid int32) (VRSettings, error) {
// the _vt.vreplication table.
func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string,
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) string {
protoutil.SortBinlogSourceTables(source)
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)",
Expand All @@ -616,6 +616,7 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi
// CreateVReplicationState returns a statement to create a stopped vreplication.
func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position string, state binlogdatapb.VReplicationWorkflowState, dbName string,
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType) string {
protoutil.SortBinlogSourceTables(source)
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d)",
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtctl/workflow/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -281,8 +282,8 @@ func (rs *resharder) createStreams(ctx context.Context) error {

ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, targetPrimary.DbName())

// copy excludeRules to prevent data race.
copyExcludeRules := append([]*binlogdatapb.Rule(nil), excludeRules...)
// Clone excludeRules to prevent data races.
copyExcludeRules := slices.Clone(excludeRules)
for _, source := range rs.sourceShards {
if !key.KeyRangeIntersect(target.KeyRange, source.KeyRange) {
continue
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/discovery"
Expand Down Expand Up @@ -57,6 +58,7 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta
}
res := &sqltypes.Result{}
for _, bls := range req.BinlogSource {
protoutil.SortBinlogSourceTables(bls)
source, err := prototext.Marshal(bls)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit a60297b

Please sign in to comment.