diff --git a/go/mysql/sql_error.go b/go/mysql/sql_error.go index 22cd2c2ae9e..347c1abcdad 100644 --- a/go/mysql/sql_error.go +++ b/go/mysql/sql_error.go @@ -21,6 +21,7 @@ import ( "fmt" "regexp" "strconv" + "strings" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" @@ -135,7 +136,11 @@ func mapToSQLErrorFromErrorCode(err error, msg string) *SQLError { ss = SSAccessDeniedError case vtrpcpb.Code_RESOURCE_EXHAUSTED: num = demuxResourceExhaustedErrors(err.Error()) - ss = SSClientError + // 1041 ER_OUT_OF_RESOURCES has SQLSTATE HYOOO as per https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_out_of_resources, + // so don't override it here in that case. + if num != EROutOfResources { + ss = SSClientError + } case vtrpcpb.Code_UNIMPLEMENTED: num = ERNotSupportedYet ss = SSClientError @@ -223,6 +228,8 @@ func demuxResourceExhaustedErrors(msg string) int { switch { case isGRPCOverflowRE.Match([]byte(msg)): return ERNetPacketTooLarge + case strings.Contains(msg, "Transaction throttled"): + return EROutOfResources default: return ERTooManyUserConnections } diff --git a/go/mysql/sql_error_test.go b/go/mysql/sql_error_test.go index c6fe2f65251..e3b6edf47cc 100644 --- a/go/mysql/sql_error_test.go +++ b/go/mysql/sql_error_test.go @@ -25,7 +25,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestDumuxResourceExhaustedErrors(t *testing.T) { +func TestDemuxResourceExhaustedErrors(t *testing.T) { type testCase struct { msg string want int @@ -42,6 +42,7 @@ func TestDumuxResourceExhaustedErrors(t *testing.T) { // This should be explicitly handled by returning ERNetPacketTooLarge from the execturo directly // and therefore shouldn't need to be teased out of another error. {"in-memory row count exceeded allowed limit of 13", ERTooManyUserConnections}, + {"rpc error: code = ResourceExhausted desc = Transaction throttled", EROutOfResources}, } for _, c := range cases { @@ -151,6 +152,11 @@ func TestNewSQLErrorFromError(t *testing.T) { num: ERNoDb, ss: SSNoDB, }, + { + err: vterrors.Errorf(vtrpc.Code_RESOURCE_EXHAUSTED, "vttablet: rpc error: code = ResourceExhausted desc = Transaction throttled"), + num: EROutOfResources, + ss: SSUnknownSQLState, + }, } for _, tc := range tCases { diff --git a/go/vt/throttler/max_replication_lag_module.go b/go/vt/throttler/max_replication_lag_module.go index f8037f7f975..e1a76f89c57 100644 --- a/go/vt/throttler/max_replication_lag_module.go +++ b/go/vt/throttler/max_replication_lag_module.go @@ -301,6 +301,12 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec if lagRecordNow.isZero() { panic("rate recalculation was triggered with a zero replication lag record") } + + // Protect against nil stats + if lagRecordNow.Stats == nil { + return + } + now := lagRecordNow.time lagNow := lagRecordNow.lag() @@ -375,7 +381,6 @@ logResult: r.Reason += clearReason } - log.Infof("%v", r) m.results.add(r) } diff --git a/go/vt/throttler/max_replication_lag_module_test.go b/go/vt/throttler/max_replication_lag_module_test.go index f0324df192c..6379b067412 100644 --- a/go/vt/throttler/max_replication_lag_module_test.go +++ b/go/vt/throttler/max_replication_lag_module_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/discovery" @@ -83,6 +85,12 @@ func (tf *testFixture) process(lagRecord replicationLagRecord) { tf.m.processRecord(lagRecord) } +// recalculateRate does the same thing as MaxReplicationLagModule.recalculateRate() does +// for a new "lagRecord". +func (tf *testFixture) recalculateRate(lagRecord replicationLagRecord) { + tf.m.recalculateRate(lagRecord) +} + func (tf *testFixture) checkState(state state, rate int64, lastRateChange time.Time) error { if got, want := tf.m.currentState, state; got != want { return fmt.Errorf("module in wrong state. got = %v, want = %v", got, want) @@ -96,6 +104,47 @@ func (tf *testFixture) checkState(state state, rate int64, lastRateChange time.T return nil } +func TestNewMaxReplicationLagModule_recalculateRate(t *testing.T) { + testCases := []struct { + name string + lagRecord replicationLagRecord + expectPanic bool + }{ + { + name: "Zero lag", + lagRecord: replicationLagRecord{ + time: time.Time{}, + TabletHealth: discovery.TabletHealth{Stats: nil}, + }, + expectPanic: true, + }, + { + name: "nil lag record stats", + lagRecord: replicationLagRecord{ + time: time.Now(), + TabletHealth: discovery.TabletHealth{Stats: nil}, + }, + expectPanic: false, + }, + } + + for _, aTestCase := range testCases { + theCase := aTestCase + + t.Run(theCase.name, func(t *testing.T) { + t.Parallel() + + fixture, err := newTestFixtureWithMaxReplicationLag(5) + assert.NoError(t, err) + + if theCase.expectPanic { + assert.Panics(t, func() { fixture.recalculateRate(theCase.lagRecord) }) + } + }, + ) + } +} + func TestMaxReplicationLagModule_RateNotZeroWhenDisabled(t *testing.T) { tf, err := newTestFixtureWithMaxReplicationLag(ReplicationLagModuleDisabled) if err != nil {