diff --git a/crates/dht/examples/dht.rs b/crates/dht/examples/dht.rs index 108122d0..727ae9fa 100644 --- a/crates/dht/examples/dht.rs +++ b/crates/dht/examples/dht.rs @@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> { let dht = DhtBuilder::new().await.context("error initializing DHT")?; - let mut stream = dht.get_peers(info_hash, None)?; + let mut stream = dht.get_peers(info_hash, None); let stats_printer = async { loop { diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 84512c1b..72d6297f 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -1197,17 +1197,12 @@ impl DhtState { .boxed() } - #[inline(never)] pub fn get_peers( self: &Arc, info_hash: Id20, announce_port: Option, - ) -> anyhow::Result { - Ok(RequestPeersStream::new( - self.clone(), - info_hash, - announce_port, - )) + ) -> RequestPeersStream { + RequestPeersStream::new(self.clone(), info_hash, announce_port) } pub fn listen_addr(&self) -> SocketAddr { diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index ce054e57..3ac4cdca 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -130,7 +130,7 @@ mod tests { let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap(); let dht = DhtBuilder::new().await.unwrap(); - let peer_rx = dht.get_peers(info_hash, None).unwrap(); + let peer_rx = dht.get_peers(info_hash, None); let peer_id = generate_peer_id(); match read_metainfo_from_peer_receiver( peer_id, diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 5acc511e..6606f7ae 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -214,22 +214,10 @@ fn merge_two_optional_streams( s2: Option + Unpin + Send + 'static>, ) -> Option> { match (s1, s2) { - (Some(s1), None) => { - trace!("merge_two_optional_streams: using first"); - Some(Box::pin(s1)) - } - (None, Some(s2)) => { - trace!("merge_two_optional_streams: using second"); - Some(Box::pin(s2)) - } - (Some(s1), Some(s2)) => { - trace!("merge_two_optional_streams: using both"); - Some(Box::pin(merge_streams(s1, s2))) - } - (None, None) => { - trace!("merge_two_optional_streams: using none"); - None - } + (Some(s1), None) => Some(Box::pin(s1)), + (None, Some(s2)) => Some(Box::pin(s2)), + (Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))), + (None, None) => None, } } @@ -1019,7 +1007,6 @@ impl Session { opts.initial_peers.clone().unwrap_or_default(), private, ) - .context("error creating peer stream") }; let mut seen_peers = Vec::new(); @@ -1029,12 +1016,12 @@ impl Session { Some(metadata) => { let mut peer_rx = None; if !opts.paused && !opts.list_only { - peer_rx = make_peer_rx()?; + peer_rx = make_peer_rx(); } (metadata, peer_rx) } None => { - let peer_rx = make_peer_rx()?.context( + let peer_rx = make_peer_rx().context( "no known way to resolve peers (no DHT, no trackers, no initial_peers)", )?; let resolved_magnet = self @@ -1286,7 +1273,7 @@ impl Session { self: &Arc, t: &Arc, announce: bool, - ) -> anyhow::Result { + ) -> Option { let is_private = t.with_metadata(|m| m.info.private).unwrap_or(false); self.make_peer_rx( t.info_hash(), @@ -1295,8 +1282,7 @@ impl Session { t.shared().options.force_tracker_interval, t.shared().options.initial_peers.clone(), is_private, - )? - .context("no peer source") + ) } // Get a peer stream from both DHT and trackers. @@ -1308,7 +1294,7 @@ impl Session { force_tracker_interval: Option, initial_peers: Vec, is_private: bool, - ) -> anyhow::Result> { + ) -> Option { let announce_port = if announce { self.tcp_listen_port } else { None }; let dht_rx = if is_private { None @@ -1316,7 +1302,6 @@ impl Session { self.dht .as_ref() .map(|dht| dht.get_peers(info_hash, announce_port)) - .transpose()? }; if is_private && trackers.len() > 1 { @@ -1343,9 +1328,10 @@ impl Session { } else { Some(futures::stream::iter(initial_peers)) }; - let peer_rx = merge_two_optional_streams(dht_rx, tracker_rx); - let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_rx); - Ok(peer_rx) + merge_two_optional_streams( + merge_two_optional_streams(dht_rx, tracker_rx), + initial_peers_rx, + ) } async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) { @@ -1363,8 +1349,8 @@ impl Session { } pub async fn unpause(self: &Arc, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { - let peer_rx = self.make_peer_rx_managed_torrent(handle, true)?; - handle.start(Some(peer_rx), false)?; + let peer_rx = self.make_peer_rx_managed_torrent(handle, true); + handle.start(peer_rx, false)?; self.try_update_persistence_metadata(handle).await; Ok(()) }