Skip to content

Commit

Permalink
make export worker use independent storage tracking (#30797)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 52a773a5a2c971f91c7cb718ab3c5fdd91ba7f00
  • Loading branch information
rrwang7 authored and Convex, Inc. committed Oct 17, 2024
1 parent 3e68485 commit 1dbda04
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 24 deletions.
68 changes: 44 additions & 24 deletions crates/application/src/export_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use model::{
types::{
Export,
ExportFormat,
ExportRequestor,
},
ExportsModel,
},
Expand Down Expand Up @@ -288,6 +289,7 @@ impl<RT: Runtime> ExportWorker<RT> {
&mut self,
format: ExportFormat,
component: ComponentId,
requestor: ExportRequestor,
) -> anyhow::Result<(Timestamp, ObjectKey, FunctionUsageTracker)> {
tracing::info!("Beginning snapshot export...");
let storage = &self.storage;
Expand Down Expand Up @@ -346,6 +348,7 @@ impl<RT: Runtime> ExportWorker<RT> {
system_tables,
include_storage,
usage.clone(),
requestor,
);
let (_, ()) = try_join!(uploader, zipper)?;
let zip_object_key = upload.complete().await?;
Expand All @@ -367,6 +370,7 @@ impl<RT: Runtime> ExportWorker<RT> {
system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>,
include_storage: bool,
usage: FunctionUsageTracker,
requestor: ExportRequestor,
) -> anyhow::Result<()> {
let namespace: TableNamespace = component_tree.id.into();
let component_path = component_ids_to_paths
Expand Down Expand Up @@ -477,19 +481,18 @@ impl<RT: Runtime> ExportWorker<RT> {
.as_ref()
.map(|ct| ct.parse())
.transpose()?;
usage
.track_storage_call(
component_path.clone(),
"snapshot_export",
file_storage_entry.storage_id.clone(),
content_type,
file_storage_entry.sha256.clone(),
)
.track_storage_egress_size(
component_path.clone(),
"snapshot_export".to_string(),
file_stream.content_length as u64,
);
usage.track_storage_call(
component_path.clone(),
"snapshot_export",
file_storage_entry.storage_id.clone(),
content_type,
file_storage_entry.sha256.clone(),
);
self.usage_tracking.track_independent_storage_egress_size(
component_path.clone(),
requestor.usage_tag(),
file_stream.content_length as u64,
);
zip_snapshot_upload
.stream_full_file(path, file_stream.stream)
.await?;
Expand Down Expand Up @@ -552,6 +555,7 @@ impl<RT: Runtime> ExportWorker<RT> {
system_tables,
include_storage,
usage.clone(),
requestor,
)
.await?;
}
Expand All @@ -570,6 +574,7 @@ impl<RT: Runtime> ExportWorker<RT> {
system_tables: BTreeMap<(TableNamespace, TableName), TabletId>,
include_storage: bool,
usage: FunctionUsageTracker,
requestor: ExportRequestor,
) -> anyhow::Result<()> {
let mut zip_snapshot_upload = ZipSnapshotUpload::new(&mut writer).await?;

Expand All @@ -584,6 +589,7 @@ impl<RT: Runtime> ExportWorker<RT> {
&system_tables,
include_storage,
usage,
requestor,
)
.await?;

Expand All @@ -597,21 +603,34 @@ impl<RT: Runtime> ExportWorker<RT> {
&mut self,
export: ParsedDocument<Export>,
) -> anyhow::Result<()> {
let (ts, object_keys, usage) = self
.export_inner(export.format(), export.component())
let (ts, object_key, usage) = self
.export_inner(export.format(), export.component(), export.requestor())
.await?;

let mut tx = self.database.begin(Identity::system()).await?;
let completed_export =
(*export)
.clone()
.completed(ts, *tx.begin_timestamp(), object_keys)?;
.completed(ts, *tx.begin_timestamp(), object_key.clone())?;
SystemMetadataModel::new_global(&mut tx)
.replace(export.id(), completed_export.try_into()?)
.await?;
self.database
.commit_with_write_source(tx, "export_worker_mark_complete")
.await?;
let object_attributes = self
.storage
.get_object_attributes(&object_key)
.await?
.context("error getting export object attributes from S3")?;

// Charge file bandwidth for the upload of the snapshot to exports storage
self.usage_tracking.track_independent_storage_ingress_size(
ComponentPath::root(),
export.requestor().usage_tag(),
object_attributes.size,
);
// Charge database bandwidth accumulated during the export
self.usage_tracking.track_call(
UdfIdentifier::Cli("export".to_string()),
ExecutionId::new(),
Expand Down Expand Up @@ -809,7 +828,10 @@ mod tests {
use headers::ContentType;
use keybroker::Identity;
use model::{
exports::types::ExportFormat,
exports::types::{
ExportFormat,
ExportRequestor,
},
file_storage::types::FileStorageEntry,
test_helpers::DbFixturesWithModel,
};
Expand Down Expand Up @@ -932,6 +954,7 @@ mod tests {
include_storage: true,
},
ComponentId::Root,
ExportRequestor::SnapshotExport,
)
.await?;

Expand Down Expand Up @@ -1046,6 +1069,7 @@ mod tests {
include_storage: false,
},
ComponentId::Root,
ExportRequestor::SnapshotExport,
)
.await?;

Expand Down Expand Up @@ -1106,6 +1130,7 @@ mod tests {
include_storage: false,
},
child_component,
ExportRequestor::SnapshotExport,
)
.await?;

Expand Down Expand Up @@ -1199,6 +1224,7 @@ mod tests {
include_storage: true,
},
ComponentId::Root,
ExportRequestor::SnapshotExport,
)
.await?;

Expand All @@ -1224,13 +1250,6 @@ mod tests {

let usage = usage.gather_user_stats();
assert!(usage.database_egress_size.is_empty());
assert_eq!(
*usage
.storage_egress_size
.get(&ComponentPath::test_user())
.unwrap(),
3
);

Ok(())
}
Expand Down Expand Up @@ -1268,6 +1287,7 @@ mod tests {
include_storage: false,
},
ComponentId::test_user(),
ExportRequestor::SnapshotExport,
)
.await?;
Ok(())
Expand Down
18 changes: 18 additions & 0 deletions crates/model/src/exports/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,15 @@ impl Export {
| Export::Failed { component, .. } => *component,
}
}

/// Returns the requestor recorded on this export, whichever state it is in.
pub fn requestor(&self) -> ExportRequestor {
    // Each of the variants below carries a `requestor` field, so a single
    // irrefutable or-pattern binds it without a `match`.
    let (Export::Requested { requestor, .. }
    | Export::InProgress { requestor, .. }
    | Export::Completed { requestor, .. }
    | Export::Failed { requestor, .. }) = self;
    *requestor
}
}

#[derive(Copy, Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -283,6 +292,15 @@ pub enum ExportRequestor {
CloudBackup,
}

impl ExportRequestor {
    /// Returns the tag string identifying this requestor in usage tracking calls.
    ///
    /// `SnapshotExport` yields `"snapshot_export"` and `CloudBackup` yields
    /// `"cloud_backup"`.
    pub fn usage_tag(&self) -> String {
        let tag = match self {
            Self::SnapshotExport => "snapshot_export",
            Self::CloudBackup => "cloud_backup",
        };
        tag.to_owned()
    }
}

impl Export {
pub fn requested(
format: ExportFormat,
Expand Down
28 changes: 28 additions & 0 deletions crates/usage_tracking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,34 @@ impl UsageCounter {
/// Creates a counter that stores the given usage event logger.
///
/// The logger is handed to the independent storage trackers created by the
/// `track_independent_storage_*` methods.
pub fn new(usage_logger: Arc<dyn UsageEventLogger>) -> Self {
Self { usage_logger }
}

/// Records `ingress_size` of storage ingress for `component_path` under `tag`.
///
/// Intended for storage ingress that happens outside of a user function
/// (e.g. snapshot import/export). Every call builds a fresh
/// `IndependentStorageCallTracker` with a newly generated `ExecutionId`.
pub fn track_independent_storage_ingress_size(
    &self,
    component_path: ComponentPath,
    tag: String,
    ingress_size: u64,
) {
    IndependentStorageCallTracker::new(ExecutionId::new(), self.usage_logger.clone())
        .track_storage_ingress_size(component_path, tag, ingress_size);
}

/// Records `egress_size` of storage egress for `component_path` under `tag`.
///
/// Intended for storage egress that happens outside of a user function
/// (e.g. snapshot import/export). Every call builds a fresh
/// `IndependentStorageCallTracker` with a newly generated `ExecutionId`.
pub fn track_independent_storage_egress_size(
    &self,
    component_path: ComponentPath,
    tag: String,
    egress_size: u64,
) {
    IndependentStorageCallTracker::new(ExecutionId::new(), self.usage_logger.clone())
        .track_storage_egress_size(component_path, tag, egress_size);
}
}

pub enum CallType {
Expand Down

0 comments on commit 1dbda04

Please sign in to comment.