From 001fa3f768d839d306b5191b80f5e21c7476cf33 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 10 Jan 2024 15:19:00 +0530 Subject: [PATCH 1/4] test: add a failing test to see the panic Signed-off-by: Manan Gupta --- .../tabletserver/state_manager_test.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 4b88ce734d7..dfc9a615b0c 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -701,6 +701,29 @@ func TestRefreshReplHealthLocked(t *testing.T) { assert.False(t, sm.replHealthy) } +func TestPanicInWait(t *testing.T) { + sm := newTestStateManager(t) + sm.wantState = StateServing + sm.state = StateServing + sm.replHealthy = true + ctx := context.Background() + // Simulate an Execute RPC running + err := sm.StartRequest(ctx, sm.target, false) + require.NoError(t, err) + go func() { + time.Sleep(100 * time.Millisecond) + // Simulate the previous RPC finishing after some delay + sm.EndRequest() + // Simulate a COMMIT call arriving right afterwards + err = sm.StartRequest(ctx, sm.target, true) + require.NoError(t, err) + }() + + // Simulate going to a not serving state and calling unserveCommon that waits on requests. + sm.wantState = StateNotServing + sm.requests.Wait() +} + func verifySubcomponent(t *testing.T, order int64, component any, state testState) { tos := component.(orderState) assert.Equal(t, order, tos.Order()) From 3ac730c12fcf3e091042999a292684b3552e77a2 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 11 Jan 2024 13:47:22 +0530 Subject: [PATCH 2/4] feat: fix the problem by adding a counter for number of go routines blocked on waiting for requests Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager.go | 24 +++++++++++++++++-- .../tabletserver/state_manager_test.go | 6 ++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 8aa7776957f..ebdc71354d5 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -99,6 +99,10 @@ type stateManager struct { alsoAllow []topodatapb.TabletType reason string transitionErr error + // requestsWaitCounter is the number of goroutines that are waiting for requests to be empty. + // If this value is greater than zero, then we have to ensure that we don't Add to the requests + // to avoid any panics in the wait. + requestsWaitCounter int requests sync.WaitGroup @@ -354,6 +358,20 @@ func (sm *stateManager) checkMySQL() { }() } +// addRequestsWaitCounter adds to the requestsWaitCounter while being protected by a mutex. +func (sm *stateManager) addRequestsWaitCounter(val int) { + sm.mu.Lock() + defer sm.mu.Unlock() + sm.requestsWaitCounter += val +} + +// waitForRequestsToBeEmpty waits for requests to be empty. It also increments and decrements the requestsWaitCounter as required. +func (sm *stateManager) waitForRequestsToBeEmpty() { + sm.addRequestsWaitCounter(1) + sm.requests.Wait() + sm.addRequestsWaitCounter(-1) +} + func (sm *stateManager) setWantState(stateWanted servingState) { sm.mu.Lock() defer sm.mu.Unlock() @@ -392,7 +410,9 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target } shuttingDown := sm.wantState != StateServing - if shuttingDown && !allowOnShutdown { + // If requestsWaitCounter is not zero, then there are go-routines blocked on waiting for requests to be empty. + // We cannot allow adding to the requests to prevent any panics from happening. + if sm.requestsWaitCounter > 0 || (shuttingDown && !allowOnShutdown) { // This specific error string needs to be returned for vtgate buffering to work. return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown) } @@ -560,7 +580,7 @@ func (sm *stateManager) unserveCommon() { log.Info("Finished Killing all OLAP queries. Started tracker close") sm.tracker.Close() log.Infof("Finished tracker close. Started wait for requests") - sm.requests.Wait() + sm.waitForRequestsToBeEmpty() log.Infof("Finished wait for requests. Finished execution of unserveCommon") } diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index dfc9a615b0c..cd72c7232c8 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -701,6 +701,7 @@ func TestRefreshReplHealthLocked(t *testing.T) { assert.False(t, sm.replHealthy) } +// TestPanicInWait tests that we don't panic when we wait for requests if more StartRequest calls come up after we start waiting. func TestPanicInWait(t *testing.T) { sm := newTestStateManager(t) sm.wantState = StateServing @@ -715,13 +716,12 @@ func TestPanicInWait(t *testing.T) { // Simulate the previous RPC finishing after some delay sm.EndRequest() // Simulate a COMMIT call arriving right afterwards - err = sm.StartRequest(ctx, sm.target, true) - require.NoError(t, err) + _ = sm.StartRequest(ctx, sm.target, true) }() // Simulate going to a not serving state and calling unserveCommon that waits on requests. sm.wantState = StateNotServing - sm.requests.Wait() + sm.waitForRequestsToBeEmpty() } func verifySubcomponent(t *testing.T, order int64, component any, state testState) { From f8bb597c4a22c7956d6e2b7e045f1e118c3cbaad Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 11 Jan 2024 13:48:33 +0530 Subject: [PATCH 3/4] empty commit to kick CI Signed-off-by: Manan Gupta From fab896644de1b42411cc55ffe47916192aec14b3 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 12 Jan 2024 14:43:52 +0530 Subject: [PATCH 4/4] refactor: reorder the or condition Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index ebdc71354d5..9c01610f770 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -412,7 +412,7 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target shuttingDown := sm.wantState != StateServing // If requestsWaitCounter is not zero, then there are go-routines blocked on waiting for requests to be empty. // We cannot allow adding to the requests to prevent any panics from happening. - if sm.requestsWaitCounter > 0 || (shuttingDown && !allowOnShutdown) { + if (shuttingDown && !allowOnShutdown) || sm.requestsWaitCounter > 0 { // This specific error string needs to be returned for vtgate buffering to work. return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown) }