Skip to content

Commit

Permalink
Ensure snapshot files are properly flushed (#2483)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov authored Jan 10, 2025
1 parent 483dcd6 commit 7c5e614
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tempfile::TempDir;
use tokio::io;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tokio_util::io::StreamReader;
Expand Down Expand Up @@ -428,13 +428,16 @@ impl SnapshotRepository {
.map_err(|e| anyhow!("Failed to download snapshot file {:?}: {}", key, e))?
.into_stream(),
);

let mut snapshot_file =
tokio::fs::File::create_new(&file_path).await.map_err(|e| {
anyhow!("Failed to create snapshot file {:?}: {}", file_path, e)
})?;
let size = io::copy(&mut file_data, &mut snapshot_file)
.await
.map_err(|e| anyhow!("Failed to download snapshot file {:?}: {}", key, e))?;
snapshot_file.shutdown().await?;

if size != expected_size as u64 {
return Err(anyhow!(
"Downloaded snapshot file {:?} has unexpected size: expected {}, got {}",
Expand Down Expand Up @@ -763,6 +766,7 @@ mod tests {
let data = b"snapshot-data";
let mut data_file = tokio::fs::File::create(source_dir.join("data.sst")).await?;
data_file.write_all(data).await?;
data_file.shutdown().await?;

let snapshot = mock_snapshot_metadata(
"/data.sst".to_owned(),
Expand Down Expand Up @@ -796,6 +800,7 @@ mod tests {
info!("Creating file: {:?}", latest_path);
let mut latest = tokio::fs::File::create(&latest_path).await?;
latest.write_all(b"not valid json").await?;
latest.shutdown().await?;

assert!(repository.put(&snapshot, source_dir).await.is_err());

Expand Down Expand Up @@ -843,6 +848,7 @@ mod tests {
let data = b"snapshot-data";
let mut data_file = tokio::fs::File::create(source_dir.join("data.sst")).await?;
data_file.write_all(data).await?;
data_file.shutdown().await?;

let mut snapshot1 = mock_snapshot_metadata(
"/data.sst".to_owned(),
Expand Down Expand Up @@ -902,6 +908,7 @@ mod tests {
let data = b"snapshot-data";
let mut data_file = tokio::fs::File::create(source_dir.join("data.sst")).await?;
data_file.write_all(data).await?;
data_file.shutdown().await?;

let mut snapshot2 = mock_snapshot_metadata(
"/data.sst".to_owned(),
Expand Down

0 comments on commit 7c5e614

Please sign in to comment.