From 8585ccb558966b3d237c79461e68c5c52730de68 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 24 Jun 2024 04:40:03 +0200 Subject: [PATCH 1/7] `txthrottler`: monitor log on `PRIMARY` only Signed-off-by: Tim Vaillancourt --- go/vt/vttablet/tabletserver/state_manager.go | 6 + .../tabletserver/txthrottler/tx_throttler.go | 138 +++++++++++------- .../txthrottler/tx_throttler_test.go | 23 ++- 3 files changed, 104 insertions(+), 63 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 308f9165ba6..11da06d6d4e 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -174,6 +174,8 @@ type ( txThrottler interface { Open() error Close() + MakePrimary() + MakeNonPrimary() } onlineDDLExecutor interface { @@ -456,6 +458,7 @@ func (sm *stateManager) servePrimary() error { sm.hs.MakePrimary(true) sm.se.MakePrimary(true) sm.rt.MakePrimary() + sm.txThrottler.MakePrimary() sm.tracker.Open() // We instantly kill all stateful queries to allow for // te to quickly transition into RW, but olap and stateless @@ -482,6 +485,7 @@ func (sm *stateManager) unservePrimary() error { sm.se.MakePrimary(false) sm.hs.MakePrimary(false) sm.rt.MakePrimary() + sm.txThrottler.MakeNonPrimary() sm.setState(topodatapb.TabletType_PRIMARY, StateNotServing) return nil } @@ -498,6 +502,7 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er sm.tracker.Close() sm.se.MakeNonPrimary() sm.hs.MakeNonPrimary() + sm.txThrottler.MakeNonPrimary() if err := sm.connect(wantTabletType); err != nil { return err @@ -516,6 +521,7 @@ func (sm *stateManager) unserveNonPrimary(wantTabletType topodatapb.TabletType) sm.se.MakeNonPrimary() sm.hs.MakeNonPrimary() + sm.txThrottler.MakeNonPrimary() if err := sm.connect(wantTabletType); err != nil { return err diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 7cb774663a4..074095acdcd 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -67,6 +67,8 @@ type TxThrottler interface { InitDBConfig(target *querypb.Target) Open() (err error) Close() + MakePrimary() + MakeNonPrimary() Throttle(priority int, workload string) (result bool) } @@ -146,6 +148,8 @@ type txThrottler struct { } type txThrottlerState interface { + makePrimary() + makeNonPrimary() deallocateResources() StatsUpdate(tabletStats *discovery.TabletHealth) throttle() bool @@ -154,25 +158,24 @@ type txThrottlerState interface { // txThrottlerStateImpl holds the state of an open TxThrottler object. type txThrottlerStateImpl struct { config *tabletenv.TabletConfig + target *querypb.Target txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler ThrottlerInterface - stopHealthCheck context.CancelFunc + throttleMu sync.Mutex + throttler ThrottlerInterface - healthCheck discovery.HealthCheck - healthCheckChan chan *discovery.TabletHealth - healthCheckCells []string - cellsFromTopo bool + cellsFromTopo bool + healthCheck discovery.HealthCheck + healthCheckCells []string + healthCheckChan chan *discovery.TabletHealth + lagChecksCancelFunc context.CancelFunc + maxLag int64 + wg sync.WaitGroup // tabletTypes stores the tablet types for throttling tabletTypes map[topodatapb.TabletType]bool - - maxLag int64 - done chan bool - waitForTermination sync.WaitGroup } // NewTxThrottler tries to construct a txThrottler from the relevant @@ -237,6 +240,18 @@ func (t *txThrottler) Close() { log.Info("txThrottler: closed") } +func (t *txThrottler) MakePrimary() { + if t.state != nil { + t.state.makePrimary() + } +} + +func (t *txThrottler) MakeNonPrimary() { + if t.state != nil { + t.state.makeNonPrimary() + } +} + // Throttle should be called before a new transaction is started. // It returns true if the transaction should not proceed (the caller // should back off). Throttle requires that Open() was previously called @@ -284,61 +299,57 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi tabletTypes[tabletType] = true } - state := &txThrottlerStateImpl{ - config: config, - healthCheckCells: config.TxThrottlerHealthCheckCells, - tabletTypes: tabletTypes, - throttler: t, - txThrottler: txThrottler, - done: make(chan bool, 1), - } - // get cells from topo if none defined in tabletenv config - if len(state.healthCheckCells) == 0 { + var cellsFromTopo bool + healthCheckCells := config.TxThrottlerHealthCheckCells + if len(healthCheckCells) == 0 { ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer cancel() - state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) - state.cellsFromTopo = true + healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + cellsFromTopo = true } - ctx, cancel := context.WithCancel(context.Background()) - state.stopHealthCheck = cancel - state.initHealthCheckStream(txThrottler.topoServer, target) - go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) - state.waitForTermination.Add(1) - go state.updateMaxLag() - - return state, nil + return &txThrottlerStateImpl{ + config: config, + cellsFromTopo: cellsFromTopo, + healthCheckCells: healthCheckCells, + tabletTypes: tabletTypes, + target: target, + throttler: t, + txThrottler: txThrottler, + }, nil } -func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { - ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) +func (ts *txThrottlerStateImpl) initHealthCheck() { + ts.healthCheck = healthCheckFactory(ts.txThrottler.topoServer, ts.target.Cell, ts.healthCheckCells) ts.healthCheckChan = ts.healthCheck.Subscribe() - } -func (ts *txThrottlerStateImpl) closeHealthCheckStream() { +func (ts *txThrottlerStateImpl) closeHealthCheck() { if ts.healthCheck == nil { return } - ts.stopHealthCheck() + ts.lagChecksCancelFunc() + ts.wg.Wait() ts.healthCheck.Close() + ts.healthCheck = nil } -func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { +func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context) { fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() - knownCells := fetchKnownCells(fetchCtx, topoServer, target) + knownCells := fetchKnownCells(fetchCtx, ts.txThrottler.topoServer, ts.target) if !reflect.DeepEqual(knownCells, ts.healthCheckCells) { log.Info("txThrottler: restarting healthcheck stream due to topology cells update") ts.healthCheckCells = knownCells - ts.closeHealthCheckStream() - ts.initHealthCheckStream(topoServer, target) + ts.closeHealthCheck() + ts.initHealthCheck() } } -func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { +func (ts *txThrottlerStateImpl) healthCheckProcessor(ctx context.Context) { + defer ts.wg.Done() var cellsUpdateTicks <-chan time.Time if ts.cellsFromTopo { ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) @@ -350,18 +361,40 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS case <-ctx.Done(): return case <-cellsUpdateTicks: - ts.updateHealthCheckCells(ctx, topoServer, target) + ts.updateHealthCheckCells(ctx) case th := <-ts.healthCheckChan: ts.StatsUpdate(th) } } } +func (ts *txThrottlerStateImpl) makePrimary() { + ts.initHealthCheck() + var ctx context.Context + ctx, ts.lagChecksCancelFunc = context.WithCancel(context.Background()) + + ts.wg.Add(1) + go ts.healthCheckProcessor(ctx) + + ts.wg.Add(1) + go ts.updateMaxLag(ctx) +} + +func (ts *txThrottlerStateImpl) makeNonPrimary() { + ts.closeHealthCheck() + ts.maxLag = 0 +} + func (ts *txThrottlerStateImpl) throttle() bool { if ts.throttler == nil { log.Error("txThrottler: throttle called after deallocateResources was called") return false } + // return false if we are not watching lag + if ts.healthCheck == nil { + return false + } + // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() @@ -372,17 +405,17 @@ func (ts *txThrottlerStateImpl) throttle() bool { ts.throttler.Throttle(0 /* threadId */) > 0 } -func (ts *txThrottlerStateImpl) updateMaxLag() { - defer ts.waitForTermination.Done() +func (ts *txThrottlerStateImpl) updateMaxLag(ctx context.Context) { + defer ts.wg.Done() // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) defer ticker.Stop() -outerloop: for { select { + case <-ctx.Done(): + return case <-ticker.C: var maxLag uint32 - for tabletType := range ts.tabletTypes { maxLagPerTabletType := ts.throttler.MaxLag(tabletType) if maxLagPerTabletType > maxLag { @@ -390,28 +423,23 @@ outerloop: } } atomic.StoreInt64(&ts.maxLag, int64(maxLag)) - case <-ts.done: - break outerloop } } } func (ts *txThrottlerStateImpl) deallocateResources() { - // Close healthcheck and topo watchers - ts.closeHealthCheckStream() - ts.healthCheck = nil + // Close healthcheck and max lag updater + ts.closeHealthCheck() - ts.done <- true - ts.waitForTermination.Wait() // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close() ts.throttler = nil } -// StatsUpdate updates the health of a tablet with the given healthcheck. +// StatsUpdate updates the health of a tablet with the given healthcheck, when primary. func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) { - if len(ts.tabletTypes) == 0 { + if ts.healthCheck == nil || len(ts.tabletTypes) == 0 { return } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index fe352cf96f4..66a32b7d832 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -51,6 +51,7 @@ func TestDisabledThrottler(t *testing.T) { Shard: "shard", }) assert.Nil(t, throttler.Open()) + throttler.MakePrimary() assert.False(t, throttler.Throttle(0, "some-workload")) throttlerImpl, _ := throttler.(*txThrottler) assert.Zero(t, throttlerImpl.throttlerRunning.Get()) @@ -141,9 +142,17 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) - // Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a + // check .throttle() returns false when non-primary and healthCheck is nil (not watching for lag) + assert.False(t, throttlerStateImpl.throttle()) + assert.Nil(t, throttlerStateImpl.healthCheck) + + // makePrimary and confirm healthcheck starts + throttlerStateImpl.makePrimary() + assert.NotNil(t, throttlerStateImpl.healthCheck) + + // Stop the lag/healthcheck go routines that keeps updating the cached shard's max lag to prevent it from changing the value in a // way that will interfere with how we manipulate that value in our tests to evaluate different cases: - throttlerStateImpl.done <- true + throttlerStateImpl.lagChecksCancelFunc() // 1 should not throttle due to return value of underlying Throttle(), despite high lag atomic.StoreInt64(&throttlerStateImpl.maxLag, 20) @@ -240,12 +249,10 @@ type mockTxThrottlerState struct { shouldThrottle bool } -func (t *mockTxThrottlerState) deallocateResources() { - -} -func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { - -} +func (t *mockTxThrottlerState) makePrimary() {} +func (t *mockTxThrottlerState) makeNonPrimary() {} +func (t *mockTxThrottlerState) deallocateResources() {} +func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {} func (t *mockTxThrottlerState) throttle() bool { return t.shouldThrottle From 188aa0df91ed95bd61d7254cbe9c6684a6e5e218 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Mon, 24 Jun 2024 05:01:28 +0200 Subject: [PATCH 2/7] Add comments Signed-off-by: Tim Vaillancourt --- go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 074095acdcd..8259c3945e5 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -240,12 +240,16 @@ func (t *txThrottler) Close() { log.Info("txThrottler: closed") } +// MakePrimary performs a transition to a primary tablet. This will enable healthchecks to +// enable live replication lag state. func (t *txThrottler) MakePrimary() { if t.state != nil { t.state.makePrimary() } } +// MakePrimary performs a transition to a non-primary tablet. This disables healthchecks +// (for replication state) if they exist. func (t *txThrottler) MakeNonPrimary() { if t.state != nil { t.state.makeNonPrimary() From 8b6db41c16fa2b204ab602c9b2f7c8bb25f58a9e Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 26 Jun 2024 02:03:05 +0200 Subject: [PATCH 3/7] rename cancel func Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 24 +++++++++---------- .../txthrottler/tx_throttler_test.go | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 8259c3945e5..11615d52af5 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -166,13 +166,13 @@ type txThrottlerStateImpl struct { throttleMu sync.Mutex throttler ThrottlerInterface - cellsFromTopo bool - healthCheck discovery.HealthCheck - healthCheckCells []string - healthCheckChan chan *discovery.TabletHealth - lagChecksCancelFunc context.CancelFunc - maxLag int64 - wg sync.WaitGroup + cellsFromTopo bool + healthCheck discovery.HealthCheck + healthCheckCancel context.CancelFunc + healthCheckCells []string + healthCheckChan chan *discovery.TabletHealth + maxLag int64 + wg sync.WaitGroup // tabletTypes stores the tablet types for throttling tabletTypes map[topodatapb.TabletType]bool @@ -241,7 +241,7 @@ func (t *txThrottler) Close() { } // MakePrimary performs a transition to a primary tablet. This will enable healthchecks to -// enable live replication lag state. +// watch the replication lag state of other tablets. func (t *txThrottler) MakePrimary() { if t.state != nil { t.state.makePrimary() @@ -249,7 +249,7 @@ func (t *txThrottler) MakePrimary() { } // MakePrimary performs a transition to a non-primary tablet. This disables healthchecks -// (for replication state) if they exist. +// for replication lag state if we were primary. func (t *txThrottler) MakeNonPrimary() { if t.state != nil { t.state.makeNonPrimary() @@ -333,10 +333,11 @@ func (ts *txThrottlerStateImpl) closeHealthCheck() { if ts.healthCheck == nil { return } - ts.lagChecksCancelFunc() + ts.healthCheckCancel() ts.wg.Wait() ts.healthCheck.Close() ts.healthCheck = nil + ts.maxLag = 0 } func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context) { @@ -375,7 +376,7 @@ func (ts *txThrottlerStateImpl) healthCheckProcessor(ctx context.Context) { func (ts *txThrottlerStateImpl) makePrimary() { ts.initHealthCheck() var ctx context.Context - ctx, ts.lagChecksCancelFunc = context.WithCancel(context.Background()) + ctx, ts.healthCheckCancel = context.WithCancel(context.Background()) ts.wg.Add(1) go ts.healthCheckProcessor(ctx) @@ -386,7 +387,6 @@ func (ts *txThrottlerStateImpl) makePrimary() { func (ts *txThrottlerStateImpl) makeNonPrimary() { ts.closeHealthCheck() - ts.maxLag = 0 } func (ts *txThrottlerStateImpl) throttle() bool { diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 66a32b7d832..da761855c38 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -152,7 +152,7 @@ func TestEnabledThrottler(t *testing.T) { // Stop the lag/healthcheck go routines that keeps updating the cached shard's max lag to prevent it from changing the value in a // way that will interfere with how we manipulate that value in our tests to evaluate different cases: - throttlerStateImpl.lagChecksCancelFunc() + throttlerStateImpl.healthCheckCancel() // 1 should not throttle due to return value of underlying Throttle(), despite high lag atomic.StoreInt64(&throttlerStateImpl.maxLag, 20) From 32ae43338bc51c0ca6077ae0f83ec75b34e5d9a1 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 20 Aug 2024 02:15:12 +0200 Subject: [PATCH 4/7] fix mockTxThrottler Signed-off-by: Tim Vaillancourt --- go/vt/vttablet/tabletserver/query_executor_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 771d9e3479d..33a0a680799 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1857,8 +1857,9 @@ func (m mockTxThrottler) Open() (err error) { return nil } -func (m mockTxThrottler) Close() { -} +func (m mockTxThrottler) Close() {} +func (m mockTxThrottler) MakePrimary() {} +func (m mockTxThrottler) MakeNonPrimary() {} func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) { return m.throttle From e33c8aec9b8d9844394117b73206b96d3c34b0c6 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 20 Aug 2024 20:16:00 +0200 Subject: [PATCH 5/7] fix tests Signed-off-by: Tim Vaillancourt --- go/vt/vttablet/tabletserver/state_manager.go | 2 +- .../tabletserver/state_manager_test.go | 45 +++++++++++-------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 11da06d6d4e..e231a628748 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -485,7 +485,7 @@ func (sm *stateManager) unservePrimary() error { sm.se.MakePrimary(false) sm.hs.MakePrimary(false) sm.rt.MakePrimary() - sm.txThrottler.MakeNonPrimary() + sm.txThrottler.MakePrimary() sm.setState(topodatapb.TabletType_PRIMARY, StateNotServing) return nil } diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 02896eeefe0..45257c995ad 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -77,18 +77,17 @@ func TestStateManagerServePrimary(t *testing.T) { assert.Equal(t, testNow, sm.ptsTimestamp) verifySubcomponent(t, 1, sm.watcher, testStateClosed) - verifySubcomponent(t, 2, sm.se, testStateOpen) verifySubcomponent(t, 3, sm.vstreamer, testStateOpen) verifySubcomponent(t, 4, sm.qe, testStateOpen) - verifySubcomponent(t, 5, sm.txThrottler, testStateOpen) verifySubcomponent(t, 6, sm.rt, testStatePrimary) - verifySubcomponent(t, 7, sm.tracker, testStateOpen) - verifySubcomponent(t, 8, sm.te, testStatePrimary) - verifySubcomponent(t, 9, sm.messager, testStateOpen) - verifySubcomponent(t, 10, sm.throttler, testStateOpen) - verifySubcomponent(t, 11, sm.tableGC, testStateOpen) - verifySubcomponent(t, 12, sm.ddle, testStateOpen) + verifySubcomponent(t, 7, sm.txThrottler, testStatePrimary) + verifySubcomponent(t, 8, sm.tracker, testStateOpen) + verifySubcomponent(t, 9, sm.te, testStatePrimary) + verifySubcomponent(t, 10, sm.messager, testStateOpen) + verifySubcomponent(t, 11, sm.throttler, testStateOpen) + verifySubcomponent(t, 12, sm.tableGC, testStateOpen) + verifySubcomponent(t, 13, sm.ddle, testStateOpen) assert.False(t, sm.se.(*testSchemaEngine).nonPrimary) assert.True(t, sm.se.(*testSchemaEngine).ensureCalled) @@ -109,14 +108,14 @@ func TestStateManagerServeNonPrimary(t *testing.T) { verifySubcomponent(t, 4, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonPrimary) - verifySubcomponent(t, 5, sm.se, testStateOpen) - verifySubcomponent(t, 6, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 7, sm.qe, testStateOpen) - verifySubcomponent(t, 8, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 9, sm.te, testStateNonPrimary) - verifySubcomponent(t, 10, sm.rt, testStateNonPrimary) - verifySubcomponent(t, 11, sm.watcher, testStateOpen) - verifySubcomponent(t, 12, sm.throttler, testStateOpen) + verifySubcomponent(t, 6, sm.se, testStateOpen) + verifySubcomponent(t, 7, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 8, sm.qe, testStateOpen) + verifySubcomponent(t, 9, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 10, sm.te, testStateNonPrimary) + verifySubcomponent(t, 11, sm.rt, testStateNonPrimary) + verifySubcomponent(t, 12, sm.watcher, testStateOpen) + verifySubcomponent(t, 13, sm.throttler, testStateOpen) assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType) assert.Equal(t, StateServing, sm.state) @@ -139,9 +138,9 @@ func TestStateManagerUnservePrimary(t *testing.T) { verifySubcomponent(t, 8, sm.se, testStateOpen) verifySubcomponent(t, 9, sm.vstreamer, testStateOpen) verifySubcomponent(t, 10, sm.qe, testStateOpen) - verifySubcomponent(t, 11, sm.txThrottler, testStateOpen) verifySubcomponent(t, 12, sm.rt, testStatePrimary) + verifySubcomponent(t, 13, sm.txThrottler, testStatePrimary) assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.target.TabletType) assert.Equal(t, StateNotServing, sm.state) @@ -165,7 +164,7 @@ func TestStateManagerUnserveNonPrimary(t *testing.T) { verifySubcomponent(t, 7, sm.se, testStateOpen) verifySubcomponent(t, 8, sm.vstreamer, testStateOpen) verifySubcomponent(t, 9, sm.qe, testStateOpen) - verifySubcomponent(t, 10, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 10, sm.txThrottler, testStateNonPrimary) verifySubcomponent(t, 11, sm.rt, testStateNonPrimary) verifySubcomponent(t, 12, sm.watcher, testStateOpen) @@ -932,6 +931,16 @@ func (te *testTxThrottler) Close() { te.state = testStateClosed } +func (te *testTxThrottler) MakePrimary() { + te.order = order.Add(1) + te.state = testStatePrimary +} + +func (te *testTxThrottler) MakeNonPrimary() { + te.order = order.Add(1) + te.state = testStateNonPrimary +} + type testOnlineDDLExecutor struct { testOrderState } From 46155c337f7303dd84521bb3558eaa8d726d0294 Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U." <5791035+ejortegau@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:57:47 +0100 Subject: [PATCH 6/7] Fix TxThrottler init healtcheck args Signed-off-by: Eduardo J. Ortega U. <5791035+ejortegau@users.noreply.github.com> --- .../vttablet/tabletserver/txthrottler/tx_throttler.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index bd0d1cc5f52..2413c13718d 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -329,10 +329,11 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi return state, nil } - func (ts *txThrottlerStateImpl) initHealthCheck(topoServer *topo.Server, target *querypb.Target) (err error) { ts.healthCheck, err = healthCheckFactory(ts.ctx, topoServer, target.Cell, target.Keyspace, target.Shard, ts.healthCheckCells) if err != nil { + ts.healthCheck = nil + return err } @@ -391,7 +392,13 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(topoServer *topo.Server, t } func (ts *txThrottlerStateImpl) makePrimary() { - ts.initHealthCheck() + err := ts.initHealthCheck(ts.txThrottler.topoServer, ts.target) + if err != nil { + log.Errorf("txThrottler: failed to initialize health check while attempting to make primary: %v", err) + + return + } + var ctx context.Context ctx, ts.healthCheckCancel = context.WithCancel(context.Background()) From 5d7568d521bb820ce7a3b085f91dee3aa8ff35e7 Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U." <5791035+ejortegau@users.noreply.github.com> Date: Wed, 18 Dec 2024 17:20:10 +0100 Subject: [PATCH 7/7] Fix StateManager unit tests Signed-off-by: Eduardo J. Ortega U. <5791035+ejortegau@users.noreply.github.com> --- go/vt/vttablet/tabletserver/state_manager.go | 2 +- .../tabletserver/state_manager_test.go | 96 ++++++++++--------- 2 files changed, 52 insertions(+), 46 deletions(-) diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 6e03cd7ab62..b069f2704e0 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -545,7 +545,7 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType, serving bool) if err := sm.qe.Open(); err != nil { return err } - return sm.txThrottler.Open() + return nil } func (sm *stateManager) unserveCommon() { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index d06bbde4aea..1fcc6a734f3 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -80,14 +80,14 @@ func TestStateManagerServePrimary(t *testing.T) { verifySubcomponent(t, 2, sm.se, testStateOpen) verifySubcomponent(t, 3, sm.vstreamer, testStateOpen) verifySubcomponent(t, 4, sm.qe, testStateOpen) - verifySubcomponent(t, 6, sm.rt, testStatePrimary) - verifySubcomponent(t, 7, sm.txThrottler, testStatePrimary) - verifySubcomponent(t, 8, sm.tracker, testStateOpen) - verifySubcomponent(t, 9, sm.te, testStatePrimary) - verifySubcomponent(t, 10, sm.messager, testStateOpen) - verifySubcomponent(t, 11, sm.throttler, testStateOpen) - verifySubcomponent(t, 12, sm.tableGC, testStateOpen) - verifySubcomponent(t, 13, sm.ddle, testStateOpen) + verifySubcomponent(t, 5, sm.rt, testStatePrimary) + verifySubcomponent(t, 6, sm.txThrottler, testStatePrimary) + verifySubcomponent(t, 7, sm.tracker, testStateOpen) + verifySubcomponent(t, 8, sm.te, testStatePrimary) + verifySubcomponent(t, 9, sm.messager, testStateOpen) + verifySubcomponent(t, 10, sm.throttler, testStateOpen) + verifySubcomponent(t, 11, sm.tableGC, testStateOpen) + verifySubcomponent(t, 12, sm.ddle, testStateOpen) assert.False(t, sm.se.(*testSchemaEngine).nonPrimary) assert.True(t, sm.se.(*testSchemaEngine).ensureCalled) @@ -108,14 +108,14 @@ func TestStateManagerServeNonPrimary(t *testing.T) { verifySubcomponent(t, 4, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonPrimary) + verifySubcomponent(t, 5, sm.txThrottler, testStateNonPrimary) verifySubcomponent(t, 6, sm.se, testStateOpen) verifySubcomponent(t, 7, sm.vstreamer, testStateOpen) verifySubcomponent(t, 8, sm.qe, testStateOpen) - verifySubcomponent(t, 9, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 10, sm.te, testStateNonPrimary) - verifySubcomponent(t, 11, sm.rt, testStateNonPrimary) - verifySubcomponent(t, 12, sm.watcher, testStateOpen) - verifySubcomponent(t, 13, sm.throttler, testStateOpen) + verifySubcomponent(t, 9, sm.te, testStateNonPrimary) + verifySubcomponent(t, 10, sm.rt, testStateNonPrimary) + verifySubcomponent(t, 11, sm.watcher, testStateOpen) + verifySubcomponent(t, 12, sm.throttler, testStateOpen) assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType) assert.Equal(t, StateServing, sm.state) @@ -139,8 +139,8 @@ func TestStateManagerUnservePrimary(t *testing.T) { verifySubcomponent(t, 9, sm.vstreamer, testStateOpen) verifySubcomponent(t, 10, sm.qe, testStateOpen) - verifySubcomponent(t, 12, sm.rt, testStatePrimary) - verifySubcomponent(t, 13, sm.txThrottler, testStatePrimary) + verifySubcomponent(t, 11, sm.rt, testStatePrimary) + verifySubcomponent(t, 12, sm.txThrottler, testStatePrimary) assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.target.TabletType) assert.Equal(t, StateNotServing, sm.state) @@ -161,10 +161,10 @@ func TestStateManagerUnserveNonPrimary(t *testing.T) { verifySubcomponent(t, 6, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonPrimary) - verifySubcomponent(t, 7, sm.se, testStateOpen) - verifySubcomponent(t, 8, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 9, sm.qe, testStateOpen) - verifySubcomponent(t, 10, sm.txThrottler, testStateNonPrimary) + verifySubcomponent(t, 7, sm.txThrottler, testStateNonPrimary) + verifySubcomponent(t, 8, sm.se, testStateOpen) + verifySubcomponent(t, 9, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 10, sm.qe, testStateOpen) verifySubcomponent(t, 11, sm.rt, testStateNonPrimary) verifySubcomponent(t, 12, sm.watcher, testStateOpen) @@ -299,10 +299,10 @@ func TestStateManagerSetServingTypeNoChange(t *testing.T) { verifySubcomponent(t, 4, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonPrimary) - verifySubcomponent(t, 5, sm.se, testStateOpen) - verifySubcomponent(t, 6, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 7, sm.qe, testStateOpen) - verifySubcomponent(t, 8, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 5, sm.txThrottler, testStateNonPrimary) + verifySubcomponent(t, 6, sm.se, testStateOpen) + verifySubcomponent(t, 7, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 8, sm.qe, testStateOpen) verifySubcomponent(t, 9, sm.te, testStateNonPrimary) verifySubcomponent(t, 10, sm.rt, testStateNonPrimary) verifySubcomponent(t, 11, sm.watcher, testStateOpen) @@ -818,7 +818,7 @@ func (te *testSchemaEngine) EnsureConnectionAndDB(topodatapb.TabletType, bool) e } func (te *testSchemaEngine) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } @@ -832,7 +832,7 @@ func (te *testSchemaEngine) MakePrimary(serving bool) { } func (te *testSchemaEngine) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -843,17 +843,17 @@ type testReplTracker struct { } func (te *testReplTracker) MakePrimary() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStatePrimary } func (te *testReplTracker) MakeNonPrimary() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateNonPrimary } func (te *testReplTracker) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -868,7 +868,7 @@ type testQueryEngine struct { } func (te *testQueryEngine) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } @@ -882,7 +882,7 @@ func (te *testQueryEngine) IsMySQLReachable() error { } func (te *testQueryEngine) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -891,17 +891,17 @@ type testTxEngine struct { } func (te *testTxEngine) AcceptReadWrite() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStatePrimary } func (te *testTxEngine) AcceptReadOnly() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateNonPrimary } func (te *testTxEngine) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -912,12 +912,12 @@ type testSubcomponent struct { } func (te *testSubcomponent) Open() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen } func (te *testSubcomponent) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -926,38 +926,44 @@ type testTxThrottler struct { } func (te *testTxThrottler) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } func (te *testTxThrottler) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } func (te *testTxThrottler) MakePrimary() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStatePrimary } func (te *testTxThrottler) MakeNonPrimary() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateNonPrimary } +func addOrder() int64 { + newVal := order.Add(1) + + return newVal +} + type testOnlineDDLExecutor struct { testOrderState } func (te *testOnlineDDLExecutor) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } func (te *testOnlineDDLExecutor) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -966,13 +972,13 @@ type testLagThrottler struct { } func (te *testLagThrottler) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } func (te *testLagThrottler) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed } @@ -981,12 +987,12 @@ type testTableGC struct { } func (te *testTableGC) Open() error { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateOpen return nil } func (te *testTableGC) Close() { - te.order = order.Add(1) + te.order = addOrder() te.state = testStateClosed }