Skip to content

Commit

Permalink
Restart executor when it dies
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff authored and Miguel Fernández committed Nov 28, 2023
1 parent 8713116 commit 71e60e9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ indexmap = { version = "1.0", features = ["serde-1"] }
query-engine-metrics = {path = "../../metrics"}
quaint.workspace = true
jsonrpc-core = "17"
futures = "0.3"

# Only this version is vetted, upgrade only after going through the code,
# as this is a small crate with little user base.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod external_process;
use super::*;
use external_process::*;
use serde::de::DeserializeOwned;
use std::{collections::HashMap, sync::atomic::AtomicU64};
use std::sync::atomic::AtomicU64;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

pub(crate) async fn executor_process_request<T: DeserializeOwned>(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use super::*;
use futures::lock::Mutex;
use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use std::{fmt::Display, io::Write as _, sync::atomic::Ordering};
use std::{
fmt::Display,
io::Write as _,
sync::{atomic::Ordering, Arc},
};
use tokio::sync::{mpsc, oneshot};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down Expand Up @@ -29,6 +34,17 @@ fn exit_with_message(status_code: i32, message: &str) -> ! {
}

impl ExecutorProcess {
fn spawn() -> ExecutorProcess {
match std::thread::spawn(ExecutorProcess::new).join() {
Ok(Ok(process)) => process,
Ok(Err(err)) => exit_with_message(1, &format!("Failed to start node process. Details: {err}")),
Err(err) => {
let err = err.downcast_ref::<String>().map(ToOwned::to_owned).unwrap_or_default();
exit_with_message(1, &format!("Panic while trying to start node process.\nDetails: {err}"))
}
}
}

fn new() -> Result<ExecutorProcess> {
let (sender, receiver) = mpsc::channel::<ReqImpl>(300);

Expand Down Expand Up @@ -81,15 +97,30 @@ impl ExecutorProcess {
}
}

pub(super) static EXTERNAL_PROCESS: Lazy<ExecutorProcess> =
Lazy::new(|| match std::thread::spawn(ExecutorProcess::new).join() {
Ok(Ok(process)) => process,
Ok(Err(err)) => exit_with_message(1, &format!("Failed to start node process. Details: {err}")),
Err(err) => {
let err = err.downcast_ref::<String>().map(ToOwned::to_owned).unwrap_or_default();
exit_with_message(1, &format!("Panic while trying to start node process.\nDetails: {err}"))
#[derive(Clone)]
pub(crate) struct RestartableExecutorProcess {
p: Arc<Mutex<ExecutorProcess>>,
}

impl RestartableExecutorProcess {
fn new() -> Self {
Self {
p: Arc::new(Mutex::new(ExecutorProcess::spawn())),
}
});
}

async fn restart(&self) {
let mut p = self.p.lock().await;
_ = std::mem::replace(&mut *p, ExecutorProcess::spawn());
}

pub(crate) async fn request<T: DeserializeOwned>(&self, method: &str, params: serde_json::Value) -> Result<T> {
let p = self.p.lock().await;
p.request(method, params).await
}
}

pub(super) static EXTERNAL_PROCESS: Lazy<RestartableExecutorProcess> = Lazy::new(|| RestartableExecutorProcess::new());

type ReqImpl = (
jsonrpc_core::MethodCall,
Expand Down Expand Up @@ -122,8 +153,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {

let mut stdout = BufReader::new(process.stdout.unwrap()).lines();
let mut stdin = process.stdin.unwrap();
let mut pending_requests: HashMap<jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>> =
HashMap::new();
let mut last_pending_request: Option<(jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>)> = None;

loop {
tokio::select! {
Expand All @@ -137,7 +167,11 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
{
match serde_json::from_str::<jsonrpc_core::Output>(&line) {
Ok(response) => {
let sender = pending_requests.remove(response.id()).unwrap();
let (id, sender) = last_pending_request.take().expect("got a response from the external process, but there was no pending request");
if &id != response.id() {
unreachable!("got a response from the external process, but the id didn't match. Are you running with cargo tests with `--test-threads=1`");
}

match response {
jsonrpc_core::Output::Success(success) => {
// The other end may be dropped if the whole
Expand All @@ -159,7 +193,12 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
}
Ok(None) => // end of the stream
{
exit_with_message(1, "child node process stdout closed")
tracing::error!("Error when reading from child node process. Process might have exited. Restarting...");
if let Some((_, sender)) = last_pending_request.take() {
let _ = sender.send(Err(Box::new(jsonrpc_core::types::Error::new(jsonrpc_core::ErrorCode::ServerError(500))))).unwrap();
}
EXTERNAL_PROCESS.restart().await;
break;
}
Err(err) => // log it
{
Expand All @@ -174,7 +213,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
exit_with_message(1, "The json-rpc client channel was closed");
}
Some((request, response_sender)) => {
pending_requests.insert(request.id.clone(), response_sender);
last_pending_request = Some((request.id.clone(), response_sender));
let mut req = serde_json::to_vec(&request).unwrap();
req.push(b'\n');
stdin.write_all(&req).await.unwrap();
Expand Down

0 comments on commit 71e60e9

Please sign in to comment.