From 48187b3ae4f95d2cdff31166bc268f1f2aa26707 Mon Sep 17 00:00:00 2001 From: Elad Gildnur <6321801+shleikes@users.noreply.github.com> Date: Sun, 1 Dec 2024 13:47:57 +0200 Subject: [PATCH 1/5] fix: PRT - Add generic parser for Near tx (#1809) * Add generic parser for near tx * Update the parsers --- cookbook/specs/near.json | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/cookbook/specs/near.json b/cookbook/specs/near.json index 05604a66ce..121b4b4e05 100644 --- a/cookbook/specs/near.json +++ b/cookbook/specs/near.json @@ -339,7 +339,17 @@ "hanging_api": true }, "extra_compute_units": 0, - "timeout_ms": 10000 + "timeout_ms": 10000, + "parsers": [ + { + "parse_path": ".params.tx_hash", + "parse_type": "BLOCK_HASH" + }, + { + "parse_path": ".params.[0]", + "parse_type": "BLOCK_HASH" + } + ] }, { "name": "EXPERIMENTAL_tx_status", @@ -357,7 +367,17 @@ "subscription": false, "stateful": 0 }, - "extra_compute_units": 0 + "extra_compute_units": 0, + "parsers": [ + { + "parse_path": ".params.tx_hash", + "parse_type": "BLOCK_HASH" + }, + { + "parse_path": ".params.[0]", + "parse_type": "BLOCK_HASH" + } + ] }, { "name": "EXPERIMENTAL_receipt", From 5a4300753572ad98b9da47a2e5749bdbd30f9fa0 Mon Sep 17 00:00:00 2001 From: Elad Gildnur <6321801+shleikes@users.noreply.github.com> Date: Sun, 1 Dec 2024 14:37:55 +0200 Subject: [PATCH 2/5] Update the starknet spec (#1807) --- cookbook/specs/starknet.json | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/cookbook/specs/starknet.json b/cookbook/specs/starknet.json index 651bf0ade8..9228e7b7ff 100644 --- a/cookbook/specs/starknet.json +++ b/cookbook/specs/starknet.json @@ -11,12 +11,12 @@ "data_reliability_enabled": true, "block_distance_for_finalized_data": 6, "blocks_in_finalization_proof": 3, - "average_block_time": 12000, + "average_block_time": 30000, "allowed_block_lag_for_qos_sync": 2, "shares": 1, "min_stake_provider": { "denom": "ulava", - "amount": "47500000000" + "amount": "5000000000" }, "api_collections": [ { @@ -36,7 +36,7 @@ ], "parser_func": "DEFAULT" }, - "compute_units": 10, + "compute_units": 20, "enabled": true, "category": { "deterministic": true, @@ -93,7 +93,7 @@ ], "parser_func": "DEFAULT" }, - "compute_units": 10, + "compute_units": 20, "enabled": true, "category": { "deterministic": true, @@ -156,7 +156,7 @@ "parser_func": "PARSE_DICTIONARY_OR_ORDERED", "default_value": "latest" }, - "compute_units": 10, + "compute_units": 20, "enabled": true, "category": { "deterministic": true, @@ -445,7 +445,7 @@ ], "parser_func": "DEFAULT" }, - "compute_units": 10, + "compute_units": 20, "enabled": true, "category": { "deterministic": false, @@ -598,6 +598,17 @@ "expected_value": "*" } ] + }, + { + "name": "pruning", + "parse_directive": { + "function_tag": "GET_BLOCK_BY_NUM" + }, + "values": [ + { + "expected_value": "1" + } + ] } ] }, @@ -969,11 +980,11 @@ "data_reliability_enabled": true, "block_distance_for_finalized_data": 1, "blocks_in_finalization_proof": 3, - "average_block_time": 1800000, + "average_block_time": 32000, "allowed_block_lag_for_qos_sync": 1, "min_stake_provider": { "denom": "ulava", - "amount": "47500000000" + "amount": "5000000000" }, "api_collections": [ { From 8d6763d905e596602bc9a1142fe00e16c6544f67 Mon Sep 17 00:00:00 2001 From: Ran Mishael <106548467+ranlavanet@users.noreply.github.com> Date: Sun, 1 Dec 2024 14:39:18 +0100 Subject: [PATCH 3/5] feat: PRT - archive retry attempt on second relay regardless of node error (#1810) * feat: PRT - archive retry attempt on second relay regardless of node error * increase protocol version --- protocol/rpcconsumer/relay_state.go | 2 +- x/protocol/types/params.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/protocol/rpcconsumer/relay_state.go b/protocol/rpcconsumer/relay_state.go index 6dcbfc7751..37264979ea 100644 --- a/protocol/rpcconsumer/relay_state.go +++ b/protocol/rpcconsumer/relay_state.go @@ -150,7 +150,7 @@ func (rs *RelayState) SetProtocolMessage(protocolMessage chainlib.ProtocolMessag } func (rs *RelayState) upgradeToArchiveIfNeeded(numberOfRetriesLaunched int, numberOfNodeErrors uint64) { - if rs == nil || rs.archiveStatus == nil || numberOfNodeErrors == 0 { + if rs == nil || rs.archiveStatus == nil { return } hashes := rs.GetProtocolMessage().GetRequestedBlocksHashes() diff --git a/x/protocol/types/params.go b/x/protocol/types/params.go index d923b38f5f..2848f2aeb3 100644 --- a/x/protocol/types/params.go +++ b/x/protocol/types/params.go @@ -12,7 +12,7 @@ import ( var _ paramtypes.ParamSet = (*Params)(nil) const ( - TARGET_VERSION = "4.1.3" + TARGET_VERSION = "4.1.4" MIN_VERSION = "3.1.0" ) From ad245175e2ea2b845859dd781ee9c037811a485a Mon Sep 17 00:00:00 2001 From: Elad Gildnur <6321801+shleikes@users.noreply.github.com> Date: Sun, 1 Dec 2024 15:40:31 +0200 Subject: [PATCH 4/5] fix: PRT - Fix near "chunk" and "block" generic parsers (#1811) * Fix near generic parser * Add the array params back --- cookbook/specs/near.json | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cookbook/specs/near.json b/cookbook/specs/near.json index 121b4b4e05..e0c46c2cf1 100644 --- a/cookbook/specs/near.json +++ b/cookbook/specs/near.json @@ -95,6 +95,10 @@ "rule": "=final || =optimistic", "parse_type": "DEFAULT_VALUE" }, + { + "parse_path": ".params.block_id", + "parse_type": "BLOCK_HASH" + }, { "parse_path": ".params.[0]", "parse_type": "BLOCK_HASH" @@ -141,6 +145,10 @@ }, "extra_compute_units": 0, "parsers": [ + { + "parse_path": ".params.chunk_id", + "parse_type": "BLOCK_HASH" + }, { "parse_path": ".params.[0]", "parse_type": "BLOCK_HASH" From fcfbef41259e59dbef1beae65c91978a2cd0d6f9 Mon Sep 17 00:00:00 2001 From: Omer <100387053+omerlavanet@users.noreply.github.com> Date: Sun, 1 Dec 2024 16:23:02 +0200 Subject: [PATCH 5/5] chore: refactor state query access (#1766) * refactor state query access * remove direct usage of client.Context to allow the rewiring of lava over lava * refactor rpcconsumer, allow creating a server with a function * lint * added custom lava transport --- .../pkg/state/lavavisor_state_tracker.go | 5 +- protocol/badgegenerator/tracker.go | 4 +- protocol/badgeserver/tracker.go | 6 +- protocol/rpcconsumer/custom_transport.go | 23 ++ protocol/rpcconsumer/rpcconsumer.go | 267 ++++++++++-------- protocol/rpcprovider/rpcprovider.go | 7 +- .../statetracker/consumer_state_tracker.go | 23 +- protocol/statetracker/events.go | 21 +- .../statetracker/provider_state_tracker.go | 31 +- protocol/statetracker/state_tracker.go | 9 +- .../statetracker/updaters/event_tracker.go | 22 +- .../updaters/provider_freeze_jail_updater.go | 26 +- .../provider_freeze_jail_updater_mocks.go | 4 + protocol/statetracker/updaters/state_query.go | 109 ++++--- 14 files changed, 320 insertions(+), 237 deletions(-) create mode 100644 protocol/rpcconsumer/custom_transport.go diff --git a/ecosystem/lavavisor/pkg/state/lavavisor_state_tracker.go b/ecosystem/lavavisor/pkg/state/lavavisor_state_tracker.go index b460d47b5e..fb1ca9d511 100644 --- a/ecosystem/lavavisor/pkg/state/lavavisor_state_tracker.go +++ b/ecosystem/lavavisor/pkg/state/lavavisor_state_tracker.go @@ -24,7 +24,8 @@ type LavaVisorStateTracker struct { func NewLavaVisorStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher) (lvst *LavaVisorStateTracker, err error) { // validate chainId - status, err := clientCtx.Client.Status(ctx) + stateQuery := updaters.NewStateQuery(ctx, updaters.NewStateQueryAccessInst(clientCtx)) + status, err := stateQuery.Status(ctx) if err != nil { return nil, utils.LavaFormatError("[Lavavisor] failed getting status", err) } @@ -36,7 +37,7 @@ func NewLavaVisorStateTracker(ctx context.Context, txFactory tx.Factory, clientC if err != nil { utils.LavaFormatFatal("chain is missing Lava spec, cant initialize lavavisor", err) } - lst := &LavaVisorStateTracker{stateQuery: updaters.NewStateQuery(ctx, clientCtx), averageBlockTime: time.Duration(specResponse.Spec.AverageBlockTime) * time.Millisecond} + lst := &LavaVisorStateTracker{stateQuery: stateQuery, averageBlockTime: time.Duration(specResponse.Spec.AverageBlockTime) * time.Millisecond} return lst, nil } diff --git a/protocol/badgegenerator/tracker.go b/protocol/badgegenerator/tracker.go index aa82c5f90c..c7822bae83 100644 --- a/protocol/badgegenerator/tracker.go +++ b/protocol/badgegenerator/tracker.go @@ -28,11 +28,11 @@ func NewBadgeStateTracker(ctx context.Context, clientCtx cosmosclient.Context, c emergencyTracker, blockNotFoundCallback := statetracker.NewEmergencyTracker(nil) txFactory := tx.Factory{} txFactory = txFactory.WithChainID(chainId) - stateTrackerBase, err := statetracker.NewStateTracker(ctx, txFactory, clientCtx, chainFetcher, blockNotFoundCallback) + sq := updaters.NewStateQuery(ctx, updaters.NewStateQueryAccessInst(clientCtx)) + stateTrackerBase, err := statetracker.NewStateTracker(ctx, txFactory, sq, chainFetcher, blockNotFoundCallback) if err != nil { return nil, err } - sq := updaters.NewStateQuery(ctx, clientCtx) esq := updaters.NewEpochStateQuery(sq) pst := &BadgeStateTracker{StateTracker: stateTrackerBase, stateQuery: esq, ConsumerEmergencyTrackerInf: emergencyTracker} diff --git a/protocol/badgeserver/tracker.go b/protocol/badgeserver/tracker.go index 9b13ddee42..c54326a488 100644 --- a/protocol/badgeserver/tracker.go +++ b/protocol/badgeserver/tracker.go @@ -28,12 +28,12 @@ func NewBadgeStateTracker(ctx context.Context, clientCtx cosmosclient.Context, c emergencyTracker, blockNotFoundCallback := statetracker.NewEmergencyTracker(nil) txFactory := tx.Factory{} txFactory = txFactory.WithChainID(chainId) - stateTrackerBase, err := statetracker.NewStateTracker(ctx, txFactory, clientCtx, chainFetcher, blockNotFoundCallback) + stateQuery := updaters.NewStateQuery(ctx, updaters.NewStateQueryAccessInst(clientCtx)) + stateTrackerBase, err := statetracker.NewStateTracker(ctx, txFactory, stateQuery, chainFetcher, blockNotFoundCallback) if err != nil { return nil, err } - stateTracker := updaters.NewStateQuery(ctx, clientCtx) - epochStateTracker := updaters.NewEpochStateQuery(stateTracker) + epochStateTracker := updaters.NewEpochStateQuery(stateQuery) badgeStateTracker := &BadgeStateTracker{ StateTracker: stateTrackerBase, diff --git a/protocol/rpcconsumer/custom_transport.go b/protocol/rpcconsumer/custom_transport.go new file mode 100644 index 0000000000..aef36b3396 --- /dev/null +++ b/protocol/rpcconsumer/custom_transport.go @@ -0,0 +1,23 @@ +package rpcconsumer + +import ( + "net/http" +) + +type CustomLavaTransport struct { + transport http.RoundTripper +} + +func NewCustomLavaTransport(httpTransport http.RoundTripper) *CustomLavaTransport { + return &CustomLavaTransport{transport: httpTransport} +} + +func (c *CustomLavaTransport) RoundTrip(req *http.Request) (*http.Response, error) { + // Custom logic before the request + + // Delegate to the underlying RoundTripper (usually http.Transport) + resp, err := c.transport.RoundTrip(req) + + // Custom logic after the request + return resp, err +} diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 99dcb15a01..62448fe557 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -11,11 +11,14 @@ import ( "sync" "time" + rpchttp "github.com/cometbft/cometbft/rpc/client/http" + jsonrpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/config" "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/client/tx" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/lavanet/lava/v4/app" "github.com/lavanet/lava/v4/protocol/chainlib" "github.com/lavanet/lava/v4/protocol/common" @@ -151,7 +154,16 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt if err != nil { utils.LavaFormatFatal("failed creating RPCConsumer logs", err) } + consumerMetricsManager.SetVersion(upgrade.GetCurrentVersion().ConsumerVersion) + httpClient, err := jsonrpcclient.DefaultHTTPClient(options.clientCtx.NodeURI) + if err == nil { + httpClient.Transport = NewCustomLavaTransport(httpClient.Transport) + client, err := rpchttp.NewWithClient(options.clientCtx.NodeURI, "/websocket", httpClient) + if err == nil { + options.clientCtx = options.clientCtx.WithClient(client) + } + } // spawn up ConsumerStateTracker lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx) @@ -161,6 +173,8 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt } rpcc.consumerStateTracker = consumerStateTracker + lavaChainFetcher.FetchLatestBlockNum(ctx) + lavaChainID := options.clientCtx.ChainID keyName, err := sigs.GetKeyName(options.clientCtx) if err != nil { @@ -213,119 +227,11 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt for _, rpcEndpoint := range options.rpcEndpoints { go func(rpcEndpoint *lavasession.RPCEndpoint) error { defer wg.Done() - chainParser, err := chainlib.NewChainParser(rpcEndpoint.ApiInterface) - if err != nil { - err = utils.LavaFormatError("failed creating chain parser", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint}) - errCh <- err - return err - } - chainID := rpcEndpoint.ChainID - // create policyUpdaters per chain - newPolicyUpdater := updaters.NewPolicyUpdater(chainID, consumerStateTracker, consumerAddr.String(), chainParser, *rpcEndpoint) - policyUpdater, ok, err := policyUpdaters.LoadOrStore(chainID, newPolicyUpdater) - if err != nil { - errCh <- err - return utils.LavaFormatError("failed loading or storing policy updater", err, utils.LogAttr("endpoint", rpcEndpoint)) - } - if ok { - err := policyUpdater.AddPolicySetter(chainParser, *rpcEndpoint) - if err != nil { - errCh <- err - return utils.LavaFormatError("failed adding policy setter", err) - } - } - - err = statetracker.RegisterForSpecUpdatesOrSetStaticSpec(ctx, chainParser, options.cmdFlags.StaticSpecPath, *rpcEndpoint, rpcc.consumerStateTracker) - if err != nil { - err = utils.LavaFormatError("failed registering for spec updates", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint}) - errCh <- err - return err - } - - _, averageBlockTime, _, _ := chainParser.ChainBlockStats() - var optimizer *provideroptimizer.ProviderOptimizer - var consumerConsistency *ConsumerConsistency - var finalizationConsensus *finalizationconsensus.FinalizationConsensus - getOrCreateChainAssets := func() error { - // this is locked so we don't race optimizers creation - chainMutexes[chainID].Lock() - defer chainMutexes[chainID].Unlock() - var loaded bool - var err error - - baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better - - // Create / Use existing optimizer - newOptimizer := provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, baseLatency, options.maxConcurrentProviders, consumerOptimizerQoSClient, chainID) - optimizer, loaded, err = optimizers.LoadOrStore(chainID, newOptimizer) - if err != nil { - return utils.LavaFormatError("failed loading optimizer", err, utils.LogAttr("endpoint", rpcEndpoint.Key())) - } - - if !loaded { - // if this is a new optimizer, register it in the consumerOptimizerQoSClient - consumerOptimizerQoSClient.RegisterOptimizer(optimizer, chainID) - } - - // Create / Use existing ConsumerConsistency - newConsumerConsistency := NewConsumerConsistency(chainID) - consumerConsistency, _, err = consumerConsistencies.LoadOrStore(chainID, newConsumerConsistency) - if err != nil { - return utils.LavaFormatError("failed loading consumer consistency", err, utils.LogAttr("endpoint", rpcEndpoint.Key())) - } - - // Create / Use existing FinalizationConsensus - newFinalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID) - finalizationConsensus, loaded, err = finalizationConsensuses.LoadOrStore(chainID, newFinalizationConsensus) - if err != nil { - return utils.LavaFormatError("failed loading finalization consensus", err, utils.LogAttr("endpoint", rpcEndpoint.Key())) - } - if !loaded { // when creating new finalization consensus instance we need to register it to updates - consumerStateTracker.RegisterFinalizationConsensusForUpdates(ctx, finalizationConsensus) - } - return nil - } - err = getOrCreateChainAssets() - if err != nil { - errCh <- err - return err - } - - if finalizationConsensus == nil || optimizer == nil || consumerConsistency == nil { - err = utils.LavaFormatError("failed getting assets, found a nil", nil, utils.Attribute{Key: "endpoint", Value: rpcEndpoint.Key()}) - errCh <- err - return err - } - - // Create active subscription provider storage for each unique chain - activeSubscriptionProvidersStorage := lavasession.NewActiveSubscriptionProvidersStorage() - consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager, consumerAddr.String(), activeSubscriptionProvidersStorage) - // Register For Updates - rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager, options.staticProvidersList) - - var relaysMonitor *metrics.RelaysMonitor - if options.cmdFlags.RelaysHealthEnableFlag { - relaysMonitor = metrics.NewRelaysMonitor(options.cmdFlags.RelaysHealthIntervalFlag, rpcEndpoint.ChainID, rpcEndpoint.ApiInterface) - relaysMonitorAggregator.RegisterRelaysMonitor(rpcEndpoint.String(), relaysMonitor) - } - - rpcConsumerServer := &RPCConsumerServer{} - - var consumerWsSubscriptionManager *chainlib.ConsumerWSSubscriptionManager - var specMethodType string - if rpcEndpoint.ApiInterface == spectypes.APIInterfaceJsonRPC { - specMethodType = http.MethodPost - } - consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, consumerMetricsManager) - - utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()}) - err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager, consumerWsSubscriptionManager) - if err != nil { - err = utils.LavaFormatError("failed serving rpc requests", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint}) - errCh <- err - return err - } - return nil + _, err := rpcc.CreateConsumerEndpoint(ctx, rpcEndpoint, errCh, consumerAddr, consumerStateTracker, + policyUpdaters, optimizers, consumerConsistencies, finalizationConsensuses, chainMutexes, + options, privKey, lavaChainID, rpcConsumerMetrics, consumerReportsManager, consumerOptimizerQoSClient, + consumerMetricsManager, relaysMonitorAggregator) + return err }(rpcEndpoint) } @@ -361,6 +267,141 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt return nil } +func (rpcc *RPCConsumer) CreateConsumerEndpoint( + ctx context.Context, + rpcEndpoint *lavasession.RPCEndpoint, + errCh chan error, + consumerAddr sdk.AccAddress, + consumerStateTracker *statetracker.ConsumerStateTracker, + policyUpdaters *common.SafeSyncMap[string, *updaters.PolicyUpdater], + optimizers *common.SafeSyncMap[string, *provideroptimizer.ProviderOptimizer], + consumerConsistencies *common.SafeSyncMap[string, *ConsumerConsistency], + finalizationConsensuses *common.SafeSyncMap[string, *finalizationconsensus.FinalizationConsensus], + chainMutexes map[string]*sync.Mutex, + options *rpcConsumerStartOptions, + privKey *secp256k1.PrivateKey, + lavaChainID string, + rpcConsumerMetrics *metrics.RPCConsumerLogs, + consumerReportsManager *metrics.ConsumerReportsClient, + consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient, + consumerMetricsManager *metrics.ConsumerMetricsManager, + relaysMonitorAggregator *metrics.RelaysMonitorAggregator, +) (*RPCConsumerServer, error) { + chainParser, err := chainlib.NewChainParser(rpcEndpoint.ApiInterface) + if err != nil { + err = utils.LavaFormatError("failed creating chain parser", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint}) + errCh <- err + return nil, err + } + chainID := rpcEndpoint.ChainID + // create policyUpdaters per chain + newPolicyUpdater := updaters.NewPolicyUpdater(chainID, consumerStateTracker, consumerAddr.String(), chainParser, *rpcEndpoint) + policyUpdater, ok, err := policyUpdaters.LoadOrStore(chainID, newPolicyUpdater) + if err != nil { + errCh <- err + return nil, utils.LavaFormatError("failed loading or storing policy updater", err, utils.LogAttr("endpoint", rpcEndpoint)) + } + if ok { + err := policyUpdater.AddPolicySetter(chainParser, *rpcEndpoint) + if err != nil { + errCh <- err + return nil, utils.LavaFormatError("failed adding policy setter", err) + } + } + + err = statetracker.RegisterForSpecUpdatesOrSetStaticSpec(ctx, chainParser, options.cmdFlags.StaticSpecPath, *rpcEndpoint, rpcc.consumerStateTracker) + if err != nil { + err = utils.LavaFormatError("failed registering for spec updates", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint}) + errCh <- err + return nil, err + } + + _, averageBlockTime, _, _ := chainParser.ChainBlockStats() + var optimizer *provideroptimizer.ProviderOptimizer + var consumerConsistency *ConsumerConsistency + var finalizationConsensus *finalizationconsensus.FinalizationConsensus + getOrCreateChainAssets := func() error { + // this is locked so we don't race optimizers creation + chainMutexes[chainID].Lock() + defer chainMutexes[chainID].Unlock() + var loaded bool + var err error + + baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better + + // Create / Use existing optimizer + newOptimizer := provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, baseLatency, options.maxConcurrentProviders, consumerOptimizerQoSClient, chainID) + optimizer, loaded, err = optimizers.LoadOrStore(chainID, newOptimizer) + if err != nil { + return utils.LavaFormatError("failed loading optimizer", err, utils.LogAttr("endpoint", rpcEndpoint.Key())) + } + + if !loaded { + // if this is a new optimizer, register it in the consumerOptimizerQoSClient + consumerOptimizerQoSClient.RegisterOptimizer(optimizer, chainID) + } + + // Create / Use existing ConsumerConsistency + newConsumerConsistency := NewConsumerConsistency(chainID) + consumerConsistency, _, err = consumerConsistencies.LoadOrStore(chainID, newConsumerConsistency) + if err != nil { + return utils.LavaFormatError("failed loading consumer consistency", err, utils.LogAttr("endpoint", rpcEndpoint.Key())) + } + + // Create / Use existing FinalizationConsensus + newFinalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID) + finalizationConsensus, loaded, err = finalizationConsensuses.LoadOrStore(chainID, newFinalizationConsensus) + if err != nil { + return utils.LavaFormatError("failed loading finalization consensus", err, utils.LogAttr("endpoint", rpcEndpoint.Key())) + } + if !loaded { // when creating new finalization consensus instance we need to register it to updates + consumerStateTracker.RegisterFinalizationConsensusForUpdates(ctx, finalizationConsensus) + } + return nil + } + err = getOrCreateChainAssets() + if err != nil { + errCh <- err + return nil, err + } + + if finalizationConsensus == nil || optimizer == nil || consumerConsistency == nil { + err = utils.LavaFormatError("failed getting assets, found a nil", nil, utils.Attribute{Key: "endpoint", Value: rpcEndpoint.Key()}) + errCh <- err + return nil, err + } + + // Create active subscription provider storage for each unique chain + activeSubscriptionProvidersStorage := lavasession.NewActiveSubscriptionProvidersStorage() + consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager, consumerAddr.String(), activeSubscriptionProvidersStorage) + // Register For Updates + rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager, options.staticProvidersList) + + var relaysMonitor *metrics.RelaysMonitor + if options.cmdFlags.RelaysHealthEnableFlag { + relaysMonitor = metrics.NewRelaysMonitor(options.cmdFlags.RelaysHealthIntervalFlag, rpcEndpoint.ChainID, rpcEndpoint.ApiInterface) + relaysMonitorAggregator.RegisterRelaysMonitor(rpcEndpoint.String(), relaysMonitor) + } + + rpcConsumerServer := &RPCConsumerServer{} + + var consumerWsSubscriptionManager *chainlib.ConsumerWSSubscriptionManager + var specMethodType string + if rpcEndpoint.ApiInterface == spectypes.APIInterfaceJsonRPC { + specMethodType = http.MethodPost + } + consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, consumerMetricsManager) + + utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()}) + err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager, consumerWsSubscriptionManager) + if err != nil { + err = utils.LavaFormatError("failed serving rpc requests", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint}) + errCh <- err + return nil, err + } + return rpcConsumerServer, nil +} + func ParseEndpoints(viper_endpoints *viper.Viper, geolocation uint64) (endpoints []*lavasession.RPCEndpoint, err error) { err = viper_endpoints.UnmarshalKey(common.EndpointsConfigName, &endpoints) if err != nil { diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 1e83f7eea0..794b64f295 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -219,7 +219,7 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { utils.LavaFormatInfo("RPCProvider pubkey: " + rpcp.addr.String()) - rpcp.createAndRegisterFreezeUpdatersByOptions(ctx, options.clientCtx, rpcp.addr.String()) + rpcp.createAndRegisterFreezeUpdatersByOptions(ctx, providerStateTracker.StateQuery.StateQuery, rpcp.addr.String()) utils.LavaFormatInfo("RPCProvider setting up endpoints", utils.Attribute{Key: "count", Value: strconv.Itoa(len(options.rpcProviderEndpoints))}) @@ -282,9 +282,8 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { return nil } -func (rpcp *RPCProvider) createAndRegisterFreezeUpdatersByOptions(ctx context.Context, clientCtx client.Context, publicAddress string) { - queryClient := pairingtypes.NewQueryClient(clientCtx) - freezeJailUpdater := updaters.NewProviderFreezeJailUpdater(queryClient, publicAddress, rpcp.providerMetricsManager) +func (rpcp *RPCProvider) createAndRegisterFreezeUpdatersByOptions(ctx context.Context, stateQuery *updaters.StateQuery, publicAddress string) { + freezeJailUpdater := updaters.NewProviderFreezeJailUpdater(stateQuery, publicAddress, rpcp.providerMetricsManager) rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, freezeJailUpdater) } diff --git a/protocol/statetracker/consumer_state_tracker.go b/protocol/statetracker/consumer_state_tracker.go index 8ab2bd046e..ffa21cd6a7 100644 --- a/protocol/statetracker/consumer_state_tracker.go +++ b/protocol/statetracker/consumer_state_tracker.go @@ -25,7 +25,7 @@ type ConsumerTxSenderInf interface { // ConsumerStateTracker CSTis a class for tracking consumer data from the lava blockchain, such as epoch changes. // it allows also to query specific data form the blockchain and acts as a single place to send transactions type ConsumerStateTracker struct { - stateQuery *updaters.ConsumerStateQuery + StateQuery *updaters.ConsumerStateQuery ConsumerTxSenderInf *StateTracker ConsumerEmergencyTrackerInf @@ -34,7 +34,8 @@ type ConsumerStateTracker struct { func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, metrics *metrics.ConsumerMetricsManager, disableConflictTransactions bool) (ret *ConsumerStateTracker, err error) { emergencyTracker, blockNotFoundCallback := NewEmergencyTracker(metrics) - stateTrackerBase, err := NewStateTracker(ctx, txFactory, clientCtx, chainFetcher, blockNotFoundCallback) + stateQuery := updaters.NewConsumerStateQuery(ctx, clientCtx) + stateTrackerBase, err := NewStateTracker(ctx, txFactory, stateQuery.StateQuery, chainFetcher, blockNotFoundCallback) if err != nil { return nil, err } @@ -44,7 +45,7 @@ func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCt } cst := &ConsumerStateTracker{ StateTracker: stateTrackerBase, - stateQuery: updaters.NewConsumerStateQuery(ctx, clientCtx), + StateQuery: stateQuery, ConsumerTxSenderInf: txSender, ConsumerEmergencyTrackerInf: emergencyTracker, disableConflictTransactions: disableConflictTransactions, @@ -56,7 +57,7 @@ func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCt func (cst *ConsumerStateTracker) RegisterConsumerSessionManagerForPairingUpdates(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager, staticProvidersList []*lavasession.RPCProviderEndpoint) { // register this CSM to get the updated pairing list when a new epoch starts - pairingUpdater := updaters.NewPairingUpdater(cst.stateQuery, consumerSessionManager.RPCEndpoint().ChainID) + pairingUpdater := updaters.NewPairingUpdater(cst.StateQuery, consumerSessionManager.RPCEndpoint().ChainID) pairingUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, pairingUpdater) pairingUpdater, ok := pairingUpdaterRaw.(*updaters.PairingUpdater) if !ok { @@ -81,7 +82,7 @@ func (cst *ConsumerStateTracker) RegisterConsumerSessionManagerForPairingUpdates } func (cst *ConsumerStateTracker) RegisterForPairingUpdates(ctx context.Context, pairingUpdatable updaters.PairingUpdatable, specId string) { - pairingUpdater := updaters.NewPairingUpdater(cst.stateQuery, specId) + pairingUpdater := updaters.NewPairingUpdater(cst.StateQuery, specId) pairingUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, pairingUpdater) pairingUpdater, ok := pairingUpdaterRaw.(*updaters.PairingUpdater) if !ok { @@ -94,7 +95,7 @@ func (cst *ConsumerStateTracker) RegisterForPairingUpdates(ctx context.Context, } func (cst *ConsumerStateTracker) RegisterFinalizationConsensusForUpdates(ctx context.Context, finalizationConsensus *finalizationconsensus.FinalizationConsensus) { - finalizationConsensusUpdater := updaters.NewFinalizationConsensusUpdater(cst.stateQuery, finalizationConsensus.SpecId) + finalizationConsensusUpdater := updaters.NewFinalizationConsensusUpdater(cst.StateQuery, finalizationConsensus.SpecId) finalizationConsensusUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, finalizationConsensusUpdater) finalizationConsensusUpdater, ok := finalizationConsensusUpdaterRaw.(*updaters.FinalizationConsensusUpdater) if !ok { @@ -120,7 +121,7 @@ func (cst *ConsumerStateTracker) TxConflictDetection(ctx context.Context, finali func (cst *ConsumerStateTracker) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error { // register for spec updates sets spec and updates when a spec has been modified - specUpdater := updaters.NewSpecUpdater(endpoint.ChainID, cst.stateQuery, cst.EventTracker) + specUpdater := updaters.NewSpecUpdater(endpoint.ChainID, cst.StateQuery, cst.EventTracker) specUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, specUpdater) specUpdater, ok := specUpdaterRaw.(*updaters.SpecUpdater) if !ok { @@ -130,11 +131,11 @@ func (cst *ConsumerStateTracker) RegisterForSpecUpdates(ctx context.Context, spe } func (cst *ConsumerStateTracker) GetConsumerPolicy(ctx context.Context, consumerAddress, chainID string) (*plantypes.Policy, error) { - return cst.stateQuery.GetEffectivePolicy(ctx, consumerAddress, chainID) + return cst.StateQuery.GetEffectivePolicy(ctx, consumerAddress, chainID) } func (cst *ConsumerStateTracker) RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf) { - versionUpdater := updaters.NewVersionUpdater(cst.stateQuery, cst.EventTracker, version, versionValidator) + versionUpdater := updaters.NewVersionUpdater(cst.StateQuery, cst.EventTracker, version, versionValidator) versionUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, versionUpdater) versionUpdater, ok := versionUpdaterRaw.(*updaters.VersionUpdater) if !ok { @@ -145,7 +146,7 @@ func (cst *ConsumerStateTracker) RegisterForVersionUpdates(ctx context.Context, func (cst *ConsumerStateTracker) RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error { // register for downtimeParams updates sets downtimeParams and updates when downtimeParams has been changed - downtimeParamsUpdater := updaters.NewDowntimeParamsUpdater(cst.stateQuery, cst.EventTracker) + downtimeParamsUpdater := updaters.NewDowntimeParamsUpdater(cst.StateQuery, cst.EventTracker) downtimeParamsUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, downtimeParamsUpdater) downtimeParamsUpdater, ok := downtimeParamsUpdaterRaw.(*updaters.DowntimeParamsUpdater) if !ok { @@ -156,5 +157,5 @@ func (cst *ConsumerStateTracker) RegisterForDowntimeParamsUpdates(ctx context.Co } func (cst *ConsumerStateTracker) GetProtocolVersion(ctx context.Context) (*updaters.ProtocolVersionResponse, error) { - return cst.stateQuery.GetProtocolVersion(ctx) + return cst.StateQuery.GetProtocolVersion(ctx) } diff --git a/protocol/statetracker/events.go b/protocol/statetracker/events.go index 9b41f74842..1aa68f248a 100644 --- a/protocol/statetracker/events.go +++ b/protocol/statetracker/events.go @@ -65,11 +65,8 @@ func eventsLookup(ctx context.Context, clientCtx client.Context, blocks, fromBlo defer ticker.Stop() readEventsFromBlock := func(blockFrom int64, blockTo int64, hash string) { for block := blockFrom; block < blockTo; block++ { - brp, err := updaters.TryIntoTendermintRPC(clientCtx.Client) - if err != nil { - utils.LavaFormatFatal("invalid blockResults provider", err) - } - blockResults, err := brp.BlockResults(ctx, &block) + queryInst := updaters.NewStateQueryAccessInst(clientCtx) + blockResults, err := queryInst.BlockResults(ctx, &block) if err != nil { utils.LavaFormatError("invalid blockResults status", err) return @@ -275,14 +272,11 @@ func paymentsLookup(ctx context.Context, clientCtx client.Context, blockStart, b continue } utils.LavaFormatInfo("fetching block", utils.LogAttr("block", block)) - brp, err := updaters.TryIntoTendermintRPC(clientCtx.Client) - if err != nil { - utils.LavaFormatFatal("invalid blockResults provider", err) - } + queryInst := updaters.NewStateQueryAccessInst(clientCtx) var blockResults *coretypes.ResultBlockResults for retry := 0; retry < 3; retry++ { ctxWithTimeout, cancelContextWithTimeout := context.WithTimeout(ctx, time.Second*30) - blockResults, err = brp.BlockResults(ctxWithTimeout, &block) + blockResults, err = queryInst.BlockResults(ctxWithTimeout, &block) cancelContextWithTimeout() if err != nil { utils.LavaFormatWarning("@@@@ failed fetching block results will retry", err, utils.LogAttr("block_number", block)) @@ -660,10 +654,7 @@ func countTransactionsPerDay(ctx context.Context, clientCtx client.Context, bloc utils.LogAttr("starting_block", latestHeight-numberOfBlocksInADay), ) - tmClient, err := updaters.TryIntoTendermintRPC(clientCtx.Client) - if err != nil { - utils.LavaFormatFatal("invalid blockResults provider", err) - } + queryInst := updaters.NewStateQueryAccessInst(clientCtx) // i is days // j are blocks in that day // starting from current day and going backwards @@ -697,7 +688,7 @@ func countTransactionsPerDay(ctx context.Context, clientCtx client.Context, bloc defer wg.Done() ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - blockResults, err := tmClient.BlockResults(ctxWithTimeout, &k) + blockResults, err := queryInst.BlockResults(ctxWithTimeout, &k) if err != nil { utils.LavaFormatError("invalid blockResults status", err) return diff --git a/protocol/statetracker/provider_state_tracker.go b/protocol/statetracker/provider_state_tracker.go index 2dc0bbddcb..d6f5423b72 100644 --- a/protocol/statetracker/provider_state_tracker.go +++ b/protocol/statetracker/provider_state_tracker.go @@ -19,7 +19,7 @@ import ( // ProviderStateTracker PST is a class for tracking provider data from the lava blockchain, such as epoch changes. // it allows also to query specific data form the blockchain and acts as a single place to send transactions type ProviderStateTracker struct { - stateQuery *updaters.ProviderStateQuery + StateQuery *updaters.ProviderStateQuery txSender *ProviderTxSender *StateTracker *EmergencyTracker @@ -27,7 +27,8 @@ type ProviderStateTracker struct { func NewProviderStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, metrics *metrics.ProviderMetricsManager) (ret *ProviderStateTracker, err error) { emergencyTracker, blockNotFoundCallback := NewEmergencyTracker(metrics) - stateTrackerBase, err := NewStateTracker(ctx, txFactory, clientCtx, chainFetcher, blockNotFoundCallback) + stateQuery := updaters.NewProviderStateQuery(ctx, updaters.NewStateQueryAccessInst(clientCtx)) + stateTrackerBase, err := NewStateTracker(ctx, txFactory, stateQuery.StateQuery, chainFetcher, blockNotFoundCallback) if err != nil { return nil, err } @@ -37,7 +38,7 @@ func NewProviderStateTracker(ctx context.Context, txFactory tx.Factory, clientCt } pst := &ProviderStateTracker{ StateTracker: stateTrackerBase, - stateQuery: updaters.NewProviderStateQuery(ctx, clientCtx), + StateQuery: stateQuery, txSender: txSender, EmergencyTracker: emergencyTracker, } @@ -49,7 +50,7 @@ func NewProviderStateTracker(ctx context.Context, txFactory tx.Factory, clientCt } func (pst *ProviderStateTracker) RegisterForEpochUpdates(ctx context.Context, epochUpdatable updaters.EpochUpdatable) { - epochUpdater := updaters.NewEpochUpdater(&pst.stateQuery.EpochStateQuery) + epochUpdater := updaters.NewEpochUpdater(&pst.StateQuery.EpochStateQuery) epochUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, epochUpdater) epochUpdater, ok := epochUpdaterRaw.(*updaters.EpochUpdater) if !ok { @@ -60,7 +61,7 @@ func (pst *ProviderStateTracker) RegisterForEpochUpdates(ctx context.Context, ep func (pst *ProviderStateTracker) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error { // register for spec updates sets spec and updates when a spec has been modified - specUpdater := updaters.NewSpecUpdater(endpoint.ChainID, pst.stateQuery, pst.EventTracker) + specUpdater := updaters.NewSpecUpdater(endpoint.ChainID, pst.StateQuery, pst.EventTracker) specUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, specUpdater) specUpdater, ok := specUpdaterRaw.(*updaters.SpecUpdater) if !ok { @@ -71,7 +72,7 @@ func (pst *ProviderStateTracker) RegisterForSpecUpdates(ctx context.Context, spe func (pst *ProviderStateTracker) RegisterForSpecVerifications(ctx context.Context, specVerifier updaters.SpecVerifier, chainId string) error { // register for spec verifications sets spec and verifies when a spec has been modified - specUpdater := updaters.NewSpecUpdater(chainId, pst.stateQuery, pst.EventTracker) + specUpdater := updaters.NewSpecUpdater(chainId, pst.StateQuery, pst.EventTracker) specUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, specUpdater) specUpdater, ok := specUpdaterRaw.(*updaters.SpecUpdater) if !ok { @@ -81,7 +82,7 @@ func (pst *ProviderStateTracker) RegisterForSpecVerifications(ctx context.Contex } func (pst *ProviderStateTracker) RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf) { - versionUpdater := updaters.NewVersionUpdater(pst.stateQuery, pst.EventTracker, version, versionValidator) + versionUpdater := updaters.NewVersionUpdater(pst.StateQuery, pst.EventTracker, version, versionValidator) versionUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, versionUpdater) versionUpdater, ok := versionUpdaterRaw.(*updaters.VersionUpdater) if !ok { @@ -114,7 +115,7 @@ func (pst *ProviderStateTracker) RegisterPaymentUpdatableForPayments(ctx context func (pst *ProviderStateTracker) RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error { // register for downtimeParams updates sets downtimeParams and updates when downtimeParams has been changed - downtimeParamsUpdater := updaters.NewDowntimeParamsUpdater(pst.stateQuery, pst.EventTracker) + downtimeParamsUpdater := updaters.NewDowntimeParamsUpdater(pst.StateQuery, pst.EventTracker) downtimeParamsUpdaterRaw := pst.StateTracker.RegisterForUpdates(ctx, downtimeParamsUpdater) downtimeParamsUpdater, ok := downtimeParamsUpdaterRaw.(*updaters.DowntimeParamsUpdater) if !ok { @@ -141,31 +142,31 @@ func (pst *ProviderStateTracker) LatestBlock() int64 { } func (pst *ProviderStateTracker) GetMaxCuForUser(ctx context.Context, consumerAddress, chainID string, epoch uint64) (maxCu uint64, err error) { - return pst.stateQuery.GetMaxCuForUser(ctx, consumerAddress, chainID, epoch) + return pst.StateQuery.GetMaxCuForUser(ctx, consumerAddress, chainID, epoch) } func (pst *ProviderStateTracker) VerifyPairing(ctx context.Context, consumerAddress, providerAddress string, epoch uint64, chainID string) (valid bool, total int64, projectId string, err error) { - return pst.stateQuery.VerifyPairing(ctx, consumerAddress, providerAddress, epoch, chainID) + return pst.StateQuery.VerifyPairing(ctx, consumerAddress, providerAddress, epoch, chainID) } func (pst *ProviderStateTracker) GetEpochSize(ctx context.Context) (uint64, error) { - return pst.stateQuery.GetEpochSize(ctx) + return pst.StateQuery.GetEpochSize(ctx) } func (pst *ProviderStateTracker) EarliestBlockInMemory(ctx context.Context) (uint64, error) { - return pst.stateQuery.EarliestBlockInMemory(ctx) + return pst.StateQuery.EarliestBlockInMemory(ctx) } func (pst *ProviderStateTracker) GetRecommendedEpochNumToCollectPayment(ctx context.Context) (uint64, error) { - return pst.stateQuery.GetRecommendedEpochNumToCollectPayment(ctx) + return pst.StateQuery.GetRecommendedEpochNumToCollectPayment(ctx) } func (pst *ProviderStateTracker) GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx context.Context) (uint64, error) { - return pst.stateQuery.GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx) + return pst.StateQuery.GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx) } func (pst *ProviderStateTracker) GetProtocolVersion(ctx context.Context) (*updaters.ProtocolVersionResponse, error) { - return pst.stateQuery.GetProtocolVersion(ctx) + return pst.StateQuery.GetProtocolVersion(ctx) } func (pst *ProviderStateTracker) GetAverageBlockTime() time.Duration { diff --git a/protocol/statetracker/state_tracker.go b/protocol/statetracker/state_tracker.go index c50639cb34..a87ecbf8b7 100644 --- a/protocol/statetracker/state_tracker.go +++ b/protocol/statetracker/state_tracker.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/tx" "github.com/lavanet/lava/v4/protocol/chainlib" "github.com/lavanet/lava/v4/protocol/chaintracker" @@ -93,9 +92,9 @@ func GetLavaSpecWithRetry(ctx context.Context, specQueryClient spectypes.QueryCl return specResponse, err } -func NewStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, blockNotFoundCallback func(latestBlockTime time.Time)) (ret *StateTracker, err error) { +func NewStateTracker(ctx context.Context, txFactory tx.Factory, stateQuery *updaters.StateQuery, chainFetcher chaintracker.ChainFetcher, blockNotFoundCallback func(latestBlockTime time.Time)) (ret *StateTracker, err error) { // validate chainId - status, err := clientCtx.Client.Status(ctx) + status, err := stateQuery.Status(ctx) if err != nil { return nil, utils.LavaFormatError("failed getting status", err) } @@ -103,7 +102,7 @@ func NewStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client return nil, utils.LavaFormatError("Chain ID mismatch", nil, utils.Attribute{Key: "--chain-id", Value: txFactory.ChainID()}, utils.Attribute{Key: "Node chainID", Value: status.NodeInfo.Network}) } - eventTracker := &updaters.EventTracker{ClientCtx: clientCtx} + eventTracker := &updaters.EventTracker{StateQuery: stateQuery} for i := 0; i < updaters.BlockResultRetry; i++ { err = eventTracker.UpdateBlockResults(0) if err == nil { @@ -114,7 +113,7 @@ func NewStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client if err != nil { return nil, utils.LavaFormatError("failed getting blockResults after retries", err) } - specQueryClient := spectypes.NewQueryClient(clientCtx) + specQueryClient := stateQuery.GetSpecQueryClient() specResponse, err := GetLavaSpecWithRetry(ctx, specQueryClient) if err != nil { utils.LavaFormatFatal("failed querying lava spec for state tracker", err) diff --git a/protocol/statetracker/updaters/event_tracker.go b/protocol/statetracker/updaters/event_tracker.go index 6f442a83af..ec93865abe 100644 --- a/protocol/statetracker/updaters/event_tracker.go +++ b/protocol/statetracker/updaters/event_tracker.go @@ -2,14 +2,12 @@ package updaters import ( "context" - "fmt" "sync" "time" "golang.org/x/exp/slices" ctypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/cosmos/cosmos-sdk/client" "github.com/lavanet/lava/v4/protocol/rpcprovider/reliabilitymanager" "github.com/lavanet/lava/v4/protocol/rpcprovider/rewardserver" "github.com/lavanet/lava/v4/utils" @@ -25,8 +23,8 @@ const ( var TimeOutForFetchingLavaBlocks = time.Second * 5 type EventTracker struct { - lock sync.RWMutex - ClientCtx client.Context + lock sync.RWMutex + *StateQuery blockResults *ctypes.ResultBlockResults latestUpdatedBlock int64 } @@ -38,7 +36,7 @@ func (et *EventTracker) UpdateBlockResults(latestBlock int64) (err error) { var res *ctypes.ResultStatus for i := 0; i < 3; i++ { timeoutCtx, cancel := context.WithTimeout(ctx, TimeOutForFetchingLavaBlocks) - res, err = et.ClientCtx.Client.Status(timeoutCtx) + res, err = et.StateQuery.Status(timeoutCtx) cancel() if err == nil { break @@ -50,14 +48,10 @@ func (et *EventTracker) UpdateBlockResults(latestBlock int64) (err error) { latestBlock = res.SyncInfo.LatestBlockHeight } - brp, err := TryIntoTendermintRPC(et.ClientCtx.Client) - if err != nil { - return utils.LavaFormatError("failed converting client.TendermintRPC to tendermintRPC", err) - } var blockResults *ctypes.ResultBlockResults for i := 0; i < BlockResultRetry; i++ { timeoutCtx, cancel := context.WithTimeout(ctx, TimeOutForFetchingLavaBlocks) - blockResults, err = brp.BlockResults(timeoutCtx, &latestBlock) + blockResults, err = et.StateQuery.BlockResults(timeoutCtx, &latestBlock) cancel() if err == nil { break @@ -216,11 +210,3 @@ type tendermintRPC interface { height *int64, ) (*ctypes.ResultConsensusParams, error) } - -func TryIntoTendermintRPC(cl client.TendermintRPC) (tendermintRPC, error) { - brp, ok := cl.(tendermintRPC) - if !ok { - return nil, fmt.Errorf("client does not implement tendermintRPC: %T", cl) - } - return brp, nil -} diff --git a/protocol/statetracker/updaters/provider_freeze_jail_updater.go b/protocol/statetracker/updaters/provider_freeze_jail_updater.go index 69f24ad1ab..0be8a93e8c 100644 --- a/protocol/statetracker/updaters/provider_freeze_jail_updater.go +++ b/protocol/statetracker/updaters/provider_freeze_jail_updater.go @@ -6,15 +6,15 @@ import ( "github.com/lavanet/lava/v4/utils" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" - "google.golang.org/grpc" + grpc "google.golang.org/grpc" ) const ( CallbackKeyForFreezeUpdate = "freeze-update" ) -type ProviderPairingStatusStateQueryInf interface { - Provider(ctx context.Context, in *pairingtypes.QueryProviderRequest, opts ...grpc.CallOption) (*pairingtypes.QueryProviderResponse, error) +type ProviderQueryGetter interface { + GetPairingQueryClient() pairingtypes.QueryClient } type ProviderMetricsManagerInf interface { @@ -30,27 +30,31 @@ const ( FROZEN ) +type ProviderPairingStatusStateQueryInf interface { + Provider(ctx context.Context, in *pairingtypes.QueryProviderRequest, opts ...grpc.CallOption) (*pairingtypes.QueryProviderResponse, error) +} + type ProviderFreezeJailUpdater struct { - pairingQueryClient ProviderPairingStatusStateQueryInf - metricsManager ProviderMetricsManagerInf - publicAddress string + querier ProviderPairingStatusStateQueryInf + metricsManager ProviderMetricsManagerInf + publicAddress string } func NewProviderFreezeJailUpdater( - pairingQueryClient ProviderPairingStatusStateQueryInf, + querier ProviderPairingStatusStateQueryInf, publicAddress string, metricsManager ProviderMetricsManagerInf, ) *ProviderFreezeJailUpdater { return &ProviderFreezeJailUpdater{ - pairingQueryClient: pairingQueryClient, - publicAddress: publicAddress, - metricsManager: metricsManager, + querier: querier, + publicAddress: publicAddress, + metricsManager: metricsManager, } } func (pfu *ProviderFreezeJailUpdater) UpdateEpoch(epoch uint64) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - response, err := pfu.pairingQueryClient.Provider(ctx, &pairingtypes.QueryProviderRequest{Address: pfu.publicAddress}) + response, err := pfu.querier.Provider(ctx, &pairingtypes.QueryProviderRequest{Address: pfu.publicAddress}) cancel() if err != nil { diff --git a/protocol/statetracker/updaters/provider_freeze_jail_updater_mocks.go b/protocol/statetracker/updaters/provider_freeze_jail_updater_mocks.go index 24b0738393..5b506dd58f 100644 --- a/protocol/statetracker/updaters/provider_freeze_jail_updater_mocks.go +++ b/protocol/statetracker/updaters/provider_freeze_jail_updater_mocks.go @@ -41,6 +41,10 @@ func (m *MockProviderPairingStatusStateQueryInf) EXPECT() *MockProviderPairingSt return m.recorder } +func (m *MockProviderPairingStatusStateQueryInf) GetPairingQueryClient() ProviderPairingStatusStateQueryInf { + return m +} + // Provider mocks base method. func (m *MockProviderPairingStatusStateQueryInf) Provider(ctx context.Context, in *types.QueryProviderRequest, opts ...grpc.CallOption) (*types.QueryProviderResponse, error) { m.ctrl.T.Helper() diff --git a/protocol/statetracker/updaters/state_query.go b/protocol/statetracker/updaters/state_query.go index 51cb45ec61..0877f171c1 100644 --- a/protocol/statetracker/updaters/state_query.go +++ b/protocol/statetracker/updaters/state_query.go @@ -9,6 +9,7 @@ import ( downtimev1 "github.com/lavanet/lava/v4/x/downtime/v1" "github.com/cosmos/cosmos-sdk/client" + grpc1 "github.com/cosmos/gogoproto/grpc" "github.com/dgraph-io/ristretto" reliabilitymanager "github.com/lavanet/lava/v4/protocol/rpcprovider/reliabilitymanager" "github.com/lavanet/lava/v4/utils" @@ -37,22 +38,40 @@ type ProtocolVersionResponse struct { BlockNumber string } +type StateQueryAccessInf interface { + grpc1.ClientConn + tendermintRPC + client.TendermintRPC +} + +type StateQueryAccessInst struct { + grpc1.ClientConn + tendermintRPC + client.TendermintRPC +} + +func NewStateQueryAccessInst(clientCtx client.Context) *StateQueryAccessInst { + tenderRpc, ok := clientCtx.Client.(tendermintRPC) + if !ok { + utils.LavaFormatFatal("failed casting tendermint rpc from client context", nil) + } + return &StateQueryAccessInst{ClientConn: clientCtx, tendermintRPC: tenderRpc, TendermintRPC: clientCtx.Client} +} + type StateQuery struct { - SpecQueryClient spectypes.QueryClient - PairingQueryClient pairingtypes.QueryClient - EpochStorageQueryClient epochstoragetypes.QueryClient - ProtocolClient protocoltypes.QueryClient - DowntimeClient downtimev1.QueryClient + specQueryClient spectypes.QueryClient + pairingQueryClient pairingtypes.QueryClient + epochStorageQueryClient epochstoragetypes.QueryClient + protocolClient protocoltypes.QueryClient + downtimeClient downtimev1.QueryClient ResponsesCache *ristretto.Cache + tendermintRPC + client.TendermintRPC } -func NewStateQuery(ctx context.Context, clientCtx client.Context) *StateQuery { +func NewStateQuery(ctx context.Context, accessInf StateQueryAccessInf) *StateQuery { sq := &StateQuery{} - sq.SpecQueryClient = spectypes.NewQueryClient(clientCtx) - sq.PairingQueryClient = pairingtypes.NewQueryClient(clientCtx) - sq.EpochStorageQueryClient = epochstoragetypes.NewQueryClient(clientCtx) - sq.ProtocolClient = protocoltypes.NewQueryClient(clientCtx) - sq.DowntimeClient = downtimev1.NewQueryClient(clientCtx) + sq.UpdateAccess(accessInf) cache, err := ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64}) if err != nil { utils.LavaFormatFatal("failed setting up cache for queries", err) @@ -61,9 +80,27 @@ func NewStateQuery(ctx context.Context, clientCtx client.Context) *StateQuery { return sq } +func (sq *StateQuery) UpdateAccess(accessInf StateQueryAccessInf) { + sq.specQueryClient = spectypes.NewQueryClient(accessInf) + sq.pairingQueryClient = pairingtypes.NewQueryClient(accessInf) + sq.epochStorageQueryClient = epochstoragetypes.NewQueryClient(accessInf) + sq.protocolClient = protocoltypes.NewQueryClient(accessInf) + sq.downtimeClient = downtimev1.NewQueryClient(accessInf) + sq.tendermintRPC = accessInf + sq.TendermintRPC = accessInf +} + +func (sq *StateQuery) Provider(ctx context.Context, in *pairingtypes.QueryProviderRequest, opts ...grpc.CallOption) (*pairingtypes.QueryProviderResponse, error) { + return sq.pairingQueryClient.Provider(ctx, in, opts...) +} + +func (sq *StateQuery) GetSpecQueryClient() spectypes.QueryClient { + return sq.specQueryClient +} + func (csq *StateQuery) GetProtocolVersion(ctx context.Context) (*ProtocolVersionResponse, error) { header := metadata.MD{} - param, err := csq.ProtocolClient.Params(ctx, &protocoltypes.QueryParamsRequest{}, grpc.Header(&header)) + param, err := csq.protocolClient.Params(ctx, &protocoltypes.QueryParamsRequest{}, grpc.Header(&header)) if err != nil { return nil, err } @@ -76,7 +113,7 @@ func (csq *StateQuery) GetProtocolVersion(ctx context.Context) (*ProtocolVersion } func (csq *StateQuery) GetSpec(ctx context.Context, chainID string) (*spectypes.Spec, error) { - spec, err := csq.SpecQueryClient.Spec(ctx, &spectypes.QueryGetSpecRequest{ + spec, err := csq.specQueryClient.Spec(ctx, &spectypes.QueryGetSpecRequest{ ChainID: chainID, }) if err != nil { @@ -86,7 +123,7 @@ func (csq *StateQuery) GetSpec(ctx context.Context, chainID string) (*spectypes. } func (csq *StateQuery) GetDowntimeParams(ctx context.Context) (*downtimev1.Params, error) { - res, err := csq.DowntimeClient.QueryParams(ctx, &downtimev1.QueryParamsRequest{}) + res, err := csq.downtimeClient.QueryParams(ctx, &downtimev1.QueryParamsRequest{}) if err != nil { return nil, err } @@ -94,13 +131,13 @@ func (csq *StateQuery) GetDowntimeParams(ctx context.Context) (*downtimev1.Param } type ConsumerStateQuery struct { - StateQuery - clientCtx client.Context + *StateQuery + fromAddress string lastChainID string } func NewConsumerStateQuery(ctx context.Context, clientCtx client.Context) *ConsumerStateQuery { - csq := &ConsumerStateQuery{StateQuery: *NewStateQuery(ctx, clientCtx), clientCtx: clientCtx, lastChainID: ""} + csq := &ConsumerStateQuery{StateQuery: NewStateQuery(ctx, NewStateQueryAccessInst(clientCtx)), fromAddress: clientCtx.FromAddress.String(), lastChainID: ""} return csq } @@ -114,7 +151,7 @@ func (csq *ConsumerStateQuery) GetEffectivePolicy(ctx context.Context, consumerA } } - resp, err := csq.PairingQueryClient.EffectivePolicy(ctx, &pairingtypes.QueryEffectivePolicyRequest{ + resp, err := csq.pairingQueryClient.EffectivePolicy(ctx, &pairingtypes.QueryEffectivePolicyRequest{ Consumer: consumerAddress, SpecID: specID, }) @@ -141,9 +178,9 @@ func (csq *ConsumerStateQuery) GetPairing(ctx context.Context, chainID string, l } } - pairingResp, err := csq.PairingQueryClient.GetPairing(ctx, &pairingtypes.QueryGetPairingRequest{ + pairingResp, err := csq.pairingQueryClient.GetPairing(ctx, &pairingtypes.QueryGetPairingRequest{ ChainID: chainID, - Client: csq.clientCtx.FromAddress.String(), + Client: csq.fromAddress, }) if err != nil { return nil, 0, 0, utils.LavaFormatError("Failed in get pairing query", err, utils.Attribute{}) @@ -154,7 +191,7 @@ func (csq *ConsumerStateQuery) GetPairing(ctx context.Context, chainID string, l utils.LavaFormatWarning("Chain returned empty provider list, check node connection and consumer subscription status, or no providers provide this chain", nil, utils.LogAttr("chainId", chainID), utils.LogAttr("epoch", pairingResp.CurrentEpoch), - utils.LogAttr("consumer_address", csq.clientCtx.FromAddress.String()), + utils.LogAttr("consumer_address", csq.fromAddress), ) } return pairingResp.Providers, pairingResp.CurrentEpoch, pairingResp.BlockOfNextPairing, nil @@ -175,8 +212,8 @@ func (csq *ConsumerStateQuery) GetMaxCUForUser(ctx context.Context, chainID stri } if userEntryRes == nil { - address := csq.clientCtx.FromAddress.String() - userEntryRes, err = csq.PairingQueryClient.UserEntry(ctx, &pairingtypes.QueryUserEntryRequest{ChainID: chainID, Address: address, Block: epoch}) + address := csq.fromAddress + userEntryRes, err = csq.pairingQueryClient.UserEntry(ctx, &pairingtypes.QueryUserEntryRequest{ChainID: chainID, Address: address, Block: epoch}) if err != nil { return 0, utils.LavaFormatError("failed querying StakeEntry for consumer", err, utils.Attribute{Key: "chainID", Value: chainID}, utils.Attribute{Key: "address", Value: address}, utils.Attribute{Key: "block", Value: epoch}) } @@ -196,7 +233,7 @@ type EpochStateQuery struct { } func (esq *EpochStateQuery) CurrentEpochStart(ctx context.Context) (uint64, error) { - epochDetails, err := esq.EpochStorageQueryClient.EpochDetails(ctx, &epochstoragetypes.QueryGetEpochDetailsRequest{}) + epochDetails, err := esq.epochStorageQueryClient.EpochDetails(ctx, &epochstoragetypes.QueryGetEpochDetailsRequest{}) if err != nil { return 0, utils.LavaFormatError("Failed Querying EpochDetails", err) } @@ -209,15 +246,14 @@ func NewEpochStateQuery(stateQuery *StateQuery) *EpochStateQuery { } type ProviderStateQuery struct { - StateQuery + *StateQuery EpochStateQuery - clientCtx client.Context } -func NewProviderStateQuery(ctx context.Context, clientCtx client.Context) *ProviderStateQuery { - sq := NewStateQuery(ctx, clientCtx) +func NewProviderStateQuery(ctx context.Context, stateQueryAccess StateQueryAccessInf) *ProviderStateQuery { + sq := NewStateQuery(ctx, stateQueryAccess) esq := NewEpochStateQuery(sq) - csq := &ProviderStateQuery{StateQuery: *sq, EpochStateQuery: *esq, clientCtx: clientCtx} + csq := &ProviderStateQuery{StateQuery: sq, EpochStateQuery: *esq} return csq } @@ -233,7 +269,7 @@ func (psq *ProviderStateQuery) GetMaxCuForUser(ctx context.Context, consumerAddr } } if userEntryRes == nil { - userEntryRes, err = psq.PairingQueryClient.UserEntry(ctx, &pairingtypes.QueryUserEntryRequest{ChainID: chainID, Address: consumerAddress, Block: epoch}) + userEntryRes, err = psq.pairingQueryClient.UserEntry(ctx, &pairingtypes.QueryUserEntryRequest{ChainID: chainID, Address: consumerAddress, Block: epoch}) if err != nil { return 0, utils.LavaFormatError("StakeEntry querying for consumer failed", err, utils.Attribute{Key: "chainID", Value: chainID}, utils.Attribute{Key: "address", Value: consumerAddress}, utils.Attribute{Key: "block", Value: epoch}) } @@ -248,10 +284,7 @@ func (psq *ProviderStateQuery) entryKey(consumerAddress, chainID string, epoch u } func (psq *ProviderStateQuery) VoteEvents(ctx context.Context, latestBlock int64) (votes []*reliabilitymanager.VoteParams, err error) { - brp, err := TryIntoTendermintRPC(psq.clientCtx.Client) - if err != nil { - return nil, utils.LavaFormatError("failed to get block result provider", err) - } + brp := psq.StateQuery.tendermintRPC blockResults, err := brp.BlockResults(ctx, &latestBlock) if err != nil { return nil, err @@ -311,7 +344,7 @@ func (psq *ProviderStateQuery) VerifyPairing(ctx context.Context, consumerAddres } } if verifyResponse == nil { - verifyResponse, err = psq.PairingQueryClient.VerifyPairing(context.Background(), &pairingtypes.QueryVerifyPairingRequest{ + verifyResponse, err = psq.pairingQueryClient.VerifyPairing(context.Background(), &pairingtypes.QueryVerifyPairingRequest{ ChainID: chainID, Client: consumerAddress, Provider: providerAddress, @@ -334,7 +367,7 @@ func (psq *ProviderStateQuery) VerifyPairing(ctx context.Context, consumerAddres } func (psq *ProviderStateQuery) GetEpochSize(ctx context.Context) (uint64, error) { - res, err := psq.EpochStorageQueryClient.Params(ctx, &epochstoragetypes.QueryParamsRequest{}) + res, err := psq.epochStorageQueryClient.Params(ctx, &epochstoragetypes.QueryParamsRequest{}) if err != nil { return 0, err } @@ -342,7 +375,7 @@ func (psq *ProviderStateQuery) GetEpochSize(ctx context.Context) (uint64, error) } func (psq *ProviderStateQuery) EarliestBlockInMemory(ctx context.Context) (uint64, error) { - res, err := psq.EpochStorageQueryClient.EpochDetails(ctx, &epochstoragetypes.QueryGetEpochDetailsRequest{}) + res, err := psq.epochStorageQueryClient.EpochDetails(ctx, &epochstoragetypes.QueryGetEpochDetailsRequest{}) if err != nil { return 0, err } @@ -350,7 +383,7 @@ func (psq *ProviderStateQuery) EarliestBlockInMemory(ctx context.Context) (uint6 } func (psq *ProviderStateQuery) GetRecommendedEpochNumToCollectPayment(ctx context.Context) (uint64, error) { - res, err := psq.PairingQueryClient.Params(ctx, &pairingtypes.QueryParamsRequest{}) + res, err := psq.pairingQueryClient.Params(ctx, &pairingtypes.QueryParamsRequest{}) if err != nil { return 0, err }