Skip to content

Commit

Permalink
Extract common object store key builder helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 17, 2024
1 parent 3da5356 commit 269f4f1
Showing 1 changed file with 84 additions and 51 deletions.
135 changes: 84 additions & 51 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ pub struct LatestSnapshot {
}

impl LatestSnapshot {
pub fn from_snapshot(snapshot: &PartitionSnapshotMetadata, path: String) -> Self {
/// `unique_snapshot_path` is the path to the snapshot data relative to the partition's snapshots prefix.
pub fn from_snapshot(
snapshot: &PartitionSnapshotMetadata,
unique_snapshot_path: String,
) -> Self {
LatestSnapshot {
version: snapshot.version,
cluster_name: snapshot.cluster_name.clone(),
Expand All @@ -109,7 +113,7 @@ impl LatestSnapshot {
snapshot_id: snapshot.snapshot_id,
created_at: snapshot.created_at.clone(),
min_applied_lsn: snapshot.min_applied_lsn,
path,
path: unique_snapshot_path,
}
}
}
Expand Down Expand Up @@ -208,29 +212,19 @@ impl SnapshotRepository {
snapshot: &PartitionSnapshotMetadata,
local_snapshot_path: &Path,
) -> Result<(), PutSnapshotError> {
// A unique snapshot path within the partition prefix. We pad the LSN to ensure correct
// lexicographic sorting.
let snapshot_prefix = Self::get_snapshot_prefix(snapshot);
let full_snapshot_path = format!(
"{prefix}{partition_id}/{snapshot_prefix}",
prefix = self.prefix,
partition_id = snapshot.partition_id,
);

let snapshot_path = self.get_snapshot_complete_prefix(snapshot);
debug!(
"Uploading snapshot from {:?} to {}",
local_snapshot_path, full_snapshot_path
local_snapshot_path, snapshot_path
);

let mut progress = SnapshotUploadProgress::with_snapshot_path(full_snapshot_path.clone());
let mut progress = SnapshotUploadProgress::with_snapshot_path(snapshot_path.clone());
let mut buf = BytesMut::new();
for file in &snapshot.files {
let filename = file.name.trim_start_matches("/");
let key = object_store::path::Path::from(format!(
"{}/{}",
full_snapshot_path.as_str(),
filename
));
let key = object_store::path::Path::from(
self.get_snapshot_file_full_path(snapshot, filename),
);

let put_result = put_snapshot_object(
local_snapshot_path.join(filename).as_path(),
Expand All @@ -249,10 +243,9 @@ impl SnapshotRepository {
progress.push(file.name.clone());
}

let metadata_key = object_store::path::Path::from(format!(
"{}/metadata.json",
full_snapshot_path.as_str()
));
let metadata_key = object_store::path::Path::from(
self.get_snapshot_file_full_path(snapshot, "metadata.json"),
);
let metadata_json_payload = PutPayload::from(
serde_json::to_string_pretty(snapshot).expect("Can always serialize JSON"),
);
Expand All @@ -270,11 +263,9 @@ impl SnapshotRepository {
"Successfully published snapshot metadata",
);

let latest_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/latest.json",
prefix = self.prefix,
partition_id = snapshot.partition_id,
));
let latest_path = object_store::path::Path::from(
self.get_partition_latest_snapshot_pointer_path(snapshot.partition_id),
);

// By performing a CAS on the latest snapshot pointer, we can ensure strictly monotonic updates.
let maybe_stored = self
Expand All @@ -293,7 +284,8 @@ impl SnapshotRepository {
return Ok(());
}

let latest = LatestSnapshot::from_snapshot(snapshot, snapshot_prefix);
let latest =
LatestSnapshot::from_snapshot(snapshot, Self::get_snapshot_id_component(snapshot));
let latest = PutPayload::from(
serde_json::to_string_pretty(&latest)
.map_err(|e| PutSnapshotError::from(e, progress.clone()))?,
Expand Down Expand Up @@ -329,14 +321,6 @@ impl SnapshotRepository {
Ok(())
}

fn get_snapshot_prefix(snapshot: &PartitionSnapshotMetadata) -> String {
format!(
"lsn_{lsn:020}-{snapshot_id}",
lsn = snapshot.min_applied_lsn,
snapshot_id = snapshot.snapshot_id
)
}

/// Discover and download the latest snapshot available. It is the caller's responsibility
/// to delete the snapshot directory when it is no longer needed.
#[instrument(
Expand All @@ -349,11 +333,9 @@ impl SnapshotRepository {
&self,
partition_id: PartitionId,
) -> anyhow::Result<Option<LocalPartitionSnapshot>> {
let latest_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/latest.json",
prefix = self.prefix,
partition_id = partition_id,
));
let latest_path = object_store::path::Path::from(
self.get_partition_latest_snapshot_pointer_path(partition_id),
);

let latest = self.object_store.get(&latest_path).await;

Expand All @@ -370,10 +352,10 @@ impl SnapshotRepository {
debug!("Latest snapshot metadata: {:?}", latest);

let snapshot_metadata_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/metadata.json",
"{prefix}{partition_id}/{snapshot_path}/metadata.json",
prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
snapshot_path = latest.path,
));
let snapshot_metadata = self.object_store.get(&snapshot_metadata_path).await;

Expand Down Expand Up @@ -425,10 +407,10 @@ impl SnapshotRepository {
let filename = file.name.trim_start_matches("/");
let expected_size = file.size;
let key = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/{filename}",
"{prefix}{partition_id}/{snapshot_path}/{filename}",
prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
snapshot_path = latest.path,
filename = filename,
));
let file_path = snapshot_dir.path().join(filename);
Expand Down Expand Up @@ -546,6 +528,57 @@ impl SnapshotRepository {
}
}
}

/// Build the complete prefix under which a snapshot's objects are stored.
///
/// The result is the partition's snapshots prefix (see
/// `get_partition_snapshots_prefix`) joined with the snapshot's unique id
/// component (see `get_snapshot_id_component`) by a single `/`.
fn get_snapshot_complete_prefix(
    &self,
    snapshot_metadata: &PartitionSnapshotMetadata,
) -> String {
    let partition_prefix = self.get_partition_snapshots_prefix(snapshot_metadata.partition_id);
    let snapshot_component = Self::get_snapshot_id_component(snapshot_metadata);
    format!("{}/{}", partition_prefix, snapshot_component)
}

/// Build the full path of one file belonging to the given snapshot.
///
/// `filename` is appended verbatim after the snapshot's complete prefix and a
/// `/` separator; no trimming or normalization is performed here.
fn get_snapshot_file_full_path(
    &self,
    snapshot_metadata: &PartitionSnapshotMetadata,
    filename: &str,
) -> String {
    let snapshot_dir = self.get_snapshot_complete_prefix(snapshot_metadata);
    format!("{}/{}", snapshot_dir, filename)
}

/// Build the full path of the `latest.json` pointer for a partition.
///
/// The pointer lives directly under the partition's snapshots prefix.
fn get_partition_latest_snapshot_pointer_path(&self, partition_id: PartitionId) -> String {
    let partition_prefix = self.get_partition_snapshots_prefix(partition_id);
    format!("{}/latest.json", partition_prefix)
}

/// Build the prefix under which all snapshots of a partition are stored.
///
/// This is the repository prefix immediately followed by the partition id.
/// No separator is inserted between the two.
// NOTE(review): presumably `self.prefix` is either empty or already ends with `/` — confirm.
fn get_partition_snapshots_prefix(&self, partition_id: PartitionId) -> String {
    format!("{}{}", self.prefix, partition_id)
}

/// Build the unique path component identifying a snapshot within its partition.
///
/// The component has the shape `lsn_<min_applied_lsn>-<snapshot_id>`, where the
/// LSN is formatted with a zero-padded width of 20, e.g.
/// `lsn_00000000000000001234-<snapshot_id>`.
fn get_snapshot_id_component(snapshot: &PartitionSnapshotMetadata) -> String {
    // The LSN is zero-padded so that components sort lexicographically in LSN order.
    format!(
        "lsn_{:020}-{}",
        snapshot.min_applied_lsn, snapshot.snapshot_id
    )
}
}

async fn create_object_store_client(destination: Url) -> anyhow::Result<Arc<dyn ObjectStore>> {
Expand Down Expand Up @@ -590,14 +623,14 @@ async fn create_object_store_client(destination: Url) -> anyhow::Result<Arc<dyn

#[derive(Clone, Debug)]
struct SnapshotUploadProgress {
pub full_snapshot_path: String,
pub snapshot_complete_path: String,
pub uploaded_files: Vec<String>,
}

impl SnapshotUploadProgress {
fn with_snapshot_path(full_snapshot_path: String) -> Self {
fn with_snapshot_path(snapshot_complete_path: String) -> Self {
SnapshotUploadProgress {
full_snapshot_path,
snapshot_complete_path,
uploaded_files: vec![],
}
}
Expand All @@ -620,7 +653,7 @@ impl PutSnapshotError {
{
PutSnapshotError {
error: error.into(),
full_snapshot_path: progress.full_snapshot_path,
full_snapshot_path: progress.snapshot_complete_path,
uploaded_files: progress.uploaded_files,
}
}
Expand Down Expand Up @@ -861,7 +894,7 @@ mod tests {

repository.put(&snapshot1, source_dir.clone()).await?;

let snapshot_prefix = SnapshotRepository::get_snapshot_prefix(&snapshot1);
let snapshot_prefix = SnapshotRepository::get_snapshot_id_component(&snapshot1);
let data = object_store
.get(&Path::from(format!(
"{}/{}/{}/data.sst",
Expand Down Expand Up @@ -917,7 +950,7 @@ mod tests {
assert_eq!(
LatestSnapshot::from_snapshot(
&snapshot2,
SnapshotRepository::get_snapshot_prefix(&snapshot2)
SnapshotRepository::get_snapshot_id_component(&snapshot2)
),
latest
);
Expand Down

0 comments on commit 269f4f1

Please sign in to comment.