Skip to content

Commit

Permalink
Implement read decryption offloading (#1089)
Browse files Browse the repository at this point in the history
Analogous to #1066 , this PR moves read decryption into the rayon thread pool.

It uses exactly the same infrastructure (particularly `DeferredQueue`), and is
implemented in a very similar way.
  • Loading branch information
mkeeter authored Jan 23, 2024
1 parent 2d43bc3 commit df343e7
Show file tree
Hide file tree
Showing 5 changed files with 387 additions and 43 deletions.
6 changes: 5 additions & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ pub enum CrucibleError {

#[error("context slot deserialization failed: {0}")]
BadContextSlot(String),

#[error("missing block context for non-empty block")]
MissingBlockContext,
}

impl From<std::io::Error> for CrucibleError {
Expand Down Expand Up @@ -398,7 +401,8 @@ impl From<CrucibleError> for dropshot::HttpError {
| CrucibleError::UuidMismatch
| CrucibleError::MissingContextSlot(..)
| CrucibleError::BadMetadata(..)
| CrucibleError::BadContextSlot(..) => {
| CrucibleError::BadContextSlot(..)
| CrucibleError::MissingBlockContext => {
dropshot::HttpError::for_internal_error(e.to_string())
}
}
Expand Down
47 changes: 21 additions & 26 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1170,10 +1170,15 @@ impl DownstairsClient {
/// Handles a single IO operation
///
/// Returns `true` if the job is now ackable, `false` otherwise
///
/// If this is a read response, then the values in `responses` must
/// _already_ be decrypted (with corresponding hashes stored in
/// `read_response_hashes`).
pub(crate) fn process_io_completion(
&mut self,
job: &mut DownstairsIO,
mut responses: Result<Vec<ReadResponse>, CrucibleError>,
responses: Result<Vec<ReadResponse>, CrucibleError>,
read_response_hashes: Vec<Option<u64>>,
deactivate: bool,
extent_info: Option<ExtentInfo>,
) -> bool {
Expand All @@ -1189,26 +1194,9 @@ impl DownstairsClient {
let mut jobs_completed_ok = job.state_count().completed_ok();
let mut ackable = false;

// Validate integrity hashes and optionally authenticated decryption.
//
// With AE, responses can come back that are invalid given an encryption
// context. Test this here. If it fails, then something has gone
// irrecoverably wrong and we should panic.
let mut read_response_hashes = Vec::new();
let new_state = match &mut responses {
Ok(responses) => {
responses.iter_mut().for_each(|x| {
let mh =
if let Some(context) = &self.cfg.encryption_context {
validate_encrypted_read_response(
x, context, &self.log,
)
} else {
validate_unencrypted_read_response(x, &self.log)
}
.expect("decryption failed");
read_response_hashes.push(mh);
});
let new_state = match &responses {
Ok(..) => {
// Messages have already been decrypted out-of-band
jobs_completed_ok += 1;
IOState::Done
}
Expand Down Expand Up @@ -2620,8 +2608,12 @@ pub(crate) fn validate_encrypted_read_response(
// expect to see this case unless this is a blank block.
//
// XXX if it's not a blank block, we may be under attack?
assert!(response.data[..].iter().all(|&x| x == 0));
return Ok(None);
if response.data[..].iter().all(|&x| x == 0) {
return Ok(None);
} else {
error!(log, "got empty block context with non-blank block");
return Err(CrucibleError::MissingBlockContext);
}
}

let mut valid_hash = None;
Expand Down Expand Up @@ -2786,9 +2778,12 @@ pub(crate) fn validate_unencrypted_read_response(
// this case unless this is a blank block.
//
// XXX if it's not a blank block, we may be under attack?
assert!(response.data[..].iter().all(|&x| x == 0));

Ok(None)
if response.data[..].iter().all(|&x| x == 0) {
Ok(None)
} else {
error!(log, "got empty block context with non-blank block");
Err(CrucibleError::MissingBlockContext)
}
}
}

Expand Down
82 changes: 80 additions & 2 deletions upstairs/src/deferred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
use std::sync::Arc;

use crate::{
upstairs::UpstairsConfig, BlockContext, BlockReq, BlockRes, ImpactedBlocks,
upstairs::UpstairsConfig, BlockContext, BlockReq, BlockRes, ClientId,
ImpactedBlocks, Message,
};
use bytes::Bytes;
use crucible_common::{integrity_hash, CrucibleError, RegionDefinition};
use futures::{
future::{Either, Ready},
future::{ready, Either, Ready},
stream::FuturesOrdered,
StreamExt,
};
use slog::{error, Logger};
use tokio::sync::oneshot;

/// Future stored in a [`DeferredQueue`]
Expand Down Expand Up @@ -70,6 +72,18 @@ impl<T> DeferredQueue<T> {
t.map(|t| t.expect("oneshot failed"))
}

/// Stores a new future in the queue, marking it as non-empty
pub fn push_immediate(&mut self, t: T) {
self.push_back(Either::Left(ready(Ok(t))));
}

/// Stores a new pending oneshot in the queue, returning the sender
pub fn push_oneshot(&mut self) -> oneshot::Sender<T> {
let (rx, tx) = oneshot::channel();
self.push_back(Either::Right(tx));
rx
}

/// Check whether the queue is known to be empty
///
/// It is possible for this to return `false` if the queue is actually
Expand Down Expand Up @@ -184,3 +198,67 @@ impl DeferredWrite {
})
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub(crate) struct DeferredMessage {
pub message: Message,

/// If this was a `ReadResponse`, then the hashes are stored here
pub hashes: Vec<Option<u64>>,

pub client_id: ClientId,
}

/// Standalone data structure which can perform decryption
pub(crate) struct DeferredRead {
/// Message, which must be a `ReadResponse`
pub message: Message,

pub client_id: ClientId,
pub cfg: Arc<UpstairsConfig>,
pub log: Logger,
}

impl DeferredRead {
/// Consume the `DeferredRead` and perform decryption
///
/// If decryption fails, then the resulting `Message` has an error in the
/// `responses` field, and `hashes` is empty.
pub fn run(mut self) -> DeferredMessage {
use crate::client::{
validate_encrypted_read_response,
validate_unencrypted_read_response,
};
let Message::ReadResponse { responses, .. } = &mut self.message else {
panic!("invalid DeferredRead");
};
let mut hashes = vec![];

if let Ok(rs) = responses {
for r in rs.iter_mut() {
let v = if let Some(ctx) = &self.cfg.encryption_context {
validate_encrypted_read_response(r, ctx, &self.log)
} else {
validate_unencrypted_read_response(r, &self.log)
};
match v {
Ok(hash) => hashes.push(hash),
Err(e) => {
error!(self.log, "decryption failure: {e:?}");
*responses = Err(e);
hashes.clear();
break;
}
}
}
}

DeferredMessage {
client_id: self.client_id,
message: self.message,
hashes,
}
}
}
19 changes: 19 additions & 0 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2901,6 +2901,7 @@ impl Downstairs {
&mut self,
client_id: ClientId,
m: Message,
read_response_hashes: Vec<Option<u64>>,
up_state: &UpstairsState,
) -> Result<(), CrucibleError> {
let (upstairs_id, session_id, ds_id, read_data, extent_info) = match &m
Expand Down Expand Up @@ -3133,6 +3134,7 @@ impl Downstairs {
ds_id,
client_id,
read_data,
read_response_hashes,
up_state,
extent_info,
);
Expand Down Expand Up @@ -3181,10 +3183,18 @@ impl Downstairs {
extent_info: Option<ExtentInfo>,
) -> bool {
let was_ackable = self.ackable_work.contains(&ds_id);

// Make up dummy values for hashes, since they're not actually checked
// here (besides confirming that we have the correct number).
let hashes = match &responses {
Ok(r) => vec![Some(0); r.len()],
Err(..) => vec![],
};
self.process_io_completion_inner(
ds_id,
client_id,
responses,
hashes,
up_state,
extent_info,
);
Expand All @@ -3198,6 +3208,7 @@ impl Downstairs {
ds_id: JobId,
client_id: ClientId,
responses: Result<Vec<ReadResponse>, CrucibleError>,
read_response_hashes: Vec<Option<u64>>,
up_state: &UpstairsState,
extent_info: Option<ExtentInfo>,
) {
Expand Down Expand Up @@ -3226,9 +3237,17 @@ impl Downstairs {
return;
};

// Sanity-checking for a programmer error during offloaded decryption.
// If we didn't get one hash per read block, then `responses` must
// have been converted into `Err(..)`.
if let Ok(reads) = &responses {
assert_eq!(reads.len(), read_response_hashes.len());
}

if self.clients[client_id].process_io_completion(
job,
responses,
read_response_hashes,
deactivate,
extent_info,
) {
Expand Down
Loading

0 comments on commit df343e7

Please sign in to comment.