Skip to content

Commit

Permalink
Remove DynError (#1123)
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n authored Jun 11, 2024
1 parent 20f1c5a commit 990d704
Show file tree
Hide file tree
Showing 49 changed files with 527 additions and 313 deletions.
313 changes: 281 additions & 32 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions crates/core/src/bin/freenet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use freenet::{
};
use std::{net::SocketAddr, sync::Arc};

async fn run(config: Config) -> Result<(), anyhow::Error> {
async fn run(config: Config) -> anyhow::Result<()> {
match config.mode {
OperationMode::Local => run_local(config).await,
OperationMode::Network => run_network(config).await,
}
}

async fn run_local(config: Config) -> Result<(), anyhow::Error> {
async fn run_local(config: Config) -> anyhow::Result<()> {
tracing::info!("Starting freenet node in local mode");
let port = config.ws_api.port;
let ip = config.ws_api.address;
Expand All @@ -27,12 +27,12 @@ async fn run_local(config: Config) -> Result<(), anyhow::Error> {
.map_err(anyhow::Error::msg)
}

async fn run_network(config: Config) -> Result<(), anyhow::Error> {
async fn run_network(config: Config) -> anyhow::Result<()> {
tracing::info!("Starting freenet node in network mode");
run_network_node(config).await
}

fn main() -> Result<(), anyhow::Error> {
fn main() -> anyhow::Result<()> {
freenet::config::set_logger(None);
let config = ConfigArgs::parse().build()?;
let rt = tokio::runtime::Builder::new_multi_thread()
Expand Down
13 changes: 7 additions & 6 deletions crates/core/src/client_events/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::{
client_events::AuthToken,
server::{ClientConnection, HostCallbackResult},
util::EncodingProtocol,
DynError,
};

use super::{ClientError, ClientEventsProxy, ClientId, HostResult, OpenRequest};
Expand Down Expand Up @@ -224,7 +223,7 @@ async fn websocket_interface(
mut auth_token: Option<AuthToken>,
encoding_protoc: EncodingProtocol,
ws: WebSocket,
) -> Result<(), DynError> {
) -> anyhow::Result<()> {
let (mut response_rx, client_id) = new_client_connection(&request_sender).await?;
let (mut tx, mut rx) = ws.split();
let contract_updates: Arc<Mutex<VecDeque<(_, mpsc::UnboundedReceiver<HostResult>)>>> =
Expand All @@ -246,7 +245,7 @@ async fn websocket_interface(
active_listeners.push_back((key, listener));
}
Err(err @ mpsc::error::TryRecvError::Disconnected) => {
return Err(Box::new(err) as DynError)
return Err(anyhow::anyhow!(err))
}
}
}
Expand Down Expand Up @@ -345,7 +344,7 @@ async fn process_client_request(
request_sender: &mpsc::Sender<ClientConnection>,
auth_token: &mut Option<AuthToken>,
encoding_protoc: EncodingProtocol,
) -> Result<Option<Message>, Option<DynError>> {
) -> Result<Option<Message>, Option<anyhow::Error>> {
let msg = match msg {
Ok(Message::Binary(data)) => data,
Ok(Message::Text(data)) => data.into_bytes(),
Expand Down Expand Up @@ -401,7 +400,7 @@ async fn process_host_response(
client_id: ClientId,
encoding_protoc: EncodingProtocol,
tx: &mut SplitSink<WebSocket, Message>,
) -> Result<Option<NewSubscription>, DynError> {
) -> anyhow::Result<Option<NewSubscription>> {
match msg {
Some(HostCallbackResult::Result { id, result }) => {
debug_assert_eq!(id, client_id);
Expand Down Expand Up @@ -452,7 +451,9 @@ async fn process_host_response(
tx.send(Message::Binary(result_error)).await?;
tx.send(Message::Close(None)).await?;
tracing::warn!("node shut down while handling responses for {client_id}");
Err(format!("node shut down while handling responses for {client_id}").into())
Err(anyhow::anyhow!(
"node shut down while handling responses for {client_id}"
))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl ConfigArgs {

match config_args {
Some((filename, ext)) => {
let path = dir.join(&filename).with_extension(&ext);
let path = dir.join(filename).with_extension(&ext);
tracing::info!("Reading configuration file: {path:?}",);
match ext.as_str() {
"toml" => {
Expand Down
31 changes: 16 additions & 15 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::wasm_runtime::{
use crate::{
client_events::{ClientId, HostResult},
operations::{self, Operation},
DynError,
};

use super::storages::Storage;
Expand All @@ -45,7 +44,7 @@ pub(super) mod mock_runtime;
pub(super) mod runtime;

#[derive(Debug)]
pub struct ExecutorError(Either<Box<RequestError>, DynError>);
pub struct ExecutorError(Either<Box<RequestError>, anyhow::Error>);

enum InnerOpError {
Upsert(ContractKey),
Expand All @@ -55,13 +54,13 @@ enum InnerOpError {
impl std::error::Error for ExecutorError {}

impl ExecutorError {
pub fn other(error: impl Into<DynError>) -> Self {
pub fn other(error: impl Into<anyhow::Error>) -> Self {
Self(Either::Right(error.into()))
}

/// Call this when an unreachable path is reached but need to avoid panics.
fn internal_error() -> Self {
ExecutorError(Either::Right("internal error".into()))
ExecutorError(Either::Right(anyhow::anyhow!("internal error")))
}

fn request(error: impl Into<RequestError>) -> Self {
Expand Down Expand Up @@ -107,19 +106,19 @@ impl ExecutorError {
Some(InnerOpError::Upsert(key)) => {
return ExecutorError::request(StdContractError::update_exec_error(key, e))
}
_ => return ExecutorError::other(format!("execution error: {e}")),
_ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
},
RuntimeInnerError::WasmExportError(e) => match op {
Some(InnerOpError::Upsert(key)) => {
return ExecutorError::request(StdContractError::update_exec_error(key, e))
}
_ => return ExecutorError::other(format!("execution error: {e}")),
_ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
},
RuntimeInnerError::WasmInstantiationError(e) => match op {
Some(InnerOpError::Upsert(key)) => {
return ExecutorError::request(StdContractError::update_exec_error(key, e))
}
_ => return ExecutorError::other(format!("execution error: {e}")),
_ => return ExecutorError::other(anyhow::anyhow!("execution error: {e}")),
},
_ => {}
}
Expand Down Expand Up @@ -225,7 +224,7 @@ enum CallbackError {
}

impl ExecutorToEventLoopChannel<ExecutorHalve> {
async fn send_to_event_loop<Op, T>(&mut self, message: T) -> Result<Transaction, DynError>
async fn send_to_event_loop<Op, T>(&mut self, message: T) -> anyhow::Result<Transaction>
where
T: ComposeNetworkMessage<Op>,
Op: Operation + Send + 'static,
Expand Down Expand Up @@ -257,7 +256,7 @@ impl ExecutorToEventLoopChannel<ExecutorHalve> {
.response_for_rx
.recv()
.await
.ok_or_else(|| ExecutorError::other("channel closed"))?;
.ok_or_else(|| ExecutorError::other(anyhow::anyhow!("channel closed")))?;
if op_result.id() != &transaction {
self.end.completed.insert(*op_result.id(), op_result);
return Err(CallbackError::MissingResult);
Expand All @@ -267,13 +266,13 @@ impl ExecutorToEventLoopChannel<ExecutorHalve> {
}

impl ExecutorToEventLoopChannel<NetworkEventListenerHalve> {
pub async fn transaction_from_executor(&mut self) -> Result<Transaction, DynError> {
pub async fn transaction_from_executor(&mut self) -> anyhow::Result<Transaction> {
let tx = self
.end
.waiting_for_op_rx
.recv()
.await
.ok_or("channel closed")?;
.ok_or(anyhow::anyhow!("channel closed"))?;
Ok(tx)
}

Expand Down Expand Up @@ -463,11 +462,11 @@ pub struct Executor<R = Runtime> {
impl<R> Executor<R> {
pub async fn new(
state_store: StateStore<Storage>,
ctrl_handler: impl FnOnce() -> Result<(), DynError>,
ctrl_handler: impl FnOnce() -> anyhow::Result<()>,
mode: OperationMode,
runtime: R,
event_loop_channel: Option<ExecutorToEventLoopChannel<ExecutorHalve>>,
) -> Result<Self, DynError> {
) -> anyhow::Result<Self> {
ctrl_handler()?;

Ok(Self {
Expand Down Expand Up @@ -498,7 +497,7 @@ impl<R> Executor<R> {
SecretsStore,
StateStore<Storage>,
),
DynError,
anyhow::Error,
> {
const MAX_SIZE: i64 = 10 * 1024 * 1024;
const MAX_MEM_CACHE: u32 = 10_000_000;
Expand All @@ -521,7 +520,9 @@ impl<R> Executor<R> {
M: ComposeNetworkMessage<Op>,
{
let Some(ch) = &mut self.event_loop_channel else {
return Err(ExecutorError::other("missing event loop channel"));
return Err(ExecutorError::other(anyhow::anyhow!(
"missing event loop channel"
)));
};
let transaction = ch
.send_to_event_loop(request)
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/contract/executor/mock_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ impl Executor<MockRuntime> {
pub async fn new_mock(
identifier: &str,
event_loop_channel: ExecutorToEventLoopChannel<ExecutorHalve>,
) -> Result<Self, DynError> {
) -> anyhow::Result<Self> {
let data_dir = Self::test_data_dir(identifier);

let contracts_data_dir = data_dir.join("contracts");
Expand Down Expand Up @@ -56,7 +56,7 @@ impl ContractExecutor for Executor<MockRuntime> {
.await
.map_err(ExecutorError::other)?
else {
return Err(ExecutorError::other(format!(
return Err(ExecutorError::other(anyhow::anyhow!(
"missing state and/or parameters for contract {key}"
)));
};
Expand All @@ -68,7 +68,7 @@ impl ContractExecutor for Executor<MockRuntime> {
None
};
let Ok(state) = self.state_store.get(&key).await else {
return Err(ExecutorError::other(format!(
return Err(ExecutorError::other(anyhow::anyhow!(
"missing state for contract {key}"
)));
};
Expand Down
17 changes: 9 additions & 8 deletions crates/core/src/contract/executor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl Executor<Runtime> {
pub async fn from_config(
config: Arc<Config>,
event_loop_channel: Option<ExecutorToEventLoopChannel<ExecutorHalve>>,
) -> Result<Self, DynError> {
) -> anyhow::Result<Self> {
let (contract_store, delegate_store, secret_store, state_store) =
Self::get_stores(&config).await?;
let rt = Runtime::build(contract_store, delegate_store, secret_store, false).unwrap();
Expand Down Expand Up @@ -246,7 +246,7 @@ impl Executor<Runtime> {
}
Err(RequestError::Disconnect.into())
}
_ => Err(ExecutorError::other("not supported")),
_ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
}
}

Expand All @@ -272,8 +272,9 @@ impl Executor<Runtime> {
fetch_contract: contract,
} => self.perform_contract_get(contract, key).await,
ContractRequest::Subscribe { key, summary } => {
let updates =
updates.ok_or_else(|| ExecutorError::other("missing update channel"))?;
let updates = updates.ok_or_else(|| {
ExecutorError::other(anyhow::anyhow!("missing update channel"))
})?;
self.register_contract_notifier(key, cli_id, updates, summary)?;
// by default a subscribe op has an implicit get
let res = self.perform_contract_get(false, key).await?;
Expand All @@ -287,7 +288,7 @@ impl Executor<Runtime> {
}
Ok(res)
}
_ => Err(ExecutorError::other("not supported")),
_ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
}
}

Expand Down Expand Up @@ -381,13 +382,13 @@ impl Executor<Runtime> {

Err(err) => {
tracing::error!("failed executing delegate `{key}`: {err}");
Err(ExecutorError::other(format!(
Err(ExecutorError::other(anyhow::anyhow!(
"uncontrolled error while executing `{key}`"
)))
}
}
}
_ => Err(ExecutorError::other("not supported")),
_ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
}
}

Expand Down Expand Up @@ -719,7 +720,7 @@ impl Executor<Runtime> {
async fn get_local_contract(
&self,
id: &ContractInstanceId,
) -> Result<State<'static>, Either<Box<RequestError>, DynError>> {
) -> Result<State<'static>, Either<Box<RequestError>, anyhow::Error>> {
let Ok(contract) = self.state_store.get(&(*id).into()).await else {
return Err(Either::Right(
StdContractError::MissingRelated { key: *id }.into(),
Expand Down
19 changes: 9 additions & 10 deletions crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::{
use crate::client_events::HostResult;
use crate::config::Config;
use crate::message::Transaction;
use crate::{client_events::ClientId, wasm_runtime::Runtime, DynError};
use crate::{client_events::ClientId, wasm_runtime::Runtime};

pub(crate) struct ClientResponsesReceiver(UnboundedReceiver<(ClientId, HostResult)>);

Expand Down Expand Up @@ -60,7 +60,7 @@ pub(crate) trait ContractHandler {
contract_handler_channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
builder: Self::Builder,
) -> impl Future<Output = Result<Self, DynError>> + Send
) -> impl Future<Output = anyhow::Result<Self>> + Send
where
Self: Sized + 'static;

Expand All @@ -82,7 +82,7 @@ impl ContractHandler for NetworkContractHandler<Runtime> {
channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
config: Self::Builder,
) -> Result<Self, DynError>
) -> anyhow::Result<Self>
where
Self: Sized + 'static,
{
Expand All @@ -108,7 +108,7 @@ impl ContractHandler for NetworkContractHandler<super::MockRuntime> {
channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
identifier: Self::Builder,
) -> Result<Self, DynError>
) -> anyhow::Result<Self>
where
Self: Sized + 'static,
{
Expand Down Expand Up @@ -203,12 +203,12 @@ static EV_ID: AtomicU64 = AtomicU64::new(0);
impl ContractHandlerChannel<WaitingResolution> {
pub async fn relay_transaction_result_to_client(
&mut self,
) -> Result<(ClientId, Transaction), DynError> {
) -> anyhow::Result<(ClientId, Transaction)> {
self.end
.wait_for_res_rx
.recv()
.await
.ok_or_else(|| "channel dropped".into())
.ok_or_else(|| anyhow::anyhow!("channel dropped"))
}
}

Expand Down Expand Up @@ -386,7 +386,7 @@ pub mod test {
use crate::config::GlobalExecutor;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn channel_test() -> Result<(), anyhow::Error> {
async fn channel_test() -> anyhow::Result<()> {
let (send_halve, mut rcv_halve, _) = contract_handler_channel();

let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Expand Down Expand Up @@ -441,7 +441,6 @@ pub(super) mod in_memory {
},
ContractHandler, ContractHandlerChannel, ContractHandlerHalve,
};
use crate::DynError;

pub(crate) struct MemoryContractHandler {
channel: ContractHandlerChannel<ContractHandlerHalve>,
Expand Down Expand Up @@ -471,7 +470,7 @@ pub(super) mod in_memory {
channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
identifier: Self::Builder,
) -> Result<Self, DynError>
) -> anyhow::Result<Self>
where
Self: Sized + 'static,
{
Expand All @@ -488,7 +487,7 @@ pub(super) mod in_memory {
}

#[test]
fn serialization() -> Result<(), anyhow::Error> {
fn serialization() -> anyhow::Result<()> {
use freenet_stdlib::prelude::WrappedContract;
let bytes = crate::util::test::random_bytes_1kb();
let mut gen = arbitrary::Unstructured::new(&bytes);
Expand Down
Loading

0 comments on commit 990d704

Please sign in to comment.