diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index 0e27b6e72..d84cd5474 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -100,7 +100,11 @@ pub struct LatestSnapshot { } impl LatestSnapshot { - pub fn from_snapshot(snapshot: &PartitionSnapshotMetadata, path: String) -> Self { + /// `relative_path` is the path to the snapshot data relative to the partition's snapshots prefix. + pub fn from_snapshot( + snapshot: &PartitionSnapshotMetadata, + unique_snapshot_path: String, + ) -> Self { LatestSnapshot { version: snapshot.version, cluster_name: snapshot.cluster_name.clone(), @@ -109,7 +113,7 @@ impl LatestSnapshot { snapshot_id: snapshot.snapshot_id, created_at: snapshot.created_at.clone(), min_applied_lsn: snapshot.min_applied_lsn, - path, + path: unique_snapshot_path, } } } @@ -208,29 +212,19 @@ impl SnapshotRepository { snapshot: &PartitionSnapshotMetadata, local_snapshot_path: &Path, ) -> Result<(), PutSnapshotError> { - // A unique snapshot path within the partition prefix. We pad the LSN to ensure correct - // lexicographic sorting. - let snapshot_prefix = Self::get_snapshot_prefix(snapshot); - let full_snapshot_path = format!( - "{prefix}{partition_id}/{snapshot_prefix}", - prefix = self.prefix, - partition_id = snapshot.partition_id, - ); - + let snapshot_path = self.get_snapshot_complete_prefix(snapshot); debug!( "Uploading snapshot from {:?} to {}", - local_snapshot_path, full_snapshot_path + local_snapshot_path, snapshot_path ); - let mut progress = SnapshotUploadProgress::with_snapshot_path(full_snapshot_path.clone()); + let mut progress = SnapshotUploadProgress::with_snapshot_path(snapshot_path.clone()); let mut buf = BytesMut::new(); for file in &snapshot.files { let filename = file.name.trim_start_matches("/"); - let key = object_store::path::Path::from(format!( - "{}/{}", - full_snapshot_path.as_str(), - filename - )); + let key = object_store::path::Path::from( + self.get_snapshot_file_full_path(snapshot, filename), + ); let put_result = put_snapshot_object( local_snapshot_path.join(filename).as_path(), @@ -249,10 +243,9 @@ impl SnapshotRepository { progress.push(file.name.clone()); } - let metadata_key = object_store::path::Path::from(format!( - "{}/metadata.json", - full_snapshot_path.as_str() - )); + let metadata_key = object_store::path::Path::from( + self.get_snapshot_file_full_path(snapshot, "metadata.json"), + ); let metadata_json_payload = PutPayload::from( serde_json::to_string_pretty(snapshot).expect("Can always serialize JSON"), ); @@ -270,11 +263,9 @@ impl SnapshotRepository { "Successfully published snapshot metadata", ); - let latest_path = object_store::path::Path::from(format!( - "{prefix}{partition_id}/latest.json", - prefix = self.prefix, - partition_id = snapshot.partition_id, - )); + let latest_path = object_store::path::Path::from( + self.get_partition_latest_snapshot_pointer_path(snapshot.partition_id), + ); // By performing a CAS on the latest snapshot pointer, we can ensure strictly monotonic updates. let maybe_stored = self @@ -293,7 +284,8 @@ impl SnapshotRepository { return Ok(()); } - let latest = LatestSnapshot::from_snapshot(snapshot, snapshot_prefix); + let latest = + LatestSnapshot::from_snapshot(snapshot, Self::get_snapshot_id_component(snapshot)); let latest = PutPayload::from( serde_json::to_string_pretty(&latest) .map_err(|e| PutSnapshotError::from(e, progress.clone()))?, @@ -329,14 +321,6 @@ impl SnapshotRepository { Ok(()) } - fn get_snapshot_prefix(snapshot: &PartitionSnapshotMetadata) -> String { - format!( - "lsn_{lsn:020}-{snapshot_id}", - lsn = snapshot.min_applied_lsn, - snapshot_id = snapshot.snapshot_id - ) - } - /// Discover and download the latest snapshot available. It is the caller's responsibility /// to delete the snapshot directory when it is no longer needed. #[instrument( @@ -349,11 +333,9 @@ impl SnapshotRepository { &self, partition_id: PartitionId, ) -> anyhow::Result> { - let latest_path = object_store::path::Path::from(format!( - "{prefix}{partition_id}/latest.json", - prefix = self.prefix, - partition_id = partition_id, - )); + let latest_path = object_store::path::Path::from( + self.get_partition_latest_snapshot_pointer_path(partition_id), + ); let latest = self.object_store.get(&latest_path).await; @@ -370,10 +352,10 @@ impl SnapshotRepository { debug!("Latest snapshot metadata: {:?}", latest); let snapshot_metadata_path = object_store::path::Path::from(format!( - "{prefix}{partition_id}/{path}/metadata.json", + "{prefix}{partition_id}/{snapshot_path}/metadata.json", prefix = self.prefix, partition_id = partition_id, - path = latest.path, + snapshot_path = latest.path, )); let snapshot_metadata = self.object_store.get(&snapshot_metadata_path).await; @@ -425,10 +407,10 @@ impl SnapshotRepository { let filename = file.name.trim_start_matches("/"); let expected_size = file.size; let key = object_store::path::Path::from(format!( - "{prefix}{partition_id}/{path}/{filename}", + "{prefix}{partition_id}/{snapshot_path}/{filename}", prefix = self.prefix, partition_id = partition_id, - path = latest.path, + snapshot_path = latest.path, filename = filename, )); let file_path = snapshot_dir.path().join(filename); @@ -546,6 +528,57 @@ impl SnapshotRepository { } } } + + /// Construct the complete snapshot prefix from the base of the repository bucket. + fn get_snapshot_complete_prefix( + &self, + snapshot_metadata: &PartitionSnapshotMetadata, + ) -> String { + format!( + "{partition_prefix}/{snapshot_path}", + partition_prefix = self.get_partition_snapshots_prefix(snapshot_metadata.partition_id), + snapshot_path = Self::get_snapshot_id_component(snapshot_metadata) + ) + } + + /// Construct the full path to a specific file that is a part of a given snapshot. + fn get_snapshot_file_full_path( + &self, + snapshot_metadata: &PartitionSnapshotMetadata, + filename: &str, + ) -> String { + format!( + "{snapshot_complete_prefix}/{filename}", + snapshot_complete_prefix = self.get_snapshot_complete_prefix(snapshot_metadata), + ) + } + + /// Construct the full path to the latest snapshot pointer for a given partition. + fn get_partition_latest_snapshot_pointer_path(&self, partition_id: PartitionId) -> String { + format!( + "{partition_prefix}/latest.json", + partition_prefix = self.get_partition_snapshots_prefix(partition_id) + ) + } + + /// Construct the snapshots prefix relative to the repository bucket for a given partition's snapshots. + fn get_partition_snapshots_prefix(&self, partition_id: PartitionId) -> String { + format!( + "{prefix}{partition_id}", + prefix = self.prefix, + partition_id = partition_id, + ) + } + + /// Construct the unique path component for a snapshot, e.g. `lsn_00001234-snap_abc123`. + fn get_snapshot_id_component(snapshot: &PartitionSnapshotMetadata) -> String { + // We zero-pad the LSN to ensure correct lexicographic sorting + format!( + "lsn_{lsn:020}-{snapshot_id}", + lsn = snapshot.min_applied_lsn, + snapshot_id = snapshot.snapshot_id + ) + } } async fn create_object_store_client(destination: Url) -> anyhow::Result> { @@ -590,14 +623,14 @@ async fn create_object_store_client(destination: Url) -> anyhow::Result, } impl SnapshotUploadProgress { - fn with_snapshot_path(full_snapshot_path: String) -> Self { + fn with_snapshot_path(snapshot_complete_path: String) -> Self { SnapshotUploadProgress { - full_snapshot_path, + snapshot_complete_path, uploaded_files: vec![], } } @@ -620,7 +653,7 @@ impl PutSnapshotError { { PutSnapshotError { error: error.into(), - full_snapshot_path: progress.full_snapshot_path, + full_snapshot_path: progress.snapshot_complete_path, uploaded_files: progress.uploaded_files, } } @@ -861,7 +894,7 @@ mod tests { repository.put(&snapshot1, source_dir.clone()).await?; - let snapshot_prefix = SnapshotRepository::get_snapshot_prefix(&snapshot1); + let snapshot_prefix = SnapshotRepository::get_snapshot_id_component(&snapshot1); let data = object_store .get(&Path::from(format!( "{}/{}/{}/data.sst", @@ -917,7 +950,7 @@ mod tests { assert_eq!( LatestSnapshot::from_snapshot( &snapshot2, - SnapshotRepository::get_snapshot_prefix(&snapshot2) + SnapshotRepository::get_snapshot_id_component(&snapshot2) ), latest );