diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index fb537fb028..1a095f78dd 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -3,7 +3,7 @@ name: Publish Lava Release on: push: tags: - - ‘v[0-9]+.[0-9]+.[0-9]+.*’ + - 'v[0-9]+.[0-9]+.[0-9]+*' workflow_dispatch: inputs: release_tag: diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index 48573a3512..cc39f4d3aa 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -725,7 +725,7 @@ func CreateConsumerSessionManager(chainID, apiInterface, consumerPublicAddress s baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better return lavasession.NewConsumerSessionManager( &lavasession.RPCEndpoint{NetworkAddress: "stub", ChainID: chainID, ApiInterface: apiInterface, TLSEnabled: false, HealthCheckPath: "/", Geolocation: 0}, - provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), + provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare"), nil, nil, consumerPublicAddress, lavasession.NewActiveSubscriptionProvidersStorage(), ) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 7574e9dd23..38213fbf40 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -39,6 +39,10 @@ const ( SetProviderOptimizerWorstTierPickChance = "set-provider-optimizer-worst-tier-pick-chance" SetProviderOptimizerNumberOfTiersToCreate = "set-provider-optimizer-number-of-tiers-to-create" + // optimizer qos server flags + OptimizerQosServerAddressFlag = "optimizer-qos-server-address" // address of the optimizer qos server to send the qos reports + OptimizerQosServerPushIntervalFlag = "optimizer-qos-push-interval" // interval to push the qos reports to the optimizer qos server + OptimizerQosServerSamplingIntervalFlag = "optimizer-qos-sampling-interval" // interval to sample the qos reports // websocket flags RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection" BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded" diff --git a/protocol/common/safe_sync_map.go b/protocol/common/safe_sync_map.go index b0e94a421c..95263bcf84 100644 --- a/protocol/common/safe_sync_map.go +++ b/protocol/common/safe_sync_map.go @@ -46,6 +46,20 @@ func (ssm *SafeSyncMap[K, V]) LoadOrStore(key K, value V) (ret V, loaded bool, e return value, false, nil } -func (ssm *SafeSyncMap[K, V]) Range(f func(key, value any) bool) { - ssm.localMap.Range(f) +func (ssm *SafeSyncMap[K, V]) Range(f func(key K, value V) bool) { + ssm.localMap.Range(func(key, value any) bool { + unboxedKey, ok := key.(K) + if !ok { + utils.LavaFormatError("invalid usage of sync map, could not cast key into a type", nil) + return false + } + + unboxedValue, ok := value.(V) + if !ok { + utils.LavaFormatError("invalid usage of sync map, could not cast value into a type", nil) + return false + } + + return f(unboxedKey, unboxedValue) + }) } diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index cf3a0412d0..ea3b21feb3 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -205,7 +205,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc 
finalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID) _, averageBlockTime, _, _ := chainParser.ChainBlockStats() baseLatency := common.AverageWorldLatency / 2 - optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2) + optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2, nil, "dontcare") consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage()) consumerSessionManager.UpdateAllProviders(rpcConsumerOptions.epoch, rpcConsumerOptions.pairingList) @@ -219,7 +219,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc consumerConsistency := rpcconsumer.NewConsumerConsistency(rpcConsumerOptions.specId) consumerCmdFlags := common.ConsumerCmdFlags{} - rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil) + rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil, nil) require.NoError(t, err) err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, rpcConsumerOptions.requiredResponses, rpcConsumerOptions.account.SK, rpcConsumerOptions.lavaChainID, cache, rpcconsumerLogs, rpcConsumerOptions.account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil) require.NoError(t, err) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index ad22d15f6f..970c27baa1 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -114,7 +114,8 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses. 
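For context on the SafeSyncMap.Range rewrite in protocol/common/safe_sync_map.go earlier in this diff: the callback is now typed, so call sites (such as the events.go hunk later in this patch) no longer need their own interface{} type assertions. Below is a minimal, self-contained sketch of the same pattern; it is an illustration only, not the lava implementation, and the type/field names are placeholders.

```go
package main

import (
	"fmt"
	"sync"
)

// safeSyncMap is a simplified stand-in for lava's SafeSyncMap: a generic
// wrapper over sync.Map whose Range callback is strongly typed.
type safeSyncMap[K comparable, V any] struct {
	m sync.Map
}

func (s *safeSyncMap[K, V]) Store(key K, value V) {
	s.m.Store(key, value)
}

// Range unboxes the sync.Map's any-typed pairs once, so every caller
// receives typed key/value arguments instead of doing assertions itself.
func (s *safeSyncMap[K, V]) Range(f func(key K, value V) bool) {
	s.m.Range(func(key, value any) bool {
		k, ok := key.(K)
		if !ok {
			return false // unexpected key type, stop iterating
		}
		v, ok := value.(V)
		if !ok {
			return false // unexpected value type, stop iterating
		}
		return f(k, v)
	})
}

func main() {
	txPerDay := safeSyncMap[int64, int]{}
	txPerDay.Store(1, 42)
	txPerDay.Store(2, 7)

	// No type assertions needed at the call site anymore.
	txPerDay.Range(func(day int64, totalTx int) bool {
		fmt.Printf("day %d: %d transactions\n", day, totalTx)
		return true
	})
}
```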
// reset session related metrics csm.consumerMetricsManager.ResetSessionRelatedMetrics() - csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList)) + go csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList), epoch) + utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()}) return nil } @@ -1127,7 +1128,14 @@ func (csm *ConsumerSessionManager) GenerateReconnectCallback(consumerSessionsWit } } -func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager, reporter metrics.Reporter, consumerPublicAddress string, activeSubscriptionProvidersStorage *ActiveSubscriptionProvidersStorage) *ConsumerSessionManager { +func NewConsumerSessionManager( + rpcEndpoint *RPCEndpoint, + providerOptimizer ProviderOptimizer, + consumerMetricsManager *metrics.ConsumerMetricsManager, + reporter metrics.Reporter, + consumerPublicAddress string, + activeSubscriptionProvidersStorage *ActiveSubscriptionProvidersStorage, +) *ConsumerSessionManager { csm := &ConsumerSessionManager{ reportedProviders: NewReportedProviders(reporter, rpcEndpoint.ChainID), consumerMetricsManager: consumerMetricsManager, diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index 178cee24dc..9e7c340979 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -162,7 +162,7 @@ func TestEndpointSortingFlow(t *testing.T) { func CreateConsumerSessionManager() *ConsumerSessionManager { rand.InitRandomSeed() baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better - return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage()) + return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare"), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage()) } func TestMain(m *testing.M) { diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 67c8b7b259..897c9b4664 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -76,7 +76,7 @@ type ProviderOptimizer interface { ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int) GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport) Strategy() provideroptimizer.Strategy - UpdateWeights(map[string]int64) + UpdateWeights(map[string]int64, uint64) } type ignoredProviders struct { diff --git a/protocol/metrics/consumer_optimizer_qos_client.go b/protocol/metrics/consumer_optimizer_qos_client.go new file mode 100644 index 0000000000..ef20fed8bf --- /dev/null +++ b/protocol/metrics/consumer_optimizer_qos_client.go @@ -0,0 +1,281 @@ +package metrics + +import ( + "context" + "os" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/goccy/go-json" + "github.com/lavanet/lava/v3/utils" + "github.com/lavanet/lava/v3/utils/rand" + spectypes 
"github.com/lavanet/lava/v3/x/spec/types" + "golang.org/x/exp/maps" +) + +var ( + OptimizerQosServerPushInterval time.Duration + OptimizerQosServerSamplingInterval time.Duration +) + +type ConsumerOptimizerQoSClient struct { + consumerOrigin string + queueSender *QueueSender + optimizers map[string]OptimizerInf // keys are chain ids + // keys are chain ids, values are maps with provider addresses as keys + chainIdToProviderToRelaysCount map[string]map[string]uint64 + chainIdToProviderToNodeErrorsCount map[string]map[string]uint64 + chainIdToProviderToEpochToStake map[string]map[string]map[uint64]int64 // third key is epoch + currentEpoch atomic.Uint64 + lock sync.RWMutex +} + +type OptimizerQoSReport struct { + ProviderAddress string + SyncScore float64 + AvailabilityScore float64 + LatencyScore float64 + GenericScore float64 + EntryIndex int +} + +type optimizerQoSReportToSend struct { + Timestamp time.Time `json:"timestamp"` + SyncScore float64 `json:"sync_score"` + AvailabilityScore float64 `json:"availability_score"` + LatencyScore float64 `json:"latency_score"` + GenericScore float64 `json:"generic_score"` + ProviderAddress string `json:"provider"` + ConsumerOrigin string `json:"consumer"` + ChainId string `json:"chain_id"` + NodeErrorRate float64 `json:"node_error_rate"` + Epoch uint64 `json:"epoch"` + ProviderStake int64 `json:"provider_stake"` + EntryIndex int `json:"entry_index"` +} + +func (oqosr optimizerQoSReportToSend) String() string { + bytes, err := json.Marshal(oqosr) + if err != nil { + return "" + } + return string(bytes) +} + +type OptimizerInf interface { + CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*OptimizerQoSReport +} + +func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient { + hostname, err := os.Hostname() + if err != nil { + utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err) + hostname = "unknown" + strconv.FormatUint(rand.Uint64(), 10) // random seed for different unknowns + } + + return &ConsumerOptimizerQoSClient{ + consumerOrigin: hostname, + queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...), + optimizers: map[string]OptimizerInf{}, + chainIdToProviderToRelaysCount: map[string]map[string]uint64{}, + chainIdToProviderToNodeErrorsCount: map[string]map[string]uint64{}, + chainIdToProviderToEpochToStake: map[string]map[string]map[uint64]int64{}, + } +} + +func (coqc *ConsumerOptimizerQoSClient) getProviderChainMapCounterValue(counterStore map[string]map[string]uint64, chainId, providerAddress string) uint64 { + // must be called under read lock + if counterProvidersMap, found := counterStore[chainId]; found { + return counterProvidersMap[providerAddress] + } + return 0 +} + +func (coqc *ConsumerOptimizerQoSClient) getProviderChainRelaysCount(chainId, providerAddress string) uint64 { + // must be called under read lock + return coqc.getProviderChainMapCounterValue(coqc.chainIdToProviderToRelaysCount, chainId, providerAddress) +} + +func (coqc *ConsumerOptimizerQoSClient) getProviderChainNodeErrorsCount(chainId, providerAddress string) uint64 { + // must be called under read lock + return coqc.getProviderChainMapCounterValue(coqc.chainIdToProviderToNodeErrorsCount, chainId, providerAddress) +} + +func (coqc *ConsumerOptimizerQoSClient) getProviderChainStake(chainId, providerAddress string, epoch uint64) int64 { + // must be called under read lock + 
if providersMap, found := coqc.chainIdToProviderToEpochToStake[chainId]; found { + if epochMap, found := providersMap[providerAddress]; found { + if stake, found := epochMap[epoch]; found { + return stake + } + } + } + return 0 +} + +func (coqc *ConsumerOptimizerQoSClient) calculateNodeErrorRate(chainId, providerAddress string) float64 { + // must be called under read lock + relaysCount := coqc.getProviderChainRelaysCount(chainId, providerAddress) + if relaysCount > 0 { + errorsCount := coqc.getProviderChainNodeErrorsCount(chainId, providerAddress) + return float64(errorsCount) / float64(relaysCount) + } + + return 0 +} + +func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *OptimizerQoSReport, chainId string, epoch uint64) { + // must be called under read lock + + optimizerQoSReportToSend := optimizerQoSReportToSend{ + Timestamp: time.Now(), + ConsumerOrigin: coqc.consumerOrigin, + SyncScore: report.SyncScore, + AvailabilityScore: report.AvailabilityScore, + LatencyScore: report.LatencyScore, + GenericScore: report.GenericScore, + ProviderAddress: report.ProviderAddress, + EntryIndex: report.EntryIndex, + ChainId: chainId, + Epoch: epoch, + NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress), + ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch), + } + + coqc.queueSender.appendQueue(optimizerQoSReportToSend) +} + +func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() { + coqc.lock.RLock() // we only read from the maps here + defer coqc.lock.RUnlock() + + ignoredProviders := map[string]struct{}{} + cu := uint64(10) + requestedBlock := spectypes.LATEST_BLOCK + + currentEpoch := coqc.currentEpoch.Load() + + for chainId, optimizer := range coqc.optimizers { + providersMap, ok := coqc.chainIdToProviderToEpochToStake[chainId] + if !ok { + continue + } + + reports := optimizer.CalculateQoSScoresForMetrics(maps.Keys(providersMap), ignoredProviders, cu, requestedBlock) + for _, report := range reports { + coqc.appendOptimizerQoSReport(report, chainId, currentEpoch) + } + } +} + +func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx context.Context, samplingInterval time.Duration) { + if coqc == nil { + return + } + + utils.LavaFormatTrace("Starting ConsumerOptimizerQoSClient reports collecting") + go func() { + for { + select { + case <-ctx.Done(): + utils.LavaFormatTrace("ConsumerOptimizerQoSClient context done") + return + case <-time.After(samplingInterval): + coqc.getReportsFromOptimizers() + } + } + }() +} + +func (coqc *ConsumerOptimizerQoSClient) RegisterOptimizer(optimizer OptimizerInf, chainId string) { + if coqc == nil { + return + } + + coqc.lock.Lock() + defer coqc.lock.Unlock() + + if _, found := coqc.optimizers[chainId]; found { + utils.LavaFormatWarning("Optimizer already registered for chain", nil, utils.LogAttr("chainId", chainId)) + return + } + + coqc.optimizers[chainId] = optimizer +} + +func (coqc *ConsumerOptimizerQoSClient) incrementStoreCounter(store map[string]map[string]uint64, chainId, providerAddress string) { + // must be called under write lock + if coqc == nil { + return + } + + providersMap, found := store[chainId] + if !found { + store[chainId] = map[string]uint64{providerAddress: 1} + return + } + + count, found := providersMap[providerAddress] + if !found { + store[chainId][providerAddress] = 1 + return + } + + store[chainId][providerAddress] = count + 1 +} + +func (coqc *ConsumerOptimizerQoSClient) SetRelaySentToProvider(providerAddress string, 
chainId string) { + if coqc == nil { + return + } + + coqc.lock.Lock() + defer coqc.lock.Unlock() + + coqc.incrementStoreCounter(coqc.chainIdToProviderToRelaysCount, chainId, providerAddress) +} + +func (coqc *ConsumerOptimizerQoSClient) SetNodeErrorToProvider(providerAddress string, chainId string) { + if coqc == nil { + return + } + + coqc.lock.Lock() + defer coqc.lock.Unlock() + + coqc.incrementStoreCounter(coqc.chainIdToProviderToNodeErrorsCount, chainId, providerAddress) +} + +func (coqc *ConsumerOptimizerQoSClient) setProviderStake(chainId, providerAddress string, epoch uint64, stake int64) { + // must be called under write lock + coqc.currentEpoch.Store(epoch) + + providersMap, found := coqc.chainIdToProviderToEpochToStake[chainId] + if !found { + coqc.chainIdToProviderToEpochToStake[chainId] = map[string]map[uint64]int64{providerAddress: {epoch: stake}} + return + } + + epochMap, found := providersMap[providerAddress] + if !found { + coqc.chainIdToProviderToEpochToStake[chainId][providerAddress] = map[uint64]int64{epoch: stake} + return + } + + epochMap[epoch] = stake +} + +func (coqc *ConsumerOptimizerQoSClient) UpdatePairingListStake(stakeMap map[string]int64, chainId string, epoch uint64) { + if coqc == nil { + return + } + + coqc.lock.Lock() + defer coqc.lock.Unlock() + + for providerAddr, stake := range stakeMap { + coqc.setProviderStake(chainId, providerAddr, epoch, stake) + } +} diff --git a/protocol/metrics/queue_sender.go b/protocol/metrics/queue_sender.go index 8a266112a0..16f41c2d7a 100644 --- a/protocol/metrics/queue_sender.go +++ b/protocol/metrics/queue_sender.go @@ -64,7 +64,12 @@ func (crc *QueueSender) sendQueueTick() { crc.lock.Lock() defer crc.lock.Unlock() - if !crc.isSendQueueRunning && len(crc.addQueue) > 0 { + if len(crc.addQueue) == 0 { + utils.LavaFormatDebug(fmt.Sprintf("[QueueSender:%s] sendQueueTick: addQueue is empty", crc.name)) + return + } + + if !crc.isSendQueueRunning { sendQueue := crc.addQueue crc.addQueue = make([]fmt.Stringer, 0) crc.isSendQueueRunning = true diff --git a/protocol/metrics/rpcconsumerlogs.go b/protocol/metrics/rpcconsumerlogs.go index dd5c36a3fd..d441e75cd0 100644 --- a/protocol/metrics/rpcconsumerlogs.go +++ b/protocol/metrics/rpcconsumerlogs.go @@ -29,16 +29,17 @@ const ( ) type RPCConsumerLogs struct { - newRelicApplication *newrelic.Application - MetricService *MetricService - StoreMetricData bool - excludeMetricsReferrers string - excludedUserAgent []string - consumerMetricsManager *ConsumerMetricsManager - consumerRelayServerClient *ConsumerRelayServerClient + newRelicApplication *newrelic.Application + MetricService *MetricService + StoreMetricData bool + excludeMetricsReferrers string + excludedUserAgent []string + consumerMetricsManager *ConsumerMetricsManager + consumerRelayServerClient *ConsumerRelayServerClient + consumerOptimizerQoSClient *ConsumerOptimizerQoSClient } -func NewRPCConsumerLogs(consumerMetricsManager *ConsumerMetricsManager, consumerRelayServerClient *ConsumerRelayServerClient) (*RPCConsumerLogs, error) { +func NewRPCConsumerLogs(consumerMetricsManager *ConsumerMetricsManager, consumerRelayServerClient *ConsumerRelayServerClient, consumerOptimizerQoSClient *ConsumerOptimizerQoSClient) (*RPCConsumerLogs, error) { err := godotenv.Load() if err != nil { utils.LavaFormatInfo("New relic missing environment file") @@ -49,7 +50,11 @@ func NewRPCConsumerLogs(consumerMetricsManager *ConsumerMetricsManager, consumer newRelicLicenseKey := os.Getenv("NEW_RELIC_LICENSE_KEY") if newRelicAppName == "" || 
newRelicLicenseKey == "" { utils.LavaFormatInfo("New relic missing environment variables") - return &RPCConsumerLogs{consumerMetricsManager: consumerMetricsManager, consumerRelayServerClient: consumerRelayServerClient}, nil + return &RPCConsumerLogs{ + consumerMetricsManager: consumerMetricsManager, + consumerRelayServerClient: consumerRelayServerClient, + consumerOptimizerQoSClient: consumerOptimizerQoSClient, + }, nil } newRelicApplication, err := newrelic.NewApplication( @@ -91,12 +96,19 @@ func (rpccl *RPCConsumerLogs) SetWebSocketConnectionActive(chainId string, apiIn rpccl.consumerMetricsManager.SetWebSocketConnectionActive(chainId, apiInterface, add) } -func (rpccl *RPCConsumerLogs) SetRelaySentToProviderMetric(chainId string, apiInterface string) { +func (rpccl *RPCConsumerLogs) SetRelaySentToProviderMetric(providerAddress, chainId, apiInterface string) { rpccl.consumerMetricsManager.SetRelaySentToProviderMetric(chainId, apiInterface) + rpccl.consumerOptimizerQoSClient.SetRelaySentToProvider(providerAddress, chainId) } -func (rpccl *RPCConsumerLogs) SetRelayNodeErrorMetric(chainId string, apiInterface string) { +func (rpccl *RPCConsumerLogs) SetRelayNodeErrorMetric(providerAddress, chainId, apiInterface string) { + if providerAddress == "" { + // skip if provider address is empty + return + } + rpccl.consumerMetricsManager.SetRelayNodeErrorMetric(chainId, apiInterface) + rpccl.consumerOptimizerQoSClient.SetNodeErrorToProvider(providerAddress, chainId) } func (rpccl *RPCConsumerLogs) SetNodeErrorRecoveredSuccessfullyMetric(chainId string, apiInterface string, attempt string) { diff --git a/protocol/metrics/rpcconsumerlogs_test.go b/protocol/metrics/rpcconsumerlogs_test.go index ae3d96d260..1d9273bb22 100644 --- a/protocol/metrics/rpcconsumerlogs_test.go +++ b/protocol/metrics/rpcconsumerlogs_test.go @@ -23,7 +23,7 @@ type ErrorData struct { } func TestGetUniqueGuidResponseForError(t *testing.T) { - plog, err := NewRPCConsumerLogs(nil, nil) + plog, err := NewRPCConsumerLogs(nil, nil, nil) assert.Nil(t, err) responseError := errors.New("response error") @@ -39,7 +39,7 @@ func TestGetUniqueGuidResponseForError(t *testing.T) { } func TestGetUniqueGuidResponseDeterministic(t *testing.T) { - plog, err := NewRPCConsumerLogs(nil, nil) + plog, err := NewRPCConsumerLogs(nil, nil, nil) assert.Nil(t, err) responseError := errors.New("response error") @@ -58,7 +58,7 @@ func TestAnalyzeWebSocketErrorAndWriteMessage(t *testing.T) { app.Get("/", websocket.New(func(c *websocket.Conn) { mt, _, _ := c.ReadMessage() - plog, _ := NewRPCConsumerLogs(nil, nil) + plog, _ := NewRPCConsumerLogs(nil, nil, nil) responseError := errors.New("response error") formatterMsg := plog.AnalyzeWebSocketErrorAndGetFormattedMessage(c.LocalAddr().String(), responseError, "seed", []byte{}, "rpcType", 1*time.Millisecond) assert.NotNil(t, formatterMsg) diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index 806818d970..0d89897028 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -9,6 +9,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dgraph-io/ristretto" "github.com/lavanet/lava/v3/protocol/common" + "github.com/lavanet/lava/v3/protocol/metrics" "github.com/lavanet/lava/v3/utils" "github.com/lavanet/lava/v3/utils/lavaslices" "github.com/lavanet/lava/v3/utils/rand" @@ -48,6 +49,9 @@ type cacheInf interface { Set(key, value interface{}, cost int64) bool } +type 
consumerOptimizerQoSClientInf interface { + UpdatePairingListStake(stakeMap map[string]int64, chainId string, epoch uint64) +} type ProviderOptimizer struct { strategy Strategy providersStorage cacheInf @@ -58,6 +62,8 @@ type ProviderOptimizer struct { latestSyncData ConcurrentBlockStore selectionWeighter SelectionWeighter OptimizerNumTiers int + consumerOptimizerQoSClient consumerOptimizerQoSClientInf + chainId string } type Exploration struct { @@ -86,8 +92,13 @@ const ( STRATEGY_DISTRIBUTED ) -func (po *ProviderOptimizer) UpdateWeights(weights map[string]int64) { +func (po *ProviderOptimizer) UpdateWeights(weights map[string]int64, epoch uint64) { po.selectionWeighter.SetWeights(weights) + + // Update the stake map for metrics + if po.consumerOptimizerQoSClient != nil { + po.consumerOptimizerQoSClient.UpdatePairingListStake(weights, po.chainId, epoch) + } } func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string) { @@ -149,30 +160,53 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc ) } -func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration) { +func (po *ProviderOptimizer) calcLatencyAndSyncScores(providerData ProviderData, cu uint64, requestedBlock int64) (float64, float64) { + // latency score + latencyScoreCurrent := po.calculateLatencyScore(providerData, cu, requestedBlock) // smaller == better i.e less latency + + // sync score + syncScoreCurrent := float64(0) + if requestedBlock < 0 { + // means user didn't ask for a specific block and we want to give him the best + syncScoreCurrent = po.calculateSyncScore(providerData.Sync) // smaller == better i.e less sync lag + } + + return latencyScoreCurrent, syncScoreCurrent +} + +func (po *ProviderOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*metrics.OptimizerQoSReport { + selectionTier, _, providersScores := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock) + reports := []*metrics.OptimizerQoSReport{} + + rawScores := selectionTier.GetRawScores() + for idx, entry := range rawScores { + qosReport := providersScores[entry.Address] + qosReport.EntryIndex = idx + reports = append(reports, qosReport) + } + + return reports +} + +func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration, map[string]*metrics.OptimizerQoSReport) { latencyScore := math.MaxFloat64 // smaller = better i.e less latency syncScore := math.MaxFloat64 // smaller = better i.e less sync lag explorationCandidate := Exploration{address: "", time: time.Now().Add(time.Hour)} selectionTier := NewSelectionTier() + providerScores := make(map[string]*metrics.OptimizerQoSReport) for _, providerAddress := range allAddresses { if _, ok := ignoredProviders[providerAddress]; ok { // ignored provider, skip it continue } + providerData, found := po.getProviderData(providerAddress) if !found { utils.LavaFormatTrace("provider data was not found for address", utils.LogAttr("providerAddress", providerAddress)) } - // latency score - latencyScoreCurrent := po.calculateLatencyScore(providerData, cu, requestedBlock) // smaller == better i.e less latency - - // sync score - syncScoreCurrent := float64(0) - if requestedBlock < 0 { - // means user didn't ask for a specific block and we want to give 
him the best - syncScoreCurrent = po.calculateSyncScore(providerData.Sync) // smaller == better i.e less sync lag - } + + latencyScoreCurrent, syncScoreCurrent := po.calcLatencyAndSyncScores(providerData, cu, requestedBlock) utils.LavaFormatTrace("scores information", utils.LogAttr("providerAddress", providerAddress), @@ -181,7 +215,15 @@ func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, igno utils.LogAttr("latencyScore", latencyScore), utils.LogAttr("syncScore", syncScore), ) + providerScore := po.calcProviderScore(latencyScoreCurrent, syncScoreCurrent) + providerScores[providerAddress] = &metrics.OptimizerQoSReport{ + ProviderAddress: providerAddress, + SyncScore: syncScoreCurrent, + AvailabilityScore: providerData.Availability.Num / providerData.Availability.Denom, + LatencyScore: latencyScoreCurrent, + GenericScore: providerScore, + } selectionTier.AddScore(providerAddress, providerScore) // check if candidate for exploration @@ -191,12 +233,12 @@ func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, igno explorationCandidate = Exploration{address: providerAddress, time: updateTime} } } - return selectionTier, explorationCandidate + return selectionTier, explorationCandidate, providerScores } // returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int) { - selectionTier, explorationCandidate := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock) + selectionTier, explorationCandidate, _ := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock) if selectionTier.ScoresCount() == 0 { // no providers to choose from return []string{}, -1 @@ -493,7 +535,7 @@ func (po *ProviderOptimizer) getRelayStatsTimes(providerAddress string) []time.T return nil } -func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency time.Duration, wantedNumProvidersInConcurrency uint) *ProviderOptimizer { +func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency time.Duration, wantedNumProvidersInConcurrency uint, consumerOptimizerQoSClientInf consumerOptimizerQoSClientInf, chainId string) *ProviderOptimizer { cache, err := ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true}) if err != nil { utils.LavaFormatFatal("failed setting up cache for queries", err) @@ -515,6 +557,8 @@ func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency, selectionWeighter: NewSelectionWeighter(), OptimizerNumTiers: OptimizerNumTiers, + consumerOptimizerQoSClient: consumerOptimizerQoSClientInf, + chainId: chainId, } } diff --git a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go index 37b770e40e..859019590b 100644 --- a/protocol/provideroptimizer/provider_optimizer_test.go +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -1,12 +1,17 @@ package provideroptimizer import ( + "context" "fmt" + "net/http" + "net/http/httptest" "strconv" "sync" "testing" "time" + "github.com/goccy/go-json" + "github.com/lavanet/lava/v3/protocol/metrics" "github.com/lavanet/lava/v3/utils" 
"github.com/lavanet/lava/v3/utils/rand" spectypes "github.com/lavanet/lava/v3/x/spec/types" @@ -40,7 +45,7 @@ func (posc *providerOptimizerSyncCache) Set(key, value interface{}, cost int64) func setupProviderOptimizer(maxProvidersCount int) *ProviderOptimizer { averageBlockTIme := TEST_AVERAGE_BLOCK_TIME baseWorldLatency := TEST_BASE_WORLD_LATENCY - return NewProviderOptimizer(STRATEGY_BALANCED, averageBlockTIme, baseWorldLatency, uint(maxProvidersCount)) + return NewProviderOptimizer(STRATEGY_BALANCED, averageBlockTIme, baseWorldLatency, uint(maxProvidersCount), nil, "dontcare") } type providersGenerator struct { @@ -288,7 +293,7 @@ func TestProviderOptimizerAvailabilityBlockError(t *testing.T) { providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY, false, requestCU, syncBlock-1) // update that he doesn't have the latest requested block } time.Sleep(4 * time.Millisecond) - selectionTier, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) tierChances := selectionTier.ShiftTierChance(OptimizerNumTiers, map[int]float64{0: ATierChance, OptimizerNumTiers - 1: LastTierChance}) require.Greater(t, tierChances[0], 0.7, tierChances) results, tierResults := runChooseManyTimesAndReturnResults(t, providerOptimizer, providersGen.providersAddresses, nil, requestCU, requestBlock, 1000) @@ -431,14 +436,14 @@ func TestProviderOptimizerSyncScore(t *testing.T) { sampleTime = sampleTime.Add(time.Millisecond * 5) } time.Sleep(4 * time.Millisecond) - selectionTier, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) tier0 := selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty // we have the best score on the top tier and it's sorted require.Equal(t, providersGen.providersAddresses[chosenIndex], tier0[0].Address) // now choose with a specific block that all providers have - selectionTier, _ = providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, int64(syncBlock)) + selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, int64(syncBlock)) tier0 = selectionTier.GetTier(0, 4, 3) for idx := range tier0 { // sync score doesn't matter now so the tier0 is recalculated and chosenIndex has worst latency @@ -503,7 +508,7 @@ func TestProviderOptimizerStrategiesScoring(t *testing.T) { time.Sleep(4 * time.Millisecond) providerOptimizer.strategy = STRATEGY_BALANCED // a balanced strategy should pick provider 2 because of it's high availability - selectionTier, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) tier0 := selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty // we have the best score on the top tier and it's sorted @@ -511,7 +516,7 @@ func TestProviderOptimizerStrategiesScoring(t *testing.T) { providerOptimizer.strategy = STRATEGY_COST // with a cost strategy we expect the same as balanced - selectionTier, _ = 
providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) + selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) tier0 = selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty // we have the best score on the top tier and it's sorted @@ -519,20 +524,20 @@ func TestProviderOptimizerStrategiesScoring(t *testing.T) { providerOptimizer.strategy = STRATEGY_LATENCY // latency strategy should pick the best latency - selectionTier, _ = providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock) + selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock) tier0 = selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty require.Equal(t, providersGen.providersAddresses[0], tier0[0].Address) providerOptimizer.strategy = STRATEGY_SYNC_FRESHNESS // freshness strategy should pick the most advanced provider - selectionTier, _ = providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock) + selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock) tier0 = selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty require.Equal(t, providersGen.providersAddresses[1], tier0[0].Address) // but if we request a past block, then it doesnt matter and we choose by latency: - selectionTier, _ = providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, int64(syncBlock)) + selectionTier, _, _ = providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, int64(syncBlock)) tier0 = selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty require.Equal(t, providersGen.providersAddresses[0], tier0[0].Address) @@ -686,7 +691,7 @@ func TestProviderOptimizerWeights(t *testing.T) { improvedLatency := normalLatency - 5*time.Millisecond improvedBlock := syncBlock + 2 - providerOptimizer.UpdateWeights(weights) + providerOptimizer.UpdateWeights(weights, syncBlock) for i := 0; i < 10; i++ { for idx, address := range providersGen.providersAddresses { if idx == 0 { @@ -700,7 +705,7 @@ func TestProviderOptimizerWeights(t *testing.T) { } // verify 0 has the best score - selectionTier, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) + selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) tier0 := selectionTier.GetTier(0, 4, 3) require.Greater(t, len(tier0), 0) // shouldn't be empty require.Equal(t, providersGen.providersAddresses[0], tier0[0].Address) @@ -739,7 +744,7 @@ func TestProviderOptimizerTiers(t *testing.T) { time.Sleep(4 * time.Millisecond) } } - selectionTier, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) + selectionTier, _, _ := 
providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) shiftedChances := selectionTier.ShiftTierChance(4, map[int]float64{0: 0.75}) require.NotZero(t, shiftedChances[3]) // if we pick by sync, provider 0 is in the top tier and should be selected very often @@ -754,6 +759,50 @@ func TestProviderOptimizerTiers(t *testing.T) { } } +func TestProviderOptimizerWithOptimizerQoSClient(t *testing.T) { + rand.InitRandomSeed() + + wg := sync.WaitGroup{} + wg.Add(1) + httpServerHandler := func(w http.ResponseWriter, r *http.Request) { + data := make([]byte, r.ContentLength) + r.Body.Read(data) + + optimizerQoSReport := &[]map[string]interface{}{} + err := json.Unmarshal(data, optimizerQoSReport) + require.NoError(t, err) + require.NotZero(t, len(*optimizerQoSReport)) + w.WriteHeader(http.StatusOK) + wg.Done() + } + + mockHttpServer := httptest.NewServer(http.HandlerFunc(httpServerHandler)) + defer mockHttpServer.Close() + + chainId := "dontcare" + + consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient(mockHttpServer.URL, 1*time.Second) + consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(context.Background(), 900*time.Millisecond) + + providerOptimizer := NewProviderOptimizer(STRATEGY_BALANCED, TEST_AVERAGE_BLOCK_TIME, TEST_BASE_WORLD_LATENCY, 10, consumerOptimizerQoSClient, chainId) + consumerOptimizerQoSClient.RegisterOptimizer(providerOptimizer, chainId) + + syncBlock := uint64(1000) + + providerAddr := "lava@test" + + providerOptimizer.UpdateWeights(map[string]int64{ + providerAddr: 1000000000, + }, syncBlock) + + requestCU := uint64(10) + + normalLatency := TEST_BASE_WORLD_LATENCY * 2 + providerOptimizer.appendRelayData(providerAddr, normalLatency, false, true, requestCU, syncBlock, time.Now()) + + wg.Wait() +} + // TODO: new tests we need: // check 3 providers, one with great stake one with great score // retries: groups getting smaller diff --git a/protocol/provideroptimizer/selection_tier.go b/protocol/provideroptimizer/selection_tier.go index 1a239fdf8b..09db293234 100644 --- a/protocol/provideroptimizer/selection_tier.go +++ b/protocol/provideroptimizer/selection_tier.go @@ -21,6 +21,7 @@ type SelectionTier interface { SelectTierRandomly(numTiers int, tierChances map[int]float64) int ShiftTierChance(numTiers int, initialYierChances map[int]float64) map[int]float64 ScoresCount() int + GetRawScores() []Entry } type SelectionTierInst struct { @@ -31,6 +32,10 @@ func NewSelectionTier() SelectionTier { return &SelectionTierInst{scores: []Entry{}} } +func (st *SelectionTierInst) GetRawScores() []Entry { + return st.scores +} + func (st *SelectionTierInst) ScoresCount() int { return len(st.scores) } diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index bb57bf18b5..f994fc71ea 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -32,7 +32,7 @@ const ( ) type MetricsInterface interface { - SetRelayNodeErrorMetric(chainId string, apiInterface string) + SetRelayNodeErrorMetric(providerAddress string, chainId string, apiInterface string) SetNodeErrorRecoveredSuccessfullyMetric(chainId string, apiInterface string, attempt string) SetNodeErrorAttemptMetric(chainId string, apiInterface string) } @@ -253,7 +253,8 @@ func (rp *RelayProcessor) handleResponse(response *relayResponse) { nodeError := rp.ResultsManager.SetResponse(response, rp.RelayStateMachine.GetProtocolMessage()) // send relay error metrics only on non stateful queries, as 
stateful queries always return X-1/X errors. if nodeError != nil && rp.selection != BestResult { - go rp.metricsInf.SetRelayNodeErrorMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface()) + chainId, apiInterface := rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface() + go rp.metricsInf.SetRelayNodeErrorMetric(response.relayResult.ProviderInfo.ProviderAddress, chainId, apiInterface) utils.LavaFormatInfo("Relay received a node error", utils.LogAttr("Error", nodeError), utils.LogAttr("provider", response.relayResult.ProviderInfo), utils.LogAttr("Request", rp.RelayStateMachine.GetProtocolMessage().GetApi().Name)) } diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index 7bd4f85151..f5f59aa986 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -19,7 +19,8 @@ import ( type relayProcessorMetricsMock struct{} -func (romm *relayProcessorMetricsMock) SetRelayNodeErrorMetric(chainId string, apiInterface string) {} +func (romm *relayProcessorMetricsMock) SetRelayNodeErrorMetric(providerAddress, chainId, apiInterface string) { +} func (romm *relayProcessorMetricsMock) SetNodeErrorRecoveredSuccessfullyMetric(chainId string, apiInterface string, attempt string) { } diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 3dcbe3e54e..b7af5a4fd4 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -100,29 +100,30 @@ type ConsumerStateTrackerInf interface { GetLatestVirtualEpoch() uint64 } -type AnalyticsServerAddressess struct { +type AnalyticsServerAddresses struct { AddApiMethodCallsMetrics bool MetricsListenAddress string RelayServerAddress string ReportsAddressFlag string + OptimizerQoSAddress string } type RPCConsumer struct { consumerStateTracker ConsumerStateTrackerInf } type rpcConsumerStartOptions struct { - txFactory tx.Factory - clientCtx client.Context - rpcEndpoints []*lavasession.RPCEndpoint - requiredResponses int - cache *performance.Cache - strategy provideroptimizer.Strategy - maxConcurrentProviders uint - analyticsServerAddressess AnalyticsServerAddressess - cmdFlags common.ConsumerCmdFlags - stateShare bool - refererData *chainlib.RefererData - staticProvidersList []*lavasession.RPCProviderEndpoint // define static providers as backup to lava providers + txFactory tx.Factory + clientCtx client.Context + rpcEndpoints []*lavasession.RPCEndpoint + requiredResponses int + cache *performance.Cache + strategy provideroptimizer.Strategy + maxConcurrentProviders uint + analyticsServerAddresses AnalyticsServerAddresses + cmdFlags common.ConsumerCmdFlags + stateShare bool + refererData *chainlib.RefererData + staticProvidersList []*lavasession.RPCProviderEndpoint // define static providers as backup to lava providers } // spawns a new RPCConsumer server with all it's processes and internals ready for communications @@ -131,10 +132,16 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt testModeWarn("RPCConsumer running tests") } options.refererData.ReferrerClient = metrics.NewConsumerReferrerClient(options.refererData.Address) - consumerReportsManager := metrics.NewConsumerReportsClient(options.analyticsServerAddressess.ReportsAddressFlag) - consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{NetworkAddress: options.analyticsServerAddressess.MetricsListenAddress, AddMethodsApiGauge: 
options.analyticsServerAddressess.AddApiMethodCallsMetrics}) // start up prometheus metrics - consumerUsageserveManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddressess.RelayServerAddress) // start up relay server reporting - rpcConsumerMetrics, err := metrics.NewRPCConsumerLogs(consumerMetricsManager, consumerUsageserveManager) + consumerReportsManager := metrics.NewConsumerReportsClient(options.analyticsServerAddresses.ReportsAddressFlag) + consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress, AddMethodsApiGauge: options.analyticsServerAddresses.AddApiMethodCallsMetrics}) // start up prometheus metrics + consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting + var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient + if options.analyticsServerAddresses.OptimizerQoSAddress != "" { + consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client + consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client + } + + rpcConsumerMetrics, err := metrics.NewRPCConsumerLogs(consumerMetricsManager, consumerUsageServeManager, consumerOptimizerQoSClient) if err != nil { utils.LavaFormatFatal("failed creating RPCConsumer logs", err) } @@ -243,12 +250,17 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better // Create / Use existing optimizer - newOptimizer := provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, baseLatency, options.maxConcurrentProviders) - optimizer, _, err = optimizers.LoadOrStore(chainID, newOptimizer) + newOptimizer := provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, baseLatency, options.maxConcurrentProviders, consumerOptimizerQoSClient, chainID) + optimizer, loaded, err = optimizers.LoadOrStore(chainID, newOptimizer) if err != nil { return utils.LavaFormatError("failed loading optimizer", err, utils.LogAttr("endpoint", rpcEndpoint.Key())) } + if !loaded { + // if this is a new optimizer, register it in the consumerOptimizerQoSClient + consumerOptimizerQoSClient.RegisterOptimizer(optimizer, chainID) + } + // Create / Use existing ConsumerConsistency newConsumerConsistency := NewConsumerConsistency(chainID) consumerConsistency, _, err = consumerConsistencies.LoadOrStore(chainID, newConsumerConsistency) @@ -539,11 +551,12 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 utils.LavaFormatInfo("Working with selection strategy: " + strategyFlag.String()) } - analyticsServerAddressess := AnalyticsServerAddressess{ + analyticsServerAddresses := AnalyticsServerAddresses{ AddApiMethodCallsMetrics: viper.GetBool(metrics.AddApiMethodCallsMetrics), MetricsListenAddress: viper.GetString(metrics.MetricsListenFlagName), RelayServerAddress: viper.GetString(metrics.RelayServerFlagName), ReportsAddressFlag: viper.GetString(reportsSendBEAddress), + OptimizerQoSAddress: viper.GetString(common.OptimizerQosServerAddressFlag), } var refererData *chainlib.RefererData @@ -574,7 +587,20 @@ rpcconsumer 
consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 } rpcConsumerSharedState := viper.GetBool(common.SharedStateFlag) - err = rpcConsumer.Start(ctx, &rpcConsumerStartOptions{txFactory, clientCtx, rpcEndpoints, requiredResponses, cache, strategyFlag.Strategy, maxConcurrentProviders, analyticsServerAddressess, consumerPropagatedFlags, rpcConsumerSharedState, refererData, staticProviderEndpoints}) + err = rpcConsumer.Start(ctx, &rpcConsumerStartOptions{ + txFactory, + clientCtx, + rpcEndpoints, + requiredResponses, + cache, + strategyFlag.Strategy, + maxConcurrentProviders, + analyticsServerAddresses, + consumerPropagatedFlags, + rpcConsumerSharedState, + refererData, + staticProviderEndpoints, + }) return err }, } @@ -618,6 +644,10 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().Float64Var(&provideroptimizer.ATierChance, common.SetProviderOptimizerBestTierPickChance, 0.75, "set the chances for picking a provider from the best group, default is 75% -> 0.75") cmdRPCConsumer.Flags().Float64Var(&provideroptimizer.LastTierChance, common.SetProviderOptimizerWorstTierPickChance, 0.0, "set the chances for picking a provider from the worse group, default is 0% -> 0.0") cmdRPCConsumer.Flags().IntVar(&provideroptimizer.OptimizerNumTiers, common.SetProviderOptimizerNumberOfTiersToCreate, 4, "set the number of groups to create, default is 4") + // optimizer qos reports + cmdRPCConsumer.Flags().String(common.OptimizerQosServerAddressFlag, "", "address to send optimizer qos reports to") + cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerPushInterval, common.OptimizerQosServerPushIntervalFlag, time.Minute*5, "interval to push optimizer qos reports") + cmdRPCConsumer.Flags().DurationVar(&metrics.OptimizerQosServerSamplingInterval, common.OptimizerQosServerSamplingIntervalFlag, time.Second*1, "interval to sample optimizer qos reports") cmdRPCConsumer.Flags().IntVar(&chainlib.WebSocketRateLimit, common.RateLimitWebSocketFlag, chainlib.WebSocketRateLimit, "rate limit (per second) websocket requests per user connection, default is unlimited") cmdRPCConsumer.Flags().DurationVar(&chainlib.WebSocketBanDuration, common.BanDurationForWebsocketRateLimitExceededFlag, chainlib.WebSocketBanDuration, "once websocket rate limit is reached, user will be banned Xfor a duration, default no ban") common.AddRollingLogConfig(cmdRPCConsumer) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index ba700d6630..6af0f617b4 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -662,7 +662,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( endpointClient := singleConsumerSession.EndpointConnection.Client // set relay sent metric - go rpccs.rpcConsumerLogs.SetRelaySentToProviderMetric(chainId, apiInterface) + go rpccs.rpcConsumerLogs.SetRelaySentToProviderMetric(providerPublicAddress, chainId, apiInterface) if chainlib.IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) { utils.LavaFormatTrace("inside sendRelayToProvider, relay is subscription", utils.LogAttr("requestData", localRelayRequestData.Data)) diff --git a/protocol/rpcconsumer/rpcconsumer_server_test.go b/protocol/rpcconsumer/rpcconsumer_server_test.go index 639e03cb5d..87ef84266d 100644 --- a/protocol/rpcconsumer/rpcconsumer_server_test.go +++ b/protocol/rpcconsumer/rpcconsumer_server_test.go @@ -54,7 +54,7 @@ func createRpcConsumer(t *testing.T, ctrl 
*gomock.Controller, ctx context.Contex finalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID) _, averageBlockTime, _, _ := chainParser.ChainBlockStats() baseLatency := common.AverageWorldLatency / 2 - optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2) + optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2, nil, "dontcare") consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage()) consumerSessionManager.UpdateAllProviders(epoch, map[uint64]*lavasession.ConsumerSessionsWithProvider{ epoch: { @@ -68,7 +68,7 @@ func createRpcConsumer(t *testing.T, ctrl *gomock.Controller, ctx context.Contex consumerCmdFlags := common.ConsumerCmdFlags{ RelaysHealthEnableFlag: false, } - rpcsonumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil) + rpcsonumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil, nil) require.NoError(t, err) err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, requiredResponses, consumeSK, lavaChainID, nil, rpcsonumerLogs, consumerAccount, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil) require.NoError(t, err) diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 9b860b433b..fa83eff13d 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -779,7 +779,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().Uint(ShardIDFlagName, DefaultShardID, "shard id") cmdRPCProvider.Flags().Uint(rewardserver.RewardsSnapshotThresholdFlagName, rewardserver.DefaultRewardsSnapshotThreshold, "the number of rewards to wait until making snapshot of the rewards memory") cmdRPCProvider.Flags().Uint(rewardserver.RewardsSnapshotTimeoutSecFlagName, rewardserver.DefaultRewardsSnapshotTimeoutSec, "the seconds to wait until making snapshot of the rewards memory") - cmdRPCProvider.Flags().String(StickinessHeaderName, RPCProviderStickinessHeaderName, "the name of the header to be attacked to requests for stickiness by consumer, used for consistency") + cmdRPCProvider.Flags().String(StickinessHeaderName, RPCProviderStickinessHeaderName, "the name of the header to be attached to requests for stickiness by consumer, used for consistency") cmdRPCProvider.Flags().Uint64Var(&chaintracker.PollingMultiplier, chaintracker.PollingMultiplierFlagName, 1, "when set, forces the chain tracker to poll more often, improving the sync at the cost of more queries") cmdRPCProvider.Flags().DurationVar(&SpecValidationInterval, SpecValidationIntervalFlagName, SpecValidationInterval, "determines the interval of which to run validation on the spec for all connected chains") cmdRPCProvider.Flags().DurationVar(&SpecValidationIntervalDisabledChains, SpecValidationIntervalDisabledChainsFlagName, SpecValidationIntervalDisabledChains, "determines the interval of which to run validation on the spec for all disabled chains, determines recovery time") diff --git a/protocol/statetracker/events.go b/protocol/statetracker/events.go index fa1383e9f5..1283d633e2 100644 --- a/protocol/statetracker/events.go +++ b/protocol/statetracker/events.go @@ -723,23 +723,17 @@ func countTransactionsPerDay(ctx context.Context, clientCtx client.Context, bloc } 
// Log the transactions per day results - totalTxPerDay.Range(func(key, value interface{}) bool { - utils.LavaFormatInfo("transactions per day results", utils.LogAttr("Day", key), utils.LogAttr("totalTx", value)) + totalTxPerDay.Range(func(day int64, totalTx int) bool { + utils.LavaFormatInfo("transactions per day results", utils.LogAttr("Day", day), utils.LogAttr("totalTx", totalTx)) return true // continue iteration }) // Prepare the JSON data jsonData := make(map[string]int) - totalTxPerDay.Range(func(key, value interface{}) bool { - day, ok := key.(int64) - if ok { - date := time.Now().AddDate(0, 0, -int(day)+1).Format("2006-01-02") - dateKey := fmt.Sprintf("date_%s", date) - val, ok2 := value.(int) - if ok2 { - jsonData[dateKey] = val - } - } + totalTxPerDay.Range(func(day int64, totalTx int) bool { + date := time.Now().AddDate(0, 0, -int(day)+1).Format("2006-01-02") + dateKey := fmt.Sprintf("date_%s", date) + jsonData[dateKey] = totalTx return true }) diff --git a/utils/maps/maps.go b/utils/maps/maps.go index 4658ebf3f3..dff615f026 100644 --- a/utils/maps/maps.go +++ b/utils/maps/maps.go @@ -3,6 +3,7 @@ package maps import ( "github.com/lavanet/lava/v3/utils/lavaslices" "golang.org/x/exp/constraints" + "golang.org/x/exp/maps" ) func FindLargestIntValueInMap[K comparable](myMap map[K]int) (K, int) { @@ -22,11 +23,7 @@ func FindLargestIntValueInMap[K comparable](myMap map[K]int) (K, int) { } func StableSortedKeys[T constraints.Ordered, V any](m map[T]V) []T { - keys := make([]T, 0, len(m)) - for k := range m { - keys = append(keys, k) - } - + keys := maps.Keys(m) lavaslices.SortStable(keys) return keys }
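Pulling the pieces of this patch together, here is a hedged, minimal sketch of how the new optimizer QoS reporting is wired end to end, based only on the signatures visible in the diff above (NewConsumerOptimizerQoSClient, StartOptimizersQoSReportsCollecting, RegisterOptimizer, NewProviderOptimizer, UpdateWeights, SetRelaySentToProvider, SetNodeErrorToProvider). The endpoint URL, chain id, provider address, timings, and stake values are placeholders, not values from the PR.

```go
package main

import (
	"context"
	"time"

	"github.com/lavanet/lava/v3/protocol/metrics"
	"github.com/lavanet/lava/v3/protocol/provideroptimizer"
)

func main() {
	ctx := context.Background()

	// 1. Create the QoS client; it batches reports through a QueueSender and
	//    pushes them to the endpoint given by --optimizer-qos-server-address.
	//    (URL and push interval here are placeholders.)
	qosClient := metrics.NewConsumerOptimizerQoSClient("http://127.0.0.1:8080", 5*time.Minute)

	// 2. Periodically sample every registered optimizer via
	//    CalculateQoSScoresForMetrics and enqueue the resulting reports.
	qosClient.StartOptimizersQoSReportsCollecting(ctx, time.Second)

	// 3. Create a per-chain optimizer that knows about the QoS client, then
	//    register it so its scores are included in the sampling loop.
	chainID := "LAV1" // placeholder chain id
	optimizer := provideroptimizer.NewProviderOptimizer(
		provideroptimizer.STRATEGY_BALANCED,
		10*time.Second,       // average block time (placeholder)
		300*time.Millisecond, // base world latency (placeholder)
		2,                    // wanted providers in concurrency
		qosClient,
		chainID,
	)
	qosClient.RegisterOptimizer(optimizer, chainID)

	// 4. On every pairing update, stake weights flow into the QoS client
	//    through UpdateWeights, tagged with the epoch.
	optimizer.UpdateWeights(map[string]int64{"lava@provider": 1000000000}, 100)

	// 5. Relay traffic feeds the per-provider counters used to compute
	//    node_error_rate in each report.
	qosClient.SetRelaySentToProvider("lava@provider", chainID)
	qosClient.SetNodeErrorToProvider("lava@provider", chainID)

	time.Sleep(2 * time.Second) // let one sampling tick fire in this toy example
}
```

In the actual rpcconsumer.go wiring above, the client is only constructed when --optimizer-qos-server-address is set, and RegisterOptimizer is called only for optimizers newly stored by LoadOrStore (the !loaded branch), so each chain's optimizer is registered exactly once.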