Skip to content

Commit

Permalink
fix: use tx for common fields and other fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Aug 30, 2024
1 parent 0f2fb73 commit 8a5db99
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 99 deletions.
31 changes: 4 additions & 27 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
)

type TelemetryRequest struct {
Id int `json:"id"`
ID int `json:"id"`
TelemetryType TelemetryType `json:"telemetry_type"`
TelemetryData *json.RawMessage `json:"telemetry_data"`
}
Expand All @@ -28,41 +28,18 @@ type CommonFields struct {
DeviceType string `json:"deviceType"`
}

func (c CommonFields) GetNodeName() string {
return c.NodeName
}

func (c CommonFields) GetPeerID() string {
return c.PeerID
}

func (c CommonFields) GetStatusVersion() string {
return c.StatusVersion
}

func (c CommonFields) GetDeviceType() string {
return c.DeviceType
}

type CommonFieldsAccessor interface {
GetNodeName() string
GetPeerID() string
GetStatusVersion() string
GetDeviceType() string
}

type PeerCount struct {
CommonFields
ID int `json:"id"`
NodeKeyUid string `json:"nodeKeyUid"`
NodeKeyUID string `json:"nodeKeyUID"`
PeerCount int `json:"peerCount"`
Timestamp int64 `json:"timestamp"`
}

type PeerConnFailure struct {
CommonFields
ID int `json:"id"`
NodeKeyUid string `json:"nodeKeyUid"`
NodeKeyUID string `json:"nodeKeyUID"`
FailedPeerId string `json:"failedPeerId"`
FailureCount int `json:"failureCount"`
Timestamp int64 `json:"timestamp"`
Expand All @@ -81,7 +58,7 @@ type SentEnvelope struct {
}

type ErrorSendingEnvelope struct {
Id int `json:"id"`
ID int `json:"id"`
Error string `json:"error"`
SentEnvelope SentEnvelope `json:"sentEnvelope"`
}
Expand Down
63 changes: 54 additions & 9 deletions telemetry/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions telemetry/common_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@ import (
"github.com/status-im/telemetry/pkg/types"
)

func InsertCommonFields(db *sql.DB, data types.CommonFieldsAccessor) (int, error) {
stmt, err := db.Prepare(`
func InsertCommonFields(tx *sql.Tx, data *types.CommonFields) (int, error) {
stmt, err := tx.Prepare(`
INSERT INTO commonFields (nodeName, peerId, statusVersion, deviceType)
VALUES ($1, $2, $3, $4)
RETURNING id;
`)
if err != nil {
return 0, err
}
defer stmt.Close()

var commonFieldsId int
err = stmt.QueryRow(
data.GetNodeName(),
data.GetPeerID(),
data.GetStatusVersion(),
data.GetDeviceType(),
data.NodeName,
data.PeerID,
data.StatusVersion,
data.DeviceType,
).Scan(&commonFieldsId)
if err != nil {
return 0, err
Expand Down
42 changes: 32 additions & 10 deletions telemetry/peer_count.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package telemetry

import (
"context"
"database/sql"
"encoding/json"
"fmt"
Expand All @@ -14,37 +15,47 @@ type PeerCount struct {

func (r *PeerCount) process(db *sql.DB, errs *MetricErrors, data *types.TelemetryRequest) error {
if err := json.Unmarshal(*data.TelemetryData, &r.data); err != nil {
errs.Append(data.Id, fmt.Sprintf("Error decoding peer count: %v", err))
errs.Append(data.ID, fmt.Sprintf("Error decoding peer count: %v", err))
return err
}

commonFieldsId, err := InsertCommonFields(db, r.data)
tx, err := db.BeginTx(context.Background(), nil)
if err != nil {
return err
}
defer tx.Rollback()

peerCountStmt, err := db.Prepare(`
commonFieldsId, err := InsertCommonFields(tx, &r.data.CommonFields)
if err != nil {
return err
}

peerCountStmt, err := tx.Prepare(`
INSERT INTO peerCount (common_fields_id, nodeKeyUid, peerCount)
VALUES ($1, $2, $3)
RETURNING id;
`)
if err != nil {
return err
}
defer peerCountStmt.Close()

var lastInsertId int
err = peerCountStmt.QueryRow(
commonFieldsId,
r.data.NodeKeyUid,
r.data.NodeKeyUID,
r.data.PeerCount,
).Scan(&lastInsertId)
if err != nil {
errs.Append(data.Id, fmt.Sprintf("Error saving peer count: %v", err))
errs.Append(data.ID, fmt.Sprintf("Error saving peer count: %v", err))
return err
}
r.data.ID = lastInsertId

if err := tx.Commit(); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error committing transaction: %v", err))
return err
}

return nil
}

Expand All @@ -54,11 +65,17 @@ type PeerConnFailure struct {

func (r *PeerConnFailure) process(db *sql.DB, errs *MetricErrors, data *types.TelemetryRequest) error {
if err := json.Unmarshal(*data.TelemetryData, &r.data); err != nil {
errs.Append(data.Id, fmt.Sprintf("Error decoding peer connection failure: %v", err))
errs.Append(data.ID, fmt.Sprintf("Error decoding peer connection failure: %v", err))
return err
}

commonFieldsId, err := InsertCommonFields(db, r.data)
tx, err := db.BeginTx(context.Background(), nil)
if err != nil {
return err
}
defer tx.Rollback()

commonFieldsId, err := InsertCommonFields(tx, &r.data.CommonFields)
if err != nil {
return err
}
Expand All @@ -73,15 +90,20 @@ func (r *PeerConnFailure) process(db *sql.DB, errs *MetricErrors, data *types.Te
lastInsertId := 0
err = stmt.QueryRow(
commonFieldsId,
r.data.NodeKeyUid,
r.data.NodeKeyUID,
r.data.FailedPeerId,
r.data.FailureCount,
).Scan(&lastInsertId)
if err != nil {
errs.Append(data.Id, fmt.Sprintf("Error saving peer connection failure: %v", err))
errs.Append(data.ID, fmt.Sprintf("Error saving peer connection failure: %v", err))
return err
}
r.data.ID = lastInsertId

if err := tx.Commit(); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error committing transaction: %v", err))
return err
}

return nil
}
4 changes: 2 additions & 2 deletions telemetry/protocolstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ func (r *ProtocolStats) put(db *sql.DB) error {

func (r *ProtocolStats) process(db *sql.DB, errs *MetricErrors, data *types.TelemetryRequest) (err error) {
if err := json.Unmarshal(*data.TelemetryData, &r.data); err != nil {
errs.Append(data.Id, fmt.Sprintf("Error decoding protocol stats: %v", err))
errs.Append(data.ID, fmt.Sprintf("Error decoding protocol stats: %v", err))
return err
}

if err := r.put(db); err != nil {
errs.Append(data.Id, fmt.Sprintf("Error saving protocol stats: %v", err))
errs.Append(data.ID, fmt.Sprintf("Error saving protocol stats: %v", err))
return err
}

Expand Down
Loading

0 comments on commit 8a5db99

Please sign in to comment.