Skip to content

Commit

Permalink
Fix TCP reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanjermakov committed Oct 23, 2023
1 parent 98d5aab commit 81c4efd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
25 changes: 15 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,24 @@ fn main() -> Result<()> {
info!("successfull handshake with peer {:?}", p);
send_message(&stream, Message::Unchoke)?;
send_message(&stream, Message::Interested)?;
send_message(
&stream,
Message::Request {
piece_index: 0,
begin: 0,
length: 1 << 14,
},
)?;
loop {
match read_message(&stream) {
Ok(Message::Choke) => {
continue;
}
Ok(msg) => {
if matches!(msg, Message::Piece { .. }) {
continue;
if matches!(msg, Message::Unchoke) {
for i in 0..16 {
let block_size = 1 << 14;
send_message(
&stream,
Message::Request {
piece_index: 0,
begin: i * block_size,
length: block_size,
},
)?;
}
}
}
Err(e) => {
Expand Down
16 changes: 7 additions & 9 deletions src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{ensure, Context, Error, Result};
use core::fmt;
use std::io::{BufReader, Read, Write};
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr, TcpStream};
use std::str::FromStr;
use std::time::Duration;
Expand Down Expand Up @@ -147,7 +147,7 @@ pub fn handshake(
) -> Result<TcpStream> {
let cn_timeout = Duration::new(1, 0);
let hs_timeout = Duration::new(1, 0);
let rw_timeout = Duration::new(2, 0);
let rw_timeout = Duration::new(4, 0);
debug!("connecting to peer {peer:?}");
let mut stream = TcpStream::connect_timeout(
&SocketAddr::new(IpAddr::from_str(&peer.ip)?, peer.port as u16),
Expand All @@ -166,10 +166,9 @@ pub fn handshake(
stream.write_all(&handshake).context("write error")?;
stream.flush()?;

let mut reader = BufReader::new(&stream);
let mut read_packet = [0; 68];
debug!("reading handshake");
reader.read_exact(&mut read_packet).context("read error")?;
stream.read_exact(&mut read_packet).context("read error")?;
let msg: Vec<u8> = read_packet.to_vec();
debug!("peer response: {}", hex(&msg));
if let Message::Handshake {
Expand All @@ -190,21 +189,20 @@ pub fn handshake(
}
}

pub fn read_message(stream: &TcpStream) -> Result<Message> {
let mut reader = BufReader::new(stream);
pub fn read_message(mut stream: &TcpStream) -> Result<Message> {
fn u32_from_slice(slice: &[u8]) -> Result<u32> {
Ok(u32::from_be_bytes(slice.try_into()?))
}

let mut len_p = [0; 4];
reader.read_exact(&mut len_p)?;
stream.read_exact(&mut len_p)?;
let len = u32::from_be_bytes(len_p);
if len == 0 {
return Ok(Message::KeepAlive);
}

let mut id_p = [0; 1];
reader.read_exact(&mut id_p).context("id_p read error")?;
stream.read_exact(&mut id_p).context("id_p read error")?;
let id = u8::from_be_bytes(id_p);

let msg = match id {
Expand All @@ -215,7 +213,7 @@ pub fn read_message(stream: &TcpStream) -> Result<Message> {
_ if len == 1 => Err(Error::msg("unexpected message of size 1")),
_ => {
let mut payload_p = vec![0; len as usize - 1];
reader
stream
.read_exact(&mut payload_p)
.context("payload_p read error")?;
match id {
Expand Down

0 comments on commit 81c4efd

Please sign in to comment.