Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TabletServer: Handle nil targets properly everywhere #14734

Merged
merged 10 commits into from
Dec 9, 2023
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/messager/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ import (
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/mysql/replication"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
)

var (
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (state servingState) String() string {
var transitionRetryInterval = 1 * time.Second
var logInitTime sync.Once

var ErrNoTarget = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target")

// stateManager manages state transition for all the TabletServer
// subcomponents.
type stateManager struct {
Expand Down Expand Up @@ -433,7 +435,7 @@ func (sm *stateManager) verifyTargetLocked(ctx context.Context, target *querypb.
}
} else {
if !tabletenv.IsLocalContext(ctx) {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target")
return ErrNoTarget
}
}
return nil
Expand Down
77 changes: 66 additions & 11 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func NewTabletServer(ctx context.Context, name string, config *tabletenv.TabletC
tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(ctx, topoServer, "TabletSrvTopo") })

tabletTypeFunc := func() topodatapb.TabletType {
if tsv.sm == nil {
if tsv.sm == nil || tsv.sm.Target() == nil {
return topodatapb.TabletType_UNKNOWN
}
return tsv.sm.Target().TabletType
Expand Down Expand Up @@ -551,7 +551,11 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
logStats.OriginalSQL = beginSQL
if beginSQL != "" {
tsv.stats.QueryTimings.Record("BEGIN", startTime)
tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime)
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), startTime)
} else {
logStats.Method = ""
}
Expand Down Expand Up @@ -585,6 +589,24 @@ func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions)
return optionsPriority
}

// resolveTargetType returns the appropriate target tablet type for a
// TabletServer request. If the caller has a local context then it's
// an internal request and the target is the local tablet's current
// target. If it's not a local context then there should always be a
// non-nil target specified.
func (tsv *TabletServer) resolveTargetType(ctx context.Context, target *querypb.Target) (topodatapb.TabletType, error) {
if target != nil {
return target.TabletType, nil
}
if !tabletenv.IsLocalContext(ctx) {
return topodatapb.TabletType_UNKNOWN, ErrNoTarget
}
if tsv.sm.Target() == nil {
return topodatapb.TabletType_UNKNOWN, nil // This is true, and does not block the request
}
return tsv.sm.Target().TabletType, nil
}

// Commit commits the specified transaction.
func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) {
err = tsv.execRequest(
Expand All @@ -607,7 +629,11 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra
// handlePanicAndSendLogStats doesn't log the no-op.
if commitSQL != "" {
tsv.stats.QueryTimings.Record("COMMIT", startTime)
tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime)
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), startTime)
} else {
logStats.Method = ""
}
Expand All @@ -625,7 +651,11 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("ROLLBACK", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
logStats.TransactionID = transactionID
newReservedID, err = tsv.te.Rollback(ctx, transactionID)
if newReservedID > 0 {
Expand Down Expand Up @@ -836,6 +866,10 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq
return err
}
}
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
qre := &QueryExecutor{
query: query,
marginComments: comments,
Expand All @@ -846,7 +880,7 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq
ctx: ctx,
logStats: logStats,
tsv: tsv,
tabletType: target.GetTabletType(),
Copy link
Contributor Author

@mattlord mattlord Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returned UNKNOWN if the method receiver (target) is nil.

tabletType: targetType,
setting: connSetting,
}
result, err = qre.Execute()
Expand Down Expand Up @@ -1240,7 +1274,11 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
if err != nil {
return err
Expand Down Expand Up @@ -1286,7 +1324,11 @@ func (tsv *TabletServer) ReserveBeginStreamExecute(
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
if err != nil {
return err
Expand Down Expand Up @@ -1340,7 +1382,11 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar
target, options, allowOnShutdown,
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries)
if err != nil {
return err
Expand Down Expand Up @@ -1391,7 +1437,11 @@ func (tsv *TabletServer) ReserveStreamExecute(
target, options, allowOnShutdown,
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries)
if err != nil {
return err
Expand Down Expand Up @@ -1421,15 +1471,19 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr
target, nil, true, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RELEASE", time.Now())
defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now())
targetType, err := tsv.resolveTargetType(ctx, target)
if err != nil {
return err
}
defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now())
logStats.TransactionID = transactionID
logStats.ReservedID = reservedID
if reservedID != 0 {
// Release to close the underlying connection.
return tsv.te.Release(reservedID)
}
// Rollback to cleanup the transaction before returning to the pool.
_, err := tsv.te.Rollback(ctx, transactionID)
_, err = tsv.te.Rollback(ctx, transactionID)
return err
},
)
Expand Down Expand Up @@ -1505,6 +1559,7 @@ func (tsv *TabletServer) execRequest(
span.Annotate("workload_name", options.WorkloadName)
}
trace.AnnotateSQL(span, sqlparser.Preview(sql))
// With a tabletenv.LocalContext() the target will be nil.
if target != nil {
span.Annotate("cell", target.Cell)
span.Annotate("shard", target.Shard)
Expand Down
72 changes: 72 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,78 @@ func TestTabletServerCommitPrepared(t *testing.T) {
require.NoError(t, err)
}

// TestTabletServerWithNilTarget confirms that a nil target is
// handled correctly. This means that when a local context is
// used, the target type is inferred from the local tablet's
// latest target type.
// And if it's not a local context then we return an error.
func TestTabletServerWithNilTarget(t *testing.T) {
// A non-nil target is required when not using a local context.
ctx := tabletenv.LocalContext()
db, tsv := setupTabletServerTest(t, ctx, "")
defer tsv.StopService()
defer db.Close()

// With a nil target, the local tablet's latest target type is
// what should be used as the inferred target type for our local
// calls.
target := (*querypb.Target)(nil)
localTargetType := topodatapb.TabletType_RDONLY // Use a non-default type
err := tsv.SetServingType(localTargetType, time.Now(), true, "test")
require.NoError(t, err)

baseKey := "TabletServerTest" // Our TabletServer's name
fullKey := fmt.Sprintf("%s.%s", baseKey, localTargetType.String())

executeSQL := "select * from test_table limit 1000"
executeSQLResult := &sqltypes.Result{
Fields: []*querypb.Field{
{Type: sqltypes.VarBinary},
},
Rows: [][]sqltypes.Value{
{sqltypes.NewVarBinary("row01")},
},
}
// BEGIN gets transmuted to this since it's a RDONLY tablet.
db.AddQuery("start transaction read only", &sqltypes.Result{})
db.AddQuery(executeSQL, executeSQLResult)

expectedCount := tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]

state, err := tsv.Begin(ctx, target, nil)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

_, err = tsv.Execute(ctx, target, executeSQL, nil, state.TransactionID, 0, nil)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

_, err = tsv.Rollback(ctx, target, state.TransactionID)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

state, err = tsv.Begin(ctx, target, nil)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

_, err = tsv.Commit(ctx, target, state.TransactionID)
require.NoError(t, err)
expectedCount++
require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey])

// Finally be sure that we return an error now as expected when NOT
// using a local context but passing a nil target.
nonLocalCtx := context.Background()
_, err = tsv.Begin(nonLocalCtx, target, nil)
require.True(t, errors.Is(err, ErrNoTarget))
_, err = tsv.resolveTargetType(nonLocalCtx, target)
require.True(t, errors.Is(err, ErrNoTarget))
}

func TestSmallerTimeout(t *testing.T) {
testcases := []struct {
t1, t2, want time.Duration
Expand Down
Loading