diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index d95d4afc41f..66e81aef949 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -124,6 +124,7 @@ const ( readyToCompleteHint = "ready_to_complete" databasePoolSize = 3 qrBufferExtraTimeout = 5 * time.Second + grpcTimeout = 30 * time.Second vreplicationTestSuiteWaitSeconds = 5 ) @@ -735,9 +736,6 @@ func (e *Executor) primaryPosition(ctx context.Context) (pos replication.Positio // terminateVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) error { - tmClient := e.tabletManagerClient() - defer tmClient.Close() - tablet, err := e.ts.GetTablet(ctx, e.tabletAlias) if err != nil { return err @@ -916,11 +914,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, timeout, bufferQueries) if !bufferQueries { + grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + defer cancel() // called after new table is in place. // unbuffer existing queries: bufferingContextCancel() // force re-read of tables - if err := tmClient.RefreshState(ctx, tablet.Tablet); err != nil { + if err := tmClient.RefreshState(grpcCtx, tablet.Tablet); err != nil { return err } } @@ -3684,7 +3684,10 @@ func (e *Executor) vreplicationExec(ctx context.Context, tablet *topodatapb.Tabl tmClient := e.tabletManagerClient() defer tmClient.Close() - return tmClient.VReplicationExec(ctx, tablet, query) + grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + defer cancel() + + return tmClient.VReplicationExec(grpcCtx, tablet, query) } // reloadSchema issues a ReloadSchema on this tablet @@ -3696,7 +3699,11 @@ func (e *Executor) reloadSchema(ctx context.Context) error { if err != nil { return err } - return tmClient.ReloadSchema(ctx, tablet.Tablet, "") + + grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + defer cancel() + + return tmClient.ReloadSchema(grpcCtx, tablet.Tablet, "") } // deleteVReplicationEntry cleans up a _vt.vreplication entry; this function is called as part of