diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index d7f5dc3dc01..8d2813f9b23 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -71,6 +71,7 @@ const defaultVtGatePlannerVersion = planbuilder.Gen4 // Setup starts Vtgate process with required arguements func (vtgate *VtgateProcess) Setup() (err error) { args := []string{ + "--alsologtostderr", "--topo_implementation", vtgate.CommonArg.TopoImplementation, "--topo_global_server_address", vtgate.CommonArg.TopoGlobalAddress, "--topo_global_root", vtgate.CommonArg.TopoGlobalRoot, diff --git a/go/test/endtoend/transaction/rollback/txn_rollback_shutdown_test.go b/go/test/endtoend/transaction/rollback/txn_rollback_shutdown_test.go index bae9596fd37..5f4e348c943 100644 --- a/go/test/endtoend/transaction/rollback/txn_rollback_shutdown_test.go +++ b/go/test/endtoend/transaction/rollback/txn_rollback_shutdown_test.go @@ -21,8 +21,11 @@ import ( "flag" "fmt" "os" + "sync" "testing" + "time" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/test/endtoend/utils" "github.com/stretchr/testify/require" @@ -120,6 +123,68 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) { } +func TestTransactionRollBackWhenShutDownWithQueryRunning(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + utils.Exec(t, conn, "insert into buffer(id, msg) values(5,'alpha')") + utils.Exec(t, conn, "insert into buffer(id, msg) values(6,'beta')") + + // start an incomplete transaction with a long-running query + utils.Exec(t, conn, "begin") + _, err = conn.ExecuteFetch("select *, sleep(40) from buffer where id = 5 for update", 1, true) + assert.Error(t, err) + assert.ErrorContains(t, err, "EOF") + }() + + // wait for the long-running query to start executing + checkConn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer checkConn.Close() + waitTimeout := time.After(10 * time.Second) + running := false + for running == false { + select { + case <-waitTimeout: + t.Fatalf("Long-running query did not start executing") + case <-time.After(10 * time.Millisecond): + // We should get a lock wait timeout error once the long-running query starts executing + _, err := checkConn.ExecuteFetch("select * from buffer where id = 5 for update nowait", 1, true) + if sqlErr, ok := err.(*sqlerror.SQLError); ok { + if sqlErr.Number() == sqlerror.ERLockNowait { + running = true + continue + } + } + require.NoError(t, err) + } + } + + // Enforce a restart to enforce rollback + if err = clusterInstance.RestartVtgate(); err != nil { + t.Errorf("Fail to re-start vtgate: %v", err) + } + + // Make a new mysql connection to vtGate + vtParams = clusterInstance.GetVTParams(keyspaceName) + conn2, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn2.Close() + + // Verify previous transaction was rolled back. Row lock should be available, otherwise we'll get an error. + qr := utils.Exec(t, conn2, "select * from buffer where id = 5 for update nowait") + assert.Equal(t, 1, len(qr.Rows)) +} + func TestErrorInAutocommitSession(t *testing.T) { defer cluster.PanicHandler(t) ctx := context.Background() @@ -136,9 +201,9 @@ func TestErrorInAutocommitSession(t *testing.T) { conn2, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) defer conn2.Close() - result := utils.Exec(t, conn2, "select * from buffer order by id") + result := utils.Exec(t, conn2, "select * from buffer where id in (1,2) order by id") // if we have properly working autocommit code, both the successful inserts should be visible to a second // connection, even if we have not done an explicit commit - assert.Equal(t, `[[INT64(1) VARCHAR("foo")] [INT64(2) VARCHAR("baz")] [INT64(3) VARCHAR("mark")] [INT64(4) VARCHAR("doug")]]`, fmt.Sprintf("%v", result.Rows)) + assert.Equal(t, `[[INT64(1) VARCHAR("foo")] [INT64(2) VARCHAR("baz")]]`, fmt.Sprintf("%v", result.Rows)) } diff --git a/go/test/endtoend/vtgate/connectiondrain/main_test.go b/go/test/endtoend/vtgate/connectiondrain/main_test.go index 6dae9b72be9..a663439d5a2 100644 --- a/go/test/endtoend/vtgate/connectiondrain/main_test.go +++ b/go/test/endtoend/vtgate/connectiondrain/main_test.go @@ -20,7 +20,9 @@ import ( "context" _ "embed" "flag" + "fmt" "os" + "path" "testing" "time" @@ -177,10 +179,22 @@ func TestConnectionDrainOnTermTimeout(t *testing.T) { // Run a busy query that returns only after the onterm_timeout is reached, this should fail when we reach the timeout _, err = vtConn.ExecuteFetch("select sleep(40)", 1, false) require.Error(t, err) + fmt.Printf("error from sleep query: %v\n", err) // Running a query after we have reached the onterm_timeout should fail _, err = vtConn2.ExecuteFetch("select id from t1", 1, false) require.Error(t, err) + fmt.Printf("error after onterm_timeout: %v\n", err) + + time.Sleep(5 * time.Second) + + if !clusterInstance.VtgateProcess.IsShutdown() { + fmt.Printf("vtgate is still running\n") + } + + logDir := clusterInstance.VtgateProcess.LogDir + all, _ := os.ReadFile(path.Join(logDir, "vtgate-stderr.txt")) + fmt.Printf("stderr:\n%s\n", all) // By now vtgate will be shutdown becaused it reached its onterm_timeout, despite idle connections still being opened require.True(t, clusterInstance.VtgateProcess.IsShutdown()) diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index 33935bca461..7a6f6d30126 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -150,8 +150,10 @@ func (vh *vtgateHandler) ComResetConnection(c *mysql.Conn) { } func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) { + log.Infof("Connection closed: %d", c.ConnectionID) // Rollback if there is an ongoing transaction. Ignore error. defer func() { + log.Infof("Deleting closed connection: %d", c.ConnectionID) vh.mu.Lock() delete(vh.connections, c.ConnectionID) vh.mu.Unlock() @@ -169,7 +171,10 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) { if session.InTransaction { defer vh.busyConnections.Add(-1) } - _ = vh.vtg.CloseSession(ctx, session) + err := vh.vtg.CloseSession(ctx, session) + if err != nil { + log.Errorf("Ignoring error on CloseSession: %s", err) + } } // Regexp to extract parent span id over the sql query @@ -627,7 +632,7 @@ func (srv *mysqlServer) shutdown(timeout time.Duration) { defer cancel() // Reserve some time for rollbacks to happen after draining - drainCtx, cancel := context.WithTimeout(ctx, timeout-time.Second) + drainCtx, cancel := context.WithTimeout(ctx, timeout-(2*time.Second)) defer cancel() srv.shutdownMysqlProtocolAndDrain(drainCtx) @@ -712,27 +717,37 @@ func (srv *mysqlServer) rollbackAtShutdown(ctx context.Context) { return } - // Close all open connections. If they're waiting for reads, this will cause - // them to error out, which will automatically rollback open transactions. + // Collect all open connections + connections := make(map[uint32]*mysql.Conn) func() { if srv.vtgateHandle != nil { srv.vtgateHandle.mu.Lock() defer srv.vtgateHandle.mu.Unlock() for id, c := range srv.vtgateHandle.connections { if c != nil { - log.Infof("Closing connection and rolling back any open transactions. Connection ID: %v", id) - c.Close() + connections[id] = c } } } }() + // Close all open connections and cancel any inflight queries. This will cause them to error out and automatically + // rollback open transactions. + // + // Doing this without holding a lock on `srv.vtgateHandle` to avoid any potential deadlocks with the + // per-connection locks acquired in `c.CancelCtx()` + for id, c := range connections { + log.Infof("Closing connection and rolling back any open transactions. Connection ID: %v", id) + c.Close() + c.CancelCtx() + } + // If vtgate is instead busy executing a query or in a transaction, the number of open conns // will be non-zero. Give some time for those queries to finish. for srv.vtgateHandle.numConnections() > 0 { select { case <-ctx.Done(): - log.Errorf("All connections did not go idle. Shutting down anyway.") + log.Errorf("Connections are still open. Shutting down anyway.") return case <-time.After(10 * time.Millisecond): }