diff --git a/go/vt/vterrors/code.go b/go/vt/vterrors/code.go index 31c98cef280..0ca275b71fc 100644 --- a/go/vt/vterrors/code.go +++ b/go/vt/vterrors/code.go @@ -102,6 +102,7 @@ var ( VT09028 = errorWithState("VT09028", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveForbiddenJoinOrder, "In recursive query block of Recursive Common Table Expression '%s', the recursive table must neither be in the right argument of a LEFT JOIN, nor be forced to be non-first with join order hints", "") VT09029 = errorWithState("VT09029", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveRequiresSingleReference, "In recursive query block of Recursive Common Table Expression %s, the recursive table must be referenced only once, and not in any subquery", "") VT09030 = errorWithState("VT09030", vtrpcpb.Code_FAILED_PRECONDITION, CTEMaxRecursionDepth, "Recursive query aborted after 1000 iterations.", "") + VT09031 = errorWithoutState("VT09031", vtrpcpb.Code_FAILED_PRECONDITION, "Primary demotion is stalled", "") VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.") VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.") @@ -192,6 +193,8 @@ var ( VT09027, VT09028, VT09029, + VT09030, + VT09031, VT10001, VT10002, VT12001, diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 90e4d835a79..49c8d0f7a7d 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -19,6 +19,7 @@ package tabletmanager import ( "context" "fmt" + "runtime" "strings" "time" @@ -29,6 +30,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver" @@ -520,6 +522,23 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure } defer tm.unlock() + finishCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + select { + case <-finishCtx.Done(): + // Finished running DemotePrimary. Nothing to do. + case <-time.After(10 * topo.RemoteOperationTimeout): + // We waited for over 10 times of remote operation timeout, but DemotePrimary is still not done. + // Collect more information and signal demote primary is indefinitely stalled. + log.Errorf("DemotePrimary seems to be stalled. Collecting more information.") + tm.QueryServiceControl.SetDemotePrimaryStalled() + buf := make([]byte, 1<<16) // 64 KB buffer size + stackSize := runtime.Stack(buf, true) + log.Errorf("Stack trace:\n%s", string(buf[:stackSize])) + } + }() + tablet := tm.Tablet() wasPrimary := tablet.Type == topodatapb.TabletType_PRIMARY wasServing := tm.QueryServiceControl.IsServing() diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index c587f1e24b8..b388235811b 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -18,10 +18,15 @@ package tabletmanager import ( "context" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletserver" ) // TestWaitForGrantsToHaveApplied tests that waitForGrantsToHaveApplied only succeeds after waitForDBAGrants has been called. @@ -42,3 +47,49 @@ func TestWaitForGrantsToHaveApplied(t *testing.T) { err = tm.waitForGrantsToHaveApplied(secondContext) require.NoError(t, err) } + +type demotePrimaryStallQS struct { + tabletserver.Controller + waitTime time.Duration + primaryStalled atomic.Bool +} + +func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() { + d.primaryStalled.Store(true) +} + +func (d *demotePrimaryStallQS) IsServing() bool { + time.Sleep(d.waitTime) + return false +} + +// TestDemotePrimaryStalled checks that if demote primary takes too long, then we mark it as stalled. +func TestDemotePrimaryStalled(t *testing.T) { + // Set remote operation timeout to a very low value. + origVal := topo.RemoteOperationTimeout + topo.RemoteOperationTimeout = 100 * time.Millisecond + defer func() { + topo.RemoteOperationTimeout = origVal + }() + + // Create a fake query service control to intercept calls from DemotePrimary function. + qsc := &demotePrimaryStallQS{ + waitTime: 2 * time.Second, + } + // Create a tablet manager with a replica type tablet. + tm := &TabletManager{ + actionSema: semaphore.NewWeighted(1), + MysqlDaemon: newTestMysqlDaemon(t, 1), + tmState: &tmState{ + displayState: displayState{ + tablet: newTestTablet(t, 100, "ks", "-", map[string]string{}), + }, + }, + QueryServiceControl: qsc, + } + + // We make IsServing stall for over 2 seconds, which is longer than 10 * remote operation timeout. + // This should cause the demote primary operation to be stalled. + tm.demotePrimary(context.Background(), false) + require.True(t, qsc.primaryStalled.Load()) +} diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index cef0dd2baee..c4a4bef99fc 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -119,6 +119,9 @@ type Controller interface { // WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved. WaitForPreparedTwoPCTransactions(ctx context.Context) error + + // SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. + SetDemotePrimaryStalled() } // Ensure TabletServer satisfies Controller interface. diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index cae6a237dc8..4512b26f177 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -87,18 +87,19 @@ type stateManager struct { // // If a transition fails, we set retrying to true and launch // retryTransition which loops until the state converges. - mu sync.Mutex - wantState servingState - wantTabletType topodatapb.TabletType - state servingState - target *querypb.Target - ptsTimestamp time.Time - retrying bool - replHealthy bool - lameduck bool - alsoAllow []topodatapb.TabletType - reason string - transitionErr error + mu sync.Mutex + wantState servingState + wantTabletType topodatapb.TabletType + state servingState + target *querypb.Target + ptsTimestamp time.Time + retrying bool + replHealthy bool + demotePrimaryStalled bool + lameduck bool + alsoAllow []topodatapb.TabletType + reason string + transitionErr error rw *requestsWaiter @@ -387,7 +388,7 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target sm.mu.Lock() defer sm.mu.Unlock() - if sm.state != StateServing || !sm.replHealthy { + if sm.state != StateServing || !sm.replHealthy || sm.demotePrimaryStalled { // This specific error string needs to be returned for vtgate buffering to work. return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.NotServing) } @@ -715,6 +716,10 @@ func (sm *stateManager) Broadcast() { defer sm.mu.Unlock() lag, err := sm.refreshReplHealthLocked() + if sm.demotePrimaryStalled { + // If we are stalled while demoting primary, we should send an error for it. + err = vterrors.VT09031() + } sm.hs.ChangeState(sm.target.TabletType, sm.ptsTimestamp, lag, err, sm.isServingLocked()) } @@ -772,7 +777,7 @@ func (sm *stateManager) IsServing() bool { } func (sm *stateManager) isServingLocked() bool { - return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.lameduck + return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck } func (sm *stateManager) AppendDetails(details []*kv) []*kv { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index df819c6f05c..f8059d6edea 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -669,6 +669,45 @@ func TestStateManagerNotify(t *testing.T) { sm.StopService() } +func TestDemotePrimaryStalled(t *testing.T) { + sm := newTestStateManager() + defer sm.StopService() + err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") + require.NoError(t, err) + // Stopping the ticker so that we don't get unexpected health streams. + sm.hcticks.Stop() + + ch := make(chan *querypb.StreamHealthResponse, 5) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := sm.hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error { + ch <- shr + return nil + }) + assert.Contains(t, err.Error(), "tabletserver is shutdown") + }() + defer wg.Wait() + + // Send a broadcast message and check we have no error there. + sm.Broadcast() + gotshr := <-ch + require.Empty(t, gotshr.RealtimeStats.HealthError) + + // If demote primary is stalled, then we should get an error. + sm.demotePrimaryStalled = true + sm.Broadcast() + gotshr = <-ch + require.EqualValues(t, "VT09031: Primary demotion is stalled", gotshr.RealtimeStats.HealthError) + // Verify that we can't start a new request once we have a demote primary stalled. + err = sm.StartRequest(context.Background(), &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}, false) + require.ErrorContains(t, err, "operation not allowed in state NOT_SERVING") + + // Stop the state manager. + sm.StopService() +} + func TestRefreshReplHealthLocked(t *testing.T) { sm := newTestStateManager() defer sm.StopService() diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 847de25eb02..deeac10bd05 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -758,6 +758,14 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e } } +// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. +func (tsv *TabletServer) SetDemotePrimaryStalled() { + tsv.sm.mu.Lock() + tsv.sm.demotePrimaryStalled = true + tsv.sm.mu.Unlock() + tsv.BroadcastHealth() +} + // CreateTransaction creates the metadata for a 2PC transaction. func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) { return tsv.execRequest( diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index 9d570b8f6c7..a5242751454 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -274,6 +274,11 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error return nil } +// SetDemotePrimaryStalled is part of the tabletserver.Controller interface +func (tqsc *Controller) SetDemotePrimaryStalled() { + tqsc.MethodCalled["SetDemotePrimaryStalled"] = true +} + // EnterLameduck implements tabletserver.Controller. func (tqsc *Controller) EnterLameduck() { tqsc.mu.Lock()