From 5e14251bbfe486d22a021754d6dafc83670f0134 Mon Sep 17 00:00:00 2001
From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com>
Date: Fri, 15 Nov 2024 10:07:58 +0530
Subject: [PATCH] Fix deadlock in messager and health streamer (#17230)

Signed-off-by: Manan Gupta
---
 .../vttablet/tabletserver/health_streamer.go  | 56 +++++++++++--------
 .../tabletserver/health_streamer_test.go      | 40 +++++++++++++
 .../vttablet/tabletserver/messager/engine.go  | 30 ++++++----
 .../tabletserver/messager/engine_test.go      | 33 ++++++++++-
 go/vt/vttablet/tabletserver/schema/engine.go  |  8 +++
 5 files changed, 132 insertions(+), 35 deletions(-)

diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go
index 8ff6834aeaf..ffb42df747f 100644
--- a/go/vt/vttablet/tabletserver/health_streamer.go
+++ b/go/vt/vttablet/tabletserver/health_streamer.go
@@ -68,11 +68,18 @@ type healthStreamer struct {
 	degradedThreshold  time.Duration
 	unhealthyThreshold atomic.Int64

-	mu      sync.Mutex
-	ctx     context.Context
-	cancel  context.CancelFunc
-	clients map[chan *querypb.StreamHealthResponse]struct{}
-	state   *querypb.StreamHealthResponse
+	// cancelMu is a mutex used to protect the cancel variable
+	// and for ensuring we don't call setup functions in parallel.
+	cancelMu sync.Mutex
+	ctx      context.Context
+	cancel   context.CancelFunc
+
+	// fieldsMu is used to protect access to the fields below.
+	// We require two separate mutexes, so that we don't have to acquire the same mutex
+	// in Close and reload that can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
+	fieldsMu sync.Mutex
+	clients  map[chan *querypb.StreamHealthResponse]struct{}
+	state    *querypb.StreamHealthResponse

 	// isServingPrimary stores if this tablet is currently the serving primary or not.
 	isServingPrimary bool
@@ -126,8 +133,8 @@ func (hs *healthStreamer) InitDBConfig(target *querypb.Target, cp dbconfigs.Conn
 }

 func (hs *healthStreamer) Open() {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.cancelMu.Lock()
+	defer hs.cancelMu.Unlock()

 	if hs.cancel != nil {
 		return
@@ -140,8 +147,8 @@ func (hs *healthStreamer) Open() {
 }

 func (hs *healthStreamer) Close() {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.cancelMu.Lock()
+	defer hs.cancelMu.Unlock()

 	if hs.cancel != nil {
 		hs.se.UnregisterNotifier("healthStreamer")
@@ -182,13 +189,16 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str
 }

 func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, context.Context) {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.cancelMu.Lock()
+	defer hs.cancelMu.Unlock()

 	if hs.cancel == nil {
 		return nil, nil
 	}

+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
+
 	ch := make(chan *querypb.StreamHealthResponse, streamHealthBufferSize)
 	hs.clients[ch] = struct{}{}

@@ -198,15 +208,15 @@ func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, contex
 }

 func (hs *healthStreamer) unregister(ch chan *querypb.StreamHealthResponse) {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()

 	delete(hs.clients, ch)
 }

 func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, ptsTimestamp time.Time, lag time.Duration, err error, serving bool) {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()

 	hs.state.Target.TabletType = tabletType
 	if tabletType == topodatapb.TabletType_PRIMARY {
@@ -260,8 +270,8 @@ func (hs *healthStreamer) broadCastToClients(shr *querypb.StreamHealthResponse)
 }

 func (hs *healthStreamer) AppendDetails(details []*kv) []*kv {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 	if hs.state.Target.TabletType == topodatapb.TabletType_PRIMARY {
 		return details
 	}
@@ -306,8 +316,8 @@ func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) {
 // MakePrimary tells the healthstreamer that the current tablet is now the primary,
 // so it can read and write to the MySQL instance for schema-tracking.
 func (hs *healthStreamer) MakePrimary(serving bool) {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 	hs.isServingPrimary = serving
 	// We register for notifications from the schema Engine only when schema tracking is enabled,
 	// and we are going to a serving primary state.
@@ -322,15 +332,15 @@ func (hs *healthStreamer) MakePrimary(serving bool) {
 }
 // MakeNonPrimary tells the healthstreamer that the current tablet is now not a primary.
 func (hs *healthStreamer) MakeNonPrimary() {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 	hs.isServingPrimary = false
 }

 // reload reloads the schema from the underlying mysql for the tables that we get the alert on.
 func (hs *healthStreamer) reload(full map[string]*schema.Table, created, altered, dropped []*schema.Table) error {
-	hs.mu.Lock()
-	defer hs.mu.Unlock()
+	hs.fieldsMu.Lock()
+	defer hs.fieldsMu.Unlock()
 	// Schema Reload to happen only on primary when it is serving.
 	// We can be in a state when the primary is not serving after we have run DemotePrimary. In that case,
 	// we don't want to run any queries in MySQL, so we shouldn't reload anything in the healthStreamer.
diff --git a/go/vt/vttablet/tabletserver/health_streamer_test.go b/go/vt/vttablet/tabletserver/health_streamer_test.go
index ad764a970a2..c5fe93bcf6c 100644
--- a/go/vt/vttablet/tabletserver/health_streamer_test.go
+++ b/go/vt/vttablet/tabletserver/health_streamer_test.go
@@ -564,3 +564,43 @@ func testStream(hs *healthStreamer) (<-chan *querypb.StreamHealthResponse, conte
 func testBlpFunc() (int64, int32) {
 	return 1, 2
 }
+
+// TestDeadlockBwCloseAndReload tests the deadlock observed between Close and Reload
+// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
+func TestDeadlockBwCloseAndReload(t *testing.T) {
+	cfg := newConfig(nil)
+	env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TestNotServingPrimary")
+	alias := &topodatapb.TabletAlias{
+		Cell: "cell",
+		Uid:  1,
+	}
+	se := schema.NewEngineForTests()
+	// Create a new health streamer and set it to a serving primary state
+	hs := newHealthStreamer(env, alias, se)
+	hs.signalWhenSchemaChange = true
+	hs.Open()
+	hs.MakePrimary(true)
+	defer hs.Close()
+
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	// Try running Close and reload in parallel multiple times.
+	// This reproduces the deadlock quite readily.
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			hs.Close()
+			hs.Open()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			se.BroadcastForTesting(nil, nil, nil, true)
+		}
+	}()
+
+	// Wait for wait group to finish.
+	wg.Wait()
+}
diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go
index 4204c5c0b7e..bd405d47cec 100644
--- a/go/vt/vttablet/tabletserver/messager/engine.go
+++ b/go/vt/vttablet/tabletserver/messager/engine.go
@@ -51,9 +51,16 @@ type VStreamer interface {

 // Engine is the engine for handling messages.
 type Engine struct {
-	mu       sync.Mutex
-	isOpen   bool
-	managers map[string]*messageManager
+	// mu is a mutex used to protect the isOpen variable
+	// and for ensuring we don't call setup functions in parallel.
+	mu     sync.Mutex
+	isOpen bool
+
+	// managersMu is a mutex used to protect the managers field.
+	// We require two separate mutexes, so that we don't have to acquire the same mutex
+	// in Close and schemaChanged which can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229.
+	managersMu sync.Mutex
+	managers   map[string]*messageManager

 	tsv TabletService
 	se  *schema.Engine
@@ -75,15 +82,12 @@ func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer) *Engine {
 // Open starts the Engine service.
 func (me *Engine) Open() {
 	me.mu.Lock()
+	defer me.mu.Unlock()
 	if me.isOpen {
-		me.mu.Unlock()
 		return
 	}
 	me.isOpen = true
-	me.mu.Unlock()
 	log.Info("Messager: opening")
-	// Unlock before invoking RegisterNotifier because it
-	// obtains the same lock.
 	me.se.RegisterNotifier("messages", me.schemaChanged, true)
 }

@@ -101,6 +105,8 @@ func (me *Engine) Close() {
 	log.Infof("messager Engine - unregistering notifiers")
 	me.se.UnregisterNotifier("messages")
 	log.Infof("messager Engine - closing all managers")
+	me.managersMu.Lock()
+	defer me.managersMu.Unlock()
 	for _, mm := range me.managers {
 		mm.Close()
 	}
@@ -109,8 +115,8 @@ func (me *Engine) Close() {
 }

 func (me *Engine) GetGenerator(name string) (QueryGenerator, error) {
-	me.mu.Lock()
-	defer me.mu.Unlock()
+	me.managersMu.Lock()
+	defer me.managersMu.Unlock()
 	mm := me.managers[name]
 	if mm == nil {
 		return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found in schema", name)
@@ -131,6 +137,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
 	if !me.isOpen {
 		return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "messager engine is closed, probably because this is not a primary any more")
 	}
+	me.managersMu.Lock()
+	defer me.managersMu.Unlock()
 	mm := me.managers[name]
 	if mm == nil {
 		return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found", name)
@@ -139,8 +147,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
 }

 func (me *Engine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table) {
-	me.mu.Lock()
-	defer me.mu.Unlock()
+	me.managersMu.Lock()
+	defer me.managersMu.Unlock()
 	for _, table := range append(dropped, altered...) {
 		name := table.Name.String()
 		mm := me.managers[name]
diff --git a/go/vt/vttablet/tabletserver/messager/engine_test.go b/go/vt/vttablet/tabletserver/messager/engine_test.go
index eda585694f1..a28af647249 100644
--- a/go/vt/vttablet/tabletserver/messager/engine_test.go
+++ b/go/vt/vttablet/tabletserver/messager/engine_test.go
@@ -19,6 +19,7 @@ package messager
 import (
 	"context"
 	"reflect"
+	"sync"
 	"testing"

 	"vitess.io/vitess/go/sqltypes"
@@ -156,7 +157,7 @@ func newTestEngine() *Engine {
 	tsv := &fakeTabletServer{
 		Env: tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "MessagerTest"),
 	}
-	se := schema.NewEngine(tsv)
+	se := schema.NewEngineForTests()
 	te := NewEngine(tsv, se, newFakeVStreamer())
 	te.Open()
 	return te
@@ -169,3 +170,33 @@ func newEngineReceiver() (f func(qr *sqltypes.Result) error, ch chan *sqltypes.R
 		return nil
 	}, ch
 }
+
+// TestDeadlockBwCloseAndSchemaChange tests the deadlock observed between Close and schemaChanged
+// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229.
+func TestDeadlockBwCloseAndSchemaChange(t *testing.T) {
+	engine := newTestEngine()
+	defer engine.Close()
+	se := engine.se
+
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	// Try running Close and schemaChanged in parallel multiple times.
+	// This reproduces the deadlock quite readily.
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			engine.Close()
+			engine.Open()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			se.BroadcastForTesting(nil, nil, nil, true)
+		}
+	}()
+
+	// Wait for wait group to finish.
+	wg.Wait()
+}
diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go
index 3e9b5eabd3e..ebe2143b819 100644
--- a/go/vt/vttablet/tabletserver/schema/engine.go
+++ b/go/vt/vttablet/tabletserver/schema/engine.go
@@ -743,6 +743,13 @@ func (se *Engine) broadcast(created, altered, dropped []*Table) {
 	}
 }

+// BroadcastForTesting is meant to be a testing function that triggers a broadcast call.
+func (se *Engine) BroadcastForTesting(created, altered, dropped []*Table, udfsChanged bool) {
+	se.mu.Lock()
+	defer se.mu.Unlock()
+	se.broadcast(created, altered, dropped, udfsChanged)
+}
+
 // GetTable returns the info for a table.
 func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table {
 	se.mu.Lock()
@@ -829,6 +836,7 @@ func NewEngineForTests() *Engine {
 		tables:    make(map[string]*Table),
 		historian: newHistorian(false, 0, nil),
 		env:       tabletenv.NewEnv(vtenv.NewTestEnv(), tabletenv.NewDefaultConfig(), "SchemaEngineForTests"),
+		notifiers: make(map[string]notifier),
 	}
 	return se
 }
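
For context on why the split works: the deadlock reported in https://github.com/vitessio/vitess/issues/17229 is a lock-ordering inversion. Before this patch, healthStreamer.Close() held hs.mu while calling se.UnregisterNotifier(), which needs a schema.Engine lock, while the schema engine's broadcast held that lock and invoked the registered reload callback, which in turn waited on hs.mu; the messager Engine had the same shape between Close and schemaChanged. After splitting the mutex (cancelMu/fieldsMu in the health streamer, mu/managersMu in the messager), the lock held across the call into schema.Engine is never the lock the notifier callback needs, so no cycle can form. The standalone Go sketch below only models that pattern; every name in it (notifierHub, streamer, reload, and so on) is invented for illustration and does not exist in the Vitess packages touched above. It runs to completion, mirroring the added tests.

// deadlock_fix_sketch.go: a standalone model of the split-mutex pattern used by
// this patch. All names here (notifierHub, streamer, reload, ...) are invented
// for illustration and do not exist in the Vitess code.
package main

import (
	"fmt"
	"sync"
)

// notifierHub plays the role of schema.Engine: it holds its own lock while it
// invokes every registered callback, the way broadcast() does.
type notifierHub struct {
	mu        sync.Mutex
	notifiers map[string]func()
}

func (h *notifierHub) register(name string, f func()) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.notifiers[name] = f
}

func (h *notifierHub) unregister(name string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.notifiers, name)
}

func (h *notifierHub) broadcast() {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, f := range h.notifiers {
		f() // callbacks run while h.mu is held
	}
}

// streamer plays the role of healthStreamer after the fix: cancelMu guards the
// open/close state, fieldsMu guards what the reload callback touches. With a
// single mutex, Close (holding it, waiting on h.mu inside unregister) and
// broadcast (holding h.mu, waiting on it inside reload) block each other forever.
type streamer struct {
	cancelMu sync.Mutex // taken by Open and Close only
	fieldsMu sync.Mutex // taken by the reload callback only
	open     bool
	reloads  int
}

func (s *streamer) Open(hub *notifierHub) {
	s.cancelMu.Lock()
	defer s.cancelMu.Unlock()
	if s.open {
		return
	}
	s.open = true
	hub.register("streamer", s.reload)
}

func (s *streamer) Close(hub *notifierHub) {
	s.cancelMu.Lock()
	defer s.cancelMu.Unlock()
	if !s.open {
		return
	}
	// fieldsMu is not held here, so a broadcast currently blocked inside
	// reload() is never waiting on anything we own: no cycle, no deadlock.
	hub.unregister("streamer")
	s.open = false
}

func (s *streamer) reload() {
	s.fieldsMu.Lock()
	defer s.fieldsMu.Unlock()
	s.reloads++
}

func main() {
	hub := &notifierHub{notifiers: map[string]func(){}}
	s := &streamer{}
	s.Open(hub)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // mirrors the Close/Open loop in the added tests
		defer wg.Done()
		for i := 0; i < 100; i++ {
			s.Close(hub)
			s.Open(hub)
		}
	}()
	go func() { // mirrors the BroadcastForTesting loop
		defer wg.Done()
		for i := 0; i < 100; i++ {
			hub.broadcast()
		}
	}()
	wg.Wait()
	fmt.Println("finished without deadlock")
}

An alternative would have been to keep one mutex and drop it before calling into the schema engine, as the old messager Open() did before RegisterNotifier; splitting the mutexes instead keeps each critical section intact and makes the safe lock order hard to violate by accident.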