cancel imports that have been parsing for >24 hours (#30378)
We currently only fail imports that have been InProgress for more than 24 hours; if an import is stuck in the Uploaded state, we never stop it. Let's fail those too.
Also make the maximum age a knob.

GitOrigin-RevId: dcd03e1951cab3feef5cb28b501ae533e89bf1fe
ldanilek authored and Convex, Inc. committed Oct 5, 2024
1 parent 1d6add9 commit f6bb7d6
Showing 2 changed files with 21 additions and 10 deletions.
25 changes: 15 additions & 10 deletions crates/application/src/snapshot_import.rs
@@ -47,6 +47,7 @@ use common::{
errors::report_error,
execution_context::ExecutionId,
knobs::{
MAX_IMPORT_AGE,
TRANSACTION_MAX_NUM_USER_WRITES,
TRANSACTION_MAX_USER_WRITE_SIZE_BYTES,
},
@@ -193,10 +194,6 @@ static IMPORT_SIZE_LIMIT: LazyLock<String> =

const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(60);
// If an import is taking longer than a day, it's a problem (and our fault).
// But the customer is probably no longer waiting so we should fail the import.
// If an import takes more than a week, the file may be deleted from S3.
pub const MAX_IMPORT_AGE: Duration = Duration::from_secs(24 * 60 * 60);

pub struct SnapshotImportWorker<RT: Runtime> {
runtime: RT,
@@ -295,6 +292,7 @@ impl<RT: Runtime> SnapshotImportWorker<RT> {
anyhow::bail!("unexpected state {snapshot_import:?}");
},
}
self.fail_if_too_old(&snapshot_import)?;
match self.info_message_for_import(snapshot_import).await {
Ok((info_message, require_manual_confirmation, new_checkpoints)) => {
self.database
@@ -673,29 +671,36 @@ impl<RT: Runtime> SnapshotImportWorker<RT> {
Ok(())
}

async fn attempt_perform_import(
&mut self,
snapshot_import: ParsedDocument<SnapshotImport>,
) -> anyhow::Result<(Timestamp, u64)> {
fn fail_if_too_old(
&self,
snapshot_import: &ParsedDocument<SnapshotImport>,
) -> anyhow::Result<()> {
if let Some(creation_time) = snapshot_import.creation_time() {
let now = CreationTime::try_from(*self.database.now_ts_for_reads())?;
let age = Duration::from_millis((f64::from(now) - f64::from(creation_time)) as u64);
log_snapshot_import_age(age);
if age > MAX_IMPORT_AGE / 2 {
if age > *MAX_IMPORT_AGE / 2 {
tracing::warn!(
"SnapshotImport {} running too long ({:?})",
snapshot_import.id(),
age
);
}
if age > MAX_IMPORT_AGE {
if age > *MAX_IMPORT_AGE {
anyhow::bail!(ErrorMetadata::bad_request(
"ImportFailed",
"Import took too long. Try again or contact Convex."
));
}
}
Ok(())
}

async fn attempt_perform_import(
&mut self,
snapshot_import: ParsedDocument<SnapshotImport>,
) -> anyhow::Result<(Timestamp, u64)> {
self.fail_if_too_old(&snapshot_import)?;
let (initial_schemas, objects) = self.parse_import(snapshot_import.id()).await?;

let usage = FunctionUsageTracker::new();
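For reference, the extracted fail_if_too_old check is plain timestamp arithmetic: the import's age is the difference between the current read timestamp and the document's creation time, with a warning logged past half the limit and the import failed past the full limit. Below is a minimal standalone sketch of that logic, assuming both timestamps are f64 milliseconds since the Unix epoch (as the f64::from conversions above suggest); import_age is a hypothetical helper written for illustration, not part of the crate.

use std::time::Duration;

/// Hypothetical helper mirroring the age arithmetic in `fail_if_too_old`,
/// assuming both timestamps are f64 milliseconds since the Unix epoch.
fn import_age(now_ms: f64, creation_ms: f64) -> Duration {
    Duration::from_millis((now_ms - creation_ms) as u64)
}

fn main() {
    // Stand-in for *MAX_IMPORT_AGE (one week by default after this commit).
    let max_import_age = Duration::from_secs(7 * 24 * 60 * 60);

    // An import created roughly eight days before "now".
    let age = import_age(1_728_100_000_000.0, 1_727_400_000_000.0);

    if age > max_import_age {
        eprintln!("import too old ({age:?}); failing with ImportFailed");
    } else if age > max_import_age / 2 {
        eprintln!("import running long ({age:?}); logging a warning");
    }
}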
6 changes: 6 additions & 0 deletions crates/common/src/knobs.rs
@@ -1148,3 +1148,9 @@ pub static MIGRATION_REWRITE_BATCH_SIZE: LazyLock<usize> =
/// Fraction that represents the percentage of HTTP actions to execute in FunRun
pub static EXECUTE_HTTP_ACTIONS_IN_FUNRUN: LazyLock<f64> =
LazyLock::new(|| env_config("EXECUTE_HTTP_ACTIONS_IN_FUNRUN", 0.0));

/// If an import is taking longer than a day, it's a problem (and our fault).
/// But the customer is probably no longer waiting so we should fail the import.
/// If an import takes more than a week, the file may be deleted from S3.
pub static MAX_IMPORT_AGE: LazyLock<Duration> =
LazyLock::new(|| Duration::from_secs(env_config("MAX_IMPORT_AGE_SECONDS", 7 * 24 * 60 * 60)));
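
The new knob follows the same pattern as the rest of knobs.rs: a LazyLock initialized from an environment variable with a baked-in default, so the one-week cutoff can be overridden via MAX_IMPORT_AGE_SECONDS without a code change. The sketch below shows that pattern in a self-contained form; the stand-in env_config is an assumption written for illustration, and the crate's real helper may differ in signature and parsing behavior.

use std::str::FromStr;
use std::sync::LazyLock;
use std::time::Duration;

/// Hypothetical stand-in for the crate's `env_config` helper: read an
/// environment variable, parse it into T, and fall back to the default.
fn env_config<T: FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse::<T>().ok())
        .unwrap_or(default)
}

/// Mirrors the knob added in this commit: one week by default, overridable
/// with e.g. MAX_IMPORT_AGE_SECONDS=86400 for a one-day cutoff.
pub static MAX_IMPORT_AGE: LazyLock<Duration> =
    LazyLock::new(|| Duration::from_secs(env_config("MAX_IMPORT_AGE_SECONDS", 7 * 24 * 60 * 60)));

fn main() {
    println!("max import age: {:?}", *MAX_IMPORT_AGE);
}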
