Skip to content

Commit

Permalink
refactor: refactor code and use a single tmc for all vtorc calls
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Feb 26, 2024
1 parent 47e1375 commit 1df6961
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 22 deletions.
19 changes: 5 additions & 14 deletions go/vt/vtorc/inst/tablet_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,12 @@ import (

// ErrTabletAliasNil is a fixed error message.
var ErrTabletAliasNil = errors.New("tablet alias is nil")
var tmc tmclient.TabletManagerClient

// ResetReplicationParameters resets the replication parameters on the given tablet.
func ResetReplicationParameters(tabletAlias string) error {
tablet, err := ReadTablet(tabletAlias)
if err != nil {
return err
}
tmc := tmclient.NewTabletManagerClient()
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tmcCancel()
if err := tmc.ResetReplicationParameters(tmcCtx, tablet); err != nil {
return err
}
return nil
// InitializeTMC initializes the tablet manager client to use for all VTOrc RPC calls.
func InitializeTMC() tmclient.TabletManagerClient {
tmc = tmclient.NewTabletManagerClient()
return tmc
}

// FullStatus gets the full status of the MySQL running in vttablet.
Expand All @@ -57,7 +49,6 @@ func FullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) {
if err != nil {
return nil, err
}
tmc := tmclient.NewTabletManagerClient()
tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer tmcCancel()
return tmc.FullStatus(tmcCtx, tablet)
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func RegisterFlags(fs *pflag.FlagSet) {
// channel for polling.
func OpenTabletDiscovery() <-chan time.Time {
ts = topo.Open()
tmc = tmclient.NewTabletManagerClient()
tmc = inst.InitializeTMC()
// Clear existing cache and perform a new refresh.
if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil {
log.Error(err)
Expand Down Expand Up @@ -302,6 +302,11 @@ func changeTabletType(ctx context.Context, tablet *topodatapb.Tablet, tabletType
return tmc.ChangeType(ctx, tablet, tabletType, semiSync)
}

// resetReplicationParameters resets the replication parameters on the given tablet.
func resetReplicationParameters(ctx context.Context, tablet *topodatapb.Tablet) error {
return tmc.ResetReplicationParameters(ctx, tablet)
}

// setReplicationSource calls the said RPC with the parameters provided
func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, primary *topodatapb.Tablet, semiSync bool) error {
return tmc.SetReplicationSource(ctx, replica, primary.Alias, 0, "", true, semiSync)
Expand Down
16 changes: 9 additions & 7 deletions go/vt/vtorc/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/util"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

type RecoveryType string
Expand Down Expand Up @@ -210,12 +209,15 @@ func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.Replicati
_ = resolveRecovery(topologyRecovery, nil)
}()

// Reset replication on current primary.
err = inst.ResetReplicationParameters(analysisEntry.AnalyzedInstanceAlias)
// Read the tablet information from the database to find the shard and keyspace of the tablet
analyzedTablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias)
if err != nil {
return false, topologyRecovery, err
return false, nil, err
}
return true, topologyRecovery, nil

// Reset replication on current primary.
err = resetReplicationParameters(ctx, analyzedTablet)
return true, topologyRecovery, err
}

// runEmergencyReparentOp runs a recovery for which we have to run ERS. Here waitForAllTablets is a boolean telling ERS whether it should wait for all the tablets
Expand Down Expand Up @@ -244,7 +246,7 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.Replication
_ = resolveRecovery(topologyRecovery, promotedReplica)
}()

ev, err := reparentutil.NewEmergencyReparenter(ts, tmclient.NewTabletManagerClient(), logutil.NewCallbackLogger(func(event *logutilpb.Event) {
ev, err := reparentutil.NewEmergencyReparenter(ts, tmc, logutil.NewCallbackLogger(func(event *logutilpb.Event) {
level := event.GetLevel()
value := event.GetValue()
// we only log the warnings and errors explicitly, everything gets logged as an information message anyways in auditing topology recovery
Expand Down Expand Up @@ -836,7 +838,7 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi
}
_ = AuditTopologyRecovery(topologyRecovery, "starting PlannedReparentShard for electing new primary.")

ev, err := reparentutil.NewPlannedReparenter(ts, tmclient.NewTabletManagerClient(), logutil.NewCallbackLogger(func(event *logutilpb.Event) {
ev, err := reparentutil.NewPlannedReparenter(ts, tmc, logutil.NewCallbackLogger(func(event *logutilpb.Event) {
level := event.GetLevel()
value := event.GetValue()
// we only log the warnings and errors explicitly, everything gets logged as an information message anyways in auditing topology recovery
Expand Down

0 comments on commit 1df6961

Please sign in to comment.