diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index df819c6f05c..d8f316ad45c 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -669,6 +669,42 @@ func TestStateManagerNotify(t *testing.T) { sm.StopService() } +func TestDemotePrimaryBlocked(t *testing.T) { + sm := newTestStateManager() + defer sm.StopService() + err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "") + require.NoError(t, err) + // Stopping the ticker so that we don't get unexpected health streams. + sm.hcticks.Stop() + + ch := make(chan *querypb.StreamHealthResponse, 5) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := sm.hs.Stream(context.Background(), func(shr *querypb.StreamHealthResponse) error { + ch <- shr + return nil + }) + assert.Contains(t, err.Error(), "tabletserver is shutdown") + }() + defer wg.Wait() + + // Send a broadcast message and check we have no error there. + sm.Broadcast() + gotshr := <-ch + require.Empty(t, gotshr.RealtimeStats.HealthError) + + // If demote primary is blocked, then we should get an error. + sm.demotePrimaryBlocked = true + sm.Broadcast() + gotshr = <-ch + require.EqualValues(t, "Demoting primary is blocked", gotshr.RealtimeStats.HealthError) + + // Stop the state manager. + sm.StopService() +} + func TestRefreshReplHealthLocked(t *testing.T) { sm := newTestStateManager() defer sm.StopService()