Skip to content

Commit

Permalink
fix: upload file zip file expire (#987)
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy authored Nov 13, 2024
1 parent 91b0c50 commit 50d519c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 43 deletions.
100 changes: 58 additions & 42 deletions services/appflowy-worker/src/import_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,37 +253,47 @@ async fn consume_task(
entry_id: String,
) -> Result<(), ImportError> {
if let ImportTask::Notion(task) = &mut import_task {
if let Some(created_at_timestamp) = task.created_at {
if is_task_expired(created_at_timestamp, task.last_process_at) {
if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await {
handle_expired_task(
&mut context,
&import_record,
task,
stream_name,
group_name,
&entry_id,
)
.await?;
}
// If no created_at timestamp, proceed directly to processing
if task.created_at.is_none() {
return process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await;
}

return Ok(());
} else if !check_blob_existence(&context.s3_client, &task.s3_key).await? {
task.last_process_at = Some(Utc::now().timestamp());
trace!("[Import] {} file not found, re-add task", task.workspace_id);
re_add_task(
&mut context.redis_client,
// Check if the task is expired
if is_task_expired(task.created_at.unwrap(), task.last_process_at) {
if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await {
handle_expired_task(
&mut context,
&import_record,
task,
stream_name,
group_name,
import_task,
&entry_id,
)
.await?;
return Ok(());
}
return Ok(());
}

// Check if the blob exists
if check_blob_existence(&context.s3_client, &task.s3_key).await? {
if task.last_process_at.is_none() {
task.last_process_at = Some(Utc::now().timestamp());
}
} else {
trace!("[Import] {} file not found, queue task", task.workspace_id);
push_task(
&mut context.redis_client,
stream_name,
group_name,
import_task,
&entry_id,
)
.await?;
return Ok(());
}
}

// Process and acknowledge the task
process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await
}

Expand Down Expand Up @@ -353,14 +363,10 @@ async fn process_and_ack_task(
result
}

fn is_task_expired(timestamp: i64, last_process_at: Option<i64>) -> bool {
if last_process_at.is_none() {
return false;
}

match DateTime::<Utc>::from_timestamp(timestamp, 0) {
fn is_task_expired(created_timestamp: i64, last_process_at: Option<i64>) -> bool {
match DateTime::<Utc>::from_timestamp(created_timestamp, 0) {
None => {
info!("[Import] failed to parse timestamp: {}", timestamp);
info!("[Import] failed to parse timestamp: {}", created_timestamp);
true
},
Some(created_at) => {
Expand All @@ -374,15 +380,28 @@ fn is_task_expired(timestamp: i64, last_process_at: Option<i64>) -> bool {
}

let elapsed = now - created_at;
let minutes = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_EXPIRE_MINUTES", "20")
let hours = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_PROCESS_EXPIRE_HOURS", "6")
.parse::<i64>()
.unwrap_or(20);
.unwrap_or(6);

if elapsed.num_hours() >= hours {
return true;
}

if last_process_at.is_none() {
return false;
}

let elapsed = now - created_at;
let minutes = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_EXPIRE_MINUTES", "30")
.parse::<i64>()
.unwrap_or(30);
elapsed.num_minutes() >= minutes
},
}
}

async fn re_add_task(
async fn push_task(
redis_client: &mut ConnectionManager,
stream_name: &str,
group_name: &str,
Expand Down Expand Up @@ -494,6 +513,14 @@ async fn process_task(
remove_workspace(&task.workspace_id, &context.pg_pool).await;
}

match fs::remove_dir_all(&unzip_dir_path).await {
Ok(_) => info!(
"[Import]: {} deleted unzip file: {:?}",
task.workspace_id, unzip_dir_path
),
Err(err) => error!("Failed to delete unzip file: {:?}", err),
}

clean_up(&context.s3_client, &task).await;
notify_user(&task, result, context.notifier, &context.metrics).await?;
},
Expand Down Expand Up @@ -1067,17 +1094,6 @@ async fn process_unzip_file(
batch_upload_files_to_s3(&import_task.workspace_id, s3_client, upload_resources)
.await
.map_err(|err| ImportError::Internal(anyhow!("Failed to upload files to S3: {:?}", err)))?;

// 10. delete zip file regardless of success or failure
match fs::remove_dir_all(unzip_dir_path).await {
Ok(_) => trace!(
"[Import]: {} deleted unzip file: {:?}",
import_task.workspace_id,
unzip_dir_path
),
Err(err) => error!("Failed to delete unzip file: {:?}", err),
}

Ok(())
}

Expand Down
4 changes: 3 additions & 1 deletion tests/search/document_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,7 @@ async fn test_document_indexing_and_search() {
assert_eq!(search_resp.len(), 1);
let item = &search_resp[0];
assert_eq!(item.object_id, object_id);
assert_eq!(item.preview.as_deref(), Some("\nWelcome to AppFlowy"));

let preview = item.preview.clone().unwrap();
assert!(preview.contains("Welcome to AppFlowy"));
}

0 comments on commit 50d519c

Please sign in to comment.