diff --git a/cmd/status-cli/util.go b/cmd/status-cli/util.go index 268be583e50..71fd8da79db 100644 --- a/cmd/status-cli/util.go +++ b/cmd/status-cli/util.go @@ -1,6 +1,7 @@ package main import ( + "context" "errors" "fmt" "os" @@ -64,6 +65,7 @@ func start(name string, port int, apiModules string, telemetryUrl string, useExi if telemetryUrl != "" { telemetryClient := telemetry.NewClient(nlog.Desugar(), telemetryUrl, backend.SelectedAccountKeyID(), name, "cli") + go telemetryClient.Start(context.Background()) backend.StatusNode().WakuV2Service().SetStatusTelemetryClient(telemetryClient) } wakuAPI := wakuv2ext.NewPublicAPI(wakuService) diff --git a/cmd/statusd/main.go b/cmd/statusd/main.go index ecfd99ee78e..d341d8c104c 100644 --- a/cmd/statusd/main.go +++ b/cmd/statusd/main.go @@ -280,6 +280,7 @@ func main() { gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()), installationID.String(), nil, + config.Version, options..., ) if err != nil { diff --git a/protocol/messenger.go b/protocol/messenger.go index 776597a1c7b..1f096549092 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -433,6 +433,7 @@ func NewMessenger( if c.wakuService != nil { c.wakuService.SetStatusTelemetryClient(telemetryClient) } + go telemetryClient.Start(messenger.ctx) } // Initialize push notification server @@ -3862,7 +3863,11 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte statusMessages := handleMessagesResponse.StatusMessages if m.telemetryClient != nil { - go m.telemetryClient.PushReceivedMessages(filter, shhMessage, statusMessages) + m.telemetryClient.PushReceivedMessages(telemetry.ReceivedMessages{ + Filter: filter, + SSHMessage: shhMessage, + Messages: statusMessages, + }) } err = m.handleDatasyncMetadata(handleMessagesResponse) diff --git a/telemetry/client.go b/telemetry/client.go index d8c3f6c6fc8..5b8a2a65c8c 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -2,6 +2,7 @@ package telemetry import ( "bytes" + "context" "encoding/json" "fmt" "net/http" @@ -11,62 +12,187 @@ import ( "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" - v1protocol "github.com/status-im/status-go/protocol/v1" "github.com/status-im/status-go/wakuv2" + v1protocol "github.com/status-im/status-go/protocol/v1" v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" ) +type TelemetryType string + +const ( + ProtocolStatsMetric TelemetryType = "ProtocolStats" + ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope" + SentEnvelopeMetric TelemetryType = "SentEnvelope" + UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" + ReceivedMessagesMetric TelemetryType = "ReceivedMessages" +) + +type TelemetryRequest struct { + Id int `json:"id"` + TelemetryType TelemetryType `json:"telemetry_type"` + TelemetryData *json.RawMessage `json:"telemetry_data"` +} + +func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) { + c.receivedMessagesCh <- receivedMessages +} + +func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) { + c.sentEnvelopeCh <- sentEnvelope +} + +func (c *Client) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) { + c.receivedEnvelopeCh <- receivedEnvelope +} + +type ReceivedMessages struct { + Filter transport.Filter + SSHMessage *types.Message + Messages []*v1protocol.StatusMessage +} + type Client struct { - serverURL string - httpClient *http.Client - logger *zap.Logger - keyUID string - nodeName string - version string + serverURL string + httpClient *http.Client + logger *zap.Logger + keyUID string + nodeName string + version string + receivedMessagesCh chan ReceivedMessages + receivedEnvelopeCh chan *v2protocol.Envelope + sentEnvelopeCh chan wakuv2.SentEnvelope + telemetryCh chan TelemetryRequest + nextId int + sendPeriod time.Duration } func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string) *Client { return &Client{ - serverURL: serverURL, - httpClient: &http.Client{Timeout: time.Minute}, - logger: logger, - keyUID: keyUID, - nodeName: nodeName, - version: version, + serverURL: serverURL, + httpClient: &http.Client{Timeout: time.Minute}, + logger: logger, + keyUID: keyUID, + nodeName: nodeName, + version: version, + receivedMessagesCh: make(chan ReceivedMessages), + receivedEnvelopeCh: make(chan *v2protocol.Envelope), + sentEnvelopeCh: make(chan wakuv2.SentEnvelope), + telemetryCh: make(chan TelemetryRequest), + nextId: 0, + sendPeriod: 10 * time.Second, + } +} + +func (c *Client) CollectAndProcessTelemetry(ctx context.Context) { + go func() { + for { + select { + case receivedMessages := <-c.receivedMessagesCh: + c.processAndPushTelemetry(receivedMessages) + case receivedEnvelope := <-c.receivedEnvelopeCh: + c.processAndPushTelemetry(receivedEnvelope) + case sentEnvelope := <-c.sentEnvelopeCh: + c.processAndPushTelemetry(sentEnvelope) + case <-ctx.Done(): + return + } + } + }() +} + +func (c *Client) Start(ctx context.Context) { + go c.CollectAndProcessTelemetry(ctx) + go func() { + ticker := time.NewTicker(c.sendPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + var telemetryRequests []TelemetryRequest + collecting := true + for collecting { + select { + case telemetryRequest := <-c.telemetryCh: + telemetryRequests = append(telemetryRequests, telemetryRequest) + default: + collecting = false + } + } + if len(telemetryRequests) > 0 { + c.pushTelemetryRequest(telemetryRequests) + } + case <-ctx.Done(): + return + } + } + }() +} + +func (c *Client) processAndPushTelemetry(data interface{}) { + var telemetryRequest TelemetryRequest + switch v := data.(type) { + case ReceivedMessages: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: ReceivedMessagesMetric, + TelemetryData: c.ProcessReceivedMessages(v), + } + case *v2protocol.Envelope: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: ReceivedEnvelopeMetric, + TelemetryData: c.ProcessReceivedEnvelope(v), + } + case wakuv2.SentEnvelope: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: SentEnvelopeMetric, + TelemetryData: c.ProcessSentEnvelope(v), + } + default: + c.logger.Error("Unknown telemetry data type") + return + } + + c.nextId++ + c.telemetryCh <- telemetryRequest +} + +func (c *Client) pushTelemetryRequest(request []TelemetryRequest) { + url := fmt.Sprintf("%s/record-metrics", c.serverURL) + body, _ := json.Marshal(request) + _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) + if err != nil { + c.logger.Error("Error sending telemetry data", zap.Error(err)) } } -func (c *Client) PushReceivedMessages(filter transport.Filter, sshMessage *types.Message, messages []*v1protocol.StatusMessage) { +func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage { c.logger.Debug("Pushing received messages to telemetry server") - url := fmt.Sprintf("%s/received-messages", c.serverURL) var postBody []map[string]interface{} - for _, message := range messages { + for _, message := range receivedMessages.Messages { postBody = append(postBody, map[string]interface{}{ - "chatId": filter.ChatID, - "messageHash": types.EncodeHex(sshMessage.Hash), + "chatId": receivedMessages.Filter.ChatID, + "messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash), "messageId": message.ApplicationLayer.ID, - "sentAt": sshMessage.Timestamp, - "pubsubTopic": filter.PubsubTopic, - "topic": filter.ContentTopic.String(), + "sentAt": receivedMessages.SSHMessage.Timestamp, + "pubsubTopic": receivedMessages.Filter.PubsubTopic, + "topic": receivedMessages.Filter.ContentTopic.String(), "messageType": message.ApplicationLayer.Type.String(), "receiverKeyUID": c.keyUID, "nodeName": c.nodeName, - "messageSize": len(sshMessage.Payload), + "messageSize": len(receivedMessages.SSHMessage.Payload), "statusVersion": c.version, }) } body, _ := json.Marshal(postBody) - _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending message to telemetry server", zap.Error(err)) - } else { - c.logger.Debug("Successfully pushed received messages to telemetry server") - } + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage } -func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) { - url := fmt.Sprintf("%s/received-envelope", c.serverURL) +func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage { postBody := map[string]interface{}{ "messageHash": envelope.Hash().String(), "sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)), @@ -77,33 +203,24 @@ func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) { "statusVersion": c.version, } body, _ := json.Marshal(postBody) - _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending received envelope to telemetry server", zap.Error(err)) - } else { - c.logger.Debug("Successfully pushed received envelope to telemetry server", zap.String("hash", envelope.Hash().String())) - } + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage } -func (c *Client) PushSentEnvelope(envelope *v2protocol.Envelope, publishMethod wakuv2.PublishMethod) { - url := fmt.Sprintf("%s/sent-envelope", c.serverURL) +func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage { postBody := map[string]interface{}{ - "messageHash": envelope.Hash().String(), - "sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": envelope.PubsubTopic(), - "topic": envelope.Message().ContentTopic, + "messageHash": sentEnvelope.Envelope.Hash().String(), + "sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)), + "pubsubTopic": sentEnvelope.Envelope.PubsubTopic(), + "topic": sentEnvelope.Envelope.Message().ContentTopic, "senderKeyUID": c.keyUID, "nodeName": c.nodeName, - "publishMethod": publishMethod.String(), + "publishMethod": sentEnvelope.PublishMethod.String(), "statusVersion": c.version, } body, _ := json.Marshal(postBody) - _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending pushed envelope to telemetry server", zap.Error(err)) - } else { - c.logger.Debug("Successfully pushed sent envelope to telemetry server", zap.String("hash", envelope.Hash().String())) - } + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage } func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { @@ -126,7 +243,5 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) if err != nil { c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err)) - } else { - c.logger.Debug("Successfully pushed envelope processing error to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) } } diff --git a/telemetry/client_test.go b/telemetry/client_test.go new file mode 100644 index 00000000000..bc9db935a65 --- /dev/null +++ b/telemetry/client_test.go @@ -0,0 +1,156 @@ +package telemetry + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/transport" + v1protocol "github.com/status-im/status-go/protocol/v1" + "github.com/status-im/status-go/wakuv2" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" +) + +func createMockServer(t *testing.T) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("Expected 'POST' request, got '%s'", r.Method) + } + if r.URL.EscapedPath() != "/record-metrics" { + t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath()) + } + + // Check the request body is as expected + var received []TelemetryRequest + err := json.NewDecoder(r.Body).Decode(&received) + if err != nil { + t.Fatal(err) + } + + if len(received) != 1 { + t.Errorf("Unexpected data received: %+v", received) + } else { + // If the data is as expected, respond with success + t.Log("Responding with success") + w.WriteHeader(http.StatusOK) + } + })) +} + +func TestClient_ProcessReceivedMessages(t *testing.T) { + // Setup a mock server to handle post requests + mockServer := createMockServer(t) + defer mockServer.Close() + + // Create a client with the mock server URL + config := zap.NewDevelopmentConfig() + config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + logger, err := config.Build() + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") + + // Create a telemetry request to send + data := ReceivedMessages{ + Filter: transport.Filter{ + ChatID: "testChat", + PubsubTopic: "testTopic", + ContentTopic: types.StringToTopic("testContentTopic"), + }, + SSHMessage: &types.Message{ + Hash: []byte("hash"), + Timestamp: uint32(time.Now().Unix()), + }, + Messages: []*v1protocol.StatusMessage{ + { + ApplicationLayer: v1protocol.ApplicationLayer{ + ID: types.HexBytes("123"), + Type: 1, + }, + }, + }, + } + telemetryData := client.ProcessReceivedMessages(data) + telemetryRequest := TelemetryRequest{ + Id: 1, + TelemetryType: ReceivedMessagesMetric, + TelemetryData: telemetryData, + } + + // Send the telemetry request + client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) +} + +func TestClient_ProcessReceivedEnvelope(t *testing.T) { + // Setup a mock server to handle post requests + mockServer := createMockServer(t) + defer mockServer.Close() + + // Create a client with the mock server URL + config := zap.NewDevelopmentConfig() + config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + logger, err := config.Build() + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") + + // Create a telemetry request to send + envelope := v2protocol.NewEnvelope(&pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: "testContentTopic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().Unix()), + }, 0, "") + telemetryData := client.ProcessReceivedEnvelope(envelope) + telemetryRequest := TelemetryRequest{ + Id: 2, + TelemetryType: ReceivedEnvelopeMetric, + TelemetryData: telemetryData, + } + + // Send the telemetry request + client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) +} + +func TestClient_ProcessSentEnvelope(t *testing.T) { + // Setup a mock server to handle post requests + mockServer := createMockServer(t) + defer mockServer.Close() + + // Create a client with the mock server URL + config := zap.NewDevelopmentConfig() + config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + logger, err := config.Build() + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") + + // Create a telemetry request to send + sentEnvelope := wakuv2.SentEnvelope{ + Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: "testContentTopic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().Unix()), + }, 0, ""), + PublishMethod: wakuv2.LightPush, + } + telemetryData := client.ProcessSentEnvelope(sentEnvelope) + telemetryRequest := TelemetryRequest{ + Id: 3, + TelemetryType: SentEnvelopeMetric, + TelemetryData: telemetryData, + } + + // Send the telemetry request + client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) +} diff --git a/wakuv2/telemetry.go b/wakuv2/telemetry.go index 3ecd27bbf91..88eaaea52ff 100644 --- a/wakuv2/telemetry.go +++ b/wakuv2/telemetry.go @@ -50,7 +50,5 @@ func (c *BandwidthTelemetryClient) PushProtocolStats(relayStats metrics.Stats, s _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) if err != nil { c.logger.Error("Error sending message to telemetry server", zap.Error(err)) - } else { - c.logger.Debug("Successfully pushed protocol stats to telemetry server") } } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index a585fa1d1ea..3f0c27af4ea 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -75,6 +75,7 @@ import ( "github.com/status-im/status-go/wakuv2/persistence" node "github.com/waku-org/go-waku/waku/v2/node" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) @@ -87,9 +88,14 @@ const maxHashQueryLength = 100 const hashQueryInterval = 5 * time.Second const messageSentPeriod = 5 // in seconds +type SentEnvelope struct { + Envelope *v2protocol.Envelope + PublishMethod PublishMethod +} + type ITelemetryClient interface { - PushReceivedEnvelope(*protocol.Envelope) - PushSentEnvelope(*protocol.Envelope, PublishMethod) + PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) + PushSentEnvelope(sentEnvelope SentEnvelope) } // Waku represents a dark communication interface through the Ethereum @@ -1001,7 +1007,7 @@ func (w *Waku) broadcast() { fn = func(env *protocol.Envelope, logger *zap.Logger) error { err := sendFn(env, logger) if err == nil { - w.statusTelemetryClient.PushSentEnvelope(env, publishMethod) + w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod}) } // else { // TODO: send error from Relay or LightPush to Telemetry