diff --git a/server/service.go b/server/service.go index 788f585a..6c931dd2 100644 --- a/server/service.go +++ b/server/service.go @@ -270,11 +270,11 @@ func (m *BoostService) handleRegisterValidator(w http.ResponseWriter, req *http. log := log.WithField("url", url) _, err := SendHTTPRequest(context.Background(), m.httpClientRegVal, http.MethodPost, url, ua, headers, payload, nil) - relayRespCh <- err if err != nil { log.WithError(err).Warn("error calling registerValidator on relay") - return } + relayRespCh <- err + }(relay) } @@ -547,13 +547,17 @@ func (m *BoostService) processCapellaPayload(w http.ResponseWriter, req *http.Re // Prepare the request context, which will be cancelled after the first successful response from a relay requestCtx, requestCtxCancel := context.WithCancel(context.Background()) defer requestCtxCancel() + resultCh := make(chan *builderApi.VersionedSubmitBlindedBlockResponse, len(m.relays)) + var received atomic.Bool + go func() { + // Make sure we receive a response within the timeout + time.Sleep(m.httpClientGetPayload.Timeout * time.Duration(m.requestMaxRetries+1)) + resultCh <- nil + }() - var wg sync.WaitGroup for _, relay := range m.relays { - wg.Add(1) go func(relay types.RelayEntry) { - defer wg.Done() url := relay.GetURI(params.PathGetPayload) log := log.WithField("url", url) log.Debug("calling getPayload") @@ -598,14 +602,16 @@ func (m *BoostService) processCapellaPayload(w http.ResponseWriter, req *http.Re // Received successful response. Try to cancel other requests and return immediately requestCtxCancel() - resultCh <- responsePayload - log.Info("received payload from relay") + if received.CompareAndSwap(false, true) { + resultCh <- responsePayload + log.Info("received payload from relay") + } else { + log.Trace("Discarding response, already received a correct response") + } }(relay) } - // Wait for all requests to complete... - wg.Wait() - close(resultCh) + // Wait for the first request to complete result := <-resultCh // If no payload has been received from relay, log loudly about withholding! @@ -667,17 +673,20 @@ func (m *BoostService) processDenebPayload(w http.ResponseWriter, req *http.Requ } // Prepare for requests - var wg sync.WaitGroup resultCh := make(chan *builderApi.VersionedSubmitBlindedBlockResponse, len(m.relays)) + var received atomic.Bool + go func() { + // Make sure we receive a response within the timeout + time.Sleep(m.httpClientGetPayload.Timeout) + resultCh <- nil + }() // Prepare the request context, which will be cancelled after the first successful response from a relay requestCtx, requestCtxCancel := context.WithCancel(context.Background()) defer requestCtxCancel() for _, relay := range m.relays { - wg.Add(1) go func(relay types.RelayEntry) { - defer wg.Done() url := relay.GetURI(params.PathGetPayload) log := log.WithField("url", url) log.Debug("calling getPayload") @@ -733,14 +742,16 @@ func (m *BoostService) processDenebPayload(w http.ResponseWriter, req *http.Requ } requestCtxCancel() - resultCh <- responsePayload - log.Info("received payload from relay") + if received.CompareAndSwap(false, true) { + resultCh <- responsePayload + log.Info("received payload from relay") + } else { + log.Trace("Discarding response, already received a correct response") + } }(relay) } - // Wait for all requests to complete... - wg.Wait() - close(resultCh) + // Wait for the first request to complete result := <-resultCh // If no payload has been received from relay, log loudly about withholding!