From dc8913152cbfbe840c24ca0e36e9342657075f37 Mon Sep 17 00:00:00 2001 From: Vilius Okockis Date: Thu, 11 Apr 2024 17:15:52 +0300 Subject: [PATCH] backport Online DDL/VReplication: able to read from replica https://github.com/vitessio/vitess/pull/8405 Signed-off-by: Vilius Okockis --- go/vt/discovery/tablet_picker.go | 57 ++++++---- go/vt/discovery/tablet_picker_test.go | 149 ++++++++++++++++++++------ go/vt/schema/ddl_strategy.go | 11 +- go/vt/schema/online_ddl.go | 26 +++++ go/vt/schema/online_ddl_test.go | 22 ++-- go/vt/topo/tablet.go | 2 +- go/vt/vttablet/onlineddl/vrepl.go | 2 +- 7 files changed, 197 insertions(+), 72 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index dea4d7d7def..e26861e0955 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -19,6 +19,7 @@ package discovery import ( "fmt" "math/rand" + "sort" "strings" "sync" "time" @@ -44,6 +45,7 @@ var ( tabletPickerRetryDelay = 30 * time.Second muTabletPickerRetryDelay sync.Mutex globalTPStats *tabletPickerStats + inOrderHint = "in_order:" ) // GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment @@ -67,10 +69,16 @@ type TabletPicker struct { keyspace string shard string tabletTypes []topodatapb.TabletType + inOrder bool } // NewTabletPicker returns a TabletPicker. func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) { + inOrder := false + if strings.HasPrefix(tabletTypesStr, inOrderHint) { + inOrder = true + tabletTypesStr = tabletTypesStr[len(inOrderHint):] + } tabletTypes, err := topoproto.ParseTabletTypes(tabletTypesStr) if err != nil { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr) @@ -95,6 +103,7 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp keyspace: keyspace, shard: shard, tabletTypes: tabletTypes, + inOrder: inOrder, }, nil } @@ -102,6 +111,7 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp // All tablets that belong to tp.cells are evaluated and one is // chosen at random func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { + rand.Seed(time.Now().UnixNano()) // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found // or the context is canceled for { @@ -111,6 +121,25 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table default: } candidates := tp.GetMatchingTablets(ctx) + if tp.inOrder { + // Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes + orderMap := map[topodatapb.TabletType]int{} + for i, t := range tp.tabletTypes { + orderMap[t] = i + } + sort.Slice(candidates, func(i, j int) bool { + if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] { + // identical tablet types: randomize order of tablets for this type + return rand.Intn(2) == 0 // 50% chance + } + return orderMap[candidates[i].Type] < orderMap[candidates[j].Type] + }) + } else { + // Randomize candidates + rand.Shuffle(len(candidates), func(i, j int) { + candidates[i], candidates[j] = candidates[j], candidates[i] + }) + } if len(candidates) == 0 { // if no candidates were found, sleep and try again tp.incNoTabletFoundStat() @@ -125,27 +154,19 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } continue } - // try at most len(candidate) times to find a healthy tablet - for i := 0; i < len(candidates); i++ { - idx := rand.Intn(len(candidates)) - ti := candidates[idx] - // get tablet + for _, ti := range candidates { // try to connect to tablet - conn, err := tabletconn.GetDialer()(ti.Tablet, true) - if err != nil { - log.Warningf("unable to connect to tablet for alias %v", ti.Alias) - candidates = append(candidates[:idx], candidates[idx+1:]...) - if len(candidates) == 0 { - tp.incNoTabletFoundStat() - break - } - continue + if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil { + // OK to use ctx here because it is not actually used by the underlying Close implementation + _ = conn.Close(ctx) + log.Infof("tablet picker found tablet %s", ti.Tablet.String()) + return ti.Tablet, nil } - // OK to use ctx here because it is not actually used by the underlying Close implementation - _ = conn.Close(ctx) - log.Infof("tablet picker found tablet %s", ti.Tablet.String()) - return ti.Tablet, nil + // err found + log.Warningf("unable to connect to tablet for alias %v", ti.Alias) } + // Got here? Means we iterated all tablets and did not find a healthy one + tp.incNoTabletFoundStat() } } diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 64e8cea0f44..0f2f3e24318 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -17,13 +17,10 @@ limitations under the License. package discovery import ( + "context" "testing" "time" - "vitess.io/vitess/go/vt/log" - - "context" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -37,7 +34,7 @@ import ( func TestPickSimple(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(te, want) + defer deleteTablet(t, te, want) tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") require.NoError(t, err) @@ -50,9 +47,9 @@ func TestPickSimple(t *testing.T) { func TestPickFromTwoHealthy(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(te, want1) + defer deleteTablet(t, te, want1) want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) - defer deleteTablet(te, want2) + defer deleteTablet(t, te, want2) tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") require.NoError(t, err) @@ -73,12 +70,102 @@ func TestPickFromTwoHealthy(t *testing.T) { assert.True(t, picked2) } +func TestPickInOrder1(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell"}) + want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(t, te, want1) + want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) + defer deleteTablet(t, te, want2) + + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "in_order:replica,rdonly") + require.NoError(t, err) + + // In 20 attempts, we always pick the first healthy tablet in order + var picked1, picked2 bool + for i := 0; i < 20; i++ { + tablet, err := tp.PickForStreaming(context.Background()) + require.NoError(t, err) + if proto.Equal(tablet, want1) { + picked1 = true + } + if proto.Equal(tablet, want2) { + picked2 = true + } + } + assert.True(t, picked1) + assert.False(t, picked2) +} + +func TestPickInOrder2(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell"}) + want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(t, te, want1) + want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) + defer deleteTablet(t, te, want2) + + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "in_order:rdonly,replica") + require.NoError(t, err) + + // In 20 attempts, we always pick the first healthy tablet in order + var picked1, picked2 bool + for i := 0; i < 20; i++ { + tablet, err := tp.PickForStreaming(context.Background()) + require.NoError(t, err) + if proto.Equal(tablet, want1) { + picked1 = true + } + if proto.Equal(tablet, want2) { + picked2 = true + } + } + assert.False(t, picked1) + assert.True(t, picked2) +} + +func TestPickInOrderMultipleInGroup(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell"}) + want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(t, te, want1) + want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true) + defer deleteTablet(t, te, want2) + want3 := addTablet(te, 102, topodatapb.TabletType_RDONLY, "cell", true, true) + defer deleteTablet(t, te, want3) + want4 := addTablet(te, 103, topodatapb.TabletType_RDONLY, "cell", true, true) + defer deleteTablet(t, te, want4) + + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "in_order:rdonly,replica") + require.NoError(t, err) + + // In 40 attempts, we pick each of the three RDONLY, but never the REPLICA + var picked1, picked2, picked3, picked4 bool + for i := 0; i < 40; i++ { + tablet, err := tp.PickForStreaming(context.Background()) + require.NoError(t, err) + if proto.Equal(tablet, want1) { + picked1 = true + } + if proto.Equal(tablet, want2) { + picked2 = true + } + if proto.Equal(tablet, want3) { + picked3 = true + } + if proto.Equal(tablet, want4) { + picked4 = true + } + } + assert.False(t, picked1) + assert.True(t, picked2) + assert.True(t, picked3) + assert.True(t, picked4) +} + func TestPickRespectsTabletType(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(te, want) + defer deleteTablet(t, te, want) dont := addTablet(te, 101, topodatapb.TabletType_MASTER, "cell", true, true) - defer deleteTablet(te, dont) + defer deleteTablet(t, te, dont) tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") require.NoError(t, err) @@ -95,7 +182,7 @@ func TestPickRespectsTabletType(t *testing.T) { func TestPickMultiCell(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(te, want) + defer deleteTablet(t, te, want) tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") require.NoError(t, err) @@ -110,7 +197,7 @@ func TestPickMultiCell(t *testing.T) { func TestPickMaster(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want := addTablet(te, 100, topodatapb.TabletType_MASTER, "cell", true, true) - defer deleteTablet(te, want) + defer deleteTablet(t, te, want) ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { @@ -132,7 +219,7 @@ func TestPickMaster(t *testing.T) { func TestPickFromOtherCell(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(te, want) + defer deleteTablet(t, te, want) tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") require.NoError(t, err) @@ -147,9 +234,9 @@ func TestPickFromOtherCell(t *testing.T) { func TestDontPickFromOtherCell(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(te, want1) + defer deleteTablet(t, te, want1) want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(te, want2) + defer deleteTablet(t, te, want2) tp, err := NewTabletPicker(te.topoServ, []string{"cell"}, te.keyspace, te.shard, "replica") require.NoError(t, err) @@ -176,9 +263,9 @@ func TestDontPickFromOtherCell(t *testing.T) { func TestPickMultiCellTwoTablets(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(te, want1) + defer deleteTablet(t, te, want1) want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(te, want2) + defer deleteTablet(t, te, want2) tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") require.NoError(t, err) @@ -205,9 +292,9 @@ func TestPickMultiCellTwoTablets(t *testing.T) { func TestPickMultiCellTwoTabletTypes(t *testing.T) { te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(te, want1) + defer deleteTablet(t, te, want1) want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "otherCell", true, true) - defer deleteTablet(te, want2) + defer deleteTablet(t, te, want2) tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") require.NoError(t, err) @@ -235,7 +322,7 @@ func TestPickUsingCellAlias(t *testing.T) { // test env puts all cells into an alias called "cella" te := newPickerTestEnv(t, []string{"cell", "otherCell"}) want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(te, want1) + defer deleteTablet(t, te, want1) tp, err := NewTabletPicker(te.topoServ, []string{"cella"}, te.keyspace, te.shard, "replica") require.NoError(t, err) @@ -247,9 +334,9 @@ func TestPickUsingCellAlias(t *testing.T) { assert.True(t, proto.Equal(want1, tablet), "Pick: %v, want %v", tablet, want1) // create a tablet in the other cell, it should be picked - deleteTablet(te, want1) + deleteTablet(t, te, want1) want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true) - defer deleteTablet(te, want2) + defer deleteTablet(t, te, want2) ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel2() tablet, err = tp.PickForStreaming(ctx2) @@ -299,7 +386,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) { }() want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) - defer deleteTablet(te, want) + defer deleteTablet(t, te, want) got := <-result require.NotNil(t, got, "Tablet should not be nil") assert.True(t, proto.Equal(want, got), "Pick: %v, want %v", got, want) @@ -324,7 +411,7 @@ func TestPickError(t *testing.T) { _, err = tp.PickForStreaming(ctx) require.EqualError(t, err, "context has expired") // no tablets of the correct type - defer deleteTablet(te, addTablet(te, 200, topodatapb.TabletType_RDONLY, "cell", true, true)) + defer deleteTablet(t, te, addTablet(te, 200, topodatapb.TabletType_RDONLY, "cell", true, true)) ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond) defer cancel() _, err = tp.PickForStreaming(ctx) @@ -395,18 +482,16 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell return tablet } -func deleteTablet(te *pickerTestEnv, tablet *topodatapb.Tablet) { - +func deleteTablet(t *testing.T, te *pickerTestEnv, tablet *topodatapb.Tablet) { if tablet == nil { return } - //log error - if err := te.topoServ.DeleteTablet(context.Background(), tablet.Alias); err != nil { - log.Errorf("failed to DeleteTablet with alias : %v", err) + { //log error + err := te.topoServ.DeleteTablet(context.Background(), tablet.Alias) + require.NoError(t, err, "failed to DeleteTablet with alias: %v", err) } - - //This is not automatically removed from shard replication, which results in log spam and log error - if err := topo.DeleteTabletReplicationData(context.Background(), te.topoServ, tablet); err != nil { - log.Errorf("failed to automatically remove from shard replication: %v", err) + { //This is not automatically removed from shard replication, which results in log spam and log error + err := topo.DeleteTabletReplicationData(context.Background(), te.topoServ, tablet) + require.NoError(t, err, "failed to automatically remove from shard replication: %v", err) } } diff --git a/go/vt/schema/ddl_strategy.go b/go/vt/schema/ddl_strategy.go index dbc1f2d32b6..ad2b98e563c 100644 --- a/go/vt/schema/ddl_strategy.go +++ b/go/vt/schema/ddl_strategy.go @@ -155,13 +155,10 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string { // IsSkipTopo suggests that DDL should apply to tables bypassing global topo request func (setting *DDLStrategySetting) IsSkipTopo() bool { - switch { - case setting.IsSingleton(), setting.IsSingletonContext(): - return true - case setting.hasFlag(skipTopoFlag): - return true - } - return false + // Vitess 11 introduced the flag -skip-topo. starting Vitess 12 the flag is _always_ considered 'true'. + // Ideally the flag should be gone, but for backwards compatibility we allow users to still specify it + // (and we stil ignore the value, it's always set to true) + return true } // ToString returns a simple string representation of this instance diff --git a/go/vt/schema/online_ddl.go b/go/vt/schema/online_ddl.go index 81f0d506f2c..a67fe0cc268 100644 --- a/go/vt/schema/online_ddl.go +++ b/go/vt/schema/online_ddl.go @@ -372,6 +372,32 @@ func (onlineDDL *OnlineDDL) ToJSON() ([]byte, error) { return json.Marshal(onlineDDL) } +// sqlWithoutComments returns the SQL statement without comment directives. Useful for tests +func (onlineDDL *OnlineDDL) sqlWithoutComments() (sql string, err error) { + sql = onlineDDL.SQL + stmt, err := sqlparser.Parse(sql) + if err != nil { + // query validation and rebuilding + if _, err := legacyParseRevertUUID(sql); err == nil { + // This is a revert statement of the form "revert ". We allow this for now. Future work will + // make sure the statement is a valid, parseable "revert vitess_migration ''", but we must + // be backwards compatible for now. + return sql, nil + } + // otherwise the statement should have been parseable! + return "", err + } + + switch stmt := stmt.(type) { + case sqlparser.DDLStatement: + stmt.SetComments(nil) + case *sqlparser.RevertMigration: + stmt.SetComments(nil) + } + sql = sqlparser.String(stmt) + return sql, nil +} + // GetAction extracts the DDL action type from the online DDL statement func (onlineDDL *OnlineDDL) GetAction() (action sqlparser.DDLAction, err error) { if _, err := onlineDDL.GetRevertUUID(); err == nil { diff --git a/go/vt/schema/online_ddl_test.go b/go/vt/schema/online_ddl_test.go index b361ac1ae3a..fcf4634ac49 100644 --- a/go/vt/schema/online_ddl_test.go +++ b/go/vt/schema/online_ddl_test.go @@ -210,8 +210,8 @@ func TestNewOnlineDDL(t *testing.T) { NewDDLStrategySetting(DDLStrategyOnline, ""), NewDDLStrategySetting(DDLStrategyOnline, "-singleton"), } - require.False(t, strategies[0].IsSkipTopo()) - require.False(t, strategies[1].IsSkipTopo()) + require.True(t, strategies[0].IsSkipTopo()) + require.True(t, strategies[1].IsSkipTopo()) require.True(t, strategies[2].IsSkipTopo()) for _, ts := range tt { @@ -224,16 +224,11 @@ func TestNewOnlineDDL(t *testing.T) { return } assert.NoError(t, err) - if stgy.IsSkipTopo() { - // onlineDDL.SQL enriched with /*vt+ ... */ comment - assert.Contains(t, onlineDDL.SQL, hex.EncodeToString([]byte(onlineDDL.UUID))) - assert.Contains(t, onlineDDL.SQL, hex.EncodeToString([]byte(migrationContext))) - assert.Contains(t, onlineDDL.SQL, hex.EncodeToString([]byte(string(stgy.Strategy)))) - } else { - assert.NotContains(t, onlineDDL.SQL, hex.EncodeToString([]byte(onlineDDL.UUID))) - assert.NotContains(t, onlineDDL.SQL, hex.EncodeToString([]byte(migrationContext))) - assert.NotContains(t, onlineDDL.SQL, hex.EncodeToString([]byte(string(stgy.Strategy)))) - } + require.True(t, stgy.IsSkipTopo(), "IsSkipTopo() should always be true") + // onlineDDL.SQL enriched with /*vt+ ... */ comment + assert.Contains(t, onlineDDL.SQL, hex.EncodeToString([]byte(onlineDDL.UUID))) + assert.Contains(t, onlineDDL.SQL, hex.EncodeToString([]byte(migrationContext))) + assert.Contains(t, onlineDDL.SQL, hex.EncodeToString([]byte(string(stgy.Strategy)))) }) } }) @@ -297,7 +292,8 @@ func TestNewOnlineDDLs(t *testing.T) { sqls := []string{} for _, onlineDDL := range onlineDDLs { - sql := onlineDDL.SQL + sql, err := onlineDDL.sqlWithoutComments() + assert.NoError(t, err) sql = strings.ReplaceAll(sql, "\n", "") sql = strings.ReplaceAll(sql, "\t", "") sqls = append(sqls, sql) diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index 5658346963e..3912a59625d 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -80,7 +80,7 @@ func IsRunningQueryService(tt topodatapb.TabletType) bool { // lameduck. Lameduck is a transition period where we are still // allowed to serve, but we tell the clients we are going away // soon. Typically, a vttablet will still serve, but broadcast a -// non-serving state through its health check. then vtgate will ctahc +// non-serving state through its health check. then vtgate will catch // that non-serving state, and stop sending queries. // // Masters are not subject to lameduck, as we usually want to transition diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go index 2aa3df7deb8..875ef0d0cfd 100644 --- a/go/vt/vttablet/onlineddl/vrepl.go +++ b/go/vt/vttablet/onlineddl/vrepl.go @@ -525,7 +525,7 @@ func (v *VRepl) analyze(ctx context.Context, conn *dbconnpool.DBConnection) erro // generateInsertStatement generates the INSERT INTO _vt.replication stataement that creates the vreplication workflow func (v *VRepl) generateInsertStatement(ctx context.Context) (string, error) { ig := vreplication.NewInsertGenerator(binlogplayer.BlpStopped, v.dbName) - ig.AddRow(v.workflow, v.bls, v.pos, "", "MASTER") + ig.AddRow(v.workflow, v.bls, v.pos, "", "in_order:REPLICA,MASTER") return ig.String(), nil }