From d4e1182141a06ae79d4b03e1c4cee4b02f0b8182 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Smith Date: Sun, 17 Dec 2023 17:02:12 +0100 Subject: [PATCH 1/6] feat: add datagram trait and path aware datagram variant --- .../scion-proto/src/address/scion_address.rs | 16 + .../scion-proto/src/address/socket_address.rs | 16 + crates/scion-proto/src/path.rs | 27 +- crates/scion-proto/src/path/dataplane.rs | 39 +- crates/scion/Cargo.toml | 1 + crates/scion/src/lib.rs | 1 + crates/scion/src/pan.rs | 142 +++++++ crates/scion/src/pan/datagram.rs | 221 +++++++++++ crates/scion/src/pan/error.rs | 86 ++++ crates/scion/src/pan/path_service.rs | 30 ++ crates/scion/src/udp_socket.rs | 369 +++++++----------- crates/scion/tests/test_udp_socket.rs | 27 +- 12 files changed, 727 insertions(+), 248 deletions(-) create mode 100644 crates/scion/src/pan.rs create mode 100644 crates/scion/src/pan/datagram.rs create mode 100644 crates/scion/src/pan/error.rs create mode 100644 crates/scion/src/pan/path_service.rs diff --git a/crates/scion-proto/src/address/scion_address.rs b/crates/scion-proto/src/address/scion_address.rs index 7c1b1e4..7d25ac0 100644 --- a/crates/scion-proto/src/address/scion_address.rs +++ b/crates/scion-proto/src/address/scion_address.rs @@ -62,6 +62,16 @@ impl ScionAddr { } } +impl AsRef for ScionAddr { + fn as_ref(&self) -> &IsdAsn { + match self { + ScionAddr::V4(addr) => addr.as_ref(), + ScionAddr::V6(addr) => addr.as_ref(), + ScionAddr::Svc(addr) => addr.as_ref(), + } + } +} + macro_rules! impl_from { ($base:ty, $variant:expr) => { impl From<$base> for ScionAddr { @@ -155,6 +165,12 @@ macro_rules! scion_address { } } + impl AsRef for $name { + fn as_ref(&self) -> &IsdAsn { + &self.isd_asn + } + } + impl core::fmt::Display for $name { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{},{}", self.isd_asn(), self.host()) diff --git a/crates/scion-proto/src/address/socket_address.rs b/crates/scion-proto/src/address/socket_address.rs index 4392354..e19c0da 100644 --- a/crates/scion-proto/src/address/socket_address.rs +++ b/crates/scion-proto/src/address/socket_address.rs @@ -113,6 +113,16 @@ impl SocketAddr { } } +impl AsRef for SocketAddr { + fn as_ref(&self) -> &IsdAsn { + match self { + SocketAddr::V4(addr) => addr.as_ref(), + SocketAddr::V6(addr) => addr.as_ref(), + SocketAddr::Svc(addr) => addr.as_ref(), + } + } +} + impl From for SocketAddr { /// Converts a [`SocketAddrV4`] into a [`SocketAddr::V4`]. #[inline] @@ -244,6 +254,12 @@ macro_rules! socket_address { Ok(Self {scion_addr, port }) } } + + impl AsRef for $name { + fn as_ref(&self) -> &IsdAsn { + self.scion_addr.as_ref() + } + } }; } diff --git a/crates/scion-proto/src/path.rs b/crates/scion-proto/src/path.rs index b8e323d..6de92c8 100644 --- a/crates/scion-proto/src/path.rs +++ b/crates/scion-proto/src/path.rs @@ -3,7 +3,7 @@ //! This module contains types for SCION paths and metadata as well as encoding and decoding //! functions. -use std::net::SocketAddr; +use std::{net::SocketAddr, ops::Deref}; use bytes::Bytes; use scion_grpc::daemon::v1 as daemon_grpc; @@ -27,7 +27,7 @@ pub mod epic; pub use epic::EpicAuths; /// A SCION end-to-end path with optional metadata. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub struct Path { /// The raw bytes to be added as the path header to SCION dataplane packets. pub dataplane_path: DataplanePath, @@ -125,6 +125,29 @@ impl Path { } } +impl From> for Path { + fn from(value: Path<&mut [u8]>) -> Self { + Self { + dataplane_path: value.dataplane_path.into(), + underlay_next_hop: value.underlay_next_hop, + isd_asn: value.isd_asn, + metadata: value.metadata, + } + } +} + +impl PartialEq for Path +where + T: Deref, +{ + fn eq(&self, other: &Self) -> bool { + self.dataplane_path == other.dataplane_path + && self.underlay_next_hop == other.underlay_next_hop + && self.isd_asn == other.isd_asn + && self.metadata == other.metadata + } +} + #[cfg(test)] mod tests { use std::net::{IpAddr, Ipv4Addr}; diff --git a/crates/scion-proto/src/path/dataplane.rs b/crates/scion-proto/src/path/dataplane.rs index da50571..f6d6d6b 100644 --- a/crates/scion-proto/src/path/dataplane.rs +++ b/crates/scion-proto/src/path/dataplane.rs @@ -61,7 +61,7 @@ impl From for PathType { pub struct UnsupportedPathType(pub u8); /// Dataplane path found in a SCION packet. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub enum DataplanePath { /// The empty path type, used for intra-AS hops. EmptyPath, @@ -131,6 +131,17 @@ where } } + /// Reverse the path to the provided slice. + /// + /// Unsupported path types are copied to the slice, as is. + pub fn reverse_to_slice<'b>(&self, buffer: &'b mut [u8]) -> DataplanePath<&'b mut [u8]> { + match self { + DataplanePath::EmptyPath => DataplanePath::EmptyPath, + DataplanePath::Standard(path) => DataplanePath::Standard(path.reverse_to_slice(buffer)), + DataplanePath::Unsupported { .. } => self.copy_to_slice(buffer), + } + } + /// Reverses the path. pub fn to_reversed(&self) -> Result { match self { @@ -188,6 +199,30 @@ impl From for DataplanePath { } } +impl PartialEq> for DataplanePath +where + T: Deref, + U: Deref, +{ + fn eq(&self, other: &DataplanePath) -> bool { + match (self, other) { + (Self::Standard(lhs), DataplanePath::Standard(rhs)) => lhs.raw() == rhs.raw(), + ( + Self::Unsupported { + path_type: l_path_type, + bytes: l_bytes, + }, + DataplanePath::Unsupported { + path_type: r_path_type, + bytes: r_bytes, + }, + ) => l_path_type == r_path_type && l_bytes.deref() == r_bytes.deref(), + (Self::EmptyPath, DataplanePath::EmptyPath) => true, + _ => false, + } + } +} + impl WireEncode for DataplanePath { type Error = InadequateBufferSize; @@ -275,7 +310,7 @@ mod tests { #[test] fn reverse_empty() { - let dataplane_path = DataplanePath::EmptyPath; + let dataplane_path = DataplanePath::::EmptyPath; let reverse_path = dataplane_path.to_reversed().unwrap(); assert_eq!(dataplane_path, reverse_path); assert_eq!(reverse_path.to_reversed().unwrap(), dataplane_path); diff --git a/crates/scion/Cargo.toml b/crates/scion/Cargo.toml index 95157c5..dd321fd 100644 --- a/crates/scion/Cargo.toml +++ b/crates/scion/Cargo.toml @@ -6,6 +6,7 @@ license = "Apache-2.0" publish = false [dependencies] +async-trait = "0.1.74" bytes = "1.5.0" chrono = { workspace = true, features = ["clock"] } scion-grpc = { version = "0.1.0", path = "../scion-grpc" } diff --git a/crates/scion/src/lib.rs b/crates/scion/src/lib.rs index 4a4ca1d..2a6ea0e 100644 --- a/crates/scion/src/lib.rs +++ b/crates/scion/src/lib.rs @@ -2,4 +2,5 @@ pub mod daemon; pub mod dispatcher; +pub mod pan; pub mod udp_socket; diff --git a/crates/scion/src/pan.rs b/crates/scion/src/pan.rs new file mode 100644 index 0000000..a7d1ac5 --- /dev/null +++ b/crates/scion/src/pan.rs @@ -0,0 +1,142 @@ +//! Path aware networking socket and services. +mod datagram; +pub use datagram::{AsyncScionDatagram, PathAwareDatagram}; + +mod path_service; +pub use path_service::AsyncPathService; + +mod error; +pub use error::{PathErrorKind, ReceiveError, SendError}; + +// use std::{ +// borrow::Borrow, +// collections::{HashMap, VecDeque}, +// marker::PhantomData, +// sync::Arc, +// }; +// +// use bytes::Bytes; +// use scion_proto::{ +// address::{IsdAsn, SocketAddr}, +// path::Path, +// }; + +// TODO(jsmith): + +// pub struct PathLookupError; +// +// /// Trait for retrieving paths to SCION ASes. +// #[async_trait::async_trait] +// pub trait PathService { +// /// Return a path to the specified AS. +// async fn path_to(&self, scion_as: IsdAsn) -> Result; +// +// /// Propose a path to the service. +// /// +// /// The service may or may not choose to store the path. +// fn add_path(&self, path: &Path); +// +// /// Notify the service of a path-related SCMP message. +// fn on_scmp(&self, _args: ()) { +// todo!() +// } +// } +// +// #[derive(Debug, Default)] +// pub struct PathSet { +// paths: HashMap>, +// } +// +// impl PathSet { +// pub fn new() -> Self { +// Self::default() +// } +// } +// +// #[async_trait::async_trait] +// impl PathService for PathSet { +// async fn path_to(&self, scion_as: IsdAsn) -> Result { +// todo!() +// } +// +// fn add_path(&self, path: &Path) { +// todo!() +// } +// } +// +// +// impl PathAwareDatagram +// where +// D: ScionDatagramSocket, +// P: PathService, +// { +// pub fn new(datagram_socket: D, path_service: Arc

) -> Self { +// Self { +// datagram_socket, +// path_service, +// } +// } +// +// pub fn set_path_service(&mut self, path_service: Arc

) { +// self.path_service = path_service; +// } +// +// // pub fn set_path_service(&mut self, path_service: P) { +// // self.path_service = path_service; +// // } +// } +// +// // #[async_trait::async_trait] +// // impl ScionDatagramSocket for PathAwareDatagram +// // where +// // D: ScionDatagramSocket, +// // P: PathService, +// // { +// // async fn recv_with_path(&self, buffer: &mut [u8]) -> Result<(usize, Path), ReceiveError> { +// // self.datagram_socket.recv_with_path(buffer).await +// // } +// // // async fn recv_from_with_path( +// // // &self, +// // // buffer: &mut [u8], +// // // ) -> Result<(usize, SocketAddr, Path), Self::RecvErr>; +// // // async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), Self::SendErr>; +// // // async fn send_to_via( +// // // &self, +// // // payload: Bytes, +// // // destination: SocketAddr, +// // // path: &Path, +// // // ) -> Result<(), Self::SendErr>; +// // } +// +// #[cfg(test)] +// mod tests { +// +// use std::time::Duration; +// +// use tokio::net::UnixStream; +// +// use super::*; +// use crate::{dispatcher::DispatcherStream, udp_socket::UdpSocket}; +// +// type TestResult = Result>; +// +// pub fn socket_from(source: SocketAddr) -> TestResult<(UdpSocket, DispatcherStream)> { +// let (inner, inner_remote) = UnixStream::pair()?; +// Ok(( +// UdpSocket::new(DispatcherStream::new(inner), source), +// DispatcherStream::new(inner_remote), +// )) +// } +// +// // #[test] +// // fn sanity() -> TestResult { +// // let (socket, _) = socket_from("[1-ff00:0:110,3.3.3.3]:8080".parse()?)?; +// // let dgram_socket2 = PathAwareDatagram::new(socket, Arc::new(PathSet::new())); +// // dgram_socket2.hello(); +// // +// // let (socket, _) = socket_from("[1-ff00:0:110,3.3.3.3]:8080".parse()?)?; +// // let dgram_socket2 = PathAwareDatagram::new(socket, Arc::new(PathSet::new())); +// // dgram_socket2.hello(); +// // Ok(()) +// // } +// } diff --git a/crates/scion/src/pan/datagram.rs b/crates/scion/src/pan/datagram.rs new file mode 100644 index 0000000..666327a --- /dev/null +++ b/crates/scion/src/pan/datagram.rs @@ -0,0 +1,221 @@ +use std::{io::ErrorKind, sync::Arc}; + +use async_trait; +use bytes::Bytes; +use scion_proto::{ + address::IsdAsn, + path::{DataplanePath, Path}, +}; +use tokio::sync::Mutex; + +use super::{AsyncPathService, ReceiveError, SendError}; + +/// Interface for sending and receiving datagrams asynchronously on the SCION network. +#[async_trait::async_trait] +pub trait AsyncScionDatagram { + /// The type of the address used for sending and receiving datagrams. + type Addr: AsRef + Sync + Send; + + /// Receive a datagram, its sender, and path from the socket. + /// + /// The payload of the datagram is written into the provided buffer, which must not be + /// empty. If there is insufficient space in the buffer, excess data may be dropped. + /// + /// This function returns the number of bytes in the payload (irrespective of whether any + /// were dropped), the address of the sender, and the SCION [`Path`] over which the packet + /// was received. + /// + /// The returned path corresponds to the reversed path observed in the packet for known path + /// types, or a copy of the opaque path data for unknown path types. In either case, the raw + /// raw data comprising the returned path is written to path_buffer, which must be at least + /// [DataplanePath::MAX_LEN][`DataplanePath::::MAX_LEN`] bytes in length. + async fn recv_from_with_path<'p>( + &self, + buffer: &mut [u8], + path_buffer: &'p mut [u8], + ) -> Result<(usize, Self::Addr, Path<&'p mut [u8]>), ReceiveError>; + + /// Receive a datagram and its sender. + /// + /// This behaves like [`Self::recv_from_with_path`] but does not return the path over which + /// the packet was received. + /// + /// In the case where the path is not needed, this method should be used as the + /// implementation may avoid copying the path. + /// + /// See [`Self::recv_from_with_path`] for more information. + async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, Self::Addr), ReceiveError>; + + /// Receive a datagram and its path from the socket. + /// + /// Similar to [`Self::recv_from_with_path`], this receives the datagram into the provided + /// buffer. However, as this does not return any information about the sender, this is + /// primarily used where the sender is already known, such as with connected sockets + /// + /// See [`Self::recv_from_with_path`] for more information. + async fn recv_with_path<'p>( + &self, + buffer: &mut [u8], + path_buffer: &'p mut [u8], + ) -> Result<(usize, Path<&'p mut [u8]>), ReceiveError> { + let (len, _, path) = self.recv_from_with_path(buffer, path_buffer).await?; + Ok((len, path)) + } + + /// Receive a datagram. + /// + /// Similar to [`Self::recv_from_with_path`], this receives the datagram into the provided + /// buffer. However, as this does not return any information about the sender, this is + /// primarily used where the sender is already known, such as with connected sockets + /// + /// In the case where neither the path nor the sender is needed, this method should be used + /// instead of [`Self::recv_with_path`] as the implementation may avoid copying the path. + /// + /// See [`Self::recv_from_with_path`] for more information. + async fn recv(&self, buffer: &mut [u8]) -> Result { + let (len, _) = self.recv_from(buffer).await?; + Ok(len) + } + + /// Sends the payload to the specified remote address and using the specified path. + async fn send_to_via( + &self, + payload: Bytes, + destination: Self::Addr, + path: &Path, + ) -> Result<(), SendError>; + + /// Sends the payload using the specified path. + /// + /// This assumes that the underlying socket is aware of the destination, and will + /// return an error if the socket cannot identify the destination. + async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), SendError>; + + /// Returns the remote address of the socket, if any. + fn remote_addr(&self) -> Option; +} + +/// A SCION path-aware socket. +/// +/// This socket wraps an [`AsyncScionDatagram`] and [`AsyncPathService`] and handles providing +/// paths to the socket from the path service and notifying the path service of any observed +/// paths from the network as well as any issues related to the paths. +pub struct PathAwareDatagram { + socket: D, + path_service: Arc

, + path_buffer: Mutex>, +} + +impl PathAwareDatagram +where + D: AsyncScionDatagram + Send + Sync, + P: AsyncPathService + Send + Sync, +{ + /// Creates a new socket that wraps the provided socket and path service. + pub fn new(socket: D, path_service: Arc

) -> Self { + Self { + socket, + path_service, + path_buffer: Mutex::new(vec![0u8; DataplanePath::::MAX_LEN]), + } + } + + /// Changes the path service associated with this socket. + pub fn set_path_service(&mut self, path_service: Arc

) { + self.path_service = path_service; + } + + /// Returns the path service associated with this socket. + pub fn path_service(&self) -> &P { + &self.path_service + } + + /// Send a datagram using [`AsyncScionDatagram::send_to_via`] with a path from the path service. + pub async fn send_to( + &self, + payload: Bytes, + destination: ::Addr, + ) -> Result<(), SendError> { + let path = self.path_to(*destination.as_ref()).await?; + self.send_to_via(payload, destination, path).await + } + + /// Send a datagram using [`AsyncScionDatagram::send_via`] with a path from the path service. + pub async fn send(&self, payload: Bytes) -> Result<(), SendError> { + if let Some(remote_addr) = self.remote_addr() { + let path = self.path_to(*remote_addr.as_ref()).await?; + self.send_via(payload, path).await + } else { + Err(SendError::Io(ErrorKind::NotConnected.into())) + } + } + + async fn path_to(&self, remote_ia: IsdAsn) -> Result<&Path, SendError> { + self.path_service + .path_to(remote_ia) + .await + .map_err(|_err| todo!("handle path failures")) + } +} + +// TODO(jsmith): We could allow the AsyncPathService to disable receiving paths from the network. +// This could improve the performance in these special cases. +#[async_trait::async_trait] +impl AsyncScionDatagram for PathAwareDatagram +where + D: AsyncScionDatagram + Send + Sync, + P: AsyncPathService + Send + Sync, +{ + type Addr = ::Addr; + + async fn recv_from_with_path<'p>( + &self, + buffer: &mut [u8], + path_buffer: &'p mut [u8], + ) -> Result<(usize, Self::Addr, Path<&'p mut [u8]>), ReceiveError> { + let (len, sender, path) = self.socket.recv_from_with_path(buffer, path_buffer).await?; + + self.path_service.maybe_add_path(&path); + + Ok((len, sender, path)) + } + + /// Receive a datagram and its sender. + /// + /// In order to observe the network path, this implementation of recv_from uses an internal + /// path_buffer guarded by a mutex. As a result, only one recv_from call can be ongoing at + /// a time. If multiple asynchronous recvs are desired, then use the recv_from_with_path + /// method instead. + /// + /// See the trait [`AsyncScionDatagram::recv_from_with_path`] for more information on the + /// method. + async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, Self::Addr), ReceiveError> { + // TODO(jsmith): Determine if we need to remove this mutex. + // The use of this mutex means only a single recv_from call be in progress at once. To await + // allow multiple calls we would likely need multiple buffers. + let mut path_buffer = self.path_buffer.lock().await; + let (len, sender, _) = self + .recv_from_with_path(buffer, path_buffer.as_mut_slice()) + .await?; + Ok((len, sender)) + } + + async fn send_to_via( + &self, + payload: Bytes, + destination: Self::Addr, + path: &Path, + ) -> Result<(), SendError> { + self.path_service.maybe_add_shared_path(path); + self.socket.send_to_via(payload, destination, path).await + } + + async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), SendError> { + self.path_service.maybe_add_shared_path(path); + self.socket.send_via(payload, path).await + } + + fn remote_addr(&self) -> Option { + self.socket.remote_addr() + } +} diff --git a/crates/scion/src/pan/error.rs b/crates/scion/src/pan/error.rs new file mode 100644 index 0000000..be04a89 --- /dev/null +++ b/crates/scion/src/pan/error.rs @@ -0,0 +1,86 @@ +use std::{fmt::Display, io::ErrorKind}; + +use scion_proto::packet::{self, EncodeError}; + +use crate::dispatcher; + +/// Kinds of path-related failures that may occur when sending a packet. +#[derive(Debug)] +pub enum PathErrorKind { + /// The provided path has already expired. + Expired, + /// No path to the destination is available. + NoPath, + /// The path should have provided the next-hop in the underlay but did not. + /// + /// Currently, only intra-AS paths do not require a next-hop, all inter-AS paths do. + NoUnderlayNextHop, +} + +impl Display for PathErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let description = match self { + PathErrorKind::Expired => "the provided path has already expired", + PathErrorKind::NoPath => "no path to the destination available", + PathErrorKind::NoUnderlayNextHop => "no underlay next hop provided by path", + }; + f.write_str(description) + } +} + +/// Error returned when attempting to send a datagram on the SCION network. +#[allow(missing_docs)] +#[derive(Debug, thiserror::Error)] +pub enum SendError { + /// An IO error raised from the OS or from the socket. + #[error(transparent)] + Io(#[from] std::io::Error), + /// An issue with the provided or fetched path. + #[error("issue with the provided path: {0}")] + PathIssue(PathErrorKind), + /// The packet is too large to be sent on the network. + #[error("packet is too large to be sent")] + PacketTooLarge, +} + +impl From for SendError { + fn from(value: PathErrorKind) -> Self { + SendError::PathIssue(value) + } +} + +impl From for SendError { + fn from(value: ErrorKind) -> Self { + Self::Io(value.into()) + } +} + +impl From for SendError { + fn from(value: dispatcher::SendError) -> Self { + match value { + dispatcher::SendError::Io(io) => Self::Io(io), + dispatcher::SendError::PayloadTooLarge(_) => Self::PacketTooLarge, + } + } +} + +impl From for SendError { + fn from(value: packet::EncodeError) -> Self { + match value { + EncodeError::PayloadTooLarge | EncodeError::HeaderTooLarge => Self::PacketTooLarge, + } + } +} + +/// Error messages returned from the UDP socket. +#[derive(Debug, thiserror::Error, PartialEq, Eq)] +pub enum ReceiveError { + /// A buffer with zero-length was provided to which to be written. + /// + /// Retry the operation with a buffer of non-zero length to receive datagrams. + #[error("attempted to receive with a zero-length buffer")] + ZeroLengthBuffer, + /// The provided path buffer does not meet the minimum length requirement. + #[error("path buffer too short")] + PathBufferTooShort, +} diff --git a/crates/scion/src/pan/path_service.rs b/crates/scion/src/pan/path_service.rs new file mode 100644 index 0000000..5942055 --- /dev/null +++ b/crates/scion/src/pan/path_service.rs @@ -0,0 +1,30 @@ +use bytes::Bytes; +use scion_proto::{address::IsdAsn, path::Path}; + +#[derive(Debug, thiserror::Error)] +pub enum PathLookupError {} + +/// Trait for asynchronously retrieving paths to SCION ASes. +#[async_trait::async_trait] +pub trait AsyncPathService { + /// Return a path to the specified AS. + async fn path_to(&self, scion_as: IsdAsn) -> Result<&Path, PathLookupError>; + + /// Provide the service with a path that it may choose to store. + /// + /// Returns true if the path was stored by the service. + /// + /// Since [`Path`] stores the raw path data as a Bytes object, this method allows + /// the service to cheaply clone the Path without copying the raw data. + /// + /// See [`Self::maybe_add_path`] for a variant that always copies the path. + fn maybe_add_shared_path(&self, path: &Path) -> bool; + + /// Provide the service with a path that it may choose to store. + /// + /// Similarly to [`Self::maybe_add_shared_path`] this returns true if the path was copied. + /// However, this method always copies the path to an owned variety for storage. + /// + /// See [`Self::maybe_add_shared_path`] for a variant which may avoid copying the path. + fn maybe_add_path(&self, path: &Path<&mut [u8]>) -> bool; +} diff --git a/crates/scion/src/udp_socket.rs b/crates/scion/src/udp_socket.rs index 7d9d4ed..8939738 100644 --- a/crates/scion/src/udp_socket.rs +++ b/crates/scion/src/udp_socket.rs @@ -4,7 +4,7 @@ use std::{ cmp, - io, + io::{self, ErrorKind}, sync::{Arc, RwLock}, }; @@ -13,14 +13,17 @@ use chrono::Utc; use scion_proto::{ address::SocketAddr, datagram::{UdpDatagram, UdpEncodeError}, - packet::{self, ByEndpoint, EncodeError, ScionPacketRaw, ScionPacketUdp}, - path::{DataplanePath, Path, UnsupportedPathType}, + packet::{ByEndpoint, ScionPacketRaw, ScionPacketUdp}, + path::{DataplanePath, Path}, reliable::Packet, wire_encoding::WireDecode, }; use tokio::sync::Mutex; -use crate::dispatcher::{self, get_dispatcher_path, DispatcherStream, RegistrationError}; +use crate::{ + dispatcher::{self, get_dispatcher_path, DispatcherStream, RegistrationError}, + pan::{AsyncScionDatagram, PathErrorKind, ReceiveError, SendError}, +}; #[allow(missing_docs)] #[derive(Debug, thiserror::Error)] @@ -31,32 +34,6 @@ pub enum BindError { RegistrationFailed(#[from] RegistrationError), } -#[allow(missing_docs)] -#[derive(Debug, thiserror::Error)] -pub enum SendError { - #[error(transparent)] - Io(#[from] std::io::Error), - #[error("packet is too large to be sent")] - PacketTooLarge, - #[error("path is expired")] - PathExpired, - #[error("remote address is not set")] - NotConnected, - #[error("path is not set")] - NoPath, - #[error("no underlay next hop provided by path")] - NoUnderlayNextHop, -} - -impl From for SendError { - fn from(value: dispatcher::SendError) -> Self { - match value { - dispatcher::SendError::Io(io) => Self::Io(io), - dispatcher::SendError::PayloadTooLarge(_) => Self::PacketTooLarge, - } - } -} - impl From for SendError { fn from(value: UdpEncodeError) -> Self { match value { @@ -65,14 +42,6 @@ impl From for SendError { } } -impl From for SendError { - fn from(value: packet::EncodeError) -> Self { - match value { - EncodeError::PayloadTooLarge | EncodeError::HeaderTooLarge => Self::PacketTooLarge, - } - } -} - #[derive(Debug)] pub struct UdpSocket { inner: Arc, @@ -104,148 +73,67 @@ impl UdpSocket { self.inner.local_addr() } - /// Receive a SCION UDP packet. - /// - /// The UDP payload is written into the provided buffer. If there is insufficient space, excess - /// data is dropped. The returned number of bytes always refers to the amount of data in the UDP - /// payload. - pub async fn recv(&self, buffer: &mut [u8]) -> Result { - let (packet_len, _) = self.recv_from(buffer).await?; - Ok(packet_len) + /// Registers a remote address for this socket. + pub fn connect(&self, remote_address: SocketAddr) { + self.inner.set_remote_address(Some(remote_address)); } - /// Receive a SCION UDP packet from a remote endpoint. - /// - /// This behaves like [`Self::recv`] but additionally returns the remote SCION socket address. - pub async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, SocketAddr), ReceiveError> { - self.inner - .recv_loop(buffer, None) - .await - .map(|(len, addr, _)| (len, addr)) + /// Clears the association, if any, with the remote address. + pub fn disconnect(&self) { + self.inner.set_remote_address(None); } +} - /// Receive a SCION UDP packet from a remote endpoint with path information. - /// - /// This behaves like [`Self::recv`] but additionally returns - /// - the remote SCION socket address and - /// - the path over which the packet was received. For supported path types, this path is - /// already reversed such that it can be used directly to send reply packets; for unsupported - /// path types, the path is copied unmodified. - pub async fn recv_from_with_path( +#[async_trait::async_trait] +impl AsyncScionDatagram for UdpSocket { + /// The type of the address used for sending and receiving datagrams. + type Addr = SocketAddr; + + async fn recv_from_with_path<'p>( &self, buffer: &mut [u8], - ) -> Result<(usize, SocketAddr, Path), ReceiveError> { + path_buffer: &'p mut [u8], + ) -> Result<(usize, Self::Addr, Path<&'p mut [u8]>), ReceiveError> { if buffer.is_empty() { return Err(ReceiveError::ZeroLengthBuffer); } - if buffer.len() < DataplanePath::::MAX_LEN { + if path_buffer.len() < DataplanePath::::MAX_LEN { return Err(ReceiveError::PathBufferTooShort); } - // TODO(jsmith): Refactor to accept two buffers and return a Path referring into one. - let split_point = buffer.len() - DataplanePath::::MAX_LEN; - let (buffer, path_buf) = buffer.split_at_mut(split_point); - - let (packet_len, sender, path) = self.inner.recv_loop(buffer, Some(path_buf)).await?; - let Path { - dataplane_path, - underlay_next_hop, - isd_asn, - .. - } = path.expect("non-None path since path_buf was provided"); - - // Explicit match here in case we add other errors to the `reverse` method at some point - let dataplane_path = match dataplane_path.to_reversed() { - Ok(reversed_dataplane) => reversed_dataplane, - Err(UnsupportedPathType(_)) => dataplane_path.into(), + let (len, sender, Some(path)) = self.inner.recv_loop(buffer, Some(path_buffer)).await? + else { + unreachable!("path is always returned when providing a buffer") }; - - Ok(( - packet_len, - sender, - Path::new(dataplane_path, isd_asn.into_reversed(), underlay_next_hop), - )) - } - - /// Receive a SCION UDP packet with path information. - /// - /// This behaves like [`Self::recv`] but additionally returns the path over which the packet was - /// received. For supported path types, this path is already reversed such that it can be used - /// directly to send reply packets; for unsupported path types, the path is copied unmodified. - pub async fn recv_with_path(&self, buffer: &mut [u8]) -> Result<(usize, Path), ReceiveError> { - let (packet_len, _, path) = self.recv_from_with_path(buffer).await?; - Ok((packet_len, path)) - } - - /// Returns the remote SCION address set for this socket, if any. - pub fn remote_addr(&self) -> Option { - self.inner.remote_addr() - } - - /// Returns the SCION path set for this socket, if any. - pub fn path(&self) -> Option { - self.inner.path() - } - - /// Registers a remote address for this socket. - pub fn connect(&self, remote_address: SocketAddr) { - self.inner.set_remote_address(Some(remote_address)); - } - - /// Clears the association, if any, with the remote address. - pub fn disconnect(&self) { - self.inner.set_remote_address(None); + Ok((len, sender, path)) } - /// Registers or clears a path for this socket. - pub fn set_path(&self, path: Option) -> &Self { - self.inner.set_path(path); - self - } - - /// Sends the payload using the registered remote address and path - /// - /// Returns an error if the remote address or path are unset - pub async fn send(&self, payload: Bytes) -> Result<(), SendError> { - self.inner.send_to_via(payload, None, None).await - } - - /// Sends the payload to the specified destination using the registered path - /// - /// Returns an error if the path is unset - pub async fn send_to(&self, payload: Bytes, destination: SocketAddr) -> Result<(), SendError> { + async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, Self::Addr), ReceiveError> { self.inner - .send_to_via(payload, Some(destination), None) + .recv_loop(buffer, None) .await + .map(|(len, addr, _)| (len, addr)) } - /// Sends the payload to the registered destination using the specified path - /// - /// Returns an error if the remote address is unset - pub async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), SendError> { - self.inner.send_to_via(payload, None, Some(path)).await - } - - /// Sends the payload to the specified remote address and path - pub async fn send_to_via( + async fn send_to_via( &self, payload: Bytes, - destination: SocketAddr, + destination: Self::Addr, path: &Path, ) -> Result<(), SendError> { self.inner - .send_to_via(payload, Some(destination), Some(path)) + .send_to_via(payload, Some(destination), path) .await } -} -/// Error messages returned from the UDP socket. -#[derive(Debug, thiserror::Error, PartialEq, Eq)] -pub enum ReceiveError { - #[error("attempted to receive with a zero-length buffer")] - ZeroLengthBuffer, - #[error("path buffer too short")] - PathBufferTooShort, + async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), SendError> { + self.inner.send_to_via(payload, None, path).await + } + + /// Returns the remote address of the socket, if any. + fn remote_addr(&self) -> Option { + self.inner.remote_addr() + } } macro_rules! log_err { @@ -269,7 +157,6 @@ impl UdpSocketInner { state: RwLock::new(Arc::new(State { local_address, remote_address: None, - path: None, })), stream: Mutex::new(stream), } @@ -279,17 +166,16 @@ impl UdpSocketInner { &self, payload: Bytes, destination: Option, - path: Option<&Path>, + path: &Path, ) -> Result<(), SendError> { let state = self.state.read().unwrap().clone(); - let path = path.or(state.path.as_ref()).ok_or(SendError::NoPath)?; let Some(destination) = destination.or(state.remote_address) else { - return Err(SendError::NotConnected); + return Err(ErrorKind::NotConnected.into()); }; if let Some(metadata) = &path.metadata { if metadata.expiration < Utc::now() { - return Err(SendError::PathExpired); + return Err(PathErrorKind::Expired.into()); } } @@ -301,7 +187,7 @@ impl UdpSocketInner { socket_addr }) } else { - return Err(SendError::NoUnderlayNextHop); + return Err(PathErrorKind::NoUnderlayNextHop.into()); }; let packet = ScionPacketUdp::new( @@ -351,10 +237,15 @@ impl UdpSocketInner { { if let Some(path_buf) = path_buf { let path_len = path.dataplane_path.raw().len(); - let dataplane_path = - path.dataplane_path.copy_to_slice(&mut path_buf[..path_len]); - let path = - Path::new(dataplane_path, path.isd_asn, path.underlay_next_hop); + let dataplane_path = path + .dataplane_path + .reverse_to_slice(&mut path_buf[..path_len]); + + let path = Path::new( + dataplane_path, + path.isd_asn.into_reversed(), + path.underlay_next_hop, + ); return Ok((packet_len, sender, Some(path))); } else { return Ok((packet_len, sender, None)); @@ -433,27 +324,18 @@ impl UdpSocketInner { pub fn set_remote_address(&self, remote_address: Option) { Arc::make_mut(&mut *self.state.write().unwrap()).remote_address = remote_address; } - - pub fn path(&self) -> Option { - self.state.read().unwrap().path.clone() - } - - pub fn set_path(&self, path: Option) { - Arc::make_mut(&mut *self.state.write().unwrap()).path = path; - } } #[derive(Debug, Clone)] struct State { local_address: SocketAddr, remote_address: Option, - path: Option, } #[cfg(test)] mod tests { use scion_proto::path::DataplanePath; - use tokio::{net::UnixStream, sync::Notify}; + use tokio::net::UnixStream; use super::*; @@ -510,6 +392,22 @@ mod tests { Ok(()) } + + pub async fn recv_from_helper<'p>( + socket: &UdpSocket, + buffer: &mut [u8], + path_buffer: &'p mut [u8], + use_from: bool, + ) -> Result<(usize, Option, Path<&'p mut [u8]>), ReceiveError> { + if use_from { + let (len, remote_addr, path) = + socket.recv_from_with_path(buffer, path_buffer).await?; + Ok((len, Some(remote_addr), path)) + } else { + let (len, path) = socket.recv_with_path(buffer, path_buffer).await?; + Ok((len, None, path)) + } + } } macro_rules! async_test_case { @@ -588,12 +486,11 @@ mod tests { .await .expect_err("should fail on unconnected socket"); - assert!( - matches!(err, SendError::NotConnected), - "expected {:?}, got {:?}", - SendError::NotConnected, - err - ); + if let SendError::Io(io_err) = err { + assert_eq!(io_err.kind(), ErrorKind::NotConnected); + } else { + panic!("expected Io(ErrorKind::NotConnected), got {}", err); + } Ok(()) } @@ -629,23 +526,25 @@ mod tests { }; assert_eq!(endpoints.source.isd_asn(), endpoints.destination.isd_asn()); - let mut buffer = vec![0u8; 1500]; + let mut buffer = vec![0u8; 64]; + let mut path_buffer = vec![0u8; 1024]; let (socket, mut dispatcher) = utils::socket_from(endpoints.source)?; utils::local_send_raw(&mut dispatcher, endpoints, MESSAGE).await?; - let (length, incoming_remote_addr, incoming_path) = if use_from { - socket.recv_from_with_path(&mut buffer).await? - } else { - let res = socket.recv_with_path(&mut buffer).await?; - (res.0, endpoints.source, res.1) - }; + let (length, incoming_remote_addr, incoming_path) = + utils::recv_from_helper(&socket, &mut buffer, &mut path_buffer, use_from).await?; assert_eq!(&buffer[..length], MESSAGE); - assert_eq!(incoming_remote_addr, endpoints.source); - assert_eq!(incoming_path.dataplane_path, DataplanePath::EmptyPath); + assert_eq!( + incoming_path.dataplane_path, + DataplanePath::::EmptyPath + ); assert_eq!(incoming_path.isd_asn, endpoints.map(SocketAddr::isd_asn)); assert_eq!(incoming_path.metadata, None); assert_ne!(incoming_path.underlay_next_hop, None); + if let Some(incoming_remote_addr) = incoming_remote_addr { + assert_eq!(incoming_remote_addr, endpoints.source); + } Ok(()) } @@ -673,7 +572,8 @@ mod tests { }; assert_eq!(other_endpoints.source.isd_asn(), endpoints.source.isd_asn()); - let mut buffer = vec![0u8; 1500]; + let mut buffer = vec![0u8; 64]; + let mut path_buffer = vec![0u8; 1024]; let (socket, mut dispatcher) = utils::socket_from(endpoints.source)?; // Write packets to be received @@ -688,30 +588,26 @@ mod tests { // Connect to the remote source socket.connect(endpoints.source); - let length = if use_from { - let (length, remote_addr, _) = socket.recv_from_with_path(&mut buffer).await?; - assert_eq!(remote_addr, endpoints.source); - length - } else { - socket.recv_with_path(&mut buffer).await?.0 - }; + let (length, remote_addr, _) = + utils::recv_from_helper(&socket, &mut buffer, &mut path_buffer, use_from).await?; // The first packet received is the second packet written. assert_eq!(&buffer[..length], messages[1]); + if let Some(remote_addr) = remote_addr { + assert_eq!(remote_addr, endpoints.source); + } // Disconnect the association to receive packets with other addresses socket.disconnect(); - let length = if use_from { - let (length, remote_addr, _) = socket.recv_from_with_path(&mut buffer).await?; - assert_eq!(remote_addr, other_endpoints.source); - length - } else { - socket.recv_with_path(&mut buffer).await?.0 - }; + let (length, remote_addr, _) = + utils::recv_from_helper(&socket, &mut buffer, &mut path_buffer, use_from).await?; // The second packet packet received is the third packet written. assert_eq!(&buffer[..length], messages[2]); + if let Some(remote_addr) = remote_addr { + assert_eq!(remote_addr, other_endpoints.source); + } Ok(()) } @@ -739,18 +635,22 @@ mod tests { }; assert_eq!(endpoints.source.isd_asn(), endpoints.destination.isd_asn()); - let mut buffer = vec![0u8; 1500]; + let mut buffer = vec![0u8; 64]; + let mut path_buffer = vec![0u8; 1024]; + let (socket, mut dispatcher) = utils::socket_from(endpoints.source)?; utils::local_send_raw(&mut dispatcher, endpoints, MESSAGE).await?; let err = socket - .recv_from_with_path(&mut []) + .recv_from_with_path(&mut [], &mut path_buffer) .await .expect_err("should fail due to zero-length buffer"); assert_eq!(err, ReceiveError::ZeroLengthBuffer); // The data should still be available to read - let (length, incoming_remote_addr, _) = socket.recv_from_with_path(&mut buffer).await?; + let (length, incoming_remote_addr, _) = socket + .recv_from_with_path(&mut buffer, &mut path_buffer) + .await?; assert_eq!(&buffer[..length], MESSAGE); assert_eq!(incoming_remote_addr, endpoints.source); @@ -775,39 +675,40 @@ mod tests { } } - #[tokio::test] - async fn set_path() -> TestResult { - let local_addr: SocketAddr = "[1-f:0:1,9.8.7.6]:80".parse()?; - let (socket, _) = utils::socket_from(local_addr)?; - let path = Path::local(local_addr.isd_asn()); + // TODO(jsmith): Convert to test for for Pan socket + // #[tokio::test] + // async fn set_path() -> TestResult { + // let local_addr: SocketAddr = "[1-f:0:1,9.8.7.6]:80".parse()?; + // let (socket, _) = utils::socket_from(local_addr)?; + // let path = Path::local(local_addr.isd_asn()); - let notify = Arc::new(Notify::new()); - let notify2 = Arc::new(Notify::new()); + // let notify = Arc::new(Notify::new()); + // let notify2 = Arc::new(Notify::new()); - let (result1, result2) = tokio::join!( - async { - let initial = socket.path(); - socket.set_path(Some(path.clone())); - notify.notify_one(); + // let (result1, result2) = tokio::join!( + // async { + // let initial = socket.path(); + // socket.set_path(Some(path.clone())); + // notify.notify_one(); - notify2.notified().await; - let last_set = socket.path(); + // notify2.notified().await; + // let last_set = socket.path(); - (initial, last_set) - }, - async { - notify.notified().await; - let first_set = socket.path(); - socket.set_path(None); - notify2.notify_one(); + // (initial, last_set) + // }, + // async { + // notify.notified().await; + // let first_set = socket.path(); + // socket.set_path(None); + // notify2.notify_one(); - first_set - } - ); + // first_set + // } + // ); - assert_eq!(result1, (None, None)); - assert_eq!(result2, Some(path)); + // assert_eq!(result1, (None, None)); + // assert_eq!(result2, Some(path)); - Ok(()) - } + // Ok(()) + // } } diff --git a/crates/scion/tests/test_udp_socket.rs b/crates/scion/tests/test_udp_socket.rs index 038d399..6017755 100644 --- a/crates/scion/tests/test_udp_socket.rs +++ b/crates/scion/tests/test_udp_socket.rs @@ -3,6 +3,7 @@ use std::{sync::OnceLock, time::Duration}; use bytes::Bytes; use scion::{ daemon::{get_daemon_address, DaemonClient}, + pan::AsyncScionDatagram, udp_socket::UdpSocket, }; use scion_proto::{address::SocketAddr, packet::ByEndpoint, path::Path}; @@ -42,7 +43,6 @@ macro_rules! test_send_receive_reply { .next() .unwrap(); println!("Forward path: {:?}", path_forward.dataplane_path); - socket_source.set_path(Some(path_forward.clone())); Ok((socket_source, socket_destination, path_forward)) } @@ -52,8 +52,10 @@ macro_rules! test_send_receive_reply { async fn message() -> TestResult { let _lock = lock().lock().await; - let (socket_source, socket_destination, ..) = get_sockets().await?; - socket_source.send(MESSAGE.clone()).await?; + let (socket_source, socket_destination, path_forward) = get_sockets().await?; + socket_source + .send_via(MESSAGE.clone(), &path_forward) + .await?; let mut buffer = vec![0_u8; 1500]; let (length, sender) = @@ -70,12 +72,15 @@ macro_rules! test_send_receive_reply { let _lock = lock().lock().await; let (socket_source, socket_destination, path_forward) = get_sockets().await?; - socket_source.send(MESSAGE.clone()).await?; + socket_source + .send_via(MESSAGE.clone(), &path_forward) + .await?; - let mut buffer = vec![0_u8; 1500]; + let mut buffer = vec![0_u8; 128]; + let mut path_buffer = vec![0_u8; 1024]; let (length, sender, path) = tokio::time::timeout( TIMEOUT, - socket_destination.recv_from_with_path(&mut buffer), + socket_destination.recv_from_with_path(&mut buffer, &mut path_buffer), ) .await??; assert_eq!(sender, socket_source.local_addr()); @@ -83,12 +88,14 @@ macro_rules! test_send_receive_reply { println!("Reply path: {:?}", path.dataplane_path); socket_destination - .send_to_via(MESSAGE.clone(), sender, &path) + .send_to_via(MESSAGE.clone(), sender, &path.into()) .await?; - let (_, path_return) = - tokio::time::timeout(TIMEOUT, socket_source.recv_with_path(&mut buffer)) - .await??; + let (_, path_return) = tokio::time::timeout( + TIMEOUT, + socket_source.recv_with_path(&mut buffer, &mut path_buffer), + ) + .await??; assert_eq!(path_return.isd_asn, path_forward.isd_asn); assert_eq!(path_return.dataplane_path, path_forward.dataplane_path); From b4c3f81f5c8d0e4cc1df5d03cfc54338b3b0e1d1 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Smith Date: Mon, 18 Dec 2023 11:38:10 +0100 Subject: [PATCH 2/6] feat: add path service implementation for static paths. --- crates/scion/src/pan/path_service.rs | 37 ++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/crates/scion/src/pan/path_service.rs b/crates/scion/src/pan/path_service.rs index 5942055..ae6abd7 100644 --- a/crates/scion/src/pan/path_service.rs +++ b/crates/scion/src/pan/path_service.rs @@ -1,14 +1,18 @@ use bytes::Bytes; +use chrono::Utc; use scion_proto::{address::IsdAsn, path::Path}; #[derive(Debug, thiserror::Error)] -pub enum PathLookupError {} +pub enum PathLookupError { + #[error("no path available to destination")] + NoPath, +} /// Trait for asynchronously retrieving paths to SCION ASes. #[async_trait::async_trait] pub trait AsyncPathService { /// Return a path to the specified AS. - async fn path_to(&self, scion_as: IsdAsn) -> Result<&Path, PathLookupError>; + async fn path_to<'a>(&'a self, scion_as: IsdAsn) -> Result<&'a Path, PathLookupError>; /// Provide the service with a path that it may choose to store. /// @@ -28,3 +32,32 @@ pub trait AsyncPathService { /// See [`Self::maybe_add_shared_path`] for a variant which may avoid copying the path. fn maybe_add_path(&self, path: &Path<&mut [u8]>) -> bool; } + +#[async_trait::async_trait] +impl AsyncPathService for Path { + /// Return a path to the specified AS. + async fn path_to(&self, scion_as: IsdAsn) -> Result<&Path, PathLookupError> { + if self.isd_asn.destination != scion_as { + return Err(PathLookupError::NoPath); + } + if let Some(metadata) = self.metadata.as_ref() { + if metadata.expiration < Utc::now() { + tracing::warn!( + destination=%scion_as, + path=?self, + "attempted to send packet with expired, static path" + ); + return Err(PathLookupError::NoPath); + } + } + Ok(self) + } + + fn maybe_add_shared_path(&self, _path: &Path) -> bool { + false + } + + fn maybe_add_path(&self, _path: &Path<&mut [u8]>) -> bool { + false + } +} From 0edd074be01821a91dc98fa230797a40b9e1b1a4 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Smith Date: Tue, 19 Dec 2023 16:54:54 +0100 Subject: [PATCH 3/6] refactor: remove code for observing the network path --- crates/scion/src/pan.rs | 133 --------------------------- crates/scion/src/pan/datagram.rs | 51 +++------- crates/scion/src/pan/error.rs | 7 +- crates/scion/src/pan/path_service.rs | 28 +----- crates/scion/src/udp_socket.rs | 99 +++++++++----------- 5 files changed, 62 insertions(+), 256 deletions(-) diff --git a/crates/scion/src/pan.rs b/crates/scion/src/pan.rs index a7d1ac5..925e5f6 100644 --- a/crates/scion/src/pan.rs +++ b/crates/scion/src/pan.rs @@ -7,136 +7,3 @@ pub use path_service::AsyncPathService; mod error; pub use error::{PathErrorKind, ReceiveError, SendError}; - -// use std::{ -// borrow::Borrow, -// collections::{HashMap, VecDeque}, -// marker::PhantomData, -// sync::Arc, -// }; -// -// use bytes::Bytes; -// use scion_proto::{ -// address::{IsdAsn, SocketAddr}, -// path::Path, -// }; - -// TODO(jsmith): - -// pub struct PathLookupError; -// -// /// Trait for retrieving paths to SCION ASes. -// #[async_trait::async_trait] -// pub trait PathService { -// /// Return a path to the specified AS. -// async fn path_to(&self, scion_as: IsdAsn) -> Result; -// -// /// Propose a path to the service. -// /// -// /// The service may or may not choose to store the path. -// fn add_path(&self, path: &Path); -// -// /// Notify the service of a path-related SCMP message. -// fn on_scmp(&self, _args: ()) { -// todo!() -// } -// } -// -// #[derive(Debug, Default)] -// pub struct PathSet { -// paths: HashMap>, -// } -// -// impl PathSet { -// pub fn new() -> Self { -// Self::default() -// } -// } -// -// #[async_trait::async_trait] -// impl PathService for PathSet { -// async fn path_to(&self, scion_as: IsdAsn) -> Result { -// todo!() -// } -// -// fn add_path(&self, path: &Path) { -// todo!() -// } -// } -// -// -// impl PathAwareDatagram -// where -// D: ScionDatagramSocket, -// P: PathService, -// { -// pub fn new(datagram_socket: D, path_service: Arc

) -> Self { -// Self { -// datagram_socket, -// path_service, -// } -// } -// -// pub fn set_path_service(&mut self, path_service: Arc

) { -// self.path_service = path_service; -// } -// -// // pub fn set_path_service(&mut self, path_service: P) { -// // self.path_service = path_service; -// // } -// } -// -// // #[async_trait::async_trait] -// // impl ScionDatagramSocket for PathAwareDatagram -// // where -// // D: ScionDatagramSocket, -// // P: PathService, -// // { -// // async fn recv_with_path(&self, buffer: &mut [u8]) -> Result<(usize, Path), ReceiveError> { -// // self.datagram_socket.recv_with_path(buffer).await -// // } -// // // async fn recv_from_with_path( -// // // &self, -// // // buffer: &mut [u8], -// // // ) -> Result<(usize, SocketAddr, Path), Self::RecvErr>; -// // // async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), Self::SendErr>; -// // // async fn send_to_via( -// // // &self, -// // // payload: Bytes, -// // // destination: SocketAddr, -// // // path: &Path, -// // // ) -> Result<(), Self::SendErr>; -// // } -// -// #[cfg(test)] -// mod tests { -// -// use std::time::Duration; -// -// use tokio::net::UnixStream; -// -// use super::*; -// use crate::{dispatcher::DispatcherStream, udp_socket::UdpSocket}; -// -// type TestResult = Result>; -// -// pub fn socket_from(source: SocketAddr) -> TestResult<(UdpSocket, DispatcherStream)> { -// let (inner, inner_remote) = UnixStream::pair()?; -// Ok(( -// UdpSocket::new(DispatcherStream::new(inner), source), -// DispatcherStream::new(inner_remote), -// )) -// } -// -// // #[test] -// // fn sanity() -> TestResult { -// // let (socket, _) = socket_from("[1-ff00:0:110,3.3.3.3]:8080".parse()?)?; -// // let dgram_socket2 = PathAwareDatagram::new(socket, Arc::new(PathSet::new())); -// // dgram_socket2.hello(); -// // -// // let (socket, _) = socket_from("[1-ff00:0:110,3.3.3.3]:8080".parse()?)?; -// // let dgram_socket2 = PathAwareDatagram::new(socket, Arc::new(PathSet::new())); -// // dgram_socket2.hello(); -// // Ok(()) -// // } -// } diff --git a/crates/scion/src/pan/datagram.rs b/crates/scion/src/pan/datagram.rs index 666327a..27f9476 100644 --- a/crates/scion/src/pan/datagram.rs +++ b/crates/scion/src/pan/datagram.rs @@ -1,12 +1,8 @@ -use std::{io::ErrorKind, sync::Arc}; +use std::{io, sync::Arc}; use async_trait; use bytes::Bytes; -use scion_proto::{ - address::IsdAsn, - path::{DataplanePath, Path}, -}; -use tokio::sync::Mutex; +use scion_proto::{address::IsdAsn, path::Path}; use super::{AsyncPathService, ReceiveError, SendError}; @@ -27,8 +23,9 @@ pub trait AsyncScionDatagram { /// /// The returned path corresponds to the reversed path observed in the packet for known path /// types, or a copy of the opaque path data for unknown path types. In either case, the raw - /// raw data comprising the returned path is written to path_buffer, which must be at least - /// [DataplanePath::MAX_LEN][`DataplanePath::::MAX_LEN`] bytes in length. + /// data comprising the returned path is written to path_buffer, which must be at least + /// [`DataplanePath::MAX_LEN`][`scion_proto::path::DataplanePath::::MAX_LEN`] bytes in + /// length. async fn recv_from_with_path<'p>( &self, buffer: &mut [u8], @@ -50,7 +47,7 @@ pub trait AsyncScionDatagram { /// /// Similar to [`Self::recv_from_with_path`], this receives the datagram into the provided /// buffer. However, as this does not return any information about the sender, this is - /// primarily used where the sender is already known, such as with connected sockets + /// primarily used where the sender is already known, such as with connected sockets. /// /// See [`Self::recv_from_with_path`] for more information. async fn recv_with_path<'p>( @@ -66,7 +63,7 @@ pub trait AsyncScionDatagram { /// /// Similar to [`Self::recv_from_with_path`], this receives the datagram into the provided /// buffer. However, as this does not return any information about the sender, this is - /// primarily used where the sender is already known, such as with connected sockets + /// primarily used where the sender is already known, such as with connected sockets. /// /// In the case where neither the path nor the sender is needed, this method should be used /// instead of [`Self::recv_with_path`] as the implementation may avoid copying the path. @@ -103,7 +100,6 @@ pub trait AsyncScionDatagram { pub struct PathAwareDatagram { socket: D, path_service: Arc

, - path_buffer: Mutex>, } impl PathAwareDatagram @@ -111,12 +107,11 @@ where D: AsyncScionDatagram + Send + Sync, P: AsyncPathService + Send + Sync, { - /// Creates a new socket that wraps the provided socket and path service. + /// Creates a new `PathAwareDatagram` socket that wraps the provided socket and path service. pub fn new(socket: D, path_service: Arc

) -> Self { Self { socket, path_service, - path_buffer: Mutex::new(vec![0u8; DataplanePath::::MAX_LEN]), } } @@ -146,7 +141,7 @@ where let path = self.path_to(*remote_addr.as_ref()).await?; self.send_via(payload, path).await } else { - Err(SendError::Io(ErrorKind::NotConnected.into())) + Err(SendError::Io(io::ErrorKind::NotConnected.into())) } } @@ -158,8 +153,6 @@ where } } -// TODO(jsmith): We could allow the AsyncPathService to disable receiving paths from the network. -// This could improve the performance in these special cases. #[async_trait::async_trait] impl AsyncScionDatagram for PathAwareDatagram where @@ -173,31 +166,11 @@ where buffer: &mut [u8], path_buffer: &'p mut [u8], ) -> Result<(usize, Self::Addr, Path<&'p mut [u8]>), ReceiveError> { - let (len, sender, path) = self.socket.recv_from_with_path(buffer, path_buffer).await?; - - self.path_service.maybe_add_path(&path); - - Ok((len, sender, path)) + self.socket.recv_from_with_path(buffer, path_buffer).await } - /// Receive a datagram and its sender. - /// - /// In order to observe the network path, this implementation of recv_from uses an internal - /// path_buffer guarded by a mutex. As a result, only one recv_from call can be ongoing at - /// a time. If multiple asynchronous recvs are desired, then use the recv_from_with_path - /// method instead. - /// - /// See the trait [`AsyncScionDatagram::recv_from_with_path`] for more information on the - /// method. async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, Self::Addr), ReceiveError> { - // TODO(jsmith): Determine if we need to remove this mutex. - // The use of this mutex means only a single recv_from call be in progress at once. To await - // allow multiple calls we would likely need multiple buffers. - let mut path_buffer = self.path_buffer.lock().await; - let (len, sender, _) = self - .recv_from_with_path(buffer, path_buffer.as_mut_slice()) - .await?; - Ok((len, sender)) + self.socket.recv_from(buffer).await } async fn send_to_via( @@ -206,12 +179,10 @@ where destination: Self::Addr, path: &Path, ) -> Result<(), SendError> { - self.path_service.maybe_add_shared_path(path); self.socket.send_to_via(payload, destination, path).await } async fn send_via(&self, payload: Bytes, path: &Path) -> Result<(), SendError> { - self.path_service.maybe_add_shared_path(path); self.socket.send_via(payload, path).await } diff --git a/crates/scion/src/pan/error.rs b/crates/scion/src/pan/error.rs index be04a89..7232679 100644 --- a/crates/scion/src/pan/error.rs +++ b/crates/scion/src/pan/error.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, io::ErrorKind}; +use std::{fmt::Display, io}; use scion_proto::packet::{self, EncodeError}; @@ -29,7 +29,6 @@ impl Display for PathErrorKind { } /// Error returned when attempting to send a datagram on the SCION network. -#[allow(missing_docs)] #[derive(Debug, thiserror::Error)] pub enum SendError { /// An IO error raised from the OS or from the socket. @@ -49,8 +48,8 @@ impl From for SendError { } } -impl From for SendError { - fn from(value: ErrorKind) -> Self { +impl From for SendError { + fn from(value: io::ErrorKind) -> Self { Self::Io(value.into()) } } diff --git a/crates/scion/src/pan/path_service.rs b/crates/scion/src/pan/path_service.rs index ae6abd7..bcffd9b 100644 --- a/crates/scion/src/pan/path_service.rs +++ b/crates/scion/src/pan/path_service.rs @@ -9,28 +9,12 @@ pub enum PathLookupError { } /// Trait for asynchronously retrieving paths to SCION ASes. +/// +/// If an implementation wants to receive paths from the network #[async_trait::async_trait] pub trait AsyncPathService { /// Return a path to the specified AS. async fn path_to<'a>(&'a self, scion_as: IsdAsn) -> Result<&'a Path, PathLookupError>; - - /// Provide the service with a path that it may choose to store. - /// - /// Returns true if the path was stored by the service. - /// - /// Since [`Path`] stores the raw path data as a Bytes object, this method allows - /// the service to cheaply clone the Path without copying the raw data. - /// - /// See [`Self::maybe_add_path`] for a variant that always copies the path. - fn maybe_add_shared_path(&self, path: &Path) -> bool; - - /// Provide the service with a path that it may choose to store. - /// - /// Similarly to [`Self::maybe_add_shared_path`] this returns true if the path was copied. - /// However, this method always copies the path to an owned variety for storage. - /// - /// See [`Self::maybe_add_shared_path`] for a variant which may avoid copying the path. - fn maybe_add_path(&self, path: &Path<&mut [u8]>) -> bool; } #[async_trait::async_trait] @@ -52,12 +36,4 @@ impl AsyncPathService for Path { } Ok(self) } - - fn maybe_add_shared_path(&self, _path: &Path) -> bool { - false - } - - fn maybe_add_path(&self, _path: &Path<&mut [u8]>) -> bool { - false - } } diff --git a/crates/scion/src/udp_socket.rs b/crates/scion/src/udp_socket.rs index 8939738..055f24d 100644 --- a/crates/scion/src/udp_socket.rs +++ b/crates/scion/src/udp_socket.rs @@ -1,10 +1,7 @@ -#![allow(missing_docs)] - //! A socket to send UDP datagrams via SCION. - use std::{ cmp, - io::{self, ErrorKind}, + io, sync::{Arc, RwLock}, }; @@ -25,11 +22,13 @@ use crate::{ pan::{AsyncScionDatagram, PathErrorKind, ReceiveError, SendError}, }; -#[allow(missing_docs)] +/// Errors that may be raised when attempted to bind a [`UdpSocket`]. #[derive(Debug, thiserror::Error)] pub enum BindError { + /// The UdpSocket was unable to connect to the dispatcher at the provided address. #[error("failed to connect to the dispatcher, reason: {0}")] DispatcherConnectFailed(#[from] io::Error), + /// An error which occurred during the registration handshake with the SCION dispatcher. #[error("failed to bind to the requested port")] RegistrationFailed(#[from] RegistrationError), } @@ -42,20 +41,44 @@ impl From for SendError { } } +/// A SCION UDP socket. +/// +/// After creating a `UdpSocket` by binding it to a SCION socket address, data can +/// be [sent to][AsyncScionDatagram::send_to_via] and [received from][AsyncScionDatagram::recv_from] +/// any other socket address by using the methods on the [`AsyncScionDatagram`] trait. +/// +/// As SCION is a path-aware internet architecture, sending packets with the `UdpSocket` allows +/// specifying the path over which the packet should be sent. See +/// [`PathAwareDatagram`][crate::pan::PathAwareDatagram] for a wrapping socket than handles +/// the selection of paths. +/// +/// Although UDP is a connectionless protocol, this implementation provides an interface to set an +/// address where data should be sent and received from. After setting a remote address with +/// [`connect`][UdpSocket::connect], data can be sent to and received from that address with the +/// [`send_via`][AsyncScionDatagram::send_via] and [`recv`][AsyncScionDatagram::recv] methods. #[derive(Debug)] pub struct UdpSocket { inner: Arc, } impl UdpSocket { + /// Creates a new UDP socket bound to the provided SCION socket address. pub async fn bind(address: SocketAddr) -> Result { Self::bind_with_dispatcher(address, get_dispatcher_path()).await } - pub async fn bind_with_dispatcher + std::fmt::Debug>( + /// Creates a new UDP socket from the given SCION socket address, by connecting to + /// and registering with the SCION dispatcher at the specified path. + /// + /// See [`bind`][Self::bind] for a variant that connects to the system's configured + /// SCION dispatcher . + pub async fn bind_with_dispatcher

( address: SocketAddr, dispatcher_path: P, - ) -> Result { + ) -> Result + where + P: AsRef + std::fmt::Debug, + { let mut stream = DispatcherStream::connect(dispatcher_path).await?; let local_address = stream.register(address).await?; @@ -170,7 +193,7 @@ impl UdpSocketInner { ) -> Result<(), SendError> { let state = self.state.read().unwrap().clone(); let Some(destination) = destination.or(state.remote_address) else { - return Err(ErrorKind::NotConnected.into()); + return Err(io::ErrorKind::NotConnected.into()); }; if let Some(metadata) = &path.metadata { @@ -487,7 +510,7 @@ mod tests { .expect_err("should fail on unconnected socket"); if let SendError::Io(io_err) = err { - assert_eq!(io_err.kind(), ErrorKind::NotConnected); + assert_eq!(io_err.kind(), io::ErrorKind::NotConnected); } else { panic!("expected Io(ErrorKind::NotConnected), got {}", err); } @@ -618,13 +641,16 @@ mod tests { pub const USE_FROM: bool = true; async_test_case! { - connected: - test_connected_recv( - "[1-f:0:3,4.4.0.1]:80", "[1-f:0:3,11.10.13.7]:443", "[1-f:0:3,10.20.30.40]:981", USE_FROM - ) + connected: test_connected_recv( + "[1-f:0:3,4.4.0.1]:80", + "[1-f:0:3,11.10.13.7]:443", + "[1-f:0:3,10.20.30.40]:981", + USE_FROM + ) } async_test_case! { - unconnected: test_unconnected_recv("[1-f:0:3,4.4.0.1]:80", "[1-f:0:3,11.10.13.7]:443", USE_FROM) + unconnected: + test_unconnected_recv("[1-f:0:3,4.4.0.1]:80", "[1-f:0:3,11.10.13.7]:443", USE_FROM) } #[tokio::test] @@ -667,48 +693,15 @@ mod tests { async_test_case! { connected: test_connected_recv( - "[1-f:0:3,4.4.0.1]:80", "[1-f:0:3,11.10.13.7]:443", "[1-f:0:3,10.20.30.40]:981", !USE_FROM + "[1-f:0:3,4.4.0.1]:80", + "[1-f:0:3,11.10.13.7]:443", + "[1-f:0:3,10.20.30.40]:981", + !USE_FROM ) } async_test_case! { - unconnected: test_unconnected_recv("[1-f:0:3,3.3.3.3]:80", "[1-f:0:3,9.9.9.81]:443", !USE_FROM) + unconnected: + test_unconnected_recv("[1-f:0:3,3.3.3.3]:80", "[1-f:0:3,9.9.9.81]:443", !USE_FROM) } } - - // TODO(jsmith): Convert to test for for Pan socket - // #[tokio::test] - // async fn set_path() -> TestResult { - // let local_addr: SocketAddr = "[1-f:0:1,9.8.7.6]:80".parse()?; - // let (socket, _) = utils::socket_from(local_addr)?; - // let path = Path::local(local_addr.isd_asn()); - - // let notify = Arc::new(Notify::new()); - // let notify2 = Arc::new(Notify::new()); - - // let (result1, result2) = tokio::join!( - // async { - // let initial = socket.path(); - // socket.set_path(Some(path.clone())); - // notify.notify_one(); - - // notify2.notified().await; - // let last_set = socket.path(); - - // (initial, last_set) - // }, - // async { - // notify.notified().await; - // let first_set = socket.path(); - // socket.set_path(None); - // notify2.notify_one(); - - // first_set - // } - // ); - - // assert_eq!(result1, (None, None)); - // assert_eq!(result2, Some(path)); - - // Ok(()) - // } } From 662ecd363a8181a9ab195c16b422ea07320997c5 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Smith Date: Tue, 19 Dec 2023 18:40:40 +0100 Subject: [PATCH 4/6] feat: add simple integration test of PAN functionality --- crates/scion/src/pan/datagram.rs | 6 ++ crates/scion/tests/test_pan_socket.rs | 119 ++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 crates/scion/tests/test_pan_socket.rs diff --git a/crates/scion/src/pan/datagram.rs b/crates/scion/src/pan/datagram.rs index 27f9476..70c403d 100644 --- a/crates/scion/src/pan/datagram.rs +++ b/crates/scion/src/pan/datagram.rs @@ -190,3 +190,9 @@ where self.socket.remote_addr() } } + +impl AsRef for PathAwareDatagram { + fn as_ref(&self) -> &D { + &self.socket + } +} diff --git a/crates/scion/tests/test_pan_socket.rs b/crates/scion/tests/test_pan_socket.rs new file mode 100644 index 0000000..5dd5ec4 --- /dev/null +++ b/crates/scion/tests/test_pan_socket.rs @@ -0,0 +1,119 @@ +use std::{sync::OnceLock, time::Duration}; + +use bytes::Bytes; +use scion::{ + daemon::{get_daemon_address, DaemonClient}, + pan::{AsyncScionDatagram, PathAwareDatagram}, + udp_socket::UdpSocket, +}; +use scion_proto::{address::SocketAddr, packet::ByEndpoint, path::Path}; +use tokio::sync::Mutex; + +type TestResult = Result>; + +static MESSAGE: Bytes = Bytes::from_static(b"Hello SCION!"); +const TIMEOUT: Duration = std::time::Duration::from_secs(1); + +macro_rules! test_send_receive_reply { + ($name:ident, $source:expr, $destination:expr) => { + mod $name { + use super::*; + + // Prevent tests running simultaneously to avoid registration errors from the dispatcher + fn lock() -> &'static Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| Mutex::default()) + } + + async fn get_sockets( + ) -> TestResult<(PathAwareDatagram, UdpSocket, Path)> { + let endpoints: ByEndpoint = ByEndpoint { + source: $source.parse().unwrap(), + destination: $destination.parse().unwrap(), + }; + let daemon_client_source = DaemonClient::connect(&get_daemon_address()) + .await + .expect("should be able to connect"); + let socket_source = UdpSocket::bind(endpoints.source).await?; + let socket_destination = UdpSocket::bind(endpoints.destination).await?; + + socket_source.connect(endpoints.destination); + let path_forward = daemon_client_source + .paths_to(endpoints.destination.isd_asn()) + .await? + .next() + .unwrap(); + println!("Forward path: {:?}", path_forward.dataplane_path); + + let socket_source = + PathAwareDatagram::new(socket_source, path_forward.clone().into()); + + Ok((socket_source, socket_destination, path_forward)) + } + + #[tokio::test] + #[ignore = "requires daemon and dispatcher"] + async fn message() -> TestResult { + let _lock = lock().lock().await; + + let (socket_source, socket_destination, _) = get_sockets().await?; + socket_source.send(MESSAGE.clone()).await?; + + let mut buffer = vec![0_u8; 1500]; + let (length, sender) = + tokio::time::timeout(TIMEOUT, socket_destination.recv_from(&mut buffer)) + .await??; + assert_eq!(sender, socket_source.as_ref().local_addr()); + assert_eq!(buffer[..length], MESSAGE[..]); + Ok(()) + } + + #[tokio::test] + #[ignore = "requires daemon and dispatcher"] + async fn message_and_response() -> TestResult { + let _lock = lock().lock().await; + + let (socket_source, socket_destination, path_forward) = get_sockets().await?; + socket_source.send(MESSAGE.clone()).await?; + + let mut buffer = vec![0_u8; 128]; + let mut path_buffer = vec![0_u8; 1024]; + let (length, sender, path) = tokio::time::timeout( + TIMEOUT, + socket_destination.recv_from_with_path(&mut buffer, &mut path_buffer), + ) + .await??; + assert_eq!(sender, socket_source.as_ref().local_addr()); + assert_eq!(buffer[..length], MESSAGE[..]); + + println!("Reply path: {:?}", path.dataplane_path); + let path: Path = path.into(); + let socket_destination = PathAwareDatagram::new(socket_destination, path.into()); + + socket_destination.send_to(MESSAGE.clone(), sender).await?; + + let (_, path_return) = tokio::time::timeout( + TIMEOUT, + socket_source.recv_with_path(&mut buffer, &mut path_buffer), + ) + .await??; + assert_eq!(path_return.isd_asn, path_forward.isd_asn); + assert_eq!(path_return.dataplane_path, path_forward.dataplane_path); + + Ok(()) + } + } + }; +} + +test_send_receive_reply!( + send_and_receive_up_and_down_segment, + "[1-ff00:0:111,127.0.0.17]:12345", + "[1-ff00:0:112,fd00:f00d:cafe::7f00:a]:443" +); + +test_send_receive_reply!( + send_and_receive_same_as, + "[1-ff00:0:111,127.0.0.17]:12346", + "[1-ff00:0:111,127.0.0.17]:8080" +); From 555c6d38309eaf8ed632b389123f29708d0c3d9f Mon Sep 17 00:00:00 2001 From: Jean-Pierre Smith Date: Wed, 20 Dec 2023 11:43:38 +0100 Subject: [PATCH 5/6] fix: integrate review feedback --- crates/scion-proto/src/datagram.rs | 14 +++++++------- crates/scion-proto/src/packet/udp.rs | 14 +++++++------- crates/scion-proto/src/reliable/registration.rs | 4 ++-- crates/scion/src/pan/path_service.rs | 2 -- crates/scion/src/udp_socket.rs | 6 +++--- 5 files changed, 19 insertions(+), 21 deletions(-) diff --git a/crates/scion-proto/src/datagram.rs b/crates/scion-proto/src/datagram.rs index 43ade0b..b0becd6 100644 --- a/crates/scion-proto/src/datagram.rs +++ b/crates/scion-proto/src/datagram.rs @@ -30,7 +30,7 @@ pub enum UdpEncodeError { /// /// [RFC]: https://www.ietf.org/archive/id/draft-dekater-scion-dataplane-00.html #[derive(Debug, Default, PartialEq)] -pub struct UdpDatagram { +pub struct UdpMessage { /// The source and destination ports pub port: ByEndpoint, /// The length of the header and payload @@ -41,7 +41,7 @@ pub struct UdpDatagram { pub payload: Bytes, } -impl UdpDatagram { +impl UdpMessage { /// SCION protocol number for UDP. /// /// See the [IETF SCION-dataplane RFC draft][rfc] for possible values. @@ -89,7 +89,7 @@ impl UdpDatagram { } } -impl WireEncodeVec<2> for UdpDatagram { +impl WireEncodeVec<2> for UdpMessage { type Error = InadequateBufferSize; fn encode_with_unchecked(&self, buffer: &mut BytesMut) -> [Bytes; 2] { @@ -111,11 +111,11 @@ impl WireEncodeVec<2> for UdpDatagram { } } -impl WireDecode for UdpDatagram { +impl WireDecode for UdpMessage { type Error = UdpDecodeError; fn decode(data: &mut T) -> Result { - if data.remaining() < UdpDatagram::HEADER_LEN { + if data.remaining() < UdpMessage::HEADER_LEN { return Err(Self::Error::DatagramEmptyOrTruncated); } @@ -161,7 +161,7 @@ mod tests { destination: MaybeEncoded::Decoded(Ipv4Addr::from_str("10.0.0.2")?.into()), }, }; - let mut datagram = UdpDatagram::new( + let mut datagram = UdpMessage::new( ByEndpoint { source: 10001, destination: 10002, @@ -193,7 +193,7 @@ mod tests { let mut encoded_bytes = BytesMut::new(); encoded_bytes.put(encoded_datagram[0].clone()); encoded_bytes.put(encoded_datagram[1].clone()); - assert_eq!(UdpDatagram::decode(&mut encoded_bytes.freeze())?, datagram); + assert_eq!(UdpMessage::decode(&mut encoded_bytes.freeze())?, datagram); Ok(()) } diff --git a/crates/scion-proto/src/packet/udp.rs b/crates/scion-proto/src/packet/udp.rs index 558262f..7f839d9 100644 --- a/crates/scion-proto/src/packet/udp.rs +++ b/crates/scion-proto/src/packet/udp.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use super::{InadequateBufferSize, ScionHeaders, ScionPacketRaw}; use crate::{ address::SocketAddr, - datagram::{UdpDatagram, UdpDecodeError}, + datagram::{UdpDecodeError, UdpMessage}, packet::{ByEndpoint, EncodeError}, path::Path, wire_encoding::{WireDecode, WireEncodeVec}, @@ -16,7 +16,7 @@ pub struct ScionPacketUdp { /// Packet headers pub headers: ScionHeaders, /// The contained UDP datagram - pub datagram: UdpDatagram, + pub datagram: UdpMessage, } impl ScionPacketUdp { @@ -62,11 +62,11 @@ impl ScionPacketUdp { let headers = ScionHeaders::new( endhosts, path, - UdpDatagram::PROTOCOL_NUMBER, - payload.len() + UdpDatagram::HEADER_LEN, + UdpMessage::PROTOCOL_NUMBER, + payload.len() + UdpMessage::HEADER_LEN, )?; let mut datagram = - UdpDatagram::new(endhosts.map(|e| e.port()), payload).map_err(|_| todo!())?; + UdpMessage::new(endhosts.map(|e| e.port()), payload).map_err(|_| todo!())?; datagram.set_checksum(&headers.address); Ok(Self { headers, datagram }) @@ -77,14 +77,14 @@ impl TryFrom for ScionPacketUdp { type Error = UdpDecodeError; fn try_from(mut value: ScionPacketRaw) -> Result { - if value.headers.common.next_header != UdpDatagram::PROTOCOL_NUMBER { + if value.headers.common.next_header != UdpMessage::PROTOCOL_NUMBER { return Err(UdpDecodeError::WrongProtocolNumber( value.headers.common.next_header, )); } Ok(Self { headers: value.headers, - datagram: UdpDatagram::decode(&mut value.payload)?, + datagram: UdpMessage::decode(&mut value.payload)?, }) } } diff --git a/crates/scion-proto/src/reliable/registration.rs b/crates/scion-proto/src/reliable/registration.rs index e153a8b..de4b54c 100644 --- a/crates/scion-proto/src/reliable/registration.rs +++ b/crates/scion-proto/src/reliable/registration.rs @@ -29,7 +29,7 @@ use bytes::{Buf, BufMut}; use super::wire_utils::LAYER4_PORT_OCTETS; use crate::{ address::{HostType, IsdAsn, ServiceAddr, SocketAddr as ScionSocketAddr}, - datagram::UdpDatagram, + datagram::UdpMessage, reliable::{ wire_utils::{encoded_address_and_port_length, encoded_address_length}, ADDRESS_TYPE_OCTETS, @@ -85,7 +85,7 @@ impl RegistrationRequest { self.encode_command_flag(buffer); - buffer.put_u8(UdpDatagram::PROTOCOL_NUMBER); + buffer.put_u8(UdpMessage::PROTOCOL_NUMBER); buffer.put_u64(self.isd_asn.as_u64()); encode_address(buffer, &self.public_address); diff --git a/crates/scion/src/pan/path_service.rs b/crates/scion/src/pan/path_service.rs index bcffd9b..578b9cb 100644 --- a/crates/scion/src/pan/path_service.rs +++ b/crates/scion/src/pan/path_service.rs @@ -9,8 +9,6 @@ pub enum PathLookupError { } /// Trait for asynchronously retrieving paths to SCION ASes. -/// -/// If an implementation wants to receive paths from the network #[async_trait::async_trait] pub trait AsyncPathService { /// Return a path to the specified AS. diff --git a/crates/scion/src/udp_socket.rs b/crates/scion/src/udp_socket.rs index 055f24d..d3dd03e 100644 --- a/crates/scion/src/udp_socket.rs +++ b/crates/scion/src/udp_socket.rs @@ -9,7 +9,7 @@ use bytes::Bytes; use chrono::Utc; use scion_proto::{ address::SocketAddr, - datagram::{UdpDatagram, UdpEncodeError}, + datagram::{UdpEncodeError, UdpMessage}, packet::{ByEndpoint, ScionPacketRaw, ScionPacketUdp}, path::{DataplanePath, Path}, reliable::Packet, @@ -47,7 +47,7 @@ impl From for SendError { /// be [sent to][AsyncScionDatagram::send_to_via] and [received from][AsyncScionDatagram::recv_from] /// any other socket address by using the methods on the [`AsyncScionDatagram`] trait. /// -/// As SCION is a path-aware internet architecture, sending packets with the `UdpSocket` allows +/// As SCION is a path-aware Internet architecture, sending packets with the `UdpSocket` allows /// specifying the path over which the packet should be sent. See /// [`PathAwareDatagram`][crate::pan::PathAwareDatagram] for a wrapping socket than handles /// the selection of paths. @@ -298,7 +298,7 @@ impl UdpSocketInner { .map_err(log_err!("failed to decode SCION packet")) .ok()?; - let udp_datagram = UdpDatagram::decode(&mut scion_packet.payload) + let udp_datagram = UdpMessage::decode(&mut scion_packet.payload) .map_err(log_err!("failed to decode UDP datagram")) .ok()?; From ccf0ac8d653f9afffada4b909f79c8512627addb Mon Sep 17 00:00:00 2001 From: Jean-Pierre Smith Date: Wed, 20 Dec 2023 11:46:08 +0100 Subject: [PATCH 6/6] docs: document choice of send_via --- crates/scion/src/pan/datagram.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/scion/src/pan/datagram.rs b/crates/scion/src/pan/datagram.rs index 70c403d..a3d2a08 100644 --- a/crates/scion/src/pan/datagram.rs +++ b/crates/scion/src/pan/datagram.rs @@ -138,6 +138,7 @@ where /// Send a datagram using [`AsyncScionDatagram::send_via`] with a path from the path service. pub async fn send(&self, payload: Bytes) -> Result<(), SendError> { if let Some(remote_addr) = self.remote_addr() { + // Use send_via here as it maintains the connected semantics of the function call. let path = self.path_to(*remote_addr.as_ref()).await?; self.send_via(payload, path).await } else {