Skip to content

Commit

Permalink
Read import objects directly from their source bucket without copying…
Browse files Browse the repository at this point in the history
… (#33635)

GitOrigin-RevId: 7d36a00428ecec1073f26fa62a9832b613b1e530
  • Loading branch information
goffrie authored and Convex, Inc. committed Jan 24, 2025
1 parent 7bbca3a commit 558bde3
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 107 deletions.
11 changes: 8 additions & 3 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2026,13 +2026,16 @@ impl<RT: Runtime> Application<RT> {
.snapshot_imports_storage
.finish_client_driven_upload(upload_token, part_tokens)
.await?;
let fq_key = self
.snapshot_imports_storage
.fully_qualified_key(&object_key);
start_stored_import(
self,
identity,
format,
mode,
component_path,
object_key,
fq_key,
ImportRequestor::SnapshotImport,
)
.await
Expand All @@ -2041,14 +2044,16 @@ impl<RT: Runtime> Application<RT> {
pub async fn upload_snapshot_import(
&self,
body_stream: BoxStream<'_, anyhow::Result<Bytes>>,
) -> anyhow::Result<ObjectKey> {
) -> anyhow::Result<FullyQualifiedObjectKey> {
let mut upload: Box<BufferedUpload> = self.snapshot_imports_storage.start_upload().await?;
// unclear why this reassignment is necessary
let mut body_stream = body_stream;
upload.try_write_parallel(&mut body_stream).await?;
drop(body_stream);
let object_key = upload.complete().await?;
Ok(object_key)
Ok(self
.snapshot_imports_storage
.fully_qualified_key(&object_key))
}

#[fastrace::trace]
Expand Down
61 changes: 27 additions & 34 deletions crates/application/src/snapshot_import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use common::{
types::{
FullyQualifiedObjectKey,
MemberId,
ObjectKey,
TableName,
UdfIdentifier,
},
Expand Down Expand Up @@ -346,11 +345,19 @@ impl<RT: Runtime> SnapshotImportExecutor<RT> {
make_audit_log_event(&self.database, &table_mapping_for_import, &snapshot_import)
.await?;

let object_attributes = self
.snapshot_imports_storage
.get_object_attributes(&snapshot_import.object_key)
.await?
.context("error getting export object attributes from S3")?;
let object_attributes = (match &snapshot_import.object_key {
Ok(key) => {
self.snapshot_imports_storage
.get_fq_object_attributes(key)
.await
},
Err(key) => {
self.snapshot_imports_storage
.get_object_attributes(key)
.await
},
})?
.context("error getting export object attributes from S3")?;

// Charge file bandwidth for the download of the snapshot from imports storage
usage.track_storage_egress_size(
Expand Down Expand Up @@ -396,7 +403,16 @@ impl<RT: Runtime> SnapshotImportExecutor<RT> {
};
let body_stream = move || {
let object_key = object_key.clone();
async move { self.snapshot_imports_storage.get_reader(&object_key).await }
async move {
match object_key {
Ok(key) => {
self.snapshot_imports_storage
.get_fq_object_reader(&key)
.await
},
Err(key) => self.snapshot_imports_storage.get_reader(&key).await,
}
}
};
let objects = parse_objects(format.clone(), component_path.clone(), body_stream).boxed();

Expand Down Expand Up @@ -431,7 +447,7 @@ pub async fn start_stored_import<RT: Runtime>(
format: ImportFormat,
mode: ImportMode,
component_path: ComponentPath,
object_key: ObjectKey,
fq_object_key: FullyQualifiedObjectKey,
requestor: ImportRequestor,
) -> anyhow::Result<DeveloperDocumentId> {
if !(identity.is_admin() || identity.is_system()) {
Expand All @@ -451,7 +467,7 @@ pub async fn start_stored_import<RT: Runtime>(
format.clone(),
mode,
component_path.clone(),
object_key.clone(),
fq_object_key.clone(),
requestor.clone(),
)
.await
Expand Down Expand Up @@ -583,44 +599,21 @@ pub async fn do_import<RT: Runtime>(
.await
}

pub async fn do_import_from_fully_qualified_export<RT: Runtime>(
pub async fn do_import_from_object_key<RT: Runtime>(
application: &Application<RT>,
identity: Identity,
format: ImportFormat,
mode: ImportMode,
component_path: ComponentPath,
export_object_key: FullyQualifiedObjectKey,
) -> anyhow::Result<u64> {
let import_object_key: ObjectKey = application
.snapshot_imports_storage
.copy_object(export_object_key)
.await?;
do_import_from_object_key(
application,
identity,
format,
mode,
component_path,
import_object_key,
)
.await
}

async fn do_import_from_object_key<RT: Runtime>(
application: &Application<RT>,
identity: Identity,
format: ImportFormat,
mode: ImportMode,
component_path: ComponentPath,
object_key: ObjectKey,
) -> anyhow::Result<u64> {
let import_id = start_stored_import(
application,
identity.clone(),
format,
mode,
component_path,
object_key,
export_object_key,
ImportRequestor::SnapshotImport,
)
.await?;
Expand Down
31 changes: 5 additions & 26 deletions crates/application/src/snapshot_import/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use common::{
IndexDescriptor,
IndexName,
MemberId,
ObjectKey,
},
value::ConvexValue,
};
Expand Down Expand Up @@ -825,17 +824,13 @@ async fn import_zip_flip_table_number(rt: TestRuntime) -> anyhow::Result<()> {
ufm.insert(table_name1.clone(), assert_obj!()).await?;
app.commit_test(tx).await?;

let import_object_key: ObjectKey = app
.snapshot_imports_storage
.copy_object(export_object_key.clone())
.await?;
let rows_written = do_import_from_object_key(
&app,
identity.clone(),
ImportFormat::Zip,
mode,
ComponentPath::root(),
import_object_key,
export_object_key.clone(),
)
.await;
tracing::info!("Imported in test for {mode}");
Expand Down Expand Up @@ -879,17 +874,13 @@ async fn import_zip_to_clone_of_deployment(rt: TestRuntime) -> anyhow::Result<()
ufm.insert(table_name2.clone(), assert_obj!()).await?;
app.commit_test(tx).await?;

let import_object_key: ObjectKey = app
.snapshot_imports_storage
.copy_object(export_object_key.clone())
.await?;
let rows_written = do_import_from_object_key(
&app,
identity.clone(),
ImportFormat::Zip,
mode,
ComponentPath::root(),
import_object_key,
export_object_key.clone(),
)
.await;
tracing::info!("Imported in test for {mode}");
Expand Down Expand Up @@ -937,17 +928,13 @@ async fn import_zip_to_deployment_with_unrelated_tables(rt: TestRuntime) -> anyh
ufm.insert(table_name4.clone(), assert_obj!()).await?;
app.commit_test(tx).await?;

let import_object_key: ObjectKey = app
.snapshot_imports_storage
.copy_object(export_object_key.clone())
.await?;
let rows_written = do_import_from_object_key(
&app,
identity.clone(),
ImportFormat::Zip,
mode,
ComponentPath::root(),
import_object_key,
export_object_key.clone(),
)
.await;
tracing::info!("Imported in test for {mode}");
Expand Down Expand Up @@ -983,17 +970,13 @@ async fn import_zip_to_empty(rt: TestRuntime) -> anyhow::Result<()> {
(ImportMode::RequireEmpty, true),
] {
let app = Application::new_for_tests(&rt).await?;
let import_object_key: ObjectKey = app
.snapshot_imports_storage
.copy_object(export_object_key.clone())
.await?;
let rows_written = do_import_from_object_key(
&app,
identity.clone(),
ImportFormat::Zip,
mode,
ComponentPath::root(),
import_object_key,
export_object_key.clone(),
)
.await;
tracing::info!("Imported in test for {mode}");
Expand Down Expand Up @@ -1028,17 +1011,13 @@ async fn import_zip_to_same_deployment(rt: TestRuntime) -> anyhow::Result<()> {
app.commit_test(tx).await?;
let export_object_key = app.export_and_wait().await?;

let import_object_key: ObjectKey = app
.snapshot_imports_storage
.copy_object(export_object_key.clone())
.await?;
let rows_written = do_import_from_object_key(
&app,
identity.clone(),
ImportFormat::Zip,
mode,
ComponentPath::root(),
import_object_key,
export_object_key.clone(),
)
.await;
tracing::info!("Imported in test for {mode}");
Expand Down
15 changes: 14 additions & 1 deletion crates/common/src/types/object_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct ObjectKey(

/// Fully qualified object key. For s3, in the format
/// {bucket}/{prefix}-{object_key}
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
#[derive(
Debug,
Clone,
Expand All @@ -41,7 +42,19 @@ pub struct ObjectKey(
derive_more::From,
derive_more::Into,
)]
pub struct FullyQualifiedObjectKey(String);
pub struct FullyQualifiedObjectKey(
#[cfg_attr(
any(test, feature = "testing"),
proptest(strategy = "\"[a-zA-Z0-9-_.]+/[a-zA-Z0-9-_./]+\"")
)]
String,
);

impl FullyQualifiedObjectKey {
pub fn as_str(&self) -> &str {
&self.0
}
}

impl TryFrom<ObjectKey> for ConvexString {
type Error = anyhow::Error;
Expand Down
2 changes: 1 addition & 1 deletion crates/file_storage/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {

let storage_get_stream = self
.storage
.get_range(&storage_key.to_string().try_into()?, bytes_range)
.get_range(&storage_key, bytes_range)
.await?
.with_context(|| format!("object {storage_key:?} not found"))?;
let content_range = (size != 0)
Expand Down
8 changes: 4 additions & 4 deletions crates/model/src/snapshot_imports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use common::{
Query,
},
runtime::Runtime,
types::ObjectKey,
types::FullyQualifiedObjectKey,
};
use database::{
patch_value,
Expand Down Expand Up @@ -109,15 +109,15 @@ impl<'a, RT: Runtime> SnapshotImportModel<'a, RT> {
format: ImportFormat,
mode: ImportMode,
component_path: ComponentPath,
object_key: ObjectKey,
object_key: FullyQualifiedObjectKey,
requestor: ImportRequestor,
) -> anyhow::Result<ResolvedDocumentId> {
let snapshot_import = SnapshotImport {
state: ImportState::Uploaded,
format,
mode,
component_path,
object_key,
object_key: Ok(object_key),
member_id: self.tx.identity().member_id(),
checkpoints: None,
requestor,
Expand Down Expand Up @@ -444,7 +444,7 @@ mod tests {
ImportFormat::Zip,
ImportMode::Replace,
ComponentPath::root(),
"objectkey".try_into()?,
"objectkey".to_string().into(),
ImportRequestor::SnapshotImport,
)
.await?;
Expand Down
Loading

0 comments on commit 558bde3

Please sign in to comment.