Skip to content

Commit

Permalink
allowing to retry disabled endpoints when trying to reconnect providers.
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Apr 8, 2024
1 parent 47e1273 commit 550a318
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
10 changes: 5 additions & 5 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (csm *ConsumerSessionManager) probeProviders(ctx context.Context, pairingLi
go func(consumerSessionsWithProvider *ConsumerSessionsWithProvider) {
// Call the probeProvider function and defer the WaitGroup Done call
defer wg.Done()
latency, providerAddress, err := csm.probeProvider(ctx, consumerSessionsWithProvider, epoch)
latency, providerAddress, err := csm.probeProvider(ctx, consumerSessionsWithProvider, epoch, false)
success := err == nil // if failure then regard it in availability
csm.providerOptimizer.AppendProbeRelayData(providerAddress, latency, success)
}(consumerSessionWithProvider)
Expand All @@ -194,9 +194,9 @@ func (csm *ConsumerSessionManager) probeProviders(ctx context.Context, pairingLi
}

// this code needs to be thread safe
func (csm *ConsumerSessionManager) probeProvider(ctx context.Context, consumerSessionsWithProvider *ConsumerSessionsWithProvider, epoch uint64) (latency time.Duration, providerAddress string, err error) {
func (csm *ConsumerSessionManager) probeProvider(ctx context.Context, consumerSessionsWithProvider *ConsumerSessionsWithProvider, epoch uint64, tryReconnectToDisabledEndpoints bool) (latency time.Duration, providerAddress string, err error) {
// TODO: fetch all endpoints not just one
connected, endpoint, providerAddress, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx)
connected, endpoint, providerAddress, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, tryReconnectToDisabledEndpoints)
if err != nil || !connected {
if AllProviderEndpointsDisabledError.Is(err) {
csm.blockProvider(providerAddress, true, epoch, MaxConsecutiveConnectionAttempts, 0, csm.GenerateReconnectCallback(consumerSessionsWithProvider)) // reporting and blocking provider this epoch
Expand Down Expand Up @@ -374,7 +374,7 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
sessionEpoch := sessionWithProvider.CurrentEpoch

// Get a valid Endpoint from the provider chosen
connected, endpoint, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx)
connected, endpoint, _, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, false)
if err != nil {
// verify err is AllProviderEndpointsDisabled and report.
if AllProviderEndpointsDisabledError.Is(err) {
Expand Down Expand Up @@ -937,7 +937,7 @@ func (csm *ConsumerSessionManager) OnSessionDoneIncreaseCUOnly(consumerSession *

func (csm *ConsumerSessionManager) GenerateReconnectCallback(consumerSessionsWithProvider *ConsumerSessionsWithProvider) func() error {
return func() error {
_, _, err := csm.probeProvider(context.Background(), consumerSessionsWithProvider, csm.atomicReadCurrentEpoch())
_, _, err := csm.probeProvider(context.Background(), consumerSessionsWithProvider, csm.atomicReadCurrentEpoch(), true)
return err
}
}
Expand Down
6 changes: 4 additions & 2 deletions protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,15 @@ func (cswp *ConsumerSessionsWithProvider) GetConsumerSessionInstanceFromEndpoint

// fetching an endpoint from a ConsumerSessionWithProvider and establishing a connection,
// can fail without an error if trying to connect once to each endpoint but none of them are active.
func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSessionWithProvider(ctx context.Context) (connected bool, endpointPtr *Endpoint, providerAddress string, err error) {
func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSessionWithProvider(ctx context.Context, retryDisabledEndpoints bool) (connected bool, endpointPtr *Endpoint, providerAddress string, err error) {
getConnectionFromConsumerSessionsWithProvider := func(ctx context.Context) (connected bool, endpointPtr *Endpoint, allDisabled bool) {
cswp.Lock.Lock()
defer cswp.Lock.Unlock()

for idx, endpoint := range cswp.Endpoints {
if !endpoint.Enabled {
// retryDisabledEndpoints will attempt to reconnect to the provider even though we have disabled the endpoint
// this is used on a routine that tries to reconnect to a provider that has been disabled due to being unable to connect to it.
if !retryDisabledEndpoints && !endpoint.Enabled {
continue
}
connectEndpoint := func(cswp *ConsumerSessionsWithProvider, ctx context.Context, endpoint *Endpoint) (connected_ bool) {
Expand Down
20 changes: 19 additions & 1 deletion protocol/lavasession/reported_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

const (
ReconnectCandidateTime = 2 * time.Minute
debugReportedProviders = false
)

type ReportedProviders struct {
Expand All @@ -29,6 +30,9 @@ type ReportedProviderEntry struct {
func (rp *ReportedProviders) Reset() {
rp.lock.Lock()
defer rp.lock.Unlock()
if debugReportedProviders {
utils.LavaFormatDebug("[debugReportedProviders] Reset called")
}
rp.addedToPurgeAndReport = make(map[string]*ReportedProviderEntry, 0)
}

Expand Down Expand Up @@ -61,12 +65,18 @@ func (rp *ReportedProviders) ReportProvider(address string, errors uint64, disco
if reconnectCB != nil {
rp.addedToPurgeAndReport[address].reconnectCB = reconnectCB
}
if debugReportedProviders {
utils.LavaFormatDebug("[debugReportedProviders] adding provider to reported providers", utils.LogAttr("rp.addedToPurgeAndReport", rp.addedToPurgeAndReport))
}
}

// will be called after a disconnected provider got a valid connection
func (rp *ReportedProviders) RemoveReport(address string) {
rp.lock.Lock()
defer rp.lock.Unlock()
if debugReportedProviders {
utils.LavaFormatDebug("[debugReportedProviders] Removing Report", utils.LogAttr("address", address))
}
delete(rp.addedToPurgeAndReport, address)
}

Expand All @@ -86,6 +96,9 @@ func (rp *ReportedProviders) ReconnectCandidates() []reconnectCandidate {
rp.lock.RLock()
defer rp.lock.RUnlock()
candidates := []reconnectCandidate{}
if debugReportedProviders {
utils.LavaFormatDebug("[debugReportedProviders] Reconnect candidates", utils.LogAttr("candidate list", rp.addedToPurgeAndReport))
}
for address, entry := range rp.addedToPurgeAndReport {
// only reconnect providers that didn't have consecutive errors
if entry.Errors == 0 && time.Since(entry.addedTime) > ReconnectCandidateTime {
Expand All @@ -103,6 +116,9 @@ func (rp *ReportedProviders) ReconnectProviders() {
candidates := rp.ReconnectCandidates()
for _, candidate := range candidates {
if candidate.reconnectCB != nil {
if debugReportedProviders {
utils.LavaFormatDebug("[debugReportedProviders] Trying to reconnect candidate", utils.LogAttr("candidate", candidate.address))
}
err := candidate.reconnectCB()
if err == nil {
rp.RemoveReport(candidate.address)
Expand All @@ -118,7 +134,9 @@ func (rp *ReportedProviders) AppendReport(report metrics.ReportsRequest) {
if rp == nil || rp.reporter == nil {
return
}
utils.LavaFormatDebug("sending report on provider", utils.LogAttr("provider", report.Provider))
if debugReportedProviders {
utils.LavaFormatDebug("[debugReportedProviders] Sending report on provider", utils.LogAttr("provider", report.Provider))
}
rp.reporter.AppendReport(report)
}

Expand Down

0 comments on commit 550a318

Please sign in to comment.