Skip to content

Commit

Permalink
TabletServer: Handle nil targets properly everywhere (#14734)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Dec 9, 2023
1 parent ab1ba2e commit a77344b
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 15 deletions.
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/messager/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ import (
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/mysql/replication"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
)

var (
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (state servingState) String() string {
var transitionRetryInterval = 1 * time.Second
var logInitTime sync.Once

var ErrNoTarget = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target")

// stateManager manages state transition for all the TabletServer
// subcomponents.
type stateManager struct {
Expand Down Expand Up @@ -433,7 +435,7 @@ func (sm *stateManager) verifyTargetLocked(ctx context.Context, target *querypb.
}
} else {
if !tabletenv.IsLocalContext(ctx) {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target")
return ErrNoTarget
}
}
return nil
Expand Down
77 changes: 66 additions & 11 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func NewTabletServer(ctx context.Context, name string, config *tabletenv.TabletC
tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(ctx, topoServer, "TabletSrvTopo") })

tabletTypeFunc := func() topodatapb.TabletType {
if tsv.sm == nil {
if tsv.sm == nil || tsv.sm.Target() == nil {
return topodatapb.TabletType_UNKNOWN
}
return tsv.sm.Target().TabletType
Expand Down Expand Up @@ -551,7 +551,11 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
logStats.OriginalSQL = beginSQL
if beginSQL != "" {
tsv.stats.QueryTimings.Record("BEGIN", startTime)
tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime)
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), startTime)
} else {
logStats.Method = ""
}
Expand Down Expand Up @@ -585,6 +589,24 @@ func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions)
return optionsPriority
}

// resolveTargetType returns the appropriate target tablet type for a
// TabletServer request. If the caller has a local context then it's
// an internal request and the target is the local tablet's current
// target. If it's not a local context then there should always be a
// non-nil target specified.
func (tsv *TabletServer) resolveTargetType(ctx context.Context, target *querypb.Target) (topodatapb.TabletType, error) {
if target != nil {
return target.TabletType, nil
}
if !tabletenv.IsLocalContext(ctx) {
return topodatapb.TabletType_UNKNOWN, ErrNoTarget
}
if tsv.sm.Target() == nil {
return topodatapb.TabletType_UNKNOWN, nil // This is true, and does not block the request
}
return tsv.sm.Target().TabletType, nil
}

// Commit commits the specified transaction.
func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) {
err = tsv.execRequest(
Expand All @@ -607,7 +629,11 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra
// handlePanicAndSendLogStats doesn't log the no-op.
if commitSQL != "" {
tsv.stats.QueryTimings.Record("COMMIT", startTime)
tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime)
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), startTime)
} else {
logStats.Method = ""
}
Expand All @@ -625,7 +651,11 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("ROLLBACK", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
logStats.TransactionID = transactionID
newReservedID, err = tsv.te.Rollback(ctx, transactionID)
if newReservedID > 0 {
Expand Down Expand Up @@ -836,6 +866,10 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq
return err
}
}
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
qre := &QueryExecutor{
query: query,
marginComments: comments,
Expand All @@ -846,7 +880,7 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq
ctx: ctx,
logStats: logStats,
tsv: tsv,
tabletType: target.GetTabletType(),
tabletType: targetType,
setting: connSetting,
}
result, err = qre.Execute()
Expand Down Expand Up @@ -1240,7 +1274,11 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
if err != nil {
return err
Expand Down Expand Up @@ -1286,7 +1324,11 @@ func (tsv *TabletServer) ReserveBeginStreamExecute(
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
if err != nil {
return err
Expand Down Expand Up @@ -1340,7 +1382,11 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar
target, options, allowOnShutdown,
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries)
if err != nil {
return err
Expand Down Expand Up @@ -1391,7 +1437,11 @@ func (tsv *TabletServer) ReserveStreamExecute(
target, options, allowOnShutdown,
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries)
if err != nil {
return err
Expand Down Expand Up @@ -1421,15 +1471,19 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RELEASE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
logStats.TransactionID = transactionID
logStats.ReservedID = reservedID
if reservedID != 0 {
// Release to close the underlying connection.
return tsv.te.Release(reservedID)
}
// Rollback to cleanup the transaction before returning to the pool.
_, err := tsv.te.Rollback(ctx, transactionID)
_, err = tsv.te.Rollback(ctx, transactionID)
return err
},
)
Expand Down Expand Up @@ -1505,6 +1559,7 @@ func (tsv *TabletServer) execRequest(
span.Annotate("workload_name", options.WorkloadName)
}
trace.AnnotateSQL(span, sqlparser.Preview(sql))
// With a tabletenv.LocalContext() the target will be nil.
if target != nil {
span.Annotate("cell", target.Cell)
span.Annotate("shard", target.Shard)
Expand Down
72 changes: 72 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,78 @@ func TestTabletServerCommitPrepared(t *testing.T) {
require.NoError(t, err)
}

// TestTabletServerWithNilTarget confirms that a nil target is
// handled correctly. This means that when a local context is
// used, the target type is inferred from the local tablet's
// latest target type.
// And if it's not a local context then we return an error.
func TestTabletServerWithNilTarget(t *testing.T) {
// A non-nil target is required when not using a local context.
ctx := tabletenv.LocalContext()
db, tsv := setupTabletServerTest(t, ctx, "")
defer tsv.StopService()
defer db.Close()

// With a nil target, the local tablet's latest target type is
// what should be used as the inferred target type for our local
// calls.
target := (*querypb.Target)(nil)
localTargetType := topodatapb.TabletType_RDONLY // Use a non-default type
err := tsv.SetServingType(localTargetType, time.Now(), true, "test")
require.NoError(t, err)

baseKey := "TabletServerTest" // Our TabletServer's name
fullKey := fmt.Sprintf("%s.%s", baseKey, localTargetType.String())

executeSQL := "select * from test_table limit 1000"
executeSQLResult := &sqltypes.Result{
Fields: []*querypb.Field{
{Type: sqltypes.VarBinary},
},
Rows: [][]sqltypes.Value{
{sqltypes.NewVarBinary("row01")},
},
}
// BEGIN gets transmuted to this since it's a RDONLY tablet.
db.AddQuery("start transaction read only", &sqltypes.Result{})
db.AddQuery(executeSQL, executeSQLResult)

expectedCount := tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]

state, err := tsv.Begin(ctx, target, nil)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

_, err = tsv.Execute(ctx, target, executeSQL, nil, state.TransactionID, 0, nil)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

_, err = tsv.Rollback(ctx, target, state.TransactionID)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

state, err = tsv.Begin(ctx, target, nil)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

_, err = tsv.Commit(ctx, target, state.TransactionID)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

// Finally be sure that we return an error now as expected when NOT
// using a local context but passing a nil target.
nonLocalCtx := context.Background()
_, err = tsv.Begin(nonLocalCtx, target, nil)
require.True(t, errors.Is(err, ErrNoTarget))
_, err = tsv.resolveTargetType(nonLocalCtx, target)
require.True(t, errors.Is(err, ErrNoTarget))
}

func TestSmallerTimeout(t *testing.T) {
testcases := []struct {
t1, t2, want time.Duration
Expand Down

0 comments on commit a77344b

Please sign in to comment.