Skip to content

Commit

Permalink
Discover DHT peers
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanjermakov committed Oct 27, 2023
1 parent 3953389 commit b023e00
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod sha1;
mod state;
mod torrent;
mod tracker;
mod tracker_udp;
mod types;
mod udp;

Expand Down
21 changes: 16 additions & 5 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub enum Message {
length: u32,
},
Port {
port: u8,
port: u16,
},
}

Expand All @@ -66,8 +66,10 @@ impl From<Message> for Vec<u8> {
Message::Handshake { info_hash, peer_id } => {
let pstr = "BitTorrent protocol";
let pstrlen = &[pstr.len() as u8];
let reserved = &[0u8; 8];
[pstrlen, pstr.as_bytes(), reserved, &info_hash, &peer_id].concat()
let mut reserved = [0u8; 8];
// DHT protocol support, 63rd feature bit
reserved[7] |= 1;
[pstrlen, pstr.as_bytes(), &reserved, &info_hash, &peer_id].concat()
}
Message::KeepAlive => [u32tb(0).as_slice()].concat(),
Message::Choke => [u32tb(1).as_slice(), &[0]].concat(),
Expand Down Expand Up @@ -116,7 +118,7 @@ impl From<Message> for Vec<u8> {
&u32tb(length),
]
.concat(),
Message::Port { port } => [u32tb(3).as_slice(), &[9], &[port]].concat(),
Message::Port { port } => [u32tb(3).as_slice(), &[9], &port.to_be_bytes()].concat(),
}
}
}
Expand Down Expand Up @@ -252,7 +254,9 @@ pub async fn read_message(stream: &mut OwnedReadHalf) -> Result<Message> {
begin: u32_from_slice(&payload_p[4..8])?,
length: u32_from_slice(&payload_p[8..12])?,
}),
9 if len == 3 => Ok(Message::Port { port: payload_p[0] }),
9 if len == 3 => Ok(Message::Port {
port: u16::from_be_bytes(payload_p[0..2].try_into()?),
}),
_ => Err(Error::msg(format!(
"unexpected message: {}",
hex(&[len_p.as_ref(), &id_p, payload_p.as_slice()].concat())
Expand Down Expand Up @@ -500,6 +504,13 @@ async fn read_loop(
);
}
}
Ok(Message::Port { port }) => match state.lock().await.peers.get_mut(&peer) {
Some(p) => {
debug!("received port {}", port);
p.dht_port = Some(port)
}
_ => debug!("no peer {:?}", peer),
},
Ok(msg) => {
debug!("no handler for message, skipping: {:?}", msg);
}
Expand Down
4 changes: 3 additions & 1 deletion src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub struct Peer {
pub choked: bool,
pub interested: bool,
pub bitfield: Option<Vec<u8>>,
pub dht_port: Option<u16>
}

impl Peer {
Expand All @@ -103,6 +104,7 @@ impl Peer {
choked: true,
interested: false,
bitfield: None,
dht_port: None
}
}
}
Expand All @@ -117,7 +119,7 @@ pub enum PeerStatus {
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PeerInfo {
pub ip: String,
pub port: i64,
pub port: u16,
}

pub fn init_pieces(info: &Info) -> BTreeMap<u32, Piece> {
Expand Down
12 changes: 12 additions & 0 deletions src/torrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio::{spawn, sync::Mutex};

use crate::abort::EnsureAbort;
use crate::config::Config;
use crate::state::PeerInfo;
use crate::{
bencode::{parse_bencoded, BencodeValue},
metainfo::{FileInfo, Metainfo, PathInfo},
Expand Down Expand Up @@ -93,6 +94,17 @@ pub async fn download_torrent(path: &Path, peer_id: &ByteString, config: &Config
"incomplete pieces"
);

let dht_peers: Vec<PeerInfo> = state
.peers
.values()
.filter(|p| p.dht_port.is_some())
.map(|p| PeerInfo {
ip: p.info.ip.clone(),
port: p.dht_port.unwrap(),
})
.collect();
debug!("discovered {} dht peers: {:?}", dht_peers.len(), dht_peers);

info!("writing files to disk");
write_to_disk(state).await?;

Expand Down
2 changes: 1 addition & 1 deletion src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl TryFrom<BencodeValue> for TrackerResponse {
_ => return Err("'ip' missing".into()),
},
port: match p_dict.get("port") {
Some(BencodeValue::Int(p)) => *p,
Some(BencodeValue::Int(p)) => *p as u16,
_ => return Err("'port' missing".into()),
},
}),
Expand Down
2 changes: 1 addition & 1 deletion src/tracker_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub async fn tracker_request_udp(
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join("."),
port: u16::from_be_bytes(pkg[i + 4..i + 6].try_into().unwrap()) as i64,
port: u16::from_be_bytes(pkg[i + 4..i + 6].try_into().unwrap()),
})
.collect();

Expand Down

0 comments on commit b023e00

Please sign in to comment.