From a77344b9e996e28eaad8ca8e0df58c90a632e76d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 9 Dec 2023 08:42:18 -0500 Subject: [PATCH] TabletServer: Handle nil targets properly everywhere (#14734) Signed-off-by: Matt Lord --- .../tabletserver/messager/message_manager.go | 6 +- go/vt/vttablet/tabletserver/state_manager.go | 4 +- go/vt/vttablet/tabletserver/tabletserver.go | 77 ++++++++++++++++--- .../tabletserver/tabletserver_test.go | 72 +++++++++++++++++ 4 files changed, 144 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index be2e0cf7034..845eaf8df40 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -28,17 +28,17 @@ import ( "golang.org/x/sync/semaphore" "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" ) var ( diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 75d1a7c7c3b..98ed846600c 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -66,6 +66,8 @@ func (state servingState) String() string { var transitionRetryInterval = 1 * time.Second var logInitTime sync.Once +var ErrNoTarget = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target") + // stateManager manages state transition for all the TabletServer // subcomponents. type stateManager struct { @@ -433,7 +435,7 @@ func (sm *stateManager) verifyTargetLocked(ctx context.Context, target *querypb. } } else { if !tabletenv.IsLocalContext(ctx) { - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target") + return ErrNoTarget } } return nil diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 5f9310add84..295a85c31fd 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -166,7 +166,7 @@ func NewTabletServer(ctx context.Context, name string, config *tabletenv.TabletC tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(ctx, topoServer, "TabletSrvTopo") }) tabletTypeFunc := func() topodatapb.TabletType { - if tsv.sm == nil { + if tsv.sm == nil || tsv.sm.Target() == nil { return topodatapb.TabletType_UNKNOWN } return tsv.sm.Target().TabletType @@ -551,7 +551,11 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save logStats.OriginalSQL = beginSQL if beginSQL != "" { tsv.stats.QueryTimings.Record("BEGIN", startTime) - tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } + tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), startTime) } else { logStats.Method = "" } @@ -585,6 +589,24 @@ func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) return optionsPriority } +// resolveTargetType returns the appropriate target tablet type for a +// TabletServer request. If the caller has a local context then it's +// an internal request and the target is the local tablet's current +// target. If it's not a local context then there should always be a +// non-nil target specified. +func (tsv *TabletServer) resolveTargetType(ctx context.Context, target *querypb.Target) (topodatapb.TabletType, error) { + if target != nil { + return target.TabletType, nil + } + if !tabletenv.IsLocalContext(ctx) { + return topodatapb.TabletType_UNKNOWN, ErrNoTarget + } + if tsv.sm.Target() == nil { + return topodatapb.TabletType_UNKNOWN, nil // This is true, and does not block the request + } + return tsv.sm.Target().TabletType, nil +} + // Commit commits the specified transaction. func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) { err = tsv.execRequest( @@ -607,7 +629,11 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra // handlePanicAndSendLogStats doesn't log the no-op. if commitSQL != "" { tsv.stats.QueryTimings.Record("COMMIT", startTime) - tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } + tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), startTime) } else { logStats.Method = "" } @@ -625,7 +651,11 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("ROLLBACK", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) logStats.TransactionID = transactionID newReservedID, err = tsv.te.Rollback(ctx, transactionID) if newReservedID > 0 { @@ -836,6 +866,10 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq return err } } + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } qre := &QueryExecutor{ query: query, marginComments: comments, @@ -846,7 +880,7 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq ctx: ctx, logStats: logStats, tsv: tsv, - tabletType: target.GetTabletType(), + tabletType: targetType, setting: connSetting, } result, err = qre.Execute() @@ -1240,7 +1274,11 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries) if err != nil { return err @@ -1286,7 +1324,11 @@ func (tsv *TabletServer) ReserveBeginStreamExecute( target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries) if err != nil { return err @@ -1340,7 +1382,11 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries) if err != nil { return err @@ -1391,7 +1437,11 @@ func (tsv *TabletServer) ReserveStreamExecute( target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries) if err != nil { return err @@ -1421,7 +1471,11 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RELEASE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) logStats.TransactionID = transactionID logStats.ReservedID = reservedID if reservedID != 0 { @@ -1429,7 +1483,7 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr return tsv.te.Release(reservedID) } // Rollback to cleanup the transaction before returning to the pool. - _, err := tsv.te.Rollback(ctx, transactionID) + _, err = tsv.te.Rollback(ctx, transactionID) return err }, ) @@ -1505,6 +1559,7 @@ func (tsv *TabletServer) execRequest( span.Annotate("workload_name", options.WorkloadName) } trace.AnnotateSQL(span, sqlparser.Preview(sql)) + // With a tabletenv.LocalContext() the target will be nil. if target != nil { span.Annotate("cell", target.Cell) span.Annotate("shard", target.Shard) diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index a72a1aa76db..d8595630480 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -566,6 +566,78 @@ func TestTabletServerCommitPrepared(t *testing.T) { require.NoError(t, err) } +// TestTabletServerWithNilTarget confirms that a nil target is +// handled correctly. This means that when a local context is +// used, the target type is inferred from the local tablet's +// latest target type. +// And if it's not a local context then we return an error. +func TestTabletServerWithNilTarget(t *testing.T) { + // A non-nil target is required when not using a local context. + ctx := tabletenv.LocalContext() + db, tsv := setupTabletServerTest(t, ctx, "") + defer tsv.StopService() + defer db.Close() + + // With a nil target, the local tablet's latest target type is + // what should be used as the inferred target type for our local + // calls. + target := (*querypb.Target)(nil) + localTargetType := topodatapb.TabletType_RDONLY // Use a non-default type + err := tsv.SetServingType(localTargetType, time.Now(), true, "test") + require.NoError(t, err) + + baseKey := "TabletServerTest" // Our TabletServer's name + fullKey := fmt.Sprintf("%s.%s", baseKey, localTargetType.String()) + + executeSQL := "select * from test_table limit 1000" + executeSQLResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + {Type: sqltypes.VarBinary}, + }, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("row01")}, + }, + } + // BEGIN gets transmuted to this since it's a RDONLY tablet. + db.AddQuery("start transaction read only", &sqltypes.Result{}) + db.AddQuery(executeSQL, executeSQLResult) + + expectedCount := tsv.stats.QueryTimingsByTabletType.Counts()[fullKey] + + state, err := tsv.Begin(ctx, target, nil) + require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + + _, err = tsv.Execute(ctx, target, executeSQL, nil, state.TransactionID, 0, nil) + require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + + _, err = tsv.Rollback(ctx, target, state.TransactionID) + require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + + state, err = tsv.Begin(ctx, target, nil) + require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + + _, err = tsv.Commit(ctx, target, state.TransactionID) + require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + + // Finally be sure that we return an error now as expected when NOT + // using a local context but passing a nil target. + nonLocalCtx := context.Background() + _, err = tsv.Begin(nonLocalCtx, target, nil) + require.True(t, errors.Is(err, ErrNoTarget)) + _, err = tsv.resolveTargetType(nonLocalCtx, target) + require.True(t, errors.Is(err, ErrNoTarget)) +} + func TestSmallerTimeout(t *testing.T) { testcases := []struct { t1, t2, want time.Duration