Skip to content

Commit

Permalink
Add RETR_SOCKETS
Browse files Browse the repository at this point in the history
It's a global that contains all of the sockets currently serving RETR.
The application can use it to analyze performance on a per-socket basis.
  • Loading branch information
asomers committed Dec 10, 2024
1 parent a061a45 commit 4cf2460
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ getrandom = "0.2.15"
lazy_static = "1.4.0"
md-5 = "0.10.6"
moka = { version = "0.12.7", default-features = false, features = ["sync"] }
nix = { version = "0.29.0", default-features = false, features = ["fs"] }
nix = { version = "0.29.0", default-features = false, features = ["fs", "net", "socket"] }
prometheus = { version = "0.13.4", default-features = false }
proxy-protocol = "0.5.0"
rustls = "0.23.10"
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub mod notification;
pub(crate) mod server;
pub mod storage;

pub use crate::server::ftpserver::{error::ServerError, options, Server, ServerBuilder};
pub use crate::server::{
ftpserver::{error::ServerError, options, Server, ServerBuilder},
RETR_SOCKETS,
};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
80 changes: 76 additions & 4 deletions src/server/datachan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ use crate::{
};

use crate::server::chancomms::DataChanCmd;
use std::{path::PathBuf, sync::Arc};
use std::{
net::SocketAddr,
os::fd::{AsRawFd, BorrowedFd, RawFd},
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::TcpStream;
use tokio::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -42,7 +50,53 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

struct MeasuringWriter<W> {
/// Holds information about a socket processing a RETR command
#[derive(Debug)]
pub struct RetrSocket {
bytes: AtomicU64,
fd: RawFd,
peer: SocketAddr,
}

impl RetrSocket {
/// How many bytes have been written to the socket so far?
///
/// Note that this tracks bytes written to the socket, not sent on the wire.
pub fn bytes(&self) -> u64 {
self.bytes.load(Ordering::Relaxed)
}

pub fn fd(&self) -> BorrowedFd<'_> {
// Safe because we always destroy the RetrSocket when the MeasuringWriter drops
#[allow(unsafe_code)]
unsafe {
BorrowedFd::borrow_raw(self.fd)
}
}

fn new<W: AsRawFd>(w: &W) -> nix::Result<Self> {
let fd = w.as_raw_fd();
let ss: nix::sys::socket::SockaddrStorage = nix::sys::socket::getpeername(fd)?;
let peer = if let Some(sin) = ss.as_sockaddr_in() {
SocketAddr::V4((*sin).into())
} else if let Some(sin6) = ss.as_sockaddr_in6() {
SocketAddr::V6((*sin6).into())
} else {
return Err(nix::errno::Errno::EINVAL);
};
let bytes = Default::default();
Ok(RetrSocket { bytes, fd, peer })
}

pub fn peer(&self) -> &SocketAddr {
&self.peer
}
}

/// Collection of all sockets currently serving RETR commands
pub static RETR_SOCKETS: std::sync::RwLock<std::collections::BTreeMap<RawFd, RetrSocket>> = std::sync::RwLock::new(std::collections::BTreeMap::new());

struct MeasuringWriter<W: AsRawFd> {
writer: W,
command: &'static str,
}
Expand All @@ -52,12 +106,20 @@ struct MeasuringReader<R> {
command: &'static str,
}

impl<W: AsyncWrite + Unpin> AsyncWrite for MeasuringWriter<W> {
impl<W: AsRawFd + AsyncWrite + Unpin> AsyncWrite for MeasuringWriter<W> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> std::task::Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();

let result = Pin::new(&mut this.writer).poll_write(cx, buf);
if let Poll::Ready(Ok(bytes_written)) = &result {
let bw = *bytes_written as u64;
RETR_SOCKETS
.read()
.unwrap()
.get(&this.writer.as_raw_fd())
.expect("TODO: better error handling")
.bytes
.fetch_add(bw, Ordering::Relaxed);
metrics::inc_sent_bytes(*bytes_written, this.command);
}

Expand Down Expand Up @@ -87,12 +149,22 @@ impl<R: AsyncRead + Unpin> AsyncRead for MeasuringReader<R> {
}
}

impl<W> MeasuringWriter<W> {
impl<W: AsRawFd> MeasuringWriter<W> {
fn new(writer: W, command: &'static str) -> MeasuringWriter<W> {
let retr_socket = RetrSocket::new(&writer).expect("TODO: better error handling");
RETR_SOCKETS.write().unwrap().insert(retr_socket.fd, retr_socket);
Self { writer, command }
}
}

impl<W: AsRawFd> Drop for MeasuringWriter<W> {
fn drop(&mut self) {
if let Ok(mut guard) = RETR_SOCKETS.write() {
guard.remove(&self.writer.as_raw_fd());
}
}
}

impl<R> MeasuringReader<R> {
fn new(reader: R, command: &'static str) -> MeasuringReader<R> {
Self { reader, command }
Expand Down
1 change: 1 addition & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ pub(crate) use controlchan::reply::{Reply, ReplyCode};
pub(crate) use controlchan::ControlChanMiddleware;
pub(crate) use controlchan::Event;
pub(crate) use controlchan::{ControlChanError, ControlChanErrorKind};
pub use datachan::RETR_SOCKETS;
use session::{Session, SessionState};

0 comments on commit 4cf2460

Please sign in to comment.