Skip to content

Commit

Permalink
VTGate: Rollback open transactions on shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Brendan Dougherty <[email protected]>
  • Loading branch information
brendar committed Oct 2, 2024
1 parent 2e47aba commit 6d88741
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,26 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) {

// start an incomplete transaction
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into buffer(id, msg) values(33,'mark')")
utils.Exec(t, conn, "select * from buffer where id = 3 for update")

// Enforce a restart to enforce rollback
if err = clusterInstance.RestartVtgate(); err != nil {
t.Errorf("Fail to re-start vtgate: %v", err)
}

want := ""

// Make a new mysql connection to vtGate
vtParams = clusterInstance.GetVTParams(keyspaceName)
conn2, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn2.Close()

vtParams = clusterInstance.GetVTParams(keyspaceName)
// Verify that rollback worked
qr := utils.Exec(t, conn2, "select id from buffer where msg='mark'")
got := fmt.Sprintf("%v", qr.Rows)
want = `[[INT64(3)]]`
assert.Equal(t, want, got)
// Start a new transaction
utils.Exec(t, conn2, "begin")
defer utils.Exec(t, conn2, "rollback")
// Verify previous transaction was rolled back. Row lock should be available, otherwise we'll get an error.
qr := utils.Exec(t, conn2, "select * from buffer where id = 3 for update nowait")
assert.Equal(t, 1, len(qr.Rows))

}

func TestErrorInAutocommitSession(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/servenv/servenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func GetInitStartTime() time.Time {
return initStartTime
}

func GetOnTermTimeout() time.Duration {
return timeouts.OnTermTimeout
}

func populateListeningURL(port int32) {
host, err := netutil.FullyQualifiedHostname()
if err != nil {
Expand Down
67 changes: 47 additions & 20 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,19 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys
}
}

func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
func (srv *mysqlServer) shutdown(timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// Reserve some time for rollbacks to happen after draining
drainCtx, cancel := context.WithTimeout(ctx, timeout-time.Second)
defer cancel()

srv.shutdownMysqlProtocolAndDrain(drainCtx)
srv.rollbackAtShutdown(ctx)
}

func (srv *mysqlServer) shutdownMysqlProtocolAndDrain(ctx context.Context) {
if srv.sigChan != nil {
signal.Stop(srv.sigChan)
}
Expand All @@ -637,13 +649,21 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
setListenerToNil()
// We wait for connected clients to drain by themselves or to run into the onterm timeout
log.Infof("Starting drain loop, waiting for all clients to disconnect")
reported := time.Now()

reportTicker := time.NewTicker(2 * time.Second)
defer reportTicker.Stop()
checkTicker := time.NewTicker(1000 * time.Millisecond)
defer checkTicker.Stop()

for srv.vtgateHandle.numConnections() > 0 {
if time.Since(reported) > 2*time.Second {
select {
case <-ctx.Done():
log.Errorf("Timed out waiting for client connections to drain (%d connected)...", srv.vtgateHandle.numConnections())
return
case <-reportTicker.C:
log.Infof("Still waiting for client connections to drain (%d connected)...", srv.vtgateHandle.numConnections())
reported = time.Now()
case <-checkTicker.C:
}
time.Sleep(1000 * time.Millisecond)
}
return
}
Expand All @@ -653,15 +673,21 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
setListenerToNil()
if busy := srv.vtgateHandle.busyConnections.Load(); busy > 0 {
log.Infof("Waiting for all client connections to be idle (%d active)...", busy)
start := time.Now()
reported := start

reportTicker := time.NewTicker(2 * time.Second)
defer reportTicker.Stop()
checkTicker := time.NewTicker(1 * time.Millisecond)
defer checkTicker.Stop()

for busy > 0 {
if time.Since(reported) > 2*time.Second {
select {
case <-ctx.Done():
log.Errorf("Timed out waiting for client connections to be idle (%d active)...", busy)
return
case <-reportTicker.C:
log.Infof("Still waiting for client connections to be idle (%d active)...", busy)
reported = time.Now()
case <-checkTicker.C:
}

time.Sleep(1 * time.Millisecond)
busy = srv.vtgateHandle.busyConnections.Load()
}
}
Expand All @@ -679,7 +705,7 @@ func stopListener(listener *mysql.Listener, shutdown bool) {
}
}

func (srv *mysqlServer) rollbackAtShutdown() {
func (srv *mysqlServer) rollbackAtShutdown(ctx context.Context) {
defer log.Flush()
if srv.vtgateHandle == nil {
// we still haven't been able to initialise the vtgateHandler, so we don't need to rollback anything
Expand All @@ -694,23 +720,24 @@ func (srv *mysqlServer) rollbackAtShutdown() {
defer srv.vtgateHandle.mu.Unlock()
for id, c := range srv.vtgateHandle.connections {
if c != nil {
log.Infof("Rolling back transactions associated with connection ID: %v", id)
log.Infof("Closing connection and rolling back any open transactions. Connection ID: %v", id)
c.Close()
}
}
}
}()

// If vtgate is instead busy executing a query, the number of open conns
// will be non-zero. Give another second for those queries to finish.
for i := 0; i < 100; i++ {
if srv.vtgateHandle.numConnections() == 0 {
log.Infof("All connections have been rolled back.")
// If vtgate is instead busy executing a query or in a transaction, the number of open conns
// will be non-zero. Give some time for those queries to finish.
for srv.vtgateHandle.numConnections() > 0 {
select {
case <-ctx.Done():
log.Errorf("All connections did not go idle. Shutting down anyway.")
return
case <-time.After(10 * time.Millisecond):
}
time.Sleep(10 * time.Millisecond)
}
log.Errorf("All connections did not go idle. Shutting down anyway.")
log.Infof("All connections have been closed")
}

func mysqlSocketPath() string {
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,9 @@ func Init(
tr.Start()
srv := initMySQLProtocol(vtgateInst)
if srv != nil {
servenv.OnTermSync(srv.shutdownMysqlProtocolAndDrain)
servenv.OnClose(srv.rollbackAtShutdown)
servenv.OnTermSync(func() {
srv.shutdown(servenv.GetOnTermTimeout())
})
}
})
servenv.OnTerm(func() {
Expand Down

0 comments on commit 6d88741

Please sign in to comment.