From 696206ce4e96ed6792dcbb8fd25aa254dfc96de7 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Sun, 6 Mar 2022 21:17:07 +0400 Subject: [PATCH] Made rustus go Brrrr. (#51) Made rustus go Brrrr. Description: * Optimized number of syscalls for writing by using Buffered IO; * Rmoved redundant clone calls; * Changed Storage add_bytes signature; * Moved serde calls to tokio; * Removed redundant memory allocations. * Async runtime is changed back to actix. * Spawning tasks with actix runtime. * Super mega solution to speedup fs ops. Signed-off-by: Pavel Kirilin --- Cargo.lock | 1 + Cargo.toml | 5 +- src/config.rs | 3 + src/errors.rs | 2 + src/info_storages/db_info_storage.rs | 2 +- src/info_storages/file_info_storage.rs | 77 ++++++---- src/info_storages/models/file_info.rs | 26 +++- src/info_storages/redis_info_storage.rs | 2 +- src/main.rs | 2 +- src/notifiers/dir_notifier.rs | 1 + src/protocol/core/write_bytes.rs | 28 ++-- src/protocol/creation/routes.rs | 12 +- src/protocol/getting/routes.rs | 4 +- src/protocol/termination/routes.rs | 2 +- src/state.rs | 1 + src/storages/file_storage.rs | 185 +++++++++++++----------- src/storages/models/available_stores.rs | 1 + src/storages/models/storage.rs | 3 +- src/utils/headers.rs | 2 +- 19 files changed, 214 insertions(+), 145 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc866e4..decd5b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2398,6 +2398,7 @@ dependencies = [ "actix-web", "async-trait", "base64", + "bytes", "chrono", "derive_more", "fern", diff --git a/Cargo.toml b/Cargo.toml index b4c2507..4af3e82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ serde_json = "1" strfmt = "^0.1.6" thiserror = "^1.0" url = "2.2.2" +bytes = "1.1.0" [dependencies.futures] version = "0.3.21" @@ -82,7 +83,7 @@ features = ["derive"] version = "0.23" [dependencies.tokio] -features = ["time", "process", "fs"] +features = ["time", "process", "fs", "io-std", "io-util", "rt-multi-thread", "bytes"] version = "1.4.0" [dependencies.tokio-amqp] @@ -114,7 +115,7 @@ httptest = "0.15.4" [profile] [profile.release] -lto = true +lto = "fat" panic = "abort" opt-level = 3 codegen-units = 1 diff --git a/src/config.rs b/src/config.rs index 07cd41e..f1d862c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,6 +27,9 @@ pub struct StorageOptions { #[structopt(long, env = "RUSTUS_DIR_STRUCTURE", default_value = "")] pub dir_structure: String, + + #[structopt(long, parse(from_flag))] + pub force_fsync: bool, } #[derive(StructOpt, Debug, Clone)] diff --git a/src/errors.rs b/src/errors.rs index 942fe39..99eaec5 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -58,6 +58,8 @@ pub enum RustusError { AMQPPoolError(#[from] mobc_lapin::mobc::Error), #[error("Std error: {0}")] StdError(#[from] std::io::Error), + #[error("Can't spawn task: {0}")] + TokioSpawnError(#[from] tokio::task::JoinError), } /// This conversion allows us to use `RustusError` in the `main` function. diff --git a/src/info_storages/db_info_storage.rs b/src/info_storages/db_info_storage.rs index 164f2bc..baa0092 100644 --- a/src/info_storages/db_info_storage.rs +++ b/src/info_storages/db_info_storage.rs @@ -66,7 +66,7 @@ impl InfoStorage for DBInfoStorage { async fn get_info(&self, file_id: &str) -> RustusResult { let model: Option = self.db.fetch_by_column("id", file_id).await?; if let Some(info) = model { - serde_json::from_str(info.info.as_str()).map_err(RustusError::from) + FileInfo::from_json(info.info.to_string()).await } else { Err(RustusError::FileNotFound) } diff --git a/src/info_storages/file_info_storage.rs b/src/info_storages/file_info_storage.rs index 51f5567..309b47a 100644 --- a/src/info_storages/file_info_storage.rs +++ b/src/info_storages/file_info_storage.rs @@ -1,9 +1,11 @@ +use std::io::{Read, Write}; use std::path::PathBuf; use async_trait::async_trait; use log::error; -use tokio::fs::{read_to_string, remove_file, DirBuilder, OpenOptions}; -use tokio::io::copy; +use std::fs::{remove_file, File, OpenOptions}; +use std::io::{BufReader, BufWriter}; +use tokio::fs::DirBuilder; use crate::errors::{RustusError, RustusResult}; use crate::info_storages::{FileInfo, InfoStorage}; @@ -35,42 +37,57 @@ impl InfoStorage for FileInfoStorage { } async fn set_info(&self, file_info: &FileInfo, create: bool) -> RustusResult<()> { - let mut file = OpenOptions::new() - .write(true) - .create(create) - .truncate(true) - .open(self.info_file_path(file_info.id.as_str()).as_path()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::UnableToWrite(err.to_string()) - })?; - copy(&mut file_info.json().await?.as_bytes(), &mut file).await?; - tokio::task::spawn(async move { file.sync_data().await }); - Ok(()) + let info = file_info.clone(); + let path = self.info_file_path(info.id.as_str()); + actix_web::rt::task::spawn_blocking(move || { + let file = OpenOptions::new() + .write(true) + .create(create) + .truncate(true) + .open(path) + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + let data = serde_json::to_string(&info).map_err(RustusError::from)?; + { + let mut writer = BufWriter::new(file); + writer.write_all(data.as_bytes())?; + writer.flush()?; + } + Ok(()) + }) + .await? } async fn get_info(&self, file_id: &str) -> RustusResult { let info_path = self.info_file_path(file_id); - if !info_path.exists() { - return Err(RustusError::FileNotFound); - } - let contents = read_to_string(info_path).await.map_err(|err| { - error!("{:?}", err); - RustusError::UnableToReadInfo - })?; - serde_json::from_str::(contents.as_str()).map_err(RustusError::from) + actix_web::rt::task::spawn_blocking(move || { + if !info_path.exists() { + return Err(RustusError::FileNotFound); + } + let info = File::open(info_path)?; + let mut contents = String::new(); + let mut reader = BufReader::new(info); + reader.read_to_string(&mut contents)?; + serde_json::from_str::(contents.as_str()).map_err(RustusError::from) + }) + .await? } async fn remove_info(&self, file_id: &str) -> RustusResult<()> { - let info_path = self.info_file_path(file_id); - if !info_path.exists() { - return Err(RustusError::FileNotFound); - } - remove_file(info_path).await.map_err(|err| { - error!("{:?}", err); - RustusError::UnableToRemove(String::from(file_id)) + let id = String::from(file_id); + let info_path = self.info_file_path(id.as_str()); + actix_web::rt::task::spawn_blocking(move || { + if !info_path.exists() { + return Err(RustusError::FileNotFound); + } + remove_file(info_path).map_err(|err| { + error!("{:?}", err); + RustusError::UnableToRemove(id) + }) }) + .await? } } diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs index 5202706..806ef3e 100644 --- a/src/info_storages/models/file_info.rs +++ b/src/info_storages/models/file_info.rs @@ -93,13 +93,25 @@ impl FileInfo { pub async fn json(&self) -> RustusResult { let info_clone = self.clone(); - let data = tokio::task::spawn_blocking(move || serde_json::to_string(&info_clone)) - .await - .map_err(|err| { - error!("{}", err); - RustusError::UnableToWrite("Can't serialize info".into()) - })??; - Ok(data) + actix_web::rt::task::spawn_blocking(move || { + serde_json::to_string(&info_clone).map_err(RustusError::from) + }) + .await + .map_err(|err| { + error!("{}", err); + RustusError::UnableToWrite("Can't serialize info".into()) + })? + } + + pub async fn from_json(data: String) -> RustusResult { + actix_web::rt::task::spawn_blocking(move || { + serde_json::from_str::(data.as_str()).map_err(RustusError::from) + }) + .await + .map_err(|err| { + error!("{}", err); + RustusError::UnableToWrite("Can't serialize info".into()) + })? } #[cfg(test)] diff --git a/src/info_storages/redis_info_storage.rs b/src/info_storages/redis_info_storage.rs index 5e412a0..01bde22 100644 --- a/src/info_storages/redis_info_storage.rs +++ b/src/info_storages/redis_info_storage.rs @@ -46,7 +46,7 @@ impl InfoStorage for RedisStorage { if res.is_none() { return Err(RustusError::FileNotFound); } - serde_json::from_str(res.unwrap().as_str()).map_err(RustusError::from) + FileInfo::from_json(res.unwrap()).await } async fn remove_info(&self, file_id: &str) -> RustusResult<()> { diff --git a/src/main.rs b/src/main.rs index 5b5330c..99c82d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,9 +3,9 @@ use std::str::FromStr; use std::sync::Arc; -use actix_web::http::Method; use actix_web::{ dev::{Server, Service}, + http::Method, middleware, web, App, HttpServer, }; use fern::colors::{Color, ColoredLevelConfig}; diff --git a/src/notifiers/dir_notifier.rs b/src/notifiers/dir_notifier.rs index 008d79c..73b0b9a 100644 --- a/src/notifiers/dir_notifier.rs +++ b/src/notifiers/dir_notifier.rs @@ -19,6 +19,7 @@ impl DirNotifier { #[async_trait] impl Notifier for DirNotifier { + #[cfg_attr(coverage, no_coverage)] async fn prepare(&mut self) -> RustusResult<()> { Ok(()) } diff --git a/src/protocol/core/write_bytes.rs b/src/protocol/core/write_bytes.rs index 322b219..9adb0b4 100644 --- a/src/protocol/core/write_bytes.rs +++ b/src/protocol/core/write_bytes.rs @@ -31,7 +31,7 @@ pub async fn write_bytes( // Parses header `Upload-Length` only if the creation-defer-length extension is enabled. let updated_len = if state .config - .extensions_vec() + .tus_extensions .contains(&Extensions::CreationDeferLength) { parse_header(&request, "Upload-Length") @@ -82,20 +82,19 @@ pub async fn write_bytes( if Some(file_info.offset) == file_info.length { return Err(RustusError::FrozenFile); } - + let chunk_len = bytes.len(); // Appending bytes to file. - state - .data_storage - .add_bytes(&file_info, bytes.as_ref()) - .await?; + state.data_storage.add_bytes(&file_info, bytes).await?; // Updating offset. - file_info.offset += bytes.len(); + file_info.offset += chunk_len; // Saving info to info storage. state.info_storage.set_info(&file_info, false).await?; let mut hook = Hook::PostReceive; + let mut keep_alive = true; if file_info.length == Some(file_info.offset) { hook = Hook::PostFinish; + keep_alive = false; } if state.config.hook_is_active(hook) { let message = state @@ -104,16 +103,23 @@ pub async fn write_bytes( .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); - tokio::spawn(async move { + actix_web::rt::spawn(async move { state .notification_manager .send_message(message, hook, &headers) .await }); } - Ok(HttpResponse::NoContent() - .insert_header(("Upload-Offset", file_info.offset.to_string())) - .finish()) + if keep_alive { + Ok(HttpResponse::NoContent() + .insert_header(("Upload-Offset", file_info.offset.to_string())) + .keep_alive() + .finish()) + } else { + Ok(HttpResponse::NoContent() + .insert_header(("Upload-Offset", file_info.offset.to_string())) + .finish()) + } } #[cfg(test)] diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index 20df718..7a6828b 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -180,11 +180,11 @@ pub async fn create_file( let octet_stream = |val: &str| val == "application/offset+octet-stream"; if check_header(&request, "Content-Type", octet_stream) { // Writing first bytes. - state - .data_storage - .add_bytes(&file_info, bytes.as_ref()) - .await?; - file_info.offset += bytes.len(); + let chunk_len = bytes.len(); + // Appending bytes to file. + state.data_storage.add_bytes(&file_info, bytes).await?; + // Updating offset. + file_info.offset += chunk_len; } } @@ -199,7 +199,7 @@ pub async fn create_file( let headers = request.headers().clone(); // Adding send_message task to tokio reactor. // Thin function would be executed in background. - tokio::spawn(async move { + actix_web::rt::spawn(async move { state .notification_manager .send_message(message, Hook::PostCreate, &headers) diff --git a/src/protocol/getting/routes.rs b/src/protocol/getting/routes.rs index e3e7c0a..f16b8b5 100644 --- a/src/protocol/getting/routes.rs +++ b/src/protocol/getting/routes.rs @@ -21,12 +21,12 @@ pub async fn get_file(request: HttpRequest, state: web::Data) -> RustusRe } #[cfg(test)] -#[cfg_attr(coverage, no_coverage)] mod test { use crate::{rustus_service, State}; use actix_web::http::StatusCode; use actix_web::test::{call_service, init_service, TestRequest}; use actix_web::{web, App}; + use bytes::Bytes; #[actix_rt::test] async fn success() { @@ -38,7 +38,7 @@ mod test { let file_info = state.create_test_file().await; state .data_storage - .add_bytes(&file_info, "data".as_bytes()) + .add_bytes(&file_info, Bytes::from("testing")) .await .unwrap(); let request = TestRequest::get() diff --git a/src/protocol/termination/routes.rs b/src/protocol/termination/routes.rs index 82ae88e..33f11a9 100644 --- a/src/protocol/termination/routes.rs +++ b/src/protocol/termination/routes.rs @@ -27,7 +27,7 @@ pub async fn terminate( .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); - tokio::spawn(async move { + actix_web::rt::spawn(async move { state .notification_manager .send_message(message, Hook::PostTerminate, &headers) diff --git a/src/state.rs b/src/state.rs index 07de073..45fa622 100644 --- a/src/state.rs +++ b/src/state.rs @@ -31,6 +31,7 @@ impl State { data_storage: Box::new(crate::storages::file_storage::FileStorage::new( config.storage_opts.data_dir.clone(), config.storage_opts.dir_structure.clone(), + config.storage_opts.force_fsync, )), info_storage: Box::new( crate::info_storages::file_info_storage::FileInfoStorage::new( diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index b2a850b..4f348d7 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -1,10 +1,12 @@ +use std::io::Write; use std::path::PathBuf; use actix_files::NamedFile; use async_trait::async_trait; +use bytes::Bytes; use log::error; -use tokio::fs::{remove_file, DirBuilder, OpenOptions}; -use tokio::io::copy; +use std::fs::{remove_file, DirBuilder, OpenOptions}; +use std::io::{copy, BufReader, BufWriter}; use crate::errors::{RustusError, RustusResult}; use crate::info_storages::FileInfo; @@ -17,17 +19,19 @@ use derive_more::Display; pub struct FileStorage { data_dir: PathBuf, dir_struct: String, + force_fsync: bool, } impl FileStorage { - pub fn new(data_dir: PathBuf, dir_struct: String) -> FileStorage { + pub fn new(data_dir: PathBuf, dir_struct: String, force_fsync: bool) -> FileStorage { FileStorage { data_dir, dir_struct, + force_fsync, } } - pub async fn data_file_path(&self, file_id: &str) -> RustusResult { + pub fn data_file_path(&self, file_id: &str) -> RustusResult { let dir = self .data_dir // We're working wit absolute paths, because tus.io says so. @@ -40,7 +44,6 @@ impl FileStorage { DirBuilder::new() .recursive(true) .create(dir.as_path()) - .await .map_err(|err| { error!("{}", err); RustusError::UnableToWrite(err.to_string()) @@ -58,7 +61,6 @@ impl Storage for FileStorage { DirBuilder::new() .recursive(true) .create(self.data_dir.as_path()) - .await .map_err(|err| RustusError::UnableToPrepareStorage(err.to_string()))?; } Ok(()) @@ -76,48 +78,61 @@ impl Storage for FileStorage { }) } - async fn add_bytes(&self, info: &FileInfo, bytes: &[u8]) -> RustusResult<()> { + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> { // In normal situation this `if` statement is not // gonna be called, but what if it is ... - if info.path.is_none() { + if file_info.path.is_none() { return Err(RustusError::FileNotFound); } - // Opening file in w+a mode. - // It means that we're going to append some - // bytes to the end of a file. - let mut file = OpenOptions::new() - .write(true) - .append(true) - .create(false) - .open(info.path.as_ref().unwrap()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::UnableToWrite(err.to_string()) - })?; - #[allow(clippy::clone_double_ref)] - let mut buffer = bytes.clone(); - copy(&mut buffer, &mut file).await?; - tokio::task::spawn(async move { file.sync_data().await }); - Ok(()) + let path = String::from(file_info.path.as_ref().unwrap()); + let force_sync = self.force_fsync; + actix_web::rt::task::spawn_blocking(move || { + // Opening file in w+a mode. + // It means that we're going to append some + // bytes to the end of a file. + let file = OpenOptions::new() + .write(true) + .append(true) + .create(false) + .read(false) + .truncate(false) + .open(path.as_str()) + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + { + let mut writer = BufWriter::new(file); + writer.write_all(bytes.as_ref())?; + writer.flush()?; + if force_sync { + writer.get_ref().sync_data()?; + } + } + Ok(()) + }) + .await? } async fn create_file(&self, file_info: &FileInfo) -> RustusResult { + let info = file_info.clone(); // New path to file. - let file_path = self.data_file_path(file_info.id.as_str()).await?; - // Creating new file. - OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .create_new(true) - .open(file_path.as_path()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::FileAlreadyExists(file_info.id.clone()) - })?; - Ok(file_path.display().to_string()) + let file_path = self.data_file_path(info.id.as_str())?; + actix_web::rt::task::spawn_blocking(move || { + // Creating new file. + OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .create_new(true) + .open(file_path.as_path()) + .map_err(|err| { + error!("{:?}", err); + RustusError::FileAlreadyExists(info.id.clone()) + })?; + Ok(file_path.display().to_string()) + }) + .await? } async fn concat_files( @@ -125,41 +140,48 @@ impl Storage for FileStorage { file_info: &FileInfo, parts_info: Vec, ) -> RustusResult<()> { - let mut file = OpenOptions::new() - .write(true) - .append(true) - .create(true) - .open(file_info.path.as_ref().unwrap().clone()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::UnableToWrite(err.to_string()) - })?; - for part in parts_info { - if part.path.is_none() { - return Err(RustusError::FileNotFound); + let info = file_info.clone(); + actix_web::rt::task::spawn_blocking(move || { + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(info.path.as_ref().unwrap().clone()) + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + for part in parts_info { + if part.path.is_none() { + return Err(RustusError::FileNotFound); + } + let part_file = OpenOptions::new() + .read(true) + .open(part.path.as_ref().unwrap())?; + let mut reader = BufReader::new(part_file); + copy(&mut reader, &mut file)?; } - let mut part_file = OpenOptions::new() - .read(true) - .open(part.path.as_ref().unwrap()) - .await?; - copy(&mut part_file, &mut file).await?; - } - file.sync_data().await?; - Ok(()) + file.sync_data()?; + Ok(()) + }) + .await? } async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> { - // Let's remove the file itself. - let data_path = PathBuf::from(file_info.path.as_ref().unwrap().clone()); - if !data_path.exists() { - return Err(RustusError::FileNotFound); - } - remove_file(data_path).await.map_err(|err| { - error!("{:?}", err); - RustusError::UnableToRemove(file_info.id.clone()) - })?; - Ok(()) + let info = file_info.clone(); + actix_web::rt::task::spawn_blocking(move || { + // Let's remove the file itself. + let data_path = PathBuf::from(info.path.as_ref().unwrap().clone()); + if !data_path.exists() { + return Err(RustusError::FileNotFound); + } + remove_file(data_path).map_err(|err| { + error!("{:?}", err); + RustusError::UnableToRemove(info.id.clone()) + })?; + Ok(()) + }) + .await? } } @@ -168,6 +190,7 @@ mod tests { use super::FileStorage; use crate::info_storages::FileInfo; use crate::Storage; + use bytes::Bytes; use std::fs::File; use std::io::{Read, Write}; use std::path::PathBuf; @@ -176,7 +199,7 @@ mod tests { async fn preparation() { let dir = tempdir::TempDir::new("file_storage").unwrap(); let target_path = dir.into_path().join("not_exist"); - let mut storage = FileStorage::new(target_path.clone(), "".into()); + let mut storage = FileStorage::new(target_path.clone(), "".into(), false); assert_eq!(target_path.exists(), false); storage.prepare().await.unwrap(); assert_eq!(target_path.exists(), true); @@ -185,7 +208,7 @@ mod tests { #[actix_rt::test] async fn create_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); let new_path = storage.create_file(&file_info).await.unwrap(); assert!(PathBuf::from(new_path).exists()); @@ -195,7 +218,7 @@ mod tests { async fn create_file_but_it_exists() { let dir = tempdir::TempDir::new("file_storage").unwrap(); let base_path = dir.into_path().clone(); - let storage = FileStorage::new(base_path.clone(), "".into()); + let storage = FileStorage::new(base_path.clone(), "".into(), false); let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); File::create(base_path.join("test_id")).unwrap(); let result = storage.create_file(&file_info).await; @@ -205,13 +228,13 @@ mod tests { #[actix_rt::test] async fn adding_bytes() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let mut file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); let new_path = storage.create_file(&file_info).await.unwrap(); let test_data = "MyTestData"; file_info.path = Some(new_path.clone()); storage - .add_bytes(&file_info, test_data.as_bytes()) + .add_bytes(&file_info, Bytes::from(test_data)) .await .unwrap(); let mut file = File::open(new_path).unwrap(); @@ -223,7 +246,7 @@ mod tests { #[actix_rt::test] async fn adding_bytes_to_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -232,14 +255,14 @@ mod tests { None, ); let test_data = "MyTestData"; - let result = storage.add_bytes(&file_info, test_data.as_bytes()).await; + let result = storage.add_bytes(&file_info, Bytes::from(test_data)).await; assert!(result.is_err()) } #[actix_rt::test] async fn get_contents_of_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -254,7 +277,7 @@ mod tests { #[actix_rt::test] async fn remove_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -269,7 +292,7 @@ mod tests { #[actix_rt::test] async fn success_concatenation() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let mut parts = Vec::new(); let part1_path = storage.data_dir.as_path().join("part1"); diff --git a/src/storages/models/available_stores.rs b/src/storages/models/available_stores.rs index 7bde098..25ff19e 100644 --- a/src/storages/models/available_stores.rs +++ b/src/storages/models/available_stores.rs @@ -26,6 +26,7 @@ impl AvailableStores { Self::FileStorage => Box::new(file_storage::FileStorage::new( config.storage_opts.data_dir.clone(), config.storage_opts.dir_structure.clone(), + config.storage_opts.force_fsync, )), } } diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs index 0d790b9..c14820b 100644 --- a/src/storages/models/storage.rs +++ b/src/storages/models/storage.rs @@ -2,6 +2,7 @@ use crate::errors::RustusResult; use crate::info_storages::FileInfo; use actix_files::NamedFile; use async_trait::async_trait; +use bytes::Bytes; use std::fmt::Display; #[async_trait] @@ -41,7 +42,7 @@ pub trait Storage: Display { /// # Params /// `file_info` - info about current file. /// `bytes` - bytes to append to the file. - async fn add_bytes(&self, file_info: &FileInfo, bytes: &[u8]) -> RustusResult<()>; + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()>; /// Create file in storage. /// diff --git a/src/utils/headers.rs b/src/utils/headers.rs index 493b09a..1bec0b8 100644 --- a/src/utils/headers.rs +++ b/src/utils/headers.rs @@ -17,7 +17,7 @@ pub fn parse_header(request: &HttpRequest, header_name: &str) -> Opt .and_then(|value| // Parsing it to string. match value.to_str() { - Ok(header_str) => Some(String::from(header_str)), + Ok(header_str) => Some(header_str), Err(_) => None, }) .and_then(|val|