diff --git a/Cargo.lock b/Cargo.lock index 9e7078348..36edd8ec3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,7 +104,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", - "zstd", + "zstd 0.13.2", ] [[package]] @@ -827,6 +827,7 @@ dependencies = [ "serde_json", "serde_repr", "sqlx", + "tempdir", "thiserror", "tokio", "tokio-stream", @@ -851,7 +852,7 @@ dependencies = [ "base64ct", "blake2", "cpufeatures", - "password-hash", + "password-hash 0.5.0", ] [[package]] @@ -967,8 +968,8 @@ dependencies = [ "memchr", "pin-project-lite", "xz2", - "zstd", - "zstd-safe", + "zstd 0.13.2", + "zstd-safe 7.2.0", ] [[package]] @@ -2237,7 +2238,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ee8cb917ed68301c123f10c01f0803305569ec7d#ee8cb917ed68301c123f10c01f0803305569ec7d" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" dependencies = [ "anyhow", "arc-swap", @@ -2262,7 +2263,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ee8cb917ed68301c123f10c01f0803305569ec7d#ee8cb917ed68301c123f10c01f0803305569ec7d" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" dependencies = [ "anyhow", "async-trait", @@ -2301,7 +2302,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ee8cb917ed68301c123f10c01f0803305569ec7d#ee8cb917ed68301c123f10c01f0803305569ec7d" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" dependencies = [ "anyhow", "arc-swap", @@ -2322,7 +2323,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ee8cb917ed68301c123f10c01f0803305569ec7d#ee8cb917ed68301c123f10c01f0803305569ec7d" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" dependencies = [ "anyhow", "bytes", @@ -2342,7 +2343,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ee8cb917ed68301c123f10c01f0803305569ec7d#ee8cb917ed68301c123f10c01f0803305569ec7d" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" dependencies = [ "anyhow", "arc-swap", @@ -2364,7 +2365,7 @@ dependencies = [ [[package]] name = "collab-importer" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ee8cb917ed68301c123f10c01f0803305569ec7d#ee8cb917ed68301c123f10c01f0803305569ec7d" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" dependencies = [ "anyhow", "async-recursion", @@ -2396,6 +2397,7 @@ dependencies = [ "tracing", "uuid", "walkdir", + "zip", ] [[package]] @@ -2465,7 +2467,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ee8cb917ed68301c123f10c01f0803305569ec7d#ee8cb917ed68301c123f10c01f0803305569ec7d" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" dependencies = [ "anyhow", "collab", @@ -2543,6 +2545,12 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + [[package]] name = "convert_case" version = "0.4.0" @@ -3366,6 +3374,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "funty" version = "2.0.0" @@ -4953,6 +4967,17 @@ dependencies = [ "regex", ] +[[package]] +name = "password-hash" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700" +dependencies = [ + "base64ct", + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "password-hash" version = "0.5.0" @@ -4970,6 +4995,18 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pbkdf2" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83a0692ec44e4cf1ef28ca317f14f8f07da2d95ec3fa01f86e4467b725e60917" +dependencies = [ + "digest", + "hmac", + "password-hash 0.4.2", + "sha2", +] + [[package]] name = "pem" version = "1.1.1" @@ -5612,6 +5649,19 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.7.3" @@ -5656,6 +5706,21 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.5.1" @@ -5725,6 +5790,15 @@ dependencies = [ "yasna", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redis" version = "0.23.3" @@ -5852,6 +5926,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + [[package]] name = "rend" version = "0.4.2" @@ -7185,6 +7268,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand 0.4.6", + "remove_dir_all", +] + [[package]] name = "tempfile" version = "3.10.1" @@ -8568,13 +8661,52 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "zip" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" +dependencies = [ + "aes", + "byteorder", + "bzip2", + "constant_time_eq", + "crc32fast", + "crossbeam-utils", + "flate2", + "hmac", + "pbkdf2", + "sha1", + "time", + "zstd 0.11.2+zstd.1.5.2", +] + +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe 5.0.2+zstd.1.5.2", +] + [[package]] name = "zstd" version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" dependencies = [ - "zstd-safe", + "zstd-safe 7.2.0", +] + +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 09e234be8..afaa123d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -304,13 +304,13 @@ debug = true [patch.crates-io] # It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate. # So using patch to workaround this issue. -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ee8cb917ed68301c123f10c01f0803305569ec7d" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ee8cb917ed68301c123f10c01f0803305569ec7d" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ee8cb917ed68301c123f10c01f0803305569ec7d" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ee8cb917ed68301c123f10c01f0803305569ec7d" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ee8cb917ed68301c123f10c01f0803305569ec7d" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ee8cb917ed68301c123f10c01f0803305569ec7d" } -collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ee8cb917ed68301c123f10c01f0803305569ec7d" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } +collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } [features] history = [] diff --git a/services/appflowy-worker/Cargo.toml b/services/appflowy-worker/Cargo.toml index 80a076f26..048d3fd1a 100644 --- a/services/appflowy-worker/Cargo.toml +++ b/services/appflowy-worker/Cargo.toml @@ -48,5 +48,6 @@ uuid.workspace = true mailer.workspace = true md5.workspace = true base64.workspace = true +tempdir = "0.3.7" diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs index 4e4aa0816..b18344261 100644 --- a/services/appflowy-worker/src/import_worker/worker.rs +++ b/services/appflowy-worker/src/import_worker/worker.rs @@ -17,7 +17,6 @@ use collab_importer::imported_collab::ImportType; use collab_importer::notion::page::CollabResource; use collab_importer::notion::NotionImporter; use collab_importer::util::FileId; -use collab_importer::zip_tool::unzip_stream; use database::collab::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache}; use database::collab::{insert_into_af_collab_bulk_for_user, select_blob_from_af_collab}; use database::resource_usage::{insert_blob_metadata_bulk, BulkInsertMeta}; @@ -29,6 +28,8 @@ use database_entity::dto::CollabParams; use async_zip::base::read::stream::{Ready, ZipFileReader}; +use collab_importer::zip_tool::async_zip::async_unzip; +use collab_importer::zip_tool::sync_zip::sync_unzip; use futures::stream::FuturesUnordered; use futures::{stream, AsyncBufRead, StreamExt}; use infra::env_util::get_env_var; @@ -79,14 +80,7 @@ pub async fn run_import_worker( error!("Failed to ensure consumer group: {:?}", err); } - let mut storage_dir = temp_dir().join("import_worker_temp_dir"); - if !storage_dir.exists() { - if let Err(err) = fs::create_dir(&storage_dir).await { - error!("Failed to create importer temp dir: {:?}", err); - storage_dir = temp_dir(); - } - } - + let storage_dir = temp_dir(); process_un_acked_tasks( &storage_dir, &mut redis_client, @@ -359,10 +353,10 @@ pub async fn download_and_unzip_file_retry( attempt += 1; match download_and_unzip_file(storage_dir, import_task, s3_client, streaming).await { Ok(result) => return Ok(result), - Err(err) if attempt <= max_retries && !err.is_file_not_found() => { - warn!( - "Attempt {} failed: {}. Retrying in {:?}...", - attempt, err, interval + Err(err) if attempt < max_retries && !err.is_file_not_found() => { + error!( + "{} attempt {} failed: {}. Retrying in {:?}...", + import_task.workspace_id, attempt, err, interval ); tokio::time::sleep(interval).await; }, @@ -396,34 +390,54 @@ async fn download_and_unzip_file( .await .map_err(|err| ImportError::Internal(err.into()))?; let buffer_size = buffer_size_from_content_length(content_length); + if streaming { + let zip_reader = get_zip_reader(buffer_size, StreamOrFile::Stream(stream)).await?; + let unique_file_name = Uuid::new_v4().to_string(); + let output_file_path = storage_dir.join(unique_file_name); + fs::create_dir_all(&output_file_path) + .await + .map_err(|err| ImportError::Internal(err.into()))?; + fs::set_permissions(&output_file_path, Permissions::from_mode(0o777)) + .await + .map_err(|err| { + ImportError::Internal(anyhow!("Failed to set permissions for temp dir: {:?}", err)) + })?; + let unzip_file = async_unzip( + zip_reader.inner, + output_file_path, + Some(import_task.workspace_name.clone()), + ) + .await?; + Ok(unzip_file.unzip_dir_path) + } else { + let file = download_file( + &import_task.workspace_id, + storage_dir, + stream, + &import_task.md5_base64, + ) + .await?; + info!( + "[Import] {} start unzip file: {:?}", + import_task.workspace_id, + file.path_buf() + ); - let zip_reader = get_zip_reader( - storage_dir, - stream, - buffer_size, - streaming, - &import_task.md5_base64, - ) - .await?; - let unique_file_name = Uuid::new_v4().to_string(); - let output_file_path = storage_dir.join(unique_file_name); - fs::create_dir_all(&output_file_path) - .await - .map_err(|err| ImportError::Internal(err.into()))?; - - fs::set_permissions(&output_file_path, Permissions::from_mode(0o777)) - .await - .map_err(|err| { - ImportError::Internal(anyhow!("Failed to set permissions for temp dir: {:?}", err)) - })?; + let file_path = file.path_buf().clone(); + let storage_dir = storage_dir.to_path_buf(); + let workspace_name = import_task.workspace_name.clone(); + let unzip_file = + tokio::task::spawn_blocking(move || sync_unzip(file_path, storage_dir, Some(workspace_name))) + .await + .map_err(|err| ImportError::Internal(err.into()))??; - let unzip_file = unzip_stream( - zip_reader.inner, - output_file_path, - Some(import_task.workspace_name.clone()), - ) - .await?; - Ok(unzip_file.unzip_dir_path) + trace!( + "[Import] {} finish unzip file: {:?}", + import_task.workspace_id, + unzip_file.unzip_dir + ); + Ok(unzip_file.unzip_dir) + } } struct ZipReader { @@ -432,6 +446,12 @@ struct ZipReader { file: Option, } +#[allow(dead_code)] +enum StreamOrFile { + Stream(Box), + File(AutoRemoveDownloadedFile), +} + /// Asynchronously returns a `ZipFileReader` that can read from a stream or a downloaded file, based on the environment setting. /// /// This function checks whether streaming is enabled via the `APPFLOWY_WORKER_IMPORT_NOTION_STREAMING` environment variable. @@ -439,37 +459,35 @@ struct ZipReader { /// Otherwise, it first downloads the zip file to a local file and then reads from it. /// async fn get_zip_reader( - storage_dir: &Path, - stream: Box, buffer_size: usize, - streaming: bool, - file_md5_base64: &Option, + stream_or_file: StreamOrFile, ) -> Result { - let zip_reader = if streaming { - // Occasionally, we encounter the error 'unable to locate the end of central directory record' - // when streaming a ZIP file to async-zip. This indicates that the ZIP reader couldn't find - // the necessary end-of-file marker. The issue might occur if the entire ZIP file has not been - // fully downloaded or buffered before the reader attempts to process the end-of-file information. - let reader = futures::io::BufReader::with_capacity(buffer_size, stream); - let boxed_reader: Pin> = Box::pin(reader); - ZipReader { - inner: async_zip::base::read::stream::ZipFileReader::new(boxed_reader), - file: None, - } - } else { - let file = download_file(storage_dir, stream, file_md5_base64).await?; - let handle = fs::File::open(&file) - .await - .map_err(|err| ImportError::Internal(err.into()))?; - let reader = tokio::io::BufReader::with_capacity(buffer_size, handle).compat(); - let boxed_reader: Pin> = Box::pin(reader); - ZipReader { - inner: async_zip::base::read::stream::ZipFileReader::new(boxed_reader), - // Make sure the lifetime of file is the same as zip reader. - file: Some(file), - } - }; - Ok(zip_reader) + match stream_or_file { + StreamOrFile::Stream(stream) => { + // Occasionally, we encounter the error 'unable to locate the end of central directory record' + // when streaming a ZIP file to async-zip. This indicates that the ZIP reader couldn't find + // the necessary end-of-file marker. The issue might occur if the entire ZIP file has not been + // fully downloaded or buffered before the reader attempts to process the end-of-file information. + let reader = futures::io::BufReader::with_capacity(buffer_size, stream); + let boxed_reader: Pin> = Box::pin(reader); + Ok(ZipReader { + inner: async_zip::base::read::stream::ZipFileReader::new(boxed_reader), + file: None, + }) + }, + StreamOrFile::File(file) => { + let handle = fs::File::open(&file) + .await + .map_err(|err| ImportError::Internal(err.into()))?; + let reader = tokio::io::BufReader::with_capacity(buffer_size, handle).compat(); + let boxed_reader: Pin> = Box::pin(reader); + Ok(ZipReader { + inner: async_zip::base::read::stream::ZipFileReader::new(boxed_reader), + // Make sure the lifetime of file is the same as zip reader. + file: Some(file), + }) + }, + } } /// Determines the buffer size based on the content length of the file. @@ -782,7 +800,11 @@ async fn process_unzip_file( // 8. delete zip file regardless of success or failure match fs::remove_dir_all(unzip_dir_path).await { - Ok(_) => trace!("[Import]: {} deleted unzip file", import_task.workspace_id), + Ok(_) => trace!( + "[Import]: {} deleted unzip file: {:?}", + import_task.workspace_id, + unzip_dir_path + ), Err(err) => error!("Failed to delete unzip file: {:?}", err), } diff --git a/services/appflowy-worker/src/s3_client.rs b/services/appflowy-worker/src/s3_client.rs index 0cd3d7f39..9a157262b 100644 --- a/services/appflowy-worker/src/s3_client.rs +++ b/services/appflowy-worker/src/s3_client.rs @@ -1,6 +1,7 @@ use crate::error::WorkerError; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use aws_sdk_s3::error::SdkError; +use std::fs::Permissions; use anyhow::Result; use aws_sdk_s3::operation::get_object::GetObjectError; @@ -10,12 +11,13 @@ use base64::engine::general_purpose::STANDARD; use base64::Engine; use futures::AsyncReadExt; use std::ops::Deref; +use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; use tokio::fs; -use tokio::fs::File; +use tokio::fs::OpenOptions; use tokio::io::AsyncWriteExt; use tokio_util::compat::TokioAsyncReadCompatExt; -use tracing::error; +use tracing::{error, info, trace}; use uuid::Uuid; #[async_trait] @@ -133,17 +135,26 @@ pub struct S3StreamResponse { pub content_length: Option, } -pub struct AutoRemoveDownloadedFile(PathBuf); +pub struct AutoRemoveDownloadedFile { + zip_file_path: PathBuf, + pub(crate) workspace_id: String, +} + +impl AutoRemoveDownloadedFile { + pub fn path_buf(&self) -> &PathBuf { + &self.zip_file_path + } +} impl AsRef for AutoRemoveDownloadedFile { fn as_ref(&self) -> &Path { - &self.0 + &self.zip_file_path } } impl AsRef for AutoRemoveDownloadedFile { fn as_ref(&self) -> &PathBuf { - &self.0 + &self.zip_file_path } } @@ -151,32 +162,55 @@ impl Deref for AutoRemoveDownloadedFile { type Target = PathBuf; fn deref(&self) -> &Self::Target { - &self.0 + &self.zip_file_path } } impl Drop for AutoRemoveDownloadedFile { fn drop(&mut self) { - let path = self.0.clone(); + let path = self.zip_file_path.clone(); + let _workspace_id = self.workspace_id.clone(); tokio::spawn(async move { - if let Err(err) = fs::remove_file(&path).await { - error!( - "Failed to delete the auto remove downloaded file: {:?}, error: {}", - path, err - ) + if path.exists() { + if let Err(err) = fs::remove_file(&path).await { + error!( + "Failed to delete the auto remove downloaded file: {:?}, error: {}", + path, err + ) + } } }); } } pub async fn download_file( + workspace_id: &str, storage_dir: &Path, stream: Box, expected_md5_base64: &Option, ) -> Result { - let zip_file_path = storage_dir.join(format!("{}.zip", Uuid::new_v4())); + let zip_file_dir = storage_dir.join(format!("{}", Uuid::new_v4())); + if !zip_file_dir.exists() { + fs::create_dir_all(&zip_file_dir).await?; + let file_permissions = Permissions::from_mode(0o777); + fs::set_permissions(&zip_file_dir, file_permissions).await?; + } + + let zip_file_path = zip_file_dir.join("file.zip"); + trace!( + "[Import] {} start to write stream to file: {:?}", + workspace_id, + zip_file_path + ); write_stream_to_file(&zip_file_path, expected_md5_base64, stream).await?; - Ok(AutoRemoveDownloadedFile(zip_file_path)) + info!( + "[Import] {} finish writing stream to file: {:?}", + workspace_id, zip_file_path + ); + Ok(AutoRemoveDownloadedFile { + zip_file_path, + workspace_id: workspace_id.to_string(), + }) } pub async fn write_stream_to_file( @@ -185,7 +219,14 @@ pub async fn write_stream_to_file( mut stream: Box, ) -> Result<(), anyhow::Error> { let mut context = md5::Context::new(); - let mut file = File::create(file_path).await?; + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .mode(0o644) + .open(file_path) + .await + .map_err(|err| anyhow!("Failed to create file with permissions: {:?}", err))?; let mut buffer = vec![0u8; 1_048_576]; loop { let bytes_read = stream.read(&mut buffer).await?; @@ -193,7 +234,10 @@ pub async fn write_stream_to_file( break; } context.consume(&buffer[..bytes_read]); - file.write_all(&buffer[..bytes_read]).await?; + file + .write_all(&buffer[..bytes_read]) + .await + .with_context(|| format!("Failed to write data to file: {:?}", file_path.as_os_str()))?; } let digest = context.compute(); @@ -208,6 +252,9 @@ pub async fn write_stream_to_file( } } - file.flush().await?; + file + .flush() + .await + .with_context(|| format!("Failed to flush data to file: {:?}", file_path.as_os_str()))?; Ok(()) }