From be44cf64228b6ed0d4ef16dd28deb43d7f82d323 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 21 Jul 2024 19:58:25 +0200 Subject: [PATCH] src: clean up the base library modules --- src/bin/toysql.rs | 2 +- src/bin/workload.rs | 4 +-- src/client.rs | 75 +++++++++++++++++++--------------------- src/error.rs | 33 +++++++++--------- src/lib.rs | 1 - src/raft/log.rs | 2 +- src/server.rs | 56 ++++++++++++++---------------- src/storage/mvcc.rs | 2 +- tests/e2e/testcluster.rs | 2 +- 9 files changed, 84 insertions(+), 93 deletions(-) diff --git a/src/bin/toysql.rs b/src/bin/toysql.rs index e28cdd8bb..53370e97b 100644 --- a/src/bin/toysql.rs +++ b/src/bin/toysql.rs @@ -58,7 +58,7 @@ impl ToySQL { /// Creates a new ToySQL REPL for the given server host and port fn new(host: &str, port: u16) -> Result { Ok(Self { - client: Client::new((host, port))?, + client: Client::connect((host, port))?, editor: Editor::new()?, history_path: std::env::var_os("HOME") .map(|home| std::path::Path::new(&home).join(".toysql.history")), diff --git a/src/bin/workload.rs b/src/bin/workload.rs index 8a2245703..a1bdc22f8 100644 --- a/src/bin/workload.rs +++ b/src/bin/workload.rs @@ -79,7 +79,7 @@ impl Runner { /// Runs the specified workload. fn run(self, workload: W) -> Result<()> { let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed); - let mut client = Client::new(&self.hosts[0])?; + let mut client = Client::connect(&self.hosts[0])?; // Set up a histogram recording txn latencies as nanoseconds. The // buckets range from 0.001s to 10s. @@ -103,7 +103,7 @@ impl Runner { let (done_tx, done_rx) = crossbeam::channel::bounded::<()>(0); for addr in self.hosts.iter().cycle().take(self.concurrency) { - let mut client = Client::new(addr)?; + let mut client = Client::connect(addr)?; let mut recorder = hist.recorder(); let work_rx = work_rx.clone(); let done_tx = done_tx.clone(); diff --git a/src/client.rs b/src/client.rs index 5c3ac46d9..3cad86c8a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,3 @@ -use std::io::Write as _; - use crate::encoding::Value as _; use crate::errdata; use crate::error::{Error, Result}; @@ -9,66 +7,72 @@ use crate::sql::types::Table; use crate::storage::mvcc; use rand::Rng; +use std::io::Write as _; -/// A toyDB client +/// A toyDB client. Connects to a server via TCP and submits SQL statements and +/// other requests. pub struct Client { + /// Inbound response stream. reader: std::io::BufReader, + /// Outbound request stream. writer: std::io::BufWriter, + /// The current transaction, if any. txn: Option, } impl Client { - /// Creates a new client - pub fn new(addr: impl std::net::ToSocketAddrs) -> Result { + /// Connects to a toyDB server, creating a new client. + pub fn connect(addr: impl std::net::ToSocketAddrs) -> Result { let socket = std::net::TcpStream::connect(addr)?; let reader = std::io::BufReader::new(socket.try_clone()?); let writer = std::io::BufWriter::new(socket); Ok(Self { reader, writer, txn: None }) } - /// Call a server method - fn call(&mut self, request: Request) -> Result { + /// Sends a request to the server, returning the response. + fn request(&mut self, request: Request) -> Result { request.encode_into(&mut self.writer)?; self.writer.flush()?; - Result::::decode_from(&mut self.reader)? + Result::decode_from(&mut self.reader)? } - /// Executes a query - pub fn execute(&mut self, query: &str) -> Result { - let resultset = match self.call(Request::Execute(query.into()))? { - Response::Execute(rs) => rs, + /// Executes a SQL statement. + pub fn execute(&mut self, statement: &str) -> Result { + let result = match self.request(Request::Execute(statement.to_string()))? { + Response::Execute(result) => result, response => return errdata!("unexpected response {response:?}"), }; - match &resultset { + // Update the transaction state. + match &result { StatementResult::Begin { state } => self.txn = Some(state.clone()), StatementResult::Commit { .. } => self.txn = None, StatementResult::Rollback { .. } => self.txn = None, _ => {} } - Ok(resultset) + Ok(result) } - /// Fetches the table schema as SQL + /// Fetches a table schema. pub fn get_table(&mut self, table: &str) -> Result { - match self.call(Request::GetTable(table.into()))? { - Response::GetTable(t) => Ok(t), - resp => errdata!("unexpected response: {resp:?}"), + match self.request(Request::GetTable(table.to_string()))? { + Response::GetTable(table) => Ok(table), + response => errdata!("unexpected response: {response:?}"), } } - /// Lists database tables + /// Lists database tables. pub fn list_tables(&mut self) -> Result> { - match self.call(Request::ListTables)? { - Response::ListTables(t) => Ok(t), - resp => errdata!("unexpected response: {resp:?}"), + match self.request(Request::ListTables)? { + Response::ListTables(tables) => Ok(tables), + response => errdata!("unexpected response: {response:?}"), } } - /// Checks server status + /// Returns server status. pub fn status(&mut self) -> Result { - match self.call(Request::Status)? { - Response::Status(s) => Ok(s), - resp => errdata!("unexpected response: {resp:?}"), + match self.request(Request::Status)? { + Response::Status(status) => Ok(status), + response => errdata!("unexpected response: {response:?}"), } } @@ -79,27 +83,20 @@ impl Client { /// Runs the given closure, automatically retrying serialization and abort /// errors. If a transaction is open following an error, it is automatically - /// rolled back. It is the caller's responsibility to use a transaction - /// in the closure where appropriate (i.e. when it is not idempotent). - /// - /// TODO: test this. - pub fn with_retry(&mut self, mut f: F) -> Result - where - F: FnMut(&mut Client) -> Result, - { + /// rolled back. It is the caller's responsibility to use a transaction in + /// the closure where appropriate (i.e. when it is not idempotent). + pub fn with_retry(&mut self, f: impl Fn(&mut Client) -> Result) -> Result { const MAX_RETRIES: u32 = 10; const MIN_WAIT: u64 = 10; const MAX_WAIT: u64 = 2_000; - let mut retries: u32 = 0; loop { match f(self) { - Ok(r) => return Ok(r), + Ok(result) => return Ok(result), Err(Error::Serialization | Error::Abort) if retries < MAX_RETRIES => { if self.txn().is_some() { self.execute("ROLLBACK")?; } - // Use exponential backoff starting at MIN_WAIT doubling up // to MAX_WAIT, but randomize the wait time in this interval // to reduce the chance of collisions. @@ -108,11 +105,11 @@ impl Client { std::thread::sleep(std::time::Duration::from_millis(wait)); retries += 1; } - Err(e) => { + Err(error) => { if self.txn().is_some() { self.execute("ROLLBACK").ok(); // ignore rollback error } - return Err(e); + return Err(error); } } } diff --git a/src/error.rs b/src/error.rs index fe0c2f380..211f9ab4b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,7 +4,8 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum Error { /// The operation was aborted and must be retried. This typically happens - /// with e.g. Raft leader changes. + /// with e.g. Raft leader changes. This is used instead of implementing + /// complex retry logic and replay protection in Raft. Abort, /// Invalid data, typically decoding errors or unexpected internal values. InvalidData(String), @@ -35,23 +36,23 @@ impl std::fmt::Display for Error { } impl Error { - /// Returns whether the error is considered deterministic. State machine - /// application needs to know whether a command failure is deterministic on - /// the input command -- if it is, the command can be considered applied and - /// the error returned to the client, but otherwise the state machine must - /// panic to prevent replica divergence. + /// Returns whether the error is considered deterministic. Raft state + /// machine application needs to know whether a command failure is + /// deterministic on the input command -- if it is, the command can be + /// considered applied and the error returned to the client, but otherwise + /// the state machine must panic to prevent replica divergence. pub fn is_deterministic(&self) -> bool { match self { // Aborts don't happen during application, only leader changes. But - // we consider them non-deterministic in case a abort should happen + // we consider them non-deterministic in case an abort should happen // unexpectedly below Raft. Error::Abort => false, // Possible data corruption local to this node. Error::InvalidData(_) => false, - // Input errors are (likely) deterministic. We could employ command - // checksums to be sure. + // Input errors are (likely) deterministic. They might not be in + // case data was corrupted in flight, but we ignore this case. Error::InvalidInput(_) => true, - // IO errors are typically node-local. + // IO errors are typically local to the node (e.g. faulty disk). Error::IO(_) => false, // Write commands in read-only transactions are deterministic. Error::ReadOnly => true, @@ -61,19 +62,19 @@ impl Error { } } -/// Constructs an Error::InvalidData via format!() and into(). +/// Constructs an Error::InvalidData for the given format string. #[macro_export] macro_rules! errdata { ($($args:tt)*) => { $crate::error::Error::InvalidData(format!($($args)*)).into() }; } -/// Constructs an Error::InvalidInput via format!() and into(). +/// Constructs an Error::InvalidInput for the given format string. #[macro_export] macro_rules! errinput { ($($args:tt)*) => { $crate::error::Error::InvalidInput(format!($($args)*)).into() }; } -/// Result returning Error. +/// A toyDB Result returning Error. pub type Result = std::result::Result; impl From for Result { @@ -132,7 +133,7 @@ impl From> for Error { impl From for Error { fn from(err: hdrhistogram::CreationError) -> Self { - panic!("{err}") + panic!("{err}") // faulty code } } @@ -150,13 +151,13 @@ impl From for Error { impl From for Error { fn from(err: log::SetLoggerError) -> Self { - panic!("{err}") + panic!("{err}") // faulty code } } impl From for Error { fn from(err: regex::Error) -> Self { - panic!("{err}") + panic!("{err}") // faulty code } } diff --git a/src/lib.rs b/src/lib.rs index f59302862..199c057be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,5 @@ #![warn(clippy::all)] #![allow(clippy::new_without_default)] -#![allow(clippy::unneeded_field_pattern)] pub mod client; pub mod encoding; diff --git a/src/raft/log.rs b/src/raft/log.rs index 49e977b92..96cc5b503 100644 --- a/src/raft/log.rs +++ b/src/raft/log.rs @@ -88,7 +88,7 @@ pub struct Log { /// The underlying storage engine. Uses a trait object instead of generics, /// to allow runtime selection of the engine and avoid propagating the /// generic type parameters throughout Raft. - pub(super) engine: Box, + pub engine: Box, /// The current term. term: Term, /// Our leader vote in the current term, if any. diff --git a/src/server.rs b/src/server.rs index 5d446c567..7d39caab5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,5 @@ use crate::encoding::{self, Value as _}; -use crate::error::{Error, Result}; +use crate::error::Result; use crate::raft; use crate::sql; use crate::sql::engine::{Catalog as _, Engine as _, StatementResult}; @@ -22,14 +22,14 @@ const RAFT_PEER_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_ /// A toyDB server. Routes messages to/from an inner Raft node. /// -/// - Listens for inbound Raft connections via TCP and passes messages to the -/// local Raft node. +/// * Listens for inbound SQL connections from clients via TCP and passes +/// requests to the local Raft node. /// -/// - Connects to Raft peers via TCP and sends outbound messages from the -/// local Raft node. +/// * Listens for inbound Raft connections from other toyDB nodes via TCP and +/// passes messages to the local Raft node. /// -/// - Listens for inbound SQL connections via TCP and passes requests to -/// the local Raft node. +/// * Connects to other toyDB nodes via TCP and sends outbound Raft messages +/// from the local Raft node. pub struct Server { /// The inner Raft node. node: raft::Node, @@ -48,18 +48,15 @@ impl Server { raft_state: Box, ) -> Result { let (node_tx, node_rx) = crossbeam::channel::unbounded(); - Ok(Self { - node: raft::Node::new( - id, - peers.keys().copied().collect(), - raft_log, - raft_state, - node_tx, - raft::Options::default(), - )?, - peers, - node_rx, - }) + let node = raft::Node::new( + id, + peers.keys().copied().collect(), + raft_log, + raft_state, + node_tx, + raft::Options::default(), + )?; + Ok(Self { node, peers, node_rx }) } /// Serves Raft and SQL requests indefinitely. Consumes the server. @@ -80,9 +77,8 @@ impl Server { // Serve inbound Raft connections. s.spawn(move || Self::raft_accept(raft_listener, raft_step_tx)); - // Establish outbound Raft connections. + // Establish outbound Raft connections to peers. let mut raft_peers_tx = HashMap::new(); - for (id, addr) in self.peers.into_iter() { let (raft_peer_tx, raft_peer_rx) = crossbeam::channel::bounded(RAFT_PEER_CHANNEL_CAPACITY); @@ -114,7 +110,7 @@ impl Server { fn raft_accept(listener: TcpListener, raft_step_tx: Sender) { std::thread::scope(|s| loop { let (socket, peer) = match listener.accept() { - Ok(sp) => sp, + Ok((socket, peer)) => (socket, peer), Err(err) => { error!("Raft peer accept failed: {err}"); continue; @@ -154,9 +150,7 @@ impl Server { } }; while let Ok(message) = raft_node_rx.recv() { - if let Err(err) = message - .encode_into(&mut socket) - .and_then(|()| socket.flush().map_err(Error::from)) + if let Err(err) = message.encode_into(&mut socket).and_then(|_| Ok(socket.flush()?)) { error!("Failed sending to Raft peer {addr}: {err}"); break; @@ -168,17 +162,17 @@ impl Server { /// Routes Raft messages: /// - /// - node_rx: outbound messages from the local Raft node. Routed to peers + /// * node_rx: outbound messages from the local Raft node. Routed to peers /// via TCP, or to local clients via a response channel. /// - /// - request_rx: inbound requests from local SQL clients. Stepped into + /// * request_rx: inbound requests from local SQL clients. Stepped into /// the local Raft node as ClientRequest messages. Responses are returned /// via the provided response channel. /// - /// - peers_rx: inbound messages from remote Raft peers. Stepped into the + /// * peers_rx: inbound messages from remote Raft peers. Stepped into the /// local Raft node. /// - /// - peers_tx: outbound per-peer channels sent via TCP connections. + /// * peers_tx: outbound per-peer channels sent via TCP connections. /// Messages from the local node's node_rx are sent here. /// /// Panics on any errors, since the Raft node can't recover from failed @@ -252,7 +246,7 @@ impl Server { fn sql_accept(id: raft::NodeID, listener: TcpListener, sql_engine: sql::engine::Raft) { std::thread::scope(|s| loop { let (socket, peer) = match listener.accept() { - Ok(sp) => sp, + Ok((socket, peer)) => (socket, peer), Err(err) => { error!("Client accept failed: {err}"); continue; @@ -269,7 +263,7 @@ impl Server { }) } - /// Processes a client SQL session, by executing SQL statements against the + /// Processes a client SQL session, executing SQL statements against the /// Raft node. fn sql_session( id: raft::NodeID, diff --git a/src/storage/mvcc.rs b/src/storage/mvcc.rs index 76c13aec3..825e9fc57 100644 --- a/src/storage/mvcc.rs +++ b/src/storage/mvcc.rs @@ -225,7 +225,7 @@ impl<'a> encoding::Key<'a> for KeyPrefix<'a> {} /// MVCC engine applies commands one at a time from the Raft log, which will /// serialize them anyway. pub struct MVCC { - pub(crate) engine: Arc>, + pub engine: Arc>, } impl MVCC { diff --git a/tests/e2e/testcluster.rs b/tests/e2e/testcluster.rs index bd84536e9..2f320db5c 100644 --- a/tests/e2e/testcluster.rs +++ b/tests/e2e/testcluster.rs @@ -153,7 +153,7 @@ impl TestCluster { /// Connects to the given cluster node. pub fn connect(&self, id: NodeID) -> Result { self.assert_id(id); - Client::new(self.node_address_sql(id)) + Client::connect(self.node_address_sql(id)) } /// Connects to a random cluster node.