Skip to content

Commit

Permalink
Simplify open partition store logic
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 23, 2024
1 parent 1870774 commit 8a82f9e
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 95 deletions.
6 changes: 5 additions & 1 deletion crates/worker/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ where
Ok(stopped) => {
match stopped {
ProcessorStopReason::LogTrimGap { to_lsn } =>
info!(?to_lsn, "Shutting partition processor down because we encountered a trim gap in the log."),
info!(
trim_gap_to_lsn = ?to_lsn,
"Shutting partition processor down because we encountered a trim gap in the log."
),
_ => warn!("Shutting partition processor down because it stopped unexpectedly.")
}
},
Expand Down Expand Up @@ -495,6 +498,7 @@ where
}
}
Record::TrimGap(to_lsn) => {
// todo(pavel): this assumption may be violated if a command batch contains some records and a trim gap!
assert!(trim_gap.is_none(), "Expecting only a single trim gap!");
trim_gap = Some(to_lsn)
}
Expand Down
254 changes: 160 additions & 94 deletions crates/worker/src/partition_processor_manager/spawn_processor_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use tracing::{debug, info, instrument, warn};
use restate_bifrost::Bifrost;
use restate_core::{Metadata, RuntimeTaskHandle, TaskCenter, TaskKind};
use restate_invoker_impl::Service as InvokerService;
use restate_partition_store::snapshots::LocalPartitionSnapshot;
use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_service_protocol::codec::ProtobufRawEntryCodec;
use restate_types::cluster::cluster_state::PartitionProcessorStatus;
use restate_types::config::Configuration;
use restate_types::config::{Configuration, WorkerOptions};
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::live::Live;
use restate_types::logs::Lsn;
Expand Down Expand Up @@ -135,99 +136,15 @@ impl SpawnPartitionProcessorTask {
let key_range = key_range.clone();

move || async move {
let partition_store = if fast_forward_lsn.is_some() || !partition_store_manager
.has_partition_store(pp_builder.partition_id)
.await
{
let snapshot = if snapshot_repository.is_none() {
debug!(
partition_id = %partition_id,
"No snapshot repository configured",
);
None
} else {
debug!(
partition_id = %partition_id,
"Looking for partition snapshot from which to bootstrap partition store",
);
snapshot_repository.expect("is some").get_latest(partition_id).await?
};


if let Some(snapshot) = snapshot {
match fast_forward_lsn {
Some(trim_gap_to_lsn) => {
if snapshot.min_applied_lsn >= trim_gap_to_lsn {
info!("Found snapshot with state beyond the trim gap, resetting local partition store...");
partition_store_manager.drop_partition(partition_id).await;
} else {
warn!("Latest snapshot is before the trim gap!");
}
},
None => {
info!(
partition_id = %partition_id,
"Found snapshot to bootstrap partition, restoring it",
);
},
}

let snapshot_path = snapshot.base_dir.clone();
match partition_store_manager
.open_partition_store_from_snapshot(
partition_id,
key_range.clone(),
snapshot,
&options.storage.rocksdb,
)
.await {
Ok(partition_store) => {
let res = tokio::fs::remove_dir_all(&snapshot_path).await;
if let Err(e) = res {
// This is not critical; since we move the SST files into RocksDB on import, at
// worst the snapshot metadata file will be retained.
warn!(
partition_id = %partition_id,
?snapshot_path,
"Failed to remove local snapshot directory, continuing with startup: {:?}",
e
);
}
partition_store
}
Err(e) => {
warn!(
partition_id = %partition_id,
?snapshot_path,
"Failed to import snapshot, local copy retained"
);
return Err(anyhow::anyhow!(e));
}
}
} else {
info!(
partition_id = %partition_id,
"No snapshot found to bootstrap partition, creating new store",
);
partition_store_manager
.open_partition_store(
partition_id,
key_range,
OpenMode::CreateIfMissing,
&options.storage.rocksdb,
)
.await?
}
} else {
partition_store_manager
.open_partition_store(
partition_id,
key_range,
OpenMode::OpenExisting,
&options.storage.rocksdb,
)
.await?
};
let partition_store = open_partition_store(
partition_id,
partition_store_manager,
snapshot_repository,
fast_forward_lsn,
&options,
key_range,
)
.await?;

TaskCenter::spawn_child(
TaskKind::SystemService,
Expand Down Expand Up @@ -256,3 +173,152 @@ impl SpawnPartitionProcessorTask {
Ok((state, root_task_handle))
}
}

async fn open_partition_store(
partition_id: PartitionId,
partition_store_manager: PartitionStoreManager,
snapshot_repository: Option<SnapshotRepository>,
fast_forward_lsn: Option<Lsn>,
options: &WorkerOptions,
key_range: RangeInclusive<PartitionKey>,
) -> anyhow::Result<PartitionStore> {
if partition_store_manager
.has_partition_store(partition_id)
.await
&& fast_forward_lsn.is_none()
{
return Ok(partition_store_manager
.open_partition_store(
partition_id,
key_range,
OpenMode::OpenExisting,
&options.storage.rocksdb,
)
.await?);
}

// We don't have a local partition store initialized - or we have a fast-forward LSN target set.
// Attempt to get the latest available snapshot from the snapshots repository:
let snapshot = match snapshot_repository {
Some(repository) => {
debug!("Looking for partition snapshot from which to bootstrap partition store");
// todo(pavel): pass target LSN to repository
repository.get_latest(partition_id).await?
}
None => {
debug!("No snapshot repository configured");
None
}
};

match (snapshot, fast_forward_lsn) {
(None, None) => {
info!("No snapshot found to bootstrap partition, creating new store");
Ok(partition_store_manager
.open_partition_store(
partition_id,
key_range,
OpenMode::CreateIfMissing,
&options.storage.rocksdb,
)
.await?)
}
(Some(snapshot), None) => {
info!(partition_id = %partition_id, "Found partition snapshot, restoring it");
Ok(import_snapshot(
partition_id,
key_range,
snapshot,
partition_store_manager,
options,
)
.await?)
}
(Some(snapshot), Some(fast_forward_lsn))
if snapshot.min_applied_lsn >= fast_forward_lsn =>
{
// We trust that the fast_forward_lsn is greater than the locally applied LSN.
info!(
latest_snapshot_lsn = ?snapshot.min_applied_lsn,
?fast_forward_lsn,
"Found snapshot with LSN >= target LSN, dropping local partition store state",
);
partition_store_manager.drop_partition(partition_id).await;
Ok(import_snapshot(
partition_id,
key_range,
snapshot,
partition_store_manager,
options,
)
.await?)
}
(maybe_snapshot, Some(fast_forward_lsn)) => {
// Play it safe and keep the partition store intact; we can't do much else at this
// point. We'll likely halt again as soon as the processor starts up.
if let Some(snapshot) = maybe_snapshot {
warn!(
?snapshot.min_applied_lsn,
?fast_forward_lsn,
"Latest available snapshot is older than the the fast-forward target LSN!",
);
} else {
warn!(
?fast_forward_lsn,
"A fast-forward target LSN is set, but no snapshot available for partition!",
);
}
// todo(pavel): backoff for a while and return Err
Ok(partition_store_manager
.open_partition_store(
partition_id,
key_range,
OpenMode::OpenExisting,
&options.storage.rocksdb,
)
.await?)
}
}
}

async fn import_snapshot(
partition_id: PartitionId,
key_range: RangeInclusive<PartitionKey>,
snapshot: LocalPartitionSnapshot,
partition_store_manager: PartitionStoreManager,
options: &WorkerOptions,
) -> anyhow::Result<PartitionStore> {
let snapshot_path = snapshot.base_dir.clone();
match partition_store_manager
.open_partition_store_from_snapshot(
partition_id,
key_range.clone(),
snapshot,
&options.storage.rocksdb,
)
.await
{
Ok(partition_store) => {
let res = tokio::fs::remove_dir_all(&snapshot_path).await;
if let Err(e) = res {
// This is not critical; since we move the SST files into RocksDB on import,
// at worst only the snapshot metadata file will remain in the staging dir
warn!(
partition_id = %partition_id,
?snapshot_path,
"Failed to remove local snapshot directory, continuing with startup: {:?}",
e,
);
}
Ok(partition_store)
}
Err(e) => {
warn!(
partition_id = %partition_id,
?snapshot_path,
"Failed to import snapshot, local snapshot data retained"
);
Err(anyhow::anyhow!(e))
}
}
}

0 comments on commit 8a82f9e

Please sign in to comment.