Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize file reads in fs module #1071

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ compression-gzip = ["async-compression/deflate", "async-compression/gzip"]
[profile.release]
codegen-units = 1
incremental = false
lto = "fat"
strip = true

[profile.bench]
codegen-units = 1
Expand Down Expand Up @@ -97,7 +99,9 @@ required-features = ["websocket"]
[[example]]
name = "query_string"


[[example]]
name = "multipart"
required-features = ["multipart"]

[[example]]
name = "file"
138 changes: 46 additions & 92 deletions src/filters/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,24 @@

use std::cmp;
use std::convert::Infallible;
use std::fs::Metadata;
use std::fs::{File as StdFile, Metadata};
use std::future::Future;
use std::io;
use std::io::{self, BufReader, Error, Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::task::{Context, Poll};

use bytes::{Bytes, BytesMut};
use futures_util::future::Either;
use futures_util::{future, ready, stream, FutureExt, Stream, StreamExt, TryFutureExt};
use futures_util::{future, Stream, TryFutureExt};
use headers::{
AcceptRanges, ContentLength, ContentRange, ContentType, HeaderMapExt, IfModifiedSince, IfRange,
IfUnmodifiedSince, LastModified, Range,
};
use http::StatusCode;
use hyper::Body;
use percent_encoding::percent_decode_str;
use tokio::fs::File as TkFile;
use tokio::io::AsyncSeekExt;
use tokio_util::io::poll_read_buf;

use crate::filter::{Filter, FilterClone, One};
use crate::reject::{self, Rejection};
Expand Down Expand Up @@ -92,8 +89,7 @@ fn path_from_tail(
) -> impl FilterClone<Extract = One<ArcPath>, Error = Rejection> {
crate::path::tail().and_then(move |tail: crate::path::Tail| {
future::ready(sanitize_path(base.as_ref(), tail.as_str())).and_then(|mut buf| async {
let is_dir = tokio::fs::metadata(buf.clone())
.await
let is_dir = std::fs::metadata(buf.clone())
.map(|m| m.is_dir())
.unwrap_or(false);

Expand Down Expand Up @@ -264,7 +260,7 @@ fn file_reply(
path: ArcPath,
conditionals: Conditionals,
) -> impl Future<Output = Result<File, Rejection>> + Send {
TkFile::open(path.clone()).then(move |res| match res {
match StdFile::open(path.clone()) {
Ok(f) => Either::Left(file_conditional(f, path, conditionals)),
Err(err) => {
let rej = match err.kind() {
Expand All @@ -287,11 +283,11 @@ fn file_reply(
};
Either::Right(future::err(rej))
}
})
}
}

async fn file_metadata(f: TkFile) -> Result<(TkFile, Metadata), Rejection> {
match f.metadata().await {
async fn file_metadata(f: StdFile) -> Result<(StdFile, Metadata), Rejection> {
match f.metadata() {
Ok(meta) => Ok((f, meta)),
Err(err) => {
tracing::debug!("file metadata error: {}", err);
Expand All @@ -301,22 +297,27 @@ async fn file_metadata(f: TkFile) -> Result<(TkFile, Metadata), Rejection> {
}

fn file_conditional(
f: TkFile,
f: StdFile,
path: ArcPath,
conditionals: Conditionals,
) -> impl Future<Output = Result<File, Rejection>> + Send {
file_metadata(f).map_ok(move |(file, meta)| {
file_metadata(f).map_ok(move |(mut file, meta)| {
let mut len = meta.len();
let modified = meta.modified().ok().map(LastModified::from);

let resp = match conditionals.check(modified) {
Cond::NoBody(resp) => resp,
Cond::WithBody(range) => {
let buf_size = optimal_buf_size(&meta);
bytes_range(range, len)
.map(|(start, end)| {
file.seek(SeekFrom::Start(start))
.expect("error while seeking the file to the specified offset");

let sub_len = end - start;
let buf_size = optimal_buf_size(&meta);
let stream = file_stream(file, buf_size, (start, end));
let reader = BufReader::new(file).take(sub_len);
let stream = FileStream { reader, buf_size };

let body = Body::wrap_stream(stream);

let mut resp = Response::new(body);
Expand Down Expand Up @@ -402,68 +403,10 @@ fn bytes_range(range: Option<Range>, max_len: u64) -> Result<(u64, u64), BadRang
ret
}

fn file_stream(
mut file: TkFile,
buf_size: usize,
(start, end): (u64, u64),
) -> impl Stream<Item = Result<Bytes, io::Error>> + Send {
use std::io::SeekFrom;

let seek = async move {
if start != 0 {
file.seek(SeekFrom::Start(start)).await?;
}
Ok(file)
};

seek.into_stream()
.map(move |result| {
let mut buf = BytesMut::new();
let mut len = end - start;
let mut f = match result {
Ok(f) => f,
Err(f) => return Either::Left(stream::once(future::err(f))),
};

Either::Right(stream::poll_fn(move |cx| {
if len == 0 {
return Poll::Ready(None);
}
reserve_at_least(&mut buf, buf_size);

let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
Ok(n) => n as u64,
Err(err) => {
tracing::debug!("file read error: {}", err);
return Poll::Ready(Some(Err(err)));
}
};

if n == 0 {
tracing::debug!("file read found EOF before expected length");
return Poll::Ready(None);
}

let mut chunk = buf.split().freeze();
if n > len {
chunk = chunk.split_to(len as usize);
len = 0;
} else {
len -= n;
}

Poll::Ready(Some(Ok(chunk)))
}))
})
.flatten()
}

fn reserve_at_least(buf: &mut BytesMut, cap: usize) {
if buf.capacity() - buf.len() < cap {
buf.reserve(cap);
}
}
#[cfg(unix)]
const DEFAULT_READ_BUF_SIZE: usize = 4_096;

#[cfg(not(unix))]
const DEFAULT_READ_BUF_SIZE: usize = 8_192;

fn optimal_buf_size(metadata: &Metadata) -> usize {
Expand All @@ -489,6 +432,31 @@ fn get_block_size(_metadata: &Metadata) -> usize {
DEFAULT_READ_BUF_SIZE
}

#[derive(Debug)]
struct FileStream<T> {
reader: T,
buf_size: usize,
}

impl<T: Read + Unpin> Stream for FileStream<T> {
type Item = Result<Bytes, Error>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buf = BytesMut::zeroed(self.buf_size);
match Pin::into_inner(self).reader.read(&mut buf[..]) {
Ok(n) => {
if n == 0 {
Poll::Ready(None)
} else {
buf.truncate(n);
Poll::Ready(Some(Ok(buf.freeze())))
}
}
Err(err) => Poll::Ready(Some(Err(err))),
}
}
}

// ===== Rejections =====

unit_error! {
Expand All @@ -502,7 +470,6 @@ unit_error! {
#[cfg(test)]
mod tests {
use super::sanitize_path;
use bytes::BytesMut;

#[test]
fn test_sanitize_path() {
Expand All @@ -522,17 +489,4 @@ mod tests {

sanitize_path(base, "/C:\\/foo.html").expect_err("C:\\");
}

#[test]
fn test_reserve_at_least() {
let mut buf = BytesMut::new();
let cap = 8_192;

assert_eq!(buf.len(), 0);
assert_eq!(buf.capacity(), 0);

super::reserve_at_least(&mut buf, cap);
assert_eq!(buf.len(), 0);
assert_eq!(buf.capacity(), cap);
}
}
Loading