Skip to content

Commit

Permalink
Torrent state; async peer handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanjermakov committed Oct 24, 2023
1 parent 81c4efd commit b54da89
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 118 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ anyhow = "1.0.75"
env_logger = "0.10.0"
log = "0.4.20"
rand = "0.8.5"
reqwest = { version = "0.11", features=["blocking"] }
tokio = { version = "1.33.0", features=["full"] }
reqwest = "0.11"
sha1 = "0.10.6"
urlencoding = "2.1.3"
futures = "0.3.28"
69 changes: 28 additions & 41 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@
extern crate log;

use anyhow::{Context, Result};
use peer::handshake;
use futures::future::join_all;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{fs, path::PathBuf};
use std::{collections::BTreeMap, fs, path::PathBuf, sync::Arc};
use tokio::sync::Mutex;

use bencode::parse_bencoded;
use types::ByteString;

use crate::{
metainfo::Metainfo,
peer::{read_message, send_message, Message},
peer::handle_peer,
state::{init_pieces, State},
tracker::{tracker_request, TrackerRequest, TrackerResponse},
};

Expand All @@ -20,10 +22,12 @@ mod hex;
mod metainfo;
mod peer;
mod sha1;
mod state;
mod tracker;
mod types;

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);
Expand All @@ -48,54 +52,37 @@ fn main() -> Result<()> {
let peer_id = generate_peer_id();
info!("peer id {}", String::from_utf8_lossy(peer_id.as_slice()));
let tracker_response = tracker_request(
metainfo.announce,
metainfo.announce.clone(),
TrackerRequest::new(
info_hash.clone(),
peer_id.clone(),
tracker::TrackerEvent::Started,
None,
),
)
.await
.context("request failed")?;
info!("tracker response: {tracker_response:?}");

let state = Arc::new(Mutex::new(State {
metainfo: metainfo.clone(),
info_hash,
peer_id,
pieces: init_pieces(&metainfo.info),
peers: BTreeMap::new(),
}));

if let TrackerResponse::Success(resp) = tracker_response {
for p in resp.peers {
match handshake(&p, &info_hash, &peer_id) {
Ok(stream) => {
info!("successfull handshake with peer {:?}", p);
send_message(&stream, Message::Unchoke)?;
send_message(&stream, Message::Interested)?;
loop {
match read_message(&stream) {
Ok(Message::Choke) => {
continue;
}
Ok(msg) => {
if matches!(msg, Message::Unchoke) {
for i in 0..16 {
let block_size = 1 << 14;
send_message(
&stream,
Message::Request {
piece_index: 0,
begin: i * block_size,
length: block_size,
},
)?;
}
}
}
Err(e) => {
warn!("{}", e);
break;
}
};
}
}
Err(e) => warn!("handshake error: {}", e),
}
}
let handles = resp
.peers
.into_iter()
.take(4)
.map(|p| {
let state = state.clone();
handle_peer(p, state)
})
.collect::<Vec<_>>();
join_all(handles).await;
}
Ok(())
}
Expand Down
28 changes: 14 additions & 14 deletions src/metainfo.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use core::fmt;
use std::path::PathBuf;

use crate::{bencode::BencodeValue, hex::hex, types::ByteString};
use crate::{bencode::BencodeValue, state::PieceHash};

#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, PartialOrd, Hash)]
pub struct Metainfo {
pub info: Info,
pub announce: String,
Expand All @@ -14,16 +14,7 @@ pub struct Metainfo {
pub encoding: Option<String>,
}

#[derive(PartialEq, Eq, Hash)]
pub struct PieceHash(ByteString);

impl fmt::Debug for PieceHash {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "#{}", hex(&self.0))
}
}

#[derive(PartialEq, Eq, Hash)]
#[derive(Clone, PartialEq, PartialOrd, Hash)]
pub struct Info {
pub piece_length: i64,
pub pieces: Vec<PieceHash>,
Expand All @@ -43,7 +34,7 @@ impl fmt::Debug for Info {
}
}

#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, PartialOrd, Hash)]
pub enum FileInfo {
Single {
length: i64,
Expand All @@ -54,7 +45,16 @@ pub enum FileInfo {
},
}

#[derive(Debug, PartialEq, Eq, Hash)]
impl FileInfo {
pub fn total_length(&self) -> i64 {
match self {
FileInfo::Single { length, .. } => *length,
FileInfo::Multi { files } => files.iter().map(|f| f.length).sum(),
}
}
}

#[derive(Clone, Debug, PartialEq, PartialOrd, Hash)]
pub struct FilesInfo {
pub length: i64,
pub path: PathBuf,
Expand Down
104 changes: 69 additions & 35 deletions src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
use anyhow::{ensure, Context, Error, Result};
use core::fmt;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr, TcpStream};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::time::timeout;

use crate::hex::hex;
use crate::tracker::TrackerPeer;
use crate::state::{Block, PeerInfo, State};
use crate::types::ByteString;

pub struct Block(Vec<u8>);

impl fmt::Debug for Block {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<block>")
}
}

#[derive(Debug)]
pub enum Message {
Handshake {
Expand Down Expand Up @@ -140,35 +133,33 @@ impl TryFrom<Vec<u8>> for Message {
}
}

pub fn handshake(
peer: &TrackerPeer,
pub async fn handshake(
peer: &PeerInfo,
info_hash: &ByteString,
peer_id: &ByteString,
) -> Result<TcpStream> {
let cn_timeout = Duration::new(1, 0);
let hs_timeout = Duration::new(1, 0);
let rw_timeout = Duration::new(4, 0);
debug!("connecting to peer {peer:?}");
let mut stream = TcpStream::connect_timeout(
&SocketAddr::new(IpAddr::from_str(&peer.ip)?, peer.port as u16),
cn_timeout,
)?;
// special short timeout for the first read
stream.set_read_timeout(Some(hs_timeout))?;
stream.set_write_timeout(Some(rw_timeout))?;
let mut stream = timeout(
Duration::new(4, 0),
TcpStream::connect(format!("{}:{}", peer.ip, peer.port)),
)
.await??;
let handshake: Vec<u8> = Message::Handshake {
info_hash: info_hash.clone(),
peer_id: peer_id.clone(),
}
.into();

debug!("writing handshake {}", hex(&handshake.to_vec()));
stream.write_all(&handshake).context("write error")?;
stream.flush()?;
stream.write_all(&handshake).await.context("write error")?;
stream.flush().await?;

let mut read_packet = [0; 68];
debug!("reading handshake");
stream.read_exact(&mut read_packet).context("read error")?;
stream
.read_exact(&mut read_packet)
.await
.context("read error")?;
let msg: Vec<u8> = read_packet.to_vec();
debug!("peer response: {}", hex(&msg));
if let Message::Handshake {
Expand All @@ -182,27 +173,29 @@ pub fn handshake(
if h_peer_id != peer.peer_id {
debug!("peer id differ")
}
stream.set_read_timeout(Some(rw_timeout))?;
Ok(stream)
} else {
Err(Error::msg("unexpected message"))
}
}

pub fn read_message(mut stream: &TcpStream) -> Result<Message> {
pub async fn read_message(stream: &mut TcpStream) -> Result<Message> {
fn u32_from_slice(slice: &[u8]) -> Result<u32> {
Ok(u32::from_be_bytes(slice.try_into()?))
}

let mut len_p = [0; 4];
stream.read_exact(&mut len_p)?;
stream.read_exact(&mut len_p).await?;
let len = u32::from_be_bytes(len_p);
if len == 0 {
return Ok(Message::KeepAlive);
}

let mut id_p = [0; 1];
stream.read_exact(&mut id_p).context("id_p read error")?;
stream
.read_exact(&mut id_p)
.await
.context("id_p read error")?;
let id = u8::from_be_bytes(id_p);

let msg = match id {
Expand All @@ -215,6 +208,7 @@ pub fn read_message(mut stream: &TcpStream) -> Result<Message> {
let mut payload_p = vec![0; len as usize - 1];
stream
.read_exact(&mut payload_p)
.await
.context("payload_p read error")?;
match id {
4 if len == 5 => Ok(Message::Have {
Expand Down Expand Up @@ -250,11 +244,51 @@ pub fn read_message(mut stream: &TcpStream) -> Result<Message> {
Ok(msg)
}

pub fn send_message(mut stream: &TcpStream, message: Message) -> Result<()> {
pub async fn send_message(stream: &mut TcpStream, message: Message) -> Result<()> {
debug!(">>> sending message: {:?}", message);
let msg_p: Vec<u8> = message.into();
trace!("raw message: {}", hex(&msg_p));
stream.write_all(&msg_p)?;
stream.flush()?;
stream.write_all(&msg_p).await?;
stream.flush().await?;
Ok(())
}

pub async fn handle_peer(peer: PeerInfo, state: Arc<Mutex<State>>) -> Result<()> {
let (info_hash, peer_id) = {
let state = state.lock().await;
(state.info_hash.clone(), state.peer_id.clone())
};
match handshake(&peer, &info_hash, &peer_id).await {
Ok(mut stream) => {
info!("successfull handshake with peer {:?}", peer);
send_message(&mut stream, Message::Unchoke).await?;
send_message(&mut stream, Message::Interested).await?;
loop {
match read_message(&mut stream).await {
Ok(Message::Choke) => {
continue;
}
Ok(msg) => {
if matches!(msg, Message::Unchoke) {
for i in 0..16 {
let block_size = 1 << 14;
let request_msg = Message::Request {
piece_index: 0,
begin: i * block_size,
length: block_size,
};
send_message(&mut stream, request_msg).await?;
}
}
}
Err(e) => {
warn!("{}", e);
break;
}
};
}
}
Err(e) => warn!("handshake error: {}", e),
};
Ok(())
}
2 changes: 1 addition & 1 deletion src/sha1.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sha1::{Sha1, Digest};
use sha1::{Digest, Sha1};

use crate::types::ByteString;

Expand Down
Loading

0 comments on commit b54da89

Please sign in to comment.