Skip to content

Commit

Permalink
Made rustus go Brrrr. (#51)
Browse files Browse the repository at this point in the history
Made rustus go Brrrr.
Description:
* Optimized number of syscalls for writing by using Buffered IO;
* Rmoved redundant clone calls;
* Changed Storage add_bytes signature;
* Moved serde calls to tokio;
* Removed redundant memory allocations.
* Async runtime is changed back to actix.
* Spawning tasks with actix runtime.
* Super mega solution to speedup fs ops.

Signed-off-by: Pavel Kirilin <[email protected]>
  • Loading branch information
s3rius authored Mar 6, 2022
1 parent c98c542 commit 696206c
Show file tree
Hide file tree
Showing 19 changed files with 214 additions and 145 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ serde_json = "1"
strfmt = "^0.1.6"
thiserror = "^1.0"
url = "2.2.2"
bytes = "1.1.0"

[dependencies.futures]
version = "0.3.21"
Expand Down Expand Up @@ -82,7 +83,7 @@ features = ["derive"]
version = "0.23"

[dependencies.tokio]
features = ["time", "process", "fs"]
features = ["time", "process", "fs", "io-std", "io-util", "rt-multi-thread", "bytes"]
version = "1.4.0"

[dependencies.tokio-amqp]
Expand Down Expand Up @@ -114,7 +115,7 @@ httptest = "0.15.4"

[profile]
[profile.release]
lto = true
lto = "fat"
panic = "abort"
opt-level = 3
codegen-units = 1
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub struct StorageOptions {

#[structopt(long, env = "RUSTUS_DIR_STRUCTURE", default_value = "")]
pub dir_structure: String,

#[structopt(long, parse(from_flag))]
pub force_fsync: bool,
}

#[derive(StructOpt, Debug, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub enum RustusError {
AMQPPoolError(#[from] mobc_lapin::mobc::Error<lapin::Error>),
#[error("Std error: {0}")]
StdError(#[from] std::io::Error),
#[error("Can't spawn task: {0}")]
TokioSpawnError(#[from] tokio::task::JoinError),
}

/// This conversion allows us to use `RustusError` in the `main` function.
Expand Down
2 changes: 1 addition & 1 deletion src/info_storages/db_info_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl InfoStorage for DBInfoStorage {
async fn get_info(&self, file_id: &str) -> RustusResult<FileInfo> {
let model: Option<DbModel> = self.db.fetch_by_column("id", file_id).await?;
if let Some(info) = model {
serde_json::from_str(info.info.as_str()).map_err(RustusError::from)
FileInfo::from_json(info.info.to_string()).await
} else {
Err(RustusError::FileNotFound)
}
Expand Down
77 changes: 47 additions & 30 deletions src/info_storages/file_info_storage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::io::{Read, Write};
use std::path::PathBuf;

use async_trait::async_trait;
use log::error;
use tokio::fs::{read_to_string, remove_file, DirBuilder, OpenOptions};
use tokio::io::copy;
use std::fs::{remove_file, File, OpenOptions};
use std::io::{BufReader, BufWriter};
use tokio::fs::DirBuilder;

use crate::errors::{RustusError, RustusResult};
use crate::info_storages::{FileInfo, InfoStorage};
Expand Down Expand Up @@ -35,42 +37,57 @@ impl InfoStorage for FileInfoStorage {
}

async fn set_info(&self, file_info: &FileInfo, create: bool) -> RustusResult<()> {
let mut file = OpenOptions::new()
.write(true)
.create(create)
.truncate(true)
.open(self.info_file_path(file_info.id.as_str()).as_path())
.await
.map_err(|err| {
error!("{:?}", err);
RustusError::UnableToWrite(err.to_string())
})?;
copy(&mut file_info.json().await?.as_bytes(), &mut file).await?;
tokio::task::spawn(async move { file.sync_data().await });
Ok(())
let info = file_info.clone();
let path = self.info_file_path(info.id.as_str());
actix_web::rt::task::spawn_blocking(move || {
let file = OpenOptions::new()
.write(true)
.create(create)
.truncate(true)
.open(path)
.map_err(|err| {
error!("{:?}", err);
RustusError::UnableToWrite(err.to_string())
})?;
let data = serde_json::to_string(&info).map_err(RustusError::from)?;
{
let mut writer = BufWriter::new(file);
writer.write_all(data.as_bytes())?;
writer.flush()?;
}
Ok(())
})
.await?
}

async fn get_info(&self, file_id: &str) -> RustusResult<FileInfo> {
let info_path = self.info_file_path(file_id);
if !info_path.exists() {
return Err(RustusError::FileNotFound);
}
let contents = read_to_string(info_path).await.map_err(|err| {
error!("{:?}", err);
RustusError::UnableToReadInfo
})?;
serde_json::from_str::<FileInfo>(contents.as_str()).map_err(RustusError::from)
actix_web::rt::task::spawn_blocking(move || {
if !info_path.exists() {
return Err(RustusError::FileNotFound);
}
let info = File::open(info_path)?;
let mut contents = String::new();
let mut reader = BufReader::new(info);
reader.read_to_string(&mut contents)?;
serde_json::from_str::<FileInfo>(contents.as_str()).map_err(RustusError::from)
})
.await?
}

async fn remove_info(&self, file_id: &str) -> RustusResult<()> {
let info_path = self.info_file_path(file_id);
if !info_path.exists() {
return Err(RustusError::FileNotFound);
}
remove_file(info_path).await.map_err(|err| {
error!("{:?}", err);
RustusError::UnableToRemove(String::from(file_id))
let id = String::from(file_id);
let info_path = self.info_file_path(id.as_str());
actix_web::rt::task::spawn_blocking(move || {
if !info_path.exists() {
return Err(RustusError::FileNotFound);
}
remove_file(info_path).map_err(|err| {
error!("{:?}", err);
RustusError::UnableToRemove(id)
})
})
.await?
}
}

Expand Down
26 changes: 19 additions & 7 deletions src/info_storages/models/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,25 @@ impl FileInfo {

pub async fn json(&self) -> RustusResult<String> {
let info_clone = self.clone();
let data = tokio::task::spawn_blocking(move || serde_json::to_string(&info_clone))
.await
.map_err(|err| {
error!("{}", err);
RustusError::UnableToWrite("Can't serialize info".into())
})??;
Ok(data)
actix_web::rt::task::spawn_blocking(move || {
serde_json::to_string(&info_clone).map_err(RustusError::from)
})
.await
.map_err(|err| {
error!("{}", err);
RustusError::UnableToWrite("Can't serialize info".into())
})?
}

pub async fn from_json(data: String) -> RustusResult<Self> {
actix_web::rt::task::spawn_blocking(move || {
serde_json::from_str::<Self>(data.as_str()).map_err(RustusError::from)
})
.await
.map_err(|err| {
error!("{}", err);
RustusError::UnableToWrite("Can't serialize info".into())
})?
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/info_storages/redis_info_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl InfoStorage for RedisStorage {
if res.is_none() {
return Err(RustusError::FileNotFound);
}
serde_json::from_str(res.unwrap().as_str()).map_err(RustusError::from)
FileInfo::from_json(res.unwrap()).await
}

async fn remove_info(&self, file_id: &str) -> RustusResult<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
use std::str::FromStr;
use std::sync::Arc;

use actix_web::http::Method;
use actix_web::{
dev::{Server, Service},
http::Method,
middleware, web, App, HttpServer,
};
use fern::colors::{Color, ColoredLevelConfig};
Expand Down
1 change: 1 addition & 0 deletions src/notifiers/dir_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ impl DirNotifier {

#[async_trait]
impl Notifier for DirNotifier {
#[cfg_attr(coverage, no_coverage)]
async fn prepare(&mut self) -> RustusResult<()> {
Ok(())
}
Expand Down
28 changes: 17 additions & 11 deletions src/protocol/core/write_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub async fn write_bytes(
// Parses header `Upload-Length` only if the creation-defer-length extension is enabled.
let updated_len = if state
.config
.extensions_vec()
.tus_extensions
.contains(&Extensions::CreationDeferLength)
{
parse_header(&request, "Upload-Length")
Expand Down Expand Up @@ -82,20 +82,19 @@ pub async fn write_bytes(
if Some(file_info.offset) == file_info.length {
return Err(RustusError::FrozenFile);
}

let chunk_len = bytes.len();
// Appending bytes to file.
state
.data_storage
.add_bytes(&file_info, bytes.as_ref())
.await?;
state.data_storage.add_bytes(&file_info, bytes).await?;
// Updating offset.
file_info.offset += bytes.len();
file_info.offset += chunk_len;
// Saving info to info storage.
state.info_storage.set_info(&file_info, false).await?;

let mut hook = Hook::PostReceive;
let mut keep_alive = true;
if file_info.length == Some(file_info.offset) {
hook = Hook::PostFinish;
keep_alive = false;
}
if state.config.hook_is_active(hook) {
let message = state
Expand All @@ -104,16 +103,23 @@ pub async fn write_bytes(
.hooks_format
.format(&request, &file_info)?;
let headers = request.headers().clone();
tokio::spawn(async move {
actix_web::rt::spawn(async move {
state
.notification_manager
.send_message(message, hook, &headers)
.await
});
}
Ok(HttpResponse::NoContent()
.insert_header(("Upload-Offset", file_info.offset.to_string()))
.finish())
if keep_alive {
Ok(HttpResponse::NoContent()
.insert_header(("Upload-Offset", file_info.offset.to_string()))
.keep_alive()
.finish())
} else {
Ok(HttpResponse::NoContent()
.insert_header(("Upload-Offset", file_info.offset.to_string()))
.finish())
}
}

#[cfg(test)]
Expand Down
12 changes: 6 additions & 6 deletions src/protocol/creation/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ pub async fn create_file(
let octet_stream = |val: &str| val == "application/offset+octet-stream";
if check_header(&request, "Content-Type", octet_stream) {
// Writing first bytes.
state
.data_storage
.add_bytes(&file_info, bytes.as_ref())
.await?;
file_info.offset += bytes.len();
let chunk_len = bytes.len();
// Appending bytes to file.
state.data_storage.add_bytes(&file_info, bytes).await?;
// Updating offset.
file_info.offset += chunk_len;
}
}

Expand All @@ -199,7 +199,7 @@ pub async fn create_file(
let headers = request.headers().clone();
// Adding send_message task to tokio reactor.
// Thin function would be executed in background.
tokio::spawn(async move {
actix_web::rt::spawn(async move {
state
.notification_manager
.send_message(message, Hook::PostCreate, &headers)
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/getting/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ pub async fn get_file(request: HttpRequest, state: web::Data<State>) -> RustusRe
}

#[cfg(test)]
#[cfg_attr(coverage, no_coverage)]
mod test {
use crate::{rustus_service, State};
use actix_web::http::StatusCode;
use actix_web::test::{call_service, init_service, TestRequest};
use actix_web::{web, App};
use bytes::Bytes;

#[actix_rt::test]
async fn success() {
Expand All @@ -38,7 +38,7 @@ mod test {
let file_info = state.create_test_file().await;
state
.data_storage
.add_bytes(&file_info, "data".as_bytes())
.add_bytes(&file_info, Bytes::from("testing"))
.await
.unwrap();
let request = TestRequest::get()
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/termination/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn terminate(
.hooks_format
.format(&request, &file_info)?;
let headers = request.headers().clone();
tokio::spawn(async move {
actix_web::rt::spawn(async move {
state
.notification_manager
.send_message(message, Hook::PostTerminate, &headers)
Expand Down
1 change: 1 addition & 0 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl State {
data_storage: Box::new(crate::storages::file_storage::FileStorage::new(
config.storage_opts.data_dir.clone(),
config.storage_opts.dir_structure.clone(),
config.storage_opts.force_fsync,
)),
info_storage: Box::new(
crate::info_storages::file_info_storage::FileInfoStorage::new(
Expand Down
Loading

0 comments on commit 696206c

Please sign in to comment.