
Commit

Merge tag 'v3.2.2' into PRT-v-3-2-2-archive-special-api-near-experimental
shleikes committed Oct 21, 2024
2 parents e719197 + 3dc3e32 commit 50ce36d
Showing 23 changed files with 541 additions and 96 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -3,7 +3,7 @@ name: Publish Lava Release
on:
push:
tags:
- v[0-9]+.[0-9]+.[0-9]+.*'
- 'v[0-9]+.[0-9]+.[0-9]+*'
workflow_dispatch:
inputs:
release_tag:
2 changes: 1 addition & 1 deletion protocol/chainlib/consumer_ws_subscription_manager_test.go
@@ -725,7 +725,7 @@ func CreateConsumerSessionManager(chainID, apiInterface, consumerPublicAddress s
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return lavasession.NewConsumerSessionManager(
&lavasession.RPCEndpoint{NetworkAddress: "stub", ChainID: chainID, ApiInterface: apiInterface, TLSEnabled: false, HealthCheckPath: "/", Geolocation: 0},
provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1),
provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare"),
nil, nil, consumerPublicAddress,
lavasession.NewActiveSubscriptionProvidersStorage(),
)
4 changes: 4 additions & 0 deletions protocol/common/cobra_common.go
@@ -39,6 +39,10 @@ const (
SetProviderOptimizerWorstTierPickChance = "set-provider-optimizer-worst-tier-pick-chance"
SetProviderOptimizerNumberOfTiersToCreate = "set-provider-optimizer-number-of-tiers-to-create"

// optimizer qos server flags
OptimizerQosServerAddressFlag = "optimizer-qos-server-address" // address of the optimizer qos server to send the qos reports
OptimizerQosServerPushIntervalFlag = "optimizer-qos-push-interval" // interval to push the qos reports to the optimizer qos server
OptimizerQosServerSamplingIntervalFlag = "optimizer-qos-sampling-interval" // interval to sample the qos reports
// websocket flags
RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection"
BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded"
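The three new constants name the consumer CLI flags that configure optimizer QoS reporting: the collector address, the push interval for batched reports, and the sampling interval. Below is a minimal sketch of how such flags could be registered with cobra/pflag; the helper name and the default values are assumptions for illustration and are not part of this commit.

package consumercmd // hypothetical package, used only for this sketch

import (
	"time"

	"github.com/spf13/cobra"

	"github.com/lavanet/lava/v3/protocol/common"
)

// registerOptimizerQoSFlags is a hypothetical helper showing how the new flag
// constants could be attached to a consumer command; the defaults are illustrative.
func registerOptimizerQoSFlags(cmd *cobra.Command) {
	cmd.Flags().String(common.OptimizerQosServerAddressFlag, "", "address of the optimizer qos server to send the qos reports")
	cmd.Flags().Duration(common.OptimizerQosServerPushIntervalFlag, time.Minute, "interval to push the qos reports to the optimizer qos server")
	cmd.Flags().Duration(common.OptimizerQosServerSamplingIntervalFlag, time.Second, "interval to sample the qos reports")
}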
18 changes: 16 additions & 2 deletions protocol/common/safe_sync_map.go
@@ -46,6 +46,20 @@ func (ssm *SafeSyncMap[K, V]) LoadOrStore(key K, value V) (ret V, loaded bool, e
return value, false, nil
}

func (ssm *SafeSyncMap[K, V]) Range(f func(key, value any) bool) {
ssm.localMap.Range(f)
func (ssm *SafeSyncMap[K, V]) Range(f func(key K, value V) bool) {
ssm.localMap.Range(func(key, value any) bool {
unboxedKey, ok := key.(K)
if !ok {
utils.LavaFormatError("invalid usage of sync map, could not cast key into a type", nil)
return false
}

unboxedValue, ok := value.(V)
if !ok {
utils.LavaFormatError("invalid usage of sync map, could not cast value into a type", nil)
return false
}

return f(unboxedKey, unboxedValue)
})
}
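Range previously exposed the raw sync.Map callback (key, value any), forcing callers to type-assert; the new version takes a typed callback and performs the assertions internally, logging and stopping iteration on a mismatch. A short usage sketch follows, assuming the zero value of SafeSyncMap is ready to use like the underlying sync.Map, with a hypothetical per-provider relay counter as the value type.

package main

import (
	"fmt"

	"github.com/lavanet/lava/v3/protocol/common"
)

func main() {
	// Hypothetical example: track relay counts per provider address.
	var relaysPerProvider common.SafeSyncMap[string, uint64]
	relaysPerProvider.LoadOrStore("lava@provider1", 3)
	relaysPerProvider.LoadOrStore("lava@provider2", 7)

	// The callback now receives typed key/value pairs; no type assertions at the call site.
	relaysPerProvider.Range(func(provider string, relays uint64) bool {
		fmt.Printf("%s handled %d relays\n", provider, relays)
		return true // continue iterating
	})
}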
4 changes: 2 additions & 2 deletions protocol/integration/protocol_test.go
@@ -205,7 +205,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc
finalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID)
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
baseLatency := common.AverageWorldLatency / 2
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2)
optimizer := provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, averageBlockTime, baseLatency, 2, nil, "dontcare")
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, nil, nil, "test", lavasession.NewActiveSubscriptionProvidersStorage())
consumerSessionManager.UpdateAllProviders(rpcConsumerOptions.epoch, rpcConsumerOptions.pairingList)

@@ -219,7 +219,7 @@ func createRpcConsumer(t *testing.T, ctx context.Context, rpcConsumerOptions rpc

consumerConsistency := rpcconsumer.NewConsumerConsistency(rpcConsumerOptions.specId)
consumerCmdFlags := common.ConsumerCmdFlags{}
rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil)
rpcconsumerLogs, err := metrics.NewRPCConsumerLogs(nil, nil, nil)
require.NoError(t, err)
err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, rpcConsumerOptions.requiredResponses, rpcConsumerOptions.account.SK, rpcConsumerOptions.lavaChainID, cache, rpcconsumerLogs, rpcConsumerOptions.account.Addr, consumerConsistency, nil, consumerCmdFlags, false, nil, nil, nil)
require.NoError(t, err)
12 changes: 10 additions & 2 deletions protocol/lavasession/consumer_session_manager.go
@@ -114,7 +114,8 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList
csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses.
// reset session related metrics
csm.consumerMetricsManager.ResetSessionRelatedMetrics()
csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList))
go csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList), epoch)

utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()})
return nil
}
@@ -1127,7 +1128,14 @@ func (csm *ConsumerSessionManager) GenerateReconnectCallback(consumerSessionsWit
}
}

func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager, reporter metrics.Reporter, consumerPublicAddress string, activeSubscriptionProvidersStorage *ActiveSubscriptionProvidersStorage) *ConsumerSessionManager {
func NewConsumerSessionManager(
rpcEndpoint *RPCEndpoint,
providerOptimizer ProviderOptimizer,
consumerMetricsManager *metrics.ConsumerMetricsManager,
reporter metrics.Reporter,
consumerPublicAddress string,
activeSubscriptionProvidersStorage *ActiveSubscriptionProvidersStorage,
) *ConsumerSessionManager {
csm := &ConsumerSessionManager{
reportedProviders: NewReportedProviders(reporter, rpcEndpoint.ChainID),
consumerMetricsManager: consumerMetricsManager,
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_session_manager_test.go
@@ -162,7 +162,7 @@ func TestEndpointSortingFlow(t *testing.T) {
func CreateConsumerSessionManager() *ConsumerSessionManager {
rand.InitRandomSeed()
baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage())
return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1, nil, "dontcare"), nil, nil, "lava@test", NewActiveSubscriptionProvidersStorage())
}

func TestMain(m *testing.M) {
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_types.go
@@ -76,7 +76,7 @@ type ProviderOptimizer interface {
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport)
Strategy() provideroptimizer.Strategy
UpdateWeights(map[string]int64)
UpdateWeights(map[string]int64, uint64)
}

type ignoredProviders struct {
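Because UpdateAllProviders now invokes UpdateWeights on its own goroutine and passes the epoch (see the consumer_session_manager.go hunk above), anything satisfying this interface, including test mocks, must adopt the new signature and tolerate concurrent calls. A hypothetical stub, shown only to illustrate the updated method shape; the remaining interface methods are omitted.

package mocks // illustrative only; not part of this commit

import "sync"

// mockOptimizer sketches only the changed method of the ProviderOptimizer interface.
type mockOptimizer struct {
	lock    sync.Mutex
	weights map[string]int64
	epoch   uint64
}

// UpdateWeights now receives the epoch alongside the stake-based weights and
// guards its state because the session manager calls it from a goroutine.
func (m *mockOptimizer) UpdateWeights(weights map[string]int64, epoch uint64) {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.weights = weights
	m.epoch = epoch
}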
281 changes: 281 additions & 0 deletions protocol/metrics/consumer_optimizer_qos_client.go
@@ -0,0 +1,281 @@
package metrics

import (
"context"
"os"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/lavanet/lava/v3/utils"
"github.com/lavanet/lava/v3/utils/rand"
spectypes "github.com/lavanet/lava/v3/x/spec/types"
"golang.org/x/exp/maps"
)

var (
OptimizerQosServerPushInterval time.Duration
OptimizerQosServerSamplingInterval time.Duration
)

type ConsumerOptimizerQoSClient struct {
consumerOrigin string
queueSender *QueueSender
optimizers map[string]OptimizerInf // keys are chain ids
// keys are chain ids, values are maps with provider addresses as keys
chainIdToProviderToRelaysCount map[string]map[string]uint64
chainIdToProviderToNodeErrorsCount map[string]map[string]uint64
chainIdToProviderToEpochToStake map[string]map[string]map[uint64]int64 // third key is epoch
currentEpoch atomic.Uint64
lock sync.RWMutex
}

type OptimizerQoSReport struct {
ProviderAddress string
SyncScore float64
AvailabilityScore float64
LatencyScore float64
GenericScore float64
EntryIndex int
}

type optimizerQoSReportToSend struct {
Timestamp time.Time `json:"timestamp"`
SyncScore float64 `json:"sync_score"`
AvailabilityScore float64 `json:"availability_score"`
LatencyScore float64 `json:"latency_score"`
GenericScore float64 `json:"generic_score"`
ProviderAddress string `json:"provider"`
ConsumerOrigin string `json:"consumer"`
ChainId string `json:"chain_id"`
NodeErrorRate float64 `json:"node_error_rate"`
Epoch uint64 `json:"epoch"`
ProviderStake int64 `json:"provider_stake"`
EntryIndex int `json:"entry_index"`
}

func (oqosr optimizerQoSReportToSend) String() string {
bytes, err := json.Marshal(oqosr)
if err != nil {
return ""
}
return string(bytes)
}

type OptimizerInf interface {
CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*OptimizerQoSReport
}

func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient {
hostname, err := os.Hostname()
if err != nil {
utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err)
hostname = "unknown" + strconv.FormatUint(rand.Uint64(), 10) // random seed for different unknowns
}

return &ConsumerOptimizerQoSClient{
consumerOrigin: hostname,
queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...),
optimizers: map[string]OptimizerInf{},
chainIdToProviderToRelaysCount: map[string]map[string]uint64{},
chainIdToProviderToNodeErrorsCount: map[string]map[string]uint64{},
chainIdToProviderToEpochToStake: map[string]map[string]map[uint64]int64{},
}
}

func (coqc *ConsumerOptimizerQoSClient) getProviderChainMapCounterValue(counterStore map[string]map[string]uint64, chainId, providerAddress string) uint64 {
// must be called under read lock
if counterProvidersMap, found := counterStore[chainId]; found {
return counterProvidersMap[providerAddress]
}
return 0
}

func (coqc *ConsumerOptimizerQoSClient) getProviderChainRelaysCount(chainId, providerAddress string) uint64 {
// must be called under read lock
return coqc.getProviderChainMapCounterValue(coqc.chainIdToProviderToRelaysCount, chainId, providerAddress)
}

func (coqc *ConsumerOptimizerQoSClient) getProviderChainNodeErrorsCount(chainId, providerAddress string) uint64 {
// must be called under read lock
return coqc.getProviderChainMapCounterValue(coqc.chainIdToProviderToNodeErrorsCount, chainId, providerAddress)
}

func (coqc *ConsumerOptimizerQoSClient) getProviderChainStake(chainId, providerAddress string, epoch uint64) int64 {
// must be called under read lock
if providersMap, found := coqc.chainIdToProviderToEpochToStake[chainId]; found {
if epochMap, found := providersMap[providerAddress]; found {
if stake, found := epochMap[epoch]; found {
return stake
}
}
}
return 0
}

func (coqc *ConsumerOptimizerQoSClient) calculateNodeErrorRate(chainId, providerAddress string) float64 {
// must be called under read lock
relaysCount := coqc.getProviderChainRelaysCount(chainId, providerAddress)
if relaysCount > 0 {
errorsCount := coqc.getProviderChainNodeErrorsCount(chainId, providerAddress)
return float64(errorsCount) / float64(relaysCount)
}

return 0
}

func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *OptimizerQoSReport, chainId string, epoch uint64) {
// must be called under read lock

optimizerQoSReportToSend := optimizerQoSReportToSend{
Timestamp: time.Now(),
ConsumerOrigin: coqc.consumerOrigin,
SyncScore: report.SyncScore,
AvailabilityScore: report.AvailabilityScore,
LatencyScore: report.LatencyScore,
GenericScore: report.GenericScore,
ProviderAddress: report.ProviderAddress,
EntryIndex: report.EntryIndex,
ChainId: chainId,
Epoch: epoch,
NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress),
ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch),
}

coqc.queueSender.appendQueue(optimizerQoSReportToSend)
}

func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() {
coqc.lock.RLock() // we only read from the maps here
defer coqc.lock.RUnlock()

ignoredProviders := map[string]struct{}{}
cu := uint64(10)
requestedBlock := spectypes.LATEST_BLOCK

currentEpoch := coqc.currentEpoch.Load()

for chainId, optimizer := range coqc.optimizers {
providersMap, ok := coqc.chainIdToProviderToEpochToStake[chainId]
if !ok {
continue
}

reports := optimizer.CalculateQoSScoresForMetrics(maps.Keys(providersMap), ignoredProviders, cu, requestedBlock)
for _, report := range reports {
coqc.appendOptimizerQoSReport(report, chainId, currentEpoch)
}
}
}

func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx context.Context, samplingInterval time.Duration) {
if coqc == nil {
return
}

utils.LavaFormatTrace("Starting ConsumerOptimizerQoSClient reports collecting")
go func() {
for {
select {
case <-ctx.Done():
utils.LavaFormatTrace("ConsumerOptimizerQoSClient context done")
return
case <-time.After(samplingInterval):
coqc.getReportsFromOptimizers()
}
}
}()
}

func (coqc *ConsumerOptimizerQoSClient) RegisterOptimizer(optimizer OptimizerInf, chainId string) {
if coqc == nil {
return
}

coqc.lock.Lock()
defer coqc.lock.Unlock()

if _, found := coqc.optimizers[chainId]; found {
utils.LavaFormatWarning("Optimizer already registered for chain", nil, utils.LogAttr("chainId", chainId))
return
}

coqc.optimizers[chainId] = optimizer
}

func (coqc *ConsumerOptimizerQoSClient) incrementStoreCounter(store map[string]map[string]uint64, chainId, providerAddress string) {
// must be called under write lock
if coqc == nil {
return
}

providersMap, found := store[chainId]
if !found {
store[chainId] = map[string]uint64{providerAddress: 1}
return
}

count, found := providersMap[providerAddress]
if !found {
store[chainId][providerAddress] = 1
return
}

store[chainId][providerAddress] = count + 1
}

func (coqc *ConsumerOptimizerQoSClient) SetRelaySentToProvider(providerAddress string, chainId string) {
if coqc == nil {
return
}

coqc.lock.Lock()
defer coqc.lock.Unlock()

coqc.incrementStoreCounter(coqc.chainIdToProviderToRelaysCount, chainId, providerAddress)
}

func (coqc *ConsumerOptimizerQoSClient) SetNodeErrorToProvider(providerAddress string, chainId string) {
if coqc == nil {
return
}

coqc.lock.Lock()
defer coqc.lock.Unlock()

coqc.incrementStoreCounter(coqc.chainIdToProviderToNodeErrorsCount, chainId, providerAddress)
}

func (coqc *ConsumerOptimizerQoSClient) setProviderStake(chainId, providerAddress string, epoch uint64, stake int64) {
// must be called under write lock
coqc.currentEpoch.Store(epoch)

providersMap, found := coqc.chainIdToProviderToEpochToStake[chainId]
if !found {
coqc.chainIdToProviderToEpochToStake[chainId] = map[string]map[uint64]int64{providerAddress: {epoch: stake}}
return
}

epochMap, found := providersMap[providerAddress]
if !found {
coqc.chainIdToProviderToEpochToStake[chainId][providerAddress] = map[uint64]int64{epoch: stake}
return
}

epochMap[epoch] = stake
}

func (coqc *ConsumerOptimizerQoSClient) UpdatePairingListStake(stakeMap map[string]int64, chainId string, epoch uint64) {
if coqc == nil {
return
}

coqc.lock.Lock()
defer coqc.lock.Unlock()

for providerAddr, stake := range stakeMap {
coqc.setProviderStake(chainId, providerAddr, epoch, stake)
}
}
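Taken together, the new client is created once per consumer process: optimizers register per chain, relay and node-error counters accumulate under the client's lock, stake updates record the current epoch, and a background loop samples each registered optimizer and pushes reports through the queue sender. The wiring sketch below shows these calls in order; the endpoint address, chain id, intervals, and the stub optimizer are illustrative assumptions rather than code from this commit.

package main

import (
	"context"
	"time"

	"github.com/lavanet/lava/v3/protocol/metrics"
)

// stubOptimizer is a hypothetical OptimizerInf implementation used only so the
// sketch compiles; in the consumer the real provider optimizer is registered.
type stubOptimizer struct{}

func (stubOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*metrics.OptimizerQoSReport {
	return nil
}

func main() {
	ctx := context.Background()

	// The address and intervals would normally come from the new
	// optimizer-qos-server-address / push-interval / sampling-interval flags.
	client := metrics.NewConsumerOptimizerQoSClient("http://qos-collector.example:8080", time.Minute)
	client.RegisterOptimizer(stubOptimizer{}, "LAV1")
	client.StartOptimizersQoSReportsCollecting(ctx, 10*time.Second)

	// Relay bookkeeping feeds the per-provider counters and the node error rate.
	client.SetRelaySentToProvider("lava@provider1", "LAV1")
	client.SetNodeErrorToProvider("lava@provider1", "LAV1")

	// Stake updates also advance the epoch stamped onto outgoing reports.
	client.UpdatePairingListStake(map[string]int64{"lava@provider1": 1000000}, "LAV1", 42)

	time.Sleep(15 * time.Second) // allow at least one sampling tick in this sketch
}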
