Skip to content

Commit

Permalink
Keep track of peers state
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanjermakov committed Oct 24, 2023
1 parent dc9ffaf commit 8b05e20
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 20 deletions.
17 changes: 9 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[macro_use]
extern crate log;

use anyhow::{Context, Result, Error};
use anyhow::{ensure, Context, Result};
use futures::future::join_all;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::{collections::BTreeMap, fs, path::PathBuf, sync::Arc};
Expand Down Expand Up @@ -86,13 +86,14 @@ async fn main() -> Result<()> {
}

let state = state.lock().await;
if state.pieces.len() != state.metainfo.info.pieces.len() {
return Err(Error::msg("pieces length mismatch"))
}

if state.pieces.values().any(|p| !p.completed) {
return Err(Error::msg("incomplete pieces"))
}
ensure!(
state.pieces.len() == state.metainfo.info.pieces.len(),
"pieces length mismatch"
);
ensure!(
state.pieces.values().all(|p| p.completed),
"incomplete pieces"
);

// TODO: split pieces into files and save to disk

Expand Down
32 changes: 20 additions & 12 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use anyhow::{ensure, Context, Error, Result};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::join;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::Mutex;
use tokio::time::{sleep, timeout};

use crate::hex::hex;
use crate::sha1;
use crate::state::{Block, PeerInfo, State, BLOCK_SIZE};
use crate::state::{Block, Peer, PeerInfo, State, BLOCK_SIZE};
use crate::types::ByteString;

#[derive(Debug)]
Expand Down Expand Up @@ -258,31 +258,36 @@ pub async fn send_message(stream: &mut OwnedWriteHalf, message: Message) -> Resu

pub async fn handle_peer(peer: PeerInfo, state: Arc<Mutex<State>>) -> Result<()> {
let (info_hash, peer_id) = {
let state = state.lock().await;
let mut state = state.lock().await;
state
.peers
.insert(peer.peer_id.clone(), Peer::new(peer.clone()));
(state.info_hash.clone(), state.peer_id.clone())
};
let stream = handshake(&peer, &info_hash, &peer_id)
.await
.context("handshake error")?;
info!("successfull handshake with peer {:?}", peer);

if let Some(p) = state.lock().await.peers.get_mut(&peer.peer_id) {
p.connected = true;
}

let (r_stream, mut w_stream) = stream.into_split();

send_message(&mut w_stream, Message::Unchoke).await?;
send_message(&mut w_stream, Message::Interested).await?;

let (w_res, r_res) = join!(
{
select!(
r = {
let state = state.clone();
write_loop(w_stream, peer.clone(), state)
},
{
} => r.context("write error"),
r = {
let state = state.clone();
read_loop(r_stream, peer.clone(), state)
}
);
w_res.context("write error")?;
r_res.context("read error")?;
} => r.context("read error")
)?;

Ok(())
}
Expand All @@ -304,7 +309,10 @@ pub async fn write_loop(
}
let piece = match { state.lock().await.next_piece() } {
Some(p) => p,
None => return Ok(()),
None => {
debug!("no more pieces to request, disconnecting");
return Ok(());
}
};
debug!("next request piece: {:?}", piece);
let total_blocks = piece.total_blocks();
Expand Down
15 changes: 15 additions & 0 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,28 @@ impl fmt::Debug for Block {
#[derive(Clone, Debug, PartialEq, PartialOrd, Hash)]
pub struct Peer {
pub info: PeerInfo,
pub connected: bool,
pub am_choked: bool,
pub am_interested: bool,
pub choked: bool,
pub interested: bool,
pub bitfield: Option<Vec<u8>>,
}

impl Peer {
pub fn new(info: PeerInfo) -> Peer {
Peer {
info,
connected: false,
am_choked: true,
am_interested: false,
choked: true,
interested: false,
bitfield: None,
}
}
}

#[derive(Clone, PartialEq, PartialOrd, Hash)]
pub struct PeerInfo {
pub peer_id: ByteString,
Expand Down

0 comments on commit 8b05e20

Please sign in to comment.