Skip to content

Commit

Permalink
Update tablet picker options if keyspace was resharded.
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Mar 3, 2024
1 parent 45ae25e commit d967187
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 11 deletions.
4 changes: 2 additions & 2 deletions examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
*/
func main() {
ctx := context.Background()
streamCustomer := true
streamCustomer := false
var vgtid *binlogdatapb.VGtid
if streamCustomer {
vgtid = &binlogdatapb.VGtid{
Expand All @@ -56,7 +56,7 @@ func main() {
} else {
vgtid = &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "commerce",
Keyspace: "customer",
Shard: "0",
Gtid: "",
}}}
Expand Down
69 changes: 60 additions & 9 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -497,24 +498,40 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var err error
cells := vs.getCells()

tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
tpo := vs.tabletPickerOptions
resharded, err := vs.keyspaceHasBeenResharded(ctx, sgtid.Keyspace)
if err != nil {
log.Errorf(err.Error())
return err
return vterrors.Wrapf(err, "failed to determine if keyspace %s has been resharded", sgtid.Keyspace)

Check warning on line 504 in go/vt/vtgate/vstream_manager.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/vstream_manager.go#L504

Added line #L504 was not covered by tests
}
if resharded {
// The non-serving tablet in the old / non-serving shard will contain all of
// the GTIDs that we need before transitioning to the new shards along with
// the journal event that will then allow us to automatically transition to
// the new shards (provided the stop_on_reshard option is not set).
tpo.IncludeNonServingTablets = true
}

tabletPickerErr := func(err error) error {
tperr := vterrors.Wrapf(err, "failed to find a %s tablet for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
log.Errorf("%v", tperr)
return tperr

Check warning on line 518 in go/vt/vtgate/vstream_manager.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/vstream_manager.go#L515-L518

Added lines #L515 - L518 were not covered by tests
}
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.GetKeyspace(), sgtid.GetShard(), vs.tabletType.String(), tpo, ignoreTablets...)
if err != nil {
return tabletPickerErr(err)

Check warning on line 522 in go/vt/vtgate/vstream_manager.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/vstream_manager.go#L522

Added line #L522 was not covered by tests
}
// Create a child context with a stricter timeout when picking a tablet.
// This will prevent hanging in the case no tablets are found.
tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout)
defer tpCancel()

tablet, err := tp.PickForStreaming(tpCtx)
if err != nil {
log.Errorf(err.Error())
return err
return tabletPickerErr(err)

Check warning on line 530 in go/vt/vtgate/vstream_manager.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/vstream_manager.go#L530

Added line #L530 was not covered by tests
}
log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), strings.Join(cells, ","),
sgtid.Keyspace, sgtid.Shard, vs.tabletType.String())
log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)",
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))

target := &querypb.Target{
Keyspace: sgtid.Keyspace,
Shard: sgtid.Shard,
Expand Down Expand Up @@ -737,7 +754,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
if err := vs.getError(); err != nil {
return err
}
// convert all gtids to vgtids. This should be done here while holding the lock.
// Convert all gtids to vgtids. This should be done here while holding the lock.
for j, event := range events {
if event.Type == binlogdatapb.VEventType_GTID {
// Update the VGtid and send that instead.
Expand Down Expand Up @@ -921,3 +938,37 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar
close(je.done)
return je, nil
}

// keyspaceHasBeenResharded returns true if the keyspace's serving shard set has changed
// since the last VStream as indicated by the shard definitions provided in the vgtid.
func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, error) {
shards, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil || len(shards) == 0 {
return false, err

Check warning on line 947 in go/vt/vtgate/vstream_manager.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/vstream_manager.go#L947

Added line #L947 was not covered by tests
}
for _, g := range vs.vgtid.ShardGtids {
if g.GetKeyspace() == keyspace {
// If we already have the correct (serving) shards in our vgtid then the
// keyspace MAY have an active Reshard workflow but the keyspace has not
// been resharded (meaning traffic has been switched in an active Reshard
// workflow) since we were last streaming.
if shards[g.GetShard()].GetIsPrimaryServing() {
return false, nil
}
}
}
for _, i := range shards {
for _, j := range shards {
if i.ShardName() == j.ShardName() && key.KeyRangeEqual(i.GetKeyRange(), j.GetKeyRange()) {
// It's the same shard so skip it.
continue
}
if key.KeyRangeIntersect(i.GetKeyRange(), j.GetKeyRange()) {
// We have different shards with overlapping keyranges so we know
// that a reshard has occurred.
return true, nil
}
}
}
return false, nil

Check warning on line 973 in go/vt/vtgate/vstream_manager.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtgate/vstream_manager.go#L973

Added line #L973 was not covered by tests
}

0 comments on commit d967187

Please sign in to comment.