Skip to content

Commit

Permalink
feat: generic metrics and message check metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Sep 18, 2024
1 parent 72eb47f commit 29781ae
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,7 @@ func main() {
server.RegisterMetric(types.PeerCountByShardMetric, &metrics.PeerCountByShard{})
server.RegisterMetric(types.PeerCountByOriginMetric, &metrics.PeerCountByOrigin{})

server.RegisterMetric(types.MessageCheckSuccessMetric, &metrics.MessageCheckSuccess{})
server.RegisterMetric(types.MessageCheckFailureMetric, &metrics.MessageCheckFailure{})
server.Start(*port)
}
12 changes: 12 additions & 0 deletions lib/common/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func DropTables(db *sql.DB) {
"errorsendingenvelope",
"peerCountByShard",
"peerCountByOrigin",
"messageCheckSuccess",
"messageCheckFailure",
"schema_migrations",
}

Expand Down Expand Up @@ -96,6 +98,16 @@ func DropTables(db *sql.DB) {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = tx.Exec("DROP TABLE IF EXISTS messageCheckSuccess")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}

_, err = tx.Exec("DROP TABLE IF EXISTS messageCheckFailure")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}

_, err = tx.Exec("DROP TABLE IF EXISTS schema_migrations")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
Expand Down
54 changes: 50 additions & 4 deletions lib/database/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions lib/database/sql/000019_message_check.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
CREATE TABLE IF NOT EXISTS wakuRequestResponse (
id SERIAL PRIMARY KEY,
protocol VARCHAR(50) NOT NULL,
ephemeral BOOLEAN NOT NULL,
timestamp INTEGER NOT NULL,
seenTimestamp INTEGER NOT NULL,
createdAt INTEGER NOT NULL,
contentTopic VARCHAR(255) NOT NULL,
pubsubTopic VARCHAR(255) NOT NULL,
peerId VARCHAR(255) NOT NULL,
messageHash VARCHAR(255) NOT NULL,
errorMessage TEXT,
extraData TEXT,

CONSTRAINT messages_unique UNIQUE (peerId, messageHash)
);

CREATE TABLE IF NOT EXISTS messageCheckSuccess (
id SERIAL PRIMARY KEY,
recordId INTEGER NOT NULL,
messageHash TEXT NOT NULL,
timestamp INTEGER NOT NULL,
CONSTRAINT messageCheckSuccess_unique UNIQUE (recordId, messageHash, timestamp)
);

ALTER TABLE messageCheckSuccess ADD CONSTRAINT fk_messageCheckSuccess_telemetryRecord
FOREIGN KEY (recordId) REFERENCES telemetryRecord(id);

CREATE TABLE IF NOT EXISTS messageCheckFailure (
id SERIAL PRIMARY KEY,
recordId INTEGER NOT NULL,
messageHash TEXT NOT NULL,
timestamp INTEGER NOT NULL,
CONSTRAINT messageCheckFailure_unique UNIQUE (recordId, messageHash, timestamp)
);

ALTER TABLE messageCheckFailure ADD CONSTRAINT fk_messageCheckFailure_telemetryRecord
FOREIGN KEY (recordId) REFERENCES telemetryRecord(id);
124 changes: 124 additions & 0 deletions lib/metrics/generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package metrics

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/status-im/telemetry/lib/common"
"github.com/status-im/telemetry/pkg/types"
)

// GenericMetric is a generic struct that can handle any metric type
type GenericMetric[T any] struct {
types.TelemetryRecord
Data T
}

// MetricProcessor is an interface for processing metrics
type MetricProcessor interface {
Process(context.Context, *sql.DB, *common.MetricErrors, *types.TelemetryRequest) error
Clean(*sql.DB, int64) (int64, error)
}

// NewMetricProcessor creates a new MetricProcessor for the given metric type
func NewMetricProcessor[T types.TelemetryRecord]() MetricProcessor {
return &GenericMetric[T]{
Data: *new(T),
}
}

// Process implements the MetricProcessor interface
func (g *GenericMetric[T]) Process(ctx context.Context, db *sql.DB, errs *common.MetricErrors, data *types.TelemetryRequest) error {
// Unmarshal the TelemetryRecord fields
if err := json.Unmarshal(*data.TelemetryData, &g.TelemetryRecord); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding TelemetryRecord: %v", err))
return err
}

// Unmarshal the Data field
if err := json.Unmarshal(*data.TelemetryData, &g.Data); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding %T: %v", g.Data, err))
return err
}

tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

recordId, err := InsertTelemetryRecord(tx, &g.TelemetryRecord)
if err != nil {
return err
}

columns, values := getColumnsAndValues(g.Data)
placeholders := make([]string, len(columns)+1)
for i := range placeholders {
placeholders[i] = fmt.Sprintf("$%d", i+1)
}

tableName := getTableName(g.Data)
query := fmt.Sprintf(`
INSERT INTO %s (recordId, %s)
VALUES (%s)
RETURNING id;
`, tableName, strings.Join(columns, ", "), strings.Join(placeholders, ", "))

args := []interface{}{recordId}
args = append(args, values...)

result := tx.QueryRowContext(ctx, query, args...)
if result.Err() != nil {
errs.Append(data.ID, fmt.Sprintf("Error saving %T: %v", g.Data, result.Err()))
return result.Err()
}

var lastInsertId int
err = result.Scan(&lastInsertId)
if err != nil {
return err
}

if err := tx.Commit(); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error committing transaction: %v", err))
return err
}

return nil
}

// Clean implements the MetricProcessor interface
func (g *GenericMetric[T]) Clean(db *sql.DB, before int64) (int64, error) {
tableName := getTableName(g.Data)
return common.Cleanup(db, tableName, before)
}

// Helper functions

func getColumnsAndValues(v interface{}) ([]string, []interface{}) {
var columns []string
var values []interface{}
t := reflect.TypeOf(v)
val := reflect.ValueOf(v)

for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
tag := field.Tag.Get("json")
if tag != "" && tag != "-" {
columnName := strings.Split(tag, ",")[0]
columns = append(columns, columnName)
values = append(values, val.Field(i).Interface())
}
}
return columns, values
}

func getTableName(v interface{}) string {
t := reflect.TypeOf(v)
return strings.ToLower(t.Name())
}
18 changes: 18 additions & 0 deletions lib/metrics/message_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package metrics

import (
"github.com/status-im/telemetry/pkg/types"
)

type MessageCheckSuccess struct {
GenericMetric[types.MessageCheckSuccess]
}

type MessageCheckFailure struct {
GenericMetric[types.MessageCheckFailure]
}

var (
MessageCheckSuccessProcessor = &MessageCheckSuccess{}
MessageCheckFailureProcessor = &MessageCheckFailure{}
)
14 changes: 14 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
PeerConnFailureMetric TelemetryType = "PeerConnFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
)

type Origin int64
Expand Down Expand Up @@ -133,3 +135,15 @@ type PeerCountByOrigin struct {
Origin Origin `json:"origin"`
Timestamp int64 `json:"timestamp"`
}

type MessageCheckSuccess struct {
TelemetryRecord
MessageHash string `json:"messageHash"`
Timestamp int64 `json:"timestamp"`
}

type MessageCheckFailure struct {
TelemetryRecord
MessageHash string `json:"messageHash"`
Timestamp int64 `json:"timestamp"`
}

0 comments on commit 29781ae

Please sign in to comment.