Skip to content

Commit

Permalink
Fix deadlock in messager and health streamer (#17230)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Nov 15, 2024
1 parent 810ef09 commit eace5a2
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 35 deletions.
56 changes: 33 additions & 23 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,18 @@ type healthStreamer struct {
degradedThreshold time.Duration
unhealthyThreshold atomic.Int64

mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
clients map[chan *querypb.StreamHealthResponse]struct{}
state *querypb.StreamHealthResponse
// cancelMu is a mutex used to protect the cancel variable
// and for ensuring we don't call setup functions in parallel.
cancelMu sync.Mutex
ctx context.Context
cancel context.CancelFunc

// fieldsMu is used to protect access to the fields below.
// We require two separate mutexes, so that we don't have to acquire the same mutex
// in Close and reload that can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
fieldsMu sync.Mutex
clients map[chan *querypb.StreamHealthResponse]struct{}
state *querypb.StreamHealthResponse
// isServingPrimary stores if this tablet is currently the serving primary or not.
isServingPrimary bool

Expand Down Expand Up @@ -126,8 +133,8 @@ func (hs *healthStreamer) InitDBConfig(target *querypb.Target, cp dbconfigs.Conn
}

func (hs *healthStreamer) Open() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.cancelMu.Lock()
defer hs.cancelMu.Unlock()

if hs.cancel != nil {
return
Expand All @@ -140,8 +147,8 @@ func (hs *healthStreamer) Open() {
}

func (hs *healthStreamer) Close() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.cancelMu.Lock()
defer hs.cancelMu.Unlock()

if hs.cancel != nil {
hs.se.UnregisterNotifier("healthStreamer")
Expand Down Expand Up @@ -182,13 +189,16 @@ func (hs *healthStreamer) Stream(ctx context.Context, callback func(*querypb.Str
}

func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, context.Context) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.cancelMu.Lock()
defer hs.cancelMu.Unlock()

if hs.cancel == nil {
return nil, nil
}

hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()

ch := make(chan *querypb.StreamHealthResponse, streamHealthBufferSize)
hs.clients[ch] = struct{}{}

Expand All @@ -198,15 +208,15 @@ func (hs *healthStreamer) register() (chan *querypb.StreamHealthResponse, contex
}

func (hs *healthStreamer) unregister(ch chan *querypb.StreamHealthResponse) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()

delete(hs.clients, ch)
}

func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, ptsTimestamp time.Time, lag time.Duration, err error, serving bool) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()

hs.state.Target.TabletType = tabletType
if tabletType == topodatapb.TabletType_PRIMARY {
Expand Down Expand Up @@ -260,8 +270,8 @@ func (hs *healthStreamer) broadCastToClients(shr *querypb.StreamHealthResponse)
}

func (hs *healthStreamer) AppendDetails(details []*kv) []*kv {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
if hs.state.Target.TabletType == topodatapb.TabletType_PRIMARY {
return details
}
Expand Down Expand Up @@ -306,8 +316,8 @@ func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) {
// MakePrimary tells the healthstreamer that the current tablet is now the primary,
// so it can read and write to the MySQL instance for schema-tracking.
func (hs *healthStreamer) MakePrimary(serving bool) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
hs.isServingPrimary = serving
// We register for notifications from the schema Engine only when schema tracking is enabled,
// and we are going to a serving primary state.
Expand All @@ -322,15 +332,15 @@ func (hs *healthStreamer) MakePrimary(serving bool) {

// MakeNonPrimary tells the healthstreamer that the current tablet is now not a primary.
func (hs *healthStreamer) MakeNonPrimary() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
hs.isServingPrimary = false
}

// reload reloads the schema from the underlying mysql for the tables that we get the alert on.
func (hs *healthStreamer) reload(created, altered, dropped []*schema.Table, udfsChanged bool) error {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.fieldsMu.Lock()
defer hs.fieldsMu.Unlock()
// Schema Reload to happen only on primary when it is serving.
// We can be in a state when the primary is not serving after we have run DemotePrimary. In that case,
// we don't want to run any queries in MySQL, so we shouldn't reload anything in the healthStreamer.
Expand Down
40 changes: 40 additions & 0 deletions go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,43 @@ func testStream(hs *healthStreamer) (<-chan *querypb.StreamHealthResponse, conte
func testBlpFunc() (int64, int32) {
return 1, 2
}

// TestDeadlockBwCloseAndReload tests the deadlock observed between Close and Reload
// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229#issuecomment-2476136610.
func TestDeadlockBwCloseAndReload(t *testing.T) {
cfg := newConfig(nil)
env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TestNotServingPrimary")
alias := &topodatapb.TabletAlias{
Cell: "cell",
Uid: 1,
}
se := schema.NewEngineForTests()
// Create a new health streamer and set it to a serving primary state
hs := newHealthStreamer(env, alias, se)
hs.signalWhenSchemaChange = true
hs.Open()
hs.MakePrimary(true)
defer hs.Close()

wg := sync.WaitGroup{}
wg.Add(2)
// Try running Close and reload in parallel multiple times.
// This reproduces the deadlock quite readily.
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
hs.Close()
hs.Open()
}
}()

go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
se.BroadcastForTesting(nil, nil, nil, true)
}
}()

// Wait for wait group to finish.
wg.Wait()
}
30 changes: 19 additions & 11 deletions go/vt/vttablet/tabletserver/messager/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,16 @@ type VStreamer interface {

// Engine is the engine for handling messages.
type Engine struct {
mu sync.Mutex
isOpen bool
managers map[string]*messageManager
// mu is a mutex used to protect the isOpen variable
// and for ensuring we don't call setup functions in parallel.
mu sync.Mutex
isOpen bool

// managersMu is a mutex used to protect the managers field.
// We require two separate mutexes, so that we don't have to acquire the same mutex
// in Close and schemaChanged which can lead to a deadlock described in https://github.com/vitessio/vitess/issues/17229.
managersMu sync.Mutex
managers map[string]*messageManager

tsv TabletService
se *schema.Engine
Expand All @@ -75,15 +82,12 @@ func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer) *Engine {
// Open starts the Engine service.
func (me *Engine) Open() {
me.mu.Lock()
defer me.mu.Unlock()
if me.isOpen {
me.mu.Unlock()
return
}
me.isOpen = true
me.mu.Unlock()
log.Info("Messager: opening")
// Unlock before invoking RegisterNotifier because it
// obtains the same lock.
me.se.RegisterNotifier("messages", me.schemaChanged, true)
}

Expand All @@ -101,6 +105,8 @@ func (me *Engine) Close() {
log.Infof("messager Engine - unregistering notifiers")
me.se.UnregisterNotifier("messages")
log.Infof("messager Engine - closing all managers")
me.managersMu.Lock()
defer me.managersMu.Unlock()
for _, mm := range me.managers {
mm.Close()
}
Expand All @@ -109,8 +115,8 @@ func (me *Engine) Close() {
}

func (me *Engine) GetGenerator(name string) (QueryGenerator, error) {
me.mu.Lock()
defer me.mu.Unlock()
me.managersMu.Lock()
defer me.managersMu.Unlock()
mm := me.managers[name]
if mm == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found in schema", name)
Expand All @@ -131,6 +137,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
if !me.isOpen {
return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "messager engine is closed, probably because this is not a primary any more")
}
me.managersMu.Lock()
defer me.managersMu.Unlock()
mm := me.managers[name]
if mm == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "message table %s not found", name)
Expand All @@ -139,8 +147,8 @@ func (me *Engine) Subscribe(ctx context.Context, name string, send func(*sqltype
}

func (me *Engine) schemaChanged(tables map[string]*schema.Table, created, altered, dropped []*schema.Table, _ bool) {
me.mu.Lock()
defer me.mu.Unlock()
me.managersMu.Lock()
defer me.managersMu.Unlock()
for _, table := range append(dropped, altered...) {
name := table.Name.String()
mm := me.managers[name]
Expand Down
33 changes: 32 additions & 1 deletion go/vt/vttablet/tabletserver/messager/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package messager
import (
"context"
"reflect"
"sync"
"testing"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -156,7 +157,7 @@ func newTestEngine() *Engine {
tsv := &fakeTabletServer{
Env: tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "MessagerTest"),
}
se := schema.NewEngine(tsv)
se := schema.NewEngineForTests()
te := NewEngine(tsv, se, newFakeVStreamer())
te.Open()
return te
Expand All @@ -169,3 +170,33 @@ func newEngineReceiver() (f func(qr *sqltypes.Result) error, ch chan *sqltypes.R
return nil
}, ch
}

// TestDeadlockBwCloseAndSchemaChange tests the deadlock observed between Close and schemaChanged
// functions. More details can be found in the issue https://github.com/vitessio/vitess/issues/17229.
func TestDeadlockBwCloseAndSchemaChange(t *testing.T) {
engine := newTestEngine()
defer engine.Close()
se := engine.se

wg := sync.WaitGroup{}
wg.Add(2)
// Try running Close and schemaChanged in parallel multiple times.
// This reproduces the deadlock quite readily.
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
engine.Close()
engine.Open()
}
}()

go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
se.BroadcastForTesting(nil, nil, nil, true)
}
}()

// Wait for wait group to finish.
wg.Wait()
}
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@ func (se *Engine) broadcast(created, altered, dropped []*Table, udfsChanged bool
}
}

// BroadcastForTesting is meant to be a testing function that triggers a broadcast call.
func (se *Engine) BroadcastForTesting(created, altered, dropped []*Table, udfsChanged bool) {
se.mu.Lock()
defer se.mu.Unlock()
se.broadcast(created, altered, dropped, udfsChanged)
}

// GetTable returns the info for a table.
func (se *Engine) GetTable(tableName sqlparser.IdentifierCS) *Table {
se.mu.Lock()
Expand Down Expand Up @@ -886,6 +893,7 @@ func NewEngineForTests() *Engine {
tables: make(map[string]*Table),
historian: newHistorian(false, 0, nil),
env: tabletenv.NewEnv(vtenv.NewTestEnv(), tabletenv.NewDefaultConfig(), "SchemaEngineForTests"),
notifiers: make(map[string]notifier),
}
return se
}
Expand Down

0 comments on commit eace5a2

Please sign in to comment.