From df343e7c464db3b226d9a819c59e6a3a2dfd3dea Mon Sep 17 00:00:00 2001
From: Matt Keeter
Date: Tue, 23 Jan 2024 13:05:39 -0500
Subject: [PATCH] Implement read decryption offloading (#1089)

Analogous to #1066, this PR moves read decryption into the rayon thread
pool.  It uses exactly the same infrastructure (particularly
`DeferredQueue`), and is implemented in a very similar way.
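At its core, this is the same trick as the write path: the CPU-heavy step is
handed to rayon, and a `tokio::sync::oneshot` receiver is pushed onto an
ordered queue so that results are consumed in submission order, no matter
which worker finishes first.  Below is a minimal standalone sketch of that
pattern (illustrative only; `demo` is not code from this PR, and
`DeferredQueue` wraps the same machinery with an emptiness flag):

```rust
use futures::{stream::FuturesOrdered, StreamExt};
use tokio::sync::oneshot;

async fn demo() {
    // Ordered stream of pending results; DeferredQueue wraps one of these
    let mut queue: FuturesOrdered<oneshot::Receiver<Vec<u8>>> =
        FuturesOrdered::new();

    for job in 0..4u8 {
        let (tx, rx) = oneshot::channel();
        queue.push_back(rx);
        // Offload the CPU-heavy step (decryption, in this PR) to rayon
        rayon::spawn(move || {
            let _ = tx.send(vec![job; 512]);
        });
    }

    // Results come back in push order, regardless of which rayon worker
    // finished first
    let mut expected = 0u8;
    while let Some(Ok(block)) = queue.next().await {
        assert_eq!(block[0], expected);
        expected += 1;
    }
}
```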
---
 common/src/lib.rs          |   6 +-
 upstairs/src/client.rs     |  47 +++----
 upstairs/src/deferred.rs   |  82 ++++++++++-
 upstairs/src/downstairs.rs |  19 +++
 upstairs/src/upstairs.rs   | 276 +++++++++++++++++++++++++++++++++++--
 5 files changed, 387 insertions(+), 43 deletions(-)

diff --git a/common/src/lib.rs b/common/src/lib.rs
index b7f6aef47..af8d06909 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -155,6 +155,9 @@
     #[error("context slot deserialization failed: {0}")]
     BadContextSlot(String),
+
+    #[error("missing block context for non-empty block")]
+    MissingBlockContext,
 }
 
 impl From<anyhow::Error> for CrucibleError {
@@ -398,7 +401,8 @@ impl From<CrucibleError> for dropshot::HttpError {
             | CrucibleError::UuidMismatch
             | CrucibleError::MissingContextSlot(..)
             | CrucibleError::BadMetadata(..)
-            | CrucibleError::BadContextSlot(..) => {
+            | CrucibleError::BadContextSlot(..)
+            | CrucibleError::MissingBlockContext => {
                 dropshot::HttpError::for_internal_error(e.to_string())
             }
         }
diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs
index b2ece8fa2..05e45b763 100644
--- a/upstairs/src/client.rs
+++ b/upstairs/src/client.rs
@@ -1170,10 +1170,15 @@
     /// Handles a single IO operation
     ///
     /// Returns `true` if the job is now ackable, `false` otherwise
+    ///
+    /// If this is a read response, then the values in `responses` must
+    /// _already_ be decrypted (with corresponding hashes stored in
+    /// `read_response_hashes`).
    pub(crate) fn process_io_completion(
        &mut self,
        job: &mut DownstairsIO,
-        mut responses: Result<Vec<ReadResponse>, CrucibleError>,
+        responses: Result<Vec<ReadResponse>, CrucibleError>,
+        read_response_hashes: Vec<Option<u64>>,
        deactivate: bool,
        extent_info: Option<ExtentInfo>,
    ) -> bool {
@@ -1189,26 +1194,9 @@
         let mut jobs_completed_ok = job.state_count().completed_ok();
         let mut ackable = false;
 
-        // Validate integrity hashes and optionally authenticated decryption.
-        //
-        // With AE, responses can come back that are invalid given an encryption
-        // context.  Test this here.  If it fails, then something has gone
-        // irrecoverably wrong and we should panic.
-        let mut read_response_hashes = Vec::new();
-        let new_state = match &mut responses {
-            Ok(responses) => {
-                responses.iter_mut().for_each(|x| {
-                    let mh =
-                        if let Some(context) = &self.cfg.encryption_context {
-                            validate_encrypted_read_response(
-                                x, context, &self.log,
-                            )
-                        } else {
-                            validate_unencrypted_read_response(x, &self.log)
-                        }
-                        .expect("decryption failed");
-                    read_response_hashes.push(mh);
-                });
+        let new_state = match &responses {
+            Ok(..) => {
+                // Messages have already been decrypted out-of-band
                 jobs_completed_ok += 1;
                 IOState::Done
             }
@@ -2620,8 +2608,12 @@ pub(crate) fn validate_encrypted_read_response(
             // expect to see this case unless this is a blank block.
             //
             // XXX if it's not a blank block, we may be under attack?
-            assert!(response.data[..].iter().all(|&x| x == 0));
-            return Ok(None);
+            if response.data[..].iter().all(|&x| x == 0) {
+                return Ok(None);
+            } else {
+                error!(log, "got empty block context with non-blank block");
+                return Err(CrucibleError::MissingBlockContext);
+            }
         }
 
         let mut valid_hash = None;
@@ -2786,9 +2778,12 @@ pub(crate) fn validate_unencrypted_read_response(
             // this case unless this is a blank block.
             //
             // XXX if it's not a blank block, we may be under attack?
-            assert!(response.data[..].iter().all(|&x| x == 0));
-
-            Ok(None)
+            if response.data[..].iter().all(|&x| x == 0) {
+                Ok(None)
+            } else {
+                error!(log, "got empty block context with non-blank block");
+                Err(CrucibleError::MissingBlockContext)
+            }
         }
     }
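For context on the work being moved onto the pool: `validate_encrypted_read_response`
checks each block's integrity hash (computed over nonce, tag, and ciphertext)
before attempting authenticated decryption.  The sketch below shows that
ordering in isolation, assuming the `aes-gcm` crate; the function name and
error strings are illustrative, not crucible's actual helper (which also
handles multiple block contexts per block):

```rust
use aes_gcm::{AeadInPlace, Aes256Gcm, Key, KeyInit, Nonce, Tag};
use crucible_common::integrity_hash;

fn check_then_decrypt(
    key: &Key<Aes256Gcm>,
    nonce: &[u8; 12],
    tag: &[u8; 16],
    stored_hash: u64,
    data: &mut [u8],
) -> Result<u64, &'static str> {
    // Cheap integrity check first: the stored hash covers nonce, tag, and
    // ciphertext
    let computed = integrity_hash(&[&nonce[..], &tag[..], &data[..]]);
    if computed != stored_hash {
        return Err("integrity hash mismatch");
    }

    // Authenticated decryption; failing *here*, after a matching hash,
    // means something has gone irrecoverably wrong
    Aes256Gcm::new(key)
        .decrypt_in_place_detached(
            Nonce::from_slice(&nonce[..]),
            b"",
            data,
            Tag::from_slice(&tag[..]),
        )
        .map_err(|_| "decryption failed")?;

    Ok(computed)
}
```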
diff --git a/upstairs/src/deferred.rs b/upstairs/src/deferred.rs
index 4d168e3e8..4c4f5f72d 100644
--- a/upstairs/src/deferred.rs
+++ b/upstairs/src/deferred.rs
@@ -3,15 +3,17 @@
 use std::sync::Arc;
 
 use crate::{
-    upstairs::UpstairsConfig, BlockContext, BlockReq, BlockRes, ImpactedBlocks,
+    upstairs::UpstairsConfig, BlockContext, BlockReq, BlockRes, ClientId,
+    ImpactedBlocks, Message,
 };
 use bytes::Bytes;
 use crucible_common::{integrity_hash, CrucibleError, RegionDefinition};
 use futures::{
-    future::{Either, Ready},
+    future::{ready, Either, Ready},
     stream::FuturesOrdered,
     StreamExt,
 };
+use slog::{error, Logger};
 use tokio::sync::oneshot;
 
 /// Future stored in a [`DeferredQueue`]
@@ -70,6 +72,18 @@ impl<T> DeferredQueue<T> {
         t.map(|t| t.expect("oneshot failed"))
     }
 
+    /// Stores a new future in the queue, marking it as non-empty
+    pub fn push_immediate(&mut self, t: T) {
+        self.push_back(Either::Left(ready(Ok(t))));
+    }
+
+    /// Stores a new pending oneshot in the queue, returning the sender
+    pub fn push_oneshot(&mut self) -> oneshot::Sender<T> {
+        let (tx, rx) = oneshot::channel();
+        self.push_back(Either::Right(rx));
+        tx
+    }
+
     /// Check whether the queue is known to be empty
     ///
     /// It is possible for this to return `false` if the queue is actually
@@ -184,3 +198,67 @@ impl DeferredWrite {
         })
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug)]
+pub(crate) struct DeferredMessage {
+    pub message: Message,
+
+    /// If this was a `ReadResponse`, then the hashes are stored here
+    pub hashes: Vec<Option<u64>>,
+
+    pub client_id: ClientId,
+}
+
+/// Standalone data structure which can perform decryption
+pub(crate) struct DeferredRead {
+    /// Message, which must be a `ReadResponse`
+    pub message: Message,
+
+    pub client_id: ClientId,
+    pub cfg: Arc<UpstairsConfig>,
+    pub log: Logger,
+}
+
+impl DeferredRead {
+    /// Consume the `DeferredRead` and perform decryption
+    ///
+    /// If decryption fails, then the resulting `Message` has an error in the
+    /// `responses` field, and `hashes` is empty.
+    pub fn run(mut self) -> DeferredMessage {
+        use crate::client::{
+            validate_encrypted_read_response,
+            validate_unencrypted_read_response,
+        };
+        let Message::ReadResponse { responses, .. } = &mut self.message else {
+            panic!("invalid DeferredRead");
+        };
+        let mut hashes = vec![];
+
+        if let Ok(rs) = responses {
+            for r in rs.iter_mut() {
+                let v = if let Some(ctx) = &self.cfg.encryption_context {
+                    validate_encrypted_read_response(r, ctx, &self.log)
+                } else {
+                    validate_unencrypted_read_response(r, &self.log)
+                };
+                match v {
+                    Ok(hash) => hashes.push(hash),
+                    Err(e) => {
+                        error!(self.log, "decryption failure: {e:?}");
+                        *responses = Err(e);
+                        hashes.clear();
+                        break;
+                    }
+                }
+            }
+        }
+
+        DeferredMessage {
+            client_id: self.client_id,
+            message: self.message,
+            hashes,
+        }
+    }
+}
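Taken together, `push_oneshot` and `DeferredRead::run` keep the offload call
sites very small.  The sketch below mirrors the shape of the call site added
to `upstairs.rs` later in this diff; the `offload` wrapper itself is
illustrative, not a function in this PR:

```rust
use crate::deferred::{DeferredMessage, DeferredQueue, DeferredRead};

/// Hand one read's decryption to the rayon pool (illustrative wrapper)
fn offload(queue: &mut DeferredQueue<DeferredMessage>, dr: DeferredRead) {
    // Reserve a slot in the ordered queue first, so replies are handled in
    // arrival order even if workers finish out of order
    let tx = queue.push_oneshot();
    rayon::spawn(move || {
        // run() validates and decrypts in place; a failure is folded into
        // the returned message's `responses` field rather than panicking
        let _ = tx.send(dr.run());
    });
}
```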
diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs
index 6cce3269c..62444788e 100644
--- a/upstairs/src/downstairs.rs
+++ b/upstairs/src/downstairs.rs
@@ -2901,6 +2901,7 @@
         &mut self,
         client_id: ClientId,
         m: Message,
+        read_response_hashes: Vec<Option<u64>>,
         up_state: &UpstairsState,
     ) -> Result<(), CrucibleError> {
         let (upstairs_id, session_id, ds_id, read_data, extent_info) = match &m
@@ -3133,6 +3134,7 @@
             ds_id,
             client_id,
             read_data,
+            read_response_hashes,
             up_state,
             extent_info,
         );
@@ -3181,10 +3183,18 @@
         extent_info: Option<ExtentInfo>,
     ) -> bool {
         let was_ackable = self.ackable_work.contains(&ds_id);
+
+        // Make up dummy values for hashes, since they're not actually checked
+        // here (besides confirming that we have the correct number).
+        let hashes = match &responses {
+            Ok(r) => vec![Some(0); r.len()],
+            Err(..) => vec![],
+        };
         self.process_io_completion_inner(
             ds_id,
             client_id,
             responses,
+            hashes,
             up_state,
             extent_info,
         );
@@ -3198,6 +3208,7 @@
         ds_id: JobId,
         client_id: ClientId,
         responses: Result<Vec<ReadResponse>, CrucibleError>,
+        read_response_hashes: Vec<Option<u64>>,
         up_state: &UpstairsState,
         extent_info: Option<ExtentInfo>,
     ) {
@@ -3226,9 +3237,17 @@
             return;
         };
 
+        // Sanity-checking for a programmer error during offloaded decryption.
+        // If we didn't get one hash per read block, then `responses` must
+        // have been converted into `Err(..)`.
+        if let Ok(reads) = &responses {
+            assert_eq!(reads.len(), read_response_hashes.len());
+        }
+
         if self.clients[client_id].process_io_completion(
             job,
             responses,
+            read_response_hashes,
             deactivate,
             extent_info,
         ) {
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 5d4b66f29..cca97f767 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -6,7 +6,8 @@ use crate::{
     control::ControlRequest,
     deadline_secs,
     deferred::{
-        DeferredBlockReq, DeferredQueue, DeferredWrite, EncryptedWrite,
+        DeferredBlockReq, DeferredMessage, DeferredQueue, DeferredRead,
+        DeferredWrite, EncryptedWrite,
     },
     downstairs::{Downstairs, DownstairsAction},
     extent_from_offset,
@@ -25,11 +26,11 @@ use std::{
     },
 };
 
-use futures::future::{pending, ready, Either};
+use futures::future::{pending, Either};
 use ringbuffer::RingBuffer;
 use slog::{debug, error, info, o, warn, Logger};
 use tokio::{
-    sync::{mpsc, oneshot},
+    sync::mpsc,
     time::{sleep_until, Instant},
 };
 use uuid::Uuid;
@@ -196,6 +197,9 @@ pub(crate) struct Upstairs {
 
     /// Stream of post-processed `BlockOp` futures
     deferred_reqs: DeferredQueue<Option<DeferredBlockReq>>,
+
+    /// Stream of decrypted `Message` futures
+    deferred_msgs: DeferredQueue<DeferredMessage>,
 }
 
 /// Action to be taken which modifies the [`Upstairs`] state
@@ -207,6 +211,9 @@ pub(crate) enum UpstairsAction {
     /// A deferred block request has completed
     DeferredBlockReq(DeferredBlockReq),
 
+    /// A deferred message has arrived
+    DeferredMessage(DeferredMessage),
+
     LeakCheck,
     FlushCheck,
     StatUpdate,
@@ -340,6 +347,7 @@
             control_rx,
             control_tx,
             deferred_reqs: DeferredQueue::new(),
+            deferred_msgs: DeferredQueue::new(),
         }
     }
@@ -416,6 +424,16 @@
                 }
             }
+            m = self.deferred_msgs.next(), if !self.deferred_msgs.is_empty()
+            => {
+                // The outer Option is None if the queue is empty.  If this is
+                // the case, then we check that the empty flag was set.
+                let Some(m) = m else {
+                    assert!(self.deferred_msgs.is_empty());
+                    return UpstairsAction::NoOp;
+                };
+                UpstairsAction::DeferredMessage(m)
+            }
             _ = sleep_until(self.leak_deadline) => {
                 UpstairsAction::LeakCheck
             }
@@ -445,6 +463,9 @@
             UpstairsAction::DeferredBlockReq(req) => {
                 self.apply_guest_request(req).await;
             }
+            UpstairsAction::DeferredMessage(m) => {
+                self.on_client_message(m).await;
+            }
             UpstairsAction::LeakCheck => {
                 const LEAK_MS: usize = 1000;
                 let leak_tick =
@@ -565,6 +586,20 @@
         assert!(self.deferred_reqs.is_empty());
     }
 
+    /// Helper function to await all deferred messages
+    ///
+    /// This is only useful in tests because it **only** processes deferred
+    /// messages (doing no other Upstairs work).  In production, there
+    /// could be other events that need handling simultaneously, so we do not
+    /// want to stall the Upstairs.
+    #[cfg(test)]
+    async fn await_deferred_msgs(&mut self) {
+        while let Some(msg) = self.deferred_msgs.next().await {
+            self.apply(UpstairsAction::DeferredMessage(msg)).await;
+        }
+        assert!(self.deferred_msgs.is_empty());
+    }
+
     /// Check outstanding IOops for each downstairs.
     ///
     /// If the number is too high, then mark that downstairs as failed, scrub
@@ -802,9 +837,8 @@
             // have to keep using it for subsequent requests (even ones that are
             // not writes) to preserve FIFO ordering
             _ if !self.deferred_reqs.is_empty() => {
-                self.deferred_reqs.push_back(Either::Left(ready(Ok(Some(
-                    DeferredBlockReq::Other(req),
-                )))));
+                self.deferred_reqs
+                    .push_immediate(Some(DeferredBlockReq::Other(req)));
             }
             // Otherwise, we can apply a non-write operation immediately, saving
             // a trip through the FuturesUnordered
@@ -1282,12 +1316,11 @@
         if let Some(w) =
             self.compute_deferred_write(offset, data, res, is_write_unwritten)
         {
-            let (tx, rx) = oneshot::channel();
+            let tx = self.deferred_reqs.push_oneshot();
             rayon::spawn(move || {
                 let out = w.run().map(DeferredBlockReq::Write);
                 let _ = tx.send(out);
             });
-            self.deferred_reqs.push_back(Either::Right(rx));
         }
     }
@@ -1421,7 +1454,62 @@
                 c.halt_io_task(ClientStopReason::Timeout);
             }
             ClientAction::Response(m) => {
-                self.on_client_message(client_id, m).await;
+                // We have received a message, so reset the timeout watchdog for
+                // this particular client.
+                self.downstairs.clients[client_id].reset_timeout();
+
+                // Defer the message if it's a (large) read that needs
+                // decryption, or there are other deferred messages in the queue
+                // (to preserve order).  Otherwise, handle it immediately.
+                if let Message::ReadResponse { responses, .. } = &m {
+                    // Any read larger than this constant should be deferred to
+                    // the worker pool; smaller reads can be processed in-thread
+                    // (since the overhead isn't worth it)
+                    const MIN_DEFER_SIZE_BYTES: u64 = 8192;
+                    let should_defer = !self.deferred_msgs.is_empty()
+                        || match responses {
+                            Ok(rs) => {
+                                // Find the number of bytes being decrypted
+                                let response_size = rs.len() as u64
+                                    * self
+                                        .ddef
+                                        .get_def()
+                                        .map(|b| b.block_size())
+                                        .unwrap_or(0);
+
+                                response_size > MIN_DEFER_SIZE_BYTES
+                            }
+                            Err(_) => false,
+                        };
+
+                    let dr = DeferredRead {
+                        message: m,
+                        client_id,
+                        cfg: self.cfg.clone(),
+                        log: self.log.new(o!("job" => "decrypt")),
+                    };
+                    if should_defer {
+                        let tx = self.deferred_msgs.push_oneshot();
+                        rayon::spawn(move || {
+                            let out = dr.run();
+                            let _ = tx.send(out);
+                        });
+                    } else {
+                        // Do decryption right here!
+                        self.on_client_message(dr.run()).await;
+                    }
+                } else {
+                    let dm = DeferredMessage {
+                        message: m,
+                        hashes: vec![],
+                        client_id,
+                    };
+                    if self.deferred_msgs.is_empty() {
+                        self.on_client_message(dm).await;
+                    } else {
+                        self.deferred_msgs.push_immediate(dm);
+                    }
+                }
             }
             ClientAction::TaskStopped(r) => {
                 self.on_client_task_stopped(client_id, r);
@@ -1437,10 +1525,8 @@
         }
     }
 
-    async fn on_client_message(&mut self, client_id: ClientId, m: Message) {
-        // We have received a message, so reset the timeout watchdog for this
-        // particular client.
-        self.downstairs.clients[client_id].reset_timeout();
+    async fn on_client_message(&mut self, m: DeferredMessage) {
+        let (client_id, m, hashes) = (m.client_id, m.message, m.hashes);
         match m {
             Message::Imok => {
                 // Nothing to do here, glad to hear that you're okay
@@ -1460,6 +1546,7 @@
                 let r = self.downstairs.process_io_completion(
                     client_id,
                     m,
+                    hashes,
                     &self.state,
                 );
                 if let Err(e) = r {
@@ -3634,6 +3721,152 @@ pub(crate) mod test {
             }],
         }]);
 
+        // Because this read is small, it happens right away
+        up.apply(UpstairsAction::Downstairs(DownstairsAction::Client {
+            client_id: ClientId::new(0),
+            action: ClientAction::Response(Message::ReadResponse {
+                upstairs_id: up.cfg.upstairs_id,
+                session_id: up.cfg.session_id,
+                job_id: JobId(1000),
+                responses,
+            }),
+        }))
+        .await;
+
+        // This was a small read and handled in-line
+        assert!(up.deferred_msgs.is_empty());
+        // No panic, great job everyone
+    }
+
+    #[tokio::test]
+    async fn good_deferred_decryption() {
+        let mut up = make_encrypted_upstairs();
+        up.force_active().unwrap();
+        set_all_active(&mut up.downstairs);
+
+        let blocks = 16384 / 512;
+        let data = Buffer::new(512 * blocks);
+        let offset = Block::new_512(7);
+        let (_tx, res) = BlockReqWaiter::pair();
+        up.apply(UpstairsAction::Guest(BlockReq {
+            op: BlockOp::Read { offset, data },
+            res,
+        }))
+        .await;
+
+        let mut data = Vec::from([1u8; 512]);
+
+        let (nonce, tag, hash) = up
+            .cfg
+            .encryption_context
+            .as_ref()
+            .unwrap()
+            .encrypt_in_place(&mut data)
+            .unwrap();
+
+        let nonce: [u8; 12] = nonce.into();
+        let tag: [u8; 16] = tag.into();
+
+        // Build up the long read response, which should be long enough to
+        // trigger the deferred read path.
+        let mut responses = vec![];
+        for i in 0..blocks {
+            responses.push(ReadResponse {
+                eid: 0,
+                offset: Block::new_512(offset.value + i as u64),
+                data: BytesMut::from(&data[..]),
+                block_contexts: vec![BlockContext {
+                    encryption_context: Some(
+                        crucible_protocol::EncryptionContext { nonce, tag },
+                    ),
+                    hash,
+                }],
+            });
+        }
+        let responses = Ok(responses);
+
+        // This defers decryption to a separate thread, because the read is
+        // large.  We'll check that the job is deferred below.
+        up.apply(UpstairsAction::Downstairs(DownstairsAction::Client {
+            client_id: ClientId::new(0),
+            action: ClientAction::Response(Message::ReadResponse {
+                upstairs_id: up.cfg.upstairs_id,
+                session_id: up.cfg.session_id,
+                job_id: JobId(1000),
+                responses,
+            }),
+        }))
+        .await;
+
+        // This was a large read and was deferred
+        assert!(!up.deferred_msgs.is_empty());
+
+        up.await_deferred_msgs().await;
+        // No panic, great job everyone
+    }
+
+    #[tokio::test]
+    async fn bad_deferred_decryption_means_panic() {
+        let mut up = make_encrypted_upstairs();
+        up.force_active().unwrap();
+        set_all_active(&mut up.downstairs);
+
+        let blocks = 16384 / 512;
+        let data = Buffer::new(512 * blocks);
+        let offset = Block::new_512(7);
+        let (_tx, res) = BlockReqWaiter::pair();
+        up.apply(UpstairsAction::Guest(BlockReq {
+            op: BlockOp::Read { offset, data },
+            res,
+        }))
+        .await;
+
+        // fake read response from downstairs that will fail decryption
+        let mut data = Vec::from([1u8; 512]);
+
+        let (nonce, tag, _) = up
+            .cfg
+            .encryption_context
+            .as_ref()
+            .unwrap()
+            .encrypt_in_place(&mut data)
+            .unwrap();
+
+        let nonce: [u8; 12] = nonce.into();
+        let mut tag: [u8; 16] = tag.into();
+
+        // alter tag
+        if tag[3] == 0xFF {
+            tag[3] = 0x00;
+        } else {
+            tag[3] = 0xFF;
+        }
+
+        // compute integrity hash after alteration above!  It should still
+        // validate
+        let hash = integrity_hash(&[&nonce, &tag, &data]);
+
+        // Build up the long read response, which should be long enough to
+        // trigger the deferred read path.
+        let mut responses = vec![];
+        for i in 0..blocks {
+            responses.push(ReadResponse {
+                eid: 0,
+                offset: Block::new_512(offset.value + i as u64),
+                data: BytesMut::from(&data[..]),
+                block_contexts: vec![BlockContext {
+                    encryption_context: Some(
+                        crucible_protocol::EncryptionContext { nonce, tag },
+                    ),
+                    hash,
+                }],
+            });
+        }
+        let responses = Ok(responses);
+
+        // This defers decryption to a separate thread, because the read is
+        // large.  This won't panic, because decryption failing just populates
+        // the message with an error.
         up.apply(UpstairsAction::Downstairs(DownstairsAction::Client {
             client_id: ClientId::new(0),
             action: ClientAction::Response(Message::ReadResponse {
@@ -3644,9 +3877,24 @@ pub(crate) mod test {
             }),
         }))
         .await;
-        // no panic, great work everyone
+
+        // Prepare to receive the message with an invalid tag
+        let fut = up.await_deferred_msgs();
+
+        let result = std::panic::AssertUnwindSafe(fut).catch_unwind().await;
+        assert!(result.is_err());
+        let r = result
+            .as_ref()
+            .unwrap_err()
+            .downcast_ref::<String>()
+            .unwrap();
+        assert!(
+            r.contains("DecryptionError"),
+            "panic for the wrong reason: {r}"
+        );
     }
 
+    /// Confirm that an offloaded decryption also panics (eventually)
     #[tokio::test]
     async fn bad_decryption_means_panic() {
         let mut up = make_encrypted_upstairs();