Skip to content

Commit

Permalink
txthrottler: further code cleanup (vitessio#12902)
Browse files Browse the repository at this point in the history
* txthrottler: further code cleanup

Signed-off-by: Tim Vaillancourt <[email protected]>

* Fix bad merge resolution

Signed-off-by: Tim Vaillancourt <[email protected]>

---------

Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Apr 16, 2024
1 parent 95d008f commit 8a04679
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 53 deletions.
103 changes: 53 additions & 50 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ var (
throttlerFactory throttlerFactoryFunc
)

func init() {
resetTxThrottlerFactories()
}

func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
Expand All @@ -69,6 +65,10 @@ func resetTxThrottlerFactories() {
}
}

func init() {
resetTxThrottlerFactories()
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Expand Down Expand Up @@ -132,14 +132,46 @@ type TxThrottler struct {
// if the TransactionThrottler is closed.
state *txThrottlerState

target *querypb.Target
target *querypb.Target
topoServer *topo.Server

// stats
throttlerRunning *stats.Gauge
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
}

// txThrottlerConfig holds the parameters that need to be
// passed when constructing a TxThrottler object.
type txThrottlerConfig struct {
// enabled is true if the transaction throttler is enabled. All methods
// of a disabled transaction throttler do nothing and Throttle() always
// returns false.
enabled bool

throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
healthCheckCells []string

// tabletTypes stores the tablet types for throttling
tabletTypes *topoproto.TabletTypeListFlag
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
config *txThrottlerConfig

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
topologyWatchers []TopologyWatcherInterface
}

// NewTxThrottler tries to construct a TxThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
Expand All @@ -151,7 +183,7 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, &txThrottlerConfig{enabled: false})
txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
Expand All @@ -165,7 +197,7 @@ func (t *TxThrottler) InitDBConfig(target *querypb.Target) {

func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, &txThrottlerConfig{enabled: false})
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
}

var throttlerConfig throttlerdatapb.Configuration
Expand All @@ -178,48 +210,15 @@ func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrott
healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells))
copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells)

return newTxThrottler(env, &txThrottlerConfig{
return newTxThrottler(env, topoServer, &txThrottlerConfig{
enabled: true,
topoServer: topoServer,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: &throttlerConfig,
healthCheckCells: healthCheckCells,
})
}

// txThrottlerConfig holds the parameters that need to be
// passed when constructing a TxThrottler object.
type txThrottlerConfig struct {
// enabled is true if the transaction throttler is enabled. All methods
// of a disabled transaction throttler do nothing and Throttle() always
// returns false.
enabled bool

topoServer *topo.Server
throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
healthCheckCells []string

// tabletTypes stores the tablet types for throttling
tabletTypes *topoproto.TabletTypeListFlag
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
config *txThrottlerConfig

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
topologyWatchers []TopologyWatcherInterface
}

func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, error) {
func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*TxThrottler, error) {
if config.enabled {
// Verify config.
err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify()
Expand All @@ -232,6 +231,7 @@ func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler,
}
return &TxThrottler{
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
Expand All @@ -248,7 +248,7 @@ func (t *TxThrottler) Open() (err error) {
}
log.Info("TxThrottler: opening")
t.throttlerRunning.Set(1)
t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell)
t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target)
return err
}

Expand Down Expand Up @@ -276,6 +276,9 @@ func (t *TxThrottler) Throttle(priority int) (result bool) {
if !t.config.enabled {
return false
}
if t.state == nil {
return false
}

// Throttle according to both what the throttle state says, and the priority. Workloads with lower priority value
// are less likely to be throttled.
Expand All @@ -288,7 +291,7 @@ func (t *TxThrottler) Throttle(priority int) (result bool) {
return result
}

func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) {
func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
Expand All @@ -309,29 +312,29 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string
config: config,
throttler: t,
}
createTxThrottlerHealthCheck(config, result, cell)
createTxThrottlerHealthCheck(topoServer, config, result, target.Cell)

result.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
result.topologyWatchers = append(
result.topologyWatchers,
topologyWatcherFactory(
config.topoServer,
topoServer,
result.healthCheck,
cell,
keyspace,
shard,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
}
return result, nil
}

func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottlerState, cell string) {
func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) {
ctx, cancel := context.WithCancel(context.Background())
result.stopHealthCheck = cancel
result.healthCheck = healthCheckFactory(config.topoServer, cell, config.healthCheckCells)
result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells)
ch := result.healthCheck.Subscribe()
go func(ctx context.Context) {
for {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ func TestNewTxThrottler(t *testing.T) {

{
// disabled config
throttler, err := newTxThrottler(env, &txThrottlerConfig{enabled: false})
throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{enabled: false})
assert.Nil(t, err)
assert.NotNil(t, throttler)
}
{
// enabled with invalid throttler config
throttler, err := newTxThrottler(env, &txThrottlerConfig{
throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{
enabled: true,
throttlerConfig: &throttlerdatapb.Configuration{},
})
Expand All @@ -169,7 +169,7 @@ func TestNewTxThrottler(t *testing.T) {
}
{
// enabled
throttler, err := newTxThrottler(env, &txThrottlerConfig{
throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{
enabled: true,
healthCheckCells: []string{"cell1"},
throttlerConfig: throttler.DefaultMaxReplicationLagModuleConfig().Configuration,
Expand Down

0 comments on commit 8a04679

Please sign in to comment.