From 36f01defd14f5862e08e02b87fbe69f8d2e81309 Mon Sep 17 00:00:00 2001
From: Ran Mishael
Date: Tue, 6 Aug 2024 18:42:30 +0200
Subject: [PATCH] fix near archive queries issues.

---
 protocol/rpcconsumer/rpcconsumer_server.go | 23 ++++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go
index 1ebfdf379a..edc3834475 100644
--- a/protocol/rpcconsumer/rpcconsumer_server.go
+++ b/protocol/rpcconsumer/rpcconsumer_server.go
@@ -376,6 +376,25 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
 	if rpccs.debugRelays {
 		utils.LavaFormatDebug("Relay initiated with the following timeout schedule", utils.LogAttr("processingTimeout", processingTimeout), utils.LogAttr("newRelayTimeout", relayTimeout))
 	}
+
+	apiName := chainMessage.GetApi().Name
+	resetUsedOnce := true
+	if apiName == "tx" || apiName == "chunk" {
+		relayTimeout = time.Millisecond * 500
+	}
+
+	setArchiveOnSpecialApi := func() {
+		if apiName == "tx" || apiName == "chunk" {
+			archiveExtensionArray := []string{"archive"}
+			chainMessage.OverrideExtensions(archiveExtensionArray, rpccs.chainParser.ExtensionsParser())
+			relayRequestData.Extensions = archiveExtensionArray
+			if resetUsedOnce {
+				resetUsedOnce = false
+				relayProcessor.usedProviders = lavasession.NewUsedProviders(directiveHeaders)
+			}
+		}
+	}
+
 	// create the processing timeout prior to entering the method so it wont reset every time
 	processingCtx, processingCtxCancel := context.WithTimeout(ctx, processingTimeout)
 	defer processingCtxCancel()
@@ -387,6 +406,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
 			if relayProcessor.HasRequiredNodeResults() {
 				gotResults <- true
 			} else {
+				setArchiveOnSpecialApi()
 				gotResults <- false
 			}
 		}
@@ -435,6 +455,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
 		case <-startNewBatchTicker.C:
 			// only trigger another batch for non BestResult relays or if we didn't pass the retry limit.
 			if relayProcessor.ShouldRetry(numberOfRetriesLaunched) {
+				setArchiveOnSpecialApi()
 				// limit the number of retries called from the new batch ticker flow.
 				// if we pass the limit we just wait for the relays we sent to return.
 				err := rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor, nil)
@@ -800,7 +821,7 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe
 	providerPublicAddress := relayResult.ProviderInfo.ProviderAddress
 	relayRequest := relayResult.Request
 	if rpccs.debugRelays {
-		utils.LavaFormatDebug("Sending relay", utils.LogAttr("timeout", relayTimeout), utils.LogAttr("requestedBlock", relayRequest.RelayData.RequestBlock), utils.LogAttr("GUID", ctx), utils.LogAttr("provider", relayRequest.RelaySession.Provider))
+		utils.LavaFormatDebug("Sending relay", utils.LogAttr("timeout", relayTimeout), utils.LogAttr("extensions", relayResult.Request.RelayData.Extensions), utils.LogAttr("requestedBlock", relayRequest.RelayData.RequestBlock), utils.LogAttr("GUID", ctx), utils.LogAttr("provider", relayRequest.RelaySession.Provider))
 	}
 	callRelay := func() (reply *pairingtypes.RelayReply, relayLatency time.Duration, err error, backoff bool) {
 		relaySentTime := time.Now()
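
The retry flow in this patch leans on a closure that lazily promotes a failing "tx" or "chunk" relay to the archive extension and resets the tracked providers exactly once. Below is a minimal, self-contained sketch of that pattern, assuming hypothetical stand-in names (relayState, newArchiveFallback, usedProviders) rather than the actual chainMessage.OverrideExtensions / lavasession.NewUsedProviders plumbing from the Lava codebase.

package main

import (
	"fmt"
	"time"
)

// relayState is a hypothetical stand-in for the patch's relayRequestData plus
// the relay processor state; it is not a type from the Lava codebase.
type relayState struct {
	apiName       string          // parsed API name, e.g. "tx" or "chunk"
	extensions    []string        // extensions attached to the outgoing relay
	timeout       time.Duration   // per-relay timeout
	usedProviders map[string]bool // providers already tried for this relay
}

// newArchiveFallback returns a closure mirroring setArchiveOnSpecialApi: for
// the special "tx"/"chunk" APIs it switches the relay to the "archive"
// extension and, exactly once, clears the used-provider tracking so a fresh
// provider set can be paired on retry.
func newArchiveFallback(state *relayState) func() {
	resetUsedOnce := true
	return func() {
		if state.apiName != "tx" && state.apiName != "chunk" {
			return
		}
		state.extensions = []string{"archive"}
		if resetUsedOnce {
			resetUsedOnce = false
			state.usedProviders = make(map[string]bool)
		}
	}
}

func main() {
	state := &relayState{
		apiName:       "tx",
		timeout:       500 * time.Millisecond, // shortened timeout for the special APIs
		usedProviders: map[string]bool{"provider-1": true},
	}
	fallback := newArchiveFallback(state)

	// The first attempt did not return the required results, so the retry
	// path upgrades the request to the archive extension and forgets the
	// providers tried so far.
	fallback()
	fmt.Println(state.extensions, len(state.usedProviders)) // [archive] 0

	// Later retries keep the archive extension without resetting again.
	state.usedProviders["provider-2"] = true
	fallback()
	fmt.Println(state.extensions, len(state.usedProviders)) // [archive] 1
}

The once-only reset lets providers that were already tried without the archive extension be paired again for the archive attempt, which is presumably why the patch swaps in a fresh lavasession.NewUsedProviders rather than keeping the existing set.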