Skip to content

Commit

Permalink
Fix #27 #28
Browse files Browse the repository at this point in the history
  • Loading branch information
SajjadPourali committed Mar 27, 2024
1 parent 20cb3e0 commit 84accb2
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions src/stream/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ use std::{
cmp,
future::Future,
io::{Error, ErrorKind},
mem::MaybeUninit,
net::SocketAddr,
pin::Pin,
task::{Context, Poll, Waker},
time::Duration,
};
use tokio::{
io::{AsyncRead, AsyncWrite},
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
runtime::Handle,
sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
task,
};

use log::{trace, warn};
Expand Down Expand Up @@ -244,6 +247,7 @@ impl AsyncRead for IpStackTcpStream {
self.shutdown.ready();
return Poll::Ready(Ok(()));
}
continue;
}
if let Some(b) = self.tcb.get_unordered_packets() {
self.tcb.add_ack(b.len() as u32);
Expand All @@ -265,6 +269,7 @@ impl AsyncRead for IpStackTcpStream {
{
self.packet_to_send =
Some(self.create_rev_packet(FIN | ACK, TTL, None, Vec::new())?);
self.tcb.add_seq_one();
self.tcb.change_state(TcpState::FinWait1(false));
continue;
}
Expand Down Expand Up @@ -402,6 +407,7 @@ impl AsyncRead for IpStackTcpStream {
}
} else if matches!(self.tcb.get_state(), TcpState::FinWait1(false)) {
if t.flags() == ACK {
self.tcb.change_last_ack(t.inner().acknowledgment_number);
self.tcb.change_state(TcpState::FinWait2(true));
continue;
} else if t.flags() == (FIN | ACK) {
Expand All @@ -417,16 +423,15 @@ impl AsyncRead for IpStackTcpStream {
if t.flags() == ACK {
self.tcb.change_state(TcpState::FinWait2(false));
} else if t.flags() == (FIN | ACK) {
self.tcb.add_ack(1);
self.packet_to_send =
Some(self.create_rev_packet(ACK, TTL, None, Vec::new())?);
self.tcb.change_state(TcpState::FinWait2(false));
}
}
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => {
return Poll::Pending;
}
Poll::Pending => return Poll::Pending,
}
}
}
Expand Down Expand Up @@ -509,21 +514,31 @@ impl AsyncWrite for IpStackTcpStream {
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::io::Result<()>> {
match &self.shutdown {
Shutdown::Ready => Poll::Ready(Ok(())),
Shutdown::Pending(_) => Poll::Pending,
Shutdown::None => {
self.shutdown.pending(cx.waker().clone());
Poll::Pending
}
if matches!(self.shutdown, Shutdown::Ready) {
return Poll::Ready(Ok(()));
} else if matches!(self.shutdown, Shutdown::None) {
self.shutdown.pending(cx.waker().clone());
}
self.poll_read(
cx,
&mut tokio::io::ReadBuf::uninit(&mut [MaybeUninit::<u8>::uninit()]),
)
}
}

impl Drop for IpStackTcpStream {
fn drop(&mut self) {
if let Ok(p) = self.create_rev_packet(NON, DROP_TTL, None, Vec::new()) {
_ = self.packet_sender.send(p);
}
task::block_in_place(move || {
Handle::current().block_on(async move {
_ = self.shutdown().await;
println!(
"Shudown done, Drop for IpStackTcpStream {:?}",
self.dst_addr
);
if let Ok(p) = self.create_rev_packet(NON, DROP_TTL, None, Vec::new()) {
_ = self.packet_sender.send(p);
}
});
});
}
}

0 comments on commit 84accb2

Please sign in to comment.