Skip to content

Commit

Permalink
try 2 to fix the build on windows
Browse files Browse the repository at this point in the history
  • Loading branch information
asomers committed Dec 10, 2024
1 parent 101c5d4 commit 0fd4222
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 16 deletions.
61 changes: 45 additions & 16 deletions src/server/datachan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use crate::{

use crate::server::chancomms::DataChanCmd;
use std::{
net::SocketAddr,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
sync::Arc,
};
#[cfg(unix)]
use std::os::fd::{AsRawFd, BorrowedFd, RawFd};
use std::{
net::SocketAddr,
os::fd::{AsRawFd, BorrowedFd, RawFd},
sync::atomic::{AtomicU64, Ordering},
};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::TcpStream;
use tokio::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -100,33 +100,62 @@ impl RetrSocket {
#[cfg(unix)]
pub static RETR_SOCKETS: std::sync::RwLock<std::collections::BTreeMap<RawFd, RetrSocket>> = std::sync::RwLock::new(std::collections::BTreeMap::new());

#[cfg(unix)]
struct MeasuringWriter<W: AsRawFd> {
writer: W,
command: &'static str,
}
#[cfg(not(unix))]
struct MeasuringWriter {
writer: W,
command: &'static str,
}

struct MeasuringReader<R> {
reader: R,
command: &'static str,
}

#[cfg(unix)]
impl<W: AsRawFd + AsyncWrite + Unpin> AsyncWrite for MeasuringWriter<W> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> std::task::Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();

let result = Pin::new(&mut this.writer).poll_write(cx, buf);
if let Poll::Ready(Ok(bytes_written)) = &result {
let bw = *bytes_written as u64;
#[cfg(unix)]
{
RETR_SOCKETS
.read()
.unwrap()
.get(&this.writer.as_raw_fd())
.expect("TODO: better error handling")
.bytes
.fetch_add(bw, Ordering::Relaxed);
}
RETR_SOCKETS
.read()
.unwrap()
.get(&this.writer.as_raw_fd())
.expect("TODO: better error handling")
.bytes
.fetch_add(bw, Ordering::Relaxed);
metrics::inc_sent_bytes(*bytes_written, this.command);
}

result
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
let this = self.get_mut();
Pin::new(&mut this.writer).poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
let this = self.get_mut();
Pin::new(&mut this.writer).poll_shutdown(cx)
}
}

#[cfg(not(unix))]
impl<W: AsyncWrite + Unpin> AsyncWrite for MeasuringWriter<W> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> std::task::Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();

let result = Pin::new(&mut this.writer).poll_write(cx, buf);
if let Poll::Ready(Ok(bytes_written)) = &result {
let bw = *bytes_written as u64;
metrics::inc_sent_bytes(*bytes_written, this.command);
}

Expand Down
1 change: 1 addition & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ pub(crate) use controlchan::reply::{Reply, ReplyCode};
pub(crate) use controlchan::ControlChanMiddleware;
pub(crate) use controlchan::Event;
pub(crate) use controlchan::{ControlChanError, ControlChanErrorKind};
#[cfg(unix)]
pub use datachan::RETR_SOCKETS;
use session::{Session, SessionState};

0 comments on commit 0fd4222

Please sign in to comment.