Skip to content

Commit

Permalink
WIP - cancel inflight queries
Browse files Browse the repository at this point in the history
Signed-off-by: Brendan Dougherty <[email protected]>
  • Loading branch information
brendar committed Oct 3, 2024
1 parent 6d88741 commit 09ec6f5
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 9 deletions.
1 change: 1 addition & 0 deletions go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const defaultVtGatePlannerVersion = planbuilder.Gen4
// Setup starts the Vtgate process with the required arguments
func (vtgate *VtgateProcess) Setup() (err error) {
args := []string{
"--alsologtostderr",
"--topo_implementation", vtgate.CommonArg.TopoImplementation,
"--topo_global_server_address", vtgate.CommonArg.TopoGlobalAddress,
"--topo_global_root", vtgate.CommonArg.TopoGlobalRoot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"flag"
"fmt"
"os"
"sync"
"testing"
"time"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/test/endtoend/utils"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -120,6 +123,68 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) {

}

// TestTransactionRollBackWhenShutDownWithQueryRunning checks that restarting
// vtgate while a connection is blocked in a long-running query inside an open
// transaction makes that query fail and leaves the row lock available again
// (i.e. the transaction did not survive the restart).
func TestTransactionRollBackWhenShutDownWithQueryRunning(t *testing.T) {
	defer cluster.PanicHandler(t)
	ctx := context.Background()

	// Connect, seed the rows and open the transaction on the test goroutine:
	// require.* ends the test via t.FailNow, which is only valid on the
	// goroutine running the test function.
	conn, err := mysql.Connect(ctx, &vtParams)
	require.NoError(t, err)
	defer conn.Close()

	utils.Exec(t, conn, "insert into buffer(id, msg) values(5,'alpha')")
	utils.Exec(t, conn, "insert into buffer(id, msg) values(6,'beta')")

	// start an incomplete transaction
	utils.Exec(t, conn, "begin")

	// Run the long-running, row-locking query in the background. Only assert.*
	// is used here because it reports failures without calling t.FailNow.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()

		_, err := conn.ExecuteFetch("select *, sleep(40) from buffer where id = 5 for update", 1, true)
		assert.Error(t, err)
		assert.ErrorContains(t, err, "EOF")
	}()
	// Never return while the goroutine may still touch t or conn. Deferred
	// calls run in LIFO order, so this wait completes before conn.Close()
	// above. On a failure path this can block until the sleep query returns.
	defer wg.Wait()

	// wait for the long-running query to start executing
	checkConn, err := mysql.Connect(ctx, &vtParams)
	require.NoError(t, err)
	defer checkConn.Close()
	waitTimeout := time.After(10 * time.Second)
	for running := false; !running; {
		select {
		case <-waitTimeout:
			t.Fatal("Long-running query did not start executing")
		case <-time.After(10 * time.Millisecond):
			// Once the long-running query holds the row lock, a NOWAIT lock
			// request on the same row fails with ERLockNowait.
			_, err := checkConn.ExecuteFetch("select * from buffer where id = 5 for update nowait", 1, true)
			if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERLockNowait {
				running = true
				continue
			}
			// Any other error is unexpected; no error means the lock is not held yet.
			require.NoError(t, err)
		}
	}

	// Force a restart to trigger the rollback
	if err = clusterInstance.RestartVtgate(); err != nil {
		t.Errorf("Fail to re-start vtgate: %v", err)
	}

	// Make a new mysql connection to vtGate
	vtParams = clusterInstance.GetVTParams(keyspaceName)
	conn2, err := mysql.Connect(ctx, &vtParams)
	require.NoError(t, err)
	defer conn2.Close()

	// Verify previous transaction was rolled back. Row lock should be available, otherwise we'll get an error.
	qr := utils.Exec(t, conn2, "select * from buffer where id = 5 for update nowait")
	assert.Equal(t, 1, len(qr.Rows))
}

func TestErrorInAutocommitSession(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
Expand All @@ -136,9 +201,9 @@ func TestErrorInAutocommitSession(t *testing.T) {
conn2, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn2.Close()
result := utils.Exec(t, conn2, "select * from buffer order by id")
result := utils.Exec(t, conn2, "select * from buffer where id in (1,2) order by id")

// if we have properly working autocommit code, both the successful inserts should be visible to a second
// connection, even if we have not done an explicit commit
assert.Equal(t, `[[INT64(1) VARCHAR("foo")] [INT64(2) VARCHAR("baz")] [INT64(3) VARCHAR("mark")] [INT64(4) VARCHAR("doug")]]`, fmt.Sprintf("%v", result.Rows))
assert.Equal(t, `[[INT64(1) VARCHAR("foo")] [INT64(2) VARCHAR("baz")]]`, fmt.Sprintf("%v", result.Rows))
}
14 changes: 14 additions & 0 deletions go/test/endtoend/vtgate/connectiondrain/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"context"
_ "embed"
"flag"
"fmt"
"os"
"path"
"testing"
"time"

Expand Down Expand Up @@ -177,10 +179,22 @@ func TestConnectionDrainOnTermTimeout(t *testing.T) {
// Run a busy query that returns only after the onterm_timeout is reached, this should fail when we reach the timeout
_, err = vtConn.ExecuteFetch("select sleep(40)", 1, false)
require.Error(t, err)
fmt.Printf("error from sleep query: %v\n", err)

// Running a query after we have reached the onterm_timeout should fail
_, err = vtConn2.ExecuteFetch("select id from t1", 1, false)
require.Error(t, err)
fmt.Printf("error after onterm_timeout: %v\n", err)

time.Sleep(5 * time.Second)

if !clusterInstance.VtgateProcess.IsShutdown() {
fmt.Printf("vtgate is still running\n")
}

logDir := clusterInstance.VtgateProcess.LogDir
all, _ := os.ReadFile(path.Join(logDir, "vtgate-stderr.txt"))
fmt.Printf("stderr:\n%s\n", all)

// By now vtgate will be shut down because it reached its onterm_timeout, despite idle connections still being open
require.True(t, clusterInstance.VtgateProcess.IsShutdown())
Expand Down
29 changes: 22 additions & 7 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ func (vh *vtgateHandler) ComResetConnection(c *mysql.Conn) {
}

func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) {
log.Infof("Connection closed: %d", c.ConnectionID)
// Rollback if there is an ongoing transaction. Ignore error.
defer func() {
log.Infof("Deleting closed connection: %d", c.ConnectionID)
vh.mu.Lock()
delete(vh.connections, c.ConnectionID)
vh.mu.Unlock()
Expand All @@ -169,7 +171,10 @@ func (vh *vtgateHandler) ConnectionClosed(c *mysql.Conn) {
if session.InTransaction {
defer vh.busyConnections.Add(-1)
}
_ = vh.vtg.CloseSession(ctx, session)
err := vh.vtg.CloseSession(ctx, session)
if err != nil {
log.Errorf("Ignoring error on CloseSession: %s", err)
}
}

// Regexp to extract parent span id over the sql query
Expand Down Expand Up @@ -627,7 +632,7 @@ func (srv *mysqlServer) shutdown(timeout time.Duration) {
defer cancel()

// Reserve some time for rollbacks to happen after draining
drainCtx, cancel := context.WithTimeout(ctx, timeout-time.Second)
drainCtx, cancel := context.WithTimeout(ctx, timeout-(2*time.Second))
defer cancel()

srv.shutdownMysqlProtocolAndDrain(drainCtx)
Expand Down Expand Up @@ -712,27 +717,37 @@ func (srv *mysqlServer) rollbackAtShutdown(ctx context.Context) {
return
}

// Close all open connections. If they're waiting for reads, this will cause
// them to error out, which will automatically rollback open transactions.
// Collect all open connections
connections := make(map[uint32]*mysql.Conn)
func() {
if srv.vtgateHandle != nil {
srv.vtgateHandle.mu.Lock()
defer srv.vtgateHandle.mu.Unlock()
for id, c := range srv.vtgateHandle.connections {
if c != nil {
log.Infof("Closing connection and rolling back any open transactions. Connection ID: %v", id)
c.Close()
connections[id] = c
}
}
}
}()

// Close all open connections and cancel any inflight queries. This will cause them to error out and automatically
// rollback open transactions.
//
// Doing this without holding a lock on `srv.vtgateHandle` to avoid any potential deadlocks with the
// per-connection locks acquired in `c.CancelCtx()`
for id, c := range connections {
log.Infof("Closing connection and rolling back any open transactions. Connection ID: %v", id)
c.Close()
c.CancelCtx()
}

// If vtgate is instead busy executing a query or in a transaction, the number of open conns
// will be non-zero. Give some time for those queries to finish.
for srv.vtgateHandle.numConnections() > 0 {
select {
case <-ctx.Done():
log.Errorf("All connections did not go idle. Shutting down anyway.")
log.Errorf("Connections are still open. Shutting down anyway.")
return
case <-time.After(10 * time.Millisecond):
}
Expand Down

0 comments on commit 09ec6f5

Please sign in to comment.