diff --git a/cmd/server/main.go b/cmd/server/main.go index ba8eaf0..db2f01e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -66,5 +66,7 @@ func main() { server.RegisterMetric(types.PeerCountByShardMetric, &metrics.PeerCountByShard{}) server.RegisterMetric(types.PeerCountByOriginMetric, &metrics.PeerCountByOrigin{}) + server.RegisterMetric(types.MessageCheckSuccessMetric, &metrics.MessageCheckSuccess{}) + server.RegisterMetric(types.MessageCheckFailureMetric, &metrics.MessageCheckFailure{}) server.Start(*port) } diff --git a/lib/common/test_utils.go b/lib/common/test_utils.go index 0db8016..35b8685 100644 --- a/lib/common/test_utils.go +++ b/lib/common/test_utils.go @@ -40,6 +40,8 @@ func DropTables(db *sql.DB) { "errorsendingenvelope", "peerCountByShard", "peerCountByOrigin", + "messageCheckSuccess", + "messageCheckFailure", "schema_migrations", } @@ -96,6 +98,16 @@ func DropTables(db *sql.DB) { log.Fatalf("an error '%s' was not expected when dropping the index", err) } + _, err = tx.Exec("DROP TABLE IF EXISTS messageCheckSuccess") + if err != nil { + log.Fatalf("an error '%s' was not expected when dropping the table", err) + } + + _, err = tx.Exec("DROP TABLE IF EXISTS messageCheckFailure") + if err != nil { + log.Fatalf("an error '%s' was not expected when dropping the table", err) + } + _, err = tx.Exec("DROP TABLE IF EXISTS schema_migrations") if err != nil { log.Fatalf("an error '%s' was not expected when dropping the table", err) diff --git a/lib/database/bindata.go b/lib/database/bindata.go index a6cee86..24d577b 100644 --- a/lib/database/bindata.go +++ b/lib/database/bindata.go @@ -16,7 +16,9 @@ // 000014_bandwidth_column.up.sql (101B) // 000015_device_type.up.sql (383B) // 000016_common_fields.up.sql (5.018kB) -// 000017_peer_count_shard_origin.up.sql (1.395kB) +// 000017_peer_count_shard_origin.up.sql (1.263kB) +// 000018_waku_req_res.up.sql (461B) +// 000019_message_check.up.sql (1.298kB) // doc.go (72B) package database @@ -405,7 +407,7 @@ func _000016_common_fieldsUpSql() (*asset, error) { return a, nil } -var __000017_peer_count_shard_originUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xb4\x52\x41\x73\xd3\x3c\x14\xbc\xfb\x57\xbc\x9b\xed\x6f\x34\x9d\xe6\x83\x72\xc9\x70\x70\x9c\x97\xa2\xc1\x91\x8b\xa4\x40\x73\xea\x78\x62\x91\x6a\x4a\xe4\x60\x3b\x86\xf0\xeb\x19\x4b\x8e\x13\x6a\xd2\xf6\x40\x75\x7c\x3b\x6f\x77\xdf\xae\x62\x8e\x91\x44\x90\xd1\x24\x41\xa0\x33\x60\xa9\x04\xbc\xa5\x42\x0a\xd8\x2a\x55\xc6\xc5\xce\xd4\x93\xbd\xb8\xcf\xca\x1c\x02\x0f\x00\x40\xe7\x20\x90\xd3\x28\x81\x1b\x4e\xe7\x11\x5f\xc2\x47\x5c\x12\x0b\x95\x6a\x55\x94\x39\xcd\x81\x32\x89\xd7\xc8\x2d\x1b\x5b\x24\x89\x83\x57\x2d\xd9\x19\xac\xb2\x0a\x7f\xc7\x6a\xbd\x51\x55\x9d\x6d\xb6\x67\xf0\x38\x65\x42\xf2\x88\x32\x39\xf0\x7c\xb7\x33\xfa\xfb\x4e\xc1\x82\xd1\x4f\x0b\x84\xe0\xe0\x90\x38\x33\xc4\xe9\x92\xa3\x44\xe8\x85\x63\xcf\x8b\x12\x89\xbc\x0b\x65\x10\x43\x34\x9d\x9e\x4a\x7e\x7d\xb8\x1b\xa8\xd6\xea\x9b\xda\xa8\xba\xdc\x73\xab\x67\x5d\x1e\xde\x2c\xe5\x48\xaf\x59\x9b\xda\xd1\x4f\x08\x1c\x67\xc8\x91\xc5\x28\xe0\xd1\x76\xa0\xf3\xd6\xd3\xcb\x9a\x4a\x4b\xbd\xd6\xe6\x55\xab\x2a\x9c\xc4\xbf\xec\xca\xb9\x7e\xb6\x2c\xa7\xfc\xd2\xb6\xba\x28\x9e\xac\xab\x13\x7e\x8d\xbe\x3c\x81\x09\xc6\x12\xfe\x83\x19\x4f\xe7\xc3\x6f\x94\xf2\x29\x72\x98\x2c\x4f\x22\x9b\xa2\x88\x21\xa1\x73\x2a\x61\x74\x39\x3e\x4f\xd0\x5d\xf6\x0c\xc3\xd3\x7f\xa6\xe8\x2e\xdf\x6f\x55\x75\xfc\x2e\x87\xd2\x06\xff\xc5\x64\x1b\x05\x12\x6f\x65\xdf\xa7\x8d\x9e\x32\x81\x5c\xb6\x6b\xe9\x23\x46\x9d\x13\xbb\x14\x1e\xce\x68\x2e\xda\x51\x73\xd1\x0e\x3d\x7b\x51\xf0\x39\x4a\x16\x28\x2c\x7f\x70\x49\xc0\x5f\x98\x07\x53\xfc\x30\x7e\xe8\x34\x83\x11\x01\x7f\xaa\xab\x55\x73\xd5\x8f\xfe\x27\xe0\x8b\x3a\xab\xf5\xaa\x1f\xbd\x21\xe0\xdf\x28\x55\xe2\xcf\xd5\x7d\x66\xd6\xaa\x07\xde\xb6\xeb\x4c\xb4\x0c\x45\xa3\xca\x7d\x0f\x5c\x11\xf0\xb9\x32\xb9\xfa\xd5\x14\xbb\xaa\x1f\xbf\xeb\x88\xe6\x99\xc9\xd6\xaa\xf4\x43\x2f\x84\x48\x40\x73\x72\xcb\x97\x0f\xc8\xf1\x34\x46\x97\x5c\x77\xe2\xc8\x35\xf5\x47\x10\x6e\x43\xe7\xf0\xde\x26\xe0\x85\xe3\xdf\x01\x00\x00\xff\xff\x74\x5a\x86\xf9\x73\x05\x00\x00") +var __000017_peer_count_shard_originUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x91\x41\x73\xd3\x30\x10\x85\xef\xfe\x15\x7b\xb3\x3d\xa3\xe9\x50\xa0\x5c\x3a\x1c\x5c\x67\x53\x34\x38\x72\x91\x64\x68\x4f\x1d\x8f\x2d\x52\x4d\x89\x1c\x64\xc7\x10\x7e\x3d\x63\xc9\x71\x42\x43\xa1\x07\xa8\x8e\xfb\x66\xf7\x7b\x7a\x2f\xe5\x98\x48\x04\x99\x5c\x64\x08\x74\x0e\x2c\x97\x80\xd7\x54\x48\x01\x6b\xa5\x6c\xda\x6c\x4c\x77\xb1\x15\x77\xa5\xad\x21\x0a\x00\x00\x74\x0d\x02\x39\x4d\x32\xb8\xe2\x74\x91\xf0\x1b\x78\x8f\x37\xc4\x49\x56\x55\x8d\xad\x69\x0d\x94\x49\xbc\x44\xee\xae\xb1\x22\xcb\xbc\x5c\x0d\xc7\x1e\xd1\x5a\x47\xf8\xbd\xd6\xe9\x95\x6a\xbb\x72\xb5\x7e\x44\x4f\x73\x26\x24\x4f\x28\x93\x47\x9e\x6f\x37\x46\x7f\xdd\x28\x28\x18\xfd\x50\x20\x44\x3b\x87\xc4\x9b\x21\x9e\x4b\xf6\x88\x38\x88\xcf\x83\x20\xc9\x24\xf2\x31\x94\xa3\x18\x92\xd9\xec\x10\xf9\xf9\xfe\xf6\x88\xda\xa9\x2f\x6a\xa5\x3a\xbb\xe5\x8e\xe7\x5c\xee\xde\x3c\xe7\x48\x2f\xd9\x90\xda\xde\x4f\x0c\x1c\xe7\xc8\x91\xa5\x28\xe0\xc1\x76\xa4\xeb\xc1\xd3\xd3\x9a\xca\xad\x5e\x6a\xf3\x5f\xab\x6a\x3c\xe2\x5f\x76\xe5\x5d\xff\xb5\x2c\x4f\x7e\x6a\x5b\x63\x14\x7f\xac\x6b\x04\x3f\x73\x5f\xcd\x48\xdd\xae\x55\xbb\xaf\x6a\x17\xd8\x51\x57\xa6\x5c\x29\x90\x78\x2d\xa7\x2c\xdd\xb7\x29\x13\xc8\xe5\xb0\x96\x3f\xb8\xa8\x6b\xe2\x96\xe2\x40\x60\x86\xa9\x84\xfe\x64\x18\xf5\x27\xc3\x30\x98\xf3\x7c\x01\xd1\xc7\x24\x2b\x50\xb8\xfb\xd1\x0b\x02\x61\x61\xee\x4d\xf3\xcd\x84\xb1\x67\x46\xa7\x04\xc2\x99\x6e\xab\xfe\x6c\x1a\xbd\x24\x10\x8a\xae\xec\x74\x35\x8d\x5e\x11\x08\xaf\x94\xb2\xf8\xbd\xba\x2b\xcd\x52\x4d\xc2\xeb\x61\x9d\x89\xe1\x42\xd3\x2b\xbb\x9d\x84\x33\x02\x21\x57\xa6\x56\x3f\xfa\x66\xd3\x4e\xe3\x37\xe3\xa1\x45\x69\xca\xa5\xb2\x61\x1c\xc4\x90\x08\xe8\x0f\xfe\xf2\xe9\x1d\x72\x3c\x8c\xd1\x27\x37\x7e\xf1\x14\xdc\xbf\x7e\x09\xc2\x6f\xe8\x1a\xde\xba\x04\x82\xf8\xfc\x67\x00\x00\x00\xff\xff\x81\x70\x45\x2e\xef\x04\x00\x00") func _000017_peer_count_shard_originUpSqlBytes() ([]byte, error) { return bindataRead( @@ -420,8 +422,48 @@ func _000017_peer_count_shard_originUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "000017_peer_count_shard_origin.up.sql", size: 1395, mode: os.FileMode(0644), modTime: time.Unix(1726004768, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x3c, 0x97, 0x9d, 0x53, 0xca, 0x43, 0x5f, 0x73, 0xea, 0x51, 0xb4, 0x3f, 0xc1, 0xec, 0xe2, 0xe2, 0x31, 0xfc, 0x6e, 0x83, 0xb4, 0x61, 0x67, 0xf9, 0xb, 0x78, 0xf4, 0x45, 0x7f, 0x2, 0xdb, 0xa4}} + info := bindataFileInfo{name: "000017_peer_count_shard_origin.up.sql", size: 1263, mode: os.FileMode(0644), modTime: time.Unix(1726263354, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x42, 0xe9, 0x98, 0xa, 0xfc, 0xf1, 0x85, 0xe9, 0xd, 0x30, 0x30, 0xda, 0x15, 0x15, 0x3a, 0xf6, 0xf8, 0x87, 0xa, 0x16, 0x69, 0x20, 0x6b, 0x8b, 0xec, 0x27, 0x4c, 0xa, 0x49, 0x65, 0x2a, 0xe8}} + return a, nil +} + +var __000018_waku_req_resUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x8e\xcd\x4e\xeb\x30\x10\x46\xd7\xc9\x53\xcc\xb2\x95\xb2\xb8\xba\x52\x56\xac\xdc\x62\xa8\x85\xeb\x80\x33\x41\xed\x0a\xb9\xc9\x88\x46\x34\xb1\xeb\x1f\xc1\xe3\x23\x68\x85\x82\x44\x61\x3b\xe7\xe8\x9b\xc3\x24\x72\x0d\xc8\x16\x92\x83\xb8\x01\x55\x21\xf0\x8d\xa8\xb1\x86\x57\xf3\x92\x34\x1d\x13\x85\xa8\x29\x38\x3b\x06\x82\x59\x9e\xf5\x1d\xd4\x5c\x0b\x26\xe1\x5e\x8b\x35\xd3\x5b\xb8\xe3\xdb\x22\xcf\x9c\xb7\xd1\xb6\xf6\x00\x8f\x4c\x2f\x57\x4c\xcf\xca\x7f\xf3\xcf\x39\xd5\x48\x59\xe4\x19\xb9\x3d\x0d\xe4\xcd\x01\x16\x55\x25\x39\x53\x53\x18\xfb\x81\x42\x34\x83\x03\xa1\x90\xdf\x72\x3d\x85\x81\x68\xc4\xdf\x84\xd6\x93\x89\xd4\xb1\xf8\x23\xb4\x63\xa4\x31\xa2\x75\x7d\xfb\xd5\xf6\xbf\x2c\xbf\xc5\xb9\xb4\x0b\x69\xf7\x87\x43\xe4\x45\x77\x11\x0f\x14\x82\x79\xa6\x95\x09\xfb\x8b\x0e\x79\x6f\xfd\xfa\x24\x02\xf2\x0d\x7e\xdc\xde\xa2\x37\xd7\x26\x9a\xf3\x21\xcf\x96\x95\xaa\x51\x33\xa1\x10\xce\xa3\xe1\x29\x8d\xfd\x31\x11\x34\x4a\x3c\x34\x1c\x66\xa7\x96\x02\x26\x4f\xe7\xf9\xfc\xea\x3d\x00\x00\xff\xff\x49\x51\x33\x13\xcd\x01\x00\x00") + +func _000018_waku_req_resUpSqlBytes() ([]byte, error) { + return bindataRead( + __000018_waku_req_resUpSql, + "000018_waku_req_res.up.sql", + ) +} + +func _000018_waku_req_resUpSql() (*asset, error) { + bytes, err := _000018_waku_req_resUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "000018_waku_req_res.up.sql", size: 461, mode: os.FileMode(0644), modTime: time.Unix(1726623187, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xf4, 0xd3, 0xb8, 0x9d, 0x66, 0x9, 0x67, 0xeb, 0xda, 0xd8, 0x10, 0xe2, 0x42, 0xaa, 0x6f, 0xb6, 0xae, 0x28, 0xdd, 0xf7, 0xfe, 0xce, 0x88, 0x28, 0x44, 0x8e, 0xcc, 0x4f, 0xc6, 0x82, 0x8d, 0x32}} + return a, nil +} + +var __000019_message_checkUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x91\x4f\x6f\xda\x40\x10\xc5\xcf\xf8\x53\xcc\xd1\x96\x38\x54\x95\x38\xe5\xb4\x31\x43\xb2\xaa\xb3\x6e\xd7\x4b\x95\x9c\xa2\x8d\x3d\x2d\x16\xd8\xeb\xec\x1f\xb5\xfd\xf6\x95\xc1\x34\x86\x02\xed\x81\x43\x7c\xdc\x37\x9e\xf7\x7b\xf3\x52\x89\x4c\x21\x28\x76\x9b\x21\xf0\x05\x88\x5c\x01\x3e\xf2\x42\x15\xf0\x43\xaf\x83\xa4\xd7\x40\xce\x4b\x72\x9d\x69\x1d\x41\x1c\x4d\xea\x0a\x0a\x94\x9c\x65\xf0\x59\xf2\x07\x26\x9f\xe0\x13\x3e\x4d\xa3\x49\x67\x8d\x37\xa5\xd9\xc0\x57\x26\xd3\x7b\x26\xe3\xd9\x87\x64\xbb\x4e\x2c\xb3\x6c\x1a\x4d\xa8\x5b\x51\x43\x56\x6f\xe0\x36\xcf\x33\x64\x62\x2c\xfa\xba\x21\xe7\x75\xd3\x01\x17\x0a\xef\x50\x8e\x45\x47\xd4\xaa\x4b\x03\xa5\x25\xed\xa9\x62\xfe\xa4\x68\x5a\x4f\xad\x57\xa6\xab\xcb\x3f\x6c\x1f\x67\xb3\x03\xb8\x2e\xbc\xb8\xf0\xf2\x8f\x19\x22\xcb\xab\xb3\x72\x43\xce\xe9\xef\x74\xaf\xdd\xea\xec\x0c\x59\x6b\xec\xc3\x6e\x10\x14\x3e\xaa\xfe\xed\xa7\xb7\x7a\xae\xbd\x1e\x1e\xa2\x49\x9a\x8b\x42\x49\xc6\x85\x82\x61\xa9\x7b\x0e\x6d\xfd\x1a\x08\x96\x82\x7f\x59\x22\xc4\x3b\x96\x29\x8c\x4c\x93\x28\xb9\x89\xa2\x0b\x7d\x0e\xb3\xe9\x8a\xca\x75\x11\xca\x92\x9c\x83\x38\x02\x00\x38\x53\x69\x2f\x59\x2a\x8d\xad\x78\x75\xe2\xb2\xbd\x3c\x0e\xdd\xe3\x1f\xc9\x97\x6a\xed\xf5\xbf\x93\x8e\xe9\x8e\x43\xef\x59\x0e\x62\x4f\xdf\x4c\x76\x17\x60\x99\x42\x39\x1c\xe0\x54\x64\x36\x9f\x8f\x7d\xbf\xad\x9f\x4f\x59\x7b\xda\x50\x43\xde\xfe\x92\x5b\xd3\x2d\xed\xfe\x5b\xe4\x12\xf9\x9d\xe8\x8f\xf4\x06\x95\x80\xc4\x05\x4a\x14\x29\x16\x70\xf4\x77\x5c\x57\xc9\xcd\xc1\x8a\xff\xed\x69\xa1\xeb\x4d\xb0\xf4\x4e\x7b\x1a\xe8\xae\xdb\xd3\x3e\xf2\xe5\x9e\xf6\xd6\xd7\xef\xe9\x77\x00\x00\x00\xff\xff\x35\x25\x57\xa4\x12\x05\x00\x00") + +func _000019_message_checkUpSqlBytes() ([]byte, error) { + return bindataRead( + __000019_message_checkUpSql, + "000019_message_check.up.sql", + ) +} + +func _000019_message_checkUpSql() (*asset, error) { + bytes, err := _000019_message_checkUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "000019_message_check.up.sql", size: 1298, mode: os.FileMode(0644), modTime: time.Unix(1726623393, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x15, 0x17, 0x41, 0x93, 0x56, 0x36, 0x3d, 0x52, 0x0, 0x77, 0xbb, 0x1b, 0xd7, 0x88, 0x89, 0xcf, 0x48, 0x7b, 0xd5, 0xc5, 0x2c, 0xad, 0x2c, 0x33, 0xe4, 0x43, 0x1d, 0x18, 0xe8, 0xc5, 0xfd, 0xe3}} return a, nil } @@ -553,6 +595,8 @@ var _bindata = map[string]func() (*asset, error){ "000015_device_type.up.sql": _000015_device_typeUpSql, "000016_common_fields.up.sql": _000016_common_fieldsUpSql, "000017_peer_count_shard_origin.up.sql": _000017_peer_count_shard_originUpSql, + "000018_waku_req_res.up.sql": _000018_waku_req_resUpSql, + "000019_message_check.up.sql": _000019_message_checkUpSql, "doc.go": docGo, } @@ -619,6 +663,8 @@ var _bintree = &bintree{nil, map[string]*bintree{ "000015_device_type.up.sql": {_000015_device_typeUpSql, map[string]*bintree{}}, "000016_common_fields.up.sql": {_000016_common_fieldsUpSql, map[string]*bintree{}}, "000017_peer_count_shard_origin.up.sql": {_000017_peer_count_shard_originUpSql, map[string]*bintree{}}, + "000018_waku_req_res.up.sql": {_000018_waku_req_resUpSql, map[string]*bintree{}}, + "000019_message_check.up.sql": {_000019_message_checkUpSql, map[string]*bintree{}}, "doc.go": {docGo, map[string]*bintree{}}, }} diff --git a/lib/database/sql/000019_message_check.up.sql b/lib/database/sql/000019_message_check.up.sql new file mode 100644 index 0000000..c50e74a --- /dev/null +++ b/lib/database/sql/000019_message_check.up.sql @@ -0,0 +1,38 @@ +CREATE TABLE IF NOT EXISTS wakuRequestResponse ( + id SERIAL PRIMARY KEY, + protocol VARCHAR(50) NOT NULL, + ephemeral BOOLEAN NOT NULL, + timestamp INTEGER NOT NULL, + seenTimestamp INTEGER NOT NULL, + createdAt INTEGER NOT NULL, + contentTopic VARCHAR(255) NOT NULL, + pubsubTopic VARCHAR(255) NOT NULL, + peerId VARCHAR(255) NOT NULL, + messageHash VARCHAR(255) NOT NULL, + errorMessage TEXT, + extraData TEXT, + + CONSTRAINT messages_unique UNIQUE (peerId, messageHash) +); + +CREATE TABLE IF NOT EXISTS messageCheckSuccess ( + id SERIAL PRIMARY KEY, + recordId INTEGER NOT NULL, + messageHash TEXT NOT NULL, + timestamp INTEGER NOT NULL, + CONSTRAINT messageCheckSuccess_unique UNIQUE (recordId, messageHash, timestamp) +); + +ALTER TABLE messageCheckSuccess ADD CONSTRAINT fk_messageCheckSuccess_telemetryRecord + FOREIGN KEY (recordId) REFERENCES telemetryRecord(id); + +CREATE TABLE IF NOT EXISTS messageCheckFailure ( + id SERIAL PRIMARY KEY, + recordId INTEGER NOT NULL, + messageHash TEXT NOT NULL, + timestamp INTEGER NOT NULL, + CONSTRAINT messageCheckFailure_unique UNIQUE (recordId, messageHash, timestamp) +); + +ALTER TABLE messageCheckFailure ADD CONSTRAINT fk_messageCheckFailure_telemetryRecord + FOREIGN KEY (recordId) REFERENCES telemetryRecord(id); \ No newline at end of file diff --git a/lib/metrics/generic.go b/lib/metrics/generic.go new file mode 100644 index 0000000..325eb3c --- /dev/null +++ b/lib/metrics/generic.go @@ -0,0 +1,124 @@ +package metrics + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "reflect" + "strings" + + "github.com/status-im/telemetry/lib/common" + "github.com/status-im/telemetry/pkg/types" +) + +// GenericMetric is a generic struct that can handle any metric type +type GenericMetric[T any] struct { + types.TelemetryRecord + Data T +} + +// MetricProcessor is an interface for processing metrics +type MetricProcessor interface { + Process(context.Context, *sql.DB, *common.MetricErrors, *types.TelemetryRequest) error + Clean(*sql.DB, int64) (int64, error) +} + +// NewMetricProcessor creates a new MetricProcessor for the given metric type +func NewMetricProcessor[T types.TelemetryRecord]() MetricProcessor { + return &GenericMetric[T]{ + Data: *new(T), + } +} + +// Process implements the MetricProcessor interface +func (g *GenericMetric[T]) Process(ctx context.Context, db *sql.DB, errs *common.MetricErrors, data *types.TelemetryRequest) error { + // Unmarshal the TelemetryRecord fields + if err := json.Unmarshal(*data.TelemetryData, &g.TelemetryRecord); err != nil { + errs.Append(data.ID, fmt.Sprintf("Error decoding TelemetryRecord: %v", err)) + return err + } + + // Unmarshal the Data field + if err := json.Unmarshal(*data.TelemetryData, &g.Data); err != nil { + errs.Append(data.ID, fmt.Sprintf("Error decoding %T: %v", g.Data, err)) + return err + } + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + recordId, err := InsertTelemetryRecord(tx, &g.TelemetryRecord) + if err != nil { + return err + } + + columns, values := getColumnsAndValues(g.Data) + placeholders := make([]string, len(columns)+1) + for i := range placeholders { + placeholders[i] = fmt.Sprintf("$%d", i+1) + } + + tableName := getTableName(g.Data) + query := fmt.Sprintf(` + INSERT INTO %s (recordId, %s) + VALUES (%s) + RETURNING id; + `, tableName, strings.Join(columns, ", "), strings.Join(placeholders, ", ")) + + args := []interface{}{recordId} + args = append(args, values...) + + result := tx.QueryRowContext(ctx, query, args...) + if result.Err() != nil { + errs.Append(data.ID, fmt.Sprintf("Error saving %T: %v", g.Data, result.Err())) + return result.Err() + } + + var lastInsertId int + err = result.Scan(&lastInsertId) + if err != nil { + return err + } + + if err := tx.Commit(); err != nil { + errs.Append(data.ID, fmt.Sprintf("Error committing transaction: %v", err)) + return err + } + + return nil +} + +// Clean implements the MetricProcessor interface +func (g *GenericMetric[T]) Clean(db *sql.DB, before int64) (int64, error) { + tableName := getTableName(g.Data) + return common.Cleanup(db, tableName, before) +} + +// Helper functions + +func getColumnsAndValues(v interface{}) ([]string, []interface{}) { + var columns []string + var values []interface{} + t := reflect.TypeOf(v) + val := reflect.ValueOf(v) + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + tag := field.Tag.Get("json") + if tag != "" && tag != "-" { + columnName := strings.Split(tag, ",")[0] + columns = append(columns, columnName) + values = append(values, val.Field(i).Interface()) + } + } + return columns, values +} + +func getTableName(v interface{}) string { + t := reflect.TypeOf(v) + return strings.ToLower(t.Name()) +} diff --git a/lib/metrics/message_check.go b/lib/metrics/message_check.go new file mode 100644 index 0000000..17d6892 --- /dev/null +++ b/lib/metrics/message_check.go @@ -0,0 +1,18 @@ +package metrics + +import ( + "github.com/status-im/telemetry/pkg/types" +) + +type MessageCheckSuccess struct { + GenericMetric[types.MessageCheckSuccess] +} + +type MessageCheckFailure struct { + GenericMetric[types.MessageCheckFailure] +} + +var ( + MessageCheckSuccessProcessor = &MessageCheckSuccess{} + MessageCheckFailureProcessor = &MessageCheckFailure{} +) diff --git a/pkg/types/types.go b/pkg/types/types.go index b8cee66..7d29d00 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -15,6 +15,8 @@ const ( PeerConnFailureMetric TelemetryType = "PeerConnFailure" PeerCountByShardMetric TelemetryType = "PeerCountByShard" PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" + MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess" + MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" ) type Origin int64 @@ -133,3 +135,15 @@ type PeerCountByOrigin struct { Origin Origin `json:"origin"` Timestamp int64 `json:"timestamp"` } + +type MessageCheckSuccess struct { + TelemetryRecord + MessageHash string `json:"messageHash"` + Timestamp int64 `json:"timestamp"` +} + +type MessageCheckFailure struct { + TelemetryRecord + MessageHash string `json:"messageHash"` + Timestamp int64 `json:"timestamp"` +}