From b4bd381efd819346f68064772c1b7dc09805296a Mon Sep 17 00:00:00 2001 From: Omer <100387053+omerlavanet@users.noreply.github.com> Date: Mon, 2 Dec 2024 14:27:41 +0200 Subject: [PATCH 1/6] feat: add lava over lava secondary transport (#1769) * refactor state query access * remove direct usage of client.Context to allow the rewiring of lava over lava * refactor rpcconsumer, allow creating a server with a function * lint * added custom lava transport * add lava over lava secondary transport * lint * added initialization condition * relaysMonitor is dependent on metrics, so put the functionality in rpcconsumer server * added metrics * add vote test script * fix lint * added support for e2e * oops brackets * added secondary transport startup --------- Co-authored-by: Ran Mishael --- protocol/chainlib/base_chain_parser.go | 51 +++++++++++++++++ protocol/chainlib/chainlib.go | 26 +++++++++ protocol/chainlib/tendermintRPC.go | 6 ++ protocol/lavasession/provider_types.go | 2 +- protocol/metrics/consumer_metrics_manager.go | 27 +++++++++ protocol/metrics/rpcconsumer_logs.go | 7 +++ protocol/rpcconsumer/custom_transport.go | 47 +++++++++++++-- protocol/rpcconsumer/rpcconsumer.go | 60 +++++++++++++++++++- protocol/rpcconsumer/rpcconsumer_server.go | 36 +++++++++++- protocol/statetracker/state_tracker.go | 15 ++++- scripts/test/vote_test.sh | 31 ++++++++++ 11 files changed, 295 insertions(+), 13 deletions(-) create mode 100755 scripts/test/vote_test.sh diff --git a/protocol/chainlib/base_chain_parser.go b/protocol/chainlib/base_chain_parser.go index 6cc5e8ea24..1017fb22c0 100644 --- a/protocol/chainlib/base_chain_parser.go +++ b/protocol/chainlib/base_chain_parser.go @@ -3,6 +3,8 @@ package chainlib import ( "errors" "fmt" + "io" + "net/http" "regexp" "strings" "sync" @@ -356,6 +358,55 @@ func (apip *BaseChainParser) isValidInternalPath(path string) bool { return ok } +// take an http request and direct it through the consumer +func (apip *BaseChainParser) ExtractDataFromRequest(request *http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error) { + // Extract relative URL path + url = request.URL.Path + // Extract connection type + connectionType = request.Method + + // Extract metadata + for key, values := range request.Header { + for _, value := range values { + metadata = append(metadata, pairingtypes.Metadata{ + Name: key, + Value: value, + }) + } + } + + // Extract data + if request.Body != nil { + bodyBytes, err := io.ReadAll(request.Body) + if err != nil { + return "", "", "", nil, err + } + data = string(bodyBytes) + } + + return url, data, connectionType, metadata, nil +} + +func (apip *BaseChainParser) SetResponseFromRelayResult(relayResult *common.RelayResult) (*http.Response, error) { + if relayResult == nil { + return nil, errors.New("relayResult is nil") + } + response := &http.Response{ + StatusCode: relayResult.StatusCode, + Header: make(http.Header), + } + + for _, values := range relayResult.Reply.Metadata { + response.Header.Add(values.Name, values.Value) + } + + if relayResult.Reply != nil && relayResult.Reply.Data != nil { + response.Body = io.NopCloser(strings.NewReader(string(relayResult.Reply.Data))) + } + + return response, nil +} + // getSupportedApi fetches service api from spec by name func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addon string) (*spectypes.ApiCollection, error) { // Guard that the GrpcChainParser instance exists diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 49a90af1db..5c3fbd9b7f 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -3,6 +3,7 @@ package chainlib import ( "context" "fmt" + "net/http" "time" "github.com/lavanet/lava/v4/protocol/chainlib/chainproxy/rpcInterfaceMessages" @@ -11,10 +12,15 @@ import ( "github.com/lavanet/lava/v4/protocol/common" "github.com/lavanet/lava/v4/protocol/lavasession" "github.com/lavanet/lava/v4/protocol/metrics" + "github.com/lavanet/lava/v4/utils" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" spectypes "github.com/lavanet/lava/v4/x/spec/types" ) +const ( + INTERNAL_ADDRESS = "internal-addr" +) + var ( IgnoreSubscriptionNotConfiguredError = true IgnoreSubscriptionNotConfiguredErrorFlag = "ignore-subscription-not-configured-error" @@ -44,6 +50,10 @@ func NewChainListener( refererData *RefererData, consumerWsSubscriptionManager *ConsumerWSSubscriptionManager, ) (ChainListener, error) { + if listenEndpoint.NetworkAddress == INTERNAL_ADDRESS { + utils.LavaFormatDebug("skipping chain listener for internal address") + return NewEmptyChainListener(), nil + } switch listenEndpoint.ApiInterface { case spectypes.APIInterfaceJsonRPC: return NewJrpcChainListener(ctx, listenEndpoint, relaySender, healthReporter, rpcConsumerLogs, refererData, consumerWsSubscriptionManager), nil @@ -76,6 +86,8 @@ type ChainParser interface { UpdateBlockTime(newBlockTime time.Duration) GetUniqueName() string ExtensionsParser() *extensionslib.ExtensionParser + ExtractDataFromRequest(*http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error) + SetResponseFromRelayResult(*common.RelayResult) (*http.Response, error) } type ChainMessage interface { @@ -173,3 +185,17 @@ func GetChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint *lavas } return newChainRouter(ctx, nConns, *rpcProviderEndpoint, chainParser, proxyConstructor) } + +type EmptyChainListener struct{} + +func NewEmptyChainListener() ChainListener { + return &EmptyChainListener{} +} + +func (*EmptyChainListener) Serve(ctx context.Context, cmdFlags common.ConsumerCmdFlags) { + // do nothing +} + +func (*EmptyChainListener) GetListeningAddress() string { + return "" +} diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index 6844bbaa24..ad35dbc517 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -275,6 +275,12 @@ func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, re return nodeMsg, err } +// overwritten because tendermintrpc doesnt use POST but an empty connecionType +func (apip *TendermintChainParser) ExtractDataFromRequest(request *http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error) { + url, data, _, metadata, err = apip.BaseChainParser.ExtractDataFromRequest(request) + return url, data, "", metadata, err +} + func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedHashes []string, msg *rpcInterfaceMessages.TendermintrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer { nodeMsg := &baseChainMessageContainer{ api: serviceApi, diff --git a/protocol/lavasession/provider_types.go b/protocol/lavasession/provider_types.go index 955abe4b3d..7cc985e1da 100644 --- a/protocol/lavasession/provider_types.go +++ b/protocol/lavasession/provider_types.go @@ -50,7 +50,7 @@ func (endpoint *RPCProviderEndpoint) AddonsString() string { } func (endpoint *RPCProviderEndpoint) String() string { - return endpoint.ChainID + ":" + endpoint.ApiInterface + " Network Address:" + endpoint.NetworkAddress.Address + " Node: " + endpoint.UrlsString() + " Geolocation:" + strconv.FormatUint(endpoint.Geolocation, 10) + " Addons:" + endpoint.AddonsString() + return endpoint.ChainID + ":" + endpoint.ApiInterface + " Network Address:" + endpoint.NetworkAddress.Address + " Node:" + endpoint.UrlsString() + " Geolocation:" + strconv.FormatUint(endpoint.Geolocation, 10) + " Addons:" + endpoint.AddonsString() } func (endpoint *RPCProviderEndpoint) Validate() error { diff --git a/protocol/metrics/consumer_metrics_manager.go b/protocol/metrics/consumer_metrics_manager.go index 9a77678a4d..ae4ee74319 100644 --- a/protocol/metrics/consumer_metrics_manager.go +++ b/protocol/metrics/consumer_metrics_manager.go @@ -45,6 +45,8 @@ type ConsumerMetricsManager struct { totalFailedWsSubscriptionRequestsMetric *prometheus.CounterVec totalWsSubscriptionDissconnectMetric *prometheus.CounterVec totalDuplicatedWsSubscriptionRequestsMetric *prometheus.CounterVec + totalLoLSuccessMetric prometheus.Counter + totalLoLErrorsMetric prometheus.Counter totalWebSocketConnectionsActive *prometheus.GaugeVec blockMetric *prometheus.GaugeVec latencyMetric *prometheus.GaugeVec @@ -118,6 +120,16 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM Help: "The total number of duplicated webscket subscription requests over time per chain id per api interface.", }, []string{"spec", "apiInterface"}) + totalLoLSuccessMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "lava_consumer_total_lol_successes", + Help: "The total number of requests sent to lava over lava successfully", + }) + + totalLoLErrorsMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "lava_consumer_total_lol_errors", + Help: "The total number of requests sent to lava over lava and failed", + }) + totalWebSocketConnectionsActive := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "lava_consumer_total_websocket_connections_active", Help: "The total number of currently active websocket connections with users", @@ -241,6 +253,8 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM prometheus.MustRegister(totalFailedWsSubscriptionRequestsMetric) prometheus.MustRegister(totalDuplicatedWsSubscriptionRequestsMetric) prometheus.MustRegister(totalWsSubscriptionDissconnectMetric) + prometheus.MustRegister(totalLoLSuccessMetric) + prometheus.MustRegister(totalLoLErrorsMetric) consumerMetricsManager := &ConsumerMetricsManager{ totalCURequestedMetric: totalCURequestedMetric, @@ -274,6 +288,8 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM relayProcessingLatencyBeforeProvider: relayProcessingLatencyBeforeProvider, relayProcessingLatencyAfterProvider: relayProcessingLatencyAfterProvider, averageProcessingLatency: map[string]*LatencyTracker{}, + totalLoLSuccessMetric: totalLoLSuccessMetric, + totalLoLErrorsMetric: totalLoLErrorsMetric, consumerOptimizerQoSClient: options.ConsumerOptimizerQoSClient, } @@ -565,6 +581,17 @@ func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chain pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc() } +func (pme *ConsumerMetricsManager) SetLoLResponse(success bool) { + if pme == nil { + return + } + if success { + pme.totalLoLSuccessMetric.Inc() + } else { + pme.totalLoLErrorsMetric.Inc() + } +} + func (pme *ConsumerMetricsManager) handleOptimizerQoS(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) diff --git a/protocol/metrics/rpcconsumer_logs.go b/protocol/metrics/rpcconsumer_logs.go index d09f988716..0bc0359384 100644 --- a/protocol/metrics/rpcconsumer_logs.go +++ b/protocol/metrics/rpcconsumer_logs.go @@ -92,6 +92,13 @@ func NewRPCConsumerLogs(consumerMetricsManager *ConsumerMetricsManager, consumer return rpcConsumerLogs, err } +func (rpccl *RPCConsumerLogs) SetLoLResponse(success bool) { + if rpccl == nil { + return + } + rpccl.consumerMetricsManager.SetLoLResponse(success) +} + func (rpccl *RPCConsumerLogs) SetWebSocketConnectionActive(chainId string, apiInterface string, add bool) { rpccl.consumerMetricsManager.SetWebSocketConnectionActive(chainId, apiInterface, add) } diff --git a/protocol/rpcconsumer/custom_transport.go b/protocol/rpcconsumer/custom_transport.go index aef36b3396..415fea1f85 100644 --- a/protocol/rpcconsumer/custom_transport.go +++ b/protocol/rpcconsumer/custom_transport.go @@ -2,22 +2,57 @@ package rpcconsumer import ( "net/http" + "sync" + "sync/atomic" + + "github.com/lavanet/lava/v4/utils" ) type CustomLavaTransport struct { - transport http.RoundTripper + transport http.RoundTripper + lock sync.RWMutex + secondaryTransport http.RoundTripper + consecutiveFails atomic.Uint64 // TODO: export to metrics +} + +func NewCustomLavaTransport(httpTransport http.RoundTripper, secondaryTransport http.RoundTripper) *CustomLavaTransport { + return &CustomLavaTransport{transport: httpTransport, secondaryTransport: secondaryTransport} } -func NewCustomLavaTransport(httpTransport http.RoundTripper) *CustomLavaTransport { - return &CustomLavaTransport{transport: httpTransport} +func (c *CustomLavaTransport) SetSecondaryTransport(secondaryTransport http.RoundTripper) { + c.lock.Lock() + defer c.lock.Unlock() + utils.LavaFormatDebug("Setting secondary transport for CustomLavaTransport") + c.secondaryTransport = secondaryTransport +} + +// used to switch the primary and secondary transports, in case the primary one fails too much +func (c *CustomLavaTransport) TogglePrimarySecondaryTransport() { + c.lock.Lock() + defer c.lock.Unlock() + primaryTransport := c.transport + secondaryTransport := c.secondaryTransport + c.secondaryTransport = primaryTransport + c.transport = secondaryTransport } func (c *CustomLavaTransport) RoundTrip(req *http.Request) (*http.Response, error) { // Custom logic before the request - + c.lock.RLock() + primaryTransport := c.transport + secondaryTransport := c.secondaryTransport + c.lock.RUnlock() // Delegate to the underlying RoundTripper (usually http.Transport) - resp, err := c.transport.RoundTrip(req) - + resp, err := primaryTransport.RoundTrip(req) // Custom logic after the request + if err != nil { + c.consecutiveFails.Add(1) + // If the primary transport fails, use the secondary transport + if secondaryTransport != nil { + resp, err = secondaryTransport.RoundTrip(req) + } + } else { + c.consecutiveFails.Store(0) + } return resp, err } diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 62448fe557..94784cdd2e 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -49,6 +49,7 @@ const ( refererBackendAddressFlagName = "referer-be-address" refererMarkerFlagName = "referer-marker" reportsSendBEAddress = "reports-be-address" + LavaOverLavaBackupFlagName = "use-lava-over-lava-backup" ) var ( @@ -156,9 +157,11 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt } consumerMetricsManager.SetVersion(upgrade.GetCurrentVersion().ConsumerVersion) + var customLavaTransport *CustomLavaTransport httpClient, err := jsonrpcclient.DefaultHTTPClient(options.clientCtx.NodeURI) if err == nil { - httpClient.Transport = NewCustomLavaTransport(httpClient.Transport) + customLavaTransport = NewCustomLavaTransport(httpClient.Transport, nil) + httpClient.Transport = customLavaTransport client, err := rpchttp.NewWithClient(options.clientCtx.NodeURI, "/websocket", httpClient) if err == nil { options.clientCtx = options.clientCtx.WithClient(client) @@ -227,10 +230,25 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt for _, rpcEndpoint := range options.rpcEndpoints { go func(rpcEndpoint *lavasession.RPCEndpoint) error { defer wg.Done() - _, err := rpcc.CreateConsumerEndpoint(ctx, rpcEndpoint, errCh, consumerAddr, consumerStateTracker, + rpcConsumerServer, err := rpcc.CreateConsumerEndpoint(ctx, rpcEndpoint, errCh, consumerAddr, consumerStateTracker, policyUpdaters, optimizers, consumerConsistencies, finalizationConsensuses, chainMutexes, options, privKey, lavaChainID, rpcConsumerMetrics, consumerReportsManager, consumerOptimizerQoSClient, consumerMetricsManager, relaysMonitorAggregator) + if err == nil { + if customLavaTransport != nil && statetracker.IsLavaNativeSpec(rpcEndpoint.ChainID) && rpcEndpoint.ApiInterface == spectypes.APIInterfaceTendermintRPC { + // we can add lava over lava to the custom transport as a secondary source + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for range ticker.C { + if rpcConsumerServer.IsInitialized() { + customLavaTransport.SetSecondaryTransport(rpcConsumerServer) + return + } + } + }() + } + } return err }(rpcEndpoint) } @@ -634,6 +652,43 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 utils.LavaFormatFatal("offline spec modifications are supported only in single chain bootstrapping", nil, utils.LogAttr("len(rpcEndpoints)", len(rpcEndpoints)), utils.LogAttr("rpcEndpoints", rpcEndpoints)) } + if viper.GetBool(LavaOverLavaBackupFlagName) { + additionalEndpoint := func() *lavasession.RPCEndpoint { + for _, endpoint := range rpcEndpoints { + if statetracker.IsLavaNativeSpec(endpoint.ChainID) { + // native spec already exists, no need to add + return nil + } + } + // need to add an endpoint for the native lava chain + if strings.Contains(networkChainId, "mainnet") { + return &lavasession.RPCEndpoint{ + NetworkAddress: chainlib.INTERNAL_ADDRESS, + ChainID: statetracker.MAINNET_SPEC, + ApiInterface: spectypes.APIInterfaceTendermintRPC, + } + } else if strings.Contains(networkChainId, "testnet") { + return &lavasession.RPCEndpoint{ + NetworkAddress: chainlib.INTERNAL_ADDRESS, + ChainID: statetracker.TESTNET_SPEC, + ApiInterface: spectypes.APIInterfaceTendermintRPC, + } + } else if strings.Contains(networkChainId, "testnet") || networkChainId == "lava" { + return &lavasession.RPCEndpoint{ + NetworkAddress: chainlib.INTERNAL_ADDRESS, + ChainID: statetracker.TESTNET_SPEC, + ApiInterface: spectypes.APIInterfaceTendermintRPC, + } + } + utils.LavaFormatError("could not find a native lava chain for the current network", nil, utils.LogAttr("networkChainId", networkChainId)) + return nil + }() + if additionalEndpoint != nil { + utils.LavaFormatInfo("Lava over Lava backup is enabled", utils.Attribute{Key: "additionalEndpoint", Value: additionalEndpoint.ChainID}) + rpcEndpoints = append(rpcEndpoints, additionalEndpoint) + } + } + rpcConsumerSharedState := viper.GetBool(common.SharedStateFlag) err = rpcConsumer.Start(ctx, &rpcConsumerStartOptions{ txFactory, @@ -699,6 +754,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerSamplingInterval, common.OptimizerQosServerSamplingIntervalFlag, time.Second*1, "interval to sample optimizer qos reports") cmdRPCConsumer.Flags().IntVar(&chainlib.WebSocketRateLimit, common.RateLimitWebSocketFlag, chainlib.WebSocketRateLimit, "rate limit (per second) websocket requests per user connection, default is unlimited") cmdRPCConsumer.Flags().DurationVar(&chainlib.WebSocketBanDuration, common.BanDurationForWebsocketRateLimitExceededFlag, chainlib.WebSocketBanDuration, "once websocket rate limit is reached, user will be banned Xfor a duration, default no ban") + cmdRPCConsumer.Flags().Bool(LavaOverLavaBackupFlagName, true, "enable lava over lava backup to regular rpc calls") common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer } diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index a2ae109f0f..a1c6e55823 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -4,9 +4,11 @@ import ( "context" "errors" "fmt" + "net/http" "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/goccy/go-json" @@ -77,6 +79,7 @@ type RPCConsumerServer struct { chainListener chainlib.ChainListener connectedSubscriptionsLock sync.RWMutex relayRetriesManager *lavaprotocol.RelayRetriesManager + initialized atomic.Bool } type relayResponse struct { @@ -166,8 +169,11 @@ func (rpccs *RPCConsumerServer) sendCraftedRelaysWrapper(initialRelays bool) (bo // Only start after everything is initialized - check consumer session manager rpccs.waitForPairing() } - - return rpccs.sendCraftedRelays(MaxRelayRetries, initialRelays) + success, err := rpccs.sendCraftedRelays(MaxRelayRetries, initialRelays) + if success { + rpccs.initialized.Store(true) + } + return success, err } func (rpccs *RPCConsumerServer) waitForPairing() { @@ -1551,6 +1557,32 @@ func (rpccs *RPCConsumerServer) IsHealthy() bool { return rpccs.relaysMonitor.IsHealthy() } +func (rpccs *RPCConsumerServer) IsInitialized() bool { + if rpccs == nil { + return false + } + + return rpccs.initialized.Load() +} + +func (rpccs *RPCConsumerServer) RoundTrip(req *http.Request) (*http.Response, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + guid := utils.GenerateUniqueIdentifier() + ctx = utils.WithUniqueIdentifier(ctx, guid) + url, data, connectionType, metadata, err := rpccs.chainParser.ExtractDataFromRequest(req) + if err != nil { + return nil, err + } + relayResult, err := rpccs.SendRelay(ctx, url, data, connectionType, "", "", nil, metadata) + if err != nil { + return nil, err + } + resp, err := rpccs.chainParser.SetResponseFromRelayResult(relayResult) + rpccs.rpcConsumerLogs.SetLoLResponse(err == nil) + return resp, err +} + func (rpccs *RPCConsumerServer) updateProtocolMessageIfNeededWithNewEarliestData( ctx context.Context, relayState *RelayState, diff --git a/protocol/statetracker/state_tracker.go b/protocol/statetracker/state_tracker.go index a87ecbf8b7..6a119b3ca3 100644 --- a/protocol/statetracker/state_tracker.go +++ b/protocol/statetracker/state_tracker.go @@ -18,12 +18,14 @@ import ( const ( BlocksToSaveLavaChainTracker = 1 // we only need the latest block TendermintConsensusParamsQuery = "consensus_params" + MAINNET_SPEC = "LAVA" + TESTNET_SPEC = "LAV1" ) var ( lavaSpecName = "" // TODO: add a governance param change that indicates what spec id belongs to lava. - lavaSpecOptions = []string{"LAV1", "LAVA"} + LavaSpecOptions = []string{TESTNET_SPEC, MAINNET_SPEC} ) // ConsumerStateTracker CSTis a class for tracking consumer data from the lava blockchain, such as epoch changes. @@ -68,7 +70,7 @@ func GetLavaSpecWithRetry(ctx context.Context, specQueryClient spectypes.QueryCl var err error for i := 0; i < updaters.BlockResultRetry; i++ { if lavaSpecName == "" { // spec name is not initialized, try fetching specs. - for _, specId := range lavaSpecOptions { + for _, specId := range LavaSpecOptions { specResponse, err = specQueryClient.Spec(ctx, &spectypes.QueryGetSpecRequest{ ChainID: specId, }) @@ -195,3 +197,12 @@ func (st *StateTracker) RegisterForUpdates(ctx context.Context, updater Updater) func (st *StateTracker) GetEventTracker() *updaters.EventTracker { return st.EventTracker } + +func IsLavaNativeSpec(checked string) bool { + for _, nativeLavaChain := range LavaSpecOptions { + if checked == nativeLavaChain { + return true + } + } + return false +} diff --git a/scripts/test/vote_test.sh b/scripts/test/vote_test.sh new file mode 100755 index 0000000000..79f5a22377 --- /dev/null +++ b/scripts/test/vote_test.sh @@ -0,0 +1,31 @@ +#!/bin/bash +__dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source $__dir/../useful_commands.sh +. ${__dir}/vars/variables.sh +# Making sure old screens are not running +echo "current vote number $(latest_vote)" +killall screen +screen -wipe +GASPRICE="0.00002ulava" + +delegate_amount=1000000000000ulava +delegate_amount_big=49000000000000ulava +operator=$(lavad q staking validators --output json | jq -r ".validators[0].operator_address") +echo "operator: $operator" +lavad tx staking delegate $operator $delegate_amount --from bob --chain-id lava --gas-prices $GASPRICE --gas-adjustment 1.5 --gas auto -y +lavad tx staking delegate $operator $delegate_amount --from user1 --chain-id lava --gas-prices $GASPRICE --gas-adjustment 1.5 --gas auto -y +lavad tx staking delegate $operator $delegate_amount --from user2 --chain-id lava --gas-prices $GASPRICE --gas-adjustment 1.5 --gas auto -y +lavad tx staking delegate $operator $delegate_amount_big --from user3 --chain-id lava --gas-prices $GASPRICE --gas-adjustment 1.5 --gas auto -y +lavad tx staking delegate $operator $delegate_amount_big --from user4 --chain-id lava --gas-prices $GASPRICE --gas-adjustment 1.5 --gas auto -y +wait_count_blocks 1 +lavad tx gov submit-legacy-proposal plans-add ./cookbook/plans/test_plans/default.json -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +echo; echo "#### Waiting 2 blocks ####" +wait_count_blocks 2 +# voting abstain with 50% of the voting power, yes with 2% of the voting power no with 1% of the voting power +lavad tx gov vote $(latest_vote) abstain -y --from user3 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +lavad tx gov vote $(latest_vote) yes -y --from user2 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +lavad tx gov vote $(latest_vote) yes -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +lavad tx gov vote $(latest_vote) no -y --from bob --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE + +echo "latest vote: $(latest_vote)" +lavad q gov proposal $(latest_vote) \ No newline at end of file From 1b12d07c7c102d949442a0786283c4780efea700 Mon Sep 17 00:00:00 2001 From: Elad Gildnur <6321801+shleikes@users.noreply.github.com> Date: Mon, 2 Dec 2024 16:13:48 +0200 Subject: [PATCH 2/6] Remove unused flag (#1815) --- protocol/chainlib/chain_router.go | 5 +---- protocol/chainlib/chain_router_test.go | 8 -------- protocol/chainlib/chainlib.go | 5 ----- protocol/rpcprovider/rpcprovider.go | 1 - 4 files changed, 1 insertion(+), 18 deletions(-) diff --git a/protocol/chainlib/chain_router.go b/protocol/chainlib/chain_router.go index 7a00cf5b94..9c07a6bdbb 100644 --- a/protocol/chainlib/chain_router.go +++ b/protocol/chainlib/chain_router.go @@ -326,14 +326,11 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase } } if hasSubscriptionInSpec && apiCollection.Enabled && !webSocketSupported { - err := utils.LavaFormatError("subscriptions are applicable for this chain, but websocket is not provided in 'supported' map. By not setting ws/wss your provider wont be able to accept ws subscriptions, therefore might receive less rewards and lower QOS score.", nil, + return nil, utils.LavaFormatError("subscriptions are applicable for this chain, but websocket is not provided in 'supported' map. By not setting ws/wss your provider wont be able to accept ws subscriptions, therefore might receive less rewards and lower QOS score.", nil, utils.LogAttr("apiInterface", apiCollection.CollectionData.ApiInterface), utils.LogAttr("supportedMap", supportedMap), utils.LogAttr("required", WebSocketExtension), ) - if !IgnoreSubscriptionNotConfiguredError { - return nil, err - } } utils.LavaFormatDebug("router keys", utils.LogAttr("chainProxyRouter", chainProxyRouter)) diff --git a/protocol/chainlib/chain_router_test.go b/protocol/chainlib/chain_router_test.go index aefdc84b1c..7d80f343a1 100644 --- a/protocol/chainlib/chain_router_test.go +++ b/protocol/chainlib/chain_router_test.go @@ -40,8 +40,6 @@ func TestChainRouterWithDisabledWebSocketInSpec(t *testing.T) { chainParser, err := NewChainParser(apiInterface) require.NoError(t, err) - IgnoreSubscriptionNotConfiguredError = false - addonsOptions := []string{"-addon-", "-addon2-"} extensionsOptions := []string{"-test-", "-test2-", "-test3-"} @@ -400,8 +398,6 @@ func TestChainRouterWithEnabledWebSocketInSpec(t *testing.T) { chainParser, err := NewChainParser(apiInterface) require.NoError(t, err) - IgnoreSubscriptionNotConfiguredError = false - addonsOptions := []string{"-addon-", "-addon2-"} extensionsOptions := []string{"-test-", "-test2-", "-test3-"} @@ -795,8 +791,6 @@ func TestChainRouterWithMethodRoutes(t *testing.T) { chainParser, err := NewChainParser(apiInterface) require.NoError(t, err) - IgnoreSubscriptionNotConfiguredError = false - addonsOptions := []string{"-addon-", "-addon2-"} extensionsOptions := []string{"-test-", "-test2-", "-test3-"} @@ -2181,8 +2175,6 @@ func TestChainRouterWithInternalPaths(t *testing.T) { chainParser, err := NewChainParser(play.apiInterface) require.NoError(t, err) - IgnoreSubscriptionNotConfiguredError = false - spec := testcommon.CreateMockSpec() spec.ApiCollections = play.specApiCollections chainParser.SetSpec(spec) diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index 5c3fbd9b7f..41c024ae04 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -21,11 +21,6 @@ const ( INTERNAL_ADDRESS = "internal-addr" ) -var ( - IgnoreSubscriptionNotConfiguredError = true - IgnoreSubscriptionNotConfiguredErrorFlag = "ignore-subscription-not-configured-error" -) - func NewChainParser(apiInterface string) (chainParser ChainParser, err error) { switch apiInterface { case spectypes.APIInterfaceJsonRPC: diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 794b64f295..66d99e2519 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -829,7 +829,6 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().Duration(common.RelayHealthIntervalFlag, RelayHealthIntervalFlagDefault, "interval between relay health checks") cmdRPCProvider.Flags().String(HealthCheckURLPathFlagName, HealthCheckURLPathFlagDefault, "the url path for the provider's grpc health check") cmdRPCProvider.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks") - cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measuring the load relative to this number for feedback - per second - per chain - default unlimited. Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.") From 100ba7ebd70f56cbb2282a7fe7c74330e5a8fc84 Mon Sep 17 00:00:00 2001 From: Elad Gildnur <6321801+shleikes@users.noreply.github.com> Date: Tue, 3 Dec 2024 12:35:37 +0200 Subject: [PATCH 3/6] Fix solana subscription (#1816) --- cookbook/specs/solana.json | 151 +++++++++++------- .../chainlib/chainproxy/rpcclient/handler.go | 24 ++- .../chainlib/chainproxy/rpcclient/json.go | 13 +- .../chainproxy/rpcclient/subscription.go | 2 +- 4 files changed, 119 insertions(+), 71 deletions(-) diff --git a/cookbook/specs/solana.json b/cookbook/specs/solana.json index 5089de1331..07d746c62f 100755 --- a/cookbook/specs/solana.json +++ b/cookbook/specs/solana.json @@ -964,7 +964,91 @@ "stateful": 0 }, "extra_compute_units": 0 + } + ], + "headers": [], + "inheritance_apis": [], + "parse_directives": [ + { + "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}", + "function_tag": "GET_BLOCKNUM", + "result_parsing": { + "parser_arg": [ + "0", + "context", + "slot" + ], + "parser_func": "PARSE_CANONICAL" + }, + "api_name": "getLatestBlockhash" }, + { + "function_tag": "GET_BLOCK_BY_NUM", + "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getBlock\",\"params\":[%d,{\"transactionDetails\":\"none\",\"rewards\":false}],\"id\":1}", + "result_parsing": { + "parser_arg": [ + "0", + "blockhash" + ], + "parser_func": "PARSE_CANONICAL", + "encoding": "base64" + }, + "api_name": "getBlock" + } + ], + "verifications": [ + { + "name": "version", + "parse_directive": { + "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getVersion\",\"params\":[],\"id\":1}", + "function_tag": "VERIFICATION", + "result_parsing": { + "parser_arg": [ + "0", + "solana-core" + ], + "parser_func": "PARSE_CANONICAL" + }, + "api_name": "getVersion" + }, + "values": [ + { + "expected_value": "*" + } + ] + }, + { + "name": "tokens-owner-indexed", + "parse_directive": { + "function_template": "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"getTokenAccountsByOwner\",\"params\":[\"4Qkev8aNZcqFNSRhQzwyLMFSsi94jHqE8WNVTJzTP99F\",{\"programId\":\"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA\"},{\"encoding\":\"jsonParsed\"}]}", + "function_tag": "VERIFICATION", + "result_parsing": { + "parser_arg": [ + "0", + "value" + ], + "parser_func": "PARSE_CANONICAL" + }, + "api_name": "getTokenAccountsByOwner" + }, + "values": [ + { + "expected_value": "*", + "severity": "Warning" + } + ] + } + ] + }, + { + "enabled": true, + "collection_data": { + "api_interface": "jsonrpc", + "internal_path": "/ws", + "type": "POST", + "add_on": "" + }, + "apis": [ { "name": "accountSubscribe", "block_parsing": { @@ -1294,30 +1378,17 @@ "inheritance_apis": [], "parse_directives": [ { - "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}", "function_tag": "GET_BLOCKNUM", "result_parsing": { - "parser_arg": [ - "0", - "context", - "slot" - ], - "parser_func": "PARSE_CANONICAL" - }, - "api_name": "getLatestBlockhash" + "parser_func": "DEFAULT" + } }, { + "function_template": "%d", "function_tag": "GET_BLOCK_BY_NUM", - "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getBlock\",\"params\":[%d,{\"transactionDetails\":\"none\",\"rewards\":false}],\"id\":1}", "result_parsing": { - "parser_arg": [ - "0", - "blockhash" - ], - "parser_func": "PARSE_CANONICAL", - "encoding": "base64" - }, - "api_name": "getBlock" + "parser_func": "DEFAULT" + } }, { "function_tag": "SUBSCRIBE", @@ -1401,49 +1472,7 @@ "api_name": "voteUnsubscribe" } ], - "verifications": [ - { - "name": "version", - "parse_directive": { - "function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"getVersion\",\"params\":[],\"id\":1}", - "function_tag": "VERIFICATION", - "result_parsing": { - "parser_arg": [ - "0", - "solana-core" - ], - "parser_func": "PARSE_CANONICAL" - }, - "api_name": "getVersion" - }, - "values": [ - { - "expected_value": "*" - } - ] - }, - { - "name": "tokens-owner-indexed", - "parse_directive": { - "function_template": "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"getTokenAccountsByOwner\",\"params\":[\"4Qkev8aNZcqFNSRhQzwyLMFSsi94jHqE8WNVTJzTP99F\",{\"programId\":\"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA\"},{\"encoding\":\"jsonParsed\"}]}", - "function_tag": "VERIFICATION", - "result_parsing": { - "parser_arg": [ - "0", - "value" - ], - "parser_func": "PARSE_CANONICAL" - }, - "api_name": "getTokenAccountsByOwner" - }, - "values": [ - { - "expected_value": "*", - "severity": "Warning" - } - ] - } - ] + "verifications": [] } ] }, diff --git a/protocol/chainlib/chainproxy/rpcclient/handler.go b/protocol/chainlib/chainproxy/rpcclient/handler.go index bb1aa31199..acedfd97d9 100755 --- a/protocol/chainlib/chainproxy/rpcclient/handler.go +++ b/protocol/chainlib/chainproxy/rpcclient/handler.go @@ -237,13 +237,16 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool { h.handleSubscriptionResultTendermint(msg) return true case msg.isEthereumNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + if strings.HasSuffix(msg.Method, ethereumNotificationMethodSuffix) { h.handleSubscriptionResultEthereum(msg) return true + } else if strings.HasSuffix(msg.Method, solanaNotificationMethodSuffix) { + h.handleSubscriptionResultSolana(msg) + return true } return false case msg.isStarkNetPathfinderNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { + if strings.HasSuffix(msg.Method, ethereumNotificationMethodSuffix) { h.handleSubscriptionResultStarkNetPathfinder(msg) return true } @@ -258,7 +261,7 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool { } func (h *handler) handleSubscriptionResultStarkNetPathfinder(msg *JsonrpcMessage) { - var result starkNetPathfinderSubscriptionResult + var result integerIdSubscriptionResult if err := json.Unmarshal(msg.Result, &result); err != nil { utils.LavaFormatTrace("Dropping invalid starknet pathfinder subscription message", utils.LogAttr("err", err), @@ -290,6 +293,21 @@ func (h *handler) handleSubscriptionResultEthereum(msg *JsonrpcMessage) { } } +func (h *handler) handleSubscriptionResultSolana(msg *JsonrpcMessage) { + var result integerIdSubscriptionResult + if err := json.Unmarshal(msg.Params, &result); err != nil { + utils.LavaFormatTrace("Dropping invalid solana subscription message", + utils.LogAttr("err", err), + utils.LogAttr("params", string(msg.Params)), + ) + h.log.Debug("Dropping invalid subscription message") + return + } + if h.clientSubs[strconv.Itoa(result.ID)] != nil { + h.clientSubs[strconv.Itoa(result.ID)].deliver(msg) + } +} + func (h *handler) handleSubscriptionResultTendermint(msg *JsonrpcMessage) { var result tendermintSubscriptionResult if err := json.Unmarshal(msg.Result, &result); err != nil { diff --git a/protocol/chainlib/chainproxy/rpcclient/json.go b/protocol/chainlib/chainproxy/rpcclient/json.go index 794ad3ebe4..84ab2e0a6e 100755 --- a/protocol/chainlib/chainproxy/rpcclient/json.go +++ b/protocol/chainlib/chainproxy/rpcclient/json.go @@ -33,11 +33,12 @@ import ( ) const ( - Vsn = "2.0" - serviceMethodSeparator = "_" - subscribeMethodSuffix = "_subscribe" - unsubscribeMethodSuffix = "_unsubscribe" - notificationMethodSuffix = "_subscription" + Vsn = "2.0" + serviceMethodSeparator = "_" + subscribeMethodSuffix = "_subscribe" + unsubscribeMethodSuffix = "_unsubscribe" + ethereumNotificationMethodSuffix = "_subscription" + solanaNotificationMethodSuffix = "Notification" defaultWriteTimeout = 10 * time.Second // used if context has no deadline ) @@ -49,7 +50,7 @@ type ethereumSubscriptionResult struct { Result json.RawMessage `json:"result,omitempty"` } -type starkNetPathfinderSubscriptionResult struct { +type integerIdSubscriptionResult struct { ID int `json:"subscription"` Result json.RawMessage `json:"result,omitempty"` } diff --git a/protocol/chainlib/chainproxy/rpcclient/subscription.go b/protocol/chainlib/chainproxy/rpcclient/subscription.go index 803ecc171c..cc882364ac 100755 --- a/protocol/chainlib/chainproxy/rpcclient/subscription.go +++ b/protocol/chainlib/chainproxy/rpcclient/subscription.go @@ -181,7 +181,7 @@ func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { ctx := context.Background() return n.h.conn.writeJSON(ctx, &JsonrpcMessage{ Version: Vsn, - Method: n.namespace + notificationMethodSuffix, + Method: n.namespace + ethereumNotificationMethodSuffix, Params: params, }) } From 70bfa01558954c89f27f3d2eb681825adf08ea86 Mon Sep 17 00:00:00 2001 From: Omer <100387053+omerlavanet@users.noreply.github.com> Date: Tue, 3 Dec 2024 12:36:45 +0200 Subject: [PATCH 4/6] chore: added utils to test method routing (#1780) * added utils to test method routing * added osmosis to policy --- ... => lava_example_archive_method_route.yml} | 8 ++++---- .../policy_all_chains_with_extension.yml | 20 +++++++++++++++++++ scripts/init_chain_commands.sh | 6 +++++- scripts/test/httpServer.py | 9 +++++++-- 4 files changed, 36 insertions(+), 7 deletions(-) rename config/provider_examples/{lava_example_archive_methodroute.yml => lava_example_archive_method_route.yml} (84%) diff --git a/config/provider_examples/lava_example_archive_methodroute.yml b/config/provider_examples/lava_example_archive_method_route.yml similarity index 84% rename from config/provider_examples/lava_example_archive_methodroute.yml rename to config/provider_examples/lava_example_archive_method_route.yml index e8cbf3bad9..5e4f76b9dc 100644 --- a/config/provider_examples/lava_example_archive_methodroute.yml +++ b/config/provider_examples/lava_example_archive_method_route.yml @@ -2,14 +2,14 @@ endpoints: - api-interface: tendermintrpc chain-id: LAV1 network-address: - address: "127.0.0.1:2220" + address: "127.0.0.1:2224" node-urls: - url: ws://127.0.0.1:26657/websocket - url: http://127.0.0.1:26657 - url: http://127.0.0.1:26657 addons: - archive - - url: https://trustless-api.com + - url: http://127.0.0.1:4444 methods: - block - block_by_hash @@ -18,7 +18,7 @@ endpoints: - api-interface: grpc chain-id: LAV1 network-address: - address: "127.0.0.1:2220" + address: "127.0.0.1:2224" node-urls: - url: 127.0.0.1:9090 - url: 127.0.0.1:9090 @@ -27,7 +27,7 @@ endpoints: - api-interface: rest chain-id: LAV1 network-address: - address: "127.0.0.1:2220" + address: "127.0.0.1:2224" node-urls: - url: http://127.0.0.1:1317 - url: http://127.0.0.1:1317 diff --git a/cookbook/projects/policy_all_chains_with_extension.yml b/cookbook/projects/policy_all_chains_with_extension.yml index 491e9bd047..59c25bcb00 100644 --- a/cookbook/projects/policy_all_chains_with_extension.yml +++ b/cookbook/projects/policy_all_chains_with_extension.yml @@ -110,6 +110,26 @@ Policy: extensions: - "archive" mixed: true + - chain_id: OSMOSIS + requirements: + - collection: + api_interface: "rest" + type: "GET" + extensions: + - "archive" + mixed: true + - collection: + api_interface: "grpc" + type: "" + extensions: + - "archive" + mixed: true + - collection: + api_interface: "tendermintrpc" + type: "" + extensions: + - "archive" + mixed: true - chain_id: COSMOSHUB requirements: - collection: diff --git a/scripts/init_chain_commands.sh b/scripts/init_chain_commands.sh index 295f201531..75faa7345d 100755 --- a/scripts/init_chain_commands.sh +++ b/scripts/init_chain_commands.sh @@ -52,6 +52,7 @@ PROVIDERSTAKE="500000000000ulava" PROVIDER1_LISTENER="127.0.0.1:2221" PROVIDER2_LISTENER="127.0.0.1:2222" PROVIDER3_LISTENER="127.0.0.1:2223" +# PROVIDER4_LISTENER="127.0.0.1:2224" sleep 4 @@ -67,7 +68,7 @@ lavad tx gov vote $(latest_vote) yes -y --from alice --gas-adjustment "1.5" --ga echo; echo "#### Buy DefaultPlan subscription for user1 ####" lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) --enable-auto-renewal -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE # wait_count_blocks 2 -# lavad tx project set-policy $(lavad keys show user1 -a)-admin ./cookbook/projects/policy_all_chains_with_addon.yml -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +# lavad tx project set-policy $(lavad keys show user1 -a)-admin ./cookbook/projects/policy_all_chains_with_extension.yml -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE # MANTLE CHAINS="ETH1,SEP1,HOL1,OSMOSIS,FTM250,CELO,LAV1,OSMOSIST,ALFAJORES,ARB1,ARBN,APT1,STRK,JUN1,COSMOSHUB,POLYGON1,EVMOS,OPTM,BASES,CANTO,SUIT,SOLANA,BSC,AXELAR,AVAX,FVM,NEAR,SQDSUBGRAPH,AGR,AGRT,KOIIT,AVAXT,CELESTIATM" @@ -82,6 +83,9 @@ lavad tx pairing bulk-stake-provider $BASE_CHAINS $PROVIDERSTAKE "$PROVIDER2_LIS echo; echo "#### Staking provider 3 ####" lavad tx pairing bulk-stake-provider $BASE_CHAINS $PROVIDERSTAKE "$PROVIDER3_LISTENER,1" 1 $(operator_address) -y --delegate-commission 50 --from servicer3 --provider-moniker "servicer3" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +# echo; echo "#### Staking provider 4 ####" +# lavad tx pairing bulk-stake-provider $BASE_CHAINS $PROVIDERSTAKE "$PROVIDER4_LISTENER,1" 1 $(operator_address) -y --delegate-commission 50 --from servicer4 --provider-moniker "servicer4" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE + echo; echo "#### Waiting 1 block ####" wait_count_blocks 1 diff --git a/scripts/test/httpServer.py b/scripts/test/httpServer.py index 94358ac9d6..ddbe4e77a4 100644 --- a/scripts/test/httpServer.py +++ b/scripts/test/httpServer.py @@ -1,6 +1,8 @@ from http.server import BaseHTTPRequestHandler, HTTPServer import sys +payload_ret = "OK" + class RequestHandler(BaseHTTPRequestHandler): def do_GET(self): self.print_request() @@ -26,10 +28,11 @@ def print_request(self): print(f"Body:\n{body.decode('utf-8')}") # Send a response back to the client + response = payload_ret.encode('utf-8') self.send_response(200) - self.send_header("Content-type", "text/html") + self.send_header("Content-type", "application/json") self.end_headers() - self.wfile.write(b"OK") + self.wfile.write(response) def run_server(port=8000): server_address = ('', port) @@ -40,6 +43,8 @@ def run_server(port=8000): if __name__ == '__main__': if len(sys.argv) > 1: port = int(sys.argv[1]) + if len(sys.argv) > 2: + payload_ret = sys.argv[2] run_server(port) else: run_server() \ No newline at end of file From 064945d91d504f6127943878a39936803ae430ba Mon Sep 17 00:00:00 2001 From: Elad Gildnur <6321801+shleikes@users.noreply.github.com> Date: Tue, 3 Dec 2024 13:29:59 +0200 Subject: [PATCH 5/6] Rename leftovers of "portal" to "connsumer" (#1820) --- protocol/integration/protocol_test.go | 2 +- protocol/rpcconsumer/rpcconsumer_server.go | 2 +- scripts/pre_setups/init_eth_archive_mix.sh | 2 +- scripts/setup_providers.sh | 4 ++-- scripts/test/jail_provider_test.sh | 4 ++-- scripts/test_spec_full.sh | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index a0ed5126d9..a5b2b86f84 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -2154,7 +2154,7 @@ func TestArchiveProvidersRetryOnParsedHash(t *testing.T) { ChainId: specId, SeenBlock: 1005, BlocksHashesToHeights: []*pairingtypes.BlockHashToHeight{{Hash: blockHash, Height: spectypes.NOT_APPLICABLE}}, - }) // caching in the portal doesn't care about hashes, and we don't have data on finalization yet + }) // caching in the consumer doesn't care about hashes, and we don't have data on finalization yet cancel() if err != nil { continue diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index a1c6e55823..a774f0e1da 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -632,7 +632,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( SharedStateId: sharedStateId, SeenBlock: protocolMessage.RelayPrivateData().SeenBlock, BlocksHashesToHeights: rpccs.newBlocksHashesToHeightsSliceFromRequestedBlockHashes(protocolMessage.GetRequestedBlocksHashes()), - }) // caching in the portal doesn't care about hashes, and we don't have data on finalization yet + }) // caching in the consumer doesn't care about hashes, and we don't have data on finalization yet cancel() reply := cacheReply.GetReply() diff --git a/scripts/pre_setups/init_eth_archive_mix.sh b/scripts/pre_setups/init_eth_archive_mix.sh index 2d9291a578..134dfcb21e 100755 --- a/scripts/pre_setups/init_eth_archive_mix.sh +++ b/scripts/pre_setups/init_eth_archive_mix.sh @@ -67,7 +67,7 @@ screen -d -m -S provider$i bash -c "source ~/.bashrc; lavap rpcprovider \ $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer$i --chain-id lava 2>&1 | tee $LOGS_DIR/PROVIDER$i.log" && sleep 0.25 screen -d -m -S portals bash -c "source ~/.bashrc; lavap rpcconsumer consumer_examples/ethereum_example.yml\ -$EXTRA_PORTAL_FLAGS --cache-be "127.0.0.1:7778" --geolocation 1 --debug-relays --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing 2>&1 | tee $LOGS_DIR/PORTAL.log" && sleep 0.25 +$EXTRA_PORTAL_FLAGS --cache-be "127.0.0.1:7778" --geolocation 1 --debug-relays --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing 2>&1 | tee $LOGS_DIR/CONSUMER.log" && sleep 0.25 echo "--- setting up screens done ---" screen -ls diff --git a/scripts/setup_providers.sh b/scripts/setup_providers.sh index a3b5fa9b0d..127640da41 100755 --- a/scripts/setup_providers.sh +++ b/scripts/setup_providers.sh @@ -100,9 +100,9 @@ $EXTRA_PROVIDER_FLAGS --geolocation "$GEOLOCATION" --log_level debug --from serv # $PROVIDER3_LISTENER MANTLE jsonrpc '$MANTLE_JRPC' \ echo; echo "#### Starting consumer ####" -# Setup Portal +# Setup Consumer screen -d -m -S portals bash -c "source ~/.bashrc; lavap rpcconsumer consumer_examples/full_consumer_example.yml\ -$EXTRA_PORTAL_FLAGS --cache-be "127.0.0.1:7778" --geolocation "$GEOLOCATION" --debug-relays --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --strategy distributed 2>&1 | tee $LOGS_DIR/PORTAL.log" && sleep 0.25 +$EXTRA_PORTAL_FLAGS --cache-be "127.0.0.1:7778" --geolocation "$GEOLOCATION" --debug-relays --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --strategy distributed 2>&1 | tee $LOGS_DIR/CONSUMER.log" && sleep 0.25 # 127.0.0.1:3385 MANTLE jsonrpc \ echo "--- setting up screens done ---" diff --git a/scripts/test/jail_provider_test.sh b/scripts/test/jail_provider_test.sh index 878754b542..f190e70376 100755 --- a/scripts/test/jail_provider_test.sh +++ b/scripts/test/jail_provider_test.sh @@ -74,11 +74,11 @@ $PROVIDER4_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \ $PROVIDER4_LISTENER LAV1 grpc '$LAVA_GRPC' \ $EXTRA_PROVIDER_FLAGS --chain-id=lava --metrics-listen-address ":7780" --geolocation 1 --log_level debug --from servicer4 2>&1 | tee $LOGS_DIR/PROVIDER4.log" -# Setup Portal +# Setup Consumer screen -d -m -S portals bash -c "source ~/.bashrc; lava-protocol rpcconsumer \ 127.0.0.1:3333 ETH1 jsonrpc \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --metrics-listen-address ":7779" --geolocation 1 --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing 2>&1 | tee $LOGS_DIR/PORTAL.log" +$EXTRA_PORTAL_FLAGS --metrics-listen-address ":7779" --geolocation 1 --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing 2>&1 | tee $LOGS_DIR/CONSUMER.log" # need to wait 8 epochs for the provider to be jail eligible diff --git a/scripts/test_spec_full.sh b/scripts/test_spec_full.sh index 9971f9959c..cd504f47ec 100755 --- a/scripts/test_spec_full.sh +++ b/scripts/test_spec_full.sh @@ -206,7 +206,7 @@ done echo "[+]generated consumer config: $output_consumer_yaml" cat $output_consumer_yaml if [ "$dry" = false ]; then - screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer testutil/debugging/logs/consumer.yml $EXTRA_PORTAL_FLAGS --geolocation 1 --debug-relays --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/PORTAL.log" + screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer testutil/debugging/logs/consumer.yml $EXTRA_PORTAL_FLAGS --geolocation 1 --debug-relays --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMER.log" echo "[+] letting providers start and running health check then running command with flags: $test_consumer_command_args" sleep 10 From e9cfc7d16eb23cc771863104999d38237db94266 Mon Sep 17 00:00:00 2001 From: Elad Gildnur <6321801+shleikes@users.noreply.github.com> Date: Tue, 3 Dec 2024 14:16:50 +0200 Subject: [PATCH 6/6] feat: PRT - Add the consumer address to the QoS report (#1819) * Add the consumer address to the QoS report * Test fix --------- Co-authored-by: Ran Mishael <106548467+ranlavanet@users.noreply.github.com> --- .../metrics/consumer_optimizer_qos_client.go | 18 +++--- .../provider_optimizer_test.go | 2 +- protocol/rpcconsumer/rpcconsumer.go | 55 ++++++++++++------- 3 files changed, 46 insertions(+), 29 deletions(-) diff --git a/protocol/metrics/consumer_optimizer_qos_client.go b/protocol/metrics/consumer_optimizer_qos_client.go index 3e12e23dfc..72183853d9 100644 --- a/protocol/metrics/consumer_optimizer_qos_client.go +++ b/protocol/metrics/consumer_optimizer_qos_client.go @@ -21,9 +21,10 @@ var ( ) type ConsumerOptimizerQoSClient struct { - consumerOrigin string - queueSender *QueueSender - optimizers map[string]OptimizerInf // keys are chain ids + consumerHostname string + consumerAddress string + queueSender *QueueSender + optimizers map[string]OptimizerInf // keys are chain ids // keys are chain ids, values are maps with provider addresses as keys chainIdToProviderToRelaysCount map[string]map[string]uint64 chainIdToProviderToNodeErrorsCount map[string]map[string]uint64 @@ -49,7 +50,8 @@ type OptimizerQoSReportToSend struct { LatencyScore float64 `json:"latency_score"` GenericScore float64 `json:"generic_score"` ProviderAddress string `json:"provider"` - ConsumerOrigin string `json:"consumer"` + ConsumerHostname string `json:"consumer_hostname"` + ConsumerAddress string `json:"consumer_pub_address"` ChainId string `json:"chain_id"` NodeErrorRate float64 `json:"node_error_rate"` Epoch uint64 `json:"epoch"` @@ -69,14 +71,15 @@ type OptimizerInf interface { CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*OptimizerQoSReport } -func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient { +func NewConsumerOptimizerQoSClient(consumerAddress, endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient { hostname, err := os.Hostname() if err != nil { utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err) hostname = "unknown" + strconv.FormatUint(rand.Uint64(), 10) // random seed for different unknowns } return &ConsumerOptimizerQoSClient{ - consumerOrigin: hostname, + consumerHostname: hostname, + consumerAddress: consumerAddress, queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...), optimizers: map[string]OptimizerInf{}, chainIdToProviderToRelaysCount: map[string]map[string]uint64{}, @@ -130,7 +133,8 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz // must be called under read lock optimizerQoSReportToSend := OptimizerQoSReportToSend{ Timestamp: time.Now(), - ConsumerOrigin: coqc.consumerOrigin, + ConsumerHostname: coqc.consumerHostname, + ConsumerAddress: coqc.consumerAddress, SyncScore: report.SyncScore, AvailabilityScore: report.AvailabilityScore, LatencyScore: report.LatencyScore, diff --git a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go index fc8427a9dc..6de13de8b6 100644 --- a/protocol/provideroptimizer/provider_optimizer_test.go +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -781,7 +781,7 @@ func TestProviderOptimizerWithOptimizerQoSClient(t *testing.T) { chainId := "dontcare" - consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient(mockHttpServer.URL, 1*time.Second) + consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient("lava@test", mockHttpServer.URL, 1*time.Second) consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(context.Background(), 900*time.Millisecond) providerOptimizer := NewProviderOptimizer(STRATEGY_BALANCED, TEST_AVERAGE_BLOCK_TIME, TEST_BASE_WORLD_LATENCY, 10, consumerOptimizerQoSClient, chainId) diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 94784cdd2e..dc92b3e117 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -131,6 +131,33 @@ type rpcConsumerStartOptions struct { staticProvidersList []*lavasession.RPCProviderEndpoint // define static providers as backup to lava providers } +func getConsumerAddressAndKeys(clientCtx client.Context) (sdk.AccAddress, *secp256k1.PrivateKey, error) { + keyName, err := sigs.GetKeyName(clientCtx) + if err != nil { + return nil, nil, fmt.Errorf("failed getting key name from clientCtx: %w", err) + } + + privKey, err := sigs.GetPrivKey(clientCtx, keyName) + if err != nil { + return nil, nil, fmt.Errorf("failed getting private key from key name %s: %w", keyName, err) + } + + clientKey, _ := clientCtx.Keyring.Key(keyName) + pubkey, err := clientKey.GetPubKey() + if err != nil { + return nil, nil, fmt.Errorf("failed getting public key from key name %s: %w", keyName, err) + } + + var consumerAddr sdk.AccAddress + err = consumerAddr.Unmarshal(pubkey.Address()) + if err != nil { + return nil, nil, fmt.Errorf("failed unmarshaling public address for key %s (pubkey: %v): %w", + keyName, pubkey.Address(), err) + } + + return consumerAddr, privKey, nil +} + // spawns a new RPCConsumer server with all it's processes and internals ready for communications func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOptions) (err error) { if common.IsTestMode(ctx) { @@ -139,11 +166,16 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt options.refererData.ReferrerClient = metrics.NewConsumerReferrerClient(options.refererData.Address) consumerReportsManager := metrics.NewConsumerReportsClient(options.analyticsServerAddresses.ReportsAddressFlag) + consumerAddr, privKey, err := getConsumerAddressAndKeys(options.clientCtx) + if err != nil { + utils.LavaFormatFatal("failed to get consumer address and keys", err) + } + consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient if options.analyticsServerAddresses.OptimizerQoSAddress != "" || options.analyticsServerAddresses.OptimizerQoSListen { - consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client - consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client + consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(consumerAddr.String(), options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client + consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client } consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{ NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress, @@ -179,26 +211,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt lavaChainFetcher.FetchLatestBlockNum(ctx) lavaChainID := options.clientCtx.ChainID - keyName, err := sigs.GetKeyName(options.clientCtx) - if err != nil { - utils.LavaFormatFatal("failed getting key name from clientCtx", err) - } - privKey, err := sigs.GetPrivKey(options.clientCtx, keyName) - if err != nil { - utils.LavaFormatFatal("failed getting private key from key name", err, utils.Attribute{Key: "keyName", Value: keyName}) - } - clientKey, _ := options.clientCtx.Keyring.Key(keyName) - pubkey, err := clientKey.GetPubKey() - if err != nil { - utils.LavaFormatFatal("failed getting public key from key name", err, utils.Attribute{Key: "keyName", Value: keyName}) - } - - var consumerAddr sdk.AccAddress - err = consumerAddr.Unmarshal(pubkey.Address()) - if err != nil { - utils.LavaFormatFatal("failed unmarshaling public address", err, utils.Attribute{Key: "keyName", Value: keyName}, utils.Attribute{Key: "pubkey", Value: pubkey.Address()}) - } // we want one provider optimizer per chain so we will store them for reuse across rpcEndpoints chainMutexes := map[string]*sync.Mutex{} for _, endpoint := range options.rpcEndpoints {