Skip to content

Commit

Permalink
src: clean up the base library modules
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jul 21, 2024
1 parent a1d54aa commit 2cf2a2a
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 51 deletions.
7 changes: 3 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::io::Write as _;

use crate::encoding::Value as _;
use crate::errdata;
use crate::error::{Error, Result};
Expand All @@ -8,16 +6,17 @@ use crate::sql::engine::StatementResult;
use crate::sql::types::Table;

use rand::Rng;
use std::io::Write as _;

/// A toyDB client
/// A toyDB client. Connects to the server via TCP and submits requests.
pub struct Client {
reader: std::io::BufReader<std::net::TcpStream>,
writer: std::io::BufWriter<std::net::TcpStream>,
txn: Option<(u64, bool)>,
}

impl Client {
/// Creates a new client
/// Creates a new client, connecting to the server at addr.
pub fn new(addr: impl std::net::ToSocketAddrs) -> Result<Self> {
let socket = std::net::TcpStream::connect(addr)?;
let reader = std::io::BufReader::new(socket.try_clone()?);
Expand Down
33 changes: 17 additions & 16 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Error {
/// The operation was aborted and must be retried. This typically happens
/// with e.g. Raft leader changes.
/// with e.g. Raft leader changes. This is used instead of implementing
/// complex retry logic and replay protection in Raft.
Abort,
/// Invalid data, typically decoding errors or unexpected internal values.
InvalidData(String),
Expand Down Expand Up @@ -35,23 +36,23 @@ impl std::fmt::Display for Error {
}

impl Error {
/// Returns whether the error is considered deterministic. State machine
/// application needs to know whether a command failure is deterministic on
/// the input command -- if it is, the command can be considered applied and
/// the error returned to the client, but otherwise the state machine must
/// panic to prevent replica divergence.
/// Returns whether the error is considered deterministic. Raft state
/// machine application needs to know whether a command failure is
/// deterministic on the input command -- if it is, the command can be
/// considered applied and the error returned to the client, but otherwise
/// the state machine must panic to prevent replica divergence.
pub fn is_deterministic(&self) -> bool {
match self {
// Aborts don't happen during application, only leader changes. But
// we consider them non-deterministic in case a abort should happen
// we consider them non-deterministic in case an abort should happen
// unexpectedly below Raft.
Error::Abort => false,
// Possible data corruption local to this node.
Error::InvalidData(_) => false,
// Input errors are (likely) deterministic. We could employ command
// checksums to be sure.
// Input errors are (likely) deterministic. They might not be in
// case data was corrupted in flight, but we ignore this case.
Error::InvalidInput(_) => true,
// IO errors are typically node-local.
// IO errors are typically local to the node (e.g. faulty disk).
Error::IO(_) => false,
// Write commands in read-only transactions are deterministic.
Error::ReadOnly => true,
Expand All @@ -61,19 +62,19 @@ impl Error {
}
}

/// Constructs an Error::InvalidData via format!() and into().
/// Constructs an Error::InvalidData for the given format string.
#[macro_export]
macro_rules! errdata {
($($args:tt)*) => { $crate::error::Error::InvalidData(format!($($args)*)).into() };
}

/// Constructs an Error::InvalidInput via format!() and into().
/// Constructs an Error::InvalidInput for the given format string.
#[macro_export]
macro_rules! errinput {
($($args:tt)*) => { $crate::error::Error::InvalidInput(format!($($args)*)).into() };
}

/// Result returning Error.
/// A toyDB Result returning Error.
pub type Result<T> = std::result::Result<T, Error>;

impl<T> From<Error> for Result<T> {
Expand Down Expand Up @@ -132,7 +133,7 @@ impl<T> From<crossbeam::channel::TrySendError<T>> for Error {

impl From<hdrhistogram::CreationError> for Error {
fn from(err: hdrhistogram::CreationError) -> Self {
panic!("{err}")
panic!("{err}") // faulty code
}
}

Expand All @@ -150,13 +151,13 @@ impl From<log::ParseLevelError> for Error {

impl From<log::SetLoggerError> for Error {
fn from(err: log::SetLoggerError) -> Self {
panic!("{err}")
panic!("{err}") // faulty code
}
}

impl From<regex::Error> for Error {
fn from(err: regex::Error) -> Self {
panic!("{err}")
panic!("{err}") // faulty code
}
}

Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![warn(clippy::all)]
#![allow(clippy::new_without_default)]
#![allow(clippy::unneeded_field_pattern)]

pub mod client;
pub mod encoding;
Expand Down
2 changes: 1 addition & 1 deletion src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub struct Log {
/// The underlying storage engine. Uses a trait object instead of generics,
/// to allow runtime selection of the engine and avoid propagating the
/// generic type parameters throughout Raft.
pub(super) engine: Box<dyn storage::Engine>,
pub engine: Box<dyn storage::Engine>,
/// The current term.
term: Term,
/// Our leader vote in the current term, if any.
Expand Down
50 changes: 22 additions & 28 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::encoding::{self, Value as _};
use crate::error::{Error, Result};
use crate::error::Result;
use crate::raft;
use crate::sql;
use crate::sql::engine::{Catalog as _, Engine as _, StatementResult};
Expand All @@ -22,13 +22,13 @@ const RAFT_PEER_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_

/// A toyDB server. Routes messages to/from an inner Raft node.
///
/// - Listens for inbound Raft connections via TCP and passes messages to the
/// * Listens for inbound Raft connections via TCP and passes messages to the
/// local Raft node.
///
/// - Connects to Raft peers via TCP and sends outbound messages from the
/// * Connects to Raft peers via TCP and sends outbound messages from the
/// local Raft node.
///
/// - Listens for inbound SQL connections via TCP and passes requests to
/// * Listens for inbound SQL connections via TCP and passes requests to
/// the local Raft node.
pub struct Server {
/// The inner Raft node.
Expand All @@ -48,18 +48,15 @@ impl Server {
raft_state: Box<dyn raft::State>,
) -> Result<Self> {
let (node_tx, node_rx) = crossbeam::channel::unbounded();
Ok(Self {
node: raft::Node::new(
id,
peers.keys().copied().collect(),
raft_log,
raft_state,
node_tx,
raft::Options::default(),
)?,
peers,
node_rx,
})
let node = raft::Node::new(
id,
peers.keys().copied().collect(),
raft_log,
raft_state,
node_tx,
raft::Options::default(),
)?;
Ok(Self { node, peers, node_rx })
}

/// Serves Raft and SQL requests indefinitely. Consumes the server.
Expand All @@ -80,9 +77,8 @@ impl Server {
// Serve inbound Raft connections.
s.spawn(move || Self::raft_accept(raft_listener, raft_step_tx));

// Establish outbound Raft connections.
// Establish outbound Raft connections to peers.
let mut raft_peers_tx = HashMap::new();

for (id, addr) in self.peers.into_iter() {
let (raft_peer_tx, raft_peer_rx) =
crossbeam::channel::bounded(RAFT_PEER_CHANNEL_CAPACITY);
Expand Down Expand Up @@ -114,7 +110,7 @@ impl Server {
fn raft_accept(listener: TcpListener, raft_step_tx: Sender<raft::Envelope>) {
std::thread::scope(|s| loop {
let (socket, peer) = match listener.accept() {
Ok(sp) => sp,
Ok((socket, peer)) => (socket, peer),
Err(err) => {
error!("Raft peer accept failed: {err}");
continue;
Expand Down Expand Up @@ -154,9 +150,7 @@ impl Server {
}
};
while let Ok(message) = raft_node_rx.recv() {
if let Err(err) = message
.encode_into(&mut socket)
.and_then(|()| socket.flush().map_err(Error::from))
if let Err(err) = message.encode_into(&mut socket).and_then(|_| Ok(socket.flush()?))
{
error!("Failed sending to Raft peer {addr}: {err}");
break;
Expand All @@ -168,17 +162,17 @@ impl Server {

/// Routes Raft messages:
///
/// - node_rx: outbound messages from the local Raft node. Routed to peers
/// * node_rx: outbound messages from the local Raft node. Routed to peers
/// via TCP, or to local clients via a response channel.
///
/// - request_rx: inbound requests from local SQL clients. Stepped into
/// * request_rx: inbound requests from local SQL clients. Stepped into
/// the local Raft node as ClientRequest messages. Responses are returned
/// via the provided response channel.
///
/// - peers_rx: inbound messages from remote Raft peers. Stepped into the
/// * peers_rx: inbound messages from remote Raft peers. Stepped into the
/// local Raft node.
///
/// - peers_tx: outbound per-peer channels sent via TCP connections.
/// * peers_tx: outbound per-peer channels sent via TCP connections.
/// Messages from the local node's node_rx are sent here.
///
/// Panics on any errors, since the Raft node can't recover from failed
Expand Down Expand Up @@ -252,7 +246,7 @@ impl Server {
fn sql_accept(id: raft::NodeID, listener: TcpListener, sql_engine: sql::engine::Raft) {
std::thread::scope(|s| loop {
let (socket, peer) = match listener.accept() {
Ok(sp) => sp,
Ok((socket, peer)) => (socket, peer),
Err(err) => {
error!("Client accept failed: {err}");
continue;
Expand All @@ -269,7 +263,7 @@ impl Server {
})
}

/// Processes a client SQL session, by executing SQL statements against the
/// Processes a client SQL session, executing SQL statements against the
/// Raft node.
fn sql_session(
id: raft::NodeID,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mvcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl<'a> encoding::Key<'a> for KeyPrefix<'a> {}
/// MVCC engine applies commands one at a time from the Raft log, which will
/// serialize them anyway.
pub struct MVCC<E: Engine> {
pub(crate) engine: Arc<Mutex<E>>,
pub engine: Arc<Mutex<E>>,
}

impl<E: Engine> MVCC<E> {
Expand Down

0 comments on commit 2cf2a2a

Please sign in to comment.