From c7c2c3479f102f4f42ecfc0fae1694cbc7e43d2a Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Mon, 30 Sep 2024 19:28:01 -0700 Subject: [PATCH] feat: add delivery confirmed metric --- cmd/server/main.go | 2 +- lib/common/test_utils.go | 1 + lib/database/bindata.go | 111 +++++++++++------- .../000022_message_delivery_confirmed.up.sql | 10 ++ lib/metrics/missing_messages.go | 9 +- pkg/types/types.go | 40 ++++--- 6 files changed, 110 insertions(+), 63 deletions(-) create mode 100644 lib/database/sql/000022_message_delivery_confirmed.up.sql diff --git a/cmd/server/main.go b/cmd/server/main.go index 48f6d77..1c44132 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -67,7 +67,7 @@ func main() { server.RegisterMetric(types.PeerCountByOriginMetric, &metrics.PeerCountByOrigin{}) server.RegisterMetric(types.MissingMessageMetric, &metrics.MissingMessage{}) server.RegisterMetric(types.MissingRelevantMessageMetric, &metrics.MissingRelevantMessage{}) - + server.RegisterMetric(types.MessageDeliveryConfirmedMetric, &metrics.MessageDeliveryConfirmed{}) server.RegisterMetric(types.MessageCheckSuccessMetric, &metrics.MessageCheckSuccess{}) server.RegisterMetric(types.MessageCheckFailureMetric, &metrics.MessageCheckFailure{}) server.RegisterMetric(types.DialFailureMetric, &metrics.DialFailure{}) diff --git a/lib/common/test_utils.go b/lib/common/test_utils.go index 4af4e54..b2003bf 100644 --- a/lib/common/test_utils.go +++ b/lib/common/test_utils.go @@ -45,6 +45,7 @@ func DropTables(db *sql.DB) { "dialFailure", "missingmessages", "missingrelevantmessages", + "messageDeliveryConfirmed", "schema_migrations", } diff --git a/lib/database/bindata.go b/lib/database/bindata.go index 4d5c701..e9b1c41 100644 --- a/lib/database/bindata.go +++ b/lib/database/bindata.go @@ -21,6 +21,7 @@ // 000019_message_check.up.sql (1.298kB) // 000020_dial_failure.up.sql (955B) // 000021_missing_messages.up.sql (1.05kB) +// 000022_message_delivery_confirmed.up.sql (430B) // doc.go (72B) package database @@ -509,6 +510,26 @@ func _000021_missing_messagesUpSql() (*asset, error) { return a, nil } +var __000022_message_delivery_confirmedUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\x90\xc1\x4e\xc3\x30\x10\x44\xef\xf9\x8a\x3d\x26\x52\xfe\xa0\x27\x93\x6c\x8a\x85\x71\x60\xbd\x91\xda\x53\x15\xe1\x2d\x58\xd4\x2d\x38\x29\x52\xff\x1e\x11\x5a\xb5\xaa\x94\xbd\x8e\x66\xe6\xed\x54\x84\x8a\x11\x58\x3d\x18\x04\xdd\x80\x6d\x19\x70\xa5\x1d\x3b\x88\x32\x0c\xfd\xbb\xd4\xb2\x0b\x3f\x92\x4e\xd5\x61\xbf\x0d\x29\x8a\x87\x3c\x03\x00\x08\x1e\x1c\x92\x56\x06\x5e\x48\x3f\x2b\x5a\xc3\x13\xae\xcb\x49\x4a\xf2\x76\x48\x5e\x7b\xd0\x96\x71\x89\x34\xa5\xda\xce\x98\x7f\xf9\x1c\xfc\xd8\x0f\x1f\xc0\xb8\xe2\x3b\x79\x0c\x51\x86\xb1\x8f\x5f\x33\xf6\xaa\xb5\x8e\x49\x69\xcb\xb3\x88\x9b\xe3\x3e\x7c\x1f\x05\x3a\xab\x5f\x3b\x84\xfc\x02\x54\xde\x76\x97\xd7\xa6\x22\x2b\x16\x59\xa6\x0c\x23\x9d\xa7\x98\x7d\x5e\xd5\xf5\x2d\xc1\xf6\x73\x33\x0b\x31\xca\x4e\xa2\x8c\xe9\x44\x53\xfd\x04\x7f\xb9\xa6\x25\xd4\x4b\xfb\xb7\xd9\x15\xaf\x00\xc2\x06\x09\x6d\x85\x0e\xee\xdc\x79\xf0\xc5\xe2\x37\x00\x00\xff\xff\x97\xa4\xc0\x91\xae\x01\x00\x00") + +func _000022_message_delivery_confirmedUpSqlBytes() ([]byte, error) { + return bindataRead( + __000022_message_delivery_confirmedUpSql, + "000022_message_delivery_confirmed.up.sql", + ) +} + +func _000022_message_delivery_confirmedUpSql() (*asset, error) { + bytes, err := _000022_message_delivery_confirmedUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "000022_message_delivery_confirmed.up.sql", size: 430, mode: os.FileMode(0644), modTime: time.Unix(1727749551, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xc5, 0x7f, 0xab, 0xa9, 0xe9, 0x88, 0x53, 0xad, 0x22, 0x1c, 0xb, 0x83, 0xdf, 0xbf, 0xf2, 0xb4, 0x45, 0xed, 0xb0, 0xed, 0x77, 0x38, 0x35, 0xdc, 0x1c, 0xde, 0x19, 0x2, 0x69, 0x3, 0x11, 0x6d}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xc1\x09\x80\x30\x0c\x05\xd0\x7b\xa7\xf8\x0b\x24\xb9\xbb\x4d\xaa\xe1\x23\x95\xa6\xda\xee\x8f\x08\xde\x1e\xbc\xe1\x7b\x73\x06\xe6\x7d\x95\x62\xc6\xdc\x18\x3d\x1e\x5f\x01\xa6\xd4\xb3\x1f\xbe\x1c\x32\x1a\xf1\xa9\xfa\x0c\x48\x42\xd5\xfe\x53\x26\xd4\xca\x1b\x00\x00\xff\xff\xeb\x76\xd0\x31\x48\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -620,28 +641,29 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ - "000001_message_type.up.sql": _000001_message_typeUpSql, - "000002_bandwidth_protocol.up.sql": _000002_bandwidth_protocolUpSql, - "000003_index_truncate.up.sql": _000003_index_truncateUpSql, - "000004_envelope.table.up.sql": _000004_envelopeTableUpSql, - "000005_pushed_envelope.up.sql": _000005_pushed_envelopeUpSql, - "000006_status_version.up.sql": _000006_status_versionUpSql, - "000007_waku_push_filter.up.sql": _000007_waku_push_filterUpSql, - "000008_error_sending_envelope.up.sql": _000008_error_sending_envelopeUpSql, - "000009_peer_count.up.sql": _000009_peer_countUpSql, - "000010_peer_id.up.sql": _000010_peer_idUpSql, - "000011_waku_push_error.up.sql": _000011_waku_push_errorUpSql, - "000012_waku_generic.up.sql": _000012_waku_genericUpSql, - "000013_peer_conn_failure.up.sql": _000013_peer_conn_failureUpSql, - "000014_bandwidth_column.up.sql": _000014_bandwidth_columnUpSql, - "000015_device_type.up.sql": _000015_device_typeUpSql, - "000016_common_fields.up.sql": _000016_common_fieldsUpSql, - "000017_peer_count_shard_origin.up.sql": _000017_peer_count_shard_originUpSql, - "000018_waku_req_res.up.sql": _000018_waku_req_resUpSql, - "000019_message_check.up.sql": _000019_message_checkUpSql, - "000020_dial_failure.up.sql": _000020_dial_failureUpSql, - "000021_missing_messages.up.sql": _000021_missing_messagesUpSql, - "doc.go": docGo, + "000001_message_type.up.sql": _000001_message_typeUpSql, + "000002_bandwidth_protocol.up.sql": _000002_bandwidth_protocolUpSql, + "000003_index_truncate.up.sql": _000003_index_truncateUpSql, + "000004_envelope.table.up.sql": _000004_envelopeTableUpSql, + "000005_pushed_envelope.up.sql": _000005_pushed_envelopeUpSql, + "000006_status_version.up.sql": _000006_status_versionUpSql, + "000007_waku_push_filter.up.sql": _000007_waku_push_filterUpSql, + "000008_error_sending_envelope.up.sql": _000008_error_sending_envelopeUpSql, + "000009_peer_count.up.sql": _000009_peer_countUpSql, + "000010_peer_id.up.sql": _000010_peer_idUpSql, + "000011_waku_push_error.up.sql": _000011_waku_push_errorUpSql, + "000012_waku_generic.up.sql": _000012_waku_genericUpSql, + "000013_peer_conn_failure.up.sql": _000013_peer_conn_failureUpSql, + "000014_bandwidth_column.up.sql": _000014_bandwidth_columnUpSql, + "000015_device_type.up.sql": _000015_device_typeUpSql, + "000016_common_fields.up.sql": _000016_common_fieldsUpSql, + "000017_peer_count_shard_origin.up.sql": _000017_peer_count_shard_originUpSql, + "000018_waku_req_res.up.sql": _000018_waku_req_resUpSql, + "000019_message_check.up.sql": _000019_message_checkUpSql, + "000020_dial_failure.up.sql": _000020_dial_failureUpSql, + "000021_missing_messages.up.sql": _000021_missing_messagesUpSql, + "000022_message_delivery_confirmed.up.sql": _000022_message_delivery_confirmedUpSql, + "doc.go": docGo, } // AssetDebug is true if the assets were built with the debug flag enabled. @@ -690,28 +712,29 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "000001_message_type.up.sql": {_000001_message_typeUpSql, map[string]*bintree{}}, - "000002_bandwidth_protocol.up.sql": {_000002_bandwidth_protocolUpSql, map[string]*bintree{}}, - "000003_index_truncate.up.sql": {_000003_index_truncateUpSql, map[string]*bintree{}}, - "000004_envelope.table.up.sql": {_000004_envelopeTableUpSql, map[string]*bintree{}}, - "000005_pushed_envelope.up.sql": {_000005_pushed_envelopeUpSql, map[string]*bintree{}}, - "000006_status_version.up.sql": {_000006_status_versionUpSql, map[string]*bintree{}}, - "000007_waku_push_filter.up.sql": {_000007_waku_push_filterUpSql, map[string]*bintree{}}, - "000008_error_sending_envelope.up.sql": {_000008_error_sending_envelopeUpSql, map[string]*bintree{}}, - "000009_peer_count.up.sql": {_000009_peer_countUpSql, map[string]*bintree{}}, - "000010_peer_id.up.sql": {_000010_peer_idUpSql, map[string]*bintree{}}, - "000011_waku_push_error.up.sql": {_000011_waku_push_errorUpSql, map[string]*bintree{}}, - "000012_waku_generic.up.sql": {_000012_waku_genericUpSql, map[string]*bintree{}}, - "000013_peer_conn_failure.up.sql": {_000013_peer_conn_failureUpSql, map[string]*bintree{}}, - "000014_bandwidth_column.up.sql": {_000014_bandwidth_columnUpSql, map[string]*bintree{}}, - "000015_device_type.up.sql": {_000015_device_typeUpSql, map[string]*bintree{}}, - "000016_common_fields.up.sql": {_000016_common_fieldsUpSql, map[string]*bintree{}}, - "000017_peer_count_shard_origin.up.sql": {_000017_peer_count_shard_originUpSql, map[string]*bintree{}}, - "000018_waku_req_res.up.sql": {_000018_waku_req_resUpSql, map[string]*bintree{}}, - "000019_message_check.up.sql": {_000019_message_checkUpSql, map[string]*bintree{}}, - "000020_dial_failure.up.sql": {_000020_dial_failureUpSql, map[string]*bintree{}}, - "000021_missing_messages.up.sql": {_000021_missing_messagesUpSql, map[string]*bintree{}}, - "doc.go": {docGo, map[string]*bintree{}}, + "000001_message_type.up.sql": {_000001_message_typeUpSql, map[string]*bintree{}}, + "000002_bandwidth_protocol.up.sql": {_000002_bandwidth_protocolUpSql, map[string]*bintree{}}, + "000003_index_truncate.up.sql": {_000003_index_truncateUpSql, map[string]*bintree{}}, + "000004_envelope.table.up.sql": {_000004_envelopeTableUpSql, map[string]*bintree{}}, + "000005_pushed_envelope.up.sql": {_000005_pushed_envelopeUpSql, map[string]*bintree{}}, + "000006_status_version.up.sql": {_000006_status_versionUpSql, map[string]*bintree{}}, + "000007_waku_push_filter.up.sql": {_000007_waku_push_filterUpSql, map[string]*bintree{}}, + "000008_error_sending_envelope.up.sql": {_000008_error_sending_envelopeUpSql, map[string]*bintree{}}, + "000009_peer_count.up.sql": {_000009_peer_countUpSql, map[string]*bintree{}}, + "000010_peer_id.up.sql": {_000010_peer_idUpSql, map[string]*bintree{}}, + "000011_waku_push_error.up.sql": {_000011_waku_push_errorUpSql, map[string]*bintree{}}, + "000012_waku_generic.up.sql": {_000012_waku_genericUpSql, map[string]*bintree{}}, + "000013_peer_conn_failure.up.sql": {_000013_peer_conn_failureUpSql, map[string]*bintree{}}, + "000014_bandwidth_column.up.sql": {_000014_bandwidth_columnUpSql, map[string]*bintree{}}, + "000015_device_type.up.sql": {_000015_device_typeUpSql, map[string]*bintree{}}, + "000016_common_fields.up.sql": {_000016_common_fieldsUpSql, map[string]*bintree{}}, + "000017_peer_count_shard_origin.up.sql": {_000017_peer_count_shard_originUpSql, map[string]*bintree{}}, + "000018_waku_req_res.up.sql": {_000018_waku_req_resUpSql, map[string]*bintree{}}, + "000019_message_check.up.sql": {_000019_message_checkUpSql, map[string]*bintree{}}, + "000020_dial_failure.up.sql": {_000020_dial_failureUpSql, map[string]*bintree{}}, + "000021_missing_messages.up.sql": {_000021_missing_messagesUpSql, map[string]*bintree{}}, + "000022_message_delivery_confirmed.up.sql": {_000022_message_delivery_confirmedUpSql, map[string]*bintree{}}, + "doc.go": {docGo, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory. diff --git a/lib/database/sql/000022_message_delivery_confirmed.up.sql b/lib/database/sql/000022_message_delivery_confirmed.up.sql new file mode 100644 index 0000000..2d74fcf --- /dev/null +++ b/lib/database/sql/000022_message_delivery_confirmed.up.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS messageDeliveryConfirmed ( + id SERIAL PRIMARY KEY, + recordId INTEGER NOT NULL, + messageHash TEXT NOT NULL, + timestamp INTEGER NOT NULL, + CONSTRAINT messageDeliveryConfirmed_unique UNIQUE (recordId, messageHash, timestamp) +); + +ALTER TABLE messageDeliveryConfirmed ADD CONSTRAINT fk_messageDeliveryConfirmed_telemetryRecord + FOREIGN KEY (recordId) REFERENCES telemetryRecord(id); \ No newline at end of file diff --git a/lib/metrics/missing_messages.go b/lib/metrics/missing_messages.go index 34826f0..d8f3fb3 100644 --- a/lib/metrics/missing_messages.go +++ b/lib/metrics/missing_messages.go @@ -12,7 +12,12 @@ type MissingRelevantMessage struct { GenericMetric[types.MissingRelevantMessage] } +type MessageDeliveryConfirmed struct { + GenericMetric[types.MessageDeliveryConfirmed] +} + var ( - MissingMessageProcessor = &MissingMessage{} - MissingRelevantMessageProcessor = &MissingRelevantMessage{} + MissingMessageProcessor = &MissingMessage{} + MissingRelevantMessageProcessor = &MissingRelevantMessage{} + MessageDeliveryConfirmedProcessor = &MessageDeliveryConfirmed{} ) diff --git a/pkg/types/types.go b/pkg/types/types.go index 2f57a1e..3b8fa1f 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -5,22 +5,23 @@ import "encoding/json" type TelemetryType string const ( - ProtocolStatsMetric TelemetryType = "ProtocolStats" - ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope" - SentEnvelopeMetric TelemetryType = "SentEnvelope" - UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" - ReceivedMessagesMetric TelemetryType = "ReceivedMessages" - ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" - PeerCountMetric TelemetryType = "PeerCount" - PeerConnFailureMetric TelemetryType = "PeerConnFailure" - PeerCountByShardMetric TelemetryType = "PeerCountByShard" - PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" - MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess" - MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" - DialFailureMetric TelemetryType = "DialFailure" - StoreConfrimationErrorMetric TelemetryType = "StoreConfrimationError" - MissingMessageMetric TelemetryType = "MissedMessage" - MissingRelevantMessageMetric TelemetryType = "MissedRelevantMessage" + ProtocolStatsMetric TelemetryType = "ProtocolStats" + ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope" + SentEnvelopeMetric TelemetryType = "SentEnvelope" + UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" + ReceivedMessagesMetric TelemetryType = "ReceivedMessages" + ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" + PeerCountMetric TelemetryType = "PeerCount" + PeerConnFailureMetric TelemetryType = "PeerConnFailure" + PeerCountByShardMetric TelemetryType = "PeerCountByShard" + PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" + MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess" + MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" + DialFailureMetric TelemetryType = "DialFailure" + StoreConfrimationErrorMetric TelemetryType = "StoreConfrimationError" + MissingMessageMetric TelemetryType = "MissedMessage" + MissingRelevantMessageMetric TelemetryType = "MissedRelevantMessage" + MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed" ) type Origin int64 @@ -189,3 +190,10 @@ type MissingRelevantMessage struct { SentAt int64 `json:"sentAt"` PubsubTopic string `json:"pubsubTopic"` } + +type MessageDeliveryConfirmed struct { + TelemetryRecord + ID int `json:"id"` + MessageHash string `json:"messageHash"` + Timestamp int64 `json:"timestamp"` +}