From 8082ca32b42cc2b1a5663dcce16c0ded347b21d3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 5 Oct 2023 12:49:57 +0300 Subject: [PATCH] Online DDL: timeouts for all gRPC calls (#14182) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 9b3c502c004..c92fcc2355b 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -125,6 +125,7 @@ const ( readyToCompleteHint = "ready_to_complete" databasePoolSize = 3 qrBufferExtraTimeout = 5 * time.Second + grpcTimeout = 30 * time.Second vreplicationTestSuiteWaitSeconds = 5 ) @@ -737,9 +738,6 @@ func (e *Executor) primaryPosition(ctx context.Context) (pos mysql.Position, err // terminateVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) error { - tmClient := e.tabletManagerClient() - defer tmClient.Close() - tablet, err := e.ts.GetTablet(ctx, e.tabletAlias) if err != nil { return err @@ -918,11 +916,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, timeout, bufferQueries) if !bufferQueries { + grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + defer cancel() // called after new table is in place. // unbuffer existing queries: bufferingContextCancel() // force re-read of tables - if err := tmClient.RefreshState(ctx, tablet.Tablet); err != nil { + if err := tmClient.RefreshState(grpcCtx, tablet.Tablet); err != nil { return err } } @@ -3754,7 +3754,10 @@ func (e *Executor) vreplicationExec(ctx context.Context, tablet *topodatapb.Tabl tmClient := e.tabletManagerClient() defer tmClient.Close() - return tmClient.VReplicationExec(ctx, tablet, query) + grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + defer cancel() + + return tmClient.VReplicationExec(grpcCtx, tablet, query) } // reloadSchema issues a ReloadSchema on this tablet @@ -3766,7 +3769,11 @@ func (e *Executor) reloadSchema(ctx context.Context) error { if err != nil { return err } - return tmClient.ReloadSchema(ctx, tablet.Tablet, "") + + grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + defer cancel() + + return tmClient.ReloadSchema(grpcCtx, tablet.Tablet, "") } // deleteVReplicationEntry cleans up a _vt.vreplication entry; this function is called as part of