Skip to content

Commit

Permalink
Merge pull request #300 from ikatson/unpause-zero-peers
Browse files Browse the repository at this point in the history
Can unpause torrents even if there's no peers
  • Loading branch information
ikatson authored Jan 13, 2025
2 parents 6fea795 + c84e3ad commit bbae577
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 38 deletions.
2 changes: 1 addition & 1 deletion crates/dht/examples/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> {

let dht = DhtBuilder::new().await.context("error initializing DHT")?;

let mut stream = dht.get_peers(info_hash, None)?;
let mut stream = dht.get_peers(info_hash, None);

let stats_printer = async {
loop {
Expand Down
9 changes: 2 additions & 7 deletions crates/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1197,17 +1197,12 @@ impl DhtState {
.boxed()
}

#[inline(never)]
pub fn get_peers(
self: &Arc<Self>,
info_hash: Id20,
announce_port: Option<u16>,
) -> anyhow::Result<RequestPeersStream> {
Ok(RequestPeersStream::new(
self.clone(),
info_hash,
announce_port,
))
) -> RequestPeersStream {
RequestPeersStream::new(self.clone(), info_hash, announce_port)
}

pub fn listen_addr(&self) -> SocketAddr {
Expand Down
2 changes: 1 addition & 1 deletion crates/librqbit/src/dht_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ mod tests {
let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap();
let dht = DhtBuilder::new().await.unwrap();

let peer_rx = dht.get_peers(info_hash, None).unwrap();
let peer_rx = dht.get_peers(info_hash, None);
let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(
peer_id,
Expand Down
44 changes: 15 additions & 29 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,10 @@ fn merge_two_optional_streams<T>(
s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
) -> Option<BoxStream<'static, T>> {
match (s1, s2) {
(Some(s1), None) => {
trace!("merge_two_optional_streams: using first");
Some(Box::pin(s1))
}
(None, Some(s2)) => {
trace!("merge_two_optional_streams: using second");
Some(Box::pin(s2))
}
(Some(s1), Some(s2)) => {
trace!("merge_two_optional_streams: using both");
Some(Box::pin(merge_streams(s1, s2)))
}
(None, None) => {
trace!("merge_two_optional_streams: using none");
None
}
(Some(s1), None) => Some(Box::pin(s1)),
(None, Some(s2)) => Some(Box::pin(s2)),
(Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))),
(None, None) => None,
}
}

Expand Down Expand Up @@ -1019,7 +1007,6 @@ impl Session {
opts.initial_peers.clone().unwrap_or_default(),
private,
)
.context("error creating peer stream")
};

let mut seen_peers = Vec::new();
Expand All @@ -1029,12 +1016,12 @@ impl Session {
Some(metadata) => {
let mut peer_rx = None;
if !opts.paused && !opts.list_only {
peer_rx = make_peer_rx()?;
peer_rx = make_peer_rx();
}
(metadata, peer_rx)
}
None => {
let peer_rx = make_peer_rx()?.context(
let peer_rx = make_peer_rx().context(
"no known way to resolve peers (no DHT, no trackers, no initial_peers)",
)?;
let resolved_magnet = self
Expand Down Expand Up @@ -1286,7 +1273,7 @@ impl Session {
self: &Arc<Self>,
t: &Arc<ManagedTorrent>,
announce: bool,
) -> anyhow::Result<PeerStream> {
) -> Option<PeerStream> {
let is_private = t.with_metadata(|m| m.info.private).unwrap_or(false);
self.make_peer_rx(
t.info_hash(),
Expand All @@ -1295,8 +1282,7 @@ impl Session {
t.shared().options.force_tracker_interval,
t.shared().options.initial_peers.clone(),
is_private,
)?
.context("no peer source")
)
}

// Get a peer stream from both DHT and trackers.
Expand All @@ -1308,15 +1294,14 @@ impl Session {
force_tracker_interval: Option<Duration>,
initial_peers: Vec<SocketAddr>,
is_private: bool,
) -> anyhow::Result<Option<PeerStream>> {
) -> Option<PeerStream> {
let announce_port = if announce { self.tcp_listen_port } else { None };
let dht_rx = if is_private {
None
} else {
self.dht
.as_ref()
.map(|dht| dht.get_peers(info_hash, announce_port))
.transpose()?
};

if is_private && trackers.len() > 1 {
Expand All @@ -1343,9 +1328,10 @@ impl Session {
} else {
Some(futures::stream::iter(initial_peers))
};
let peer_rx = merge_two_optional_streams(dht_rx, tracker_rx);
let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_rx);
Ok(peer_rx)
merge_two_optional_streams(
merge_two_optional_streams(dht_rx, tracker_rx),
initial_peers_rx,
)
}

async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) {
Expand All @@ -1363,8 +1349,8 @@ impl Session {
}

pub async fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
let peer_rx = self.make_peer_rx_managed_torrent(handle, true)?;
handle.start(Some(peer_rx), false)?;
let peer_rx = self.make_peer_rx_managed_torrent(handle, true);
handle.start(peer_rx, false)?;
self.try_update_persistence_metadata(handle).await;
Ok(())
}
Expand Down

0 comments on commit bbae577

Please sign in to comment.