From 18632567695bd5e50ab823df35c2d277d0125d5b Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 27 Nov 2024 14:29:31 +0530 Subject: [PATCH 01/10] feat: add code to return demote primary being stuck error in healthstreamer Signed-off-by: Manan Gupta --- .../vttablet/tabletmanager/rpc_replication.go | 19 ++++++++++++ go/vt/vttablet/tabletserver/controller.go | 3 ++ go/vt/vttablet/tabletserver/state_manager.go | 29 +++++++++++-------- go/vt/vttablet/tabletserver/tabletserver.go | 5 ++++ go/vt/vttablet/tabletservermock/controller.go | 5 ++++ 5 files changed, 49 insertions(+), 12 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 90e4d835a79..7cb8caec844 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -19,6 +19,7 @@ package tabletmanager import ( "context" "fmt" + "runtime" "strings" "time" @@ -29,6 +30,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver" @@ -520,6 +522,23 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure } defer tm.unlock() + finishCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + select { + case <-finishCtx.Done(): + // Finished running DemotePrimary. Nothing to do. + case <-time.After(10 * topo.RemoteOperationTimeout): + // We waited for over 10 times of remote operation timeout, but DemotePrimary is still not done. + // Collect more information and signal demote primary is indefinitely stuck. + log.Errorf("DemotePrimary seems to be blocked. 
Collecting more information.") + tm.QueryServiceControl.SetDemotePrimaryBlocked() + buf := make([]byte, 1<<16) // 64 KB buffer size + stackSize := runtime.Stack(buf, true) + log.Errorf("Stack trace:\n%s", string(buf[:stackSize])) + } + }() + tablet := tm.Tablet() wasPrimary := tablet.Type == topodatapb.TabletType_PRIMARY wasServing := tm.QueryServiceControl.IsServing() diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index cef0dd2baee..9e13134ad17 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -119,6 +119,9 @@ type Controller interface { // WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved. WaitForPreparedTwoPCTransactions(ctx context.Context) error + + // SetDemotePrimaryBlocked marks that demote primary is blocked in the state manager. + SetDemotePrimaryBlocked() } // Ensure TabletServer satisfies Controller interface. diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index cae6a237dc8..bcce3711439 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -87,18 +87,19 @@ type stateManager struct { // // If a transition fails, we set retrying to true and launch // retryTransition which loops until the state converges. 
- mu sync.Mutex - wantState servingState - wantTabletType topodatapb.TabletType - state servingState - target *querypb.Target - ptsTimestamp time.Time - retrying bool - replHealthy bool - lameduck bool - alsoAllow []topodatapb.TabletType - reason string - transitionErr error + mu sync.Mutex + wantState servingState + wantTabletType topodatapb.TabletType + state servingState + target *querypb.Target + ptsTimestamp time.Time + retrying bool + replHealthy bool + demotePrimaryBlocked bool + lameduck bool + alsoAllow []topodatapb.TabletType + reason string + transitionErr error rw *requestsWaiter @@ -715,6 +716,10 @@ func (sm *stateManager) Broadcast() { defer sm.mu.Unlock() lag, err := sm.refreshReplHealthLocked() + if sm.demotePrimaryBlocked { + // If we are blocked from demoting primary, we should send an error for it. + err = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Demoting primary is blocked") + } sm.hs.ChangeState(sm.target.TabletType, sm.ptsTimestamp, lag, err, sm.isServingLocked()) } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 847de25eb02..63a648fc37e 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -758,6 +758,11 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e } } +// SetDemotePrimaryBlocked marks that demote primary is blocked in the state manager. +func (tsv *TabletServer) SetDemotePrimaryBlocked() { + tsv.sm.demotePrimaryBlocked = true +} + // CreateTransaction creates the metadata for a 2PC transaction. 
func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) { return tsv.execRequest( diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index 9d570b8f6c7..3281072fb6c 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -274,6 +274,11 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error return nil } +// SetDemotePrimaryBlocked is part of the tabletserver.Controller interface +func (tqsc *Controller) SetDemotePrimaryBlocked() { + tqsc.MethodCalled["SetDemotePrimaryBlocked"] = true +} + // EnterLameduck implements tabletserver.Controller. func (tqsc *Controller) EnterLameduck() { tqsc.mu.Lock() From fcfd133059f0f112883f0d16ba9c73e20937f441 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 27 Nov 2024 16:04:08 +0530 Subject: [PATCH 02/10] test: add a test for demote primary blocked field Signed-off-by: Manan Gupta --- .../tabletserver/state_manager_test.go | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index df819c6f05c..d8f316ad45c 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -669,6 +669,42 @@ func TestStateManagerNotify(t *testing.T) { sm.StopService() } +func TestDemotePrimaryBlocked(t *testing.T) { + sm := newTestStateManager() + defer sm.StopService() + err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") + require.NoError(t, err) + // Stopping the ticker so that we don't get unexpected health streams. 
+ sm.hcticks.Stop() + + ch := make(chan *querypb.StreamHealthResponse, 5) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := sm.hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error { + ch <- shr + return nil + }) + assert.Contains(t, err.Error(), "tabletserver is shutdown") + }() + defer wg.Wait() + + // Send a broadcast message and check we have no error there. + sm.Broadcast() + gotshr := <-ch + require.Empty(t, gotshr.RealtimeStats.HealthError) + + // If demote primary is blocked, then we should get an error. + sm.demotePrimaryBlocked = true + sm.Broadcast() + gotshr = <-ch + require.EqualValues(t, "Demoting primary is blocked", gotshr.RealtimeStats.HealthError) + + // Stop the state manager. + sm.StopService() +} + func TestRefreshReplHealthLocked(t *testing.T) { sm := newTestStateManager() defer sm.StopService() From 0c20bed665e1fd8db61292c6c51b87cc761e19f4 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 3 Dec 2024 10:52:32 +0530 Subject: [PATCH 03/10] refactor: rename blocked to stalled Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletmanager/rpc_replication.go | 6 +++--- go/vt/vttablet/tabletserver/controller.go | 4 ++-- go/vt/vttablet/tabletserver/state_manager.go | 8 ++++---- go/vt/vttablet/tabletserver/state_manager_test.go | 8 ++++---- go/vt/vttablet/tabletserver/tabletserver.go | 6 +++--- go/vt/vttablet/tabletservermock/controller.go | 6 +++--- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 7cb8caec844..49c8d0f7a7d 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -530,9 +530,9 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure // Finished running DemotePrimary. Nothing to do. 
case <-time.After(10 * topo.RemoteOperationTimeout): // We waited for over 10 times of remote operation timeout, but DemotePrimary is still not done. - // Collect more information and signal demote primary is indefinitely stuck. - log.Errorf("DemotePrimary seems to be blocked. Collecting more information.") - tm.QueryServiceControl.SetDemotePrimaryBlocked() + // Collect more information and signal demote primary is indefinitely stalled. + log.Errorf("DemotePrimary seems to be stalled. Collecting more information.") + tm.QueryServiceControl.SetDemotePrimaryStalled() buf := make([]byte, 1<<16) // 64 KB buffer size stackSize := runtime.Stack(buf, true) log.Errorf("Stack trace:\n%s", string(buf[:stackSize])) diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index 9e13134ad17..c4a4bef99fc 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -120,8 +120,8 @@ type Controller interface { // WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved. WaitForPreparedTwoPCTransactions(ctx context.Context) error - // SetDemotePrimaryBlocked marks that demote primary is blocked in the state manager. - SetDemotePrimaryBlocked() + // SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. + SetDemotePrimaryStalled() } // Ensure TabletServer satisfies Controller interface. 
diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index bcce3711439..1fc456b190e 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -95,7 +95,7 @@ type stateManager struct { ptsTimestamp time.Time retrying bool replHealthy bool - demotePrimaryBlocked bool + demotePrimaryStalled bool lameduck bool alsoAllow []topodatapb.TabletType reason string @@ -716,9 +716,9 @@ func (sm *stateManager) Broadcast() { defer sm.mu.Unlock() lag, err := sm.refreshReplHealthLocked() - if sm.demotePrimaryBlocked { - // If we are blocked from demoting primary, we should send an error for it. - err = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Demoting primary is blocked") + if sm.demotePrimaryStalled { + // If we are stalled while demoting primary, we should send an error for it. + err = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Failed to complete primary demotion") } sm.hs.ChangeState(sm.target.TabletType, sm.ptsTimestamp, lag, err, sm.isServingLocked()) } diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index d8f316ad45c..7ffb46bbd6c 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -669,7 +669,7 @@ func TestStateManagerNotify(t *testing.T) { sm.StopService() } -func TestDemotePrimaryBlocked(t *testing.T) { +func TestDemotePrimaryStalled(t *testing.T) { sm := newTestStateManager() defer sm.StopService() err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") @@ -695,11 +695,11 @@ func TestDemotePrimaryBlocked(t *testing.T) { gotshr := <-ch require.Empty(t, gotshr.RealtimeStats.HealthError) - // If demote primary is blocked, then we should get an error. - sm.demotePrimaryBlocked = true + // If demote primary is stalled, then we should get an error. 
+ sm.demotePrimaryStalled = true sm.Broadcast() gotshr = <-ch - require.EqualValues(t, "Demoting primary is blocked", gotshr.RealtimeStats.HealthError) + require.EqualValues(t, "Failed to complete primary demotion", gotshr.RealtimeStats.HealthError) // Stop the state manager. sm.StopService() diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 63a648fc37e..a3cd025bc9b 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -758,9 +758,9 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e } } -// SetDemotePrimaryBlocked marks that demote primary is blocked in the state manager. -func (tsv *TabletServer) SetDemotePrimaryBlocked() { - tsv.sm.demotePrimaryBlocked = true +// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. +func (tsv *TabletServer) SetDemotePrimaryStalled() { + tsv.sm.demotePrimaryStalled = true } // CreateTransaction creates the metadata for a 2PC transaction. diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index 3281072fb6c..a5242751454 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -274,9 +274,9 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error return nil } -// SetDemotePrimaryBlocked is part of the tabletserver.Controller interface -func (tqsc *Controller) SetDemotePrimaryBlocked() { - tqsc.MethodCalled["SetDemotePrimaryBlocked"] = true +// SetDemotePrimaryStalled is part of the tabletserver.Controller interface +func (tqsc *Controller) SetDemotePrimaryStalled() { + tqsc.MethodCalled["SetDemotePrimaryStalled"] = true } // EnterLameduck implements tabletserver.Controller. 
From 743db0977fad8cccbd7ae4e305e65ac0be0d75a0 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 16 Dec 2024 17:00:46 +0530 Subject: [PATCH 04/10] feat: add a constant error for demote primary stall Signed-off-by: Manan Gupta --- go/vt/vterrors/code.go | 3 +++ go/vt/vttablet/tabletserver/state_manager.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/go/vt/vterrors/code.go b/go/vt/vterrors/code.go index 31c98cef280..85ff051d8f9 100644 --- a/go/vt/vterrors/code.go +++ b/go/vt/vterrors/code.go @@ -102,6 +102,7 @@ var ( VT09028 = errorWithState("VT09028", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveForbiddenJoinOrder, "In recursive query block of Recursive Common Table Expression '%s', the recursive table must neither be in the right argument of a LEFT JOIN, nor be forced to be non-first with join order hints", "") VT09029 = errorWithState("VT09029", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveRequiresSingleReference, "In recursive query block of Recursive Common Table Expression %s, the recursive table must be referenced only once, and not in any subquery", "") VT09030 = errorWithState("VT09030", vtrpcpb.Code_FAILED_PRECONDITION, CTEMaxRecursionDepth, "Recursive query aborted after 1000 iterations.", "") + VT09031 = errorWithoutState("VT09031", vtrpcpb.Code_FAILED_PRECONDITION, "Failed to complete primary demotion", "") VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.") VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. 
A rollback decision is taken.") @@ -192,6 +193,8 @@ var ( VT09027, VT09028, VT09029, + VT09030, + VT09031, VT10001, VT10002, VT12001, diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 1fc456b190e..bd1082d36f4 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -718,7 +718,7 @@ func (sm *stateManager) Broadcast() { lag, err := sm.refreshReplHealthLocked() if sm.demotePrimaryStalled { // If we are stalled while demoting primary, we should send an error for it. - err = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Failed to complete primary demotion") + err = vterrors.VT09031() } sm.hs.ChangeState(sm.target.TabletType, sm.ptsTimestamp, lag, err, sm.isServingLocked()) } From 3dd03760b5a80df1dd7ab1912718d680f6e3c526 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 16 Dec 2024 18:17:06 +0530 Subject: [PATCH 05/10] test: fix test expectation Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 7ffb46bbd6c..9c8d9c98830 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -699,7 +699,7 @@ func TestDemotePrimaryStalled(t *testing.T) { sm.demotePrimaryStalled = true sm.Broadcast() gotshr = <-ch - require.EqualValues(t, "Failed to complete primary demotion", gotshr.RealtimeStats.HealthError) + require.EqualValues(t, "VT09031: Failed to complete primary demotion", gotshr.RealtimeStats.HealthError) // Stop the state manager. 
sm.StopService() From e278e118a2b6f5355e2856b9c2c75b8135e530b8 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 16 Dec 2024 20:20:42 +0530 Subject: [PATCH 06/10] feat: rename the error message Signed-off-by: Manan Gupta --- go/vt/vterrors/code.go | 2 +- go/vt/vttablet/tabletserver/state_manager_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vterrors/code.go b/go/vt/vterrors/code.go index 85ff051d8f9..0ca275b71fc 100644 --- a/go/vt/vterrors/code.go +++ b/go/vt/vterrors/code.go @@ -102,7 +102,7 @@ var ( VT09028 = errorWithState("VT09028", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveForbiddenJoinOrder, "In recursive query block of Recursive Common Table Expression '%s', the recursive table must neither be in the right argument of a LEFT JOIN, nor be forced to be non-first with join order hints", "") VT09029 = errorWithState("VT09029", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveRequiresSingleReference, "In recursive query block of Recursive Common Table Expression %s, the recursive table must be referenced only once, and not in any subquery", "") VT09030 = errorWithState("VT09030", vtrpcpb.Code_FAILED_PRECONDITION, CTEMaxRecursionDepth, "Recursive query aborted after 1000 iterations.", "") - VT09031 = errorWithoutState("VT09031", vtrpcpb.Code_FAILED_PRECONDITION, "Failed to complete primary demotion", "") + VT09031 = errorWithoutState("VT09031", vtrpcpb.Code_FAILED_PRECONDITION, "Primary demotion is stalled", "") VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.") VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. 
A rollback decision is taken.") diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 9c8d9c98830..58f293d52b3 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -699,7 +699,7 @@ func TestDemotePrimaryStalled(t *testing.T) { sm.demotePrimaryStalled = true sm.Broadcast() gotshr = <-ch - require.EqualValues(t, "VT09031: Failed to complete primary demotion", gotshr.RealtimeStats.HealthError) + require.EqualValues(t, "VT09031: Primary demotion is stalled", gotshr.RealtimeStats.HealthError) // Stop the state manager. sm.StopService() From 0b8c3f42ea1fddca64ae4a4be4482241e8ec2181 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 18 Dec 2024 11:59:04 +0530 Subject: [PATCH 07/10] feat: add few more changes for safety Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager.go | 4 ++-- go/vt/vttablet/tabletserver/tabletserver.go | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index bd1082d36f4..4512b26f177 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -388,7 +388,7 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target sm.mu.Lock() defer sm.mu.Unlock() - if sm.state != StateServing || !sm.replHealthy { + if sm.state != StateServing || !sm.replHealthy || sm.demotePrimaryStalled { // This specific error string needs to be returned for vtgate buffering to work. 
return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.NotServing) } @@ -777,7 +777,7 @@ func (sm *stateManager) IsServing() bool { } func (sm *stateManager) isServingLocked() bool { - return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.lameduck + return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck } func (sm *stateManager) AppendDetails(details []*kv) []*kv { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index a3cd025bc9b..deeac10bd05 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -760,7 +760,10 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e // SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. func (tsv *TabletServer) SetDemotePrimaryStalled() { + tsv.sm.mu.Lock() tsv.sm.demotePrimaryStalled = true + tsv.sm.mu.Unlock() + tsv.BroadcastHealth() } // CreateTransaction creates the metadata for a 2PC transaction. From 7258704944487d4beefa339cb2240fa9bb7fda7c Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 19 Dec 2024 17:29:22 +0530 Subject: [PATCH 08/10] test: add test for verifying we can't start a new request Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletserver/state_manager_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 58f293d52b3..f8059d6edea 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -700,6 +700,9 @@ func TestDemotePrimaryStalled(t *testing.T) { sm.Broadcast() gotshr = <-ch require.EqualValues(t, "VT09031: Primary demotion is stalled", gotshr.RealtimeStats.HealthError) + // Verify that we can't start a new request once we have a demote primary stalled. 
+ err = sm.StartRequest(context.Background(), &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}, false) + require.ErrorContains(t, err, "operation not allowed in state NOT_SERVING") // Stop the state manager. sm.StopService() From 50f311ac108ecd38c1314ac8153f905a1fefd509 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 19 Dec 2024 18:30:48 +0530 Subject: [PATCH 09/10] test: add test for verifying we mark demote primary stalled if it takes too long Signed-off-by: Manan Gupta --- .../tabletmanager/rpc_replication_test.go | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index c587f1e24b8..a8f2a114f5e 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -22,6 +22,10 @@ import ( "time" "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletserver" ) // TestWaitForGrantsToHaveApplied tests that waitForGrantsToHaveApplied only succeeds after waitForDBAGrants has been called. @@ -42,3 +46,50 @@ func TestWaitForGrantsToHaveApplied(t *testing.T) { err = tm.waitForGrantsToHaveApplied(secondContext) require.NoError(t, err) } + +type demotePrimaryStallQS struct { + tabletserver.Controller + waitTime time.Duration + primaryStalled bool +} + +func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() { + d.primaryStalled = true +} + +func (d *demotePrimaryStallQS) IsServing() bool { + time.Sleep(d.waitTime) + return false +} + +// TestDemotePrimaryStalled checks that if demote primary takes too long, then we mark it as stalled. +func TestDemotePrimaryStalled(t *testing.T) { + // Set remote operation timeout to a very low value.
+ origVal := topo.RemoteOperationTimeout + topo.RemoteOperationTimeout = 100 * time.Millisecond + defer func() { + topo.RemoteOperationTimeout = origVal + }() + + // Create a fake query service control to intercept calls from DemotePrimary function. + qsc := &demotePrimaryStallQS{ + waitTime: 2 * time.Second, + primaryStalled: false, + } + // Create a tablet manager with a replica type tablet. + tm := &TabletManager{ + actionSema: semaphore.NewWeighted(1), + MysqlDaemon: newTestMysqlDaemon(t, 1), + tmState: &tmState{ + displayState: displayState{ + tablet: newTestTablet(t, 100, "ks", "-", map[string]string{}), + }, + }, + QueryServiceControl: qsc, + } + + // We make IsServing stall for over 2 seconds, which is longer than 10 * remote operation timeout. + // This should cause the demote primary operation to be stalled. + tm.demotePrimary(context.Background(), false) + require.True(t, qsc.primaryStalled) +} From a7cbc4a57b8189f38a4c817bcf04cbe4887c9ace Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Sun, 22 Dec 2024 18:50:16 +0530 Subject: [PATCH 10/10] test: fix data race Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletmanager/rpc_replication_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index a8f2a114f5e..b388235811b 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -18,6 +18,7 @@ package tabletmanager import ( "context" + "sync/atomic" "testing" "time" @@ -50,11 +51,11 @@ func TestWaitForGrantsToHaveApplied(t *testing.T) { type demotePrimaryStallQS struct { tabletserver.Controller waitTime time.Duration - primaryStalled bool + primaryStalled atomic.Bool } func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() { - d.primaryStalled = true + d.primaryStalled.Store(true) } func (d *demotePrimaryStallQS) IsServing() bool { @@ -73,8 +74,7 @@ 
func TestDemotePrimaryStalled(t *testing.T) { // Create a fake query service control to intercept calls from DemotePrimary function. qsc := &demotePrimaryStallQS{ - waitTime: 2 * time.Second, - primaryStalled: false, + waitTime: 2 * time.Second, } // Create a tablet manager with a replica type tablet. tm := &TabletManager{ @@ -91,5 +91,5 @@ func TestDemotePrimaryStalled(t *testing.T) { // We make IsServing stall for over 2 seconds, which is longer than 10 * remote operation timeout. // This should cause the demote primary operation to be stalled. tm.demotePrimary(context.Background(), false) - require.True(t, qsc.primaryStalled) + require.True(t, qsc.primaryStalled.Load()) }