Skip to content

Commit

Permalink
slack-15.0: VStreams fix backports/patches, pt. 2 (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
timvaillancourt authored May 29, 2024
1 parent 8125303 commit dce7107
Show file tree
Hide file tree
Showing 20 changed files with 1,130 additions and 408 deletions.
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func testVStreamCellFlag(t *testing.T) {
flags := &vtgatepb.VStreamFlags{}
if tc.cells != "" {
flags.Cells = tc.cells
flags.CellPreference = "onlyspecified"
}

ctx2, cancel := context.WithTimeout(ctx, 30*time.Second)
Expand Down
267 changes: 236 additions & 31 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,40 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)

type TabletPickerCellPreference int

const (
// PreferLocalWithAlias gives preference to the local cell first, then specified cells, if any.
// This is the default when no other option is provided.
TabletPickerCellPreference_PreferLocalWithAlias TabletPickerCellPreference = iota
// OnlySpecified only picks tablets from the list of cells given.
TabletPickerCellPreference_OnlySpecified
)

type TabletPickerTabletOrder int

const (
// All provided tablet types are given equal priority. This is the default.
TabletPickerTabletOrder_Any TabletPickerTabletOrder = iota
// Provided tablet types are expected to be prioritized in the given order.
TabletPickerTabletOrder_InOrder
)

var (
tabletPickerRetryDelay = 30 * time.Second
muTabletPickerRetryDelay sync.Mutex
globalTPStats *tabletPickerStats
inOrderHint = "in_order:"

tabletPickerCellPreferenceMap = map[string]TabletPickerCellPreference{
"preferlocalwithalias": TabletPickerCellPreference_PreferLocalWithAlias,
"onlyspecified": TabletPickerCellPreference_OnlySpecified,
}

tabletPickerTabletOrderMap = map[string]TabletPickerTabletOrder{
"any": TabletPickerTabletOrder_Any,
"inorder": TabletPickerTabletOrder_InOrder,
}
)

// GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment
Expand All @@ -62,18 +91,66 @@ func SetTabletPickerRetryDelay(delay time.Duration) {
tabletPickerRetryDelay = delay
}

type TabletPickerOptions struct {
CellPreference string
TabletOrder string
}

func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) {
// return default if blank
if str == "" {
return TabletPickerCellPreference_PreferLocalWithAlias, nil
}

if c, ok := tabletPickerCellPreferenceMap[strings.ToLower(str)]; ok {
return c, nil
}

return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid cell preference: %v", str)
}

func parseTabletPickerTabletOrderString(str string) (TabletPickerTabletOrder, error) {
// return default if blank
if str == "" {
return TabletPickerTabletOrder_Any, nil
}

if o, ok := tabletPickerTabletOrderMap[strings.ToLower(str)]; ok {
return o, nil
}

return -1, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet order type: %v", str)
}

type localCellInfo struct {
localCell string
cellsInAlias map[string]string
}

// TabletPicker gives a simplified API for picking tablets.
type TabletPicker struct {
ts *topo.Server
cells []string
keyspace string
shard string
tabletTypes []topodatapb.TabletType
inOrder bool
ts *topo.Server
cells []string
keyspace string
shard string
tabletTypes []topodatapb.TabletType
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
// This map is keyed on the results of TabletAlias.String().
ignoreTablets map[string]struct{}
}

// NewTabletPicker returns a TabletPicker.
func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTypesStr string) (*TabletPicker, error) {
func NewTabletPicker(
ctx context.Context,
ts *topo.Server,
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
ignoreTablets ...*topodatapb.TabletAlias,
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted.
tabletTypes, inOrder, err := ParseTabletTypesAndOrder(tabletTypesStr)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr)
Expand All @@ -92,19 +169,131 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
fmt.Sprintf("Missing required field(s) for tablet picker: %s", strings.Join(missingFields, ", ")))
}
return &TabletPicker{
ts: ts,
cells: cells,
keyspace: keyspace,
shard: shard,
tabletTypes: tabletTypes,
inOrder: inOrder,
}, nil

// Resolve tablet picker options
cellPref, err := parseTabletPickerCellPreferenceString(options.CellPreference)
if err != nil {
return nil, err
}

// For backward compatibility only parse the options for tablet ordering
// if the in_order hint wasn't already specified. Otherwise it could be overridden.
// We can remove this check once the in_order hint is deprecated.
if !inOrder {
order, err := parseTabletPickerTabletOrderString(options.TabletOrder)
if err != nil {
return nil, err
}
switch order {
case TabletPickerTabletOrder_Any:
inOrder = false
case TabletPickerTabletOrder_InOrder:
inOrder = true
}
}

aliasCellMap := make(map[string]string)
if cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
if localCell == "" {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot have local cell preference without local cell")
}

// Add local cell to the list of cells for tablet picking.
// This will be de-duped later if the local cell already exists in the original list - see: dedupeCells()
cells = append(cells, localCell)
aliasName := topo.GetAliasByCell(ctx, ts, localCell)

// If an alias exists
if aliasName != localCell {
alias, err := ts.GetCellsAlias(ctx, aliasName, false)
if err != nil {
return nil, vterrors.Wrap(err, "error fetching local cell alias")
}

// Add the aliasName to the list of cells for tablet picking.
cells = append(cells, aliasName)

// Create a map of the cells in the alias to make lookup faster later when we're giving preference to these.
// see prioritizeTablets()
for _, c := range alias.Cells {
aliasCellMap[c] = c
}
}
}

tp := &TabletPicker{
ts: ts,
cells: dedupeCells(cells),
localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap},
keyspace: keyspace,
shard: shard,
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
ignoreTablets: make(map[string]struct{}, len(ignoreTablets)),
}

for _, ignoreTablet := range ignoreTablets {
tp.ignoreTablets[ignoreTablet.String()] = struct{}{}
}

return tp, nil

}

// dedupeCells is used to remove duplicates in the cell list in case it is passed in
// and exists in the local cell's alias. Can happen if CellPreference is PreferLocalWithAlias.
func dedupeCells(cells []string) []string {
keys := make(map[string]bool)
dedupedCells := []string{}

for _, c := range cells {
if _, value := keys[c]; !value {
keys[c] = true
dedupedCells = append(dedupedCells, c)
}
}
return dedupedCells
}

// prioritizeTablets orders the candidate pool of tablets based on CellPreference.
// If CellPreference is PreferLocalWithAlias then tablets in the local cell will be prioritized for selection,
// followed by the tablets within the local cell's alias, and finally any others specified by the client.
// If CellPreference is OnlySpecified, then tablets will only be selected randomly from the cells specified by the client.
func (tp *TabletPicker) prioritizeTablets(candidates []*topo.TabletInfo) (sameCell, sameAlias, allOthers []*topo.TabletInfo) {
for _, c := range candidates {
if c.Alias.Cell == tp.localCellInfo.localCell {
sameCell = append(sameCell, c)
} else if _, ok := tp.localCellInfo.cellsInAlias[c.Alias.Cell]; ok {
sameAlias = append(sameAlias, c)
} else {
allOthers = append(allOthers, c)
}
}

return sameCell, sameAlias, allOthers
}

func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo.TabletInfo {
// Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes
orderMap := map[topodatapb.TabletType]int{}
for i, t := range tp.tabletTypes {
orderMap[t] = i
}
sort.Slice(candidates, func(i, j int) bool {
if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] {
// identical tablet types: randomize order of tablets for this type
return rand.Intn(2) == 0 // 50% chance
}
return orderMap[candidates[i].Type] < orderMap[candidates[j].Type]
})

return candidates
}

// PickForStreaming picks an available tablet.
// All tablets that belong to tp.cells are evaluated and one is
// chosen at random.
// Selection is based on CellPreference.
// See prioritizeTablets for prioritization logic.
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
rand.Seed(time.Now().UnixNano())
// keep trying at intervals (tabletPickerRetryDelay) until a tablet is found
Expand All @@ -116,19 +305,30 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
default:
}
candidates := tp.GetMatchingTablets(ctx)
if tp.inOrder {
// Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes
orderMap := map[topodatapb.TabletType]int{}
for i, t := range tp.tabletTypes {
orderMap[t] = i
if tp.cellPref == TabletPickerCellPreference_PreferLocalWithAlias {
sameCellCandidates, sameAliasCandidates, allOtherCandidates := tp.prioritizeTablets(candidates)

if tp.inOrder {
sameCellCandidates = tp.orderByTabletType(sameCellCandidates)
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
rand.Shuffle(len(sameAliasCandidates), func(i, j int) {
sameAliasCandidates[i], sameAliasCandidates[j] = sameAliasCandidates[j], sameAliasCandidates[i]
})
rand.Shuffle(len(allOtherCandidates), func(i, j int) {
allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i]
})
}
sort.Slice(candidates, func(i, j int) bool {
if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] {
// identical tablet types: randomize order of tablets for this type
return rand.Intn(2) == 0 // 50% chance
}
return orderMap[candidates[i].Type] < orderMap[candidates[j].Type]
})

candidates = append(sameCellCandidates, sameAliasCandidates...)
candidates = append(candidates, allOtherCandidates...)
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates
rand.Shuffle(len(candidates), func(i, j int) {
Expand Down Expand Up @@ -179,7 +379,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error())
return nil
}
aliases = append(aliases, si.PrimaryAlias)
if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore {
aliases = append(aliases, si.PrimaryAlias)
}
} else {
actualCells := make([]string, 0)
for _, cell := range tp.cells {
Expand All @@ -204,6 +406,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
actualCells = append(actualCells, cell)
}
}

for _, cell := range actualCells {
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
Expand All @@ -214,7 +417,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
}

for _, node := range sri.Nodes {
aliases = append(aliases, node.TabletAlias)
if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore {
aliases = append(aliases, node.TabletAlias)
}
}
}
}
Expand Down
Loading

0 comments on commit dce7107

Please sign in to comment.