Skip to content

Commit

Permalink
feat_: batch all telemetry data and send request every 10 seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Jun 13, 2024
1 parent e514242 commit 60d38d2
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 57 deletions.
2 changes: 2 additions & 0 deletions cmd/status-cli/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -64,6 +65,7 @@ func start(name string, port int, apiModules string, telemetryUrl string, useExi

if telemetryUrl != "" {
telemetryClient := telemetry.NewClient(nlog.Desugar(), telemetryUrl, backend.SelectedAccountKeyID(), name, "cli")
go telemetryClient.Start(context.Background())
backend.StatusNode().WakuV2Service().SetStatusTelemetryClient(telemetryClient)
}
wakuAPI := wakuv2ext.NewPublicAPI(wakuService)
Expand Down
1 change: 1 addition & 0 deletions cmd/statusd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func main() {
gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
installationID.String(),
nil,
config.Version,
options...,
)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ func NewMessenger(
if c.wakuService != nil {
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}
go telemetryClient.Start(messenger.ctx)
}

// Initialize push notification server
Expand Down Expand Up @@ -3862,7 +3863,11 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
statusMessages := handleMessagesResponse.StatusMessages

if m.telemetryClient != nil {
go m.telemetryClient.PushReceivedMessages(filter, shhMessage, statusMessages)
m.telemetryClient.PushReceivedMessages(telemetry.ReceivedMessages{
Filter: filter,
SSHMessage: shhMessage,
Messages: statusMessages,
})
}

err = m.handleDatasyncMetadata(handleMessagesResponse)
Expand Down
217 changes: 166 additions & 51 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package telemetry

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -11,62 +12,187 @@ import (

"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/wakuv2"

v1protocol "github.com/status-im/status-go/protocol/v1"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
)

type TelemetryType string

const (
ProtocolStatsMetric TelemetryType = "ProtocolStats"
ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
)

type TelemetryRequest struct {
Id int `json:"id"`
TelemetryType TelemetryType `json:"telemetry_type"`
TelemetryData *json.RawMessage `json:"telemetry_data"`
}

func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) {
c.receivedMessagesCh <- receivedMessages
}

func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) {
c.sentEnvelopeCh <- sentEnvelope
}

func (c *Client) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) {
c.receivedEnvelopeCh <- receivedEnvelope
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Messages []*v1protocol.StatusMessage
}

type Client struct {
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
version string
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
version string
receivedMessagesCh chan ReceivedMessages
receivedEnvelopeCh chan *v2protocol.Envelope
sentEnvelopeCh chan wakuv2.SentEnvelope
telemetryCh chan TelemetryRequest
nextId int
sendPeriod time.Duration
}

func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string) *Client {
return &Client{
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
receivedMessagesCh: make(chan ReceivedMessages),
receivedEnvelopeCh: make(chan *v2protocol.Envelope),
sentEnvelopeCh: make(chan wakuv2.SentEnvelope),
telemetryCh: make(chan TelemetryRequest),
nextId: 0,
sendPeriod: 10 * time.Second,
}
}

func (c *Client) CollectAndProcessTelemetry(ctx context.Context) {
go func() {
for {
select {
case receivedMessages := <-c.receivedMessagesCh:
c.processAndPushTelemetry(receivedMessages)
case receivedEnvelope := <-c.receivedEnvelopeCh:
c.processAndPushTelemetry(receivedEnvelope)
case sentEnvelope := <-c.sentEnvelopeCh:
c.processAndPushTelemetry(sentEnvelope)
case <-ctx.Done():
return
}
}
}()
}

func (c *Client) Start(ctx context.Context) {
go c.CollectAndProcessTelemetry(ctx)
go func() {
ticker := time.NewTicker(c.sendPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
var telemetryRequests []TelemetryRequest
collecting := true
for collecting {
select {
case telemetryRequest := <-c.telemetryCh:
telemetryRequests = append(telemetryRequests, telemetryRequest)
default:
collecting = false
}
}
if len(telemetryRequests) > 0 {
c.pushTelemetryRequest(telemetryRequests)
}
case <-ctx.Done():
return
}
}
}()
}

func (c *Client) processAndPushTelemetry(data interface{}) {
var telemetryRequest TelemetryRequest
switch v := data.(type) {
case ReceivedMessages:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: ReceivedMessagesMetric,
TelemetryData: c.ProcessReceivedMessages(v),
}
case *v2protocol.Envelope:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: ReceivedEnvelopeMetric,
TelemetryData: c.ProcessReceivedEnvelope(v),
}
case wakuv2.SentEnvelope:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: SentEnvelopeMetric,
TelemetryData: c.ProcessSentEnvelope(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
}

c.nextId++
c.telemetryCh <- telemetryRequest
}

func (c *Client) pushTelemetryRequest(request []TelemetryRequest) {
url := fmt.Sprintf("%s/record-metrics", c.serverURL)
body, _ := json.Marshal(request)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending telemetry data", zap.Error(err))
}
}

func (c *Client) PushReceivedMessages(filter transport.Filter, sshMessage *types.Message, messages []*v1protocol.StatusMessage) {
func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage {
c.logger.Debug("Pushing received messages to telemetry server")
url := fmt.Sprintf("%s/received-messages", c.serverURL)
var postBody []map[string]interface{}
for _, message := range messages {
for _, message := range receivedMessages.Messages {
postBody = append(postBody, map[string]interface{}{
"chatId": filter.ChatID,
"messageHash": types.EncodeHex(sshMessage.Hash),
"chatId": receivedMessages.Filter.ChatID,
"messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash),
"messageId": message.ApplicationLayer.ID,
"sentAt": sshMessage.Timestamp,
"pubsubTopic": filter.PubsubTopic,
"topic": filter.ContentTopic.String(),
"sentAt": receivedMessages.SSHMessage.Timestamp,
"pubsubTopic": receivedMessages.Filter.PubsubTopic,
"topic": receivedMessages.Filter.ContentTopic.String(),
"messageType": message.ApplicationLayer.Type.String(),
"receiverKeyUID": c.keyUID,
"nodeName": c.nodeName,
"messageSize": len(sshMessage.Payload),
"messageSize": len(receivedMessages.SSHMessage.Payload),
"statusVersion": c.version,
})
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending message to telemetry server", zap.Error(err))
} else {
c.logger.Debug("Successfully pushed received messages to telemetry server")
}
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) {
url := fmt.Sprintf("%s/received-envelope", c.serverURL)
func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": envelope.Hash().String(),
"sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)),
Expand All @@ -77,33 +203,24 @@ func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) {
"statusVersion": c.version,
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending received envelope to telemetry server", zap.Error(err))
} else {
c.logger.Debug("Successfully pushed received envelope to telemetry server", zap.String("hash", envelope.Hash().String()))
}
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) PushSentEnvelope(envelope *v2protocol.Envelope, publishMethod wakuv2.PublishMethod) {
url := fmt.Sprintf("%s/sent-envelope", c.serverURL)
func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": envelope.Hash().String(),
"sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": envelope.PubsubTopic(),
"topic": envelope.Message().ContentTopic,
"messageHash": sentEnvelope.Envelope.Hash().String(),
"sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": sentEnvelope.Envelope.PubsubTopic(),
"topic": sentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"nodeName": c.nodeName,
"publishMethod": publishMethod.String(),
"publishMethod": sentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending pushed envelope to telemetry server", zap.Error(err))
} else {
c.logger.Debug("Successfully pushed sent envelope to telemetry server", zap.String("hash", envelope.Hash().String()))
}
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
Expand All @@ -126,7 +243,5 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err))
} else {
c.logger.Debug("Successfully pushed envelope processing error to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
}
}
Loading

0 comments on commit 60d38d2

Please sign in to comment.