From beb6b68f73176485bea82fb3a06a6d96ed407945 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Mon, 28 Oct 2024 10:13:01 -0400 Subject: [PATCH] Remove separate validation array --- upstairs/src/buffer.rs | 36 ++++++++++++--------------- upstairs/src/client.rs | 41 +++++++++++++----------------- upstairs/src/deferred.rs | 51 ++++++++++++++++++-------------------- upstairs/src/downstairs.rs | 27 ++++---------------- upstairs/src/guest.rs | 19 +++++++------- upstairs/src/lib.rs | 21 ++++------------ upstairs/src/test.rs | 26 ++++++++----------- upstairs/src/upstairs.rs | 9 +++---- 8 files changed, 91 insertions(+), 139 deletions(-) diff --git a/upstairs/src/buffer.rs b/upstairs/src/buffer.rs index d4656835a..bfc7b449d 100644 --- a/upstairs/src/buffer.rs +++ b/upstairs/src/buffer.rs @@ -1,5 +1,4 @@ // Copyright 2023 Oxide Computer Company -use crate::RawReadResponse; use bytes::{Bytes, BytesMut}; use crucible_protocol::ReadBlockContext; use itertools::Itertools; @@ -141,14 +140,17 @@ impl Buffer { /// # Panics /// The response data length must be the same as our buffer length (which /// must be an even multiple of block size, ensured at construction) - pub(crate) fn write_read_response(&mut self, response: RawReadResponse) { - assert!(response.data.len() == self.data.len()); - assert_eq!(response.data.len() % self.block_size, 0); + pub(crate) fn write_read_response( + &mut self, + blocks: &[ReadBlockContext], + data: &mut BytesMut, + ) { + assert!(data.len() == self.data.len()); + assert_eq!(data.len() % self.block_size, 0); let bs = self.block_size; // Build contiguous chunks which are all owned, to copy in bulk - for (empty, mut group) in &response - .blocks + for (empty, mut group) in &blocks .iter() .enumerate() .chunk_by(|(_i, b)| matches!(b, ReadBlockContext::Empty)) @@ -164,16 +166,13 @@ impl Buffer { // Special case: if the entire buffer is owned, then we swap it // instead of copying element-by-element. - if count == response.blocks.len() - && self.data.len() == response.data.len() - { - self.data = response.data; + if count == blocks.len() && self.data.len() == data.len() { + self.data = std::mem::take(data); break; } else { // Otherwise, just copy the sub-region - self.data[(block * bs)..][..(count * bs)].copy_from_slice( - &response.data[(block * bs)..][..(count * bs)], - ); + self.data[(block * bs)..][..(count * bs)] + .copy_from_slice(&data[(block * bs)..][..(count * bs)]); } } } @@ -493,7 +492,7 @@ mod test { let mut rng = rand::thread_rng(); rng.fill_bytes(&mut data); - let blocks = (0..10) + let blocks: Vec<_> = (0..10) .map(|i| { if f(i) { ReadBlockContext::Unencrypted { hash: 123 } @@ -503,10 +502,7 @@ mod test { }) .collect(); - buf.write_read_response(RawReadResponse { - blocks, - data: data.clone(), - }); + buf.write_read_response(&blocks, &mut data.clone()); for i in 0..10 { let buf_chunk = &buf[i * 512..][..512]; @@ -564,12 +560,12 @@ mod test { let mut rng = rand::thread_rng(); rng.fill_bytes(&mut data); - let blocks = (0..10) + let blocks: Vec<_> = (0..10) .map(|_| ReadBlockContext::Unencrypted { hash: 123 }) .collect(); let prev_data_ptr = data.as_ptr(); - buf.write_read_response(RawReadResponse { blocks, data }); + buf.write_read_response(&blocks, &mut data); assert_eq!(buf.data.as_ptr(), prev_data_ptr); } diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index eb60efb7c..15b36684c 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -5,7 +5,6 @@ use crate::{ ClientIOStateCount, ClientId, CrucibleDecoder, CrucibleError, DownstairsIO, DsState, EncryptionContext, IOState, IOop, JobId, Message, RawReadResponse, ReconcileIO, ReconcileIOState, RegionDefinitionStatus, RegionMetadata, - Validation, }; use crucible_common::{x509::TLSContext, ExtentId, VerboseTimeout}; use crucible_protocol::{ @@ -1226,7 +1225,6 @@ impl DownstairsClient { ds_id: JobId, job: &mut DownstairsIO, responses: Result, - read_validations: Vec, deactivate: bool, extent_info: Option, ) -> bool { @@ -1357,7 +1355,7 @@ impl DownstairsClient { */ let read_data = responses.unwrap(); assert!(!read_data.blocks.is_empty()); - if job.read_validations != read_validations { + if job.data.as_ref().unwrap().blocks != read_data.blocks { // XXX This error needs to go to Nexus // XXX This will become the "force all downstairs // to stop and refuse to restart" mode. @@ -1371,8 +1369,8 @@ impl DownstairsClient { self.client_id, ds_id, self.cfg.session_id, - job.read_validations, - read_validations, + job.data.as_ref().unwrap().blocks, + read_data.blocks, start_eid, start_offset, job.state, @@ -1419,9 +1417,7 @@ impl DownstairsClient { assert!(extent_info.is_none()); if jobs_completed_ok == 1 { assert!(job.data.is_none()); - assert!(job.read_validations.is_empty()); job.data = Some(read_data); - job.read_validations = read_validations; assert!(!job.acked); ackable = true; debug!(self.log, "Read AckReady {}", ds_id.0); @@ -1433,7 +1429,8 @@ impl DownstairsClient { * that and verify they are the same. */ debug!(self.log, "Read already AckReady {ds_id}"); - if job.read_validations != read_validations { + let job_blocks = &job.data.as_ref().unwrap().blocks; + if job_blocks != &read_data.blocks { // XXX This error needs to go to Nexus // XXX This will become the "force all downstairs // to stop and refuse to restart" mode. @@ -1444,8 +1441,8 @@ impl DownstairsClient { job: {:?}", self.client_id, ds_id, - job.read_validations, - read_validations, + job_blocks, + read_data.blocks, job, ); } @@ -2947,18 +2944,15 @@ fn update_net_done_probes(m: &Message, cid: ClientId) { } /// Returns: -/// - `Ok(Some(ctx))` for successfully decrypted data -/// - `Ok(None)` if there is no block context and the block is all 0 +/// - `Ok(())` for successfully decrypted data, or if there is no block context +/// and the block is all 0s (i.e. a valid empty block) /// - `Err(..)` otherwise -/// -/// The return value of this will be stored with the job, and compared -/// between each read. pub(crate) fn validate_encrypted_read_response( block_context: Option, data: &mut [u8], encryption_context: &EncryptionContext, log: &Logger, -) -> Result { +) -> Result<(), CrucibleError> { // XXX because we don't have block generation numbers, an attacker // downstairs could: // @@ -2980,7 +2974,7 @@ pub(crate) fn validate_encrypted_read_response( // // XXX if it's not a blank block, we may be under attack? if data.iter().all(|&x| x == 0) { - return Ok(Validation::Empty); + return Ok(()); } else { error!(log, "got empty block context with non-blank block"); return Err(CrucibleError::MissingBlockContext); @@ -3002,7 +2996,7 @@ pub(crate) fn validate_encrypted_read_response( Tag::from_slice(&ctx.tag[..]), ); if decryption_result.is_ok() { - Ok(Validation::Encrypted(ctx)) + Ok(()) } else { error!(log, "Decryption failed!"); Err(CrucibleError::DecryptionError) @@ -3010,21 +3004,20 @@ pub(crate) fn validate_encrypted_read_response( } /// Returns: -/// - Ok(Some(valid_hash)) where the integrity hash matches -/// - Ok(None) where there is no integrity hash in the response and the -/// block is all 0 +/// - Ok(()) where the integrity hash matches (or the integrity hash is missing +/// and the block is all 0s, indicating an empty block) /// - Err otherwise pub(crate) fn validate_unencrypted_read_response( block_hash: Option, data: &mut [u8], log: &Logger, -) -> Result { +) -> Result<(), CrucibleError> { if let Some(hash) = block_hash { // check integrity hashes - make sure it is correct let computed_hash = integrity_hash(&[data]); if computed_hash == hash { - Ok(Validation::Unencrypted(computed_hash)) + Ok(()) } else { // No integrity hash was correct for this response error!(log, "No match computed hash:0x{:x}", computed_hash,); @@ -3048,7 +3041,7 @@ pub(crate) fn validate_unencrypted_read_response( // // XXX if it's not a blank block, we may be under attack? if data[..].iter().all(|&x| x == 0) { - Ok(Validation::Empty) + Ok(()) } else { error!(log, "got empty block context with non-blank block"); Err(CrucibleError::MissingBlockContext) diff --git a/upstairs/src/deferred.rs b/upstairs/src/deferred.rs index 9050c7dd6..cbe43932e 100644 --- a/upstairs/src/deferred.rs +++ b/upstairs/src/deferred.rs @@ -5,11 +5,11 @@ use std::sync::Arc; use crate::{ backpressure::BackpressureGuard, client::ConnectionId, upstairs::UpstairsConfig, BlockContext, BlockOp, ClientData, ClientId, - ImpactedBlocks, Message, RawWrite, Validation, + ImpactedBlocks, Message, RawWrite, }; use bytes::BytesMut; use crucible_common::{integrity_hash, CrucibleError, RegionDefinition}; -use crucible_protocol::ReadBlockContext; +use crucible_protocol::{ReadBlockContext, ReadResponseHeader}; use futures::{ future::{ready, Either, Ready}, stream::FuturesOrdered, @@ -192,11 +192,13 @@ impl DeferredWrite { #[derive(Debug)] pub(crate) struct DeferredMessage { + /// Message received from the client + /// + /// If the deferred message was a read, then the data and context blocks in + /// this [Message::ReadResponse] has been validated (and decrypted if + /// necessary). pub message: Message, - /// If this was a `ReadResponse`, then the validation result is stored here - pub hashes: Vec, - pub client_id: ClientId, /// See `DeferredRead::connection_id` @@ -205,8 +207,8 @@ pub(crate) struct DeferredMessage { /// Standalone data structure which can perform decryption pub(crate) struct DeferredRead { - /// Message, which must be a `ReadResponse` - pub message: Message, + pub header: ReadResponseHeader, + pub data: BytesMut, /// Unique ID for this particular connection to the downstairs /// @@ -225,20 +227,16 @@ impl DeferredRead { /// Consume the `DeferredRead` and perform decryption /// /// If decryption fails, then the resulting `Message` has an error in the - /// `responses` field, and `hashes` is empty. + /// `responses` field. pub fn run(mut self) -> DeferredMessage { use crate::client::{ validate_encrypted_read_response, validate_unencrypted_read_response, }; - let Message::ReadResponse { header, data } = &mut self.message else { - panic!("invalid DeferredRead"); - }; - let mut hashes = vec![]; - if let Ok(rs) = header.blocks.as_mut() { - assert_eq!(data.len() % rs.len(), 0); - let block_size = data.len() / rs.len(); + if let Ok(rs) = self.header.blocks.as_mut() { + assert_eq!(self.data.len() % rs.len(), 0); + let block_size = self.data.len() / rs.len(); for (i, r) in rs.iter_mut().enumerate() { let v = if let Some(ctx) = &self.cfg.encryption_context { match r { @@ -256,7 +254,7 @@ impl DeferredRead { .and_then(|r| { validate_encrypted_read_response( r, - &mut data[i * block_size..][..block_size], + &mut self.data[i * block_size..][..block_size], ctx, &self.log, ) @@ -279,28 +277,27 @@ impl DeferredRead { .and_then(|r| { validate_unencrypted_read_response( r, - &mut data[i * block_size..][..block_size], + &mut self.data[i * block_size..][..block_size], &self.log, ) }) }; - match v { - Ok(hash) => hashes.push(hash), - Err(e) => { - error!(self.log, "decryption failure: {e:?}"); - header.blocks = Err(e); - hashes.clear(); - break; - } + if let Err(e) = v { + error!(self.log, "decryption failure: {e:?}"); + self.header.blocks = Err(e); + break; } } } + let message = Message::ReadResponse { + header: self.header, + data: self.data, + }; DeferredMessage { client_id: self.client_id, - message: self.message, + message, connection_id: self.connection_id, - hashes, } } } diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 049b88ffd..23a889169 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -19,7 +19,7 @@ use crate::{ DownstairsMend, DsState, ExtentFix, ExtentRepairIDs, IOState, IOStateCount, IOop, ImpactedBlocks, JobId, Message, RawReadResponse, RawWrite, ReconcileIO, ReconciliationId, RegionDefinition, ReplaceResult, - SnapshotDetails, Validation, WorkSummary, + SnapshotDetails, WorkSummary, }; use crucible_common::{ impacted_blocks::ImpactedAddr, BlockIndex, BlockOffset, ExtentId, @@ -425,8 +425,6 @@ impl Downstairs { let done = self.ds_active.get_mut(&ds_id).unwrap(); assert!(!done.acked); - let data = done.data.take(); - done.acked = true; let r = done.result(); @@ -476,6 +474,10 @@ impl Downstairs { // Copy (if present) read data back to the guest buffer they // provided to us, and notify any waiters. if let Some(res) = done.res.take() { + let data = done + .data + .as_mut() + .map(|v| (v.blocks.as_slice(), &mut v.data)); res.transfer_and_notify(data, r); } @@ -2134,7 +2136,6 @@ impl Downstairs { res, replay: false, data: None, - read_validations: vec![], backpressure_guard: bp_guard, }, ); @@ -2810,7 +2811,6 @@ impl Downstairs { &mut self, client_id: ClientId, m: Message, - read_validations: Vec, up_state: &UpstairsState, ) -> Result<(), CrucibleError> { let (upstairs_id, session_id, ds_id, read_data, extent_info) = match m { @@ -3061,7 +3061,6 @@ impl Downstairs { ds_id, client_id, read_data, - read_validations, up_state, extent_info, ); @@ -3108,17 +3107,10 @@ impl Downstairs { ) -> bool { let was_ackable = self.ackable_work.contains(&ds_id); - // Make up dummy values for hashes, since they're not actually checked - // here (besides confirming that we have the correct number). - let hashes = match &responses { - Ok(r) => vec![Validation::Unencrypted(0); r.blocks.len()], - Err(..) => vec![], - }; self.process_io_completion_inner( ds_id, client_id, responses, - hashes, up_state, extent_info, ); @@ -3132,7 +3124,6 @@ impl Downstairs { ds_id: JobId, client_id: ClientId, responses: Result, - read_validations: Vec, up_state: &UpstairsState, extent_info: Option, ) { @@ -3161,18 +3152,10 @@ impl Downstairs { return; }; - // Sanity-checking for a programmer error during offloaded decryption. - // If we didn't get one hash per read block, then `responses` must - // have been converted into `Err(..)`. - if let Ok(reads) = &responses { - assert_eq!(reads.blocks.len(), read_validations.len()); - } - if self.clients[client_id].process_io_completion( ds_id, job, responses, - read_validations, deactivate, extent_info, ) { diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index 84533334a..077f4c676 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -9,7 +9,7 @@ use crate::{ backpressure::{ BackpressureAmount, BackpressureConfig, SharedBackpressureAmount, }, - BlockIO, BlockOp, BlockOpWaiter, BlockRes, Buffer, RawReadResponse, + BlockIO, BlockOp, BlockOpWaiter, BlockRes, Buffer, ReadBlockContext, ReplaceResult, UpstairsAction, }; use crucible_client_types::RegionExtentInfo; @@ -36,16 +36,15 @@ pub(crate) enum GuestBlockRes { } impl GuestBlockRes { - /* - * When all downstairs jobs have completed, and all buffers have been - * attached to the GtoS struct, we can do the final copy of the data - * from upstairs memory back to the guest's memory. Notify corresponding - * BlockOpWaiter if required - */ + /// Copy data to guest buffers and notify the guest + /// + /// The `downstairs_response` must be present if this was a `Read` job. In + /// this case, the `&mut BytesMut` argument may be taken (to reduce `memcpy` + /// overhead). #[instrument] pub(crate) fn transfer_and_notify( self, - downstairs_response: Option, + downstairs_response: Option<(&[ReadBlockContext], &mut BytesMut)>, result: Result<(), CrucibleError>, ) { /* @@ -60,13 +59,13 @@ impl GuestBlockRes { */ match self { GuestBlockRes::Read(mut buffer, res) => { - if let Some(downstairs_response) = downstairs_response { + if let Some((blocks, data)) = downstairs_response { // XXX don't do if result.is_err()? // Copy over into guest memory. let _ignored = span!(Level::TRACE, "copy to guest buffer").entered(); - buffer.write_read_response(downstairs_response); + buffer.write_read_response(blocks, data); } else { // Should this panic? If the caller is requesting a // transfer, the guest_buffer should exist. If it does not diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 1ae6a74a0..02df2783d 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -882,17 +882,6 @@ impl std::fmt::Display for DsState { } } -/// Results of validating a single block -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub(crate) enum Validation { - /// The block has no hash / context and is empty - Empty, - /// For an unencrypted block, the result is the hash - Unencrypted(u64), - /// For an encrypted block, the result is the tag + nonce - Encrypted(crucible_protocol::EncryptionContext), -} - /* * A unit of work for downstairs that is put into the hashmap. */ @@ -918,12 +907,12 @@ struct DownstairsIO { */ replay: bool, - /* - * If the operation is a Read, this holds the resulting buffer - * The validation vec holds the validation results for the read - */ + /// If the operation is a Read, this holds the resulting buffer and hashes + /// + /// The buffer _may_ be removed during the transfer to the Guest, to reduce + /// `memcpy` overhead. If this occurs, the hashes remain present for + /// consistency checking with subsequent replies. data: Option, - read_validations: Vec, /// Handle for this job's contribution to guest backpressure /// diff --git a/upstairs/src/test.rs b/upstairs/src/test.rs index 2faa9cf73..d046fbdc2 100644 --- a/upstairs/src/test.rs +++ b/upstairs/src/test.rs @@ -384,14 +384,14 @@ pub(crate) mod up_test { }; // Validate it - let successful_hash = validate_encrypted_read_response( + let r = validate_encrypted_read_response( Some(ctx), &mut data, &Arc::new(context), &csl(), - )?; + ); - assert_eq!(successful_hash, Validation::Encrypted(ctx)); + assert!(r.is_ok()); // `validate_encrypted_read_response` will mutate the read // response's data value, make sure it decrypted @@ -430,15 +430,15 @@ pub(crate) mod up_test { data.resize(512, 0u8); // Validate the read response - let successful_hash = validate_encrypted_read_response( + let r = validate_encrypted_read_response( None, &mut data, &Arc::new(context), &csl(), - )?; + ); // The above function will return None for a blank block - assert_eq!(successful_hash, Validation::Empty); + assert!(r.is_ok()); assert_eq!(data, vec![0u8; 512]); Ok(()) @@ -456,16 +456,13 @@ pub(crate) mod up_test { let original_data = data.clone(); // Validate it - let successful_hash = validate_unencrypted_read_response( + let r = validate_unencrypted_read_response( Some(read_response_hash), &mut data, &csl(), - )?; - - assert_eq!( - successful_hash, - Validation::Unencrypted(read_response_hash) ); + + assert!(r.is_ok()); assert_eq!(data, original_data); Ok(()) @@ -480,10 +477,9 @@ pub(crate) mod up_test { let original_data = data.clone(); // Validate a read response - let successful_hash = - validate_unencrypted_read_response(None, &mut data, &csl())?; + let r = validate_unencrypted_read_response(None, &mut data, &csl()); - assert_eq!(successful_hash, Validation::Empty); + assert!(r.is_ok()); assert_eq!(data, original_data); Ok(()) diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 6ff4cb55e..884143e5d 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -1502,7 +1502,7 @@ impl Upstairs { // Defer the message if it's a (large) read that needs // decryption, or there are other deferred messages in the queue // (to preserve order). Otherwise, handle it immediately. - if let Message::ReadResponse { header, .. } = &m { + if let Message::ReadResponse { header, data } = m { // Any read larger than `MIN_DEFER_SIZE_BYTES` constant // should be deferred to the worker pool; smaller reads can // be processed in-thread (since the overhead isn't worth @@ -1524,7 +1524,8 @@ impl Upstairs { }; let dr = DeferredRead { - message: m, + header, + data, client_id, connection_id: id, cfg: self.cfg.clone(), @@ -1543,7 +1544,6 @@ impl Upstairs { } else { let dm = DeferredMessage { message: m, - hashes: vec![], client_id, connection_id: id, }; @@ -1569,7 +1569,7 @@ impl Upstairs { } fn on_client_message(&mut self, dm: DeferredMessage) { - let (client_id, m, hashes) = (dm.client_id, dm.message, dm.hashes); + let (client_id, m) = (dm.client_id, dm.message); // It's possible for a deferred message to arrive **after** we have // disconnected from this particular Downstairs. In that case, we want @@ -1605,7 +1605,6 @@ impl Upstairs { let r = self.downstairs.process_io_completion( client_id, m, - hashes, &self.state, ); if let Err(e) = r {