Skip to content

Commit

Permalink
tech debt 4
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Nov 24, 2024
1 parent 7bc4fb7 commit fe3c8bd
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
22 changes: 18 additions & 4 deletions protocol/rpcconsumer/relay_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,22 @@ type RelayState struct {
lock sync.RWMutex
}

func GetEmptyRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMessage) *RelayState {
archiveStatus := &ArchiveStatus{}
archiveStatus.isEarliestUsed.Store(true)
return &RelayState{
ctx: ctx,
protocolMessage: protocolMessage,
archiveStatus: archiveStatus,
}
}

func NewRelayState(ctx context.Context, protocolMessage chainlib.ProtocolMessage, stateNumber int, cache RetryHashCacheInf, relayParser RelayParserInf, archiveStatus *ArchiveStatus) *RelayState {
relayRequestData := protocolMessage.RelayPrivateData()
if archiveStatus == nil {
utils.LavaFormatError("misuse detected archiveStatus is nil", nil, utils.Attribute{Key: "protocolMessage.GetApi", Value: protocolMessage.GetApi()})
archiveStatus = &ArchiveStatus{}
}
rs := &RelayState{
ctx: ctx,
protocolMessage: protocolMessage,
Expand All @@ -76,21 +90,21 @@ func (rs *RelayState) CheckIsArchive(relayRequestData *pairingtypes.RelayPrivate
}

func (rs *RelayState) GetIsEarliestUsed() bool {
if rs == nil {
if rs == nil || rs.archiveStatus == nil {
return true
}
return rs.archiveStatus.isEarliestUsed.Load()
}

func (rs *RelayState) SetIsEarliestUsed() {
if rs == nil {
if rs == nil || rs.archiveStatus == nil {
return
}
rs.archiveStatus.isEarliestUsed.Store(true)
}

func (rs *RelayState) SetIsArchive(isArchive bool) {
if rs == nil {
if rs == nil || rs.archiveStatus == nil {
return
}
rs.archiveStatus.isArchive.Store(isArchive)
Expand Down Expand Up @@ -122,7 +136,7 @@ func (rs *RelayState) SetProtocolMessage(protocolMessage chainlib.ProtocolMessag
}

func (rs *RelayState) upgradeToArchiveIfNeeded(numberOfRetriesLaunched int, numberOfNodeErrors uint64) {
if rs == nil || numberOfNodeErrors == 0 {
if rs == nil || rs.archiveStatus == nil || numberOfNodeErrors == 0 {
return
}
hashes := rs.GetProtocolMessage().GetRequestedBlocksHashes()
Expand Down
10 changes: 5 additions & 5 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retrie
usedProvidersResets++
relayProcessor.GetUsedProviders().ClearUnwanted()
}
err = rpccs.sendRelayToProvider(ctx, protocolMessage, nil, relayProcessor, nil)
err = rpccs.sendRelayToProvider(ctx, GetEmptyRelayState(ctx, protocolMessage), relayProcessor, nil)
if lavasession.PairingListEmptyError.Is(err) {
// we don't have pairings anymore, could be related to unwanted providers
relayProcessor.GetUsedProviders().ClearUnwanted()
err = rpccs.sendRelayToProvider(ctx, protocolMessage, nil, relayProcessor, nil)
err = rpccs.sendRelayToProvider(ctx, GetEmptyRelayState(ctx, protocolMessage), relayProcessor, nil)
}
if err != nil {
utils.LavaFormatError("[-] failed sending init relay", err, []utils.Attribute{{Key: "chainID", Value: rpccs.listenEndpoint.ChainID}, {Key: "APIInterface", Value: rpccs.listenEndpoint.ApiInterface}, {Key: "relayProcessor", Value: relayProcessor}}...)
Expand Down Expand Up @@ -445,7 +445,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, protocolMe
if task.IsDone() {
return relayProcessor, task.err
}
err := rpccs.sendRelayToProvider(ctx, task.relayState.GetProtocolMessage(), task.relayState, relayProcessor, task.analytics)
err := rpccs.sendRelayToProvider(ctx, task.relayState, relayProcessor, task.analytics)
relayProcessor.UpdateBatch(err)
}

Expand Down Expand Up @@ -561,7 +561,6 @@ func (rpccs *RPCConsumerServer) newBlocksHashesToHeightsSliceFromFinalizationCon

func (rpccs *RPCConsumerServer) sendRelayToProvider(
ctx context.Context,
protocolMessage chainlib.ProtocolMessage,
relayState *RelayState,
relayProcessor *RelayProcessor,
analytics *metrics.RelayMetrics,
Expand All @@ -577,6 +576,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(
// if necessary send detection tx for hashes consensus mismatch
// handle QoS updates
// in case connection totally fails, update unresponsive providers in ConsumerSessionManager
protocolMessage := relayState.GetProtocolMessage()
userData := protocolMessage.GetUserData()
var sharedStateId string // defaults to "", if shared state is disabled then no shared state will be used.
if rpccs.sharedState {
Expand Down Expand Up @@ -1282,7 +1282,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context
rpccs.relayRetriesManager,
NewRelayStateMachine(ctx, relayProcessor.usedProviders, rpccs, dataReliabilityProtocolMessage, nil, rpccs.debugRelays, rpccs.rpcConsumerLogs),
)
err := rpccs.sendRelayToProvider(ctx, dataReliabilityProtocolMessage, nil, relayProcessorDataReliability, nil)
err := rpccs.sendRelayToProvider(ctx, GetEmptyRelayState(ctx, dataReliabilityProtocolMessage), relayProcessorDataReliability, nil)
if err != nil {
return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability))
}
Expand Down

0 comments on commit fe3c8bd

Please sign in to comment.