diff --git a/go/vt/vtorc/inst/tablet_dao.go b/go/vt/vtorc/inst/tablet_dao.go index 3ee49a75781..35c71a51eea 100644 --- a/go/vt/vtorc/inst/tablet_dao.go +++ b/go/vt/vtorc/inst/tablet_dao.go @@ -35,20 +35,12 @@ import ( // ErrTabletAliasNil is a fixed error message. var ErrTabletAliasNil = errors.New("tablet alias is nil") +var tmc tmclient.TabletManagerClient -// ResetReplicationParameters resets the replication parameters on the given tablet. -func ResetReplicationParameters(tabletAlias string) error { - tablet, err := ReadTablet(tabletAlias) - if err != nil { - return err - } - tmc := tmclient.NewTabletManagerClient() - tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer tmcCancel() - if err := tmc.ResetReplicationParameters(tmcCtx, tablet); err != nil { - return err - } - return nil +// InitializeTMC initializes the tablet manager client to use for all VTOrc RPC calls. +func InitializeTMC() tmclient.TabletManagerClient { + tmc = tmclient.NewTabletManagerClient() + return tmc } // FullStatus gets the full status of the MySQL running in vttablet. @@ -57,7 +49,6 @@ func FullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) { if err != nil { return nil, err } - tmc := tmclient.NewTabletManagerClient() tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer tmcCancel() return tmc.FullStatus(tmcCtx, tablet) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index f08ab2c9c15..cda35b8091c 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -66,7 +66,7 @@ func RegisterFlags(fs *pflag.FlagSet) { // channel for polling. func OpenTabletDiscovery() <-chan time.Time { ts = topo.Open() - tmc = tmclient.NewTabletManagerClient() + tmc = inst.InitializeTMC() // Clear existing cache and perform a new refresh. if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil { log.Error(err) @@ -302,6 +302,11 @@ func changeTabletType(ctx context.Context, tablet *topodatapb.Tablet, tabletType return tmc.ChangeType(ctx, tablet, tabletType, semiSync) } +// resetReplicationParameters resets the replication parameters on the given tablet. +func resetReplicationParameters(ctx context.Context, tablet *topodatapb.Tablet) error { + return tmc.ResetReplicationParameters(ctx, tablet) +} + // setReplicationSource calls the said RPC with the parameters provided func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, primary *topodatapb.Tablet, semiSync bool) error { return tmc.SetReplicationSource(ctx, replica, primary.Alias, 0, "", true, semiSync) diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index e5168fea541..c1fc2c8f9fb 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -35,7 +35,6 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vtorc/util" - "vitess.io/vitess/go/vt/vttablet/tmclient" ) type RecoveryType string @@ -210,12 +209,15 @@ func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.Replicati _ = resolveRecovery(topologyRecovery, nil) }() - // Reset replication on current primary. - err = inst.ResetReplicationParameters(analysisEntry.AnalyzedInstanceAlias) + // Read the tablet information from the database to find the shard and keyspace of the tablet + analyzedTablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias) if err != nil { - return false, topologyRecovery, err + return false, nil, err } - return true, topologyRecovery, nil + + // Reset replication on current primary. + err = resetReplicationParameters(ctx, analyzedTablet) + return true, topologyRecovery, err } // runEmergencyReparentOp runs a recovery for which we have to run ERS. Here waitForAllTablets is a boolean telling ERS whether it should wait for all the tablets @@ -244,7 +246,7 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.Replication _ = resolveRecovery(topologyRecovery, promotedReplica) }() - ev, err := reparentutil.NewEmergencyReparenter(ts, tmclient.NewTabletManagerClient(), logutil.NewCallbackLogger(func(event *logutilpb.Event) { + ev, err := reparentutil.NewEmergencyReparenter(ts, tmc, logutil.NewCallbackLogger(func(event *logutilpb.Event) { level := event.GetLevel() value := event.GetValue() // we only log the warnings and errors explicitly, everything gets logged as an information message anyways in auditing topology recovery @@ -836,7 +838,7 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi } _ = AuditTopologyRecovery(topologyRecovery, "starting PlannedReparentShard for electing new primary.") - ev, err := reparentutil.NewPlannedReparenter(ts, tmclient.NewTabletManagerClient(), logutil.NewCallbackLogger(func(event *logutilpb.Event) { + ev, err := reparentutil.NewPlannedReparenter(ts, tmc, logutil.NewCallbackLogger(func(event *logutilpb.Event) { level := event.GetLevel() value := event.GetValue() // we only log the warnings and errors explicitly, everything gets logged as an information message anyways in auditing topology recovery