From 1bd34db54d9e8af3c8ed5667236bc1da7d0bd785 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 8 Nov 2023 14:52:08 +0000 Subject: [PATCH 1/7] Add new `KillWithContext` function. Signed-off-by: Arthur Schreiber --- .../vttablet/tabletserver/connpool/dbconn.go | 42 +++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index 63f4c73520e..3d3f866b180 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -362,10 +362,19 @@ func (dbc *Conn) IsClosed() bool { return dbc.conn.IsClosed() } +// Kill wraps KillWithContext using context.Background. +func (dbc *Conn) Kill(reason string, elapsed time.Duration) error { + return dbc.KillWithContext(context.Background(), reason, elapsed) +} + // Kill kills the currently executing query both on MySQL side // and on the connection side. If no query is executing, it's a no-op. // Kill will also not kill a query more than once. -func (dbc *Conn) Kill(reason string, elapsed time.Duration) error { +func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error { + if cause := context.Cause(ctx); cause != nil { + return cause + } + dbc.stats.KillCounters.Add("Queries", 1) log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging()) @@ -376,20 +385,37 @@ func (dbc *Conn) Kill(reason string, elapsed time.Duration) error { dbc.conn.Close() // Server side action. Kill the session. - killConn, err := dbc.dbaPool.Get(context.TODO()) + killConn, err := dbc.dbaPool.Get(ctx) if err != nil { log.Warningf("Failed to get conn from dba pool: %v", err) return err } defer killConn.Recycle() - sql := fmt.Sprintf("kill %d", dbc.conn.ID()) - _, err = killConn.Conn.ExecuteFetch(sql, 10000, false) - if err != nil { - log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), - dbc.CurrentForLogging(), err) + + errChan := make(chan error, 1) + resultChan := make(chan *sqltypes.Result, 1) + + go func() { + sql := fmt.Sprintf("kill %d", dbc.conn.ID()) + // TODO: Allow pushing ctx down to ExecuteFetch. + result, err := killConn.Conn.ExecuteFetch(sql, 10000, false) + if err != nil { + errChan <- err + } else { + resultChan <- result + } + }() + + select { + case <-ctx.Done(): + killConn.Close() + return context.Cause(ctx) + case err := <-errChan: + log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err) return err + case <-resultChan: + return nil } - return nil } // Current returns the currently executing query. From a0f8d667ed57f83f900beb38ee39b95fc00b1712 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 9 Nov 2023 09:59:51 +0000 Subject: [PATCH 2/7] Remove `setDeadline`. Signed-off-by: Arthur Schreiber --- .../vttablet/tabletserver/connpool/dbconn.go | 107 +++++++++--------- 1 file changed, 53 insertions(+), 54 deletions(-) diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index 3d3f866b180..ab5b69535d4 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -163,17 +163,38 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi defer dbc.stats.MySQLTimings.Record("Exec", time.Now()) - done, wg := dbc.setDeadline(ctx) - qr, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields) + resultChan := make(chan *sqltypes.Result, 1) + errChan := make(chan error, 1) + + startTime := time.Now() + go func() { + result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields) + if err != nil { + errChan <- err + } else { + resultChan <- result + } + }() - if done != nil { - close(done) - wg.Wait() + var err error + var result *sqltypes.Result + + select { + case <-ctx.Done(): + killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(startTime)) + return nil, dbc.Err() + case err = <-errChan: + case result = <-resultChan: } - if dbcerr := dbc.Err(); dbcerr != nil { - return nil, dbcerr + + if dbcErr := dbc.Err(); dbcErr != nil { + return nil, dbcErr } - return qr, err + + return result, err } // ExecOnce executes the specified query, but does not retry on connection errors. @@ -254,16 +275,29 @@ func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sq dbc.current.Store(query) defer dbc.current.Store("") - done, wg := dbc.setDeadline(ctx) - err := dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize) + errChan := make(chan error, 1) + startTime := time.Now() + + go func() { + errChan <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize) + }() - if done != nil { - close(done) - wg.Wait() + var err error + + select { + case <-ctx.Done(): + killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(startTime)) + return dbc.Err() + case err = <-errChan: } - if dbcerr := dbc.Err(); dbcerr != nil { - return dbcerr + + if dbcErr := dbc.Err(); dbcErr != nil { + return dbcErr } + return err } @@ -409,6 +443,10 @@ func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed tim select { case <-ctx.Done(): killConn.Close() + + dbc.stats.InternalErrors.Add("HungQuery", 1) + log.Warningf("Query may be hung: %s", dbc.CurrentForLogging()) + return context.Cause(ctx) case err := <-errChan: log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err) @@ -462,45 +500,6 @@ func (dbc *Conn) Reconnect(ctx context.Context) error { return nil } -// setDeadline starts a goroutine that will kill the currently executing query -// if the deadline is exceeded. It returns a channel and a waitgroup. After the -// query is done executing, the caller is required to close the done channel -// and wait for the waitgroup to make sure that the necessary cleanup is done. -func (dbc *Conn) setDeadline(ctx context.Context) (chan bool, *sync.WaitGroup) { - if ctx.Done() == nil { - return nil, nil - } - done := make(chan bool) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - startTime := time.Now() - select { - case <-ctx.Done(): - dbc.Kill(ctx.Err().Error(), time.Since(startTime)) - case <-done: - return - } - elapsed := time.Since(startTime) - - // Give 2x the elapsed time and some buffer as grace period - // for the query to get killed. - tmr2 := time.NewTimer(2*elapsed + 5*time.Second) - defer tmr2.Stop() - select { - case <-tmr2.C: - dbc.stats.InternalErrors.Add("HungQuery", 1) - log.Warningf("Query may be hung: %s", dbc.CurrentForLogging()) - case <-done: - return - } - <-done - log.Warningf("Hung query returned") - }() - return done, &wg -} - // CurrentForLogging applies transformations to the query making it suitable to log. // It applies sanitization rules based on tablet settings and limits the max length of // queries. From c4af4264791caf084684280afce009d9f60415f8 Mon Sep 17 00:00:00 2001 From: Vicent Marti Date: Mon, 20 Nov 2023 10:20:59 +0100 Subject: [PATCH 3/7] dbconn: reduce the number of channels Signed-off-by: Vicent Marti --- .../vttablet/tabletserver/connpool/dbconn.go | 103 ++++++++---------- 1 file changed, 43 insertions(+), 60 deletions(-) diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index ab5b69535d4..5632c651e84 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -52,7 +52,7 @@ type Conn struct { env tabletenv.Env dbaPool *dbconnpool.ConnectionPool stats *tabletenv.Stats - current atomic.Value + current atomic.Pointer[string] // err will be set if a query is killed through a Kill. errmu sync.Mutex @@ -76,7 +76,6 @@ func newPooledConn(ctx context.Context, pool *Pool, appParams dbconfigs.Connecto stats: pool.env.Stats(), dbaPool: pool.dbaPool, } - db.current.Store("") return db, nil } @@ -91,7 +90,6 @@ func NewConn(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpoo dbaPool: dbaPool, stats: tabletenv.NewStats(servenv.NewExporter("Temp", "Tablet")), } - dbconn.current.Store("") if setting == nil { return dbconn, nil } @@ -152,8 +150,8 @@ func (dbc *Conn) Exec(ctx context.Context, query string, maxrows int, wantfields } func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { - dbc.current.Store(query) - defer dbc.current.Store("") + dbc.current.Store(&query) + defer dbc.current.Store(nil) // Check if the context is already past its deadline before // trying to execute the query. @@ -161,40 +159,33 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi return nil, fmt.Errorf("%v before execution started", err) } - defer dbc.stats.MySQLTimings.Record("Exec", time.Now()) + now := time.Now() + defer dbc.stats.MySQLTimings.Record("Exec", now) - resultChan := make(chan *sqltypes.Result, 1) - errChan := make(chan error, 1) + type execResult struct { + result *sqltypes.Result + err error + } - startTime := time.Now() + ch := make(chan execResult) go func() { result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields) - if err != nil { - errChan <- err - } else { - resultChan <- result - } + ch <- execResult{result, err} }() - var err error - var result *sqltypes.Result - select { case <-ctx.Done(): killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(startTime)) + _ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now)) return nil, dbc.Err() - case err = <-errChan: - case result = <-resultChan: - } - - if dbcErr := dbc.Err(); dbcErr != nil { - return nil, dbcErr + case r := <-ch: + if dbcErr := dbc.Err(); dbcErr != nil { + return nil, dbcErr + } + return r.result, r.err } - - return result, err } // ExecOnce executes the specified query, but does not retry on connection errors. @@ -270,35 +261,30 @@ func (dbc *Conn) Stream(ctx context.Context, query string, callback func(*sqltyp } func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int) error { - defer dbc.stats.MySQLTimings.Record("ExecStream", time.Now()) - - dbc.current.Store(query) - defer dbc.current.Store("") + dbc.current.Store(&query) + defer dbc.current.Store(nil) - errChan := make(chan error, 1) - startTime := time.Now() + now := time.Now() + defer dbc.stats.MySQLTimings.Record("ExecStream", now) + ch := make(chan error) go func() { - errChan <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize) + ch <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize) }() - var err error - select { case <-ctx.Done(): killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(startTime)) + _ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now)) return dbc.Err() - case err = <-errChan: - } - - if dbcErr := dbc.Err(); dbcErr != nil { - return dbcErr + case err := <-ch: + if dbcErr := dbc.Err(); dbcErr != nil { + return dbcErr + } + return err } - - return err } // StreamOnce executes the query and streams the results. But, does not retry on connection errors. @@ -401,7 +387,7 @@ func (dbc *Conn) Kill(reason string, elapsed time.Duration) error { return dbc.KillWithContext(context.Background(), reason, elapsed) } -// Kill kills the currently executing query both on MySQL side +// KillWithContext kills the currently executing query both on MySQL side // and on the connection side. If no query is executing, it's a no-op. // Kill will also not kill a query more than once. func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error { @@ -426,18 +412,11 @@ func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed tim } defer killConn.Recycle() - errChan := make(chan error, 1) - resultChan := make(chan *sqltypes.Result, 1) - + ch := make(chan error) + sql := fmt.Sprintf("kill %d", dbc.conn.ID()) go func() { - sql := fmt.Sprintf("kill %d", dbc.conn.ID()) - // TODO: Allow pushing ctx down to ExecuteFetch. - result, err := killConn.Conn.ExecuteFetch(sql, 10000, false) - if err != nil { - errChan <- err - } else { - resultChan <- result - } + _, err := killConn.Conn.ExecuteFetch(sql, -1, false) + ch <- err }() select { @@ -448,17 +427,21 @@ func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed tim log.Warningf("Query may be hung: %s", dbc.CurrentForLogging()) return context.Cause(ctx) - case err := <-errChan: - log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err) - return err - case <-resultChan: + case err := <-ch: + if err != nil { + log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err) + return err + } return nil } } // Current returns the currently executing query. func (dbc *Conn) Current() string { - return dbc.current.Load().(string) + if q := dbc.current.Load(); q != nil { + return *q + } + return "" } // ID returns the connection id. From b7e6d3d6131c3a9d01456ed83358293a0cbddf2b Mon Sep 17 00:00:00 2001 From: Daniel Joos Date: Wed, 21 Feb 2024 11:55:15 +0100 Subject: [PATCH 4/7] Add tests for `KillWithContext` logic Signed-off-by: Daniel Joos --- go/mysql/fakesqldb/server.go | 9 +- .../vttablet/tabletserver/connpool/dbconn.go | 24 +++-- .../tabletserver/connpool/dbconn_test.go | 98 +++++++++++++++++++ 3 files changed, 121 insertions(+), 10 deletions(-) diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index cb3d20ae04b..4dc71dd3662 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -376,11 +376,11 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R } key := strings.ToLower(query) db.mu.Lock() - defer db.mu.Unlock() db.queryCalled[key]++ db.querylog = append(db.querylog, key) // Check if we should close the connection and provoke errno 2013. if db.shouldClose.Load() { + defer db.mu.Unlock() c.Close() // log error @@ -394,6 +394,8 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R // The driver may send this at connection time, and we don't want it to // interfere. if key == "set names utf8" || strings.HasPrefix(key, "set collation_connection = ") { + defer db.mu.Unlock() + // log error if err := callback(&sqltypes.Result{}); err != nil { log.Errorf("callback failed : %v", err) @@ -403,12 +405,14 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R // check if we should reject it. if err, ok := db.rejectedData[key]; ok { + db.mu.Unlock() return err } // Check explicit queries from AddQuery(). result, ok := db.data[key] if ok { + db.mu.Unlock() if f := result.BeforeFunc; f != nil { f() } @@ -419,6 +423,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R for _, pat := range db.patternData { if pat.expr.MatchString(query) { userCallback, ok := db.queryPatternUserCallback[pat.expr] + db.mu.Unlock() if ok { userCallback(query) } @@ -429,6 +434,8 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R } } + defer db.mu.Unlock() + if db.neverFail.Load() { return callback(&sqltypes.Result{}) } diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index 5632c651e84..b39299e49e6 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -40,6 +40,8 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) +const defaultKillTimeout = 5 * time.Second + // Conn is a db connection for tabletserver. // It performs automatic reconnects as needed. // Its Execute function has a timeout that can kill @@ -57,6 +59,8 @@ type Conn struct { // err will be set if a query is killed through a Kill. errmu sync.Mutex err error + + killTimeout time.Duration } // NewConnection creates a new DBConn. It triggers a CheckMySQL if creation fails. @@ -71,10 +75,11 @@ func newPooledConn(ctx context.Context, pool *Pool, appParams dbconfigs.Connecto return nil, err } db := &Conn{ - conn: c, - env: pool.env, - stats: pool.env.Stats(), - dbaPool: pool.dbaPool, + conn: c, + env: pool.env, + stats: pool.env.Stats(), + dbaPool: pool.dbaPool, + killTimeout: defaultKillTimeout, } return db, nil } @@ -86,9 +91,10 @@ func NewConn(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpoo return nil, err } dbconn := &Conn{ - conn: c, - dbaPool: dbaPool, - stats: tabletenv.NewStats(servenv.NewExporter("Temp", "Tablet")), + conn: c, + dbaPool: dbaPool, + stats: tabletenv.NewStats(servenv.NewExporter("Temp", "Tablet")), + killTimeout: defaultKillTimeout, } if setting == nil { return dbconn, nil @@ -175,7 +181,7 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi select { case <-ctx.Done(): - killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout) defer cancel() _ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now)) @@ -274,7 +280,7 @@ func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sq select { case <-ctx.Done(): - killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout) defer cancel() _ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now)) diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go index 9717c95d9f7..b0e10c4c0d8 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go @@ -33,6 +33,8 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) func compareTimingCounts(t *testing.T, op string, delta int64, before, after map[string]int64) { @@ -291,6 +293,57 @@ func TestDBConnKill(t *testing.T) { } } +func TestDBKillWithContext(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + connPool := newPool() + connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + defer connPool.Close() + dbConn, err := newPooledConn(context.Background(), connPool, db.ConnParams()) + if dbConn != nil { + defer dbConn.Close() + } + require.NoError(t, err) + + query := fmt.Sprintf("kill %d", dbConn.ID()) + db.AddQuery(query, &sqltypes.Result{}) + db.SetBeforeFunc(query, func() { + // should take longer than our context deadline below. + time.Sleep(200 * time.Millisecond) + }) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + // KillWithContext should return context.DeadlineExceeded + err = dbConn.KillWithContext(ctx, "test kill", 0) + require.ErrorIs(t, err, context.DeadlineExceeded) +} + +func TestDBKillWithContextDoneContext(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + connPool := newPool() + connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + defer connPool.Close() + dbConn, err := newPooledConn(context.Background(), connPool, db.ConnParams()) + if dbConn != nil { + defer dbConn.Close() + } + require.NoError(t, err) + + query := fmt.Sprintf("kill %d", dbConn.ID()) + db.AddRejectedQuery(query, errors.New("rejected")) + + contextErr := errors.New("context error") + ctx, cancel := context.WithCancelCause(context.Background()) + cancel(contextErr) // cancel the context immediately + + // KillWithContext should return the cancellation cause + err = dbConn.KillWithContext(ctx, "test kill", 0) + require.ErrorIs(t, err, contextErr) +} + // TestDBConnClose tests that an Exec returns immediately if a connection // is asynchronously killed (and closed) in the middle of an execution. func TestDBConnClose(t *testing.T) { @@ -519,3 +572,48 @@ func TestDBConnReApplySetting(t *testing.T) { db.VerifyAllExecutedOrFail() } + +func TestDBExecOnceKillTimeout(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + connPool := newPool() + connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + defer connPool.Close() + dbConn, err := newPooledConn(context.Background(), connPool, db.ConnParams()) + if dbConn != nil { + defer dbConn.Close() + } + require.NoError(t, err) + + // A very long running query that will be killed. + expectedQuery := "select 1" + var timestampQuery time.Time + db.AddQuery(expectedQuery, &sqltypes.Result{}) + db.SetBeforeFunc(expectedQuery, func() { + timestampQuery = time.Now() + // should take longer than our context deadline below. + time.Sleep(1000 * time.Millisecond) + }) + + // We expect a kill-query to be fired, too. + // It should also run into a timeout. + var timestampKill time.Time + dbConn.killTimeout = 100 * time.Millisecond + db.AddQueryPatternWithCallback(`kill \d+`, &sqltypes.Result{}, func(string) { + timestampKill = time.Now() + // should take longer than the configured kill timeout above. + time.Sleep(200 * time.Millisecond) + }) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + result, err := dbConn.ExecOnce(ctx, "select 1", 1, false) + timestampDone := time.Now() + + require.Error(t, err) + require.Equal(t, vtrpcpb.Code_CANCELED, vterrors.Code(err)) + require.Nil(t, result) + require.WithinDuration(t, timestampQuery, timestampKill, 150*time.Millisecond) + require.WithinDuration(t, timestampKill, timestampDone, 150*time.Millisecond) +} From db9c819929004bbc594d2d7044242eb4804021f9 Mon Sep 17 00:00:00 2001 From: Daniel Joos Date: Wed, 21 Feb 2024 12:20:29 +0100 Subject: [PATCH 5/7] Adapt new tests to upstream changes Signed-off-by: Daniel Joos --- go/mysql/fakesqldb/server.go | 4 ---- .../vttablet/tabletserver/connpool/dbconn_test.go | 15 +++++++++------ 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index d42f21b7007..d4ad5ad9dac 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -425,11 +425,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R userCallback, ok := db.queryPatternUserCallback[pat.expr] db.mu.Unlock() if ok { - // Since the user call back can be indefinitely stuck, we shouldn't hold the lock indefinitely. - // This is only test code, so no actual cause for concern. - db.mu.Unlock() userCallback(query) - db.mu.Lock() } if pat.err != "" { return fmt.Errorf(pat.err) diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go index 09264c51203..8d0d0fa7935 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go @@ -303,9 +303,10 @@ func TestDBKillWithContext(t *testing.T) { db := fakesqldb.New(t) defer db.Close() connPool := newPool() - connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + params := dbconfigs.New(db.ConnParams()) + connPool.Open(params, params, params) defer connPool.Close() - dbConn, err := newPooledConn(context.Background(), connPool, db.ConnParams()) + dbConn, err := newPooledConn(context.Background(), connPool, params) if dbConn != nil { defer dbConn.Close() } @@ -330,9 +331,10 @@ func TestDBKillWithContextDoneContext(t *testing.T) { db := fakesqldb.New(t) defer db.Close() connPool := newPool() - connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + params := dbconfigs.New(db.ConnParams()) + connPool.Open(params, params, params) defer connPool.Close() - dbConn, err := newPooledConn(context.Background(), connPool, db.ConnParams()) + dbConn, err := newPooledConn(context.Background(), connPool, params) if dbConn != nil { defer dbConn.Close() } @@ -589,9 +591,10 @@ func TestDBExecOnceKillTimeout(t *testing.T) { db := fakesqldb.New(t) defer db.Close() connPool := newPool() - connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + params := dbconfigs.New(db.ConnParams()) + connPool.Open(params, params, params) defer connPool.Close() - dbConn, err := newPooledConn(context.Background(), connPool, db.ConnParams()) + dbConn, err := newPooledConn(context.Background(), connPool, params) if dbConn != nil { defer dbConn.Close() } From ee52f635bdabbf9dc3cefaa62d1893052e3bfcf5 Mon Sep 17 00:00:00 2001 From: Daniel Joos Date: Wed, 28 Feb 2024 17:07:11 +0000 Subject: [PATCH 6/7] Fix detected race in dbconn_test.go Signed-off-by: Daniel Joos --- .../tabletserver/connpool/dbconn_test.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go index 8d0d0fa7935..09a85a3e11a 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "strings" + "sync/atomic" "testing" "time" @@ -602,20 +603,20 @@ func TestDBExecOnceKillTimeout(t *testing.T) { // A very long running query that will be killed. expectedQuery := "select 1" - var timestampQuery time.Time + var timestampQuery atomic.Int64 db.AddQuery(expectedQuery, &sqltypes.Result{}) db.SetBeforeFunc(expectedQuery, func() { - timestampQuery = time.Now() + timestampQuery.Store(time.Now().UnixMicro()) // should take longer than our context deadline below. time.Sleep(1000 * time.Millisecond) }) // We expect a kill-query to be fired, too. // It should also run into a timeout. - var timestampKill time.Time + var timestampKill atomic.Int64 dbConn.killTimeout = 100 * time.Millisecond db.AddQueryPatternWithCallback(`kill \d+`, &sqltypes.Result{}, func(string) { - timestampKill = time.Now() + timestampKill.Store(time.Now().UnixMicro()) // should take longer than the configured kill timeout above. time.Sleep(200 * time.Millisecond) }) @@ -624,11 +625,13 @@ func TestDBExecOnceKillTimeout(t *testing.T) { defer cancel() result, err := dbConn.ExecOnce(ctx, "select 1", 1, false) - timestampDone := time.Now() + timeDone := time.Now() require.Error(t, err) require.Equal(t, vtrpcpb.Code_CANCELED, vterrors.Code(err)) require.Nil(t, result) - require.WithinDuration(t, timestampQuery, timestampKill, 150*time.Millisecond) - require.WithinDuration(t, timestampKill, timestampDone, 150*time.Millisecond) + timeQuery := time.UnixMicro(timestampQuery.Load()) + timeKill := time.UnixMicro(timestampKill.Load()) + require.WithinDuration(t, timeQuery, timeKill, 150*time.Millisecond) + require.WithinDuration(t, timeKill, timeDone, 150*time.Millisecond) } From c1ca7b621cfff6eff12ba24ee80cd6b173218f69 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Mon, 4 Mar 2024 11:01:44 +0000 Subject: [PATCH 7/7] Fix test case to properly wait for all queries to be pending. Signed-off-by: Arthur Schreiber --- go/vt/vttablet/tabletserver/tabletserver_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index df68c8b0a83..0374fb416a6 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1122,6 +1122,20 @@ func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) { db.SetBeforeFunc("update test_table set name_string = 'tx1' where pk = 1 and `name` = 1 limit 10001", func() { close(tx1Started) + + // Wait for other queries to be pending. + <-allQueriesPending + }) + + db.SetBeforeFunc("update test_table set name_string = 'tx2' where pk = 1 and `name` = 1 limit 10001", + func() { + // Wait for other queries to be pending. + <-allQueriesPending + }) + + db.SetBeforeFunc("update test_table set name_string = 'tx3' where pk = 1 and `name` = 1 limit 10001", + func() { + // Wait for other queries to be pending. <-allQueriesPending }) @@ -1190,6 +1204,8 @@ func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) { // to allow more than connection attempt at a time. err := waitForTxSerializationPendingQueries(tsv, "test_table where pk = 1 and `name` = 1", 3) require.NoError(t, err) + + // Signal that all queries are pending now. close(allQueriesPending) wg.Wait()