Add a way to know if DemotePrimary is blocked and send it in the health stream #17289

Open
wants to merge 10 commits into
base: main
3 changes: 3 additions & 0 deletions go/vt/vterrors/code.go
@@ -102,6 +102,7 @@ var (
VT09028 = errorWithState("VT09028", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveForbiddenJoinOrder, "In recursive query block of Recursive Common Table Expression '%s', the recursive table must neither be in the right argument of a LEFT JOIN, nor be forced to be non-first with join order hints", "")
VT09029 = errorWithState("VT09029", vtrpcpb.Code_FAILED_PRECONDITION, CTERecursiveRequiresSingleReference, "In recursive query block of Recursive Common Table Expression %s, the recursive table must be referenced only once, and not in any subquery", "")
VT09030 = errorWithState("VT09030", vtrpcpb.Code_FAILED_PRECONDITION, CTEMaxRecursionDepth, "Recursive query aborted after 1000 iterations.", "")
VT09031 = errorWithoutState("VT09031", vtrpcpb.Code_FAILED_PRECONDITION, "Primary demotion is stalled", "")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")
VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.")
@@ -192,6 +193,8 @@ var (
VT09027,
VT09028,
VT09029,
VT09030,
VT09031,
VT10001,
VT10002,
VT12001,
19 changes: 19 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
@@ -19,6 +19,7 @@ package tabletmanager
import (
"context"
"fmt"
"runtime"
"strings"
"time"

@@ -29,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
@@ -520,6 +522,23 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure
}
defer tm.unlock()

finishCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-finishCtx.Done():
// Finished running DemotePrimary. Nothing to do.
case <-time.After(10 * topo.RemoteOperationTimeout):
// We waited for more than 10 times the remote operation timeout, but DemotePrimary is still not done.
// Collect more information and signal that DemotePrimary is stalled indefinitely.
log.Errorf("DemotePrimary seems to be stalled. Collecting more information.")
tm.QueryServiceControl.SetDemotePrimaryStalled()
Member

We don't seem to ever reset this. Is that because once it is stalled the only solution is to restart the tablet?

Member Author

Yes, exactly!

Collaborator

if we reach the end of demotePrimary and we have called SetDemotePrimaryStalled, what is the correct course of action? it seems like we're assuming this will never happen. should we do something like block forever without returning, to ensure that the tablet doesn't accidentally make forward progress or re-enter the set of serving tablets?

Member Author

Yes, there is an inherent race between the finishCtx completing (DemotePrimary finishing) and the timeout triggering. For that matter, DemotePrimary can unblock and finish, after we've marked the tablet as Stalled. If it is successful, even then I don't really see an issue with the tablet rejoining the serving tablets, until it is eventually restarted by the operator.

Collaborator

@maxenglander maxenglander Dec 16, 2024

> even then I don't really see an issue with the tablet rejoining the serving tablets, until it is eventually restarted by the operator.

hm I think that is potentially a problem. because if the operator gets notified that a tablet is stalled, it's going to forcefully throw that tablet away with the assumption that (a) there is another tablet that is the real primary and (b) the stalled primary is not serving any traffic. if the stalled primary is able to rejoin the set of serving tablets, both of those assumptions go out the window, and it is no longer safe for the operator to throw it away.

Member Author

Yes, that is true, this would trigger an ERS. Let me see if we can make the tablet not become serving ever again if it is stalled.

Member Author

Okay! I added a few more safeguards to ensure nothing goes wrong:

  1. After we set DemotePrimaryStalled we immediately trigger a health check update, which makes vtgate mark this tablet not-serving and never send it requests again, because we never clear the field.
  2. For any requests already sent, if DemotePrimaryStalled is set, we won't process them on vttablet and will instead just return an error.

I think with these safeguards we can be sure that a vttablet is not going to accept any new writes once we mark it as stalled.

WDYT @maxenglander? Let's also wait for @deepthi to be able to take a look.

buf := make([]byte, 1<<16) // 64 KB buffer size
stackSize := runtime.Stack(buf, true)
log.Errorf("Stack trace:\n%s", string(buf[:stackSize]))
}
}()
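
A note on the stack dump above: `runtime.Stack` returns the number of bytes it wrote and stops at the end of the buffer, so with a fixed 64 KB buffer the trace of a tablet with many goroutines may be cut short. A minimal sketch of a growing buffer follows. The helper `allStacks` and its `maxSize` cap are hypothetical names used for illustration and are not part of this PR.

```go
package main

import (
	"fmt"
	"runtime"
)

// allStacks returns the stack traces of all goroutines. It doubles the
// buffer until the dump fits or the buffer has reached maxSize.
// Hypothetical helper: the PR itself uses a single fixed 64 KB buffer.
func allStacks(maxSize int) []byte {
	size := 64 << 10 // start at 64 KB, like the PR
	for {
		buf := make([]byte, size)
		n := runtime.Stack(buf, true)
		// n < size means the dump fit; otherwise it may have been truncated.
		if n < size || size >= maxSize {
			return buf[:n]
		}
		size *= 2
	}
}

func main() {
	dump := allStacks(8 << 20) // assumed cap of 8 MB
	fmt.Printf("captured %d bytes of stack traces\n", len(dump))
}
```

The cap keeps a pathological process from allocating without bound while the tablet is already in trouble; the right value is a judgment call.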

tablet := tm.Tablet()
wasPrimary := tablet.Type == topodatapb.TabletType_PRIMARY
wasServing := tm.QueryServiceControl.IsServing()
51 changes: 51 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication_test.go
@@ -18,10 +18,15 @@ package tabletmanager

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
)

// TestWaitForGrantsToHaveApplied tests that waitForGrantsToHaveApplied only succeeds after waitForDBAGrants has been called.
@@ -42,3 +47,49 @@ func TestWaitForGrantsToHaveApplied(t *testing.T) {
err = tm.waitForGrantsToHaveApplied(secondContext)
require.NoError(t, err)
}

type demotePrimaryStallQS struct {
tabletserver.Controller
waitTime time.Duration
primaryStalled atomic.Bool
}

func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() {
d.primaryStalled.Store(true)
}

func (d *demotePrimaryStallQS) IsServing() bool {
time.Sleep(d.waitTime)
return false
}

// TestDemotePrimaryStalled checks that if demote primary takes too long, then we mark it as stalled.
func TestDemotePrimaryStalled(t *testing.T) {
// Set remote operation timeout to a very low value.
origVal := topo.RemoteOperationTimeout
topo.RemoteOperationTimeout = 100 * time.Millisecond
defer func() {
topo.RemoteOperationTimeout = origVal
}()

// Create a fake query service control to intercept calls from DemotePrimary function.
qsc := &demotePrimaryStallQS{
waitTime: 2 * time.Second,
}
// Create a tablet manager with a replica type tablet.
tm := &TabletManager{
actionSema: semaphore.NewWeighted(1),
MysqlDaemon: newTestMysqlDaemon(t, 1),
tmState: &tmState{
displayState: displayState{
tablet: newTestTablet(t, 100, "ks", "-", map[string]string{}),
},
},
QueryServiceControl: qsc,
}

// We make IsServing stall for over 2 seconds, which is longer than 10 * remote operation timeout.
// This should cause the demote primary operation to be stalled.
tm.demotePrimary(context.Background(), false)
require.True(t, qsc.primaryStalled.Load())
}
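
The fake in this test relies on embedding the `tabletserver.Controller` interface in a struct and overriding only the methods the code path touches. A self-contained sketch of that technique, using a small stand-in interface (the three-method `Controller` below is illustrative only, not the real Vitess interface):

```go
package main

import "fmt"

// Controller stands in for a wide interface such as tabletserver.Controller.
// Its method set here is an assumption made for the example.
type Controller interface {
	IsServing() bool
	SetDemotePrimaryStalled()
	EnterLameduck()
}

// stallFake embeds the interface (left nil) and overrides only what the
// test needs. It still satisfies Controller through the embedded field.
type stallFake struct {
	Controller
	stalled bool
}

func (f *stallFake) SetDemotePrimaryStalled() { f.stalled = true }
func (f *stallFake) IsServing() bool          { return false }

// callsUnimplemented reports whether calling a method the fake does not
// override panics, which it does because the embedded interface is nil.
func callsUnimplemented(c Controller) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	c.EnterLameduck()
	return false
}

func main() {
	f := &stallFake{}
	var c Controller = f
	c.SetDemotePrimaryStalled()
	fmt.Println(f.stalled, c.IsServing(), callsUnimplemented(c)) // prints: true false true
}
```

The trade-off is that any call the test did not anticipate fails loudly with a nil dereference instead of silently succeeding, which is usually what a narrow unit test wants.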
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
@@ -119,6 +119,9 @@ type Controller interface {

// WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved.
WaitForPreparedTwoPCTransactions(ctx context.Context) error

// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager.
SetDemotePrimaryStalled()
}

// Ensure TabletServer satisfies Controller interface.
33 changes: 19 additions & 14 deletions go/vt/vttablet/tabletserver/state_manager.go
@@ -87,18 +87,19 @@ type stateManager struct {
//
// If a transition fails, we set retrying to true and launch
// retryTransition which loops until the state converges.
- mu             sync.Mutex
- wantState      servingState
- wantTabletType topodatapb.TabletType
- state          servingState
- target         *querypb.Target
- ptsTimestamp   time.Time
- retrying       bool
- replHealthy    bool
- lameduck       bool
- alsoAllow      []topodatapb.TabletType
- reason         string
- transitionErr  error
+ mu                   sync.Mutex
+ wantState            servingState
+ wantTabletType       topodatapb.TabletType
+ state                servingState
+ target               *querypb.Target
+ ptsTimestamp         time.Time
+ retrying             bool
+ replHealthy          bool
+ demotePrimaryStalled bool
+ lameduck             bool
+ alsoAllow            []topodatapb.TabletType
+ reason               string
+ transitionErr        error

rw *requestsWaiter

@@ -387,7 +388,7 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target
sm.mu.Lock()
defer sm.mu.Unlock()

- if sm.state != StateServing || !sm.replHealthy {
+ if sm.state != StateServing || !sm.replHealthy || sm.demotePrimaryStalled {
// This specific error string needs to be returned for vtgate buffering to work.
return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.NotServing)
}
@@ -715,6 +716,10 @@ func (sm *stateManager) Broadcast() {
defer sm.mu.Unlock()

lag, err := sm.refreshReplHealthLocked()
if sm.demotePrimaryStalled {
// If we are stalled while demoting primary, we should send an error for it.
err = vterrors.VT09031()
}
sm.hs.ChangeState(sm.target.TabletType, sm.ptsTimestamp, lag, err, sm.isServingLocked())
}

@@ -772,7 +777,7 @@ func (sm *stateManager) IsServing() bool {
}

func (sm *stateManager) isServingLocked() bool {
- return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.lameduck
+ return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck
}

func (sm *stateManager) AppendDetails(details []*kv) []*kv {
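
Taken together, the three changes in this file make the stalled flag gate new requests, the serving bit, and the health error. The toy model below sketches that combined behavior. `miniState` and its fields are hypothetical simplifications (for instance, `serving` collapses `state` and `wantState` into one boolean), and the error strings are copied from the tests in this PR.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotServing = errors.New("operation not allowed in state NOT_SERVING")

// miniState is a toy stand-in for the stateManager fields touched here.
type miniState struct {
	mu                   sync.Mutex
	serving              bool
	replHealthy          bool
	lameduck             bool
	demotePrimaryStalled bool
}

// startRequest mirrors the admission check in StartRequest.
func (s *miniState) startRequest() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.serving || !s.replHealthy || s.demotePrimaryStalled {
		return errNotServing
	}
	return nil
}

// isServing mirrors isServingLocked, taking the lock itself for simplicity.
func (s *miniState) isServing() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.serving && s.replHealthy && !s.demotePrimaryStalled && !s.lameduck
}

// healthError mirrors the error override in Broadcast.
func (s *miniState) healthError() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.demotePrimaryStalled {
		return "VT09031: Primary demotion is stalled"
	}
	return ""
}

func main() {
	s := &miniState{serving: true, replHealthy: true}
	fmt.Println(s.startRequest(), s.isServing(), s.healthError())

	s.mu.Lock()
	s.demotePrimaryStalled = true // never cleared, as in the PR
	s.mu.Unlock()
	fmt.Println(s.startRequest(), s.isServing(), s.healthError())
}
```

Because nothing ever clears the flag, all three checks stay tripped until the process restarts, which matches the intent stated in the review thread.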
39 changes: 39 additions & 0 deletions go/vt/vttablet/tabletserver/state_manager_test.go
@@ -669,6 +669,45 @@ func TestStateManagerNotify(t *testing.T) {
sm.StopService()
}

func TestDemotePrimaryStalled(t *testing.T) {
sm := newTestStateManager()
defer sm.StopService()
err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "")
require.NoError(t, err)
// Stopping the ticker so that we don't get unexpected health streams.
sm.hcticks.Stop()

ch := make(chan *querypb.StreamHealthResponse, 5)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := sm.hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error {
ch <- shr
return nil
})
assert.Contains(t, err.Error(), "tabletserver is shutdown")
}()
defer wg.Wait()

// Send a broadcast message and check we have no error there.
sm.Broadcast()
gotshr := <-ch
require.Empty(t, gotshr.RealtimeStats.HealthError)

// If demote primary is stalled, then we should get an error.
sm.demotePrimaryStalled = true
sm.Broadcast()
gotshr = <-ch
require.EqualValues(t, "VT09031: Primary demotion is stalled", gotshr.RealtimeStats.HealthError)
// Verify that we can't start a new request once we have a demote primary stalled.
err = sm.StartRequest(context.Background(), &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}, false)
require.ErrorContains(t, err, "operation not allowed in state NOT_SERVING")

// Stop the state manager.
sm.StopService()
}

func TestRefreshReplHealthLocked(t *testing.T) {
sm := newTestStateManager()
defer sm.StopService()
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
@@ -758,6 +758,14 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e
}
}

// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager.
func (tsv *TabletServer) SetDemotePrimaryStalled() {
tsv.sm.mu.Lock()
tsv.sm.demotePrimaryStalled = true
tsv.sm.mu.Unlock()
tsv.BroadcastHealth()
}
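
The setter above releases the mutex before broadcasting. Assuming `BroadcastHealth` ends up in `stateManager.Broadcast`, which takes `sm.mu` itself, that ordering matters: `sync.Mutex` is not reentrant, so broadcasting while still holding the lock would deadlock. A minimal sketch of the same ordering with a hypothetical `health` type:

```go
package main

import (
	"fmt"
	"sync"
)

// health is a toy type; sent records what each broadcast would have reported.
type health struct {
	mu      sync.Mutex
	stalled bool
	sent    []string
}

// broadcast takes the lock itself, like stateManager.Broadcast in the diff.
func (h *health) broadcast() {
	h.mu.Lock()
	defer h.mu.Unlock()
	msg := "healthy"
	if h.stalled {
		msg = "stalled"
	}
	h.sent = append(h.sent, msg)
}

// setStalled flips the flag under the lock, releases it, then broadcasts.
// Calling broadcast while still holding mu would block forever.
func (h *health) setStalled() {
	h.mu.Lock()
	h.stalled = true
	h.mu.Unlock()
	h.broadcast()
}

func main() {
	h := &health{}
	h.broadcast()
	h.setStalled()
	fmt.Println(h.sent) // prints: [healthy stalled]
}
```

Broadcasting immediately, instead of waiting for the next periodic health tick, shortens the window in which vtgate still considers the tablet serving.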

// CreateTransaction creates the metadata for a 2PC transaction.
func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) {
return tsv.execRequest(
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletservermock/controller.go
@@ -274,6 +274,11 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error
return nil
}

// SetDemotePrimaryStalled is part of the tabletserver.Controller interface
func (tqsc *Controller) SetDemotePrimaryStalled() {
tqsc.MethodCalled["SetDemotePrimaryStalled"] = true
}

// EnterLameduck implements tabletserver.Controller.
func (tqsc *Controller) EnterLameduck() {
tqsc.mu.Lock()