From 58b2ed2ea73bf78afb412b1bfccde24b2d21d879 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 9 Nov 2023 19:04:23 +0100 Subject: [PATCH 1/2] `slack-vitess-r14.0.5`: backport vitessio/vitess#14428 from v16 (#165) * make column resolution closer to mysql Signed-off-by: Andres Taylor * Fix bad merge conflict fix Signed-off-by: Tim Vaillancourt * Possible horizon_planning.go fix Signed-off-by: Tim Vaillancourt * Fix aggr_cases.json plan Signed-off-by: Tim Vaillancourt * Fix group by order Signed-off-by: Tim Vaillancourt * Remove 'TablesUsed', not in v14 Signed-off-by: Tim Vaillancourt --------- Signed-off-by: Andres Taylor Signed-off-by: Tim Vaillancourt Co-authored-by: Andres Taylor --- .../planbuilder/abstract/queryprojection.go | 59 ++++++++------- go/vt/vtgate/planbuilder/horizon_planning.go | 13 +--- .../planbuilder/testdata/aggr_cases.json | 18 ++--- .../planbuilder/testdata/select_cases.json | 75 +++++++++++++++++++ 4 files changed, 120 insertions(+), 45 deletions(-) diff --git a/go/vt/vtgate/planbuilder/abstract/queryprojection.go b/go/vt/vtgate/planbuilder/abstract/queryprojection.go index 9b7b9df34ce..4ffb0a6cb83 100644 --- a/go/vt/vtgate/planbuilder/abstract/queryprojection.go +++ b/go/vt/vtgate/planbuilder/abstract/queryprojection.go @@ -151,17 +151,14 @@ func CreateQPFromSelect(sel *sqlparser.Select) (*QueryProjection, error) { } for _, group := range sel.GroupBy { selectExprIdx, aliasExpr := qp.FindSelectExprIndexForExpr(group) - expr, weightStrExpr, err := qp.GetSimplifiedExpr(group) - if err != nil { - return nil, err - } + weightStrExpr := qp.GetSimplifiedExpr(group) err = checkForInvalidGroupingExpressions(weightStrExpr) if err != nil { return nil, err } groupBy := GroupBy{ - Inner: expr, + Inner: group, WeightStrExpr: weightStrExpr, InnerIndex: selectExprIdx, aliasedExpr: aliasExpr, @@ -274,17 +271,14 @@ func CreateQPFromUnion(union *sqlparser.Union) (*QueryProjection, error) { func (qp *QueryProjection) addOrderBy(orderBy 
sqlparser.OrderBy) error { canPushDownSorting := true for _, order := range orderBy { - expr, weightStrExpr, err := qp.GetSimplifiedExpr(order.Expr) - if err != nil { - return err - } + weightStrExpr := qp.GetSimplifiedExpr(order.Expr) if sqlparser.IsNull(weightStrExpr) { // ORDER BY null can safely be ignored continue } qp.OrderExprs = append(qp.OrderExprs, OrderBy{ Inner: &sqlparser.Order{ - Expr: expr, + Expr: order.Expr, Direction: order.Direction, }, WeightStrExpr: weightStrExpr, @@ -360,34 +354,47 @@ func (qp *QueryProjection) isExprInGroupByExprs(expr SelectExpr) bool { } // GetSimplifiedExpr takes an expression used in ORDER BY or GROUP BY, and returns an expression that is simpler to evaluate -func (qp *QueryProjection) GetSimplifiedExpr(e sqlparser.Expr) (expr sqlparser.Expr, weightStrExpr sqlparser.Expr, err error) { +func (qp *QueryProjection) GetSimplifiedExpr(e sqlparser.Expr) (found sqlparser.Expr) { + if qp == nil { + return e + } // If the ORDER BY is against a column alias, we need to remember the expression // behind the alias. The weightstring(.) calls needs to be done against that expression and not the alias. // Eg - select music.foo as bar, weightstring(music.foo) from music order by bar - colExpr, isColName := e.(*sqlparser.ColName) - if !isColName { - return e, e, nil - } - - if sqlparser.IsNull(e) { - return e, nil, nil + in, isColName := e.(*sqlparser.ColName) + if !(isColName && in.Qualifier.IsEmpty()) { + // we are only interested in unqualified column names. 
if it's not a column name, or it is qualified, we're done + return e } - if colExpr.Qualifier.IsEmpty() { - for _, selectExpr := range qp.SelectExprs { - aliasedExpr, isAliasedExpr := selectExpr.Col.(*sqlparser.AliasedExpr) - if !isAliasedExpr { + for _, selectExpr := range qp.SelectExprs { + ae, ok := selectExpr.Col.(*sqlparser.AliasedExpr) + if !ok { + continue + } + aliased := !ae.As.IsEmpty() + if aliased { + if in.Name.Equal(ae.As) { + return ae.Expr + } + } else { + seCol, ok := ae.Expr.(*sqlparser.ColName) + if !ok { continue } - isAliasExpr := !aliasedExpr.As.IsEmpty() - if isAliasExpr && colExpr.Name.Equal(aliasedExpr.As) { - return e, aliasedExpr.Expr, nil + if seCol.Name.Equal(in.Name) { + // If the column name matches, we have a match, even if the table name is not listed + return ae.Expr } } } - return e, e, nil + if found == nil { + found = e + } + + return found } // toString should only be used for tests diff --git a/go/vt/vtgate/planbuilder/horizon_planning.go b/go/vt/vtgate/planbuilder/horizon_planning.go index 0f3b25a70b4..847a84d9cef 100644 --- a/go/vt/vtgate/planbuilder/horizon_planning.go +++ b/go/vt/vtgate/planbuilder/horizon_planning.go @@ -541,10 +541,8 @@ func (hp *horizonPlanning) handleDistinctAggr(ctx *plancontext.PlanningContext, err = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "syntax error: %s", sqlparser.String(expr.Original)) return } - inner, innerWS, err := hp.qp.GetSimplifiedExpr(aliasedExpr.Expr) - if err != nil { - return nil, nil, nil, err - } + inner := aliasedExpr.Expr + innerWS := hp.qp.GetSimplifiedExpr(inner) if exprHasVindex(ctx.SemTable, innerWS, false) { aggrs = append(aggrs, expr) continue @@ -600,13 +598,10 @@ func newOffset(col int) offsets { func (hp *horizonPlanning) createGroupingsForColumns(columns []*sqlparser.ColName) ([]abstract.GroupBy, error) { var lhsGrouping []abstract.GroupBy for _, lhsColumn := range columns { - expr, wsExpr, err := hp.qp.GetSimplifiedExpr(lhsColumn) - if err != nil { - return nil, 
err - } + wsExpr := hp.qp.GetSimplifiedExpr(lhsColumn) lhsGrouping = append(lhsGrouping, abstract.GroupBy{ - Inner: expr, + Inner: lhsColumn, WeightStrExpr: wsExpr, }) } diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index 6016622d2e3..377c5b06008 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -4784,9 +4784,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select id, b as id, count(*), weight_string(b) from `user` where 1 != 1", + "FieldQuery": "select id, b as id, count(*), weight_string(id) from `user` where 1 != 1", "OrderBy": "(1|3) ASC", - "Query": "select id, b as id, count(*), weight_string(b) from `user` order by id asc", + "Query": "select id, b as id, count(*), weight_string(id) from `user` order by id asc", "Table": "`user`" } ] @@ -4851,14 +4851,12 @@ "Instructions": { "OperatorType": "Aggregate", "Variant": "Ordered", - "Aggregates": "random(0) AS id", - "GroupBy": "(2|1)", + "GroupBy": "(1|0)", "ResultColumns": 1, "Inputs": [ { "OperatorType": "Projection", "Expressions": [ - "[COLUMN 2] as id", "[COLUMN 1]", "[COLUMN 0] as id" ], @@ -4866,7 +4864,7 @@ { "OperatorType": "Join", "Variant": "Join", - "JoinColumnIndexes": "L:0,L:1,L:0", + "JoinColumnIndexes": "L:0,L:1", "TableName": "`user`_user_extra", "Inputs": [ { @@ -4876,9 +4874,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select `user`.id, weight_string(id) from `user` where 1 != 1 group by id, weight_string(id)", + "FieldQuery": "select id, weight_string(`user`.id) from `user` where 1 != 1 group by id, weight_string(`user`.id)", "OrderBy": "(0|1) ASC", - "Query": "select `user`.id, weight_string(id) from `user` group by id, weight_string(id) order by id asc", + "Query": "select id, weight_string(`user`.id) from `user` group by id, weight_string(`user`.id) order by id asc", "Table": "`user`" }, { @@ -4888,8 +4886,8 @@ "Name": "user", 
"Sharded": true }, - "FieldQuery": "select 1 from user_extra where 1 != 1 group by 1", - "Query": "select 1 from user_extra group by 1", + "FieldQuery": "select 1 from user_extra where 1 != 1", + "Query": "select 1 from user_extra", "Table": "user_extra" } ] diff --git a/go/vt/vtgate/planbuilder/testdata/select_cases.json b/go/vt/vtgate/planbuilder/testdata/select_cases.json index d6b7a8c57bd..5b0f32b6689 100644 --- a/go/vt/vtgate/planbuilder/testdata/select_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/select_cases.json @@ -5684,5 +5684,80 @@ "Table": "dual, unsharded_a" } } + }, + { + "comment": "column with qualifier is correctly used", + "query": "select u.foo, ue.foo as apa from user u, user_extra ue order by foo ", + "v3-plan": { + "QueryType": "SELECT", + "Original": "select u.foo, ue.foo as apa from user u, user_extra ue order by foo ", + "Instructions": { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0", + "TableName": "`user`_user_extra", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select u.foo, weight_string(u.foo) from `user` as u where 1 != 1", + "OrderBy": "(0|1) ASC", + "Query": "select u.foo, weight_string(u.foo) from `user` as u order by foo asc", + "ResultColumns": 1, + "Table": "`user`" + }, + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select ue.foo as apa from user_extra as ue where 1 != 1", + "Query": "select ue.foo as apa from user_extra as ue", + "Table": "user_extra" + } + ] + } + }, + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select u.foo, ue.foo as apa from user u, user_extra ue order by foo ", + "Instructions": { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0", + "TableName": "`user`_user_extra", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { 
+ "Name": "user", + "Sharded": true + }, + "FieldQuery": "select u.foo, weight_string(u.foo) from `user` as u where 1 != 1", + "OrderBy": "(0|1) ASC", + "Query": "select u.foo, weight_string(u.foo) from `user` as u order by foo asc", + "Table": "`user`" + }, + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select ue.foo as apa from user_extra as ue where 1 != 1", + "Query": "select ue.foo as apa from user_extra as ue", + "Table": "user_extra" + } + ] + } + } } ] From 8509de656960ade1af6afcb09e90dd354a0cfe75 Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U" <5791035+ejortegau@users.noreply.github.com> Date: Wed, 6 Dec 2023 12:30:12 +0100 Subject: [PATCH 2/2] TxThrottler only throttles if current lag is above threshold. Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> --- .../tabletserver/txthrottler/tx_throttler.go | 30 ++++++++- .../txthrottler/tx_throttler_test.go | 62 ++++++++++++++++--- 2 files changed, 83 insertions(+), 9 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 9f2f369ffd9..94035b9ec0c 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "github.com/patrickmn/go-cache" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" @@ -184,6 +186,8 @@ type txThrottlerStateImpl struct { healthCheck discovery.LegacyHealthCheck topologyWatchers []TopologyWatcherInterface + // lagRecordsCache holds the most recently seen lag for each tablet + lagRecordsCache *cache.Cache } // NewTxThrottler tries to construct a txThrottler from the @@ -331,6 +335,8 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar result := &txThrottlerStateImpl{ config: config, throttler: t, + 
lagRecordsCache: cache.New(time.Duration(2*config.throttlerConfig.TargetReplicationLagSec)*time.Second, + time.Duration(4*config.throttlerConfig.TargetReplicationLagSec)*time.Second), } result.healthCheck = healthCheckFactory() result.healthCheck.SetListener(result, false /* sendDownEvents */) @@ -359,7 +365,25 @@ func (ts *txThrottlerStateImpl) throttle() bool { // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - return ts.throttler.Throttle(0 /* threadId */) > 0 + + // Find out the max lag seen recently from the lag cache + var maxLag int64 + + for host, lagInfo := range ts.lagRecordsCache.Items() { + lag, ok := lagInfo.Object.(int64) + if !ok { + log.Warningf("Failed to get lag of tablet %s from cache: %+v", host, lagInfo.Object) + + continue + } + + if lag > maxLag { + maxLag = lag + } + } + + return ts.throttler.Throttle(0 /* threadId */) > 0 && // Throttle if underlying Throttler object says so... + maxLag > ts.config.throttlerConfig.TargetReplicationLagSec // ... 
but only if there is actual lag } func (ts *txThrottlerStateImpl) deallocateResources() { @@ -391,6 +415,10 @@ func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.LegacyTabletS for _, expectedTabletType := range ts.config.tabletTypes { if tabletStats.Target.TabletType == expectedTabletType { ts.throttler.RecordReplicationLag(time.Now(), tabletStats) + if tabletStats.Stats != nil { + ts.lagRecordsCache.Set(tabletStats.Name, int64(tabletStats.Stats.ReplicationLagSeconds), + cache.DefaultExpiration) + } return } } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 7b9e4bc98bd..aa129bdb09c 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -90,26 +90,53 @@ func TestEnabledThrottler(t *testing.T) { } call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) + // Call 1 cannot throttle because the return value of call to underlying Throttle is zero. 
call1 := mockThrottler.EXPECT().Throttle(0) call1.Return(0 * time.Second) - tabletStats := &discovery.LegacyTabletStats{ + tabletStatsReplicaLagged := &discovery.LegacyTabletStats{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_REPLICA, }, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 20}, // Lag high enough for throttling + Name: "replica-name", } - call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) + // call2 Adds a lag record above threshold for replica tablets + call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStatsReplicaLagged) + + // call3 can throttle because the return value of call to Throttle is > zero - provided other conditions are met call3 := mockThrottler.EXPECT().Throttle(0) call3.Return(1 * time.Second) - + // call4 can throttle because the return value of call to Throttle is > zero - provided other conditions are met call4 := mockThrottler.EXPECT().Throttle(0) call4.Return(1 * time.Second) + + tabletStatsReplicaNotLagged := &discovery.LegacyTabletStats{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_REPLICA, + }, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1}, // Lag below threshold: not enough for throttling + Name: "replica-name", + } + + // call5 Adds a lag record below threshold for the right tablet type + call5 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStatsReplicaNotLagged) + // call6 can throttle because the return value of call to Throttle is > zero - provided other conditions are met + call6 := mockThrottler.EXPECT().Throttle(0) + call6.Return(1 * time.Second) + // call7 can throttle because the return value of call to Throttle is > zero - provided other conditions are met + call7 := mockThrottler.EXPECT().Throttle(0) + call7.Return(1 * time.Second) + calllast := mockThrottler.EXPECT().Close() call1.After(call0) call2.After(call1) call3.After(call2) call4.After(call3) - calllast.After(call4) + call5.After(call4) + 
call6.After(call5) + call7.After(call6) + calllast.After(call7) config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true @@ -126,27 +153,46 @@ func TestEnabledThrottler(t *testing.T) { assert.Nil(t, throttler.Open()) assert.Equal(t, int64(1), throttler.throttlerRunning.Get()) + // call1 can't throttle. assert.False(t, throttler.Throttle(100, "some-workload")) assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"]) assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"]) + // call2 records lag above threshold for REPLICA tablet + throttler.state.StatsUpdate(tabletStatsReplicaLagged) - throttler.state.StatsUpdate(tabletStats) + // This call should not be forwarded to the go/vt/throttler.Throttler object. Ignore RDONLY replicas due to config rdonlyTabletStats := &discovery.LegacyTabletStats{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_RDONLY, }, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 20}, // Lag high enough for throttling + Name: "rdonly-name", } - // This call should not be forwarded to the go/vt/throttler.Throttler object. hcListener.StatsUpdate(rdonlyTabletStats) - // The second throttle call should reject. + + // call3 throttles due to priority & enough replication lag besides return value for underlying Throttle call assert.True(t, throttler.Throttle(100, "some-workload")) assert.Equal(t, int64(2), throttler.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) - // This call should not throttle due to priority. Check that's the case and counters agree. 
+ // call4 does not throttle due to priority, despite enough replication lag besides return value for underlying + // Throttle call assert.False(t, throttler.Throttle(0, "some-workload")) assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) + + // call5 records lag below threshold for REPLICA tablet + throttler.state.StatsUpdate(tabletStatsReplicaNotLagged) + + // call6 does not throttle despite priority, because lag is below threshold + assert.False(t, throttler.Throttle(100, "some-workload")) + assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) + // call7 does not throttle due to priority and replication lag below threshold + assert.False(t, throttler.Throttle(0, "some-workload")) + assert.Equal(t, int64(5), throttler.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) + throttler.Close() assert.Zero(t, throttler.throttlerRunning.Get()) }