diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock index 06c74ebc54..1d43b7ee7d 100644 --- a/hermes/Cargo.lock +++ b/hermes/Cargo.lock @@ -391,6 +391,28 @@ dependencies = [ "trust-dns-resolver", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite 0.2.10", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2 1.0.66", + "quote 1.0.31", + "syn 2.0.26", +] + [[package]] name = "async-task" version = "4.4.0" @@ -1902,6 +1924,8 @@ dependencies = [ "mock_instant", "nonzero_ext", "prometheus-client", + "prost 0.12.1", + "prost-build 0.12.1", "pyth-sdk", "pythnet-sdk", "rand 0.8.5", @@ -1917,6 +1941,8 @@ dependencies = [ "solana-sdk", "strum", "tokio", + "tonic", + "tonic-build", "tower-http", "tracing", "tracing-subscriber", @@ -2081,11 +2107,23 @@ dependencies = [ "futures-util", "http", "hyper", - "rustls 0.21.5", + "rustls 0.21.7", "tokio", "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite 0.2.10", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2431,8 +2469,8 @@ dependencies = [ "libp2p-request-response", "libp2p-swarm", "log", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "rand 0.8.5", ] @@ -2458,8 +2496,8 @@ dependencies = [ "multistream-select", "parking_lot 0.11.2", "pin-project 1.1.2", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "rand 0.8.5", "ring", "rw-stream-sink", @@ -2508,8 +2546,8 @@ dependencies = [ "libp2p-core", "libp2p-swarm", "log", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "rand 0.7.3", "smallvec", ] @@ -2534,8 +2572,8 @@ dependencies = [ "log", "open-metrics-client", "pin-project 1.1.2", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "rand 0.7.3", "regex", "sha2 0.10.7", @@ -2555,8 +2593,8 @@ dependencies = [ "libp2p-swarm", "log", "lru", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "smallvec", ] @@ -2577,8 +2615,8 @@ dependencies = [ "libp2p-core", "libp2p-swarm", "log", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "rand 0.7.3", "sha2 0.10.7", "smallvec", @@ -2655,8 +2693,8 @@ dependencies = [ "lazy_static", "libp2p-core", "log", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "rand 0.8.5", "sha2 0.10.7", "snow", @@ -2692,8 +2730,8 @@ dependencies = [ "futures", "libp2p-core", "log", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "unsigned-varint", "void", ] @@ -2728,8 +2766,8 @@ dependencies = [ "libp2p-swarm", "log", "pin-project 1.1.2", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "rand 0.8.5", "smallvec", "static_assertions", @@ -2752,8 +2790,8 @@ dependencies = [ "libp2p-core", "libp2p-swarm", "log", - "prost", - "prost-build", + "prost 0.9.0", + "prost-build 0.9.0", "rand 0.8.5", "sha2 0.10.7", "thiserror", @@ -3916,6 +3954,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64d9ba0963cdcea2e1b2230fbae2bab30eb25a174be395c41e764bfb65dd62" +dependencies = [ + "proc-macro2 1.0.66", + "syn 2.0.26", +] + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -4007,7 +4055,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.9.0", +] + +[[package]] +name = "prost" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" +dependencies = [ + "bytes", + "prost-derive 0.12.1", ] [[package]] @@ -4023,13 +4081,35 @@ dependencies = [ "log", "multimap", "petgraph", - "prost", - "prost-types", + "prost 0.9.0", + "prost-types 0.9.0", "regex", "tempfile", "which", ] +[[package]] +name = "prost-build" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" +dependencies = [ + "bytes", + "heck 0.4.1", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost 0.12.1", + "prost-types 0.12.1", + "regex", + "syn 2.0.26", + "tempfile", + "which", +] + [[package]] name = "prost-derive" version = "0.9.0" @@ -4043,6 +4123,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2 1.0.66", + "quote 1.0.31", + "syn 2.0.26", +] + [[package]] name = "prost-types" version = "0.9.0" @@ -4050,7 +4143,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" dependencies = [ "bytes", - "prost", + "prost 0.9.0", +] + +[[package]] +name = "prost-types" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" +dependencies = [ + "prost 0.12.1", ] [[package]] @@ -4420,7 +4522,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite 0.2.10", - "rustls 0.21.5", + "rustls 0.21.7", "rustls-pemfile 1.0.3", "serde", "serde_json", @@ -4605,9 +4707,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.5" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" +checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", "ring", @@ -4647,9 +4749,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.1" +version = "0.101.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e" +checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" dependencies = [ "ring", "untrusted", @@ -6035,6 +6137,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite 0.2.10", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -6073,7 +6185,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.5", + "rustls 0.21.7", "tokio", ] @@ -6156,6 +6268,49 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c00bc15e49625f3d2f20b17082601e5e17cf27ead69e805174026c194b6664" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.2", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project 1.1.2", + "prost 0.12.1", + "rustls 0.21.7", + "rustls-pemfile 1.0.3", + "tokio", + "tokio-rustls 0.24.1", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9d37bb15da06ae9bb945963066baca6561b505af93a52e949a85d28558459a2" +dependencies = [ + "prettyplease", + "proc-macro2 1.0.66", + "prost-build 0.12.1", + "quote 1.0.31", + "syn 2.0.26", +] + [[package]] name = "tower" version = "0.4.13" @@ -6164,9 +6319,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project 1.1.2", "pin-project-lite 0.2.10", + "rand 0.8.5", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml index 0f590a73dd..4eea0c25f7 100644 --- a/hermes/Cargo.toml +++ b/hermes/Cargo.toml @@ -5,14 +5,15 @@ description = "Hermes is an agent that provides Verified Prices from the Pythnet edition = "2021" [dependencies] -async-trait = { version = "0.1.73" } anyhow = { version = "1.0.69" } +async-trait = { version = "0.1.73" } axum = { version = "0.6.20", features = ["json", "ws", "macros"] } axum-macros = { version = "0.3.8" } base64 = { version = "0.21.0" } borsh = { version = "0.10.3" } byteorder = { version = "1.4.3" } chrono = { version = "0.4.28" } +clap = { version = "4.4.4", features = ["derive", "env", "cargo"] } dashmap = { version = "5.4.0" } derive_more = { version = "0.99.17" } env_logger = { version = "0.10.0" } @@ -27,6 +28,7 @@ log = { version = "0.4.17" } mock_instant = { version = "0.3.1", features = ["sync"] } nonzero_ext = { version = "0.3.0" } prometheus-client = { version = "0.21.1" } +prost = { version = "0.12.1" } pyth-sdk = { version = "0.8.0" } pythnet-sdk = { path = "../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] } rand = { version = "0.8.5" } @@ -37,9 +39,9 @@ serde_json = { version = "1.0.93" } serde_qs = { version = "0.12.0", features = ["axum"] } serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" } sha3 = { version = "0.10.4" } -clap = { version = "4.4.4", features = ["derive", "env", "cargo"] } strum = { version = "0.24.1", features = ["derive"] } tokio = { version = "1.26.0", features = ["full"] } +tonic = { version = "0.10.1", features = ["tls"] } tower-http = { version = "0.4.0", features = ["cors"] } tracing = { version = "0.1.37", features = ["log"] } tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } @@ -67,6 +69,12 @@ solana-client = { version = "=1.13.3" } solana-sdk = { version = "=1.13.3" } solana-account-decoder = { version = "=1.13.3" } + +[build-dependencies] +prost-build = { version = "0.12.1" } +tonic-build = { version = "0.10.1" } + + # Wormhole uses patching to resolve some of its own dependencies. We need to # make sure that we use the same patch instead of simply pointing the original # dependency at git otherwise those relative imports will fail. diff --git a/hermes/buf.gen.yaml b/hermes/buf.gen.yaml deleted file mode 100644 index 56378a67f5..0000000000 --- a/hermes/buf.gen.yaml +++ /dev/null @@ -1,11 +0,0 @@ -# This file specifies how `buf` should generate `p2p.pb.go`. The `- M` flag -# rewrites the module from `gossipsubv1` as specified in the Wormhole repo to -# `main` so it can compile in our repo (as we don't have a full Go project). -version: v1beta1 -plugins: - - name: go - out: . - path: tools/bin/protoc-gen-go - opt: - - Msrc/network/p2p.proto=github.com/pyth-network/main;main - - paths=source_relative diff --git a/hermes/build.rs b/hermes/build.rs index 00afb2730b..aa22092c4b 100644 --- a/hermes/build.rs +++ b/hermes/build.rs @@ -1,113 +1,49 @@ use std::{ env, path::PathBuf, - process::{ - Command, - Stdio, - }, + process::Command, }; fn main() { let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); - let out_var = env::var("OUT_DIR").unwrap(); - // Download the Wormhole repository at a certain tag, which we need to access the protobuf definitions - // for Wormhole P2P message types. - // - // TODO: This is ugly. Instead of this we should have our own tool - // build process that can generate protobuf definitions for this and other user cases. For now - // this is easy and works and matches upstream Wormhole's `Makefile`. - - const WORMHOLE_VERSION: &str = "2.18.1"; - - let wh_curl = Command::new("curl") - .args([ - "-s", - "-L", - format!("https://github.com/wormhole-foundation/wormhole/archive/refs/tags/v{WORMHOLE_VERSION}.tar.gz").as_str(), - ]) - .stdout(Stdio::piped()) - .spawn() - .expect("failed to download wormhole archive"); - - let _ = Command::new("tar") - .args(["xvz"]) - .stdin(Stdio::from(wh_curl.stdout.unwrap())) - .output() - .expect("failed to extract wormhole archive"); - - // Move the tools directory to the root of the repo because that's where the build script - // expects it to be, paths get hardcoded into the binaries. - let _ = Command::new("mv") - .args([ - format!("wormhole-{WORMHOLE_VERSION}/tools").as_str(), - "tools", - ]) + // Print OUT_DIR for debugging build issues. + println!("OUT_DIR={}", out_dir.display()); + + // We'll use git to pull in protobuf dependencies. This trick lets us use the Rust OUT_DIR + // directory as a mini-repo with wormhole and googleapis as remotes, so we can copy out the + // TREEISH paths we want. + let protobuf_setup = r#" + git init . + git clean -df + git remote add wormhole https://github.com/wormhole-foundation/wormhole.git + git remote add googleapis https://github.com/googleapis/googleapis.git + git fetch --depth=1 --porcelain wormhole main + git fetch --depth=1 --porcelain googleapis master + git read-tree --prefix=proto/ -u wormhole/main:proto + git read-tree --prefix=proto/google/api/ -u googleapis/master:google/api + "#; + + // Run each command to prepare the OUT_DIR with the protobuf definitions. We need to make sure + // to change the working directory to OUT_DIR, otherwise git will complain. + let _ = Command::new("sh") + .args(["-c", protobuf_setup]) + .current_dir(&out_dir) .output() - .expect("failed to move wormhole tools directory"); - - // Move the protobuf definitions to the src/network directory, we don't have to do this - // but it is more intuitive when debugging. - let _ = Command::new("mv") - .args([ - format!("wormhole-{WORMHOLE_VERSION}/proto/gossip/v1/gossip.proto").as_str(), - "src/network/p2p.proto", - ]) - .output() - .expect("failed to move wormhole protobuf definitions"); - - // Build the protobuf compiler. - let _ = Command::new("./build.sh") - .current_dir("tools") - .output() - .expect("failed to run protobuf compiler build script"); - - // Make the protobuf compiler executable. - let _ = Command::new("chmod") - .args(["+x", "tools/bin/*"]) - .output() - .expect("failed to make protofuf compiler executable"); - - // Generate the protobuf definitions. See buf.gen.yaml to see how we rename the module for our - // particular use case. - let _ = Command::new("./tools/bin/buf") - .args(["generate", "--path", "src"]) - .output() - .expect("failed to generate protobuf definitions"); - - let rust_target_arch = std::env::var("CARGO_CFG_TARGET_ARCH").unwrap(); - - // Build the Go library. - let mut cmd = Command::new("go"); - cmd.arg("build") - .arg("-buildmode=c-archive") - .arg("-o") - .arg(out_dir.join("libpythnet.a")) - .arg("src/network/p2p.go") - .arg("src/network/p2p.pb.go"); - - // Cross-compile the Go binary based on the Rust target architecture - match &*rust_target_arch { - "x86_64" => { - // CGO_ENABLED required for building amd64 on mac os - cmd.env("GOARCH", "amd64").env("CGO_ENABLED", "1"); - } - "aarch64" => { - cmd.env("GOARCH", "arm64"); - } - // Add other target architectures as needed. - _ => {} - } - - - // Tell Rust to link our Go library at compile time. - println!("cargo:rustc-link-search=native={out_var}"); - println!("cargo:rustc-link-lib=static=pythnet"); - println!("cargo:rustc-link-lib=resolv"); - - let go_build_output = cmd.output().expect("Failed to execute Go build command"); - if !go_build_output.status.success() { - let error_message = String::from_utf8_lossy(&go_build_output.stderr); - panic!("Go build failed:\n{}", error_message); - } + .expect("failed to setup protobuf definitions"); + + // We build the resulting protobuf definitions using Rust's prost_build crate, which generates + // Rust code from the protobuf definitions. + tonic_build::configure() + .build_server(false) + .compile( + &[ + out_dir.join("proto/spy/v1/spy.proto"), + out_dir.join("proto/gossip/v1/gossip.proto"), + out_dir.join("proto/node/v1/node.proto"), + out_dir.join("proto/publicrpc/v1/publicrpc.proto"), + ], + &[out_dir.join("proto")], + ) + .expect("failed to compile protobuf definitions"); } diff --git a/hermes/src/aggregate.rs b/hermes/src/aggregate.rs index 05c8a27666..0468c2c0ea 100644 --- a/hermes/src/aggregate.rs +++ b/hermes/src/aggregate.rs @@ -19,6 +19,7 @@ use { WormholeMerkleState, }, crate::{ + network::wormhole::VaaBytes, state::{ benchmarks::Benchmarks, cache::{ @@ -28,7 +29,6 @@ use { }, State, }, - wormhole::VaaBytes, }, anyhow::{ anyhow, diff --git a/hermes/src/aggregate/wormhole_merkle.rs b/hermes/src/aggregate/wormhole_merkle.rs index 86cc72ad7f..f9bf410e73 100644 --- a/hermes/src/aggregate/wormhole_merkle.rs +++ b/hermes/src/aggregate/wormhole_merkle.rs @@ -5,11 +5,11 @@ use { Slot, }, crate::{ + network::wormhole::VaaBytes, state::cache::{ AggregateCache, MessageState, }, - wormhole::VaaBytes, }, anyhow::{ anyhow, diff --git a/hermes/src/config/wormhole.rs b/hermes/src/config/wormhole.rs index 2b62c5c56f..89b3696e6b 100644 --- a/hermes/src/config/wormhole.rs +++ b/hermes/src/config/wormhole.rs @@ -1,48 +1,27 @@ use { clap::Args, - libp2p::Multiaddr, solana_sdk::pubkey::Pubkey, }; -const DEFAULT_LISTEN_ADDRS: &str = "/ip4/0.0.0.0/udp/30910/quic,/ip6/::/udp/30910/quic"; const DEFAULT_CONTRACT_ADDR: &str = "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU"; -const DEFAULT_NETWORK_ID: &str = "/wormhole/mainnet/2"; -const DEFAULT_BOOTSTRAP_ADDRS: &str = concat![ - "/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7,", - "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC", -]; +const DEFAULT_WORMHOLE_RPC_ADDR: &str = "grpc://127.0.0.1:7073"; #[derive(Args, Clone, Debug)] #[command(next_help_heading = "Wormhole Options")] #[group(id = "Wormhole")] pub struct Options { - /// Multiaddresses for Wormhole bootstrap peers (separated by comma). - /// - /// Bootstraps can be found from the official Wormhole repository, note that these addresses - /// are only used to bootstrap peer discovery and are not necessarily used for data transfer. - /// Adding more peers will speed up P2P peer discovery. - #[arg(long = "wormhole-bootstrap-addrs")] - #[arg(value_delimiter = ',')] - #[arg(default_value = DEFAULT_BOOTSTRAP_ADDRS)] - #[arg(env = "WORMHOLE_BOOTSTRAP_ADDRS")] - pub bootstrap_addrs: Vec, - /// Address of the Wormhole contract on the target PythNet cluster. #[arg(long = "wormhole-contract-addr")] #[arg(default_value = DEFAULT_CONTRACT_ADDR)] #[arg(env = "WORMHOLE_CONTRACT_ADDR")] pub contract_addr: Pubkey, - /// Multiaddresses to bind for Wormhole P2P (separated by comma) - #[arg(long = "wormhole-listen-addrs")] - #[arg(value_delimiter = ',')] - #[arg(default_value = DEFAULT_LISTEN_ADDRS)] - #[arg(env = "WORMHOLE_LISTEN_ADDRS")] - pub listen_addrs: Vec, - - /// Network ID for Wormhole - #[arg(long = "wormhole-network-id")] - #[arg(default_value = DEFAULT_NETWORK_ID)] - #[arg(env = "WORMHOLE_NETWORK_ID")] - pub network_id: String, + /// gRPC endpoint for a Wormhole node. + /// + /// This can either be a standard Wormhole node gRPC endpoint or a beacon endpoint if + /// load-balancing is desired. + #[arg(long = "wormhole-spy-rpc-addr")] + #[arg(default_value = DEFAULT_WORMHOLE_RPC_ADDR)] + #[arg(env = "WORMHOLE_RPC_ADDR")] + pub spy_rpc_addr: String, } diff --git a/hermes/src/main.rs b/hermes/src/main.rs index 6e2fa4b0ba..638b8f4865 100644 --- a/hermes/src/main.rs +++ b/hermes/src/main.rs @@ -2,13 +2,13 @@ #![feature(btree_cursors)] use { - crate::state::State, anyhow::Result, clap::{ CommandFactory, Parser, }, futures::future::join_all, + state::State, std::{ io::IsTerminal, sync::atomic::AtomicBool, @@ -23,7 +23,6 @@ mod doc_examples; mod network; mod serde; mod state; -mod wormhole; // A static exit flag to indicate to running threads that we're shutting down. This is used to // gracefully shutdown the application. @@ -61,7 +60,7 @@ async fn init() -> Result<()> { // Spawn all worker tasks, and wait for all to complete (which will happen if a shutdown // signal has been observed). let tasks = join_all([ - Box::pin(spawn(network::p2p::spawn(opts.clone(), store.clone()))), + Box::pin(spawn(network::wormhole::spawn(opts.clone(), store.clone()))), Box::pin(spawn(network::pythnet::spawn(opts.clone(), store.clone()))), Box::pin(spawn(api::run(opts.clone(), store.clone(), update_rx))), ]) diff --git a/hermes/src/network.rs b/hermes/src/network.rs index 96d3190dbb..83d86f90b8 100644 --- a/hermes/src/network.rs +++ b/hermes/src/network.rs @@ -1,2 +1,2 @@ -pub mod p2p; pub mod pythnet; +pub mod wormhole; diff --git a/hermes/src/network/p2p.go b/hermes/src/network/p2p.go deleted file mode 100644 index 306ee8a63b..0000000000 --- a/hermes/src/network/p2p.go +++ /dev/null @@ -1,284 +0,0 @@ -// This package is derived from the node/pkgs/p2p.go file in the Wormhole project. -// -// This file has been stripped down to only what is necessary to participate in -// P2P and receive message and VAA observations from the network. It is not -// intended to be used as a full node implementation and can be replaced with -// Rust code once QUIC+TLS stable support is available in rust-libp2p. - -package main - -// #include -// #include -// -// // A structure containing Wormhole VAA observations. This must match on both -// // the Go and Rust side. -// typedef struct { -// char const *vaa; -// size_t vaa_len; -// } observation_t; -// -// // A small proxy method to invoke the Rust callback from CGo. This is due -// // to the fact that CGo does not support calling C functions directly from -// // Go. By passing it to this proxy Go is able to suspend the GC correctly -// // and the callback is invoked from a separate thread. -// typedef void (*callback_t)(observation_t); -// static void invoke(callback_t f, observation_t o) { f(o); } -import "C" - -import ( - "context" - "fmt" - "os" - "strings" - "time" - - "net/http" - _ "net/http/pprof" - - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/core/routing" - "github.com/libp2p/go-libp2p/p2p/net/connmgr" - "github.com/multiformats/go-multiaddr" - "google.golang.org/protobuf/proto" - "gopkg.in/DataDog/dd-trace-go.v1/profiler" - - golog "github.com/ipfs/go-log/v2" - dht "github.com/libp2p/go-libp2p-kad-dht" - pubsub "github.com/libp2p/go-libp2p-pubsub" - libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" - libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" - libp2pquicreuse "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" -) - -//export RegisterObservationCallback -func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, listen_addrs *C.char) { - networkID := C.GoString(network_id) - bootstrapAddrs := strings.Split(C.GoString(bootstrap_addrs), ",") - listenAddrs := strings.Split(C.GoString(listen_addrs), ",") - ctx := context.Background() - - // Check ENV variable "GO_LOG_ALL", and set all Go loggers (accross all libp2p - // protocols) to INFO level. - if os.Getenv("GO_LOG") != "" { - level, _ := golog.LevelFromString(os.Getenv("GO_LOG")) - golog.SetAllLoggers(level) - } - - // Bind pprof to 6060 for debugging Go code. - go func() { - http.ListenAndServe("127.0.0.1:6060", nil) - }() - - var startTime int64 - var recoverRerun func() - - routine := func() { - // If the current goroutine fails for any reasons, recoverRerun will be - // triggered which will sleep for 1 second and rerun the routine. - defer recoverRerun() - - // Record the current time for profiling and backoff purposes. - startTime = time.Now().UnixNano() - - // Start the datadog profiler. Only starts if an environment is - // specified and a datadog API key is provided. - if os.Getenv("DD_PROFILING_ENV") != "" && os.Getenv("DD_AGENT_ADDR") != "" { - err := profiler.Start( - profiler.WithAgentAddr(os.Getenv("DD_AGENT_ADDR")), - profiler.WithService("hermes-p2p"), - profiler.WithEnv(os.Getenv("DD_PROFILING_ENV")), - profiler.WithProfileTypes( - profiler.CPUProfile, - profiler.HeapProfile, - profiler.GoroutineProfile, - ), - ) - - if err != nil { - err := fmt.Errorf("Failed to start profiler: %w", err) - fmt.Println(err) - return - } - - defer profiler.Stop() - } - - // Setup base network configuration. - priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) - if err != nil { - err := fmt.Errorf("Failed to generate key pair: %w", err) - fmt.Println(err) - return - } - - // Setup libp2p Connection Manager. - mgr, err := connmgr.NewConnManager( - 100, - 400, - connmgr.WithGracePeriod(0), - ) - - if err != nil { - err := fmt.Errorf("Failed to create connection manager: %w", err) - fmt.Println(err) - return - } - - // Setup libp2p Reactor. - h, err := libp2p.New( - libp2p.Identity(priv), - libp2p.ListenAddrStrings(listenAddrs...), - libp2p.Security(libp2ptls.ID, libp2ptls.New), - // Disable Reuse because upon panic, the Close() call on the p2p reactor does not properly clean up the - // open ports (they are kept around for re-use, this seems to be a libp2p bug in the reuse `gc()` call - // which can be found here: - // - // https://github.com/libp2p/go-libp2p/blob/master/p2p/transport/quicreuse/reuse.go#L97 - // - // By disabling this we get correct Close() behaviour. - // - // IMPORTANT: Normally re-use allows libp2p to dial on the same port that is used to listen for traffic - // and by disabling this dialing uses a random high port (32768-60999) which causes the nodes that we - // connect to by dialing (instead of them connecting to us) will respond on the high range port instead - // of the specified Dial port. This requires firewalls to be configured to allow (UDP 32768-60999) which - // should be specified in our documentation. - // - // The best way to securely enable this range is via the conntrack module, which can statefully allow - // UDP packets only when a sent UDP packet is present in the conntrack table. This rule looks roughly - // like this: - // - // iptables -A INPUT -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT - // - // Which is a standard rule in many firewall configurations (RELATED is the key flag). - libp2p.QUICReuse(libp2pquicreuse.NewConnManager, libp2pquicreuse.DisableReuseport()), - libp2p.Transport(libp2pquic.NewTransport), - libp2p.ConnectionManager(mgr), - libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - bootstrappers := make([]peer.AddrInfo, 0) - for _, addr := range bootstrapAddrs { - ma, err := multiaddr.NewMultiaddr(addr) - if err != nil { - continue - } - - pi, err := peer.AddrInfoFromP2pAddr(ma) - if err != nil || pi.ID == h.ID() { - continue - } - - bootstrappers = append(bootstrappers, *pi) - } - idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer), - dht.ProtocolPrefix(protocol.ID("/"+networkID)), - dht.BootstrapPeers(bootstrappers...), - dht.RoutingTableFilter(func(dht interface{}, p peer.ID) bool { - peerInfo := h.Peerstore().PeerInfo(p) - fmt.Printf("Accepted Peer: %s; Addresses: ", peerInfo.ID) - for _, addr := range peerInfo.Addrs { - fmt.Printf("%s ", addr) - } - fmt.Println("") - - return true - }), - ) - return idht, err - }), - ) - - if err != nil { - err := fmt.Errorf("Failed to create libp2p host: %w", err) - fmt.Println(err) - return - } - - defer h.Close() - - topic := fmt.Sprintf("%s/%s", networkID, "broadcast") - ps, err := pubsub.NewGossipSub( - ctx, - h, - pubsub.WithValidateQueueSize(1024), - ) - if err != nil { - err := fmt.Errorf("Failed to create Pubsub: %w", err) - fmt.Println(err) - return - } - - th, err := ps.Join(topic) - if err != nil { - err := fmt.Errorf("Failed to join topic: %w", err) - fmt.Println(err) - return - } - - defer th.Close() - - sub, err := th.Subscribe(pubsub.WithBufferSize(1024)) - if err != nil { - err := fmt.Errorf("Failed to subscribe topic: %w", err) - fmt.Println(err) - return - } - - defer sub.Cancel() - - for { - for { - select { - case <-ctx.Done(): - return - default: - envelope, err := sub.Next(ctx) - if err != nil { - err := fmt.Errorf("Failed to receive Pubsub message: %w", err) - fmt.Println(err) - return - } - - // Definition for GossipMessage is generated by Protobuf, see `p2p.proto`. - var msg GossipMessage - err = proto.Unmarshal(envelope.Data, &msg) - - switch msg.Message.(type) { - case *GossipMessage_SignedObservation: - case *GossipMessage_SignedVaaWithQuorum: - vaaBytes := msg.GetSignedVaaWithQuorum().GetVaa() - cBytes := C.CBytes(vaaBytes) - C.invoke(f, C.observation_t{ - vaa: (*C.char)(cBytes), - vaa_len: C.size_t(len(vaaBytes)), - }) - C.free(cBytes) - } - } - } - } - } - - recoverRerun = func() { - // Print the error if any and recall routine - if err := recover(); err != nil { - fmt.Fprintf(os.Stderr, "p2p.go error: %v\n", err) - } - - // Sleep for 1 second if the time elapsed is less than 30 seconds - // to avoid spamming the network with requests. - elapsed := time.Duration(time.Now().UnixNano() - startTime) - if elapsed < 30*time.Second { - time.Sleep(1 * time.Second) - } - - go routine() - } - - go routine() -} - -func main() { -} diff --git a/hermes/src/network/p2p.rs b/hermes/src/network/p2p.rs deleted file mode 100644 index 1fecd4f06b..0000000000 --- a/hermes/src/network/p2p.rs +++ /dev/null @@ -1,183 +0,0 @@ -//! This module implements P2P connections for the PythNet service. -//! -//! Originally this code contained a full implementation of a libp2p node, but due to the fact that -//! QUIC+TLS is not yet supported by the Rust ecosystem, we had to resort to replacing this with a -//! small library implemented in Go that connects to the Wormhole network. -//! -//! This works and allows us to keep the program structure the same as if we could do it natively -//! in Rust but it should absolutely be removed once QUIC+TLS is supported in Rust. The change to -//! the program structure should be minimal, and users of the service won't need to change any of -//! their infrastructure. - -use { - crate::{ - config::RunOptions, - state::State, - wormhole::{ - forward_vaa, - VaaBytes, - }, - }, - anyhow::Result, - libp2p::Multiaddr, - std::{ - ffi::{ - c_char, - CString, - }, - sync::{ - atomic::Ordering, - Arc, - }, - }, - tokio::sync::{ - mpsc::{ - Receiver, - Sender, - }, - Mutex, - }, -}; - -extern "C" { - fn RegisterObservationCallback( - cb: extern "C" fn(o: ObservationC), - network_id: *const c_char, - bootstrap_addrs: *const c_char, - listen_addrs: *const c_char, - ); -} - -// An `Observation` C type passed back to us from Go. -#[repr(C)] -#[derive(Debug, Clone, Copy)] -pub struct ObservationC { - pub vaa: *const u8, - pub vaa_len: usize, -} - -const CHANNEL_SIZE: usize = 1000; - -// A Static Channel to pipe the `Observation` from the callback into the local Rust handler for -// observation messages. It has to be static for now because there's no way to capture state in -// the callback passed into Go-land. -lazy_static::lazy_static! { - pub static ref OBSERVATIONS: ( - Mutex>, - Mutex>, - ) = { - let (tx, rc) = tokio::sync::mpsc::channel(CHANNEL_SIZE); - (Mutex::new(tx), Mutex::new(rc)) - }; -} - -/// This function is passed as a callback to the Go libp2p runtime, it passes observations back and -/// acts as a proxy forwarding these observations into our main loop. -#[no_mangle] -#[tracing::instrument(skip(o))] -extern "C" fn proxy(o: ObservationC) { - // Create a fixed slice from the pointer and length. - let vaa = unsafe { std::slice::from_raw_parts(o.vaa, o.vaa_len) }.to_owned(); - - // The chances of the mutex getting poisioned is very low and if it happens there is no way for - // us to recover from it. - if OBSERVATIONS - .0 - .blocking_lock() - .blocking_send(vaa) - .map_err(|_| ()) - .is_err() - { - crate::SHOULD_EXIT.store(true, Ordering::Release); - tracing::error!("Failed to lock p2p observation channel or to send observation."); - } -} - -/// This function handles bootstrapping libp2p (in Go) and listening for Wormhole Observations. -/// -/// TODO: handle_message should be capable of handling more than just Observations. But we don't -/// have our own P2P network, we pass it in to keep the code structure and read directly from the -/// OBSERVATIONS channel in the RPC for now. -#[tracing::instrument(skip(wh_bootstrap_addrs, wh_listen_addrs))] -pub fn bootstrap( - network_id: String, - wh_bootstrap_addrs: Vec, - wh_listen_addrs: Vec, -) -> Result<()> { - let network_id_cstr = CString::new(network_id)?; - let wh_bootstrap_addrs_cstr = CString::new( - wh_bootstrap_addrs - .iter() - .map(|a| a.to_string()) - .collect::>() - .join(","), - )?; - let wh_listen_addrs_cstr = CString::new( - wh_listen_addrs - .iter() - .map(|a| a.to_string()) - .collect::>() - .join(","), - )?; - - // Launch the Go LibP2P Reactor. - unsafe { - RegisterObservationCallback( - proxy as extern "C" fn(observation: ObservationC), - network_id_cstr.as_ptr(), - wh_bootstrap_addrs_cstr.as_ptr(), - wh_listen_addrs_cstr.as_ptr(), - ); - } - - tracing::info!("Registered observation callback."); - - Ok(()) -} - -// Spawn's the P2P layer as a separate thread via Go. -#[tracing::instrument(skip(opts, state))] -pub async fn spawn(opts: RunOptions, state: Arc) -> Result<()> { - tracing::info!(listeners = ?opts.wormhole.listen_addrs, "Starting P2P Server"); - - std::thread::spawn(|| { - if bootstrap( - opts.wormhole.network_id, - opts.wormhole.bootstrap_addrs, - opts.wormhole.listen_addrs, - ) - .is_err() - { - tracing::error!("Failed to bootstrap P2P server."); - crate::SHOULD_EXIT.store(true, Ordering::Release); - } - }); - - tokio::spawn(async move { - // Listen in the background for new VAA's from the p2p layer - // and update the state accordingly. - while !crate::SHOULD_EXIT.load(Ordering::Acquire) { - let vaa_bytes = { - let mut observation = OBSERVATIONS.1.lock().await; - - match observation.recv().await { - Some(vaa) => vaa, - None => { - // This should never happen, but if it does, we want to shutdown the - // application as it is unrecoverable. - tracing::error!("Failed to receive p2p observation. Channel closed."); - crate::SHOULD_EXIT.store(true, Ordering::Release); - return Err(anyhow::anyhow!("Failed to receive p2p observation.")); - } - } - }; - - forward_vaa(state.clone(), vaa_bytes).await; - } - - tracing::info!("Shutting down P2P server..."); - Ok::<(), anyhow::Error>(()) - }); - - Ok(()) -} diff --git a/hermes/src/network/pythnet.rs b/hermes/src/network/pythnet.rs index d7843f12a4..e026c8802c 100644 --- a/hermes/src/network/pythnet.rs +++ b/hermes/src/network/pythnet.rs @@ -9,13 +9,13 @@ use { Update, }, config::RunOptions, - state::State, - wormhole::{ + network::wormhole::{ update_guardian_set, BridgeData, GuardianSet, GuardianSetData, }, + state::State, }, anyhow::{ anyhow, diff --git a/hermes/src/network/wormhole.rs b/hermes/src/network/wormhole.rs new file mode 100644 index 0000000000..ec725c7948 --- /dev/null +++ b/hermes/src/network/wormhole.rs @@ -0,0 +1,357 @@ +//! Wormhole gRPC Service +//! +//! This module defines a service that connects to a Wormhole gRPC server and subscribes to VAA +//! updates. These updates are then stored in Hermes and made available to the rest of the +//! application. + +use { + crate::{ + config::RunOptions, + state::State, + }, + anyhow::{ + anyhow, + ensure, + Result, + }, + chrono::NaiveDateTime, + futures::StreamExt, + proto::spy::v1::{ + filter_entry::Filter, + spy_rpc_service_client::SpyRpcServiceClient, + EmitterFilter, + FilterEntry, + SubscribeSignedVaaRequest, + }, + pythnet_sdk::{ + wire::v1::{ + WormholeMessage, + WormholePayload, + }, + ACCUMULATOR_EMITTER_ADDRESS, + }, + secp256k1::{ + ecdsa::{ + RecoverableSignature, + RecoveryId, + }, + Message, + Secp256k1, + }, + serde_wormhole::RawMessage, + sha3::{ + Digest, + Keccak256, + }, + std::sync::{ + atomic::Ordering, + Arc, + }, + tonic::Request, + wormhole_sdk::{ + vaa::{ + Body, + Header, + }, + Address, + Chain, + Vaa, + }, +}; + +const OBSERVED_CACHE_SIZE: usize = 1000; + +pub type VaaBytes = Vec; + +#[derive(Eq, PartialEq, Clone, Hash, Debug)] +pub struct GuardianSet { + pub keys: Vec<[u8; 20]>, +} + +impl std::fmt::Display for GuardianSet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[")?; + for (i, key) in self.keys.iter().enumerate() { + // Comma seperated printing of the keys using hex encoding. + if i != 0 { + write!(f, ", ")?; + } + + write!(f, "{}", hex::encode(key))?; + } + write!(f, "]") + } +} + +/// BridgeData extracted from wormhole bridge account, due to no API. +#[derive(borsh::BorshDeserialize)] +#[allow(dead_code)] +pub struct BridgeData { + pub guardian_set_index: u32, + pub last_lamports: u64, + pub config: BridgeConfig, +} + +/// BridgeConfig extracted from wormhole bridge account, due to no API. +#[derive(borsh::BorshDeserialize)] +#[allow(dead_code)] +pub struct BridgeConfig { + pub guardian_set_expiration_time: u32, + pub fee: u64, +} + +/// GuardianSetData extracted from wormhole bridge account, due to no API. +#[derive(borsh::BorshDeserialize)] +pub struct GuardianSetData { + pub index: u32, + pub keys: Vec<[u8; 20]>, + pub creation_time: u32, + pub expiration_time: u32, +} + +/// Update the guardian set with the given ID in the state. +#[tracing::instrument(skip(state, guardian_set))] +pub async fn update_guardian_set(state: &State, id: u32, guardian_set: GuardianSet) { + let mut guardian_sets = state.guardian_set.write().await; + guardian_sets.insert(id, guardian_set); +} + +/// Wormhole `prost` compiled definitions. +/// +/// We use `prost` to build the protobuf definitions from the upstream Wormhole repository. Which +/// outputs `.rs` files during execution of build.rs, these can be included into the source while +/// compilation is happening. +/// +/// The following module structure must match the protobuf definitions, so that the generated code +/// can correctly reference modules from each other. +mod proto { + pub mod node { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/node.v1.rs")); + } + } + + pub mod gossip { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/gossip.v1.rs")); + } + } + + pub mod spy { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/spy.v1.rs")); + } + } + + pub mod publicrpc { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/publicrpc.v1.rs")); + } + } +} + +// Launches the Wormhole gRPC service. +#[tracing::instrument(skip(opts, state))] +pub async fn spawn(opts: RunOptions, state: Arc) -> Result<()> { + while !crate::SHOULD_EXIT.load(Ordering::Acquire) { + if let Err(e) = run(opts.clone(), state.clone()).await { + tracing::error!(error = ?e, "Wormhole gRPC service failed."); + } + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + Ok(()) +} + +#[tracing::instrument(skip(opts, state))] +async fn run(opts: RunOptions, state: Arc) -> Result<()> { + let mut client = SpyRpcServiceClient::connect(opts.wormhole.spy_rpc_addr).await?; + let mut stream = client + .subscribe_signed_vaa(Request::new(SubscribeSignedVaaRequest { + filters: vec![FilterEntry { + filter: Some(Filter::EmitterFilter(EmitterFilter { + chain_id: Into::::into(Chain::Pythnet).into(), + emitter_address: hex::encode(ACCUMULATOR_EMITTER_ADDRESS), + })), + }], + })) + .await? + .into_inner(); + + while let Some(Ok(message)) = stream.next().await { + if crate::SHOULD_EXIT.load(Ordering::Acquire) { + return Ok(()); + } + + if let Err(e) = process_message(state.clone(), message.vaa_bytes).await { + tracing::debug!(error = ?e, "Skipped VAA."); + } + } + + Ok(()) +} + +/// Process a message received via a Wormhole gRPC connection. +#[tracing::instrument(skip(state, vaa_bytes))] +pub async fn process_message(state: Arc, vaa_bytes: Vec) -> Result<()> { + let vaa = serde_wormhole::from_slice::>(&vaa_bytes)?; + + // Log VAA Processing. + let vaa_timestamp = NaiveDateTime::from_timestamp_opt(vaa.timestamp as i64, 0); + let vaa_timestamp = vaa_timestamp.unwrap(); + let vaa_timestamp = vaa_timestamp.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string(); + let slot = match WormholeMessage::try_from_bytes(vaa.payload)?.payload { + WormholePayload::Merkle(proof) => proof.slot, + }; + tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA"); + + // Check VAA hasn't already been seen. + ensure!( + !state.observed_vaa_seqs.read().await.contains(&vaa.sequence), + "Previously observed VAA: {}", + vaa.sequence + ); + + // Check VAA source is valid, we don't want to process other protocols VAAs. + validate_vaa_source(&vaa)?; + + // Verify the VAA has been signed by a known guardian set. + let vaa = verify_vaa( + state + .guardian_set + .read() + .await + .get(&vaa.guardian_set_index) + .ok_or_else(|| anyhow!("Unknown guardian set: {}", vaa.guardian_set_index))?, + vaa, + )?; + + // Finally, store the resulting VAA in Hermes. + store_vaa(state.clone(), vaa.sequence, vaa_bytes).await?; + + Ok(()) +} + +// Rejects VAAs from invalid sources. +#[tracing::instrument(skip(vaa))] +fn validate_vaa_source(vaa: &Vaa<&RawMessage>) -> Result<()> { + ensure!( + vaa.emitter_chain == Chain::Pythnet, + "VAA from non-Pythnet Chain." + ); + ensure!( + vaa.emitter_address == Address(ACCUMULATOR_EMITTER_ADDRESS), + "VAA from non-Pythnet Emitter: {}", + vaa.emitter_address + ); + Ok(()) +} + +/// Validate a VAA extracted from a Wormhole gRPC message. +#[tracing::instrument(skip(guardian_set, vaa))] +pub fn verify_vaa<'a>( + guardian_set: &GuardianSet, + vaa: Vaa<&'a RawMessage>, +) -> Result> { + let (header, body): (Header, Body<&RawMessage>) = vaa.into(); + let digest = body.digest()?; + + // Ideally we need to test the signatures but currently Wormhole doesn't give us any easy way + // to do it, so we just bypass the check in tests. + let quorum = if cfg!(test) { + 0 + } else { + (guardian_set.keys.len() * 2) / 3 + 1 + }; + + let secp = Secp256k1::new(); + let mut last_signer_id: Option = None; + let mut signatures = vec![]; + for signature in header.signatures.into_iter() { + // Do not collect more signatures than necessary to reduce on-chain gas spent during + // signature verification. + if signatures.len() >= quorum { + break; + } + + let signer_id: usize = signature.index.into(); + if signer_id >= guardian_set.keys.len() { + return Err(anyhow!( + "Signer ID is out of range. Signer ID: {}, guardian set size: {}", + signer_id, + guardian_set.keys.len() + )); + } + + // On-chain verification expects signatures to be sorted by signer ID. We can exit early if + // this constraint is violated. + if let Some(true) = last_signer_id.map(|v| v >= signer_id) { + return Err(anyhow!( + "Signatures are not sorted by signer ID. Last signer ID: {:?}, current signer ID: {}", + last_signer_id, + signer_id + )); + } + + // Recover the public key from an [u8; 65] serialized ECDSA signature in (v, r, s) format + let recid = RecoveryId::from_i32(signature.signature[64].into())?; + + // An address is the last 20 bytes of the Keccak256 hash of the uncompressed public key. + let pubkey: &[u8; 65] = &secp + .recover_ecdsa( + &Message::from_slice(&digest.secp256k_hash)?, + &RecoverableSignature::from_compact(&signature.signature[..64], recid)?, + )? + .serialize_uncompressed(); + + // The address is the last 20 bytes of the Keccak256 hash of the public key + let address: [u8; 32] = Keccak256::new_with_prefix(&pubkey[1..]).finalize().into(); + let address: [u8; 20] = address[address.len() - 20..].try_into()?; + + // Confirm the recovered address matches an address in the guardian set. + if guardian_set.keys.get(signer_id) == Some(&address) { + signatures.push(signature); + } + + last_signer_id = Some(signer_id); + } + + // Check if we have enough correct signatures + if signatures.len() < quorum { + return Err(anyhow!( + "Not enough correct signatures. Expected {:?}, received {:?}", + quorum, + signatures.len() + )); + } + + Ok(( + Header { + signatures, + ..header + }, + body, + ) + .into()) +} + +#[tracing::instrument(skip(state, vaa_bytes))] +pub async fn store_vaa(state: Arc, sequence: u64, vaa_bytes: Vec) -> Result<()> { + // Check VAA hasn't already been seen, this may have been checked previously + // but due to async nature It's possible other threads have mutated the state + // since this VAA started processing. + let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await; + ensure!( + !observed_vaa_seqs.contains(&sequence), + "Previously observed VAA: {}", + sequence, + ); + + // Clear old cached VAA sequences. + while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE { + observed_vaa_seqs.pop_first(); + } + + // Hand the VAA to the aggregate store. + crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await +} diff --git a/hermes/src/state.rs b/hermes/src/state.rs index 8a40629913..b9f3b7b6aa 100644 --- a/hermes/src/state.rs +++ b/hermes/src/state.rs @@ -7,7 +7,7 @@ use { AggregateState, AggregationEvent, }, - wormhole::GuardianSet, + network::wormhole::GuardianSet, }, reqwest::Url, std::{ @@ -69,7 +69,7 @@ impl State { pub mod test { use { super::*, - crate::wormhole::update_guardian_set, + crate::network::wormhole::update_guardian_set, tokio::sync::mpsc::Receiver, }; diff --git a/hermes/src/wormhole.rs b/hermes/src/wormhole.rs deleted file mode 100644 index 975daf907a..0000000000 --- a/hermes/src/wormhole.rs +++ /dev/null @@ -1,258 +0,0 @@ -use { - super::State, - crate::aggregate::Update, - anyhow::{ - anyhow, - Result, - }, - pythnet_sdk::wire::v1::{ - WormholeMessage, - WormholePayload, - }, - secp256k1::{ - ecdsa::{ - RecoverableSignature, - RecoveryId, - }, - Message, - Secp256k1, - }, - serde_wormhole::RawMessage, - sha3::{ - Digest, - Keccak256, - }, - std::sync::Arc, - tracing::trace, - wormhole_sdk::{ - vaa::{ - Body, - Header, - }, - Address, - Chain, - Vaa, - }, -}; - - -pub type VaaBytes = Vec; -const OBSERVED_CACHE_SIZE: usize = 1000; - -#[derive(Eq, PartialEq, Clone, Hash, Debug)] -pub struct GuardianSet { - pub keys: Vec<[u8; 20]>, -} - -impl std::fmt::Display for GuardianSet { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "[")?; - for (i, key) in self.keys.iter().enumerate() { - // Comma seperated printing of the keys using hex encoding. - if i != 0 { - write!(f, ", ")?; - } - - write!(f, "{}", hex::encode(key))?; - } - write!(f, "]") - } -} - -/// BridgeData extracted from wormhole bridge account, due to no API. -#[derive(borsh::BorshDeserialize)] -#[allow(dead_code)] -pub struct BridgeData { - pub guardian_set_index: u32, - pub last_lamports: u64, - pub config: BridgeConfig, -} - -/// BridgeConfig extracted from wormhole bridge account, due to no API. -#[derive(borsh::BorshDeserialize)] -#[allow(dead_code)] -pub struct BridgeConfig { - pub guardian_set_expiration_time: u32, - pub fee: u64, -} - -/// GuardianSetData extracted from wormhole bridge account, due to no API. -#[derive(borsh::BorshDeserialize)] -pub struct GuardianSetData { - pub index: u32, - pub keys: Vec<[u8; 20]>, - pub creation_time: u32, - pub expiration_time: u32, -} - -/// Verifies a VAA to ensure it is signed by the Wormhole guardian set. -pub async fn verify_vaa<'a>( - state: &State, - vaa: Vaa<&'a RawMessage>, -) -> Result> { - let (header, body): (Header, Body<&RawMessage>) = vaa.into(); - let digest = body.digest()?; - let guardian_set = state.guardian_set.read().await; - let guardian_set = guardian_set - .get(&header.guardian_set_index) - .ok_or_else(|| { - anyhow!( - "Message signed by an unknown guardian set: {}", - header.guardian_set_index - ) - })?; - - // TODO: This check bypass checking the signatures on tests. - // Ideally we need to test the signatures but currently Wormhole - // doesn't give us any easy way for it. - let quorum = if cfg!(test) { - 0 - } else { - (guardian_set.keys.len() * 2) / 3 + 1 - }; - - let mut last_signer_id: Option = None; - let mut signatures = vec![]; - for signature in header.signatures.into_iter() { - // Do not collect more signatures than necessary to reduce - // on-chain gas spent on signature verification. - if signatures.len() >= quorum { - break; - } - - let signer_id: usize = signature.index.into(); - - if signer_id >= guardian_set.keys.len() { - return Err(anyhow!( - "Signer ID is out of range. Signer ID: {}, guardian set size: {}", - signer_id, - guardian_set.keys.len() - )); - } - - if let Some(true) = last_signer_id.map(|v| v >= signer_id) { - return Err(anyhow!( - "Signatures are not sorted by signer ID. Last signer ID: {:?}, current signer ID: {}", - last_signer_id, - signer_id - )); - } - - let sig = signature.signature; - - // Recover the public key from ecdsa signature from [u8; 65] that has (v, r, s) format - let recid = RecoveryId::from_i32(sig[64].into())?; - - let secp = Secp256k1::new(); - - // To get the address we need to use the uncompressed public key - let pubkey: &[u8; 65] = &secp - .recover_ecdsa( - &Message::from_slice(&digest.secp256k_hash)?, - &RecoverableSignature::from_compact(&sig[..64], recid)?, - )? - .serialize_uncompressed(); - - // The address is the last 20 bytes of the Keccak256 hash of the public key - let mut keccak = Keccak256::new(); - keccak.update(&pubkey[1..]); - let address: [u8; 32] = keccak.finalize().into(); - let address: [u8; 20] = address[address.len() - 20..].try_into()?; - - if guardian_set.keys.get(signer_id) == Some(&address) { - signatures.push(signature); - } - - last_signer_id = Some(signer_id); - } - - // Check if we have enough correct signatures - if signatures.len() < quorum { - return Err(anyhow!( - "Not enough correct signatures. Expected {:?}, received {:?}", - quorum, - signatures.len() - )); - } - - Ok(( - Header { - signatures, - ..header - }, - body, - ) - .into()) -} - -/// Update the guardian set with the given ID in the state. -pub async fn update_guardian_set(state: &State, id: u32, guardian_set: GuardianSet) { - let mut guardian_sets = state.guardian_set.write().await; - guardian_sets.insert(id, guardian_set); -} - -/// Process a VAA from the Wormhole p2p and aggregate it if it is -/// verified and is new and belongs to the Accumulator. -pub async fn forward_vaa(state: Arc, vaa_bytes: VaaBytes) { - // Deserialize VAA - let vaa = match serde_wormhole::from_slice::>(&vaa_bytes) { - Ok(vaa) => vaa, - Err(e) => { - tracing::error!(error = ?e, "Failed to deserialize VAA."); - return; - } - }; - - if vaa.emitter_chain != Chain::Pythnet - || vaa.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS) - { - return; // Ignore VAA from other emitters - } - - // Get the slot from the VAA. - let slot = match WormholeMessage::try_from_bytes(vaa.payload) - .unwrap() - .payload - { - WormholePayload::Merkle(proof) => proof.slot, - }; - - // Find the observation time for said VAA (which is a unix timestamp) and serialize as a ISO 8601 string. - let vaa_timestamp = vaa.timestamp; - let vaa_timestamp = chrono::NaiveDateTime::from_timestamp_opt(vaa_timestamp as i64, 0).unwrap(); - let vaa_timestamp = vaa_timestamp.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string(); - tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA"); - - if state.observed_vaa_seqs.read().await.contains(&vaa.sequence) { - return; // Ignore VAA if we have already seen it - } - - let vaa = match verify_vaa(&state, vaa).await { - Ok(vaa) => vaa, - Err(e) => { - trace!(error = ?e, "Ignoring invalid VAA."); - return; - } - }; - - { - let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await; - - // Check again if we have already seen the VAA. Due to concurrency - // the above check might not catch all the cases. - if observed_vaa_seqs.contains(&vaa.sequence) { - return; // Ignore VAA if we have already seen it - } - observed_vaa_seqs.insert(vaa.sequence); - while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE { - observed_vaa_seqs.pop_first(); - } - } - - let state = state.clone(); - tokio::spawn(async move { - if let Err(e) = crate::aggregate::store_update(&state, Update::Vaa(vaa_bytes)).await { - tracing::error!(error = ?e, "Failed to process VAA."); - } - }); -}