From 81c1965ab51f23f074926e844c8fe6ce91bc1402 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 9 Apr 2024 15:29:21 +0530 Subject: [PATCH 1/5] feat: prevent adding to query details after unserve common has started Signed-off-by: Manan Gupta --- .../vttablet/tabletserver/livequeryz_test.go | 10 +++---- go/vt/vttablet/tabletserver/query_executor.go | 20 +++++++++++--- go/vt/vttablet/tabletserver/query_list.go | 25 ++++++++++++++---- .../vttablet/tabletserver/query_list_test.go | 26 ++++++++++++++++--- go/vt/vttablet/tabletserver/state_manager.go | 9 +++++++ .../tabletserver/state_manager_test.go | 8 +++--- 6 files changed, 77 insertions(+), 21 deletions(-) diff --git a/go/vt/vttablet/tabletserver/livequeryz_test.go b/go/vt/vttablet/tabletserver/livequeryz_test.go index e507f365afb..18ce01c2273 100644 --- a/go/vt/vttablet/tabletserver/livequeryz_test.go +++ b/go/vt/vttablet/tabletserver/livequeryz_test.go @@ -30,8 +30,8 @@ func TestLiveQueryzHandlerJSON(t *testing.T) { req, _ := http.NewRequest("GET", "/livequeryz/?format=json", nil) queryList := NewQueryList("test", sqlparser.NewTestParser()) - queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1})) - queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2})) + _ = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1})) + _ = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2})) livequeryzHandler([]*QueryList{queryList}, resp, req) } @@ -41,8 +41,8 @@ func TestLiveQueryzHandlerHTTP(t *testing.T) { req, _ := http.NewRequest("GET", "/livequeryz/", nil) queryList := NewQueryList("test", sqlparser.NewTestParser()) - queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1})) - queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2})) + _ = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1})) + _ = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2})) livequeryzHandler([]*QueryList{queryList}, resp, req) } @@ -64,7 +64,7 @@ func TestLiveQueryzHandlerTerminateConn(t *testing.T) { queryList := NewQueryList("test", sqlparser.NewTestParser()) testConn := &testConn{id: 1} - queryList.Add(NewQueryDetail(context.Background(), testConn)) + _ = queryList.Add(NewQueryDetail(context.Background(), testConn)) if testConn.IsKilled() { t.Fatalf("conn should still be alive") } diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index f371d62006c..d1fbc96123f 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -1085,7 +1085,10 @@ func (qre *QueryExecutor) execDBConn(conn *connpool.Conn, sql string, wantfields defer qre.logStats.AddRewrittenSQL(sql, time.Now()) qd := NewQueryDetail(qre.logStats.Ctx, conn) - qre.tsv.statelessql.Add(qd) + err := qre.tsv.statelessql.Add(qd) + if err != nil { + return nil, err + } defer qre.tsv.statelessql.Remove(qd) return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields) @@ -1098,7 +1101,10 @@ func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string, defer qre.logStats.AddRewrittenSQL(sql, time.Now()) qd := NewQueryDetail(qre.logStats.Ctx, conn) - qre.tsv.statefulql.Add(qd) + err := qre.tsv.statefulql.Add(qd) + if err != nil { + return nil, err + } defer qre.tsv.statefulql.Remove(qd) return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields) @@ -1122,11 +1128,17 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction // once their grace period is over. qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn) if isTransaction { - qre.tsv.statefulql.Add(qd) + err := qre.tsv.statefulql.Add(qd) + if err != nil { + return err + } defer qre.tsv.statefulql.Remove(qd) return conn.Conn.StreamOnce(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options)) } - qre.tsv.olapql.Add(qd) + err := qre.tsv.olapql.Add(qd) + if err != nil { + return err + } defer qre.tsv.olapql.Remove(qd) return conn.Conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options)) } diff --git a/go/vt/vttablet/tabletserver/query_list.go b/go/vt/vttablet/tabletserver/query_list.go index a41f23b6aa0..3ccf13418a7 100644 --- a/go/vt/vttablet/tabletserver/query_list.go +++ b/go/vt/vttablet/tabletserver/query_list.go @@ -26,7 +26,9 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callinfo" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" ) // QueryDetail is a simple wrapper for Query, Context and a killable conn. @@ -58,28 +60,41 @@ type QueryList struct { // and remove appropriately. queryDetails map[int64][]*QueryDetail - parser *sqlparser.Parser + parser *sqlparser.Parser + clusterActionInProgress bool } // NewQueryList creates a new QueryList func NewQueryList(name string, parser *sqlparser.Parser) *QueryList { return &QueryList{ - name: name, - queryDetails: make(map[int64][]*QueryDetail), - parser: parser, + name: name, + queryDetails: make(map[int64][]*QueryDetail), + parser: parser, + clusterActionInProgress: false, } } +// SetClusterAction sets the clusterActionInProgress field. +func (ql *QueryList) SetClusterAction(inProgress bool) { + ql.mu.Lock() + defer ql.mu.Unlock() + ql.clusterActionInProgress = inProgress +} + // Add adds a QueryDetail to QueryList -func (ql *QueryList) Add(qd *QueryDetail) { +func (ql *QueryList) Add(qd *QueryDetail) error { ql.mu.Lock() defer ql.mu.Unlock() + if ql.clusterActionInProgress { + return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown) + } qds, exists := ql.queryDetails[qd.connID] if exists { ql.queryDetails[qd.connID] = append(qds, qd) } else { ql.queryDetails[qd.connID] = []*QueryDetail{qd} } + return nil } // Remove removes a QueryDetail from QueryList diff --git a/go/vt/vttablet/tabletserver/query_list_test.go b/go/vt/vttablet/tabletserver/query_list_test.go index 57b672a16e0..bb1b47db4af 100644 --- a/go/vt/vttablet/tabletserver/query_list_test.go +++ b/go/vt/vttablet/tabletserver/query_list_test.go @@ -49,7 +49,8 @@ func TestQueryList(t *testing.T) { ql := NewQueryList("test", sqlparser.NewTestParser()) connID := int64(1) qd := NewQueryDetail(context.Background(), &testConn{id: connID}) - ql.Add(qd) + err := ql.Add(qd) + require.NoError(t, err) if qd1, ok := ql.queryDetails[connID]; !ok || qd1[0].connID != connID { t.Errorf("failed to add to QueryList") @@ -57,7 +58,8 @@ func TestQueryList(t *testing.T) { conn2ID := int64(2) qd2 := NewQueryDetail(context.Background(), &testConn{id: conn2ID}) - ql.Add(qd2) + err = ql.Add(qd2) + require.NoError(t, err) rows := ql.AppendQueryzRows(nil) if len(rows) != 2 || rows[0].ConnID != 1 || rows[1].ConnID != 2 { @@ -74,11 +76,13 @@ func TestQueryListChangeConnIDInMiddle(t *testing.T) { ql := NewQueryList("test", sqlparser.NewTestParser()) connID := int64(1) qd1 := NewQueryDetail(context.Background(), &testConn{id: connID}) - ql.Add(qd1) + err := ql.Add(qd1) + require.NoError(t, err) conn := &testConn{id: connID} qd2 := NewQueryDetail(context.Background(), conn) - ql.Add(qd2) + err = ql.Add(qd2) + require.NoError(t, err) require.Len(t, ql.queryDetails[1], 2) @@ -92,3 +96,17 @@ func TestQueryListChangeConnIDInMiddle(t *testing.T) { require.Equal(t, qd1, ql.queryDetails[1][0]) require.NotEqual(t, qd2, ql.queryDetails[1][0]) } + +func TestClusterAction(t *testing.T) { + ql := NewQueryList("test", sqlparser.NewTestParser()) + connID := int64(1) + qd1 := NewQueryDetail(context.Background(), &testConn{id: connID}) + + ql.SetClusterAction(true) + err := ql.Add(qd1) + require.ErrorContains(t, err, "operation not allowed in state SHUTTING_DOWN") + + ql.SetClusterAction(false) + err = ql.Add(qd1) + require.NoError(t, err) +} diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 60b1f1281d0..af2da48f75d 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -542,6 +542,8 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error { } func (sm *stateManager) unserveCommon() { + sm.markClusterAction(true) + defer sm.markClusterAction(false) // We create a wait group that tracks whether all the queries have been terminated or not. wg := sync.WaitGroup{} wg.Add(1) @@ -850,3 +852,10 @@ func (sm *stateManager) IsServingString() string { func (sm *stateManager) SetUnhealthyThreshold(v time.Duration) { sm.unhealthyThreshold.Store(v.Nanoseconds()) } + +// markClusterAction marks whether a cluster action is in progress or not for all the query details. +func (sm *stateManager) markClusterAction(inProgress bool) { + sm.statefulql.SetClusterAction(inProgress) + sm.statelessql.SetClusterAction(inProgress) + sm.olapql.SetClusterAction(inProgress) +} diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index a0ef3557074..f6345b9b29c 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -409,18 +409,20 @@ func TestStateManagerShutdownGracePeriod(t *testing.T) { sm.te = &delayedTxEngine{} kconn1 := &killableConn{id: 1} - sm.statelessql.Add(&QueryDetail{ + err := sm.statelessql.Add(&QueryDetail{ conn: kconn1, connID: kconn1.id, }) + require.NoError(t, err) kconn2 := &killableConn{id: 2} - sm.statefulql.Add(&QueryDetail{ + err = sm.statefulql.Add(&QueryDetail{ conn: kconn2, connID: kconn2.id, }) + require.NoError(t, err) // Transition to replica with no shutdown grace period should kill kconn2 but not kconn1. - err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") + err = sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") require.NoError(t, err) assert.False(t, kconn1.killed.Load()) assert.True(t, kconn2.killed.Load()) From 0d4899f0a00b15263287a78452b13decb1fb4e0d Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 9 Apr 2024 16:19:01 +0530 Subject: [PATCH 2/5] test: fix test by removing an assertion on the test error since it is not deterministic anymore Signed-off-by: Manan Gupta --- go/test/endtoend/vtgate/transaction/restart/main_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/go/test/endtoend/vtgate/transaction/restart/main_test.go b/go/test/endtoend/vtgate/transaction/restart/main_test.go index de52a3e8870..01185b5fa59 100644 --- a/go/test/endtoend/vtgate/transaction/restart/main_test.go +++ b/go/test/endtoend/vtgate/transaction/restart/main_test.go @@ -23,7 +23,6 @@ import ( "os" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -113,5 +112,4 @@ func TestStreamTxRestart(t *testing.T) { // query should return connection error _, err = utils.ExecAllowError(t, conn, "select connection_id()") require.Error(t, err) - assert.Contains(t, err.Error(), "broken pipe (errno 2006) (sqlstate HY000)") } From aee780ba69f065d9797115976e4614d5dd956c68 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 9 Apr 2024 17:26:09 +0530 Subject: [PATCH 3/5] test: check we receive no errors from Add Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/livequeryz_test.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/tabletserver/livequeryz_test.go b/go/vt/vttablet/tabletserver/livequeryz_test.go index 18ce01c2273..8dad3cd1631 100644 --- a/go/vt/vttablet/tabletserver/livequeryz_test.go +++ b/go/vt/vttablet/tabletserver/livequeryz_test.go @@ -22,6 +22,8 @@ import ( "net/http/httptest" "testing" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/sqlparser" ) @@ -30,8 +32,10 @@ func TestLiveQueryzHandlerJSON(t *testing.T) { req, _ := http.NewRequest("GET", "/livequeryz/?format=json", nil) queryList := NewQueryList("test", sqlparser.NewTestParser()) - _ = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1})) - _ = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2})) + err := queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1})) + require.NoError(t, err) + err = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2})) + require.NoError(t, err) livequeryzHandler([]*QueryList{queryList}, resp, req) } @@ -41,8 +45,10 @@ func TestLiveQueryzHandlerHTTP(t *testing.T) { req, _ := http.NewRequest("GET", "/livequeryz/", nil) queryList := NewQueryList("test", sqlparser.NewTestParser()) - _ = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1})) - _ = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2})) + err := queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1})) + require.NoError(t, err) + err = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2})) + require.NoError(t, err) livequeryzHandler([]*QueryList{queryList}, resp, req) } @@ -64,7 +70,8 @@ func TestLiveQueryzHandlerTerminateConn(t *testing.T) { queryList := NewQueryList("test", sqlparser.NewTestParser()) testConn := &testConn{id: 1} - _ = queryList.Add(NewQueryDetail(context.Background(), testConn)) + err := queryList.Add(NewQueryDetail(context.Background(), testConn)) + require.NoError(t, err) if testConn.IsKilled() { t.Fatalf("conn should still be alive") } From 5ef853a7459de6a5504c06fc0cd93542d38ab885 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 10 Apr 2024 11:04:14 +0530 Subject: [PATCH 4/5] feat: only prevent query addition after the grace period has expired Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index af2da48f75d..c7f12201796 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -542,7 +542,6 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error { } func (sm *stateManager) unserveCommon() { - sm.markClusterAction(true) defer sm.markClusterAction(false) // We create a wait group that tracks whether all the queries have been terminated or not. wg := sync.WaitGroup{} @@ -603,6 +602,8 @@ func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func()) if err := timer.SleepContext(ctx, sm.shutdownGracePeriod); err != nil { return } + // Prevent any new queries from being added before we kill all the queries in the list. + sm.markClusterAction(true) log.Infof("Grace Period %v exceeded. Killing all OLTP queries.", sm.shutdownGracePeriod) sm.statelessql.TerminateAll() log.Infof("Killed all stateless OLTP queries.") From 880c5e3588725773394acc39092136bc4ec27937 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 11 Apr 2024 13:59:58 +0530 Subject: [PATCH 5/5] feat: fix race where we set cluster action to not accepting queries after the defer function has run Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/query_list.go | 30 +++++++++++++------ .../vttablet/tabletserver/query_list_test.go | 9 ++++-- go/vt/vttablet/tabletserver/state_manager.go | 13 ++++---- 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_list.go b/go/vt/vttablet/tabletserver/query_list.go index 3ccf13418a7..a21acd6f92a 100644 --- a/go/vt/vttablet/tabletserver/query_list.go +++ b/go/vt/vttablet/tabletserver/query_list.go @@ -60,32 +60,44 @@ type QueryList struct { // and remove appropriately. queryDetails map[int64][]*QueryDetail - parser *sqlparser.Parser - clusterActionInProgress bool + parser *sqlparser.Parser + ca ClusterActionState } +type ClusterActionState int + +const ( + ClusterActionNotInProgress ClusterActionState = iota + ClusterActionInProgress ClusterActionState = iota + ClusterActionNoQueries ClusterActionState = iota +) + // NewQueryList creates a new QueryList func NewQueryList(name string, parser *sqlparser.Parser) *QueryList { return &QueryList{ - name: name, - queryDetails: make(map[int64][]*QueryDetail), - parser: parser, - clusterActionInProgress: false, + name: name, + queryDetails: make(map[int64][]*QueryDetail), + parser: parser, + ca: ClusterActionNotInProgress, } } // SetClusterAction sets the clusterActionInProgress field. -func (ql *QueryList) SetClusterAction(inProgress bool) { +func (ql *QueryList) SetClusterAction(ca ClusterActionState) { ql.mu.Lock() defer ql.mu.Unlock() - ql.clusterActionInProgress = inProgress + // If the current state is ClusterActionNotInProgress, then we want to ignore setting ClusterActionNoQueries. + if ca == ClusterActionNoQueries && ql.ca == ClusterActionNotInProgress { + return + } + ql.ca = ca } // Add adds a QueryDetail to QueryList func (ql *QueryList) Add(qd *QueryDetail) error { ql.mu.Lock() defer ql.mu.Unlock() - if ql.clusterActionInProgress { + if ql.ca == ClusterActionNoQueries { return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown) } qds, exists := ql.queryDetails[qd.connID] diff --git a/go/vt/vttablet/tabletserver/query_list_test.go b/go/vt/vttablet/tabletserver/query_list_test.go index bb1b47db4af..1e9dc2bf42c 100644 --- a/go/vt/vttablet/tabletserver/query_list_test.go +++ b/go/vt/vttablet/tabletserver/query_list_test.go @@ -102,11 +102,16 @@ func TestClusterAction(t *testing.T) { connID := int64(1) qd1 := NewQueryDetail(context.Background(), &testConn{id: connID}) - ql.SetClusterAction(true) + ql.SetClusterAction(ClusterActionInProgress) + ql.SetClusterAction(ClusterActionNoQueries) err := ql.Add(qd1) require.ErrorContains(t, err, "operation not allowed in state SHUTTING_DOWN") - ql.SetClusterAction(false) + ql.SetClusterAction(ClusterActionNotInProgress) + err = ql.Add(qd1) + require.NoError(t, err) + // If the current state is not in progress, then setting no queries, shouldn't change anything. + ql.SetClusterAction(ClusterActionNoQueries) err = ql.Add(qd1) require.NoError(t, err) } diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index c7f12201796..308f9165ba6 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -542,7 +542,8 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error { } func (sm *stateManager) unserveCommon() { - defer sm.markClusterAction(false) + sm.markClusterAction(ClusterActionInProgress) + defer sm.markClusterAction(ClusterActionNotInProgress) // We create a wait group that tracks whether all the queries have been terminated or not. wg := sync.WaitGroup{} wg.Add(1) @@ -603,7 +604,7 @@ func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func()) return } // Prevent any new queries from being added before we kill all the queries in the list. - sm.markClusterAction(true) + sm.markClusterAction(ClusterActionNoQueries) log.Infof("Grace Period %v exceeded. Killing all OLTP queries.", sm.shutdownGracePeriod) sm.statelessql.TerminateAll() log.Infof("Killed all stateless OLTP queries.") @@ -855,8 +856,8 @@ func (sm *stateManager) SetUnhealthyThreshold(v time.Duration) { } // markClusterAction marks whether a cluster action is in progress or not for all the query details. -func (sm *stateManager) markClusterAction(inProgress bool) { - sm.statefulql.SetClusterAction(inProgress) - sm.statelessql.SetClusterAction(inProgress) - sm.olapql.SetClusterAction(inProgress) +func (sm *stateManager) markClusterAction(ca ClusterActionState) { + sm.statefulql.SetClusterAction(ca) + sm.statelessql.SetClusterAction(ca) + sm.olapql.SetClusterAction(ca) }