Skip to content

Commit

Permalink
Store v1 and v2 snapshots.
Browse files Browse the repository at this point in the history
  • Loading branch information
arik-so committed May 17, 2024
1 parent b45a67f commit a413e34
Showing 1 changed file with 33 additions and 22 deletions.
55 changes: 33 additions & 22 deletions src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,20 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
// channel updates

// purge and recreate the pending directories
if fs::metadata(&pending_snapshot_directory).is_ok() {
fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
}
if fs::metadata(&pending_symlink_directory).is_ok() {
fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
let suffixes = ["", "/v2"];
for suffix in suffixes {
let versioned_snapshot_directory = format!("{}{}", pending_snapshot_directory, suffix);
let versioned_symlink_directory = format!("{}{}", pending_symlink_directory, suffix);

if fs::metadata(&versioned_snapshot_directory).is_ok() {
fs::remove_dir_all(&versioned_snapshot_directory).expect("Failed to remove pending snapshot directory.");
}
if fs::metadata(&versioned_symlink_directory).is_ok() {
fs::remove_dir_all(&versioned_symlink_directory).expect("Failed to remove pending symlink directory.");
}
fs::create_dir_all(&versioned_snapshot_directory).expect("Failed to create pending snapshot directory");
fs::create_dir_all(&versioned_symlink_directory).expect("Failed to create pending symlink directory");
}
fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");

let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
for current_scope in snapshot_scopes {
Expand All @@ -115,13 +121,16 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
// calculate the snapshot
let delta = super::calculate_delta(network_graph_clone.clone(), current_last_sync_timestamp.clone() as u32, Some(reference_timestamp), self.logger.clone()).await;
let snapshot = super::serialize_delta(&delta, 1, self.logger.clone());
let snapshot_v1 = super::serialize_delta(&delta, 1, self.logger.clone());
let snapshot_v2 = super::serialize_delta(&delta, 2, self.logger.clone());

// persist the snapshot and update the symlink
let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.channel_announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
fs::write(&snapshot_path, snapshot.data).unwrap();
let snapshot_path_v1 = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
let snapshot_path_v2 = format!("{}/v2/{}", pending_snapshot_directory, snapshot_filename);
log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot_v1.message_count, snapshot_v1.channel_announcement_count, snapshot_v1.update_count, snapshot_v1.update_count_full, snapshot_v1.update_count_incremental);
fs::write(&snapshot_path_v1, snapshot_v1.data).unwrap();
fs::write(&snapshot_path_v2, snapshot_v2.data).unwrap();
snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
}
}
Expand Down Expand Up @@ -176,19 +185,21 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
};
log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);

let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
for suffix in suffixes {
let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
let relative_snapshot_path = format!("{}{}/{}", relative_symlink_to_snapshot_path, suffix, snapshot_filename);

let canonical_last_sync_timestamp = if i == 0 {
// special-case 0 to always refer to a full/initial sync
0
} else {
reference_timestamp.saturating_sub(granularity_interval.saturating_mul(i))
};
let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
let canonical_last_sync_timestamp = if i == 0 {
// special-case 0 to always refer to a full/initial sync
0
} else {
reference_timestamp.saturating_sub(granularity_interval.saturating_mul(i))
};
let symlink_path = format!("{}{}/{}.bin", pending_symlink_directory, suffix, canonical_last_sync_timestamp);

log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
symlink(&relative_snapshot_path, &symlink_path).unwrap();
log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
symlink(&relative_snapshot_path, &symlink_path).unwrap();
}
}

let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
Expand Down

0 comments on commit a413e34

Please sign in to comment.