From 3acc53a11f632c5760bdbfa6e19473cb402bc6f7 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 14 Sep 2024 18:35:15 +0100 Subject: [PATCH 01/11] IPv6 support for SSDP (for UPNP) --- Cargo.lock | 1 + Makefile | 2 +- crates/upnp-serve/Cargo.toml | 1 + crates/upnp-serve/src/ssdp.rs | 387 ++++++++++++++++++++++++---------- crates/upnp/src/lib.rs | 56 ++--- 5 files changed, 303 insertions(+), 144 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00f23c05..2e00a744 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2692,6 +2692,7 @@ dependencies = [ "gethostname", "http 1.1.0", "httparse", + "libc", "librqbit-core", "librqbit-sha1-wrapper", "librqbit-upnp", diff --git a/Makefile b/Makefile index eb5b73cf..4cd643d9 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ webui-dev: webui-deps # NOTE: on LG TV using hostname is unstable for some reason, use IP address. export RQBIT_UPNP_SERVER_ENABLE ?= true export RQBIT_UPNP_SERVER_FRIENDLY_NAME ?= rqbit-dev -export RQBIT_HTTP_API_LISTEN_ADDR ?= 0.0.0.0:3030 +export RQBIT_HTTP_API_LISTEN_ADDR ?= [::]:3030 export RQBIT_FASTRESUME = true CARGO_RUN_FLAGS ?= RQBIT_OUTPUT_FOLDER ?= /tmp/scratch diff --git a/crates/upnp-serve/Cargo.toml b/crates/upnp-serve/Cargo.toml index 176b116e..c2edf939 100644 --- a/crates/upnp-serve/Cargo.toml +++ b/crates/upnp-serve/Cargo.toml @@ -39,6 +39,7 @@ socket2 = "0.5.7" quick-xml = { version = "0.36.1", features = ["serialize"] } network-interface = "2.0.0" futures = "0.3.30" +libc = "0.2.158" [dev-dependencies] tracing-subscriber = "0.3.18" diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index 64b7c72d..87bd2c9c 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -1,24 +1,33 @@ use std::{ - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + collections::HashSet, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, + os::fd::{AsFd, AsRawFd}, time::Duration, }; use anyhow::{bail, Context}; use bstr::BStr; use network_interface::NetworkInterfaceConfig; +use parking_lot::Mutex; use tokio::net::UdpSocket; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; use crate::constants::{UPNP_KIND_MEDIASERVER, UPNP_KIND_ROOT_DEVICE}; -const UPNP_PORT: u16 = 1900; -const UPNP_BROADCAST_IP: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250); -const UPNP_BROADCAST_ADDR: SocketAddrV4 = SocketAddrV4::new(UPNP_BROADCAST_IP, UPNP_PORT); +const SSDP_PORT: u16 = 1900; +const SSDM_MCAST_IPV4: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250); +const SSDP_MCAST_IPV6_LINK_LOCAL: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xc); +const SSDP_MCAST_IPV6_SITE_LOCAL: Ipv6Addr = Ipv6Addr::new(0xff05, 0, 0, 0, 0, 0, 0, 0xc); const NTS_ALIVE: &str = "ssdp:alive"; const NTS_BYEBYE: &str = "ssdp:byebye"; +fn ipv6_is_link_local(ip: Ipv6Addr) -> bool { + let s = ip.segments(); + [s[0], s[1], s[2], s[3]] == [0xfe80, 0, 0, 0] +} + #[derive(Debug)] pub enum SsdpMessage<'a, 'h> { MSearch(SsdpMSearchRequest<'a>), @@ -30,6 +39,7 @@ pub enum SsdpMessage<'a, 'h> { #[derive(Debug)] pub struct SsdpMSearchRequest<'a> { + #[allow(dead_code)] pub host: &'a BStr, pub man: &'a BStr, pub st: &'a BStr, @@ -37,9 +47,6 @@ pub struct SsdpMSearchRequest<'a> { impl<'a> SsdpMSearchRequest<'a> { fn matches_media_server(&self) -> bool { - if self.host != "239.255.255.250:1900" { - return false; - } if self.man != "\"ssdp:discover\"" { return false; } @@ -103,61 +110,171 @@ pub struct SsdpRunnerOptions { pub struct SsdpRunner { opts: SsdpRunnerOptions, - socket: UdpSocket, + socket_v4: Option, + socket_v6: Option, } -impl SsdpRunner { - pub async fn new(opts: SsdpRunnerOptions) -> anyhow::Result { - let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, UPNP_PORT); - let sock = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None) - .context("error creating socket")?; - #[cfg(not(target_os = "windows"))] - sock.set_reuse_port(true) - .context("error setting SO_REUSEPORT")?; - sock.set_reuse_address(true) - .context("error setting SO_REUSEADDR")?; - - trace!(addr=?bind_addr, "binding UDP"); - sock.bind(&bind_addr.into()) - .context(bind_addr) - .context("error binding")?; - - sock.set_nonblocking(true)?; - let socket = tokio::net::UdpSocket::from_std(sock.into()) - .context("error converting socket2 socket to tokio")?; - - let default_multiast_membership_ip = std::iter::once(Ipv4Addr::UNSPECIFIED); - let all_multicast_membership_ips = network_interface::NetworkInterface::show() - .into_iter() - .flatten() - .flat_map(|nic| nic.addr.into_iter()) - .filter_map(|addr| { - let ip = addr.ip(); - match ip { - std::net::IpAddr::V4(addr) if addr.is_private() && !addr.is_loopback() => { - Some(addr) - } - _ => None, +fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result { + let domain = if bind_addr.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }; + let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, None) + .context(bind_addr) + .context("error creating socket")?; + #[cfg(not(target_os = "windows"))] + sock.set_reuse_port(true) + .context("error setting SO_REUSEPORT")?; + sock.set_reuse_address(true) + .context("error setting SO_REUSEADDR")?; + + trace!(addr=?bind_addr, "binding UDP"); + sock.bind(&bind_addr.into()) + .context(bind_addr) + .context("error binding")?; + + sock.set_nonblocking(true)?; + let socket = tokio::net::UdpSocket::from_std(sock.into()) + .context("error converting socket2 socket to tokio")?; + + Ok(socket) +} + +async fn bind_v4_socket() -> anyhow::Result { + let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, SSDP_PORT); + let socket = socket_presetup(bind_addr.into())?; + + let default_multiast_membership_ip = std::iter::once(Ipv4Addr::UNSPECIFIED); + let all_multicast_membership_ips = network_interface::NetworkInterface::show() + .into_iter() + .flatten() + .flat_map(|nic| nic.addr.into_iter()) + .filter_map(|addr| { + let ip = addr.ip(); + match ip { + std::net::IpAddr::V4(addr) if addr.is_private() && !addr.is_loopback() => { + Some(addr) } - }); + _ => None, + } + }); + + for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) { + trace!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "joining multicast v4 group"); + if let Err(e) = socket.join_multicast_v4(SSDM_MCAST_IPV4, ifaddr) { + debug!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "error joining multicast v4 group: {e:#}"); + } + } + + Ok(socket) +} - for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) { - trace!(multiaddr=?UPNP_BROADCAST_IP, interface=?ifaddr, "joining multicast v4 group"); - if let Err(e) = socket.join_multicast_v4(UPNP_BROADCAST_IP, ifaddr) { - debug!(error=?e, multiaddr=?UPNP_BROADCAST_IP, interface=?ifaddr, "error joining multicast v4 group"); +async fn bind_v6_socket() -> anyhow::Result { + let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, SSDP_PORT, 0, 0); + let socket = socket_presetup(bind_addr.into())?; + + for nic in network_interface::NetworkInterface::show() + .into_iter() + .flatten() + { + let mut has_link_local = false; + let mut has_site_local = false; + for addr in nic.addr.iter() { + let addr = match addr.ip() { + IpAddr::V4(_) => continue, + IpAddr::V6(v6) => v6, + }; + if addr.is_loopback() { + continue; + } + if ipv6_is_link_local(addr) { + has_link_local = true; + } else { + has_site_local = true; + } + } + for (present, multiaddr) in [ + (has_link_local, SSDP_MCAST_IPV6_LINK_LOCAL), + (has_site_local, SSDP_MCAST_IPV6_SITE_LOCAL), + ] { + if !present { + continue; + } + if let Err(e) = socket.join_multicast_v6(&multiaddr, nic.index) { + debug!(multiaddr=?multiaddr, interface=?nic.index, "error joining multicast v6 group: {e:#}"); } } + } + + Ok(socket) +} + +struct MulticastOpts { + local_interface_ip: IpAddr, + #[allow(dead_code)] + local_interface_id: u32, + addr: SocketAddr, +} + +fn set_mcast_if(sock: &UdpSocket, local_ip: Ipv4Addr) -> std::io::Result<()> { + let addr = libc::in_addr { + s_addr: u32::from_ne_bytes(local_ip.octets()), + }; + const SZ: usize = std::mem::size_of::(); + + trace!(addr = %local_ip, "setting IP_MULTICAST_IF"); + let ret = unsafe { + libc::setsockopt( + sock.as_fd().as_raw_fd(), + libc::IPPROTO_IP, + libc::IP_MULTICAST_IF, + &addr as *const _ as _, + SZ as u32, + ) + }; + if ret < 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) +} + +impl MulticastOpts { + fn addr_no_scope(&self) -> SocketAddr { + let mut addr = self.addr; + if let SocketAddr::V6(v6) = &mut addr { + v6.set_scope_id(0); + } + addr + } +} - Ok(Self { opts, socket }) +impl SsdpRunner { + pub async fn new(opts: SsdpRunnerOptions) -> anyhow::Result { + let socket_v4 = bind_v4_socket() + .await + .map_err(|e| warn!("error creating IPv4 SSDP socket: {e:#}")) + .ok(); + let socket_v6 = bind_v6_socket() + .await + .map_err(|e| warn!("error creating IPv6 SSDP socket: {e:#}")) + .ok(); + Ok(Self { + opts, + socket_v4, + socket_v6, + }) } - fn generate_notify_message(&self, kind: &str, nts: &str, location: &url::Url) -> String { + fn generate_notify_message(&self, kind: &str, nts: &str, opts: &MulticastOpts) -> String { let usn: &str = &self.opts.usn; let server: &str = &self.opts.server_string; - let bcast_addr = UPNP_BROADCAST_ADDR; + let host = opts.addr_no_scope(); + let mut location = self.opts.description_http_location.clone(); + let _ = location.set_ip_host(opts.local_interface_ip); format!( "NOTIFY * HTTP/1.1\r -Host: {bcast_addr}\r +Host: {host}\r Cache-Control: max-age=75\r Location: {location}\r NT: {kind}\r @@ -177,7 +294,7 @@ USN: {usn}::{kind}\r let local_ip = ::librqbit_upnp::get_local_ip_relative_to(addr.ip())?; let location = { let mut loc = self.opts.description_http_location.clone(); - loc.set_host(Some(&format!("{local_ip}")))?; + let _ = loc.set_ip_host(local_ip); loc }; let usn = &self.opts.usn; @@ -194,7 +311,10 @@ Content-Length: 0\r\n\r\n" )) } - async fn try_send_notifies(&self, nts: &str) { + async fn try_send_mcast_everywhere( + &self, + get_payload: &impl Fn(&MulticastOpts) -> bstr::BString, + ) { use network_interface::NetworkInterfaceConfig; let interfaces = network_interface::NetworkInterface::show(); let interfaces = match interfaces { @@ -205,52 +325,79 @@ Content-Length: 0\r\n\r\n" } }; + let sent = Mutex::new(HashSet::new()); + let sent = &sent; + let futs = interfaces .into_iter() - .flat_map(|ni| ni.addr) - .filter_map(|addr| { - match addr.ip() { - std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => Some(a), - _ => None + .flat_map(|ni| ni.addr.into_iter().map(move |a| (ni.index, a))) + .filter_map(|(ifidx, addr)| match addr.ip() { + std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => { + Some(MulticastOpts { + local_interface_ip: addr.ip(), + local_interface_id: ifidx, + addr: SocketAddr::V4(SocketAddrV4::new(SSDM_MCAST_IPV4, SSDP_PORT)), + }) } + std::net::IpAddr::V6(a) if !a.is_loopback() => Some(MulticastOpts { + local_interface_ip: addr.ip(), + local_interface_id: ifidx, + addr: { + let bip = if ipv6_is_link_local(a) { + SSDP_MCAST_IPV6_LINK_LOCAL + } else { + SSDP_MCAST_IPV6_SITE_LOCAL + }; + SocketAddr::V6(SocketAddrV6::new(bip, SSDP_PORT, 0, ifidx)) + }, + }), + _ => None, }) - .map(|ip| async move { - let addr = SocketAddrV4::new(ip, 0); - let sock = match tokio::net::UdpSocket::bind(addr).await { - Ok(sock) => sock, - Err(e) => { - debug!(%addr, error=?e, "error binding UDP to send NOTIFY"); - return; - } - }; + .map(|opts| async move { + let payload = get_payload(&opts); + if !sent + .lock() + .insert((payload.clone(), opts.local_interface_id, opts.addr)) + { + // don't send duplicates + return; + } - let mut location = self.opts.description_http_location.clone(); - location.set_host(Some(&format!("{ip}"))).unwrap(); - - macro_rules! gen { - ($kind:expr) => { - async { - let msg = self.generate_notify_message($kind, nts, &location); - trace!(content=?msg, addr=?UPNP_BROADCAST_ADDR, "sending SSDP NOTIFY"); - if let Err(e) = sock.send_to(msg.as_bytes(), UPNP_BROADCAST_ADDR).await { - debug!(sock_addr=%addr, error=%e, kind=$kind, nts, "error sending SSDP NOTIFY") - } else { - debug!(kind=$kind, nts, %location, "sent SSDP NOTIFY") - } + let sock = match ( + opts.local_interface_ip, + self.socket_v4.as_ref(), + self.socket_v6.as_ref(), + ) { + (IpAddr::V4(ip), Some(sock_v4), _) => { + if let Err(e) = set_mcast_if(sock_v4, ip) { + debug!(addr=%ip, "error calling set_mcast_if: {e:#}"); } + sock_v4 } - } - - let f1 = gen!(UPNP_KIND_ROOT_DEVICE); - let f2 = gen!(UPNP_KIND_MEDIASERVER) ; + (IpAddr::V6(_), _, Some(sock_v6)) => sock_v6, + _ => return, + }; - tokio::join!(f1, f2); + match sock.send_to(payload.as_slice(), opts.addr).await { + Ok(sz) => trace!(payload=?payload, addr=%opts.addr, size=sz, "sent"), + Err(e) => { + debug!(payload=?payload, addr=%opts.addr, "error sending: {e:#}") + } + }; }); futures::future::join_all(futs).await; } - async fn task_send_alive_notifies_periodically(&self) -> anyhow::Result<()> { + async fn try_send_notifies(&self, nts: &str) { + self.try_send_mcast_everywhere(&|opts| { + self.generate_notify_message(UPNP_KIND_MEDIASERVER, nts, opts) + .into() + }) + .await + } + + async fn task_send_alive_notifies_periodically(&self) { let mut interval = tokio::time::interval(self.opts.notify_interval); loop { interval.tick().await; @@ -258,7 +405,12 @@ Content-Length: 0\r\n\r\n" } } - async fn process_incoming_message(&self, msg: &[u8], addr: SocketAddr) -> anyhow::Result<()> { + async fn process_incoming_message( + &self, + msg: &[u8], + sock: &UdpSocket, + addr: SocketAddr, + ) -> anyhow::Result<()> { let mut headers = [httparse::EMPTY_HEADER; 16]; trace!(content = ?BStr::new(msg), ?addr, "received message"); let parsed = try_parse_ssdp(msg, &mut headers); @@ -281,8 +433,7 @@ Content-Length: 0\r\n\r\n" if let Ok(st) = std::str::from_utf8(msg.st) { let response = self.generate_ssdp_discover_response(st, addr)?; trace!(content = response, ?addr, "sending SSDP discover response"); - self.socket - .send_to(response.as_bytes(), addr) + sock.send_to(response.as_bytes(), addr) .await .context("error sending")?; } @@ -290,51 +441,57 @@ Content-Length: 0\r\n\r\n" Ok(()) } - async fn task_respond_on_msearches(&self) -> anyhow::Result<()> { + async fn task_respond_on_msearches(&self, sock: Option<&UdpSocket>) { let mut buf = vec![0u8; 16184]; + let sock = match sock { + Some(sock) => sock, + None => return, + }; loop { - let (sz, addr) = self - .socket - .recv_from(&mut buf) - .await - .context("error receiving")?; + let (sz, addr) = match sock.recv_from(&mut buf).await { + Ok((sz, addr)) => (sz, addr), + Err(e) => { + warn!(error=?e, "error receving"); + return; + } + }; let msg = &buf[..sz]; - if let Err(e) = self.process_incoming_message(msg, addr).await { + if let Err(e) = self.process_incoming_message(msg, sock, addr).await { warn!(error=?e, ?addr, "error processing incoming SSDP message") } } } - async fn send_msearch(&self) -> anyhow::Result<()> { - let msearch_msg = "M-SEARCH * HTTP/1.1\r -HOST: 239.255.255.250:1900\r + async fn try_send_example_msearch(&self) { + self.try_send_mcast_everywhere(&|opts| { + let dest = opts.addr_no_scope(); + format!( + "M-SEARCH * HTTP/1.1\r +HOST: {dest}\r ST: urn:schemas-upnp-org:device:MediaServer:1\r MAN: \"ssdp:discover\"\r -MX: 2\r\n\r\n"; - - trace!(content = msearch_msg, "multicasting M-SEARCH"); - - self.socket - .send_to(msearch_msg.as_bytes(), UPNP_BROADCAST_ADDR) - .await - .context("error sending msearch")?; - Ok(()) +MX: 2\r\n\r\n" + ) + .into() + }) + .await } pub async fn run_forever(&self) -> anyhow::Result<()> { // This isn't necessary, but would show that it works. - self.send_msearch().await?; - - let t1 = self.task_respond_on_msearches(); - let t2 = self.task_send_alive_notifies_periodically(); - - tokio::pin!(t1); - tokio::pin!(t2); + let t0 = self.try_send_example_msearch(); + let t1 = self.task_respond_on_msearches(self.socket_v4.as_ref()); + let t2 = self.task_respond_on_msearches(self.socket_v6.as_ref()); + let t3 = self.task_send_alive_notifies_periodically(); + + let wait = async move { + tokio::join!(t0, t1, t2, t3); + Ok(()) + }; tokio::select! { - r = &mut t1 => r, - r = &mut t2 => r, + r = wait => r, _ = self.opts.shutdown.cancelled() => { self.try_send_notifies(NTS_BYEBYE).await; bail!("canceled"); diff --git a/crates/upnp/src/lib.rs b/crates/upnp/src/lib.rs index 0feb3e62..f7448c1a 100644 --- a/crates/upnp/src/lib.rs +++ b/crates/upnp/src/lib.rs @@ -6,7 +6,7 @@ use reqwest::Client; use serde::Deserialize; use std::{ collections::HashSet, - net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}, time::Duration, }; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -30,21 +30,21 @@ pub fn make_ssdp_search_request(kind: &str) -> String { ) } -pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result { - let local_dest = match local_dest { - IpAddr::V4(v4) => v4, - IpAddr::V6(v6) => { - anyhow::bail!("get_local_ip_relative_to not implemented for IPv6; addr={v6}") - } - }; - - // Ipv4Addr.to_bits() is only there in nightly rust, so copying here for now. - fn ip_bits(addr: Ipv4Addr) -> u32 { +pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result { + fn ip_bits_v4(addr: Ipv4Addr) -> u32 { u32::from_be_bytes(addr.octets()) } - fn masked(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 { - ip_bits(ip) & ip_bits(mask) + fn masked_v4(ip: Ipv4Addr, mask: Ipv4Addr) -> u32 { + ip_bits_v4(ip) & ip_bits_v4(mask) + } + + fn ip_bits_v6(addr: Ipv6Addr) -> u128 { + u128::from_be_bytes(addr.octets()) + } + + fn masked_v6(ip: Ipv6Addr, mask: Ipv6Addr) -> u128 { + ip_bits_v6(ip) & ip_bits_v6(mask) } let interfaces = @@ -52,18 +52,18 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result for i in interfaces { for addr in i.addr { - if let network_interface::Addr::V4(v4) = addr { - let ip = v4.ip; - let mask = match v4.netmask { - Some(mask) => mask, - None => continue, - }; - trace!("found local addr {ip}/{mask}"); - // If the masked address is the same, means we are on the same network, return - // the ip address - if masked(ip, mask) == masked(local_dest, mask) { - return Ok(ip); + match (local_dest, addr.ip(), addr.netmask()) { + (IpAddr::V4(l), IpAddr::V4(a), Some(IpAddr::V4(m))) + if masked_v4(l, m) == masked_v4(a, m) => + { + return Ok(addr.ip()) + } + (IpAddr::V6(l), IpAddr::V6(a), Some(IpAddr::V6(m))) + if masked_v6(l, m) == masked_v6(a, m) => + { + return Ok(addr.ip()) } + _ => continue, } } } @@ -72,7 +72,7 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result async fn forward_port( control_url: Url, - local_ip: Ipv4Addr, + local_ip: IpAddr, port: u16, lease_duration: Duration, ) -> anyhow::Result<()> { @@ -229,7 +229,7 @@ impl UpnpEndpoint { .flat_map(move |d| d.iter_services(self_span.clone())) } - fn my_local_ip(&self) -> anyhow::Result { + fn my_local_ip(&self) -> anyhow::Result { let dest_ip = self.discover_response.received_from.ip(); let local_ip = get_local_ip_relative_to(dest_ip) .with_context(|| format!("can't determine local IP relative to {dest_ip}"))?; @@ -419,7 +419,7 @@ impl UpnpPortForwarder { } } - async fn manage_port(&self, control_url: Url, local_ip: Ipv4Addr, port: u16) -> ! { + async fn manage_port(&self, control_url: Url, local_ip: IpAddr, port: u16) -> ! { let lease_duration = self.opts.lease_duration; let mut interval = tokio::time::interval(lease_duration / 2); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); @@ -433,7 +433,7 @@ impl UpnpPortForwarder { } } - async fn manage_service(&self, control_url: Url, local_ip: Ipv4Addr) -> anyhow::Result<()> { + async fn manage_service(&self, control_url: Url, local_ip: IpAddr) -> anyhow::Result<()> { futures::future::join_all(self.ports.iter().cloned().map(|port| { self.manage_port(control_url.clone(), local_ip, port) .instrument(error_span!("manage_port", port = port)) From 7fd4730b47cef79a53e312db11602cdbb0f62e3d Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 15 Sep 2024 17:43:53 +0100 Subject: [PATCH 02/11] dont error SSDP on cancel --- crates/upnp-serve/src/ssdp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index 87bd2c9c..3ca75ffa 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -494,7 +494,7 @@ MX: 2\r\n\r\n" r = wait => r, _ = self.opts.shutdown.cancelled() => { self.try_send_notifies(NTS_BYEBYE).await; - bail!("canceled"); + Ok(()) } } } From b3263c24d38289b4015bada46898ab92900e25d0 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 15 Sep 2024 17:52:45 +0100 Subject: [PATCH 03/11] fixed span.enter in session --- crates/librqbit/src/session.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index b9e2d0c4..1c0dee28 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1198,17 +1198,13 @@ impl Session { peer_rx, ); - { - let span = managed_torrent.shared.span.clone(); - let _ = span.enter(); - - managed_torrent - .start(peer_rx, opts.paused) - .context("error starting torrent")?; - } + let _e = managed_torrent.shared.span.clone().entered(); + managed_torrent + .start(peer_rx, opts.paused) + .context("error starting torrent")?; if let Some(name) = managed_torrent.shared().info.name.as_ref() { - info!(?name, id, "added torrent"); + info!(?name, "added torrent"); } Ok(AddTorrentResponse::Added(id, managed_torrent)) From 8d8d5491f9624d994ffa7a53d9aa2c93f6b41a07 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 15 Sep 2024 22:28:43 +0100 Subject: [PATCH 04/11] trying to fix windows multicast --- Cargo.lock | 1 + crates/upnp-serve/Cargo.toml | 3 +++ crates/upnp-serve/src/ssdp.rs | 50 ++++++++++++++++++++++++----------- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e00a744..05b39cdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2710,6 +2710,7 @@ dependencies = [ "tracing-subscriber", "url", "uuid", + "winapi", ] [[package]] diff --git a/crates/upnp-serve/Cargo.toml b/crates/upnp-serve/Cargo.toml index c2edf939..2ff0b228 100644 --- a/crates/upnp-serve/Cargo.toml +++ b/crates/upnp-serve/Cargo.toml @@ -41,6 +41,9 @@ network-interface = "2.0.0" futures = "0.3.30" libc = "0.2.158" +[target.'cfg(windows)'.dependencies] +winapi = { version = "0.3.9", features = ["winsock2"] } + [dev-dependencies] tracing-subscriber = "0.3.18" tower-http = { version = "0.5", features = ["trace"] } diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index 3ca75ffa..37afb45d 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -1,7 +1,6 @@ use std::{ collections::HashSet, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, - os::fd::{AsFd, AsRawFd}, time::Duration, }; @@ -217,24 +216,43 @@ struct MulticastOpts { addr: SocketAddr, } -fn set_mcast_if(sock: &UdpSocket, local_ip: Ipv4Addr) -> std::io::Result<()> { - let addr = libc::in_addr { - s_addr: u32::from_ne_bytes(local_ip.octets()), - }; - const SZ: usize = std::mem::size_of::(); +fn set_mcast_if(sock: &UdpSocket, local_ip: Ipv4Addr) -> anyhow::Result<()> { + // in_addr is the same on unix and windows and contains just the 4 bytes of IPv4 in network + // byte order. + let addr = u32::from_ne_bytes(local_ip.octets()); + let sz: usize = std::mem::size_of_val(&addr); trace!(addr = %local_ip, "setting IP_MULTICAST_IF"); - let ret = unsafe { - libc::setsockopt( - sock.as_fd().as_raw_fd(), - libc::IPPROTO_IP, - libc::IP_MULTICAST_IF, - &addr as *const _ as _, - SZ as u32, - ) - }; + + let ret: i32; + #[cfg(target_os = "windows")] + { + use std::os::windows::io::AsRawSocket; + ret = unsafe { + winapi::um::winsock2::setsockopt( + sock.as_raw_socket().try_into()?, + winapi::shared::ws2def::IPPROTO_IP, + winapi::shared::ws2ipdef::IP_MULTICAST_IF, + &addr as *const _ as _, + sz.try_into()?, + ) + }; + } + #[cfg(not(target_os = "windows"))] + { + use std::os::fd::{AsFd, AsRawFd}; + ret = unsafe { + libc::setsockopt( + sock.as_fd().as_raw_fd(), + libc::IPPROTO_IP, + libc::IP_MULTICAST_IF, + &addr as *const _ as _, + sz.try_into()?, + ) + }; + } if ret < 0 { - return Err(std::io::Error::last_os_error()); + return Err(std::io::Error::last_os_error().into()); } Ok(()) } From 028addcf6aa9a54a219389e48885432aac552aae Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 16 Sep 2024 09:21:21 +0100 Subject: [PATCH 05/11] Nothing: renames and tiny refactors --- crates/upnp-serve/src/constants.rs | 4 +- crates/upnp-serve/src/ssdp.rs | 65 ++++++++++++++++++------------ 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/crates/upnp-serve/src/constants.rs b/crates/upnp-serve/src/constants.rs index ad43f984..c14fe7c2 100644 --- a/crates/upnp-serve/src/constants.rs +++ b/crates/upnp-serve/src/constants.rs @@ -1,5 +1,5 @@ -pub const UPNP_KIND_ROOT_DEVICE: &str = "upnp:rootdevice"; -pub const UPNP_KIND_MEDIASERVER: &str = "urn:schemas-upnp-org:device:MediaServer:1"; +pub const UPNP_DEVICE_ROOT: &str = "upnp:rootdevice"; +pub const UPNP_DEVICE_MEDIASERVER: &str = "urn:schemas-upnp-org:device:MediaServer:1"; pub const SOAP_ACTION_CONTENT_DIRECTORY_BROWSE: &[u8] = b"\"urn:schemas-upnp-org:service:ContentDirectory:1#Browse\""; diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index 37afb45d..5ffd9a90 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -12,7 +12,7 @@ use tokio::net::UdpSocket; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; -use crate::constants::{UPNP_KIND_MEDIASERVER, UPNP_KIND_ROOT_DEVICE}; +use crate::constants::{UPNP_DEVICE_MEDIASERVER, UPNP_DEVICE_ROOT}; const SSDP_PORT: u16 = 1900; const SSDM_MCAST_IPV4: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250); @@ -23,8 +23,10 @@ const NTS_ALIVE: &str = "ssdp:alive"; const NTS_BYEBYE: &str = "ssdp:byebye"; fn ipv6_is_link_local(ip: Ipv6Addr) -> bool { - let s = ip.segments(); - [s[0], s[1], s[2], s[3]] == [0xfe80, 0, 0, 0] + const LL: Ipv6Addr = Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 0); + const MASK: Ipv6Addr = Ipv6Addr::new(0xffff, 0xffff, 0xffff, 0xffff, 0, 0, 0, 0); + + ip.to_bits() & MASK.to_bits() == LL.to_bits() & MASK.to_bits() } #[derive(Debug)] @@ -49,7 +51,7 @@ impl<'a> SsdpMSearchRequest<'a> { if self.man != "\"ssdp:discover\"" { return false; } - if self.st == UPNP_KIND_ROOT_DEVICE || self.st == UPNP_KIND_MEDIASERVER { + if self.st == UPNP_DEVICE_ROOT || self.st == UPNP_DEVICE_MEDIASERVER { return true; } false @@ -200,6 +202,7 @@ async fn bind_v6_socket() -> anyhow::Result { if !present { continue; } + trace!(multiaddr=?multiaddr, interface=?nic.index, "joining multicast v6 group"); if let Err(e) = socket.join_multicast_v6(&multiaddr, nic.index) { debug!(multiaddr=?multiaddr, interface=?nic.index, "error joining multicast v6 group: {e:#}"); } @@ -210,10 +213,10 @@ async fn bind_v6_socket() -> anyhow::Result { } struct MulticastOpts { - local_interface_ip: IpAddr, + interface_addr: IpAddr, #[allow(dead_code)] - local_interface_id: u32, - addr: SocketAddr, + interface_id: u32, + mcast_addr: SocketAddr, } fn set_mcast_if(sock: &UdpSocket, local_ip: Ipv4Addr) -> anyhow::Result<()> { @@ -259,7 +262,7 @@ fn set_mcast_if(sock: &UdpSocket, local_ip: Ipv4Addr) -> anyhow::Result<()> { impl MulticastOpts { fn addr_no_scope(&self) -> SocketAddr { - let mut addr = self.addr; + let mut addr = self.mcast_addr; if let SocketAddr::V6(v6) = &mut addr { v6.set_scope_id(0); } @@ -284,21 +287,26 @@ impl SsdpRunner { }) } - fn generate_notify_message(&self, kind: &str, nts: &str, opts: &MulticastOpts) -> String { + fn generate_notify_message( + &self, + device_kind: &str, + nts: &str, + opts: &MulticastOpts, + ) -> String { let usn: &str = &self.opts.usn; let server: &str = &self.opts.server_string; let host = opts.addr_no_scope(); let mut location = self.opts.description_http_location.clone(); - let _ = location.set_ip_host(opts.local_interface_ip); + let _ = location.set_ip_host(opts.interface_addr); format!( "NOTIFY * HTTP/1.1\r Host: {host}\r Cache-Control: max-age=75\r Location: {location}\r -NT: {kind}\r +NT: {device_kind}\r NTS: {nts}\r Server: {server}\r -USN: {usn}::{kind}\r +USN: {usn}::{device_kind}\r \r " ) @@ -352,15 +360,15 @@ Content-Length: 0\r\n\r\n" .filter_map(|(ifidx, addr)| match addr.ip() { std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => { Some(MulticastOpts { - local_interface_ip: addr.ip(), - local_interface_id: ifidx, - addr: SocketAddr::V4(SocketAddrV4::new(SSDM_MCAST_IPV4, SSDP_PORT)), + interface_addr: addr.ip(), + interface_id: ifidx, + mcast_addr: SocketAddr::V4(SocketAddrV4::new(SSDM_MCAST_IPV4, SSDP_PORT)), }) } std::net::IpAddr::V6(a) if !a.is_loopback() => Some(MulticastOpts { - local_interface_ip: addr.ip(), - local_interface_id: ifidx, - addr: { + interface_addr: addr.ip(), + interface_id: ifidx, + mcast_addr: { let bip = if ipv6_is_link_local(a) { SSDP_MCAST_IPV6_LINK_LOCAL } else { @@ -375,20 +383,27 @@ Content-Length: 0\r\n\r\n" let payload = get_payload(&opts); if !sent .lock() - .insert((payload.clone(), opts.local_interface_id, opts.addr)) + .insert((payload.clone(), opts.interface_id, opts.mcast_addr)) { // don't send duplicates return; } let sock = match ( - opts.local_interface_ip, + opts.interface_addr, self.socket_v4.as_ref(), self.socket_v6.as_ref(), ) { + // For IPv4 sockets, call setsockopt(IP_MULTICAST_IF), so that the message + // gets sent out of the interface we want (otherwise it'll get sent through + // default one). + // For IPv6 it's not necessary as we specify scope_id in SocketAddr. + // + // It's important we don't .await() in between also, so that concurrent sends + // have the proper IP_MULTICAST_IF. (IpAddr::V4(ip), Some(sock_v4), _) => { if let Err(e) = set_mcast_if(sock_v4, ip) { - debug!(addr=%ip, "error calling set_mcast_if: {e:#}"); + debug!(addr=%ip, "error setting IP_MULTICAST_IF: {e:#}"); } sock_v4 } @@ -396,10 +411,10 @@ Content-Length: 0\r\n\r\n" _ => return, }; - match sock.send_to(payload.as_slice(), opts.addr).await { - Ok(sz) => trace!(payload=?payload, addr=%opts.addr, size=sz, "sent"), + match sock.send_to(payload.as_slice(), opts.mcast_addr).await { + Ok(sz) => trace!(payload=?payload, addr=%opts.mcast_addr, size=sz, "sent"), Err(e) => { - debug!(payload=?payload, addr=%opts.addr, "error sending: {e:#}") + debug!(payload=?payload, addr=%opts.mcast_addr, "error sending: {e:#}") } }; }); @@ -409,7 +424,7 @@ Content-Length: 0\r\n\r\n" async fn try_send_notifies(&self, nts: &str) { self.try_send_mcast_everywhere(&|opts| { - self.generate_notify_message(UPNP_KIND_MEDIASERVER, nts, opts) + self.generate_notify_message(UPNP_DEVICE_MEDIASERVER, nts, opts) .into() }) .await From 6193cc942b6e3713f5b5c27fa4924970416c195c Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 16 Sep 2024 10:21:40 +0100 Subject: [PATCH 06/11] doesnt help... maybe set_mcast_if need to call on ipv6 too --- crates/upnp-serve/src/ssdp.rs | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index 5ffd9a90..d7c961d0 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -196,8 +196,8 @@ async fn bind_v6_socket() -> anyhow::Result { } } for (present, multiaddr) in [ - (has_link_local, SSDP_MCAST_IPV6_LINK_LOCAL), (has_site_local, SSDP_MCAST_IPV6_SITE_LOCAL), + (has_link_local, SSDP_MCAST_IPV6_LINK_LOCAL), ] { if !present { continue; @@ -356,7 +356,10 @@ Content-Length: 0\r\n\r\n" let futs = interfaces .into_iter() - .flat_map(|ni| ni.addr.into_iter().map(move |a| (ni.index, a))) + .flat_map(|ni| { + trace!(name=ni.name, addr=?ni.addr, id=ni.index, "found network interface"); + ni.addr.into_iter().map(move |a| (ni.index, a)) + }) .filter_map(|(ifidx, addr)| match addr.ip() { std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => { Some(MulticastOpts { @@ -369,15 +372,17 @@ Content-Length: 0\r\n\r\n" interface_addr: addr.ip(), interface_id: ifidx, mcast_addr: { - let bip = if ipv6_is_link_local(a) { - SSDP_MCAST_IPV6_LINK_LOCAL + if ipv6_is_link_local(a) { + SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_LINK_LOCAL, SSDP_PORT, 0, ifidx)) } else { - SSDP_MCAST_IPV6_SITE_LOCAL - }; - SocketAddr::V6(SocketAddrV6::new(bip, SSDP_PORT, 0, ifidx)) + SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_SITE_LOCAL, SSDP_PORT, 0, 0)) + } }, }), - _ => None, + _ => { + trace!(oif_id=ifidx, addr=%addr.ip(), "ignoring address"); + None + }, }) .map(|opts| async move { let payload = get_payload(&opts); @@ -385,7 +390,7 @@ Content-Length: 0\r\n\r\n" .lock() .insert((payload.clone(), opts.interface_id, opts.mcast_addr)) { - // don't send duplicates + trace!(oif_id=opts.interface_id, addr=%opts.mcast_addr, "not sending duplicate payload"); return; } @@ -408,13 +413,16 @@ Content-Length: 0\r\n\r\n" sock_v4 } (IpAddr::V6(_), _, Some(sock_v6)) => sock_v6, - _ => return, + _ => { + trace!(addr=%opts.interface_addr, "ignoring address, no socket to send to"); + return; + }, }; match sock.send_to(payload.as_slice(), opts.mcast_addr).await { - Ok(sz) => trace!(payload=?payload, addr=%opts.mcast_addr, size=sz, "sent"), + Ok(sz) => trace!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, size=sz, payload=?payload, "sent"), Err(e) => { - debug!(payload=?payload, addr=%opts.mcast_addr, "error sending: {e:#}") + debug!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, payload=?payload, "error sending: {e:#}") } }; }); From 403a4ce48059749563789b85bbb60bf8a53a0d7d Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 16 Sep 2024 10:49:56 +0100 Subject: [PATCH 07/11] works on linux now too --- crates/upnp-serve/src/ssdp.rs | 67 ++++++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 13 deletions(-) diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index d7c961d0..ceb71f28 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -219,7 +219,7 @@ struct MulticastOpts { mcast_addr: SocketAddr, } -fn set_mcast_if(sock: &UdpSocket, local_ip: Ipv4Addr) -> anyhow::Result<()> { +fn set_mcast_if_v4(sock: &UdpSocket, local_ip: Ipv4Addr) -> anyhow::Result<()> { // in_addr is the same on unix and windows and contains just the 4 bytes of IPv4 in network // byte order. let addr = u32::from_ne_bytes(local_ip.octets()); @@ -260,6 +260,47 @@ fn set_mcast_if(sock: &UdpSocket, local_ip: Ipv4Addr) -> anyhow::Result<()> { Ok(()) } +fn set_mcast_if_v6(sock: &UdpSocket, dev_idx: u32) -> anyhow::Result<()> { + // in_addr is the same on unix and windows and contains just the 4 bytes of IPv4 in network + // byte order. + trace!(dev_idx, "setting IP_MULTICAST_IF"); + + let ret: i32; + #[cfg(target_os = "windows")] + { + use std::os::windows::io::AsRawSocket; + let sz: usize = std::mem::size_of_val(&dev_idx); + ret = unsafe { + winapi::um::winsock2::setsockopt( + sock.as_raw_socket().try_into()?, + winapi::shared::ws2def::IPPROTO_IPV6, + winapi::shared::ws2ipdef::IPV6_MULTICAST_IF, + &dev_idx as *const _ as _, + sz.try_into()?, + ) + }; + } + #[cfg(not(target_os = "windows"))] + { + use std::os::fd::{AsFd, AsRawFd}; + let dev_idx = dev_idx as i32; + let sz: usize = std::mem::size_of_val(&dev_idx); + ret = unsafe { + libc::setsockopt( + sock.as_fd().as_raw_fd(), + libc::IPPROTO_IPV6, + libc::IPV6_MULTICAST_IF, + &dev_idx as *const _ as _, + sz.try_into()?, + ) + }; + } + if ret < 0 { + return Err(std::io::Error::last_os_error().into()); + } + Ok(()) +} + impl MulticastOpts { fn addr_no_scope(&self) -> SocketAddr { let mut addr = self.mcast_addr; @@ -356,10 +397,9 @@ Content-Length: 0\r\n\r\n" let futs = interfaces .into_iter() - .flat_map(|ni| { - trace!(name=ni.name, addr=?ni.addr, id=ni.index, "found network interface"); + .flat_map(|ni| ni.addr.into_iter().map(move |a| (ni.index, a)) - }) + ) .filter_map(|(ifidx, addr)| match addr.ip() { std::net::IpAddr::V4(a) if !a.is_loopback() && a.is_private() => { Some(MulticastOpts { @@ -375,7 +415,7 @@ Content-Length: 0\r\n\r\n" if ipv6_is_link_local(a) { SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_LINK_LOCAL, SSDP_PORT, 0, ifidx)) } else { - SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_SITE_LOCAL, SSDP_PORT, 0, 0)) + SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_SITE_LOCAL, SSDP_PORT, 0, ifidx)) } }, }), @@ -399,20 +439,21 @@ Content-Length: 0\r\n\r\n" self.socket_v4.as_ref(), self.socket_v6.as_ref(), ) { - // For IPv4 sockets, call setsockopt(IP_MULTICAST_IF), so that the message + // Call setsockopt(IP_MULTICAST_IF), so that the message // gets sent out of the interface we want (otherwise it'll get sent through // default one). - // For IPv6 it's not necessary as we specify scope_id in SocketAddr. - // - // It's important we don't .await() in between also, so that concurrent sends - // have the proper IP_MULTICAST_IF. (IpAddr::V4(ip), Some(sock_v4), _) => { - if let Err(e) = set_mcast_if(sock_v4, ip) { - debug!(addr=%ip, "error setting IP_MULTICAST_IF: {e:#}"); + if let Err(e) = set_mcast_if_v4(sock_v4, ip) { + debug!(addr=%ip, "error calling set_mcast_if_v4: {e:#}"); } sock_v4 } - (IpAddr::V6(_), _, Some(sock_v6)) => sock_v6, + (IpAddr::V6(_), _, Some(sock_v6)) => { + if let Err(e) = set_mcast_if_v6(sock_v6, opts.interface_id) { + debug!(oif_id=opts.interface_id, "error calling set_mcast_if_v6: {e:#}"); + } + sock_v6 + }, _ => { trace!(addr=%opts.interface_addr, "ignoring address, no socket to send to"); return; From f7e083545250e3468dd10d7c20f852d92a091c43 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 16 Sep 2024 11:16:46 +0100 Subject: [PATCH 08/11] Remove custom libc/winapi code in favor of duplicating the socket and using both socket2 and tokio --- Cargo.lock | 2 - crates/upnp-serve/Cargo.toml | 4 -- crates/upnp-serve/src/ssdp.rs | 119 +++++++--------------------------- 3 files changed, 24 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05b39cdc..00f23c05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2692,7 +2692,6 @@ dependencies = [ "gethostname", "http 1.1.0", "httparse", - "libc", "librqbit-core", "librqbit-sha1-wrapper", "librqbit-upnp", @@ -2710,7 +2709,6 @@ dependencies = [ "tracing-subscriber", "url", "uuid", - "winapi", ] [[package]] diff --git a/crates/upnp-serve/Cargo.toml b/crates/upnp-serve/Cargo.toml index 2ff0b228..176b116e 100644 --- a/crates/upnp-serve/Cargo.toml +++ b/crates/upnp-serve/Cargo.toml @@ -39,10 +39,6 @@ socket2 = "0.5.7" quick-xml = { version = "0.36.1", features = ["serialize"] } network-interface = "2.0.0" futures = "0.3.30" -libc = "0.2.158" - -[target.'cfg(windows)'.dependencies] -winapi = { version = "0.3.9", features = ["winsock2"] } [dev-dependencies] tracing-subscriber = "0.3.18" diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index ceb71f28..a279ab57 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -8,7 +8,6 @@ use anyhow::{bail, Context}; use bstr::BStr; use network_interface::NetworkInterfaceConfig; use parking_lot::Mutex; -use tokio::net::UdpSocket; use tokio_util::sync::CancellationToken; use tracing::{debug, trace, warn}; @@ -109,13 +108,18 @@ pub struct SsdpRunnerOptions { pub shutdown: CancellationToken, } +struct UdpSocket { + sock2: socket2::Socket, + tokio: tokio::net::UdpSocket, +} + pub struct SsdpRunner { opts: SsdpRunnerOptions, socket_v4: Option, socket_v6: Option, } -fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result { +fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result { let domain = if bind_addr.is_ipv4() { socket2::Domain::IPV4 } else { @@ -136,10 +140,16 @@ fn socket_presetup(bind_addr: SocketAddr) -> anyhow::Result anyhow::Result { @@ -163,7 +173,7 @@ async fn bind_v4_socket() -> anyhow::Result { for ifaddr in default_multiast_membership_ip.chain(all_multicast_membership_ips) { trace!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "joining multicast v4 group"); - if let Err(e) = socket.join_multicast_v4(SSDM_MCAST_IPV4, ifaddr) { + if let Err(e) = socket.tokio.join_multicast_v4(SSDM_MCAST_IPV4, ifaddr) { debug!(multiaddr=?SSDM_MCAST_IPV4, interface=?ifaddr, "error joining multicast v4 group: {e:#}"); } } @@ -203,7 +213,7 @@ async fn bind_v6_socket() -> anyhow::Result { continue; } trace!(multiaddr=?multiaddr, interface=?nic.index, "joining multicast v6 group"); - if let Err(e) = socket.join_multicast_v6(&multiaddr, nic.index) { + if let Err(e) = socket.tokio.join_multicast_v6(&multiaddr, nic.index) { debug!(multiaddr=?multiaddr, interface=?nic.index, "error joining multicast v6 group: {e:#}"); } } @@ -219,88 +229,6 @@ struct MulticastOpts { mcast_addr: SocketAddr, } -fn set_mcast_if_v4(sock: &UdpSocket, local_ip: Ipv4Addr) -> anyhow::Result<()> { - // in_addr is the same on unix and windows and contains just the 4 bytes of IPv4 in network - // byte order. - let addr = u32::from_ne_bytes(local_ip.octets()); - let sz: usize = std::mem::size_of_val(&addr); - - trace!(addr = %local_ip, "setting IP_MULTICAST_IF"); - - let ret: i32; - #[cfg(target_os = "windows")] - { - use std::os::windows::io::AsRawSocket; - ret = unsafe { - winapi::um::winsock2::setsockopt( - sock.as_raw_socket().try_into()?, - winapi::shared::ws2def::IPPROTO_IP, - winapi::shared::ws2ipdef::IP_MULTICAST_IF, - &addr as *const _ as _, - sz.try_into()?, - ) - }; - } - #[cfg(not(target_os = "windows"))] - { - use std::os::fd::{AsFd, AsRawFd}; - ret = unsafe { - libc::setsockopt( - sock.as_fd().as_raw_fd(), - libc::IPPROTO_IP, - libc::IP_MULTICAST_IF, - &addr as *const _ as _, - sz.try_into()?, - ) - }; - } - if ret < 0 { - return Err(std::io::Error::last_os_error().into()); - } - Ok(()) -} - -fn set_mcast_if_v6(sock: &UdpSocket, dev_idx: u32) -> anyhow::Result<()> { - // in_addr is the same on unix and windows and contains just the 4 bytes of IPv4 in network - // byte order. - trace!(dev_idx, "setting IP_MULTICAST_IF"); - - let ret: i32; - #[cfg(target_os = "windows")] - { - use std::os::windows::io::AsRawSocket; - let sz: usize = std::mem::size_of_val(&dev_idx); - ret = unsafe { - winapi::um::winsock2::setsockopt( - sock.as_raw_socket().try_into()?, - winapi::shared::ws2def::IPPROTO_IPV6, - winapi::shared::ws2ipdef::IPV6_MULTICAST_IF, - &dev_idx as *const _ as _, - sz.try_into()?, - ) - }; - } - #[cfg(not(target_os = "windows"))] - { - use std::os::fd::{AsFd, AsRawFd}; - let dev_idx = dev_idx as i32; - let sz: usize = std::mem::size_of_val(&dev_idx); - ret = unsafe { - libc::setsockopt( - sock.as_fd().as_raw_fd(), - libc::IPPROTO_IPV6, - libc::IPV6_MULTICAST_IF, - &dev_idx as *const _ as _, - sz.try_into()?, - ) - }; - } - if ret < 0 { - return Err(std::io::Error::last_os_error().into()); - } - Ok(()) -} - impl MulticastOpts { fn addr_no_scope(&self) -> SocketAddr { let mut addr = self.mcast_addr; @@ -443,14 +371,14 @@ Content-Length: 0\r\n\r\n" // gets sent out of the interface we want (otherwise it'll get sent through // default one). (IpAddr::V4(ip), Some(sock_v4), _) => { - if let Err(e) = set_mcast_if_v4(sock_v4, ip) { - debug!(addr=%ip, "error calling set_mcast_if_v4: {e:#}"); + if let Err(e) = sock_v4.sock2.set_multicast_if_v4(&ip) { + debug!(addr=%ip, "error calling set_multicast_if_v4: {e:#}"); } sock_v4 } (IpAddr::V6(_), _, Some(sock_v6)) => { - if let Err(e) = set_mcast_if_v6(sock_v6, opts.interface_id) { - debug!(oif_id=opts.interface_id, "error calling set_mcast_if_v6: {e:#}"); + if let Err(e) = sock_v6.sock2.set_multicast_if_v6(opts.interface_id) { + debug!(oif_id=opts.interface_id, "error calling set_multicast_if_v6: {e:#}"); } sock_v6 }, @@ -460,7 +388,7 @@ Content-Length: 0\r\n\r\n" }, }; - match sock.send_to(payload.as_slice(), opts.mcast_addr).await { + match sock.tokio.send_to(payload.as_slice(), opts.mcast_addr).await { Ok(sz) => trace!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, size=sz, payload=?payload, "sent"), Err(e) => { debug!(addr=%opts.mcast_addr, oif_id=opts.interface_id, oif_addr=%opts.interface_addr, payload=?payload, "error sending: {e:#}") @@ -515,7 +443,8 @@ Content-Length: 0\r\n\r\n" if let Ok(st) = std::str::from_utf8(msg.st) { let response = self.generate_ssdp_discover_response(st, addr)?; trace!(content = response, ?addr, "sending SSDP discover response"); - sock.send_to(response.as_bytes(), addr) + sock.tokio + .send_to(response.as_bytes(), addr) .await .context("error sending")?; } @@ -531,7 +460,7 @@ Content-Length: 0\r\n\r\n" }; loop { - let (sz, addr) = match sock.recv_from(&mut buf).await { + let (sz, addr) = match sock.tokio.recv_from(&mut buf).await { Ok((sz, addr)) => (sz, addr), Err(e) => { warn!(error=?e, "error receving"); From 03391764ccb95943cf170556bf71fc6a8013f2fd Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 16 Sep 2024 17:09:51 +0100 Subject: [PATCH 09/11] Remove scope_id when non link local addr --- crates/upnp-serve/src/ssdp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index a279ab57..3d0170d9 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -343,7 +343,7 @@ Content-Length: 0\r\n\r\n" if ipv6_is_link_local(a) { SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_LINK_LOCAL, SSDP_PORT, 0, ifidx)) } else { - SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_SITE_LOCAL, SSDP_PORT, 0, ifidx)) + SocketAddr::V6(SocketAddrV6::new(SSDP_MCAST_IPV6_SITE_LOCAL, SSDP_PORT, 0, 0)) } }, }), From 406947f997c08c785be532359dd8172428d03d4b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 16 Sep 2024 17:25:19 +0100 Subject: [PATCH 10/11] debugging --- crates/upnp/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/upnp/src/lib.rs b/crates/upnp/src/lib.rs index f7448c1a..44c1f878 100644 --- a/crates/upnp/src/lib.rs +++ b/crates/upnp/src/lib.rs @@ -52,6 +52,7 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result { for i in interfaces { for addr in i.addr { + trace!(%local_dest, nic=i.index, ip=?addr.ip(), nm=?addr.netmask(), "dbg"); match (local_dest, addr.ip(), addr.netmask()) { (IpAddr::V4(l), IpAddr::V4(a), Some(IpAddr::V4(m))) if masked_v4(l, m) == masked_v4(a, m) => From 189dea36a067536503a914f8260862d34b2c890f Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 16 Sep 2024 17:51:47 +0100 Subject: [PATCH 11/11] Windows doesnt have ipv6 netamasks! at least in networkinterfaces package --- crates/upnp-serve/src/ssdp.rs | 10 ++------ crates/upnp/src/lib.rs | 46 ++++++++++++++++++++++++++--------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/crates/upnp-serve/src/ssdp.rs b/crates/upnp-serve/src/ssdp.rs index 3d0170d9..7fa3cfbe 100644 --- a/crates/upnp-serve/src/ssdp.rs +++ b/crates/upnp-serve/src/ssdp.rs @@ -6,6 +6,7 @@ use std::{ use anyhow::{bail, Context}; use bstr::BStr; +use librqbit_upnp::ipv6_is_link_local; use network_interface::NetworkInterfaceConfig; use parking_lot::Mutex; use tokio_util::sync::CancellationToken; @@ -21,13 +22,6 @@ const SSDP_MCAST_IPV6_SITE_LOCAL: Ipv6Addr = Ipv6Addr::new(0xff05, 0, 0, 0, 0, 0 const NTS_ALIVE: &str = "ssdp:alive"; const NTS_BYEBYE: &str = "ssdp:byebye"; -fn ipv6_is_link_local(ip: Ipv6Addr) -> bool { - const LL: Ipv6Addr = Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 0); - const MASK: Ipv6Addr = Ipv6Addr::new(0xffff, 0xffff, 0xffff, 0xffff, 0, 0, 0, 0); - - ip.to_bits() & MASK.to_bits() == LL.to_bits() & MASK.to_bits() -} - #[derive(Debug)] pub enum SsdpMessage<'a, 'h> { MSearch(SsdpMSearchRequest<'a>), @@ -286,7 +280,7 @@ USN: {usn}::{device_kind}\r st: &str, addr: SocketAddr, ) -> anyhow::Result { - let local_ip = ::librqbit_upnp::get_local_ip_relative_to(addr.ip())?; + let local_ip = ::librqbit_upnp::get_local_ip_relative_to(addr)?; let location = { let mut loc = self.opts.description_http_location.clone(); let _ = loc.set_ip_host(local_ip); diff --git a/crates/upnp/src/lib.rs b/crates/upnp/src/lib.rs index 44c1f878..7ff09aa1 100644 --- a/crates/upnp/src/lib.rs +++ b/crates/upnp/src/lib.rs @@ -30,7 +30,19 @@ pub fn make_ssdp_search_request(kind: &str) -> String { ) } -pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result { +// .to_bits() isn't yet available on min rust version we support (1.75 at the time of writing this) +const fn ip_bits_v6(addr: Ipv6Addr) -> u128 { + u128::from_be_bytes(addr.octets()) +} + +pub fn ipv6_is_link_local(ip: Ipv6Addr) -> bool { + const LL: Ipv6Addr = Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 0); + const MASK: Ipv6Addr = Ipv6Addr::new(0xffff, 0xffff, 0xffff, 0xffff, 0, 0, 0, 0); + + ip_bits_v6(ip) & ip_bits_v6(MASK) == ip_bits_v6(LL) & ip_bits_v6(MASK) +} + +pub fn get_local_ip_relative_to(local_dest: SocketAddr) -> anyhow::Result { fn ip_bits_v4(addr: Ipv4Addr) -> u32 { u32::from_be_bytes(addr.octets()) } @@ -39,10 +51,6 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result { ip_bits_v4(ip) & ip_bits_v4(mask) } - fn ip_bits_v6(addr: Ipv6Addr) -> u128 { - u128::from_be_bytes(addr.octets()) - } - fn masked_v6(ip: Ipv6Addr, mask: Ipv6Addr) -> u128 { ip_bits_v6(ip) & ip_bits_v6(mask) } @@ -54,16 +62,30 @@ pub fn get_local_ip_relative_to(local_dest: IpAddr) -> anyhow::Result { for addr in i.addr { trace!(%local_dest, nic=i.index, ip=?addr.ip(), nm=?addr.netmask(), "dbg"); match (local_dest, addr.ip(), addr.netmask()) { - (IpAddr::V4(l), IpAddr::V4(a), Some(IpAddr::V4(m))) - if masked_v4(l, m) == masked_v4(a, m) => + // We are connecting to ourselves, return itself. + (l, a, _) if l.ip() == a => return Ok(addr.ip()), + // IPv4 masks match. + (SocketAddr::V4(l), IpAddr::V4(a), Some(IpAddr::V4(m))) + if masked_v4(*l.ip(), m) == masked_v4(a, m) => { return Ok(addr.ip()) } - (IpAddr::V6(l), IpAddr::V6(a), Some(IpAddr::V6(m))) - if masked_v6(l, m) == masked_v6(a, m) => + // Return IPv6 link-local addresses when source is link-local address and there's a scope_id set. + (SocketAddr::V6(l), IpAddr::V6(a), _) + if ipv6_is_link_local(*l.ip()) && l.scope_id() > 0 => + { + if ipv6_is_link_local(a) && l.scope_id() == i.index { + return Ok(addr.ip()); + } + } + // If V6 masks match, return. + (SocketAddr::V6(l), IpAddr::V6(a), Some(IpAddr::V6(m))) + if masked_v6(*l.ip(), m) == masked_v6(a, m) => { return Ok(addr.ip()) } + // For IPv6 fallback to returning a random (first encountered) IPv6 address. + (SocketAddr::V6(_), IpAddr::V6(_), None) => return Ok(addr.ip()), _ => continue, } } @@ -231,9 +253,9 @@ impl UpnpEndpoint { } fn my_local_ip(&self) -> anyhow::Result { - let dest_ip = self.discover_response.received_from.ip(); - let local_ip = get_local_ip_relative_to(dest_ip) - .with_context(|| format!("can't determine local IP relative to {dest_ip}"))?; + let received_from = self.discover_response.received_from; + let local_ip = get_local_ip_relative_to(received_from) + .with_context(|| format!("can't determine local IP relative to {received_from}"))?; Ok(local_ip) }