Skip to content

Commit

Permalink
backport Online DDL/VReplication: able to read from replica vitessio#…
Browse files Browse the repository at this point in the history
…8405

Signed-off-by: Vilius Okockis <[email protected]>
  • Loading branch information
DeathBorn committed Apr 11, 2024
1 parent 7fe7365 commit dc89131
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 72 deletions.
57 changes: 39 additions & 18 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package discovery
import (
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"time"
Expand All @@ -44,6 +45,7 @@ var (
tabletPickerRetryDelay = 30 * time.Second
muTabletPickerRetryDelay sync.Mutex
globalTPStats *tabletPickerStats
inOrderHint = "in_order:"
)

// GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment
Expand All @@ -67,10 +69,16 @@ type TabletPicker struct {
keyspace string
shard string
tabletTypes []topodatapb.TabletType
inOrder bool
}

// NewTabletPicker returns a TabletPicker.
func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) {
inOrder := false
if strings.HasPrefix(tabletTypesStr, inOrderHint) {
inOrder = true
tabletTypesStr = tabletTypesStr[len(inOrderHint):]
}
tabletTypes, err := topoproto.ParseTabletTypes(tabletTypesStr)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr)
Expand All @@ -95,13 +103,15 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp
keyspace: keyspace,
shard: shard,
tabletTypes: tabletTypes,
inOrder: inOrder,
}, nil
}

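// PickForStreaming picks an available tablet.
// All tablets that belong to tp.cells are evaluated. By default one is chosen
// at random; when the tablet types were given with the "in_order:" hint,
// candidates are tried in the listed tablet type order instead.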
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
rand.Seed(time.Now().UnixNano())
// keep trying at intervals (tabletPickerRetryDelay) until a tablet is found
// or the context is canceled
for {
Expand All @@ -111,6 +121,25 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
default:
}
candidates := tp.GetMatchingTablets(ctx)
if tp.inOrder {
// Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes
orderMap := map[topodatapb.TabletType]int{}
for i, t := range tp.tabletTypes {
orderMap[t] = i
}
sort.Slice(candidates, func(i, j int) bool {
if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] {
// identical tablet types: randomize order of tablets for this type
return rand.Intn(2) == 0 // 50% chance
}
return orderMap[candidates[i].Type] < orderMap[candidates[j].Type]
})
} else {
// Randomize candidates
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
})
}
if len(candidates) == 0 {
// if no candidates were found, sleep and try again
tp.incNoTabletFoundStat()
Expand All @@ -125,27 +154,19 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
}
continue
}
// try at most len(candidate) times to find a healthy tablet
for i := 0; i < len(candidates); i++ {
idx := rand.Intn(len(candidates))
ti := candidates[idx]
// get tablet
for _, ti := range candidates {
// try to connect to tablet
conn, err := tabletconn.GetDialer()(ti.Tablet, true)
if err != nil {
log.Warningf("unable to connect to tablet for alias %v", ti.Alias)
candidates = append(candidates[:idx], candidates[idx+1:]...)
if len(candidates) == 0 {
tp.incNoTabletFoundStat()
break
}
continue
if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil {
// OK to use ctx here because it is not actually used by the underlying Close implementation
_ = conn.Close(ctx)
log.Infof("tablet picker found tablet %s", ti.Tablet.String())
return ti.Tablet, nil
}
// OK to use ctx here because it is not actually used by the underlying Close implementation
_ = conn.Close(ctx)
log.Infof("tablet picker found tablet %s", ti.Tablet.String())
return ti.Tablet, nil
// err found
log.Warningf("unable to connect to tablet for alias %v", ti.Alias)
}
// Got here? Means we iterated all tablets and did not find a healthy one
tp.incNoTabletFoundStat()
}
}

Expand Down
149 changes: 117 additions & 32 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ limitations under the License.
package discovery

import (
"context"
"testing"
"time"

"vitess.io/vitess/go/vt/log"

"context"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
Expand All @@ -37,7 +34,7 @@ import (
func TestPickSimple(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell"})
want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want)
defer deleteTablet(t, te, want)

tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica")
require.NoError(t, err)
Expand All @@ -50,9 +47,9 @@ func TestPickSimple(t *testing.T) {
func TestPickFromTwoHealthy(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell"})
want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want1)
defer deleteTablet(t, te, want1)
want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "cell", true, true)
defer deleteTablet(te, want2)
defer deleteTablet(t, te, want2)

tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly")
require.NoError(t, err)
Expand All @@ -73,12 +70,102 @@ func TestPickFromTwoHealthy(t *testing.T) {
assert.True(t, picked2)
}

// TestPickInOrder1 registers one REPLICA and one RDONLY tablet and builds a
// picker with the "in_order:replica,rdonly" hint. Across 20 picks it expects
// the REPLICA tablet to be returned at least once and the RDONLY tablet never.
func TestPickInOrder1(t *testing.T) {
	env := newPickerTestEnv(t, []string{"cell"})
	replica := addTablet(env, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
	defer deleteTablet(t, env, replica)
	rdonly := addTablet(env, 101, topodatapb.TabletType_RDONLY, "cell", true, true)
	defer deleteTablet(t, env, rdonly)

	picker, err := NewTabletPicker(env.topoServ, env.cells, env.keyspace, env.shard, "in_order:replica,rdonly")
	require.NoError(t, err)

	// Track which of the two tablets was ever handed back by the picker.
	const attempts = 20
	sawReplica, sawRdonly := false, false
	for attempt := 0; attempt < attempts; attempt++ {
		got, pickErr := picker.PickForStreaming(context.Background())
		require.NoError(t, pickErr)
		sawReplica = sawReplica || proto.Equal(got, replica)
		sawRdonly = sawRdonly || proto.Equal(got, rdonly)
	}

	// REPLICA is listed first in the hint, so only it should have been seen.
	assert.True(t, sawReplica)
	assert.False(t, sawRdonly)
}

// TestPickInOrder2 uses the same two tablets as TestPickInOrder1 but reverses
// the hint to "in_order:rdonly,replica". Across 20 picks it expects the RDONLY
// tablet to be returned at least once and the REPLICA tablet never.
func TestPickInOrder2(t *testing.T) {
	env := newPickerTestEnv(t, []string{"cell"})
	replica := addTablet(env, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
	defer deleteTablet(t, env, replica)
	rdonly := addTablet(env, 101, topodatapb.TabletType_RDONLY, "cell", true, true)
	defer deleteTablet(t, env, rdonly)

	picker, err := NewTabletPicker(env.topoServ, env.cells, env.keyspace, env.shard, "in_order:rdonly,replica")
	require.NoError(t, err)

	// Track which of the two tablets was ever handed back by the picker.
	const attempts = 20
	sawReplica, sawRdonly := false, false
	for attempt := 0; attempt < attempts; attempt++ {
		got, pickErr := picker.PickForStreaming(context.Background())
		require.NoError(t, pickErr)
		sawReplica = sawReplica || proto.Equal(got, replica)
		sawRdonly = sawRdonly || proto.Equal(got, rdonly)
	}

	// RDONLY is listed first in the hint, so only it should have been seen.
	assert.False(t, sawReplica)
	assert.True(t, sawRdonly)
}

// TestPickInOrderMultipleInGroup registers one REPLICA and three RDONLY
// tablets and builds a picker with the "in_order:rdonly,replica" hint. Across
// 40 picks it expects every RDONLY tablet to be returned at least once and the
// REPLICA tablet never.
func TestPickInOrderMultipleInGroup(t *testing.T) {
	env := newPickerTestEnv(t, []string{"cell"})
	replica := addTablet(env, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
	defer deleteTablet(t, env, replica)
	rdonlyA := addTablet(env, 101, topodatapb.TabletType_RDONLY, "cell", true, true)
	defer deleteTablet(t, env, rdonlyA)
	rdonlyB := addTablet(env, 102, topodatapb.TabletType_RDONLY, "cell", true, true)
	defer deleteTablet(t, env, rdonlyB)
	rdonlyC := addTablet(env, 103, topodatapb.TabletType_RDONLY, "cell", true, true)
	defer deleteTablet(t, env, rdonlyC)

	picker, err := NewTabletPicker(env.topoServ, env.cells, env.keyspace, env.shard, "in_order:rdonly,replica")
	require.NoError(t, err)

	// seen[i] records whether tablets[i] was ever returned by the picker.
	tablets := []*topodatapb.Tablet{replica, rdonlyA, rdonlyB, rdonlyC}
	seen := make([]bool, len(tablets))
	const attempts = 40
	for attempt := 0; attempt < attempts; attempt++ {
		got, pickErr := picker.PickForStreaming(context.Background())
		require.NoError(t, pickErr)
		for idx, candidate := range tablets {
			if proto.Equal(got, candidate) {
				seen[idx] = true
			}
		}
	}

	// The REPLICA (index 0) must never be picked; each RDONLY must be picked.
	assert.False(t, seen[0])
	assert.True(t, seen[1])
	assert.True(t, seen[2])
	assert.True(t, seen[3])
}

func TestPickRespectsTabletType(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell"})
want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want)
defer deleteTablet(t, te, want)
dont := addTablet(te, 101, topodatapb.TabletType_MASTER, "cell", true, true)
defer deleteTablet(te, dont)
defer deleteTablet(t, te, dont)

tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly")
require.NoError(t, err)
Expand All @@ -95,7 +182,7 @@ func TestPickRespectsTabletType(t *testing.T) {
func TestPickMultiCell(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell", "otherCell"})
want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want)
defer deleteTablet(t, te, want)

tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica")
require.NoError(t, err)
Expand All @@ -110,7 +197,7 @@ func TestPickMultiCell(t *testing.T) {
func TestPickMaster(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell", "otherCell"})
want := addTablet(te, 100, topodatapb.TabletType_MASTER, "cell", true, true)
defer deleteTablet(te, want)
defer deleteTablet(t, te, want)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
Expand All @@ -132,7 +219,7 @@ func TestPickMaster(t *testing.T) {
func TestPickFromOtherCell(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell", "otherCell"})
want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "otherCell", true, true)
defer deleteTablet(te, want)
defer deleteTablet(t, te, want)

tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica")
require.NoError(t, err)
Expand All @@ -147,9 +234,9 @@ func TestPickFromOtherCell(t *testing.T) {
func TestDontPickFromOtherCell(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell", "otherCell"})
want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want1)
defer deleteTablet(t, te, want1)
want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true)
defer deleteTablet(te, want2)
defer deleteTablet(t, te, want2)

tp, err := NewTabletPicker(te.topoServ, []string{"cell"}, te.keyspace, te.shard, "replica")
require.NoError(t, err)
Expand All @@ -176,9 +263,9 @@ func TestDontPickFromOtherCell(t *testing.T) {
func TestPickMultiCellTwoTablets(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell", "otherCell"})
want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want1)
defer deleteTablet(t, te, want1)
want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true)
defer deleteTablet(te, want2)
defer deleteTablet(t, te, want2)

tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica")
require.NoError(t, err)
Expand All @@ -205,9 +292,9 @@ func TestPickMultiCellTwoTablets(t *testing.T) {
func TestPickMultiCellTwoTabletTypes(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell", "otherCell"})
want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want1)
defer deleteTablet(t, te, want1)
want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, "otherCell", true, true)
defer deleteTablet(te, want2)
defer deleteTablet(t, te, want2)

tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly")
require.NoError(t, err)
Expand Down Expand Up @@ -235,7 +322,7 @@ func TestPickUsingCellAlias(t *testing.T) {
// test env puts all cells into an alias called "cella"
te := newPickerTestEnv(t, []string{"cell", "otherCell"})
want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want1)
defer deleteTablet(t, te, want1)

tp, err := NewTabletPicker(te.topoServ, []string{"cella"}, te.keyspace, te.shard, "replica")
require.NoError(t, err)
Expand All @@ -247,9 +334,9 @@ func TestPickUsingCellAlias(t *testing.T) {
assert.True(t, proto.Equal(want1, tablet), "Pick: %v, want %v", tablet, want1)

// create a tablet in the other cell, it should be picked
deleteTablet(te, want1)
deleteTablet(t, te, want1)
want2 := addTablet(te, 101, topodatapb.TabletType_REPLICA, "otherCell", true, true)
defer deleteTablet(te, want2)
defer deleteTablet(t, te, want2)
ctx2, cancel2 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel2()
tablet, err = tp.PickForStreaming(ctx2)
Expand Down Expand Up @@ -299,7 +386,7 @@ func TestTabletAppearsDuringSleep(t *testing.T) {
}()

want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want)
defer deleteTablet(t, te, want)
got := <-result
require.NotNil(t, got, "Tablet should not be nil")
assert.True(t, proto.Equal(want, got), "Pick: %v, want %v", got, want)
Expand All @@ -324,7 +411,7 @@ func TestPickError(t *testing.T) {
_, err = tp.PickForStreaming(ctx)
require.EqualError(t, err, "context has expired")
// no tablets of the correct type
defer deleteTablet(te, addTablet(te, 200, topodatapb.TabletType_RDONLY, "cell", true, true))
defer deleteTablet(t, te, addTablet(te, 200, topodatapb.TabletType_RDONLY, "cell", true, true))
ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
_, err = tp.PickForStreaming(ctx)
Expand Down Expand Up @@ -395,18 +482,16 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell
return tablet
}

func deleteTablet(te *pickerTestEnv, tablet *topodatapb.Tablet) {

func deleteTablet(t *testing.T, te *pickerTestEnv, tablet *topodatapb.Tablet) {
if tablet == nil {
return
}
//log error
if err := te.topoServ.DeleteTablet(context.Background(), tablet.Alias); err != nil {
log.Errorf("failed to DeleteTablet with alias : %v", err)
{ //log error
err := te.topoServ.DeleteTablet(context.Background(), tablet.Alias)
require.NoError(t, err, "failed to DeleteTablet with alias: %v", err)
}

//This is not automatically removed from shard replication, which results in log spam and log error
if err := topo.DeleteTabletReplicationData(context.Background(), te.topoServ, tablet); err != nil {
log.Errorf("failed to automatically remove from shard replication: %v", err)
{ //This is not automatically removed from shard replication, which results in log spam and log error
err := topo.DeleteTabletReplicationData(context.Background(), te.topoServ, tablet)
require.NoError(t, err, "failed to automatically remove from shard replication: %v", err)
}
}
11 changes: 4 additions & 7 deletions go/vt/schema/ddl_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,10 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string {

// IsSkipTopo suggests that DDL should apply to tables bypassing global topo request
func (setting *DDLStrategySetting) IsSkipTopo() bool {
switch {
case setting.IsSingleton(), setting.IsSingletonContext():
return true
case setting.hasFlag(skipTopoFlag):
return true
}
return false
// Vitess 11 introduced the flag -skip-topo. Starting with Vitess 12 the flag is _always_ considered 'true'.
// Ideally the flag should be gone, but for backwards compatibility we allow users to still specify it
// (and we still ignore the value; it's always set to true).
return true
}

// ToString returns a simple string representation of this instance
Expand Down
Loading

0 comments on commit dc89131

Please sign in to comment.