From 1c2c3771f9c55be39f5a922cff4d2bd22455d787 Mon Sep 17 00:00:00 2001 From: dgarus Date: Thu, 16 Nov 2023 14:23:45 +0300 Subject: [PATCH 1/7] attempt to extend QUIC transport impl --- Cargo.lock | 91 ++++++++++-- examples/ping/Cargo.toml | 2 +- examples/ping/src/main.rs | 4 +- transports/quic/Cargo.toml | 11 ++ transports/quic/src/config.rs | 32 ++++- transports/quic/src/connection/connecting.rs | 5 +- transports/quic/src/lib.rs | 4 + transports/quic/src/transport.rs | 136 +++++++++++++----- transports/quic/src/webtransport.rs | 9 ++ .../quic/src/webtransport/connecting.rs | 126 ++++++++++++++++ .../quic/src/webtransport/connection.rs | 127 ++++++++++++++++ .../quic/src/webtransport/fingerprint.rs | 77 ++++++++++ transports/quic/src/webtransport/stream.rs | 69 +++++++++ transports/tls/Cargo.toml | 1 + transports/tls/src/lib.rs | 21 +++ 15 files changed, 664 insertions(+), 51 deletions(-) create mode 100644 transports/quic/src/webtransport.rs create mode 100644 transports/quic/src/webtransport/connecting.rs create mode 100644 transports/quic/src/webtransport/connection.rs create mode 100644 transports/quic/src/webtransport/fingerprint.rs create mode 100644 transports/quic/src/webtransport/stream.rs diff --git a/Cargo.lock b/Cargo.lock index aa8ad584ade..a07c370f92a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1259,6 +1259,15 @@ dependencies = [ "rusticata-macros", ] +[[package]] +name = "deranged" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.9.0" @@ -1504,9 +1513,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "ff" @@ -1848,6 +1857,48 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3" +version = "0.0.3" +source = "git+https://github.com/hyperium/h3#7f099d4f5d363568c15abc51bb785ea75304630b" +dependencies = [ + "bytes", + "fastrand 2.0.1", + "futures-util", + "http", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "h3-quinn" +version = "0.0.4" +source = "git+https://github.com/hyperium/h3#7f099d4f5d363568c15abc51bb785ea75304630b" +dependencies = [ + "bytes", + "futures", + "h3", + "quinn", + "quinn-proto", + "tokio", + "tokio-util", +] + +[[package]] +name = "h3-webtransport" +version = "0.1.0" +source = "git+https://github.com/hyperium/h3#7f099d4f5d363568c15abc51bb785ea75304630b" +dependencies = [ + "bytes", + "futures-util", + "h3", + "http", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "half" version = "1.8.2" @@ -2014,9 +2065,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ "bytes", "fnv", @@ -2977,6 +3028,12 @@ dependencies = [ "bytes", "futures", "futures-timer", + "h3", + "h3-quinn", + "h3-webtransport", + "hex", + "hex-literal", + "http", "if-watch", "libp2p-core", "libp2p-identity", @@ -2989,10 +3046,13 @@ dependencies = [ "quickcheck", "quinn", "rand 0.8.5", + "rcgen", "ring 0.16.20", "rustls 0.21.8", + "sha2 0.10.8", "socket2 0.5.5", "thiserror", + "time", "tokio", "tracing", "tracing-subscriber", @@ -3201,6 +3261,7 @@ dependencies = [ "rustls 0.21.8", "rustls-webpki", "thiserror", + "time", "tokio", "x509-parser", "yasna", @@ -4268,6 +4329,12 @@ dependencies = [ "universal-hash 0.5.1", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -5569,7 +5636,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" dependencies = [ "cfg-if", - "fastrand 2.0.0", + "fastrand 2.0.1", "redox_syscall 0.4.1", "rustix 0.38.21", "windows-sys", @@ -5654,11 +5721,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.23" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59e399c068f43a5d116fedaf73b203fa4f9c519f17e2b34f63221d3792f81446" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" dependencies = [ + "deranged", "itoa", + "powerfmt", "serde", "time-core", "time-macros", @@ -5666,15 +5735,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.10" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96ba15a897f3c86766b757e5ac7221554c6750054d74d5b28844fce5fb36a6c4" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] diff --git a/examples/ping/Cargo.toml b/examples/ping/Cargo.toml index 58cee54409e..88ca2fa657a 100644 --- a/examples/ping/Cargo.toml +++ b/examples/ping/Cargo.toml @@ -10,7 +10,7 @@ release = false [dependencies] futures = "0.3.29" -libp2p = { path = "../../libp2p", features = ["noise", "ping", "tcp", "tokio", "yamux"] } +libp2p = { path = "../../libp2p", features = ["noise", "ping", "tcp", "quic", "tokio", "yamux"] } tokio = { version = "1.33.0", features = ["full"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/ping/src/main.rs b/examples/ping/src/main.rs index 911b0384f89..45638171390 100644 --- a/examples/ping/src/main.rs +++ b/examples/ping/src/main.rs @@ -38,13 +38,15 @@ async fn main() -> Result<(), Box> { noise::Config::new, yamux::Config::default, )? + .with_quic() .with_behaviour(|_| ping::Behaviour::default())? .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))) .build(); // Tell the swarm to listen on all interfaces and a random, OS-assigned // port. - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + // swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/udp/0/quic".parse()?)?; // Dial the peer identified by the multi-address given as the second // command-line argument, if any. diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 4ce23bf1207..bceee66e96e 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -26,6 +26,16 @@ tokio = { version = "1.33.0", default-features = false, features = ["net", "rt", tracing = "0.1.37" socket2 = "0.5.5" ring = "0.16.20" +sha2 = "0.10.7" +hex = "0.4" + +# WebTransport +http = "0.2.11" +h3 = { git = "https://github.com/hyperium/h3" } +h3-quinn = { git = "https://github.com/hyperium/h3" } +h3-webtransport = { git = "https://github.com/hyperium/h3" } +rcgen = "0.11.3" +time = "0.3" [features] tokio = ["dep:tokio", "if-watch/tokio", "quinn/runtime-tokio"] @@ -48,6 +58,7 @@ libp2p-yamux = { workspace = true } quickcheck = "1" tokio = { version = "1.33.0", features = ["macros", "rt-multi-thread", "time"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } +hex-literal = "0.4" [[test]] name = "stream_compliance" diff --git a/transports/quic/src/config.rs b/transports/quic/src/config.rs index 5351a537c76..ae9ff521ba4 100644 --- a/transports/quic/src/config.rs +++ b/transports/quic/src/config.rs @@ -20,6 +20,8 @@ use quinn::VarInt; use std::{sync::Arc, time::Duration}; +use libp2p_core::multihash::Multihash; +use crate::webtransport::fingerprint::Fingerprint; /// Config for the transport. #[derive(Clone)] @@ -57,10 +59,21 @@ pub struct Config { /// As client the version is chosen based on the remote's address. pub support_draft_29: bool, + // pub web_transport_config: WebTransportConfig, + /// TLS client config for the inner [`quinn::ClientConfig`]. client_tls_config: Arc, + //todo At first boot of the node, it creates one self-signed certificate with a validity of 14 days, + // starting immediately, and another certificate with the 14 day validity period starting on + // the expiry date of the first certificate. + // The node advertises a multiaddress containing the certificate hashes of these two certificates. + // Once the first certificate has expired, the node starts using the already generated next certificate. + // At the same time, it again generates a new certificate for the following period and updates + // the multiaddress it advertises. /// TLS server config for the inner [`quinn::ServerConfig`]. server_tls_config: Arc, + pub(crate) cert_hash: Multihash<64>, + /// Libp2p identity of the node. keypair: libp2p_identity::Keypair, } @@ -69,10 +82,26 @@ impl Config { /// Creates a new configuration object with default values. pub fn new(keypair: &libp2p_identity::Keypair) -> Self { let client_tls_config = Arc::new(libp2p_tls::make_client_config(keypair, None).unwrap()); - let server_tls_config = Arc::new(libp2p_tls::make_server_config(keypair).unwrap()); + + //todo set up cert's dates + let (cert, private_key) = libp2p_tls::certificate::generate(keypair).unwrap(); + + let server_tls_config = Arc::new( + libp2p_tls::make_server_config_with_cert( + cert.clone(), private_key, vec![ + b"libp2p".to_vec(), + b"h3".to_vec(), + b"h3-32".to_vec(), + b"h3-31".to_vec(), + b"h3-30".to_vec(), + b"h3-29".to_vec(), + ] + ).unwrap() + ); Self { client_tls_config, server_tls_config, + cert_hash: Fingerprint::from_certificate(cert.as_ref()).to_multihash(), support_draft_29: false, handshake_timeout: Duration::from_secs(5), max_idle_timeout: 30 * 1000, @@ -100,6 +129,7 @@ impl From for QuinnConfig { let Config { client_tls_config, server_tls_config, + cert_hash: _, max_idle_timeout, max_concurrent_stream_limit, keep_alive_interval, diff --git a/transports/quic/src/connection/connecting.rs b/transports/quic/src/connection/connecting.rs index 141f0b5542e..4881b6f6f6c 100644 --- a/transports/quic/src/connection/connecting.rs +++ b/transports/quic/src/connection/connecting.rs @@ -33,6 +33,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; +use libp2p_core::muxing::StreamMuxerBox; /// A QUIC connection currently being negotiated. #[derive(Debug)] @@ -67,7 +68,7 @@ impl Connecting { } impl Future for Connecting { - type Output = Result<(PeerId, Connection), Error>; + type Output = Result<(PeerId, StreamMuxerBox), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let connection = match futures::ready!(self.connecting.poll_unpin(cx)) { @@ -77,6 +78,6 @@ impl Future for Connecting { let peer_id = Self::remote_peer_id(&connection); let muxer = Connection::new(connection); - Poll::Ready(Ok((peer_id, muxer))) + Poll::Ready(Ok((peer_id, StreamMuxerBox::new(muxer)))) } } diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index 7ae649b6914..587b34796b2 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -62,6 +62,7 @@ mod connection; mod hole_punching; mod provider; mod transport; +mod webtransport; use std::net::SocketAddr; @@ -101,6 +102,9 @@ pub enum Error { /// Error when holepunching for a remote is already in progress #[error("Already punching hole for {0}).")] HolePunchInProgress(SocketAddr), + + #[error("WebTransport error.")] + WebTransportError, } /// Dialing a remote peer failed. diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index feda501464f..efaad36b38d 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -21,7 +21,7 @@ use crate::config::{Config, QuinnConfig}; use crate::hole_punching::hole_puncher; use crate::provider::Provider; -use crate::{ConnectError, Connecting, Connection, Error}; +use crate::{ConnectError, Connecting, Error, webtransport}; use futures::channel::oneshot; use futures::future::{BoxFuture, Either}; @@ -31,11 +31,7 @@ use futures::{prelude::*, stream::SelectAll}; use if_watch::IfEvent; -use libp2p_core::{ - multiaddr::{Multiaddr, Protocol}, - transport::{ListenerId, TransportError, TransportEvent}, - Transport, -}; +use libp2p_core::{multiaddr::{Multiaddr, Protocol}, muxing::StreamMuxerBox, transport::{ListenerId, TransportError, TransportEvent}, Transport}; use libp2p_identity::PeerId; use socket2::{Domain, Socket, Type}; use std::collections::hash_map::{DefaultHasher, Entry}; @@ -49,6 +45,7 @@ use std::{ pin::Pin, task::{Context, Poll, Waker}, }; +use libp2p_core::multihash::Multihash; /// Implementation of the [`Transport`] trait for QUIC. /// @@ -67,6 +64,10 @@ pub struct GenTransport { quinn_config: QuinnConfig, /// Timeout for the [`Connecting`] future. handshake_timeout: Duration, + + // todo Temporary solution + cert_hash: Multihash<64>, + /// Whether draft-29 is supported for dialing and listening. support_draft_29: bool, /// Streams of active [`Listener`]s. @@ -76,7 +77,12 @@ pub struct GenTransport { /// Waker to poll the transport again when a new dialer or listener is added. waker: Option, /// Holepunching attempts - hole_punch_attempts: HashMap>, + hole_punch_attempts: HashMap< + SocketAddr, + oneshot::Sender< + BoxFuture<'static, Result< as Transport>::Output, as Transport>::Error>> + > + >, } impl GenTransport

{ @@ -84,6 +90,7 @@ impl GenTransport

{ pub fn new(config: Config) -> Self { let handshake_timeout = config.handshake_timeout; let support_draft_29 = config.support_draft_29; + let cert_hash = config.cert_hash; let quinn_config = config.into(); Self { listeners: SelectAll::new(), @@ -93,6 +100,7 @@ impl GenTransport

{ waker: None, support_draft_29, hole_punch_attempts: Default::default(), + cert_hash, } } @@ -134,16 +142,16 @@ impl GenTransport

{ addr: Multiaddr, check_unspecified_addr: bool, ) -> Result< - (SocketAddr, ProtocolVersion, Option), + (SocketAddr, ProtocolVersion, Option, bool), TransportError<::Error>, > { - let (socket_addr, version, peer_id) = multiaddr_to_socketaddr(&addr, self.support_draft_29) + let (socket_addr, version, peer_id, wt) = multiaddr_to_socketaddr(&addr, self.support_draft_29) .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; if check_unspecified_addr && (socket_addr.port() == 0 || socket_addr.ip().is_unspecified()) { return Err(TransportError::MultiaddrNotSupported(addr)); } - Ok((socket_addr, version, peer_id)) + Ok((socket_addr, version, peer_id, wt)) } /// Pick any listener to use for dialing. @@ -198,9 +206,9 @@ impl GenTransport

{ } impl Transport for GenTransport

{ - type Output = (PeerId, Connection); + type Output = (PeerId, StreamMuxerBox); type Error = Error; - type ListenerUpgrade = Connecting; + type ListenerUpgrade = BoxFuture<'static, Result>; type Dial = BoxFuture<'static, Result>; fn listen_on( @@ -208,7 +216,7 @@ impl Transport for GenTransport

{ listener_id: ListenerId, addr: Multiaddr, ) -> Result<(), TransportError> { - let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr, false)?; + let (socket_addr, version, _peer_id, wt) = self.remote_multiaddr_to_socketaddr(addr, false)?; let endpoint_config = self.quinn_config.endpoint_config.clone(); let server_config = self.quinn_config.server_config.clone(); let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?; @@ -221,6 +229,8 @@ impl Transport for GenTransport

{ endpoint, self.handshake_timeout, version, + wt, + self.cert_hash, )?; self.listeners.push(listener); @@ -257,7 +267,7 @@ impl Transport for GenTransport

{ } fn dial(&mut self, addr: Multiaddr) -> Result> { - let (socket_addr, version, _peer_id) = self.remote_multiaddr_to_socketaddr(addr, true)?; + let (socket_addr, version, _peer_id, wt) = self.remote_multiaddr_to_socketaddr(addr, true)?; let endpoint = match self.eligible_listener(&socket_addr) { None => { @@ -291,14 +301,20 @@ impl Transport for GenTransport

{ if version == ProtocolVersion::Draft29 { client_config.version(0xff00_001d); } + + // This `"l"` seems necessary because an empty string is an invalid domain + // name. While we don't use domain names, the underlying rustls library + // is based upon the assumption that we do. + let connecting = endpoint + .connect_with(client_config, socket_addr, "l") + .map_err(ConnectError).unwrap(); // todo should be `?` Ok(Box::pin(async move { - // This `"l"` seems necessary because an empty string is an invalid domain - // name. While we don't use domain names, the underlying rustls library - // is based upon the assumption that we do. - let connecting = endpoint - .connect_with(client_config, socket_addr, "l") - .map_err(ConnectError)?; - Connecting::new(connecting, handshake_timeout).await + if wt { + todo!("Here should be part that creates connection to a WT server") + //webtransport::Connecting::new(connecting, handshake_timeout).await + } else { + Connecting::new(connecting, handshake_timeout).await + } })) } @@ -306,7 +322,7 @@ impl Transport for GenTransport

{ &mut self, addr: Multiaddr, ) -> Result> { - let (socket_addr, _version, peer_id) = + let (socket_addr, _version, peer_id, _wt) = self.remote_multiaddr_to_socketaddr(addr.clone(), true)?; let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr.clone()))?; @@ -424,6 +440,9 @@ struct Listener { /// An underlying copy of the socket to be able to hole punch with socket: UdpSocket, + web_transport_addr: bool, + cert_hash: Multihash<64>, + /// A future to poll new incoming connections. accept: BoxFuture<'static, Option>, /// Timeout for connection establishment on inbound connections. @@ -453,6 +472,8 @@ impl Listener

{ endpoint: quinn::Endpoint, handshake_timeout: Duration, version: ProtocolVersion, + web_transport_addr: bool, + cert_hash: Multihash<64>, ) -> Result { let if_watcher; let pending_event; @@ -464,7 +485,7 @@ impl Listener

{ } else { if_watcher = None; listening_addresses.insert(local_addr.ip()); - let ma = socketaddr_to_multiaddr(&local_addr, version); + let ma = socketaddr_to_multiaddr(&local_addr, version, web_transport_addr, Some(cert_hash)); pending_event = Some(TransportEvent::NewAddress { listener_id, listen_addr: ma, @@ -486,6 +507,8 @@ impl Listener

{ pending_event, close_listener_waker: None, listening_addresses, + web_transport_addr, + cert_hash, }) } @@ -530,7 +553,8 @@ impl Listener

{ match ready!(P::poll_if_event(if_watcher, cx)) { Ok(IfEvent::Up(inet)) => { if let Some(listen_addr) = - ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) + ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version, + self.web_transport_addr, Some(self.cert_hash)) { tracing::debug!( address=%listen_addr, @@ -545,7 +569,8 @@ impl Listener

{ } Ok(IfEvent::Down(inet)) => { if let Some(listen_addr) = - ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) + ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version, + self.web_transport_addr, Some(self.cert_hash)) { tracing::debug!( address=%listen_addr, @@ -588,12 +613,34 @@ impl Stream for Listener

{ let endpoint = self.endpoint.clone(); self.accept = async move { endpoint.accept().await }.boxed(); - let local_addr = socketaddr_to_multiaddr(&self.socket_addr(), self.version); + let local_addr = socketaddr_to_multiaddr( + &self.socket_addr(), + self.version, + self.web_transport_addr, + Some(self.cert_hash), + ); let remote_addr = connecting.remote_address(); - let send_back_addr = socketaddr_to_multiaddr(&remote_addr, self.version); + //todo It's unclear should I add anything to a send_back_addr? + let send_back_addr = socketaddr_to_multiaddr( + &remote_addr, + self.version, + false, + None, + ); + + let timeout = self.handshake_timeout.clone(); + let fut = if self.web_transport_addr { + async move { + webtransport::Connecting::new(connecting, timeout).await + }.boxed() + } else { + async move { + Connecting::new(connecting, timeout).await + }.boxed() + }; let event = TransportEvent::Incoming { - upgrade: Connecting::new(connecting, self.handshake_timeout), + upgrade: fut, local_addr, send_back_addr, listener_id: self.listener_id, @@ -667,13 +714,15 @@ fn ip_to_listenaddr( endpoint_addr: &SocketAddr, ip: IpAddr, version: ProtocolVersion, + wt: bool, + cert_hash: Option>, ) -> Option { // True if either both addresses are Ipv4 or both Ipv6. if !SocketFamily::is_same(&endpoint_addr.ip(), &ip) { return None; } let socket_addr = SocketAddr::new(ip, endpoint_addr.port()); - Some(socketaddr_to_multiaddr(&socket_addr, version)) + Some(socketaddr_to_multiaddr(&socket_addr, version, wt, cert_hash)) } /// Tries to turn a QUIC multiaddress into a UDP [`SocketAddr`]. Returns None if the format @@ -681,11 +730,12 @@ fn ip_to_listenaddr( fn multiaddr_to_socketaddr( addr: &Multiaddr, support_draft_29: bool, -) -> Option<(SocketAddr, ProtocolVersion, Option)> { +) -> Option<(SocketAddr, ProtocolVersion, Option, bool)> { let mut iter = addr.iter(); let proto1 = iter.next()?; let proto2 = iter.next()?; let proto3 = iter.next()?; + let proto4 = iter.next()?; let mut peer_id = None; for proto in iter { @@ -702,12 +752,14 @@ fn multiaddr_to_socketaddr( _ => return None, }; + let wt = proto4 == Protocol::WebTransport; + match (proto1, proto2) { (Protocol::Ip4(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id)) + Some((SocketAddr::new(ip.into(), port), version, peer_id, wt)) } (Protocol::Ip6(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id)) + Some((SocketAddr::new(ip.into(), port), version, peer_id, wt)) } _ => None, } @@ -744,15 +796,29 @@ fn is_quic_addr(addr: &Multiaddr, support_draft_29: bool) -> bool { } /// Turns an IP address and port into the corresponding QUIC multiaddr. -fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) -> Multiaddr { +fn socketaddr_to_multiaddr( + socket_addr: &SocketAddr, + version: ProtocolVersion, + web_transport_addr: bool, + cert_hash: Option>, +) -> Multiaddr { let quic_proto = match version { ProtocolVersion::V1 => Protocol::QuicV1, ProtocolVersion::Draft29 => Protocol::Quic, }; - Multiaddr::empty() + let mut res = Multiaddr::empty() .with(socket_addr.ip().into()) .with(Protocol::Udp(socket_addr.port())) - .with(quic_proto) + .with(quic_proto); + + if web_transport_addr { + res = res.with(Protocol::WebTransport); + if let Some(hash) = cert_hash { + res = res.with(Protocol::Certhash(hash.clone())); + } + } + + res } #[cfg(test)] diff --git a/transports/quic/src/webtransport.rs b/transports/quic/src/webtransport.rs new file mode 100644 index 00000000000..2964d631f2c --- /dev/null +++ b/transports/quic/src/webtransport.rs @@ -0,0 +1,9 @@ + +pub(crate) mod fingerprint; +mod connection; +mod connecting; +mod stream; + +pub use connecting::WebTransportError; +pub(crate) use connection::Connection; +pub(crate) use connecting::Connecting; \ No newline at end of file diff --git a/transports/quic/src/webtransport/connecting.rs b/transports/quic/src/webtransport/connecting.rs new file mode 100644 index 00000000000..eaca1b32b67 --- /dev/null +++ b/transports/quic/src/webtransport/connecting.rs @@ -0,0 +1,126 @@ +//! Future that drives a QUIC connection until is has performed its TLS handshake. +//! And then it drives a http3 connection based on the QUIC connection. +//! And then it drives a WebTransport session based on the http3 connection. + +use std::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use bytes::Bytes; +use futures::{ + future::{select, Select}, + prelude::*, +}; +use futures::future::{BoxFuture, Either}; +use futures_timer::Delay; +use h3::ext::Protocol; +use h3_quinn::Connection as Http3Connection; +use h3_webtransport::server::WebTransportSession; +use http::Method; + +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_identity::PeerId; + +use crate::{Error, webtransport}; + +// #[derive(Debug)] +pub struct Connecting { + connecting: Select< + BoxFuture<'static, Result<(PeerId, WebTransportSession), WebTransportError>>, + Delay + >, +} + +impl Connecting { + pub(crate) fn new(connecting: quinn::Connecting, timeout: Duration) -> Self { + Connecting { + connecting: select(web_transport_session(connecting).boxed(), Delay::new(timeout)) + } + } +} + +impl Future for Connecting { + type Output = Result<(PeerId, StreamMuxerBox), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = match futures::ready!(self.connecting.poll_unpin(cx)) { + Either::Right(_) => return Poll::Ready(Err(Error::HandshakeTimedOut)), + Either::Left((Ok((peer_id, session)), _)) => { + let muxer = webtransport::Connection::new(session); + Ok((peer_id, StreamMuxerBox::new(muxer))) + } + Either::Left((Err(_e), _)) => return Poll::Ready(Err(Error::WebTransportError)), + }; + + Poll::Ready(res) + } +} + +fn remote_peer_id(connection: &quinn::Connection) -> PeerId { + let identity = connection + .peer_identity() + .expect("connection got identity because it passed TLS handshake; qed"); + let certificates: Box> = + identity.downcast().expect("we rely on rustls feature; qed"); + let end_entity = certificates + .first() + .expect("there should be exactly one certificate; qed"); + let p2p_cert = libp2p_tls::certificate::parse(end_entity) + .expect("the certificate was validated during TLS handshake; qed"); + p2p_cert.peer_id() +} + +async fn web_transport_session(connecting: quinn::Connecting) + -> Result<(PeerId, WebTransportSession), WebTransportError> { + let quic_conn = connecting.await + .map_err(|e| WebTransportError::ConnectionError(e))?; + // info!("new http3 established"); + + let peer_id = remote_peer_id(&quic_conn); + + let mut h3_conn = h3::server::builder() + .enable_webtransport(true) + .enable_connect(true) + .enable_datagram(true) + .max_webtransport_sessions(1) + .send_grease(true) + .build(h3_quinn::Connection::new(quic_conn)) + .await + .unwrap(); + + match h3_conn.accept().await { + Ok(Some((request, stream))) => { + let ext = request.extensions(); + let proto = ext.get::(); + if Some(&Protocol::WEB_TRANSPORT) == proto { + if request.method() == &Method::CONNECT { + let session = WebTransportSession::accept(request, stream, h3_conn) + .await.map_err(|e| WebTransportError::Http3Error(e))?; + Ok((peer_id, session)) + } else { + Err(WebTransportError::UnexpectedMethod(request.method().clone())) + } + } else { + Err(WebTransportError::UnexpectedProtocol(proto.cloned())) + } + } + Ok(None) => { + // indicating no more streams to be received + Err(WebTransportError::ClosedConnection) + } + Err(err) => { + Err(WebTransportError::Http3Error(err)) + } + } +} + +// #[derive(Debug)] +pub enum WebTransportError { + ConnectionError(quinn::ConnectionError), + UnexpectedMethod(Method), + UnexpectedProtocol(Option), + ClosedConnection, + Http3Error(h3::Error), +} \ No newline at end of file diff --git a/transports/quic/src/webtransport/connection.rs b/transports/quic/src/webtransport/connection.rs new file mode 100644 index 00000000000..2f7d8589845 --- /dev/null +++ b/transports/quic/src/webtransport/connection.rs @@ -0,0 +1,127 @@ +use h3_quinn::Connection as Http3Connection; +use h3_webtransport::server::WebTransportSession; +use bytes::Bytes; +use crate::Error; +use futures::{future::BoxFuture, FutureExt}; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use h3::quic::BidiStream; +use crate::webtransport::stream::Stream; + +/// State for a single opened WebTransport session. +pub struct Connection { + /// Underlying connection. + session: WebTransportSession, + /// Future for accepting a new incoming bidirectional stream. + incoming: Option< + BoxFuture<'static, (h3_webtransport::stream::SendStream, Bytes>, + h3_webtransport::stream::RecvStream)>, + >, + /// Future for opening a new outgoing bidirectional stream. + outgoing: Option< + BoxFuture<'static, (h3_webtransport::stream::SendStream, Bytes>, + h3_webtransport::stream::RecvStream)>, + >, + /// Future to wait for the connection to be closed. + closing: Option>, +} + +impl Connection { + /// Build a [`Connection`] from raw components. + /// + /// This function assumes that the [`quinn::Connection`] is completely fresh and none of + /// its methods has ever been called. Failure to comply might lead to logic errors and panics. + pub(crate) fn new(session: WebTransportSession) -> Self { + Self { + session, + incoming: None, + outgoing: None, + closing: None, + } + } +} + +impl StreamMuxer for Connection { + type Substream = Stream; + type Error = Error; + + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + let incoming = this.incoming.get_or_insert_with(|| { + let accept = this.session.accept_bi(); + async move { + let res = accept.await.unwrap(); + match res { + Some(h3_webtransport::server::AcceptedBi::BidiStream(_, stream)) => { + stream.split() + } + _ => unreachable!("fix me!") + } + }.boxed() + }); + + let (send, recv) = futures::ready!(incoming.poll_unpin(cx)); + this.incoming.take(); + let stream = Stream::new(send, recv); + Poll::Ready(Ok(stream)) + } + + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + let session_id = this.session.session_id(); + let open_bi = this.session.open_bi(session_id); + + let outgoing = this.outgoing.get_or_insert_with(|| { + async move { + // let stream = open_bi.await.unwrap(); + // + // stream.split() + unreachable!("fix me!") + }.boxed() + }); + + let (send, recv) = futures::ready!(outgoing.poll_unpin(cx)); + this.outgoing.take(); + let stream = Stream::new(send, recv); + Poll::Ready(Ok(stream)) + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + // TODO: If connection migration is enabled (currently disabled) address + // change on the connection needs to be handled. + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + /*let this = self.get_mut(); + + let closing = this.closing.get_or_insert_with(|| { + this.session. + // this.connection.close(From::from(0u32), &[]); + // let connection = this.connection.clone(); + async move { connection.closed().await }.boxed() + }); + + match futures::ready!(closing.poll_unpin(cx)) { + // Expected error given that `connection.close` was called above. + quinn::ConnectionError::LocallyClosed => {} + error => return Poll::Ready(Err(Error::Connection(ConnectionError(error)))), + };*/ + + Poll::Ready(Ok(())) + } +} \ No newline at end of file diff --git a/transports/quic/src/webtransport/fingerprint.rs b/transports/quic/src/webtransport/fingerprint.rs new file mode 100644 index 00000000000..14e0930fe54 --- /dev/null +++ b/transports/quic/src/webtransport/fingerprint.rs @@ -0,0 +1,77 @@ +use sha2::Digest as _; +use std::fmt; + +const SHA256: &str = "sha-256"; +const MULTIHASH_SHA256_CODE: u64 = 0x12; + +type Multihash = libp2p_core::multihash::Multihash<64>; + +/// A certificate fingerprint that is assumed to be created using the SHA256 hash algorithm. +#[derive(Eq, PartialEq, Copy, Clone)] +pub struct Fingerprint([u8; 32]); + +impl Fingerprint { + pub(crate) const FF: Fingerprint = Fingerprint([0xFF; 32]); + + #[cfg(test)] + pub fn raw(bytes: [u8; 32]) -> Self { + Self(bytes) + } + + /// Creates a fingerprint from a raw certificate. + pub fn from_certificate(bytes: &[u8]) -> Self { + Fingerprint(sha2::Sha256::digest(bytes).into()) + } + + /// Converts [`Multihash`](multihash::Multihash) to [`Fingerprint`]. + pub fn try_from_multihash(hash: Multihash) -> Option { + if hash.code() != MULTIHASH_SHA256_CODE { + // Only support SHA256 for now. + return None; + } + + let bytes = hash.digest().try_into().ok()?; + + Some(Self(bytes)) + } + + /// Converts this fingerprint to [`Multihash`](multihash::Multihash). + pub fn to_multihash(self) -> Multihash { + Multihash::wrap(MULTIHASH_SHA256_CODE, &self.0).expect("fingerprint's len to be 32 bytes") + } + + /// Formats this fingerprint as uppercase hex, separated by colons (`:`). + /// + /// This is the format described in . + pub fn to_sdp_format(self) -> String { + self.0.map(|byte| format!("{byte:02X}")).join(":") + } + + /// Returns the algorithm used (e.g. "sha-256"). + /// See + pub fn algorithm(&self) -> String { + SHA256.to_owned() + } +} + +impl fmt::Debug for Fingerprint { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&hex::encode(self.0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sdp_format() { + let fp = Fingerprint::raw(hex_literal::hex!( + "7DE3D83F81A680592A471E6B6ABB0747ABD35385A8093FDFE112C1EEBB6CC6AC" + )); + + let sdp_format = fp.to_sdp_format(); + + assert_eq!(sdp_format, "7D:E3:D8:3F:81:A6:80:59:2A:47:1E:6B:6A:BB:07:47:AB:D3:53:85:A8:09:3F:DF:E1:12:C1:EE:BB:6C:C6:AC") + } +} \ No newline at end of file diff --git a/transports/quic/src/webtransport/stream.rs b/transports/quic/src/webtransport/stream.rs new file mode 100644 index 00000000000..6e05d585675 --- /dev/null +++ b/transports/quic/src/webtransport/stream.rs @@ -0,0 +1,69 @@ +use std::{ + io::{self}, + pin::Pin, + task::{Context, Poll}, +}; +use bytes::Bytes; +use futures::{AsyncRead, AsyncWrite}; + +/// A single stream on a connection +pub struct Stream { + /// A send part of the stream + send: h3_webtransport::stream::SendStream, Bytes>, + /// A receive part of the stream + recv: h3_webtransport::stream::RecvStream, + /// Whether the stream is closed or not + close_result: Option>, +} + +impl Stream { + pub(super) fn new( + send: h3_webtransport::stream::SendStream, Bytes>, + recv: h3_webtransport::stream::RecvStream + ) -> Self { + Self { + send, + recv, + close_result: None, + } + } +} + +impl AsyncRead for Stream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + if let Some(close_result) = self.close_result { + if close_result.is_err() { + return Poll::Ready(Ok(0)); + } + } + Pin::new(&mut self.recv).poll_read(cx, buf) + } +} + +impl AsyncWrite for Stream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.send).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.send).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if let Some(close_result) = self.close_result { + // For some reason poll_close needs to be 'fuse'able + return Poll::Ready(close_result.map_err(Into::into)); + } + let close_result = futures::ready!(Pin::new(&mut self.send).poll_close(cx)); + self.close_result = Some(close_result.as_ref().map_err(|e| e.kind()).copied()); + Poll::Ready(close_result) + } +} \ No newline at end of file diff --git a/transports/tls/Cargo.toml b/transports/tls/Cargo.toml index 3df1674c4b3..926ecc70d29 100644 --- a/transports/tls/Cargo.toml +++ b/transports/tls/Cargo.toml @@ -19,6 +19,7 @@ thiserror = "1.0.50" webpki = { version = "0.101.4", package = "rustls-webpki", features = ["std"] } x509-parser = "0.15.1" yasna = "0.5.2" +time = "0.3" # Exposed dependencies. Breaking changes to these are breaking changes to us. [dependencies.rustls] diff --git a/transports/tls/src/lib.rs b/transports/tls/src/lib.rs index 1edd83e9807..4a5434414c4 100644 --- a/transports/tls/src/lib.rs +++ b/transports/tls/src/lib.rs @@ -34,6 +34,7 @@ use libp2p_identity::PeerId; use std::sync::Arc; pub use futures_rustls::TlsStream; +use rustls::{Certificate, PrivateKey}; pub use upgrade::Config; pub use upgrade::UpgradeError; @@ -79,3 +80,23 @@ pub fn make_server_config( Ok(crypto) } + +/// Create a TLS server configuration for libp2p. +pub fn make_server_config_with_cert( + certificate: Certificate, + private_key: PrivateKey, + alpn: Vec> +) -> Result { + let mut crypto = rustls::ServerConfig::builder() + .with_cipher_suites(verifier::CIPHERSUITES) + .with_safe_default_kx_groups() + .with_protocol_versions(verifier::PROTOCOL_VERSIONS) + .expect("Cipher suites and kx groups are configured; qed") + .with_client_cert_verifier(Arc::new(verifier::Libp2pCertificateVerifier::new())) + .with_single_cert(vec![certificate], private_key) + .expect("Server cert key DER is valid; qed"); + + crypto.alpn_protocols = alpn; + + Ok(crypto) +} From 8bf8d0c522947bd8942ee74669e53f45b9f412bb Mon Sep 17 00:00:00 2001 From: dgarus Date: Thu, 16 Nov 2023 14:28:57 +0300 Subject: [PATCH 2/7] example --- examples/ping/Cargo.toml | 2 +- examples/ping/src/main.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/ping/Cargo.toml b/examples/ping/Cargo.toml index 88ca2fa657a..58cee54409e 100644 --- a/examples/ping/Cargo.toml +++ b/examples/ping/Cargo.toml @@ -10,7 +10,7 @@ release = false [dependencies] futures = "0.3.29" -libp2p = { path = "../../libp2p", features = ["noise", "ping", "tcp", "quic", "tokio", "yamux"] } +libp2p = { path = "../../libp2p", features = ["noise", "ping", "tcp", "tokio", "yamux"] } tokio = { version = "1.33.0", features = ["full"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/ping/src/main.rs b/examples/ping/src/main.rs index 45638171390..bbc9b42d7b8 100644 --- a/examples/ping/src/main.rs +++ b/examples/ping/src/main.rs @@ -45,8 +45,7 @@ async fn main() -> Result<(), Box> { // Tell the swarm to listen on all interfaces and a random, OS-assigned // port. - // swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - swarm.listen_on("/ip4/0.0.0.0/udp/0/quic".parse()?)?; + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Dial the peer identified by the multi-address given as the second // command-line argument, if any. From cabc7dee343bb012e877b99cb27f2daaf8e0e60c Mon Sep 17 00:00:00 2001 From: dgarus Date: Thu, 16 Nov 2023 14:29:30 +0300 Subject: [PATCH 3/7] example --- examples/ping/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/ping/src/main.rs b/examples/ping/src/main.rs index bbc9b42d7b8..911b0384f89 100644 --- a/examples/ping/src/main.rs +++ b/examples/ping/src/main.rs @@ -38,7 +38,6 @@ async fn main() -> Result<(), Box> { noise::Config::new, yamux::Config::default, )? - .with_quic() .with_behaviour(|_| ping::Behaviour::default())? .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))) .build(); From 13cc3df6c535465de19f3edd77d3c06fe6a1ef97 Mon Sep 17 00:00:00 2001 From: dgarus Date: Fri, 22 Dec 2023 17:03:46 +0300 Subject: [PATCH 4/7] added CertManager --- Cargo.lock | 90 +++++++-- transports/quic/Cargo.toml | 4 +- transports/quic/src/certificate_manager.rs | 157 ++++++++++++++++ transports/quic/src/config.rs | 164 +++++++--------- transports/quic/src/lib.rs | 5 + transports/quic/src/transport.rs | 176 ++++++++++-------- transports/quic/src/webtransport.rs | 3 - .../quic/src/webtransport/connecting.rs | 97 +++++++--- .../quic/src/webtransport/connection.rs | 39 +++- .../quic/src/webtransport/fingerprint.rs | 77 -------- transports/quic/src/webtransport/stream.rs | 20 ++ transports/tls/src/certificate.rs | 35 +++- transports/tls/src/lib.rs | 25 +-- transports/tls/src/verifier.rs | 8 +- 14 files changed, 565 insertions(+), 335 deletions(-) create mode 100644 transports/quic/src/certificate_manager.rs delete mode 100644 transports/quic/src/webtransport/fingerprint.rs diff --git a/Cargo.lock b/Cargo.lock index 2e79300cff9..33b9eed0e17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -452,7 +452,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" dependencies = [ - "http 0.2.9", + "http 0.2.11", "log", "url", ] @@ -486,7 +486,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http 0.2.9", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "itoa", @@ -545,7 +545,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 0.2.9", + "http 0.2.11", "http-body 0.4.5", "mime", "rustversion", @@ -1516,7 +1516,7 @@ dependencies = [ "cookie", "futures-core", "futures-util", - "http 0.2.9", + "http 0.2.11", "hyper 0.14.27", "hyper-rustls", "mime", @@ -1539,9 +1539,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "ff" @@ -1876,7 +1876,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 0.2.9", + "http 0.2.11", "indexmap 1.9.3", "slab", "tokio", @@ -1903,6 +1903,48 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3" +version = "0.0.3" +source = "git+https://github.com/hyperium/h3#5c161952b02e663f31f9b83829bafa7a047b6627" +dependencies = [ + "bytes", + "fastrand 2.0.1", + "futures-util", + "http 1.0.0", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "h3-quinn" +version = "0.0.4" +source = "git+https://github.com/hyperium/h3#5c161952b02e663f31f9b83829bafa7a047b6627" +dependencies = [ + "bytes", + "futures", + "h3", + "quinn", + "quinn-proto", + "tokio", + "tokio-util", +] + +[[package]] +name = "h3-webtransport" +version = "0.1.0" +source = "git+https://github.com/hyperium/h3#5c161952b02e663f31f9b83829bafa7a047b6627" +dependencies = [ + "bytes", + "futures-util", + "h3", + "http 1.0.0", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "half" version = "1.8.2" @@ -2069,9 +2111,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ "bytes", "fnv", @@ -2096,7 +2138,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http 0.2.9", + "http 0.2.11", "pin-project-lite", ] @@ -2158,7 +2200,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.3.20", - "http 0.2.9", + "http 0.2.11", "http-body 0.4.5", "httparse", "httpdate", @@ -2196,7 +2238,7 @@ version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ - "http 0.2.9", + "http 0.2.11", "hyper 0.14.27", "log", "rustls 0.20.8", @@ -2322,7 +2364,7 @@ dependencies = [ "attohttpc", "bytes", "futures", - "http 0.2.9", + "http 0.2.11", "hyper 0.14.27", "log", "rand 0.8.5", @@ -3116,6 +3158,12 @@ dependencies = [ "bytes", "futures", "futures-timer", + "h3", + "h3-quinn", + "h3-webtransport", + "hex", + "hex-literal", + "http 1.0.0", "if-watch", "libp2p-core", "libp2p-identity", @@ -3128,10 +3176,13 @@ dependencies = [ "quickcheck", "quinn", "rand 0.8.5", + "rcgen", "ring 0.16.20", "rustls 0.21.9", + "sha2 0.10.8", "socket2 0.5.5", "thiserror", + "time", "tokio", "tracing", "tracing-subscriber", @@ -3340,6 +3391,7 @@ dependencies = [ "rustls 0.21.9", "rustls-webpki", "thiserror", + "time", "tokio", "x509-parser", "yasna", @@ -4088,7 +4140,7 @@ checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275" dependencies = [ "async-trait", "futures-core", - "http 0.2.9", + "http 0.2.11", "opentelemetry-proto", "opentelemetry-semantic-conventions", "opentelemetry_api", @@ -4850,7 +4902,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.3.20", - "http 0.2.9", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "hyper-tls", @@ -5708,7 +5760,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" dependencies = [ "cfg-if", - "fastrand 2.0.0", + "fastrand 2.0.1", "redox_syscall 0.4.1", "rustix 0.38.21", "windows-sys 0.48.0", @@ -5734,7 +5786,7 @@ dependencies = [ "cookie", "fantoccini", "futures", - "http 0.2.9", + "http 0.2.11", "indexmap 1.9.3", "log", "parking_lot", @@ -5943,7 +5995,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.3.20", - "http 0.2.9", + "http 0.2.11", "http-body 0.4.5", "hyper 0.14.27", "hyper-timeout", @@ -6492,7 +6544,7 @@ dependencies = [ "base64 0.13.1", "bytes", "cookie", - "http 0.2.9", + "http 0.2.11", "log", "serde", "serde_derive", diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 8c1ee3714e3..f8bee39f2b9 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -30,10 +30,11 @@ sha2 = "0.10.7" hex = "0.4" # WebTransport -http = "0.2.11" +http = "1.0.0" h3 = { git = "https://github.com/hyperium/h3" } h3-quinn = { git = "https://github.com/hyperium/h3" } h3-webtransport = { git = "https://github.com/hyperium/h3" } +libp2p-noise = { workspace = true } rcgen = "0.11.3" time = "0.3" @@ -52,7 +53,6 @@ rustc-args = ["--cfg", "docsrs"] async-std = { version = "1.12.0", features = ["attributes"] } libp2p-identity = { workspace = true, features = ["rand"] } libp2p-muxer-test-harness = { path = "../../muxers/test-harness" } -libp2p-noise = { workspace = true } libp2p-tcp = { workspace = true, features = ["async-io"] } libp2p-yamux = { workspace = true } quickcheck = "1" diff --git a/transports/quic/src/certificate_manager.rs b/transports/quic/src/certificate_manager.rs new file mode 100644 index 00000000000..0a17716c2cc --- /dev/null +++ b/transports/quic/src/certificate_manager.rs @@ -0,0 +1,157 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::sync::Arc; + +use rustls; +use sha2::Digest; +use time::{Duration, OffsetDateTime}; + +use libp2p_core::multihash::Multihash; +use libp2p_tls::{ + certificate, P2P_ALPN, verifier, +}; + +const SHA256: &str = "sha-256"; +const MULTIHASH_SHA256_CODE: u64 = 0x12; +const CERT_VALID_PERIOD: Duration = Duration::days(14); + +#[derive(Clone, Debug)] +pub(crate) struct ServerCertManager { + keypair: libp2p_identity::Keypair, + + items: Vec, +} + +#[derive(Clone, Debug)] +struct CertItem { + server_tls_config: Arc, + start: OffsetDateTime, + end: OffsetDateTime, + cert_hash: Multihash<64>, +} + +impl ServerCertManager { + pub(crate) fn new(keypair: libp2p_identity::Keypair) -> Self { + Self { + keypair, + items: Vec::with_capacity(3), + } + } + + /// Gets TLS server config and certificate hashes. + pub(crate) fn get_config(&mut self) -> Result<(Arc, Vec>), certificate::GenError> { + self.check_and_roll_items()?; + + let cur_item = self.items.first() + .expect("Element with index 0 exists"); + let cert_hashes = self.items.iter() + .map(|item| item.cert_hash.clone()) + .collect(); + + Ok((cur_item.server_tls_config.clone(), cert_hashes)) + } + + fn create_cert_item(&self, start: OffsetDateTime) -> Result { + let not_after = start.clone() + .checked_add(CERT_VALID_PERIOD) + .expect("Addition does not overflow"); + + let (cert, private_key) = + certificate::generate_webtransport_certificate(&self.keypair, start, not_after)?; + + let cert_hash = Multihash::wrap( + MULTIHASH_SHA256_CODE, sha2::Sha256::digest(cert.as_ref().as_ref()).as_ref(), + ).expect("fingerprint's len to be 32 bytes"); + + let mut tls_config = rustls::ServerConfig::builder() + .with_cipher_suites(verifier::CIPHERSUITES) + .with_safe_default_kx_groups() + .with_protocol_versions(verifier::PROTOCOL_VERSIONS) + .expect("Cipher suites and kx groups are configured; qed") + .with_client_cert_verifier(Arc::new(verifier::Libp2pCertificateVerifier::new())) + .with_single_cert(vec![cert], private_key) + .expect("Server cert key DER is valid; qed"); + + tls_config.alpn_protocols = alpn_protocols(); + + Ok(CertItem { server_tls_config: Arc::new(tls_config), start, end: not_after, cert_hash }) + } + + ///https://github.com/libp2p/specs/tree/master/webtransport#certificates + /// Servers need to take care of regularly renewing their certificates.At first boot of the node, + /// it creates one self-signed certificate with a validity of 14 days, starting immediately, + /// and another certificate with the 14 day validity period starting on the expiry date of the first certificate. + /// The node advertises a multiaddress containing the certificate hashes of these two certificates. + /// Once the first certificate has expired, the node starts using the already generated next certificate. + /// At the same time, it again generates a new certificate for the following period and updates the multiaddress it advertises. + fn check_and_roll_items(&mut self) -> Result<(), certificate::GenError> { + if self.items.len() == 0 { + let current = self.create_cert_item(OffsetDateTime::now_utc())?; + let next_start = current.end.clone(); + self.items.push(current); + self.items.push(self.create_cert_item(next_start)?); + } else { + let next = self.items.get(1) + .expect("Element with index 1 exists"); + + if OffsetDateTime::now_utc() >= next.start { + let next_start = next.end.clone(); + self.items.push(self.create_cert_item(next_start)?); + if self.items.len() == 3 { + self.items.remove(0); + } + } + }; + + Ok(()) + } +} + +fn alpn_protocols() -> Vec> { + vec![P2P_ALPN.to_vec(), + b"h3".to_vec(), + b"h3-32".to_vec(), + b"h3-31".to_vec(), + b"h3-30".to_vec(), + b"h3-29".to_vec(), ] +} + +/* +#[cfg(test)] +mod tests { + use std::fmt::{Debug, Formatter}; + use rcgen::SerialNumber; + use ring::rand::{SecureRandom, SystemRandom}; + use ring::{hkdf, signature}; + use ring::error::Unspecified; + use ring::signature::EcdsaKeyPair; + use time::macros::datetime; + + #[test] + fn key_pair_generate() { + let alg = &signature::ECDSA_P256_SHA256_ASN1_SIGNING; + let rnd = SystemRandom::new(); + + let doc = EcdsaKeyPair::generate_pkcs8(alg, &rnd); + + assert!(doc.is_ok()) + } +}*/ \ No newline at end of file diff --git a/transports/quic/src/config.rs b/transports/quic/src/config.rs index 3ffec319288..89c455b8c38 100644 --- a/transports/quic/src/config.rs +++ b/transports/quic/src/config.rs @@ -18,13 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use quinn::{MtuDiscoveryConfig, VarInt}; +use quinn::{MtuDiscoveryConfig, TransportConfig, VarInt}; use std::{sync::Arc, time::Duration}; use libp2p_core::multihash::Multihash; -use crate::webtransport::fingerprint::Fingerprint; +use libp2p_tls::certificate; +use crate::certificate_manager::ServerCertManager; /// Config for the transport. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Config { /// Timeout for the initial handshake when establishing a connection. /// The actual timeout is the minimum of this and the [`Config::max_idle_timeout`]. @@ -63,16 +64,14 @@ pub struct Config { /// TLS client config for the inner [`quinn::ClientConfig`]. client_tls_config: Arc, - //todo At first boot of the node, it creates one self-signed certificate with a validity of 14 days, - // starting immediately, and another certificate with the 14 day validity period starting on - // the expiry date of the first certificate. - // The node advertises a multiaddress containing the certificate hashes of these two certificates. - // Once the first certificate has expired, the node starts using the already generated next certificate. - // At the same time, it again generates a new certificate for the following period and updates - // the multiaddress it advertises. - /// TLS server config for the inner [`quinn::ServerConfig`]. - server_tls_config: Arc, - pub(crate) cert_hash: Multihash<64>, + + transport: Arc, + + endpoint_config: quinn::EndpointConfig, + + /// TLS server config manager for the inner [`quinn::ServerConfig`] and + /// self-signed certificate hashes. + cert_manager: ServerCertManager, /// Libp2p identity of the node. keypair: libp2p_identity::Keypair, @@ -86,93 +85,26 @@ impl Config { pub fn new(keypair: &libp2p_identity::Keypair) -> Self { let client_tls_config = Arc::new(libp2p_tls::make_client_config(keypair, None).unwrap()); - //todo set up cert's dates - let (cert, private_key) = libp2p_tls::certificate::generate(keypair).unwrap(); - - let server_tls_config = Arc::new( - libp2p_tls::make_server_config_with_cert( - cert.clone(), private_key, vec![ - b"libp2p".to_vec(), - b"h3".to_vec(), - b"h3-32".to_vec(), - b"h3-31".to_vec(), - b"h3-30".to_vec(), - b"h3-29".to_vec(), - ] - ).unwrap() - ); - Self { - client_tls_config, - server_tls_config, - cert_hash: Fingerprint::from_certificate(cert.as_ref()).to_multihash(), - support_draft_29: false, - handshake_timeout: Duration::from_secs(5), - max_idle_timeout: 10 * 1000, - max_concurrent_stream_limit: 256, - keep_alive_interval: Duration::from_secs(5), - max_connection_data: 15_000_000, - - // Ensure that one stream is not consuming the whole connection. - max_stream_data: 10_000_000, - keypair: keypair.clone(), - mtu_discovery_config: Some(Default::default()), - } - } + let max_concurrent_stream_limit = 256; + let keep_alive_interval = Duration::from_secs(5); + let max_idle_timeout = 10 * 1000; + let max_stream_data = 10_000_000; + let max_connection_data = 15_000_000; + let mtu_discovery_config = Some(Default::default()); + let support_draft_29 = false; - /// Disable MTU path discovery (it is enabled by default). - pub fn disable_path_mtu_discovery(mut self) -> Self { - self.mtu_discovery_config = None; - self - } -} - -/// Represents the inner configuration for [`quinn`]. -#[derive(Debug, Clone)] -pub(crate) struct QuinnConfig { - pub(crate) client_config: quinn::ClientConfig, - pub(crate) server_config: quinn::ServerConfig, - pub(crate) endpoint_config: quinn::EndpointConfig, -} - -impl From for QuinnConfig { - fn from(config: Config) -> QuinnConfig { - let Config { - client_tls_config, - server_tls_config, - cert_hash: _, - max_idle_timeout, - max_concurrent_stream_limit, - keep_alive_interval, - max_connection_data, - max_stream_data, - support_draft_29, - handshake_timeout: _, - keypair, - mtu_discovery_config, - } = config; let mut transport = quinn::TransportConfig::default(); // Disable uni-directional streams. transport.max_concurrent_uni_streams(0u32.into()); transport.max_concurrent_bidi_streams(max_concurrent_stream_limit.into()); // Disable datagrams. transport.datagram_receive_buffer_size(None); - transport.keep_alive_interval(Some(keep_alive_interval)); + transport.keep_alive_interval(Some(keep_alive_interval.clone())); transport.max_idle_timeout(Some(VarInt::from_u32(max_idle_timeout).into())); transport.allow_spin(false); transport.stream_receive_window(max_stream_data.into()); transport.receive_window(max_connection_data.into()); - transport.mtu_discovery_config(mtu_discovery_config); - let transport = Arc::new(transport); - - let mut server_config = quinn::ServerConfig::with_crypto(server_tls_config); - server_config.transport = Arc::clone(&transport); - // Disables connection migration. - // Long-term this should be enabled, however we then need to handle address change - // on connections in the `Connection`. - server_config.migration(false); - - let mut client_config = quinn::ClientConfig::new(client_tls_config); - client_config.transport_config(transport); + transport.mtu_discovery_config(mtu_discovery_config.clone()); let mut endpoint_config = keypair .derive_secret(b"libp2p quic stateless reset key") @@ -186,10 +118,58 @@ impl From for QuinnConfig { endpoint_config.supported_versions(vec![1]); } - QuinnConfig { - client_config, - server_config, + Self { + keypair: keypair.clone(), + client_tls_config, + transport: Arc::new(transport), endpoint_config, + cert_manager: ServerCertManager::new(keypair.clone()), + support_draft_29, + handshake_timeout: Duration::from_secs(5), + max_idle_timeout, + max_concurrent_stream_limit, + keep_alive_interval, + max_connection_data, + // Ensure that one stream is not consuming the whole connection. + max_stream_data, + mtu_discovery_config, } } + + /// Disable MTU path discovery (it is enabled by default). + pub fn disable_path_mtu_discovery(mut self) -> Self { + self.mtu_discovery_config = None; + self + } + + pub fn server_quinn_config(&mut self + ) -> Result<(quinn::ServerConfig, libp2p_noise::Config, Vec>), certificate::GenError> { + let (server_tls_config, cert_hashes) = self.cert_manager.get_config()?; + + let mut server_config = quinn::ServerConfig::with_crypto(server_tls_config); + server_config.transport = Arc::clone(&self.transport); + // Disables connection migration. + // Long-term this should be enabled, however we then need to handle address change + // on connections in the `Connection`. + server_config.migration(false); + + let mut noise = libp2p_noise::Config::new(&self.keypair) + .expect("Gets noise config"); + noise = noise.with_webtransport_certhashes( + cert_hashes.clone().into_iter().collect() + ); + + Ok((server_config, noise, cert_hashes)) + } + + pub fn endpoint_config(&self) -> quinn::EndpointConfig { + self.endpoint_config.clone() + } + + pub fn client_quinn_config(&self) -> quinn::ClientConfig { + let mut client_config = quinn::ClientConfig::new(self.client_tls_config.clone()); + client_config.transport_config(Arc::clone(&self.transport)); + + client_config + } } diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index 587b34796b2..59b37c3acad 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -57,6 +57,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +mod certificate_manager; mod config; mod connection; mod hole_punching; @@ -68,6 +69,7 @@ use std::net::SocketAddr; pub use config::Config; pub use connection::{Connecting, Connection, Stream}; +use libp2p_tls::certificate; #[cfg(feature = "async-std")] pub use provider::async_std; @@ -103,6 +105,9 @@ pub enum Error { #[error("Already punching hole for {0}).")] HolePunchInProgress(SocketAddr), + #[error(transparent)] + CertificateGenerationError(#[from] certificate::GenError), + #[error("WebTransport error.")] WebTransportError, } diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index b5a76b697b6..45c3f44b9f0 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::config::{Config, QuinnConfig}; +use crate::config::Config; use crate::hole_punching::hole_puncher; use crate::provider::Provider; use crate::{ConnectError, Connecting, Error, webtransport}; @@ -60,16 +60,8 @@ use libp2p_core::multihash::Multihash; /// See . #[derive(Debug)] pub struct GenTransport { - /// Config for the inner [`quinn`] structs. - quinn_config: QuinnConfig, - /// Timeout for the [`Connecting`] future. - handshake_timeout: Duration, - - // todo Temporary solution - cert_hash: Multihash<64>, - - /// Whether draft-29 is supported for dialing and listening. - support_draft_29: bool, + /// Config for the transport. + config: Config, /// Streams of active [`Listener`]s. listeners: SelectAll>, /// Dialer for each socket family if no matching listener exists. @@ -88,19 +80,12 @@ pub struct GenTransport { impl GenTransport

{ /// Create a new [`GenTransport`] with the given [`Config`]. pub fn new(config: Config) -> Self { - let handshake_timeout = config.handshake_timeout; - let support_draft_29 = config.support_draft_29; - let cert_hash = config.cert_hash; - let quinn_config = config.into(); Self { + config: config.clone(), listeners: SelectAll::new(), - quinn_config, - handshake_timeout, dialer: HashMap::new(), waker: None, - support_draft_29, hole_punch_attempts: Default::default(), - cert_hash, } } @@ -145,13 +130,13 @@ impl GenTransport

{ (SocketAddr, ProtocolVersion, Option, bool), TransportError<::Error>, > { - let (socket_addr, version, peer_id, wt) = multiaddr_to_socketaddr(&addr, self.support_draft_29) + let (socket_addr, version, peer_id, is_webtransport_addr) = multiaddr_to_socketaddr(&addr, self.config.support_draft_29) .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; if check_unspecified_addr && (socket_addr.port() == 0 || socket_addr.ip().is_unspecified()) { return Err(TransportError::MultiaddrNotSupported(addr)); } - Ok((socket_addr, version, peer_id, wt)) + Ok((socket_addr, version, peer_id, is_webtransport_addr)) } /// Pick any listener to use for dialing. @@ -216,21 +201,27 @@ impl Transport for GenTransport

{ listener_id: ListenerId, addr: Multiaddr, ) -> Result<(), TransportError> { - let (socket_addr, version, _peer_id, wt) = self.remote_multiaddr_to_socketaddr(addr, false)?; - let endpoint_config = self.quinn_config.endpoint_config.clone(); - let server_config = self.quinn_config.server_config.clone(); + let (socket_addr, version, _peer_id, is_webtransport_addr) = self.remote_multiaddr_to_socketaddr(addr, false)?; + + let (server_config, noise, cert_hashes) = self.config.server_quinn_config() + .map_err(|e| Error::CertificateGenerationError(e))?; + let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?; let socket_c = socket.try_clone().map_err(Self::Error::from)?; - let endpoint = Self::new_endpoint(endpoint_config, Some(server_config), socket)?; + let endpoint = Self::new_endpoint(self.config.endpoint_config(), Some(server_config), socket)?; + let webtransport = WebTransport { + in_use: is_webtransport_addr, + cert_hashes, + noise, + }; let listener = Listener::new( listener_id, socket_c, endpoint, - self.handshake_timeout, + self.config.handshake_timeout.clone(), version, - wt, - self.cert_hash, + webtransport )?; self.listeners.push(listener); @@ -258,8 +249,8 @@ impl Transport for GenTransport

{ } fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option { - if !is_quic_addr(listen, self.support_draft_29) - || !is_quic_addr(observed, self.support_draft_29) + if !is_quic_addr(listen, self.config.support_draft_29) + || !is_quic_addr(observed, self.config.support_draft_29) { return None; } @@ -267,7 +258,12 @@ impl Transport for GenTransport

{ } fn dial(&mut self, addr: Multiaddr) -> Result> { - let (socket_addr, version, _peer_id, wt) = self.remote_multiaddr_to_socketaddr(addr, true)?; + let (socket_addr, version, _peer_id, is_webtransport) = self.remote_multiaddr_to_socketaddr(addr.clone(), true)?; + + if is_webtransport { + // WebTransport implementation doesn't support dialling. + return Err(TransportError::MultiaddrNotSupported(addr)); + } let endpoint = match self.eligible_listener(&socket_addr) { None => { @@ -285,8 +281,7 @@ impl Transport for GenTransport

{ }; let socket = UdpSocket::bind(listen_socket_addr).map_err(Self::Error::from)?; - let endpoint_config = self.quinn_config.endpoint_config.clone(); - let endpoint = Self::new_endpoint(endpoint_config, None, socket)?; + let endpoint = Self::new_endpoint(self.config.endpoint_config(), None, socket)?; vacant.insert(endpoint.clone()); endpoint @@ -296,8 +291,8 @@ impl Transport for GenTransport

{ } Some(listener) => listener.endpoint.clone(), }; - let handshake_timeout = self.handshake_timeout; - let mut client_config = self.quinn_config.client_config.clone(); + let handshake_timeout = self.config.handshake_timeout; + let mut client_config = self.config.client_quinn_config(); if version == ProtocolVersion::Draft29 { client_config.version(0xff00_001d); } @@ -308,13 +303,9 @@ impl Transport for GenTransport

{ let connecting = endpoint .connect_with(client_config, socket_addr, "l") .map_err(ConnectError).unwrap(); // todo should be `?` + Ok(Box::pin(async move { - if wt { - todo!("Here should be part that creates connection to a WT server") - //webtransport::Connecting::new(connecting, handshake_timeout).await - } else { - Connecting::new(connecting, handshake_timeout).await - } + Connecting::new(connecting, handshake_timeout).await })) } @@ -322,7 +313,7 @@ impl Transport for GenTransport

{ &mut self, addr: Multiaddr, ) -> Result> { - let (socket_addr, _version, peer_id, _wt) = + let (socket_addr, _version, peer_id, _is_webtransport) = self.remote_multiaddr_to_socketaddr(addr.clone(), true)?; let peer_id = peer_id.ok_or(TransportError::MultiaddrNotSupported(addr.clone()))?; @@ -336,7 +327,7 @@ impl Transport for GenTransport

{ tracing::debug!("Preparing for hole-punch from {addr}"); - let hole_puncher = hole_puncher::

(socket, socket_addr, self.handshake_timeout); + let hole_puncher = hole_puncher::

(socket, socket_addr, self.config.handshake_timeout); let (sender, receiver) = oneshot::channel(); @@ -391,7 +382,7 @@ impl Transport for GenTransport

{ send_back_addr, } => { let socket_addr = - multiaddr_to_socketaddr(&send_back_addr, self.support_draft_29) + multiaddr_to_socketaddr(&send_back_addr, self.config.support_draft_29) .unwrap() .0; @@ -440,8 +431,7 @@ struct Listener { /// An underlying copy of the socket to be able to hole punch with socket: UdpSocket, - web_transport_addr: bool, - cert_hash: Multihash<64>, + webtransport: WebTransport, /// A future to poll new incoming connections. accept: BoxFuture<'static, Option>, @@ -472,8 +462,7 @@ impl Listener

{ endpoint: quinn::Endpoint, handshake_timeout: Duration, version: ProtocolVersion, - web_transport_addr: bool, - cert_hash: Multihash<64>, + webtransport: WebTransport, ) -> Result { let if_watcher; let pending_event; @@ -485,7 +474,9 @@ impl Listener

{ } else { if_watcher = None; listening_addresses.insert(local_addr.ip()); - let ma = socketaddr_to_multiaddr(&local_addr, version, web_transport_addr, Some(cert_hash)); + let mut ma = socketaddr_to_multiaddr(&local_addr, version); + ma = webtransport.update_multiaddr(ma); + pending_event = Some(TransportEvent::NewAddress { listener_id, listen_addr: ma, @@ -507,8 +498,7 @@ impl Listener

{ pending_event, close_listener_waker: None, listening_addresses, - web_transport_addr, - cert_hash, + webtransport, }) } @@ -551,10 +541,11 @@ impl Listener

{ loop { match ready!(P::poll_if_event(if_watcher, cx)) { Ok(IfEvent::Up(inet)) => { - if let Some(listen_addr) = - ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version, - self.web_transport_addr, Some(self.cert_hash)) + if let Some(mut listen_addr) = + ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { + listen_addr = self.webtransport.update_multiaddr(listen_addr); + tracing::debug!( address=%listen_addr, "New listen address" @@ -567,10 +558,10 @@ impl Listener

{ } } Ok(IfEvent::Down(inet)) => { - if let Some(listen_addr) = - ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version, - self.web_transport_addr, Some(self.cert_hash)) + if let Some(mut listen_addr) = + ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version) { + listen_addr = self.webtransport.update_multiaddr(listen_addr); tracing::debug!( address=%listen_addr, "Expired listen address" @@ -612,25 +603,30 @@ impl Stream for Listener

{ let endpoint = self.endpoint.clone(); self.accept = async move { endpoint.accept().await }.boxed(); - let local_addr = socketaddr_to_multiaddr( + let mut local_addr = socketaddr_to_multiaddr( &self.socket_addr(), self.version, - self.web_transport_addr, - Some(self.cert_hash), ); + local_addr = self.webtransport.update_multiaddr(local_addr); + let remote_addr = connecting.remote_address(); + //todo It's unclear should I add anything to a send_back_addr? + let send_back_addr = socketaddr_to_multiaddr( &remote_addr, self.version, - false, - None, ); let timeout = self.handshake_timeout.clone(); - let fut = if self.web_transport_addr { + let noise = self.webtransport.noise.clone(); + let fut = if self.webtransport.in_use { async move { - webtransport::Connecting::new(connecting, timeout).await + webtransport::Connecting::new( + noise, + connecting, + timeout, + ).await }.boxed() } else { async move { @@ -713,15 +709,14 @@ fn ip_to_listenaddr( endpoint_addr: &SocketAddr, ip: IpAddr, version: ProtocolVersion, - wt: bool, - cert_hash: Option>, ) -> Option { // True if either both addresses are Ipv4 or both Ipv6. if !SocketFamily::is_same(&endpoint_addr.ip(), &ip) { return None; } let socket_addr = SocketAddr::new(ip, endpoint_addr.port()); - Some(socketaddr_to_multiaddr(&socket_addr, version, wt, cert_hash)) + + Some(socketaddr_to_multiaddr(&socket_addr, version)) } /// Tries to turn a QUIC multiaddress into a UDP [`SocketAddr`]. Returns None if the format @@ -734,7 +729,8 @@ fn multiaddr_to_socketaddr( let proto1 = iter.next()?; let proto2 = iter.next()?; let proto3 = iter.next()?; - let proto4 = iter.next()?; + let proto4 = iter.next(); + let proto5 = iter.next(); let mut peer_id = None; for proto in iter { @@ -751,14 +747,22 @@ fn multiaddr_to_socketaddr( _ => return None, }; - let wt = proto4 == Protocol::WebTransport; + let is_webtransport = match proto4 { + Some(Protocol::WebTransport) => { + if let Some(Protocol::Certhash(_)) = proto5 { + panic!("Cannot listen on a specific certhash for WebTransport addr {addr}"); + } + true + } + _ => false, + }; match (proto1, proto2) { (Protocol::Ip4(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id, wt)) + Some((SocketAddr::new(ip.into(), port), version, peer_id, is_webtransport)) } (Protocol::Ip6(ip), Protocol::Udp(port)) => { - Some((SocketAddr::new(ip.into(), port), version, peer_id, wt)) + Some((SocketAddr::new(ip.into(), port), version, peer_id, is_webtransport)) } _ => None, } @@ -795,26 +799,36 @@ fn is_quic_addr(addr: &Multiaddr, support_draft_29: bool) -> bool { fn socketaddr_to_multiaddr( socket_addr: &SocketAddr, version: ProtocolVersion, - web_transport_addr: bool, - cert_hash: Option>, ) -> Multiaddr { let quic_proto = match version { ProtocolVersion::V1 => Protocol::QuicV1, ProtocolVersion::Draft29 => Protocol::Quic, }; - let mut res = Multiaddr::empty() + Multiaddr::empty() .with(socket_addr.ip().into()) .with(Protocol::Udp(socket_addr.port())) - .with(quic_proto); + .with(quic_proto) +} + +struct WebTransport { + in_use: bool, + + cert_hashes: Vec>, + noise: libp2p_noise::Config, +} - if web_transport_addr { - res = res.with(Protocol::WebTransport); - if let Some(hash) = cert_hash { - res = res.with(Protocol::Certhash(hash.clone())); +impl WebTransport { + pub(crate) fn update_multiaddr(&self, addr: Multiaddr) -> Multiaddr { + if self.in_use { + let mut vec = self.cert_hashes.clone(); + + return addr.with(Protocol::WebTransport) + .with(Protocol::Certhash(vec.pop().expect("Gets the last element"))) + .with(Protocol::Certhash(vec.pop().expect("Gets the last element"))); } - } - res + addr + } } #[cfg(test)] diff --git a/transports/quic/src/webtransport.rs b/transports/quic/src/webtransport.rs index 2964d631f2c..630d44c0b15 100644 --- a/transports/quic/src/webtransport.rs +++ b/transports/quic/src/webtransport.rs @@ -1,9 +1,6 @@ - -pub(crate) mod fingerprint; mod connection; mod connecting; mod stream; -pub use connecting::WebTransportError; pub(crate) use connection::Connection; pub(crate) use connecting::Connecting; \ No newline at end of file diff --git a/transports/quic/src/webtransport/connecting.rs b/transports/quic/src/webtransport/connecting.rs index eaca1b32b67..c808a3dce7e 100644 --- a/transports/quic/src/webtransport/connecting.rs +++ b/transports/quic/src/webtransport/connecting.rs @@ -1,3 +1,23 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + //! Future that drives a QUIC connection until is has performed its TLS handshake. //! And then it drives a http3 connection based on the QUIC connection. //! And then it drives a WebTransport session based on the http3 connection. @@ -8,7 +28,6 @@ use std::{ time::Duration, }; -use bytes::Bytes; use futures::{ future::{select, Select}, prelude::*, @@ -16,27 +35,33 @@ use futures::{ use futures::future::{BoxFuture, Either}; use futures_timer::Delay; use h3::ext::Protocol; -use h3_quinn::Connection as Http3Connection; use h3_webtransport::server::WebTransportSession; -use http::Method; +use http::{Method, Request}; use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::upgrade::InboundConnectionUpgrade; use libp2p_identity::PeerId; use crate::{Error, webtransport}; +use crate::webtransport::connecting::WebTransportError::{Http3Error, NoiseError}; + +const WEBTRANSPORT_PATH: &str = "/.well-known/libp2p-webtransport"; +const NOISE_QUERY: &str = "type=noise"; // #[derive(Debug)] pub struct Connecting { connecting: Select< - BoxFuture<'static, Result<(PeerId, WebTransportSession), WebTransportError>>, + BoxFuture<'static, Result<(PeerId, StreamMuxerBox), WebTransportError>>, Delay >, } impl Connecting { - pub(crate) fn new(connecting: quinn::Connecting, timeout: Duration) -> Self { + pub(crate) fn new( + noise: libp2p_noise::Config, + connecting: quinn::Connecting, timeout: Duration) -> Self { Connecting { - connecting: select(web_transport_session(connecting).boxed(), Delay::new(timeout)) + connecting: select(web_transport_connection(connecting, noise).boxed(), Delay::new(timeout)) } } } @@ -47,9 +72,8 @@ impl Future for Connecting { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let res = match futures::ready!(self.connecting.poll_unpin(cx)) { Either::Right(_) => return Poll::Ready(Err(Error::HandshakeTimedOut)), - Either::Left((Ok((peer_id, session)), _)) => { - let muxer = webtransport::Connection::new(session); - Ok((peer_id, StreamMuxerBox::new(muxer))) + Either::Left((Ok((peer_id, muxer)), _)) => { + Ok((peer_id, muxer)) } Either::Left((Err(_e), _)) => return Poll::Ready(Err(Error::WebTransportError)), }; @@ -72,10 +96,11 @@ fn remote_peer_id(connection: &quinn::Connection) -> PeerId { p2p_cert.peer_id() } -async fn web_transport_session(connecting: quinn::Connecting) - -> Result<(PeerId, WebTransportSession), WebTransportError> { +async fn web_transport_connection(connecting: quinn::Connecting, _noise: libp2p_noise::Config) + -> Result<(PeerId, StreamMuxerBox), WebTransportError> { let quic_conn = connecting.await .map_err(|e| WebTransportError::ConnectionError(e))?; + // info!("new http3 established"); let peer_id = remote_peer_id(&quic_conn); @@ -90,37 +115,63 @@ async fn web_transport_session(connecting: quinn::Connecting) .await .unwrap(); - match h3_conn.accept().await { - Ok(Some((request, stream))) => { + match h3_conn.accept().await? { + Some((request, stream)) => { let ext = request.extensions(); let proto = ext.get::(); if Some(&Protocol::WEB_TRANSPORT) == proto { - if request.method() == &Method::CONNECT { + if check_request(&request) { let session = WebTransportSession::accept(request, stream, h3_conn) - .await.map_err(|e| WebTransportError::Http3Error(e))?; - Ok((peer_id, session)) + .await?; + let connection = webtransport::Connection::new(session); + + // let (_, out) = noise.upgrade_inbound(muxer, "").await?; + + Ok((peer_id, StreamMuxerBox::new(connection))) } else { - Err(WebTransportError::UnexpectedMethod(request.method().clone())) + Err(WebTransportError::BadRequest(request.method().clone())) } } else { Err(WebTransportError::UnexpectedProtocol(proto.cloned())) } } - Ok(None) => { + None => { // indicating no more streams to be received Err(WebTransportError::ClosedConnection) } - Err(err) => { - Err(WebTransportError::Http3Error(err)) - } } } +fn check_request(req: &Request<()>) -> bool { + req.method() == &Method::CONNECT && + req.uri().path() == WEBTRANSPORT_PATH && + req.uri().query() == Some(NOISE_QUERY) +} + // #[derive(Debug)] pub enum WebTransportError { - ConnectionError(quinn::ConnectionError), - UnexpectedMethod(Method), UnexpectedProtocol(Option), + BadRequest(Method), + ConnectionError(quinn::ConnectionError), ClosedConnection, Http3Error(h3::Error), + NoiseError(libp2p_noise::Error), +} + +impl From for WebTransportError { + fn from(e: libp2p_noise::Error) -> Self { + NoiseError(e) + } +} + +impl From for WebTransportError { + fn from(e: h3::Error) -> Self { + Http3Error(e) + } +} + +impl From for WebTransportError { + fn from(e: quinn::ConnectionError) -> Self { + WebTransportError::ConnectionError(e) + } } \ No newline at end of file diff --git a/transports/quic/src/webtransport/connection.rs b/transports/quic/src/webtransport/connection.rs index 2f7d8589845..cb436187bcd 100644 --- a/transports/quic/src/webtransport/connection.rs +++ b/transports/quic/src/webtransport/connection.rs @@ -1,3 +1,23 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + use h3_quinn::Connection as Http3Connection; use h3_webtransport::server::WebTransportSession; use bytes::Bytes; @@ -8,13 +28,14 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use std::sync::Arc; use h3::quic::BidiStream; use crate::webtransport::stream::Stream; /// State for a single opened WebTransport session. pub struct Connection { /// Underlying connection. - session: WebTransportSession, + session: Arc>, /// Future for accepting a new incoming bidirectional stream. incoming: Option< BoxFuture<'static, (h3_webtransport::stream::SendStream, Bytes>, @@ -36,7 +57,7 @@ impl Connection { /// its methods has ever been called. Failure to comply might lead to logic errors and panics. pub(crate) fn new(session: WebTransportSession) -> Self { Self { - session, + session: Arc::new(session), incoming: None, outgoing: None, closing: None, @@ -53,11 +74,10 @@ impl StreamMuxer for Connection { cx: &mut Context<'_>, ) -> Poll> { let this = self.get_mut(); - + let t_session = Arc::clone(&this.session); let incoming = this.incoming.get_or_insert_with(|| { - let accept = this.session.accept_bi(); async move { - let res = accept.await.unwrap(); + let res = t_session.accept_bi().await.unwrap(); match res { Some(h3_webtransport::server::AcceptedBi::BidiStream(_, stream)) => { stream.split() @@ -80,14 +100,13 @@ impl StreamMuxer for Connection { let this = self.get_mut(); let session_id = this.session.session_id(); - let open_bi = this.session.open_bi(session_id); + let t_session = Arc::clone(&this.session); let outgoing = this.outgoing.get_or_insert_with(|| { async move { - // let stream = open_bi.await.unwrap(); - // - // stream.split() - unreachable!("fix me!") + let stream = t_session.open_bi(session_id).await.unwrap(); + + stream.split() }.boxed() }); diff --git a/transports/quic/src/webtransport/fingerprint.rs b/transports/quic/src/webtransport/fingerprint.rs deleted file mode 100644 index 14e0930fe54..00000000000 --- a/transports/quic/src/webtransport/fingerprint.rs +++ /dev/null @@ -1,77 +0,0 @@ -use sha2::Digest as _; -use std::fmt; - -const SHA256: &str = "sha-256"; -const MULTIHASH_SHA256_CODE: u64 = 0x12; - -type Multihash = libp2p_core::multihash::Multihash<64>; - -/// A certificate fingerprint that is assumed to be created using the SHA256 hash algorithm. -#[derive(Eq, PartialEq, Copy, Clone)] -pub struct Fingerprint([u8; 32]); - -impl Fingerprint { - pub(crate) const FF: Fingerprint = Fingerprint([0xFF; 32]); - - #[cfg(test)] - pub fn raw(bytes: [u8; 32]) -> Self { - Self(bytes) - } - - /// Creates a fingerprint from a raw certificate. - pub fn from_certificate(bytes: &[u8]) -> Self { - Fingerprint(sha2::Sha256::digest(bytes).into()) - } - - /// Converts [`Multihash`](multihash::Multihash) to [`Fingerprint`]. - pub fn try_from_multihash(hash: Multihash) -> Option { - if hash.code() != MULTIHASH_SHA256_CODE { - // Only support SHA256 for now. - return None; - } - - let bytes = hash.digest().try_into().ok()?; - - Some(Self(bytes)) - } - - /// Converts this fingerprint to [`Multihash`](multihash::Multihash). - pub fn to_multihash(self) -> Multihash { - Multihash::wrap(MULTIHASH_SHA256_CODE, &self.0).expect("fingerprint's len to be 32 bytes") - } - - /// Formats this fingerprint as uppercase hex, separated by colons (`:`). - /// - /// This is the format described in . - pub fn to_sdp_format(self) -> String { - self.0.map(|byte| format!("{byte:02X}")).join(":") - } - - /// Returns the algorithm used (e.g. "sha-256"). - /// See - pub fn algorithm(&self) -> String { - SHA256.to_owned() - } -} - -impl fmt::Debug for Fingerprint { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&hex::encode(self.0)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn sdp_format() { - let fp = Fingerprint::raw(hex_literal::hex!( - "7DE3D83F81A680592A471E6B6ABB0747ABD35385A8093FDFE112C1EEBB6CC6AC" - )); - - let sdp_format = fp.to_sdp_format(); - - assert_eq!(sdp_format, "7D:E3:D8:3F:81:A6:80:59:2A:47:1E:6B:6A:BB:07:47:AB:D3:53:85:A8:09:3F:DF:E1:12:C1:EE:BB:6C:C6:AC") - } -} \ No newline at end of file diff --git a/transports/quic/src/webtransport/stream.rs b/transports/quic/src/webtransport/stream.rs index 6e05d585675..03199c1eb11 100644 --- a/transports/quic/src/webtransport/stream.rs +++ b/transports/quic/src/webtransport/stream.rs @@ -1,3 +1,23 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + use std::{ io::{self}, pin::Pin, diff --git a/transports/tls/src/certificate.rs b/transports/tls/src/certificate.rs index ff9d296bb16..3375274f76c 100644 --- a/transports/tls/src/certificate.rs +++ b/transports/tls/src/certificate.rs @@ -24,7 +24,10 @@ use libp2p_identity as identity; use libp2p_identity::PeerId; -use x509_parser::{prelude::*, signature_algorithm::SignatureAlgorithm}; +use x509_parser::{ + der_parser, prelude::{oid_registry, X509Certificate, FromDer}, + signature_algorithm::SignatureAlgorithm, +}; /// The libp2p Public Key Extension is a X.509 extension /// with the Object Identier 1.3.6.1.4.1.53594.1.1, @@ -72,6 +75,36 @@ pub fn generate( Ok((rustls_certificate, rustls_key)) } +/// Generates a self-signed TLS certificate with passed `not_before` and `not_after` dates +/// that includes a libp2p-specific certificate extension containing +/// the public key of the given keypair. +pub fn generate_webtransport_certificate( + identity_keypair: &identity::Keypair, + not_before: time::OffsetDateTime, + not_after: time::OffsetDateTime, +) -> Result<(rustls::Certificate, rustls::PrivateKey), GenError> { + let certificate_keypair = rcgen::KeyPair::generate(P2P_SIGNATURE_ALGORITHM)?; + let rustls_key = rustls::PrivateKey(certificate_keypair.serialize_der()); + + let certificate = { + let mut params = rcgen::CertificateParams::new(vec![]); + params.distinguished_name = rcgen::DistinguishedName::new(); + params.custom_extensions.push(make_libp2p_extension( + identity_keypair, + &certificate_keypair, + )?); + params.alg = P2P_SIGNATURE_ALGORITHM; + params.key_pair = Some(certificate_keypair); + params.not_before = not_before.into(); + params.not_after = not_after.into(); + rcgen::Certificate::from_params(params)? + }; + + let rustls_certificate = rustls::Certificate(certificate.serialize_der()?); + + Ok((rustls_certificate, rustls_key)) +} + /// Attempts to parse the provided bytes as a [`P2pCertificate`]. /// /// For this to succeed, the certificate must contain the specified extension and the signature must diff --git a/transports/tls/src/lib.rs b/transports/tls/src/lib.rs index 4a5434414c4..cf369e2f880 100644 --- a/transports/tls/src/lib.rs +++ b/transports/tls/src/lib.rs @@ -27,18 +27,17 @@ pub mod certificate; mod upgrade; -mod verifier; +pub mod verifier; use libp2p_identity::Keypair; use libp2p_identity::PeerId; use std::sync::Arc; pub use futures_rustls::TlsStream; -use rustls::{Certificate, PrivateKey}; pub use upgrade::Config; pub use upgrade::UpgradeError; -const P2P_ALPN: [u8; 6] = *b"libp2p"; +pub const P2P_ALPN: [u8; 6] = *b"libp2p"; /// Create a TLS client configuration for libp2p. pub fn make_client_config( @@ -80,23 +79,3 @@ pub fn make_server_config( Ok(crypto) } - -/// Create a TLS server configuration for libp2p. -pub fn make_server_config_with_cert( - certificate: Certificate, - private_key: PrivateKey, - alpn: Vec> -) -> Result { - let mut crypto = rustls::ServerConfig::builder() - .with_cipher_suites(verifier::CIPHERSUITES) - .with_safe_default_kx_groups() - .with_protocol_versions(verifier::PROTOCOL_VERSIONS) - .expect("Cipher suites and kx groups are configured; qed") - .with_client_cert_verifier(Arc::new(verifier::Libp2pCertificateVerifier::new())) - .with_single_cert(vec![certificate], private_key) - .expect("Server cert key DER is valid; qed"); - - crypto.alpn_protocols = alpn; - - Ok(crypto) -} diff --git a/transports/tls/src/verifier.rs b/transports/tls/src/verifier.rs index 01fdb8fdf11..211efe76985 100644 --- a/transports/tls/src/verifier.rs +++ b/transports/tls/src/verifier.rs @@ -42,11 +42,11 @@ use std::sync::Arc; /// /// > The libp2p handshake uses TLS 1.3 (and higher). /// > Endpoints MUST NOT negotiate lower TLS versions. -pub(crate) static PROTOCOL_VERSIONS: &[&SupportedProtocolVersion] = &[&rustls::version::TLS13]; +pub static PROTOCOL_VERSIONS: &[&SupportedProtocolVersion] = &[&rustls::version::TLS13]; /// A list of the TLS 1.3 cipher suites supported by rustls. // By default rustls creates client/server configs with both // TLS 1.3 __and__ 1.2 cipher suites. But we don't need 1.2. -pub(crate) static CIPHERSUITES: &[SupportedCipherSuite] = &[ +pub static CIPHERSUITES: &[SupportedCipherSuite] = &[ // TLS1.3 suites TLS13_CHACHA20_POLY1305_SHA256, TLS13_AES_256_GCM_SHA384, @@ -56,7 +56,7 @@ pub(crate) static CIPHERSUITES: &[SupportedCipherSuite] = &[ /// Implementation of the `rustls` certificate verification traits for libp2p. /// /// Only TLS 1.3 is supported. TLS 1.2 should be disabled in the configuration of `rustls`. -pub(crate) struct Libp2pCertificateVerifier { +pub struct Libp2pCertificateVerifier { /// The peer ID we intend to connect to remote_peer_id: Option, } @@ -68,7 +68,7 @@ pub(crate) struct Libp2pCertificateVerifier { /// - The certificate must have a valid libp2p extension that includes a /// signature of its public key. impl Libp2pCertificateVerifier { - pub(crate) fn new() -> Self { + pub fn new() -> Self { Self { remote_peer_id: None, } From 10d97fe27c9dad385ca25885d65b0e739e3b06ad Mon Sep 17 00:00:00 2001 From: dgarus Date: Mon, 25 Dec 2023 12:33:47 +0300 Subject: [PATCH 5/7] noise --- transports/quic/src/certificate_manager.rs | 1 - transports/quic/src/transport.rs | 13 ++-- .../quic/src/webtransport/connecting.rs | 14 ++--- .../quic/src/webtransport/connection.rs | 59 ++++++++----------- transports/quic/src/webtransport/stream.rs | 2 +- 5 files changed, 37 insertions(+), 52 deletions(-) diff --git a/transports/quic/src/certificate_manager.rs b/transports/quic/src/certificate_manager.rs index 0a17716c2cc..0d93b393e8a 100644 --- a/transports/quic/src/certificate_manager.rs +++ b/transports/quic/src/certificate_manager.rs @@ -29,7 +29,6 @@ use libp2p_tls::{ certificate, P2P_ALPN, verifier, }; -const SHA256: &str = "sha-256"; const MULTIHASH_SHA256_CODE: u64 = 0x12; const CERT_VALID_PERIOD: Duration = Duration::days(14); diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 45c3f44b9f0..697a0a9925b 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -297,14 +297,13 @@ impl Transport for GenTransport

{ client_config.version(0xff00_001d); } - // This `"l"` seems necessary because an empty string is an invalid domain - // name. While we don't use domain names, the underlying rustls library - // is based upon the assumption that we do. - let connecting = endpoint - .connect_with(client_config, socket_addr, "l") - .map_err(ConnectError).unwrap(); // todo should be `?` - Ok(Box::pin(async move { + // This `"l"` seems necessary because an empty string is an invalid domain + // name. While we don't use domain names, the underlying rustls library + // is based upon the assumption that we do. + let connecting = endpoint + .connect_with(client_config, socket_addr, "l") + .map_err(ConnectError)?; Connecting::new(connecting, handshake_timeout).await })) } diff --git a/transports/quic/src/webtransport/connecting.rs b/transports/quic/src/webtransport/connecting.rs index c808a3dce7e..3a912b9c932 100644 --- a/transports/quic/src/webtransport/connecting.rs +++ b/transports/quic/src/webtransport/connecting.rs @@ -39,7 +39,6 @@ use h3_webtransport::server::WebTransportSession; use http::{Method, Request}; use libp2p_core::muxing::StreamMuxerBox; -use libp2p_core::upgrade::InboundConnectionUpgrade; use libp2p_identity::PeerId; use crate::{Error, webtransport}; @@ -49,7 +48,7 @@ const WEBTRANSPORT_PATH: &str = "/.well-known/libp2p-webtransport"; const NOISE_QUERY: &str = "type=noise"; // #[derive(Debug)] -pub struct Connecting { +pub(crate) struct Connecting { connecting: Select< BoxFuture<'static, Result<(PeerId, StreamMuxerBox), WebTransportError>>, Delay @@ -96,10 +95,9 @@ fn remote_peer_id(connection: &quinn::Connection) -> PeerId { p2p_cert.peer_id() } -async fn web_transport_connection(connecting: quinn::Connecting, _noise: libp2p_noise::Config) +async fn web_transport_connection(connecting: quinn::Connecting, noise: libp2p_noise::Config) -> Result<(PeerId, StreamMuxerBox), WebTransportError> { - let quic_conn = connecting.await - .map_err(|e| WebTransportError::ConnectionError(e))?; + let quic_conn = connecting.await?; // info!("new http3 established"); @@ -123,9 +121,7 @@ async fn web_transport_connection(connecting: quinn::Connecting, _noise: libp2p_ if check_request(&request) { let session = WebTransportSession::accept(request, stream, h3_conn) .await?; - let connection = webtransport::Connection::new(session); - - // let (_, out) = noise.upgrade_inbound(muxer, "").await?; + let connection = webtransport::Connection::new(session, noise); Ok((peer_id, StreamMuxerBox::new(connection))) } else { @@ -149,7 +145,7 @@ fn check_request(req: &Request<()>) -> bool { } // #[derive(Debug)] -pub enum WebTransportError { +pub(crate) enum WebTransportError { UnexpectedProtocol(Option), BadRequest(Method), ConnectionError(quinn::ConnectionError), diff --git a/transports/quic/src/webtransport/connection.rs b/transports/quic/src/webtransport/connection.rs index cb436187bcd..dd66caae83b 100644 --- a/transports/quic/src/webtransport/connection.rs +++ b/transports/quic/src/webtransport/connection.rs @@ -30,21 +30,19 @@ use std::{ }; use std::sync::Arc; use h3::quic::BidiStream; +use libp2p_core::upgrade::InboundConnectionUpgrade; +use libp2p_noise::Output; use crate::webtransport::stream::Stream; /// State for a single opened WebTransport session. -pub struct Connection { +pub(crate) struct Connection { /// Underlying connection. session: Arc>, + //Noise config to auth incoming connections. + noise: libp2p_noise::Config, /// Future for accepting a new incoming bidirectional stream. incoming: Option< - BoxFuture<'static, (h3_webtransport::stream::SendStream, Bytes>, - h3_webtransport::stream::RecvStream)>, - >, - /// Future for opening a new outgoing bidirectional stream. - outgoing: Option< - BoxFuture<'static, (h3_webtransport::stream::SendStream, Bytes>, - h3_webtransport::stream::RecvStream)>, + BoxFuture<'static, Output>, >, /// Future to wait for the connection to be closed. closing: Option>, @@ -55,18 +53,21 @@ impl Connection { /// /// This function assumes that the [`quinn::Connection`] is completely fresh and none of /// its methods has ever been called. Failure to comply might lead to logic errors and panics. - pub(crate) fn new(session: WebTransportSession) -> Self { + pub(crate) fn new( + session: WebTransportSession, + noise: libp2p_noise::Config, + ) -> Self { Self { session: Arc::new(session), + noise, incoming: None, - outgoing: None, closing: None, } } } impl StreamMuxer for Connection { - type Substream = Stream; + type Substream = Output; type Error = Error; fn poll_inbound( @@ -75,45 +76,35 @@ impl StreamMuxer for Connection { ) -> Poll> { let this = self.get_mut(); let t_session = Arc::clone(&this.session); + let t_noise = this.noise.clone(); let incoming = this.incoming.get_or_insert_with(|| { async move { let res = t_session.accept_bi().await.unwrap(); match res { Some(h3_webtransport::server::AcceptedBi::BidiStream(_, stream)) => { - stream.split() + let (send, recv) = stream.split(); + let stream = Stream::new(send, recv); + + // todo should we apply `handshake_timeout` here? + let (_peer_id, out) = t_noise.upgrade_inbound(stream, "").await.unwrap(); + + out } _ => unreachable!("fix me!") } }.boxed() }); - let (send, recv) = futures::ready!(incoming.poll_unpin(cx)); + let res = futures::ready!(incoming.poll_unpin(cx)); this.incoming.take(); - let stream = Stream::new(send, recv); - Poll::Ready(Ok(stream)) + Poll::Ready(Ok(res)) } fn poll_outbound( self: Pin<&mut Self>, - cx: &mut Context<'_>, + _cx: &mut Context<'_>, ) -> Poll> { - let this = self.get_mut(); - - let session_id = this.session.session_id(); - let t_session = Arc::clone(&this.session); - - let outgoing = this.outgoing.get_or_insert_with(|| { - async move { - let stream = t_session.open_bi(session_id).await.unwrap(); - - stream.split() - }.boxed() - }); - - let (send, recv) = futures::ready!(outgoing.poll_unpin(cx)); - this.outgoing.take(); - let stream = Stream::new(send, recv); - Poll::Ready(Ok(stream)) + panic!("WebTransport implementation doesn't support outbound streams.") } fn poll( @@ -125,7 +116,7 @@ impl StreamMuxer for Connection { Poll::Pending } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { /*let this = self.get_mut(); let closing = this.closing.get_or_insert_with(|| { diff --git a/transports/quic/src/webtransport/stream.rs b/transports/quic/src/webtransport/stream.rs index 03199c1eb11..e9bd6e93aa3 100644 --- a/transports/quic/src/webtransport/stream.rs +++ b/transports/quic/src/webtransport/stream.rs @@ -27,7 +27,7 @@ use bytes::Bytes; use futures::{AsyncRead, AsyncWrite}; /// A single stream on a connection -pub struct Stream { +pub(crate) struct Stream { /// A send part of the stream send: h3_webtransport::stream::SendStream, Bytes>, /// A receive part of the stream From d418dc18963b5ee9646587134da49bba529266fe Mon Sep 17 00:00:00 2001 From: dgarus Date: Mon, 25 Dec 2023 17:03:48 +0300 Subject: [PATCH 6/7] single endpoint for quic and wt --- transports/quic/src/transport.rs | 44 +++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index 697a0a9925b..c1cef3ca3f9 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -139,6 +139,20 @@ impl GenTransport

{ Ok((socket_addr, version, peer_id, is_webtransport_addr)) } + /// Gets an endpoint with the same `socket_addr`. + /// To check that an endpoint uses the same cert we can compare cert hashes. + fn get_existed_endpoint(&mut self, socket_addr: &SocketAddr, cert_hashes: &Vec>) -> Option { + if let Some(listener) = self.listeners + .iter_mut() + .find(|l| { + !l.is_closed && &l.socket_addr() == socket_addr && l.same_cert_hashes(cert_hashes) + }) { + return Some(listener.endpoint.clone()) + } + + None + } + /// Pick any listener to use for dialing. fn eligible_listener(&mut self, socket_addr: &SocketAddr) -> Option<&mut Listener

> { let mut listeners: Vec<_> = self @@ -203,18 +217,23 @@ impl Transport for GenTransport

{ ) -> Result<(), TransportError> { let (socket_addr, version, _peer_id, is_webtransport_addr) = self.remote_multiaddr_to_socketaddr(addr, false)?; + let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?; + let socket_c = socket.try_clone().map_err(Self::Error::from)?; + let (server_config, noise, cert_hashes) = self.config.server_quinn_config() .map_err(|e| Error::CertificateGenerationError(e))?; - let socket = self.create_socket(socket_addr).map_err(Self::Error::from)?; + let endpoint = match self.get_existed_endpoint(&socket_addr, &cert_hashes) { + Some(res) => res, + None => Self::new_endpoint(self.config.endpoint_config(), Some(server_config), socket)?, + }; - let socket_c = socket.try_clone().map_err(Self::Error::from)?; - let endpoint = Self::new_endpoint(self.config.endpoint_config(), Some(server_config), socket)?; let webtransport = WebTransport { in_use: is_webtransport_addr, cert_hashes, noise, }; + let listener = Listener::new( listener_id, socket_c, @@ -531,6 +550,19 @@ impl Listener

{ .expect("Cannot fail because the socket is bound") } + fn same_cert_hashes(&self, cert_hashes: &Vec>) -> bool { + if self.webtransport.cert_hashes.len() != cert_hashes.len() { + return false + } + let mut set: HashSet> = self.webtransport.cert_hashes + .clone().into_iter().collect(); + for hash in cert_hashes { + set.remove(hash); + } + + set.is_empty() + } + /// Poll for a next If Event. fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { let endpoint_addr = self.socket_addr(); @@ -609,17 +641,15 @@ impl Stream for Listener

{ local_addr = self.webtransport.update_multiaddr(local_addr); let remote_addr = connecting.remote_address(); - - //todo It's unclear should I add anything to a send_back_addr? - let send_back_addr = socketaddr_to_multiaddr( &remote_addr, self.version, ); let timeout = self.handshake_timeout.clone(); - let noise = self.webtransport.noise.clone(); let fut = if self.webtransport.in_use { + let noise = self.webtransport.noise.clone(); + async move { webtransport::Connecting::new( noise, From dc08909cc213f1b9c5bb563974a69253b3920918 Mon Sep 17 00:00:00 2001 From: dgarus Date: Thu, 25 Jan 2024 15:09:13 +0300 Subject: [PATCH 7/7] msg --- transports/quic/src/certificate_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transports/quic/src/certificate_manager.rs b/transports/quic/src/certificate_manager.rs index 0d93b393e8a..13f29c9d196 100644 --- a/transports/quic/src/certificate_manager.rs +++ b/transports/quic/src/certificate_manager.rs @@ -60,7 +60,7 @@ impl ServerCertManager { self.check_and_roll_items()?; let cur_item = self.items.first() - .expect("Element with index 0 exists"); + .expect("The first element exists"); let cert_hashes = self.items.iter() .map(|item| item.cert_hash.clone()) .collect(); @@ -94,7 +94,7 @@ impl ServerCertManager { Ok(CertItem { server_tls_config: Arc::new(tls_config), start, end: not_after, cert_hash }) } - ///https://github.com/libp2p/specs/tree/master/webtransport#certificates + /// https://github.com/libp2p/specs/tree/master/webtransport#certificates /// Servers need to take care of regularly renewing their certificates.At first boot of the node, /// it creates one self-signed certificate with a validity of 14 days, starting immediately, /// and another certificate with the 14 day validity period starting on the expiry date of the first certificate.