diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index d0c5a328052..1b9f5aca61f 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -206,7 +206,7 @@ Flags: --stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1) --tablet_manager_grpc_ca string the server ca to use to validate servers when connecting --tablet_manager_grpc_cert string the cert to use to connect - --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8) + --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8) --tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100) --tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting --tablet_manager_grpc_key string the key to use to connect diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 0843ee46563..04a46d9e754 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -342,7 +342,7 @@ Flags: --tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it --tablet_manager_grpc_ca string the server ca to use to validate servers when connecting --tablet_manager_grpc_cert string the cert to use to connect - --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8) + --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8) --tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100) --tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting --tablet_manager_grpc_key string the key to use to connect diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index 62d819fb759..da60dfeb3b1 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -141,7 +141,7 @@ Flags: --tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s) --tablet_manager_grpc_ca string the server ca to use to validate servers when connecting --tablet_manager_grpc_cert string the cert to use to connect - --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8) + --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8) --tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100) --tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting --tablet_manager_grpc_key string the key to use to connect diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 8073323ec1c..1e14056460e 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -81,7 +81,7 @@ Flags: --table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class --tablet_manager_grpc_ca string the server ca to use to validate servers when connecting --tablet_manager_grpc_cert string the cert to use to connect - --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8) + --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8) --tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100) --tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting --tablet_manager_grpc_key string the key to use to connect diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index bb7403ef1cb..6ff475badfa 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -349,7 +349,7 @@ Flags: --tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it --tablet_manager_grpc_ca string the server ca to use to validate servers when connecting --tablet_manager_grpc_cert string the cert to use to connect - --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8) + --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8) --tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100) --tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting --tablet_manager_grpc_key string the key to use to connect diff --git a/go/flags/endtoend/vttestserver.txt b/go/flags/endtoend/vttestserver.txt index d3af635e353..3ca21d3d60e 100644 --- a/go/flags/endtoend/vttestserver.txt +++ b/go/flags/endtoend/vttestserver.txt @@ -121,7 +121,7 @@ Flags: --tablet_hostname string The hostname to use for the tablet otherwise it will be derived from OS' hostname (default "localhost") --tablet_manager_grpc_ca string the server ca to use to validate servers when connecting --tablet_manager_grpc_cert string the cert to use to connect - --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8) + --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8) --tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100) --tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting --tablet_manager_grpc_key string the key to use to connect diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index c396a89ef21..250d2bd6ba6 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -178,7 +178,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named var waitGroup sync.WaitGroup var tablet *topodatapb.Tablet - var fullStatus *replicationdatapb.FullStatus + var fs *replicationdatapb.FullStatus readingStartTime := time.Now() instance := NewInstance() instanceFound := false @@ -208,7 +208,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named goto Cleanup } - fullStatus, err = FullStatus(tabletAlias) + fs, err = fullStatus(tabletAlias) if err != nil { goto Cleanup } @@ -218,48 +218,48 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named instance.Port = int(tablet.MysqlPort) { // We begin with a few operations we can run concurrently, and which do not depend on anything - instance.ServerID = uint(fullStatus.ServerId) - instance.Version = fullStatus.Version - instance.ReadOnly = fullStatus.ReadOnly - instance.LogBinEnabled = fullStatus.LogBinEnabled - instance.BinlogFormat = fullStatus.BinlogFormat - instance.LogReplicationUpdatesEnabled = fullStatus.LogReplicaUpdates - instance.VersionComment = fullStatus.VersionComment - - if instance.LogBinEnabled && fullStatus.PrimaryStatus != nil { - binlogPos, err := getBinlogCoordinatesFromPositionString(fullStatus.PrimaryStatus.FilePosition) + instance.ServerID = uint(fs.ServerId) + instance.Version = fs.Version + instance.ReadOnly = fs.ReadOnly + instance.LogBinEnabled = fs.LogBinEnabled + instance.BinlogFormat = fs.BinlogFormat + instance.LogReplicationUpdatesEnabled = fs.LogReplicaUpdates + instance.VersionComment = fs.VersionComment + + if instance.LogBinEnabled && fs.PrimaryStatus != nil { + binlogPos, err := getBinlogCoordinatesFromPositionString(fs.PrimaryStatus.FilePosition) instance.SelfBinlogCoordinates = binlogPos errorChan <- err } - instance.SemiSyncPrimaryEnabled = fullStatus.SemiSyncPrimaryEnabled - instance.SemiSyncReplicaEnabled = fullStatus.SemiSyncReplicaEnabled - instance.SemiSyncPrimaryWaitForReplicaCount = uint(fullStatus.SemiSyncWaitForReplicaCount) - instance.SemiSyncPrimaryTimeout = fullStatus.SemiSyncPrimaryTimeout + instance.SemiSyncPrimaryEnabled = fs.SemiSyncPrimaryEnabled + instance.SemiSyncReplicaEnabled = fs.SemiSyncReplicaEnabled + instance.SemiSyncPrimaryWaitForReplicaCount = uint(fs.SemiSyncWaitForReplicaCount) + instance.SemiSyncPrimaryTimeout = fs.SemiSyncPrimaryTimeout - instance.SemiSyncPrimaryClients = uint(fullStatus.SemiSyncPrimaryClients) - instance.SemiSyncPrimaryStatus = fullStatus.SemiSyncPrimaryStatus - instance.SemiSyncReplicaStatus = fullStatus.SemiSyncReplicaStatus + instance.SemiSyncPrimaryClients = uint(fs.SemiSyncPrimaryClients) + instance.SemiSyncPrimaryStatus = fs.SemiSyncPrimaryStatus + instance.SemiSyncReplicaStatus = fs.SemiSyncReplicaStatus if instance.IsOracleMySQL() || instance.IsPercona() { // Stuff only supported on Oracle / Percona MySQL // ... // @@gtid_mode only available in Oracle / Percona MySQL >= 5.6 - instance.GTIDMode = fullStatus.GtidMode - instance.ServerUUID = fullStatus.ServerUuid - if fullStatus.PrimaryStatus != nil { - GtidExecutedPos, err := replication.DecodePosition(fullStatus.PrimaryStatus.Position) + instance.GTIDMode = fs.GtidMode + instance.ServerUUID = fs.ServerUuid + if fs.PrimaryStatus != nil { + GtidExecutedPos, err := replication.DecodePosition(fs.PrimaryStatus.Position) errorChan <- err if err == nil && GtidExecutedPos.GTIDSet != nil { instance.ExecutedGtidSet = GtidExecutedPos.GTIDSet.String() } } - GtidPurgedPos, err := replication.DecodePosition(fullStatus.GtidPurged) + GtidPurgedPos, err := replication.DecodePosition(fs.GtidPurged) errorChan <- err if err == nil && GtidPurgedPos.GTIDSet != nil { instance.GtidPurged = GtidPurgedPos.GTIDSet.String() } - instance.BinlogRowImage = fullStatus.BinlogRowImage + instance.BinlogRowImage = fs.BinlogRowImage if instance.GTIDMode != "" && instance.GTIDMode != "OFF" { instance.SupportsOracleGTID = true @@ -269,45 +269,45 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named instance.ReplicationIOThreadState = ReplicationThreadStateNoThread instance.ReplicationSQLThreadState = ReplicationThreadStateNoThread - if fullStatus.ReplicationStatus != nil { - instance.HasReplicationCredentials = fullStatus.ReplicationStatus.SourceUser != "" + if fs.ReplicationStatus != nil { + instance.HasReplicationCredentials = fs.ReplicationStatus.SourceUser != "" - instance.ReplicationIOThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fullStatus.ReplicationStatus.IoState)) - instance.ReplicationSQLThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fullStatus.ReplicationStatus.SqlState)) + instance.ReplicationIOThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fs.ReplicationStatus.IoState)) + instance.ReplicationSQLThreadState = ReplicationThreadStateFromReplicationState(replication.ReplicationState(fs.ReplicationStatus.SqlState)) instance.ReplicationIOThreadRuning = instance.ReplicationIOThreadState.IsRunning() instance.ReplicationSQLThreadRuning = instance.ReplicationSQLThreadState.IsRunning() - binlogPos, err := getBinlogCoordinatesFromPositionString(fullStatus.ReplicationStatus.RelayLogSourceBinlogEquivalentPosition) + binlogPos, err := getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.RelayLogSourceBinlogEquivalentPosition) instance.ReadBinlogCoordinates = binlogPos errorChan <- err - binlogPos, err = getBinlogCoordinatesFromPositionString(fullStatus.ReplicationStatus.FilePosition) + binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.FilePosition) instance.ExecBinlogCoordinates = binlogPos errorChan <- err instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates() - binlogPos, err = getBinlogCoordinatesFromPositionString(fullStatus.ReplicationStatus.RelayLogFilePosition) + binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.RelayLogFilePosition) instance.RelaylogCoordinates = binlogPos instance.RelaylogCoordinates.Type = RelayLog errorChan <- err - instance.LastSQLError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fullStatus.ReplicationStatus.LastSqlError), "") - instance.LastIOError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fullStatus.ReplicationStatus.LastIoError), "") + instance.LastSQLError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fs.ReplicationStatus.LastSqlError), "") + instance.LastIOError = emptyQuotesRegexp.ReplaceAllString(strconv.QuoteToASCII(fs.ReplicationStatus.LastIoError), "") - instance.SQLDelay = fullStatus.ReplicationStatus.SqlDelay - instance.UsingOracleGTID = fullStatus.ReplicationStatus.AutoPosition - instance.UsingMariaDBGTID = fullStatus.ReplicationStatus.UsingGtid - instance.SourceUUID = fullStatus.ReplicationStatus.SourceUuid - instance.HasReplicationFilters = fullStatus.ReplicationStatus.HasReplicationFilters + instance.SQLDelay = fs.ReplicationStatus.SqlDelay + instance.UsingOracleGTID = fs.ReplicationStatus.AutoPosition + instance.UsingMariaDBGTID = fs.ReplicationStatus.UsingGtid + instance.SourceUUID = fs.ReplicationStatus.SourceUuid + instance.HasReplicationFilters = fs.ReplicationStatus.HasReplicationFilters - instance.SourceHost = fullStatus.ReplicationStatus.SourceHost - instance.SourcePort = int(fullStatus.ReplicationStatus.SourcePort) + instance.SourceHost = fs.ReplicationStatus.SourceHost + instance.SourcePort = int(fs.ReplicationStatus.SourcePort) - if fullStatus.ReplicationStatus.ReplicationLagUnknown { + if fs.ReplicationStatus.ReplicationLagUnknown { instance.SecondsBehindPrimary.Valid = false } else { instance.SecondsBehindPrimary.Valid = true - instance.SecondsBehindPrimary.Int64 = int64(fullStatus.ReplicationStatus.ReplicationLagSeconds) + instance.SecondsBehindPrimary.Int64 = int64(fs.ReplicationStatus.ReplicationLagSeconds) } if instance.SecondsBehindPrimary.Valid && instance.SecondsBehindPrimary.Int64 < 0 { log.Warningf("Alias: %+v, instance.SecondsBehindPrimary < 0 [%+v], correcting to 0", tabletAlias, instance.SecondsBehindPrimary.Int64) @@ -316,7 +316,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named // And until told otherwise: instance.ReplicationLagSeconds = instance.SecondsBehindPrimary - instance.AllowTLS = fullStatus.ReplicationStatus.SslAllowed + instance.AllowTLS = fs.ReplicationStatus.SslAllowed } instanceFound = true diff --git a/go/vt/vtorc/inst/tablet_dao.go b/go/vt/vtorc/inst/tablet_dao.go index 3ee49a75781..af304292a70 100644 --- a/go/vt/vtorc/inst/tablet_dao.go +++ b/go/vt/vtorc/inst/tablet_dao.go @@ -35,29 +35,20 @@ import ( // ErrTabletAliasNil is a fixed error message. var ErrTabletAliasNil = errors.New("tablet alias is nil") +var tmc tmclient.TabletManagerClient -// ResetReplicationParameters resets the replication parameters on the given tablet. -func ResetReplicationParameters(tabletAlias string) error { - tablet, err := ReadTablet(tabletAlias) - if err != nil { - return err - } - tmc := tmclient.NewTabletManagerClient() - tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer tmcCancel() - if err := tmc.ResetReplicationParameters(tmcCtx, tablet); err != nil { - return err - } - return nil +// InitializeTMC initializes the tablet manager client to use for all VTOrc RPC calls. +func InitializeTMC() tmclient.TabletManagerClient { + tmc = tmclient.NewTabletManagerClient() + return tmc } -// FullStatus gets the full status of the MySQL running in vttablet. -func FullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) { +// fullStatus gets the full status of the MySQL running in vttablet. +func fullStatus(tabletAlias string) (*replicationdatapb.FullStatus, error) { tablet, err := ReadTablet(tabletAlias) if err != nil { return nil, err } - tmc := tmclient.NewTabletManagerClient() tmcCtx, tmcCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer tmcCancel() return tmc.FullStatus(tmcCtx, tablet) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index f08ab2c9c15..cda35b8091c 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -66,7 +66,7 @@ func RegisterFlags(fs *pflag.FlagSet) { // channel for polling. func OpenTabletDiscovery() <-chan time.Time { ts = topo.Open() - tmc = tmclient.NewTabletManagerClient() + tmc = inst.InitializeTMC() // Clear existing cache and perform a new refresh. if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil { log.Error(err) @@ -302,6 +302,11 @@ func changeTabletType(ctx context.Context, tablet *topodatapb.Tablet, tabletType return tmc.ChangeType(ctx, tablet, tabletType, semiSync) } +// resetReplicationParameters resets the replication parameters on the given tablet. +func resetReplicationParameters(ctx context.Context, tablet *topodatapb.Tablet) error { + return tmc.ResetReplicationParameters(ctx, tablet) +} + // setReplicationSource calls the said RPC with the parameters provided func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, primary *topodatapb.Tablet, semiSync bool) error { return tmc.SetReplicationSource(ctx, replica, primary.Alias, 0, "", true, semiSync) diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index e5168fea541..c1fc2c8f9fb 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -35,7 +35,6 @@ import ( "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vtorc/util" - "vitess.io/vitess/go/vt/vttablet/tmclient" ) type RecoveryType string @@ -210,12 +209,15 @@ func recoverPrimaryHasPrimary(ctx context.Context, analysisEntry *inst.Replicati _ = resolveRecovery(topologyRecovery, nil) }() - // Reset replication on current primary. - err = inst.ResetReplicationParameters(analysisEntry.AnalyzedInstanceAlias) + // Read the tablet information from the database to find the shard and keyspace of the tablet + analyzedTablet, err := inst.ReadTablet(analysisEntry.AnalyzedInstanceAlias) if err != nil { - return false, topologyRecovery, err + return false, nil, err } - return true, topologyRecovery, nil + + // Reset replication on current primary. + err = resetReplicationParameters(ctx, analyzedTablet) + return true, topologyRecovery, err } // runEmergencyReparentOp runs a recovery for which we have to run ERS. Here waitForAllTablets is a boolean telling ERS whether it should wait for all the tablets @@ -244,7 +246,7 @@ func runEmergencyReparentOp(ctx context.Context, analysisEntry *inst.Replication _ = resolveRecovery(topologyRecovery, promotedReplica) }() - ev, err := reparentutil.NewEmergencyReparenter(ts, tmclient.NewTabletManagerClient(), logutil.NewCallbackLogger(func(event *logutilpb.Event) { + ev, err := reparentutil.NewEmergencyReparenter(ts, tmc, logutil.NewCallbackLogger(func(event *logutilpb.Event) { level := event.GetLevel() value := event.GetValue() // we only log the warnings and errors explicitly, everything gets logged as an information message anyways in auditing topology recovery @@ -836,7 +838,7 @@ func electNewPrimary(ctx context.Context, analysisEntry *inst.ReplicationAnalysi } _ = AuditTopologyRecovery(topologyRecovery, "starting PlannedReparentShard for electing new primary.") - ev, err := reparentutil.NewPlannedReparenter(ts, tmclient.NewTabletManagerClient(), logutil.NewCallbackLogger(func(event *logutilpb.Event) { + ev, err := reparentutil.NewPlannedReparenter(ts, tmc, logutil.NewCallbackLogger(func(event *logutilpb.Event) { level := event.GetLevel() value := event.GetValue() // we only log the warnings and errors explicitly, everything gets logged as an information message anyways in auditing topology recovery diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 089b7c014dd..69c80932657 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -55,7 +55,7 @@ var ( ) func registerFlags(fs *pflag.FlagSet) { - fs.IntVar(&concurrency, "tablet_manager_grpc_concurrency", concurrency, "concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler)") + fs.IntVar(&concurrency, "tablet_manager_grpc_concurrency", concurrency, "concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus)") fs.StringVar(&cert, "tablet_manager_grpc_cert", cert, "the cert to use to connect") fs.StringVar(&key, "tablet_manager_grpc_key", key, "the key to use to connect") fs.StringVar(&ca, "tablet_manager_grpc_ca", ca, "the server ca to use to validate servers when connecting") @@ -94,8 +94,8 @@ type tmc struct { // grpcClient implements both dialer and poolDialer. type grpcClient struct { - // This cache of connections is to maximize QPS for ExecuteFetchAs{Dba,App} and - // CheckThrottler. Note we'll keep the clients open and close them upon Close() only. + // This cache of connections is to maximize QPS for ExecuteFetchAs{Dba,App}, + // CheckThrottler and FullStatus. Note we'll keep the clients open and close them upon Close() only. // But that's OK because usually the tasks that use them are one-purpose only. // The map is protected by the mutex. mu sync.Mutex @@ -115,7 +115,7 @@ type poolDialer interface { // // Connections are produced by the dialer implementation, which is either the // grpcClient implementation, which reuses connections only for ExecuteFetchAs{Dba,App} -// and CheckThrottler, otherwise making single-purpose connections that are closed +// CheckThrottler, and FullStatus, otherwise making single-purpose connections that are closed // after use. // // In order to more efficiently use the underlying tcp connections, you can @@ -569,12 +569,28 @@ func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb. } // FullStatus is part of the tmclient.TabletManagerClient interface. +// It always tries to use a cached client via the dialer pool as this is +// called very frequently from VTOrc, and the overhead of creating a new gRPC connection/channel +// and dialing the other tablet every time is not practical. func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error) { - c, closer, err := client.dialer.dial(ctx, tablet) - if err != nil { - return nil, err + var c tabletmanagerservicepb.TabletManagerClient + var err error + if poolDialer, ok := client.dialer.(poolDialer); ok { + c, err = poolDialer.dialPool(ctx, tablet) + if err != nil { + return nil, err + } } - defer closer.Close() + + if c == nil { + var closer io.Closer + c, closer, err = client.dialer.dial(ctx, tablet) + if err != nil { + return nil, err + } + defer closer.Close() + } + response, err := c.FullStatus(ctx, &tabletmanagerdatapb.FullStatusRequest{}) if err != nil { return nil, err