From f0fa6c3867f88475a8b5422366989b6e3489d640 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 11:06:44 -0500 Subject: [PATCH 01/10] Pass static type for messaging TSV calls and add nil check in TSV Signed-off-by: Matt Lord --- .../tabletserver/messager/message_manager.go | 14 +++++--- go/vt/vttablet/tabletserver/tabletserver.go | 32 ++++++++++++++----- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index be2e0cf7034..1053c5b2f89 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -28,20 +28,24 @@ import ( "golang.org/x/sync/semaphore" "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( + // messageManager only runs on primary tablets. + queryTarget = &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY} + // MessageStats tracks stats for messages. MessageStats = stats.NewGaugesWithMultiLabels( "Messages", @@ -635,7 +639,7 @@ func (mm *messageManager) postpone(ctx context.Context, tsv TabletService, ackWa defer mm.postponeSema.Release(1) ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), ackWaitTime) defer cancel() - if _, err := tsv.PostponeMessages(ctx, nil, mm, ids); err != nil { + if _, err := tsv.PostponeMessages(ctx, queryTarget, mm, ids); err != nil { // This can happen during spikes. Record the incident for monitoring. MessageStats.Add([]string{mm.name.String(), "PostponeFailed"}, 1) } @@ -833,7 +837,7 @@ func (mm *messageManager) runPurge() { cancel() }() for { - count, err := mm.tsv.PurgeMessages(ctx, nil, mm, time.Now().Add(-mm.purgeAfter).UnixNano()) + count, err := mm.tsv.PurgeMessages(ctx, queryTarget, mm, time.Now().Add(-mm.purgeAfter).UnixNano()) if err != nil { MessageStats.Add([]string{mm.name.String(), "PurgeFailed"}, 1) log.Errorf("Unable to delete messages: %v", err) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 5f9310add84..e53969860df 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -551,7 +551,9 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save logStats.OriginalSQL = beginSQL if beginSQL != "" { tsv.stats.QueryTimings.Record("BEGIN", startTime) - tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) + if target != nil { + tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) + } } else { logStats.Method = "" } @@ -607,7 +609,9 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra // handlePanicAndSendLogStats doesn't log the no-op. if commitSQL != "" { tsv.stats.QueryTimings.Record("COMMIT", startTime) - tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) + if target != nil { + tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) + } } else { logStats.Method = "" } @@ -625,7 +629,9 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("ROLLBACK", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + if target != nil { + defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + } logStats.TransactionID = transactionID newReservedID, err = tsv.te.Rollback(ctx, transactionID) if newReservedID > 0 { @@ -1240,7 +1246,9 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + if target != nil { + defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + } connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries) if err != nil { return err @@ -1286,7 +1294,9 @@ func (tsv *TabletServer) ReserveBeginStreamExecute( target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + if target != nil { + defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + } connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries) if err != nil { return err @@ -1340,7 +1350,9 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + if target != nil { + defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + } state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries) if err != nil { return err @@ -1391,7 +1403,9 @@ func (tsv *TabletServer) ReserveStreamExecute( target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + if target != nil { + defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + } state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries) if err != nil { return err @@ -1421,7 +1435,9 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RELEASE", time.Now()) - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + if target != nil { + defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + } logStats.TransactionID = transactionID logStats.ReservedID = reservedID if reservedID != 0 { From ae8a5c646744710b87d70db4d6c7c3f1ec74bb12 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 11:30:39 -0500 Subject: [PATCH 02/10] Add unit test Signed-off-by: Matt Lord --- .../tabletserver/tabletserver_test.go | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index a72a1aa76db..c9e312847b3 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -566,6 +566,33 @@ func TestTabletServerCommitPrepared(t *testing.T) { require.NoError(t, err) } +func TestTabletServerWithNilTarget(t *testing.T) { + // A non-nil target is required when not using a local context. + ctx := tabletenv.LocalContext() + db, tsv := setupTabletServerTest(t, ctx, "") + defer tsv.StopService() + defer db.Close() + + executeSQL := "select * from test_table limit 1000" + executeSQLResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + {Type: sqltypes.VarBinary}, + }, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("row01")}, + }, + } + db.AddQuery(executeSQL, executeSQLResult) + + target := (*querypb.Target)(nil) + state, err := tsv.Begin(ctx, target, nil) + require.NoError(t, err) + _, err = tsv.Execute(ctx, target, executeSQL, nil, state.TransactionID, 0, nil) + require.NoError(t, err) + _, err = tsv.Rollback(ctx, target, state.TransactionID) + require.NoError(t, err) +} + func TestSmallerTimeout(t *testing.T) { testcases := []struct { t1, t2, want time.Duration From fa19408e5c5d8b29ec7d36454d012a59b716b276 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 11:56:21 -0500 Subject: [PATCH 03/10] Improve comments Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/tabletserver.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index e53969860df..abb8f92e665 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -551,6 +551,7 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save logStats.OriginalSQL = beginSQL if beginSQL != "" { tsv.stats.QueryTimings.Record("BEGIN", startTime) + // With a tabletenv.LocalContext() the target can be nil. if target != nil { tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) } @@ -609,6 +610,7 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra // handlePanicAndSendLogStats doesn't log the no-op. if commitSQL != "" { tsv.stats.QueryTimings.Record("COMMIT", startTime) + // With a tabletenv.LocalContext() the target is nil. if target != nil { tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) } @@ -629,6 +631,7 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("ROLLBACK", time.Now()) + // With a tabletenv.LocalContext() the target is nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1246,6 +1249,7 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) + // With a tabletenv.LocalContext() the target is nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1294,6 +1298,7 @@ func (tsv *TabletServer) ReserveBeginStreamExecute( target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) + // With a tabletenv.LocalContext() the target is nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1350,6 +1355,7 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) + // With a tabletenv.LocalContext() the target is nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1403,6 +1409,7 @@ func (tsv *TabletServer) ReserveStreamExecute( target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) + // With a tabletenv.LocalContext() the target is nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1435,6 +1442,7 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RELEASE", time.Now()) + // With a tabletenv.LocalContext() the target is nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1521,6 +1529,7 @@ func (tsv *TabletServer) execRequest( span.Annotate("workload_name", options.WorkloadName) } trace.AnnotateSQL(span, sqlparser.Preview(sql)) + // With a tabletenv.LocalContext() the target is nil. if target != nil { span.Annotate("cell", target.Cell) span.Annotate("shard", target.Shard) From f3d0deb781518e22c73e2a06ebac32038ae7edda Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 11:57:47 -0500 Subject: [PATCH 04/10] Revert message manager changes Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/messager/message_manager.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 1053c5b2f89..845eaf8df40 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -39,13 +39,9 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( - // messageManager only runs on primary tablets. - queryTarget = &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY} - // MessageStats tracks stats for messages. MessageStats = stats.NewGaugesWithMultiLabels( "Messages", @@ -639,7 +635,7 @@ func (mm *messageManager) postpone(ctx context.Context, tsv TabletService, ackWa defer mm.postponeSema.Release(1) ctx, cancel := context.WithTimeout(tabletenv.LocalContext(), ackWaitTime) defer cancel() - if _, err := tsv.PostponeMessages(ctx, queryTarget, mm, ids); err != nil { + if _, err := tsv.PostponeMessages(ctx, nil, mm, ids); err != nil { // This can happen during spikes. Record the incident for monitoring. MessageStats.Add([]string{mm.name.String(), "PostponeFailed"}, 1) } @@ -837,7 +833,7 @@ func (mm *messageManager) runPurge() { cancel() }() for { - count, err := mm.tsv.PurgeMessages(ctx, queryTarget, mm, time.Now().Add(-mm.purgeAfter).UnixNano()) + count, err := mm.tsv.PurgeMessages(ctx, nil, mm, time.Now().Add(-mm.purgeAfter).UnixNano()) if err != nil { MessageStats.Add([]string{mm.name.String(), "PurgeFailed"}, 1) log.Errorf("Unable to delete messages: %v", err) From 7f8c840ba97499c940ec9c27692430479e6c3e5a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 11:59:33 -0500 Subject: [PATCH 05/10] Unify comments Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/tabletserver.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index abb8f92e665..80992dadcb3 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -610,7 +610,7 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra // handlePanicAndSendLogStats doesn't log the no-op. if commitSQL != "" { tsv.stats.QueryTimings.Record("COMMIT", startTime) - // With a tabletenv.LocalContext() the target is nil. + // With a tabletenv.LocalContext() the target can be nil. if target != nil { tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) } @@ -631,7 +631,7 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("ROLLBACK", time.Now()) - // With a tabletenv.LocalContext() the target is nil. + // With a tabletenv.LocalContext() the target can be nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1249,7 +1249,7 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - // With a tabletenv.LocalContext() the target is nil. + // With a tabletenv.LocalContext() the target can be nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1298,7 +1298,7 @@ func (tsv *TabletServer) ReserveBeginStreamExecute( target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - // With a tabletenv.LocalContext() the target is nil. + // With a tabletenv.LocalContext() the target can be nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1355,7 +1355,7 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - // With a tabletenv.LocalContext() the target is nil. + // With a tabletenv.LocalContext() the target can be nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1409,7 +1409,7 @@ func (tsv *TabletServer) ReserveStreamExecute( target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - // With a tabletenv.LocalContext() the target is nil. + // With a tabletenv.LocalContext() the target can be nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1442,7 +1442,7 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RELEASE", time.Now()) - // With a tabletenv.LocalContext() the target is nil. + // With a tabletenv.LocalContext() the target can be nil. if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1529,7 +1529,7 @@ func (tsv *TabletServer) execRequest( span.Annotate("workload_name", options.WorkloadName) } trace.AnnotateSQL(span, sqlparser.Preview(sql)) - // With a tabletenv.LocalContext() the target is nil. + // With a tabletenv.LocalContext() the target can be nil. if target != nil { span.Annotate("cell", target.Cell) span.Annotate("shard", target.Shard) From 3a76cd3fffd0da45d9652b1e14abf0a4761183da Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 16:12:36 -0500 Subject: [PATCH 06/10] Properly resolve target type when needed Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/tabletserver.go | 63 ++++++++++++++------- 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 80992dadcb3..97345f478cc 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -588,6 +588,23 @@ func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) return optionsPriority } +// resolveTargetType returns the appropriate target tablet type for a +// TabletServer request. If the caller has a local context then it's +// an internal request and the target is the local tablet's current +// target. +func (tsv *TabletServer) resolveTargetType(ctx context.Context, target *querypb.Target) (string, error) { + if target != nil { + return target.TabletType.String(), nil + } + if !tabletenv.IsLocalContext(ctx) { + return topodatapb.ShardReplicationError_UNKNOWN.String(), vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no target specified") + } + if tsv.sm.Target() == nil { + return topodatapb.ShardReplicationError_UNKNOWN.String(), vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "TabletServer has no current target") + } + return tsv.sm.Target().String(), nil +} + // Commit commits the specified transaction. func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) { err = tsv.execRequest( @@ -610,10 +627,11 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra // handlePanicAndSendLogStats doesn't log the no-op. if commitSQL != "" { tsv.stats.QueryTimings.Record("COMMIT", startTime) - // With a tabletenv.LocalContext() the target can be nil. - if target != nil { - tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) + tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err } + tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, startTime) } else { logStats.Method = "" } @@ -631,10 +649,11 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("ROLLBACK", time.Now()) - // With a tabletenv.LocalContext() the target can be nil. - if target != nil { - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err } + defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) logStats.TransactionID = transactionID newReservedID, err = tsv.te.Rollback(ctx, transactionID) if newReservedID > 0 { @@ -1249,10 +1268,11 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - // With a tabletenv.LocalContext() the target can be nil. - if target != nil { - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err } + defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries) if err != nil { return err @@ -1298,10 +1318,11 @@ func (tsv *TabletServer) ReserveBeginStreamExecute( target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - // With a tabletenv.LocalContext() the target can be nil. - if target != nil { - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err } + defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries) if err != nil { return err @@ -1355,10 +1376,11 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - // With a tabletenv.LocalContext() the target can be nil. - if target != nil { - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err } + defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries) if err != nil { return err @@ -1409,10 +1431,11 @@ func (tsv *TabletServer) ReserveStreamExecute( target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - // With a tabletenv.LocalContext() the target can be nil. - if target != nil { - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err } + defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries) if err != nil { return err @@ -1442,7 +1465,7 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RELEASE", time.Now()) - // With a tabletenv.LocalContext() the target can be nil. + if target != nil { defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) } @@ -1529,7 +1552,7 @@ func (tsv *TabletServer) execRequest( span.Annotate("workload_name", options.WorkloadName) } trace.AnnotateSQL(span, sqlparser.Preview(sql)) - // With a tabletenv.LocalContext() the target can be nil. + // With a tabletenv.LocalContext() the target will be nil. if target != nil { span.Annotate("cell", target.Cell) span.Annotate("shard", target.Shard) From e5fe9e936c651eff11678feed6dad2c425583703 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 17:45:41 -0500 Subject: [PATCH 07/10] Fixes from additional testing Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/state_manager.go | 4 +- go/vt/vttablet/tabletserver/tabletserver.go | 58 +++++++++++-------- .../tabletserver/tabletserver_test.go | 47 ++++++++++++++- 3 files changed, 82 insertions(+), 27 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 75d1a7c7c3b..98ed846600c 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -66,6 +66,8 @@ func (state servingState) String() string { var transitionRetryInterval = 1 * time.Second var logInitTime sync.Once +var ErrNoTarget = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target") + // stateManager manages state transition for all the TabletServer // subcomponents. type stateManager struct { @@ -433,7 +435,7 @@ func (sm *stateManager) verifyTargetLocked(ctx context.Context, target *querypb. } } else { if !tabletenv.IsLocalContext(ctx) { - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No target") + return ErrNoTarget } } return nil diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 97345f478cc..63714dc47f9 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -551,10 +551,11 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save logStats.OriginalSQL = beginSQL if beginSQL != "" { tsv.stats.QueryTimings.Record("BEGIN", startTime) - // With a tabletenv.LocalContext() the target can be nil. - if target != nil { - tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), startTime) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err } + tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), startTime) } else { logStats.Method = "" } @@ -591,18 +592,19 @@ func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) // resolveTargetType returns the appropriate target tablet type for a // TabletServer request. If the caller has a local context then it's // an internal request and the target is the local tablet's current -// target. -func (tsv *TabletServer) resolveTargetType(ctx context.Context, target *querypb.Target) (string, error) { +// target. If it's not a local context then there should always be a +// non-nil target specified. +func (tsv *TabletServer) resolveTargetType(ctx context.Context, target *querypb.Target) (topodatapb.TabletType, error) { if target != nil { - return target.TabletType.String(), nil + return target.TabletType, nil } if !tabletenv.IsLocalContext(ctx) { - return topodatapb.ShardReplicationError_UNKNOWN.String(), vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no target specified") + return topodatapb.TabletType_UNKNOWN, ErrNoTarget } if tsv.sm.Target() == nil { - return topodatapb.ShardReplicationError_UNKNOWN.String(), vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "TabletServer has no current target") + return topodatapb.TabletType_UNKNOWN, ErrNoTarget } - return tsv.sm.Target().String(), nil + return tsv.sm.Target().TabletType, nil } // Commit commits the specified transaction. @@ -627,11 +629,11 @@ func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, tra // handlePanicAndSendLogStats doesn't log the no-op. if commitSQL != "" { tsv.stats.QueryTimings.Record("COMMIT", startTime) - tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + targetType, err := tsv.resolveTargetType(ctx, target) if err != nil { return err } - tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, startTime) + tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), startTime) } else { logStats.Method = "" } @@ -649,11 +651,11 @@ func (tsv *TabletServer) Rollback(ctx context.Context, target *querypb.Target, t target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("ROLLBACK", time.Now()) - tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + targetType, err := tsv.resolveTargetType(ctx, target) if err != nil { return err } - defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) logStats.TransactionID = transactionID newReservedID, err = tsv.te.Rollback(ctx, transactionID) if newReservedID > 0 { @@ -864,6 +866,10 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq return err } } + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err + } qre := &QueryExecutor{ query: query, marginComments: comments, @@ -874,7 +880,7 @@ func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sq ctx: ctx, logStats: logStats, tsv: tsv, - tabletType: target.GetTabletType(), + tabletType: targetType, setting: connSetting, } result, err = qre.Execute() @@ -1268,11 +1274,11 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + targetType, err := tsv.resolveTargetType(ctx, target) if err != nil { return err } - defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries) if err != nil { return err @@ -1318,11 +1324,11 @@ func (tsv *TabletServer) ReserveBeginStreamExecute( target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + targetType, err := tsv.resolveTargetType(ctx, target) if err != nil { return err } - defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries) if err != nil { return err @@ -1376,11 +1382,11 @@ func (tsv *TabletServer) ReserveExecute(ctx context.Context, target *querypb.Tar target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + targetType, err := tsv.resolveTargetType(ctx, target) if err != nil { return err } - defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries) if err != nil { return err @@ -1431,11 +1437,11 @@ func (tsv *TabletServer) ReserveStreamExecute( target, options, allowOnShutdown, func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RESERVE", time.Now()) - tabletTypeStr, err := tsv.resolveTargetType(ctx, target) + targetType, err := tsv.resolveTargetType(ctx, target) if err != nil { return err } - defer tsv.stats.QueryTimingsByTabletType.Record(tabletTypeStr, time.Now()) + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) state.ReservedID, err = tsv.te.Reserve(ctx, options, transactionID, preQueries) if err != nil { return err @@ -1466,9 +1472,11 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RELEASE", time.Now()) - if target != nil { - defer tsv.stats.QueryTimingsByTabletType.Record(target.TabletType.String(), time.Now()) + targetType, err := tsv.resolveTargetType(ctx, target) + if err != nil { + return err } + defer tsv.stats.QueryTimingsByTabletType.Record(targetType.String(), time.Now()) logStats.TransactionID = transactionID logStats.ReservedID = reservedID if reservedID != 0 { @@ -1476,7 +1484,7 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr return tsv.te.Release(reservedID) } // Rollback to cleanup the transaction before returning to the pool. - _, err := tsv.te.Rollback(ctx, transactionID) + _, err = tsv.te.Rollback(ctx, transactionID) return err }, ) diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index c9e312847b3..d8595630480 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -566,6 +566,11 @@ func TestTabletServerCommitPrepared(t *testing.T) { require.NoError(t, err) } +// TestTabletServerWithNilTarget confirms that a nil target is +// handled correctly. This means that when a local context is +// used, the target type is inferred from the local tablet's +// latest target type. +// And if it's not a local context then we return an error. func TestTabletServerWithNilTarget(t *testing.T) { // A non-nil target is required when not using a local context. ctx := tabletenv.LocalContext() @@ -573,6 +578,17 @@ func TestTabletServerWithNilTarget(t *testing.T) { defer tsv.StopService() defer db.Close() + // With a nil target, the local tablet's latest target type is + // what should be used as the inferred target type for our local + // calls. + target := (*querypb.Target)(nil) + localTargetType := topodatapb.TabletType_RDONLY // Use a non-default type + err := tsv.SetServingType(localTargetType, time.Now(), true, "test") + require.NoError(t, err) + + baseKey := "TabletServerTest" // Our TabletServer's name + fullKey := fmt.Sprintf("%s.%s", baseKey, localTargetType.String()) + executeSQL := "select * from test_table limit 1000" executeSQLResult := &sqltypes.Result{ Fields: []*querypb.Field{ @@ -582,15 +598,44 @@ func TestTabletServerWithNilTarget(t *testing.T) { {sqltypes.NewVarBinary("row01")}, }, } + // BEGIN gets transmuted to this since it's a RDONLY tablet. + db.AddQuery("start transaction read only", &sqltypes.Result{}) db.AddQuery(executeSQL, executeSQLResult) - target := (*querypb.Target)(nil) + expectedCount := tsv.stats.QueryTimingsByTabletType.Counts()[fullKey] + state, err := tsv.Begin(ctx, target, nil) require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + _, err = tsv.Execute(ctx, target, executeSQL, nil, state.TransactionID, 0, nil) require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + _, err = tsv.Rollback(ctx, target, state.TransactionID) require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + + state, err = tsv.Begin(ctx, target, nil) + require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + + _, err = tsv.Commit(ctx, target, state.TransactionID) + require.NoError(t, err) + expectedCount++ + require.Equal(t, expectedCount, tsv.stats.QueryTimingsByTabletType.Counts()[fullKey]) + + // Finally be sure that we return an error now as expected when NOT + // using a local context but passing a nil target. + nonLocalCtx := context.Background() + _, err = tsv.Begin(nonLocalCtx, target, nil) + require.True(t, errors.Is(err, ErrNoTarget)) + _, err = tsv.resolveTargetType(nonLocalCtx, target) + require.True(t, errors.Is(err, ErrNoTarget)) } func TestSmallerTimeout(t *testing.T) { From db7106891257a4a870a611b23250ce3ef58dd3df Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 17:49:09 -0500 Subject: [PATCH 08/10] Tweak for unknown Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/tabletserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 63714dc47f9..6ff545150c2 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -602,7 +602,7 @@ func (tsv *TabletServer) resolveTargetType(ctx context.Context, target *querypb. return topodatapb.TabletType_UNKNOWN, ErrNoTarget } if tsv.sm.Target() == nil { - return topodatapb.TabletType_UNKNOWN, ErrNoTarget + return topodatapb.TabletType_UNKNOWN, nil // This is true, and does not block the request. } return tsv.sm.Target().TabletType, nil } From 64953e6fa72116997348e8a187ebd74ffd43c3fb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 17:53:41 -0500 Subject: [PATCH 09/10] Minor changes after self review Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/tabletserver.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 6ff545150c2..f6e202d7a3c 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -602,7 +602,7 @@ func (tsv *TabletServer) resolveTargetType(ctx context.Context, target *querypb. return topodatapb.TabletType_UNKNOWN, ErrNoTarget } if tsv.sm.Target() == nil { - return topodatapb.TabletType_UNKNOWN, nil // This is true, and does not block the request. + return topodatapb.TabletType_UNKNOWN, nil // This is true, and does not block the request } return tsv.sm.Target().TabletType, nil } @@ -1471,7 +1471,6 @@ func (tsv *TabletServer) Release(ctx context.Context, target *querypb.Target, tr target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { defer tsv.stats.QueryTimings.Record("RELEASE", time.Now()) - targetType, err := tsv.resolveTargetType(ctx, target) if err != nil { return err From 2e3b883725c96d3163ae98d08a92e00c2dac80e2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 8 Dec 2023 18:31:16 -0500 Subject: [PATCH 10/10] Add additional nil check Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/tabletserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index f6e202d7a3c..295a85c31fd 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -166,7 +166,7 @@ func NewTabletServer(ctx context.Context, name string, config *tabletenv.TabletC tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(ctx, topoServer, "TabletSrvTopo") }) tabletTypeFunc := func() topodatapb.TabletType { - if tsv.sm == nil { + if tsv.sm == nil || tsv.sm.Target() == nil { return topodatapb.TabletType_UNKNOWN } return tsv.sm.Target().TabletType