Skip to content

Commit

Permalink
Use new syncfs syscall (#1427)
Browse files Browse the repository at this point in the history
Use new illumos syncfs() call for fsync-ing extents. syncfs() exposes access to
`zfs_sync()`, which syncs an entire zfs dataset. We use one dataset per crucible
region, so that's exactly what we want. This is more efficient than syncing each
file individually, particularly under random write workloads to large regions
with many extents.

This functionality is gated behind the `omicron-build` cargo feature. We
preserve the existing parallel-fsync approach when that feature is not
enabled.

references:
- https://www.illumos.org/issues/16675
- illumos/illumos-gate@4763305
  • Loading branch information
mkeeter authored Sep 11, 2024
1 parent 51347bd commit 558a6d9
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 106 deletions.
2 changes: 1 addition & 1 deletion .github/buildomat/jobs/build-release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ banner rbuild
ptime -m cargo build --verbose --release --all-features

banner rtest
ptime -m cargo test --verbose -- --nocapture > /tmp/cargo-test-out.log 2>&1
ptime -m cargo test --verbose --features=omicron-build -- --nocapture > /tmp/cargo-test-out.log 2>&1

banner output
mkdir -p /work/rbins
Expand Down
4 changes: 2 additions & 2 deletions .github/buildomat/jobs/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pfexec coreadm -i /tmp/core.%f.%p \
-e global-setid

banner build
ptime -m cargo build --verbose
ptime -m cargo build --verbose --all-features

banner output

Expand All @@ -50,4 +50,4 @@ echo in_work_bins
ls -l /work/bins

banner test
ptime -m cargo test --verbose -- --nocapture > /tmp/cargo-test-out.log 2>&1
ptime -m cargo test --verbose --features=omicron-build -- --nocapture > /tmp/cargo-test-out.log 2>&1
1 change: 1 addition & 0 deletions downstairs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ asm = ["usdt/asm"]
default = []
zfs_snapshot = []
integration-tests = [] # Enables creating SQLite volumes
omicron-build = [] # Uses syncfs(3c) for flushes instead of fsync
78 changes: 72 additions & 6 deletions downstairs/src/extent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,45 @@ pub(crate) trait ExtentInner: Send + Sync + Debug {
fn flush_number(&self) -> Result<u64, CrucibleError>;
fn dirty(&self) -> Result<bool, CrucibleError>;

fn flush(
/// Performs any metadata updates needed before a flush
fn pre_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError>;

/// Syncs all relevant data to persistent storage
fn flush_inner(
&mut self,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError>;

/// Performs any metadata updates after syncing data to persistent storage
fn post_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError>;

/// Performs a full flush (pre/inner/post)
///
/// This is only exposed for the sake of unit testing; normal code should
/// use the fine-grained functions and be forced to consider performance.
#[cfg(test)]
fn flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
self.pre_flush(new_flush, new_gen, job_id)?;
self.flush_inner(job_id)?;
self.post_flush(new_flush, new_gen, job_id)?;
Ok(())
}

fn read(
&mut self,
job_id: JobId,
Expand Down Expand Up @@ -578,22 +610,25 @@ impl Extent {
Ok(())
}

#[instrument]
pub(crate) fn flush<I: Into<JobOrReconciliationId> + Debug>(
/// Prepares for a flush
///
/// Returns `false` if we should skip the flush (because this extent is not
/// dirty), or `true` if we should proceed.
pub(crate) fn pre_flush<I: Into<JobOrReconciliationId> + Debug>(
&mut self,
new_flush: u64,
new_gen: u64,
id: I, // only used for logging
log: &Logger,
) -> Result<(), CrucibleError> {
) -> Result<bool, CrucibleError> {
let job_id: JobOrReconciliationId = id.into();

if !self.inner.dirty()? {
/*
* If we have made no writes to this extent since the last flush,
* we do not need to update the extent on disk
*/
return Ok(());
return Ok(false);
}

// Read only extents should never have the dirty bit set. If they do,
Expand All @@ -604,7 +639,38 @@ impl Extent {
crucible_bail!(ModifyingReadOnlyRegion);
}

self.inner.flush(new_flush, new_gen, job_id)
self.inner.pre_flush(new_flush, new_gen, job_id)?;
Ok(true)
}

/// Performs post-flush cleanup
pub(crate) fn post_flush<I: Into<JobOrReconciliationId> + Debug>(
&mut self,
new_flush: u64,
new_gen: u64,
id: I, // only used for logging
) -> Result<(), CrucibleError> {
let job_id: JobOrReconciliationId = id.into();
self.inner.post_flush(new_flush, new_gen, job_id)
}

/// Flushes this extent if it is dirty
#[instrument]
pub(crate) fn flush<
I: Into<JobOrReconciliationId> + Debug + Copy + Clone,
>(
&mut self,
new_flush: u64,
new_gen: u64,
id: I, // only used for logging
log: &Logger,
) -> Result<(), CrucibleError> {
if !self.pre_flush(new_flush, new_gen, id, log)? {
return Ok(());
}
self.inner.flush_inner(id.into())?;
self.post_flush(new_flush, new_gen, id)?;
Ok(())
}

pub fn get_meta_info(&self) -> ExtentMeta {
Expand Down
33 changes: 19 additions & 14 deletions downstairs/src/extent_inner_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,20 +387,12 @@ impl ExtentInner for RawInner {
Ok(ExtentReadResponse { data: buf, blocks })
}

fn flush(
fn pre_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
if !self.dirty()? {
/*
* If we have made no writes to this extent since the last flush,
* we do not need to update the extent on disk
*/
return Ok(());
}

cdt::extent__flush__start!(|| {
(job_id.get(), self.extent_number.0, 0)
});
Expand All @@ -409,10 +401,17 @@ impl ExtentInner for RawInner {
// operation atomic.
self.set_flush_number(new_flush, new_gen)?;

Ok(())
}

fn flush_inner(
&mut self,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
// Now, we fsync to ensure data is flushed to disk. It's okay to crash
// before this point, because setting the flush number is atomic.
cdt::extent__flush__file__start!(|| {
(job_id.get(), self.extent_number.0, 0)
(job_id.get(), self.extent_number.0)
});
if let Err(e) = self.file.sync_all() {
/*
Expand All @@ -425,9 +424,17 @@ impl ExtentInner for RawInner {
}
self.context_slot_dirty.fill(0);
cdt::extent__flush__file__done!(|| {
(job_id.get(), self.extent_number.0, 0)
(job_id.get(), self.extent_number.0)
});
Ok(())
}

fn post_flush(
&mut self,
_new_flush: u64,
_new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
// Check for fragmentation in the context slots leading to worse
// performance, and defragment if that's the case.
let extra_syscalls_per_rw = self
Expand All @@ -442,9 +449,7 @@ impl ExtentInner for RawInner {
Ok(())
};

cdt::extent__flush__done!(|| {
(job_id.get(), self.extent_number.0, 0)
});
cdt::extent__flush__done!(|| { (job_id.get(), self.extent_number.0) });

r
}
Expand Down
59 changes: 46 additions & 13 deletions downstairs/src/extent_inner_sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,32 @@ impl ExtentInner for SqliteInner {
self.0.lock().unwrap().dirty()
}

fn flush(
fn pre_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
self.0.lock().unwrap().flush(new_flush, new_gen, job_id)
self.0.lock().unwrap().pre_flush(new_flush, new_gen, job_id)
}

fn flush_inner(
&mut self,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
self.0.lock().unwrap().flush_inner(job_id)
}

fn post_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
self.0
.lock()
.unwrap()
.post_flush(new_flush, new_gen, job_id)
}

fn read(
Expand Down Expand Up @@ -194,10 +213,10 @@ impl SqliteMoreInner {
Ok(self.dirty.get())
}

fn flush(
fn pre_flush(
&mut self,
new_flush: u64,
new_gen: u64,
_new_flush: u64,
_new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
// Used for profiling
Expand All @@ -207,12 +226,19 @@ impl SqliteMoreInner {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
});

Ok(())
}

fn flush_inner(
&mut self,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
/*
* We must first fsync to get any outstanding data written to disk.
* This must be done before we update the flush number.
*/
cdt::extent__flush__file__start!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});
if let Err(e) = self.file.sync_all() {
/*
Expand All @@ -225,9 +251,18 @@ impl SqliteMoreInner {
);
}
cdt::extent__flush__file__done!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});

Ok(())
}

fn post_flush(
&mut self,
new_flush: u64,
new_gen: u64,
job_id: JobOrReconciliationId,
) -> Result<(), CrucibleError> {
// Clear old block contexts. In order to be crash consistent, only
// perform this after the extent fsync is done. For each block
// written since the last flush, remove all block context rows where
Expand All @@ -237,7 +272,7 @@ impl SqliteMoreInner {
// file is rehashed, since in that case we don't have that luxury.

cdt::extent__flush__collect__hashes__start!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});

// Rehash any parts of the file that we *may have written* data to since
Expand All @@ -250,7 +285,7 @@ impl SqliteMoreInner {
});

cdt::extent__flush__sqlite__insert__start!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});

// We put all of our metadb updates into a single transaction to
Expand All @@ -265,7 +300,7 @@ impl SqliteMoreInner {
)?;

cdt::extent__flush__sqlite__insert__done!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
(job_id.get(), self.extent_number.0)
});

self.set_flush_number(new_flush, new_gen)?;
Expand All @@ -275,9 +310,7 @@ impl SqliteMoreInner {
// Finally, reset the file's seek offset to 0
self.file.seek(SeekFrom::Start(0))?;

cdt::extent__flush__done!(|| {
(job_id.get(), self.extent_number.0, n_dirty_blocks)
});
cdt::extent__flush__done!(|| { (job_id.get(), self.extent_number.0) });
Ok(())
}

Expand Down
36 changes: 8 additions & 28 deletions downstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,44 +739,24 @@ pub mod cdt {
fn submit__writeunwritten__done(_: u64) {}
fn submit__write__done(_: u64) {}
fn submit__flush__done(_: u64) {}
fn extent__flush__start(job_id: u64, extent_id: u32, extent_size: u64) {}
fn extent__flush__done(job_id: u64, extent_id: u32, extent_size: u64) {}
fn extent__flush__file__start(
fn extent__flush__start(
job_id: u64,
extent_id: u32,
extent_size: u64,
) {
}
fn extent__flush__file__done(
job_id: u64,
extent_id: u32,
extent_size: u64,
) {
}
fn extent__flush__collect__hashes__start(
job_id: u64,
extent_id: u32,
num_dirty: u64,
num_dirty_blocks: u64,
) {
}
fn extent__flush__done(job_id: u64, extent_id: u32) {}
fn extent__flush__file__start(job_id: u64, extent_id: u32) {}
fn extent__flush__file__done(job_id: u64, extent_id: u32) {}
fn extent__flush__collect__hashes__start(job_id: u64, extent_id: u32) {}
fn extent__flush__collect__hashes__done(
job_id: u64,
extent_id: u32,
num_rehashed: u64,
) {
}
fn extent__flush__sqlite__insert__start(
job_id: u64,
extent_id: u32,
extent_size: u64,
) {
}
fn extent__flush__sqlite__insert__done(
_job_id: u64,
_extent_id: u32,
extent_size: u64,
) {
}
fn extent__flush__sqlite__insert__start(job_id: u64, extent_id: u32) {}
fn extent__flush__sqlite__insert__done(job_id: u64, extent_id: u32) {}
fn extent__write__start(job_id: u64, extent_id: u32, n_blocks: u64) {}
fn extent__write__done(job_id: u64, extent_id: u32, n_blocks: u64) {}
fn extent__write__get__hashes__start(
Expand Down
Loading

0 comments on commit 558a6d9

Please sign in to comment.