From 8c6ad2256a96cf2653c041b912af46ef2dcbc4e9 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Tue, 3 Dec 2024 12:27:08 +0200 Subject: [PATCH 1/2] Add the consumer address to the QoS report --- .../metrics/consumer_optimizer_qos_client.go | 18 +++--- protocol/rpcconsumer/rpcconsumer.go | 55 ++++++++++++------- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/protocol/metrics/consumer_optimizer_qos_client.go b/protocol/metrics/consumer_optimizer_qos_client.go index 3e12e23dfc..72183853d9 100644 --- a/protocol/metrics/consumer_optimizer_qos_client.go +++ b/protocol/metrics/consumer_optimizer_qos_client.go @@ -21,9 +21,10 @@ var ( ) type ConsumerOptimizerQoSClient struct { - consumerOrigin string - queueSender *QueueSender - optimizers map[string]OptimizerInf // keys are chain ids + consumerHostname string + consumerAddress string + queueSender *QueueSender + optimizers map[string]OptimizerInf // keys are chain ids // keys are chain ids, values are maps with provider addresses as keys chainIdToProviderToRelaysCount map[string]map[string]uint64 chainIdToProviderToNodeErrorsCount map[string]map[string]uint64 @@ -49,7 +50,8 @@ type OptimizerQoSReportToSend struct { LatencyScore float64 `json:"latency_score"` GenericScore float64 `json:"generic_score"` ProviderAddress string `json:"provider"` - ConsumerOrigin string `json:"consumer"` + ConsumerHostname string `json:"consumer_hostname"` + ConsumerAddress string `json:"consumer_pub_address"` ChainId string `json:"chain_id"` NodeErrorRate float64 `json:"node_error_rate"` Epoch uint64 `json:"epoch"` @@ -69,14 +71,15 @@ type OptimizerInf interface { CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*OptimizerQoSReport } -func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient { +func NewConsumerOptimizerQoSClient(consumerAddress, endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient { hostname, err := os.Hostname() if err != nil { utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err) hostname = "unknown" + strconv.FormatUint(rand.Uint64(), 10) // random seed for different unknowns } return &ConsumerOptimizerQoSClient{ - consumerOrigin: hostname, + consumerHostname: hostname, + consumerAddress: consumerAddress, queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...), optimizers: map[string]OptimizerInf{}, chainIdToProviderToRelaysCount: map[string]map[string]uint64{}, @@ -130,7 +133,8 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz // must be called under read lock optimizerQoSReportToSend := OptimizerQoSReportToSend{ Timestamp: time.Now(), - ConsumerOrigin: coqc.consumerOrigin, + ConsumerHostname: coqc.consumerHostname, + ConsumerAddress: coqc.consumerAddress, SyncScore: report.SyncScore, AvailabilityScore: report.AvailabilityScore, LatencyScore: report.LatencyScore, diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 94784cdd2e..dc92b3e117 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -131,6 +131,33 @@ type rpcConsumerStartOptions struct { staticProvidersList []*lavasession.RPCProviderEndpoint // define static providers as backup to lava providers } +func getConsumerAddressAndKeys(clientCtx client.Context) (sdk.AccAddress, *secp256k1.PrivateKey, error) { + keyName, err := sigs.GetKeyName(clientCtx) + if err != nil { + return nil, nil, fmt.Errorf("failed getting key name from clientCtx: %w", err) + } + + privKey, err := sigs.GetPrivKey(clientCtx, keyName) + if err != nil { + return nil, nil, fmt.Errorf("failed getting private key from key name %s: %w", keyName, err) + } + + clientKey, _ := clientCtx.Keyring.Key(keyName) + pubkey, err := clientKey.GetPubKey() + if err != nil { + return nil, nil, fmt.Errorf("failed getting public key from key name %s: %w", keyName, err) + } + + var consumerAddr sdk.AccAddress + err = consumerAddr.Unmarshal(pubkey.Address()) + if err != nil { + return nil, nil, fmt.Errorf("failed unmarshaling public address for key %s (pubkey: %v): %w", + keyName, pubkey.Address(), err) + } + + return consumerAddr, privKey, nil +} + // spawns a new RPCConsumer server with all it's processes and internals ready for communications func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOptions) (err error) { if common.IsTestMode(ctx) { @@ -139,11 +166,16 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt options.refererData.ReferrerClient = metrics.NewConsumerReferrerClient(options.refererData.Address) consumerReportsManager := metrics.NewConsumerReportsClient(options.analyticsServerAddresses.ReportsAddressFlag) + consumerAddr, privKey, err := getConsumerAddressAndKeys(options.clientCtx) + if err != nil { + utils.LavaFormatFatal("failed to get consumer address and keys", err) + } + consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient if options.analyticsServerAddresses.OptimizerQoSAddress != "" || options.analyticsServerAddresses.OptimizerQoSListen { - consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client - consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client + consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(consumerAddr.String(), options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client + consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client } consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{ NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress, @@ -179,26 +211,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt lavaChainFetcher.FetchLatestBlockNum(ctx) lavaChainID := options.clientCtx.ChainID - keyName, err := sigs.GetKeyName(options.clientCtx) - if err != nil { - utils.LavaFormatFatal("failed getting key name from clientCtx", err) - } - privKey, err := sigs.GetPrivKey(options.clientCtx, keyName) - if err != nil { - utils.LavaFormatFatal("failed getting private key from key name", err, utils.Attribute{Key: "keyName", Value: keyName}) - } - clientKey, _ := options.clientCtx.Keyring.Key(keyName) - pubkey, err := clientKey.GetPubKey() - if err != nil { - utils.LavaFormatFatal("failed getting public key from key name", err, utils.Attribute{Key: "keyName", Value: keyName}) - } - - var consumerAddr sdk.AccAddress - err = consumerAddr.Unmarshal(pubkey.Address()) - if err != nil { - utils.LavaFormatFatal("failed unmarshaling public address", err, utils.Attribute{Key: "keyName", Value: keyName}, utils.Attribute{Key: "pubkey", Value: pubkey.Address()}) - } // we want one provider optimizer per chain so we will store them for reuse across rpcEndpoints chainMutexes := map[string]*sync.Mutex{} for _, endpoint := range options.rpcEndpoints { From 0593bb22bb3078dd434db36fb106751160a3a511 Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Tue, 3 Dec 2024 12:38:42 +0200 Subject: [PATCH 2/2] Test fix --- protocol/provideroptimizer/provider_optimizer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go index fc8427a9dc..6de13de8b6 100644 --- a/protocol/provideroptimizer/provider_optimizer_test.go +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -781,7 +781,7 @@ func TestProviderOptimizerWithOptimizerQoSClient(t *testing.T) { chainId := "dontcare" - consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient(mockHttpServer.URL, 1*time.Second) + consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient("lava@test", mockHttpServer.URL, 1*time.Second) consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(context.Background(), 900*time.Millisecond) providerOptimizer := NewProviderOptimizer(STRATEGY_BALANCED, TEST_AVERAGE_BLOCK_TIME, TEST_BASE_WORLD_LATENCY, 10, consumerOptimizerQoSClient, chainId)