diff --git a/Cargo.lock b/Cargo.lock index f9c731261f94..434f3640b55c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3887,6 +3887,7 @@ dependencies = [ "async-trait", "colored", "enumflags2", + "futures", "hyper", "indexmap 1.9.3", "indoc 2.0.3", diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml b/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml index cf1b98b25adb..63bae3c14172 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml +++ b/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml @@ -33,6 +33,7 @@ indexmap = { version = "1.0", features = ["serde-1"] } query-engine-metrics = {path = "../../metrics"} quaint.workspace = true jsonrpc-core = "17" +futures = "0.3" # Only this version is vetted, upgrade only after going through the code, # as this is a small crate with little user base. diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs index 2ec8513baeda..c852924bbf69 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js.rs @@ -3,7 +3,7 @@ mod external_process; use super::*; use external_process::*; use serde::de::DeserializeOwned; -use std::{collections::HashMap, sync::atomic::AtomicU64}; +use std::sync::atomic::AtomicU64; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; pub(crate) async fn executor_process_request( diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs index 1abfedbaf8ee..96781c4e8361 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/connector_tag/js/external_process.rs @@ -1,7 +1,12 @@ use super::*; +use futures::lock::Mutex; use once_cell::sync::Lazy; use serde::de::DeserializeOwned; -use std::{fmt::Display, io::Write as _, sync::atomic::Ordering}; +use std::{ + fmt::Display, + io::Write as _, + sync::{atomic::Ordering, Arc}, +}; use tokio::sync::{mpsc, oneshot}; type Result = std::result::Result>; @@ -29,6 +34,17 @@ fn exit_with_message(status_code: i32, message: &str) -> ! { } impl ExecutorProcess { + fn spawn() -> ExecutorProcess { + match std::thread::spawn(ExecutorProcess::new).join() { + Ok(Ok(process)) => process, + Ok(Err(err)) => exit_with_message(1, &format!("Failed to start node process. Details: {err}")), + Err(err) => { + let err = err.downcast_ref::().map(ToOwned::to_owned).unwrap_or_default(); + exit_with_message(1, &format!("Panic while trying to start node process.\nDetails: {err}")) + } + } + } + fn new() -> Result { let (sender, receiver) = mpsc::channel::(300); @@ -81,15 +97,30 @@ impl ExecutorProcess { } } -pub(super) static EXTERNAL_PROCESS: Lazy = - Lazy::new(|| match std::thread::spawn(ExecutorProcess::new).join() { - Ok(Ok(process)) => process, - Ok(Err(err)) => exit_with_message(1, &format!("Failed to start node process. Details: {err}")), - Err(err) => { - let err = err.downcast_ref::().map(ToOwned::to_owned).unwrap_or_default(); - exit_with_message(1, &format!("Panic while trying to start node process.\nDetails: {err}")) +#[derive(Clone)] +pub(crate) struct RestartableExecutorProcess { + p: Arc>, +} + +impl RestartableExecutorProcess { + fn new() -> Self { + Self { + p: Arc::new(Mutex::new(ExecutorProcess::spawn())), } - }); + } + + async fn restart(&self) { + let mut p = self.p.lock().await; + _ = std::mem::replace(&mut *p, ExecutorProcess::spawn()); + } + + pub(crate) async fn request(&self, method: &str, params: serde_json::Value) -> Result { + let p = self.p.lock().await; + p.request(method, params).await + } +} + +pub(super) static EXTERNAL_PROCESS: Lazy = Lazy::new(|| RestartableExecutorProcess::new()); type ReqImpl = ( jsonrpc_core::MethodCall, @@ -122,8 +153,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { let mut stdout = BufReader::new(process.stdout.unwrap()).lines(); let mut stdin = process.stdin.unwrap(); - let mut pending_requests: HashMap>> = - HashMap::new(); + let mut last_pending_request: Option<(jsonrpc_core::Id, oneshot::Sender>)> = None; loop { tokio::select! { @@ -137,7 +167,11 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { { match serde_json::from_str::(&line) { Ok(response) => { - let sender = pending_requests.remove(response.id()).unwrap(); + let (id, sender) = last_pending_request.take().expect("got a response from the external process, but there was no pending request"); + if &id != response.id() { + unreachable!("got a response from the external process, but the id didn't match. Are you running with cargo tests with `--test-threads=1`"); + } + match response { jsonrpc_core::Output::Success(success) => { // The other end may be dropped if the whole @@ -159,7 +193,12 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { } Ok(None) => // end of the stream { - exit_with_message(1, "child node process stdout closed") + tracing::error!("Error when reading from child node process. Process might have exited. Restarting..."); + if let Some((_, sender)) = last_pending_request.take() { + let _ = sender.send(Err(Box::new(jsonrpc_core::types::Error::new(jsonrpc_core::ErrorCode::ServerError(500))))).unwrap(); + } + EXTERNAL_PROCESS.restart().await; + break; } Err(err) => // log it { @@ -174,7 +213,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver) -> Result<()> { exit_with_message(1, "The json-rpc client channel was closed"); } Some((request, response_sender)) => { - pending_requests.insert(request.id.clone(), response_sender); + last_pending_request = Some((request.id.clone(), response_sender)); let mut req = serde_json::to_vec(&request).unwrap(); req.push(b'\n'); stdin.write_all(&req).await.unwrap();