Skip to content

Commit

Permalink
feat: add metric for missing messages
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Sep 9, 2024
1 parent 0c91a74 commit 042a753
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 8 deletions.
2 changes: 2 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func main() {
server.RegisterMetric(types.ReceivedMessagesMetric, &metrics.ReceivedMessage{})
server.RegisterMetric(types.SentEnvelopeMetric, &metrics.SentEnvelope{})
server.RegisterMetric(types.StoreConfirmationFailedMetric, &metrics.StoreConfirmationFailed{})
server.RegisterMetric(types.MissingMessageMetric, &metrics.MissingMessage{})
server.RegisterMetric(types.MissingRelevantMessageMetric, &metrics.MissingRelevantMessage{})

server.Start(*port)
}
12 changes: 12 additions & 0 deletions lib/common/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func DropTables(db *sql.DB) {
"peerconnfailure",
"errorsendingenvelope",
"storeconfirmationfailed",
"missingmessages",
"missingrelevantmessages",
"schema_migrations",
}

Expand Down Expand Up @@ -90,6 +92,16 @@ func DropTables(db *sql.DB) {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = tx.Exec("DROP INDEX IF EXISTS missingMessages_unique")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = tx.Exec("DROP INDEX IF EXISTS missingRelevantMessages_unique")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = tx.Exec("DROP TABLE IF EXISTS schema_migrations")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
Expand Down
35 changes: 29 additions & 6 deletions lib/database/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions lib/database/sql/000017_store_confirmation_failure.up.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
CREATE TABLE IF NOT EXISTS storeConfirmationFailed (
id SERIAL PRIMARY KEY,
messageHash TEXT NOT NULL,
recordId INTEGER NOT NULL
recordId INTEGER NOT NULL,
timestamp INTEGER NOT NULL
);

ALTER TABLE storeConfirmationFailed ADD CONSTRAINT fk_storeConfirmationFailed_telemetryRecord
Expand All @@ -11,5 +12,6 @@ ALTER TABLE storeConfirmationFailed
ADD CONSTRAINT storeConfirmationFailed_unique
UNIQUE (
recordId,
messageHash
messageHash,
timestamp
);
39 changes: 39 additions & 0 deletions lib/database/sql/000018_missing_messages.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
CREATE TABLE IF NOT EXISTS missingMessages (
id SERIAL PRIMARY KEY,
messageHash TEXT NOT NULL,
sentAt INTEGER NOT NULL,
contentTopic TEXT NOT NULL,
pubsubTopic TEXT NOT NULL,
recordId INTEGER NOT NULL
);

ALTER TABLE missingMessages ADD CONSTRAINT fk_missingMessages_telemetryRecord
FOREIGN KEY (recordId) REFERENCES telemetryRecord(id);

ALTER TABLE missingMessages
ADD CONSTRAINT missingMessages_unique
UNIQUE (
recordId,
messageHash,
contentTopic
);

CREATE TABLE IF NOT EXISTS missingRelevantMessages (
id SERIAL PRIMARY KEY,
messageHash TEXT NOT NULL,
sentAt INTEGER NOT NULL,
contentTopic TEXT NOT NULL,
pubsubTopic TEXT NOT NULL,
recordId INTEGER NOT NULL
);

ALTER TABLE missingRelevantMessages ADD CONSTRAINT fk_missingRelevantMessages_telemetryRecord
FOREIGN KEY (recordId) REFERENCES telemetryRecord(id);

ALTER TABLE missingRelevantMessages
ADD CONSTRAINT missingRelevantMessages_unique
UNIQUE (
recordId,
messageHash,
contentTopic
);
106 changes: 106 additions & 0 deletions lib/metrics/missing_messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package metrics

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"

"github.com/status-im/telemetry/lib/common"
"github.com/status-im/telemetry/pkg/types"
)

type MissingMessage struct {
types.MissingMessage
}

func (m *MissingMessage) Process(ctx context.Context, db *sql.DB, errs *common.MetricErrors, data *types.TelemetryRequest) error {
if err := json.Unmarshal(*data.TelemetryData, &m); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding missing messages: %v", err))
return err
}

if err := m.Put(ctx, db); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error saving missing messages: %v", err))
return err
}

log.Printf("AK: missing messages metric saved: %v", m)
return nil
}

func (m *MissingMessage) Put(ctx context.Context, db *sql.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

recordId, err := InsertTelemetryRecord(tx, &m.TelemetryRecord)
if err != nil {
return err
}
result := tx.QueryRow("INSERT INTO missingMessages (recordId, messageHash, sentAt, contentTopic, pubsubTopic) VALUES ($1, $2, $3, $4, $5) RETURNING id;", recordId, m.MessageHash, m.SentAt, m.ContentTopic, m.PubsubTopic)
if result.Err() != nil {
return result.Err()
}

err = result.Scan(&m.ID)
if err != nil {
return err
}

return nil
}

func (m *MissingMessage) Clean(db *sql.DB, before int64) (int64, error) {
return common.Cleanup(db, "missingMessages", before)
}

type MissingRelevantMessage struct {
types.MissingRelevantMessage
}

func (m *MissingRelevantMessage) Process(ctx context.Context, db *sql.DB, errs *common.MetricErrors, data *types.TelemetryRequest) error {
if err := json.Unmarshal(*data.TelemetryData, &m); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding missing messages: %v", err))
return err
}

if err := m.Put(ctx, db); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error saving missing messages: %v", err))
return err
}

log.Printf("AK: missing messages metric saved: %v", m)
return nil
}

func (m *MissingRelevantMessage) Put(ctx context.Context, db *sql.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

recordId, err := InsertTelemetryRecord(tx, &m.TelemetryRecord)
if err != nil {
return err
}
result := tx.QueryRow("INSERT INTO missingRelevantMessages (recordId, messageHash, sentAt, contentTopic, pubsubTopic) VALUES ($1, $2, $3, $4, $5) RETURNING id;", recordId, m.MessageHash, m.SentAt, m.ContentTopic, m.PubsubTopic)
if result.Err() != nil {
return result.Err()
}

err = result.Scan(&m.ID)
if err != nil {
return err
}

return nil
}

func (m *MissingRelevantMessage) Clean(db *sql.DB, before int64) (int64, error) {
return common.Cleanup(db, "missingRelevantMessages", before)
}
20 changes: 20 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailureMetric TelemetryType = "PeerConnFailure"
StoreConfirmationFailedMetric TelemetryType = "StoreConfirmationFailed"
MissingMessageMetric TelemetryType = "MissingMessage"
MissingRelevantMessageMetric TelemetryType = "MissingRelevantMessage"
)

type TelemetryRequest struct {
Expand Down Expand Up @@ -111,3 +113,21 @@ type StoreConfirmationFailed struct {
MessageHash string `json:"messageHash"`
Timestamp int64 `json:"timestamp"`
}

type MissingMessage struct {
TelemetryRecord
ID int `json:"id"`
ContentTopic string `json:"contentTopic"`
MessageHash string `json:"messageHash"`
SentAt int64 `json:"sentAt"`
PubsubTopic string `json:"pubsubTopic"`
}

type MissingRelevantMessage struct {
TelemetryRecord
ID int `json:"id"`
ContentTopic string `json:"contentTopic"`
MessageHash string `json:"messageHash"`
SentAt int64 `json:"sentAt"`
PubsubTopic string `json:"pubsubTopic"`
}

0 comments on commit 042a753

Please sign in to comment.