Skip to content

Commit

Permalink
Merge pull request #4318 from systeminit/nick/eng-2627
Browse files Browse the repository at this point in the history
Add cyclone client timeout and make func execution timeouts configurable
  • Loading branch information
nickgerace authored Aug 9, 2024
2 parents 449d51a + 2cf4bc1 commit b717360
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 212 deletions.
7 changes: 7 additions & 0 deletions bin/veritech/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ pub(crate) struct Args {
/// Veritech decryption key file location [example: /run/veritech/veritech.key]
#[arg(long)]
pub(crate) decryption_key: Option<PathBuf>,

/// Execution timeout when communicating with a cyclone instance executing a function, in seconds
#[arg(long)]
pub(crate) cyclone_client_execution_timeout: Option<u64>,
}

impl TryFrom<Args> for Config {
Expand Down Expand Up @@ -128,6 +132,9 @@ impl TryFrom<Args> for Config {
);
}
config_map.set("nats.connection_name", NAME);
if let Some(timeout) = args.cyclone_client_execution_timeout {
config_map.set("cyclone_client_execution_timeout", timeout);
}
})?
.try_into()
}
Expand Down
54 changes: 30 additions & 24 deletions lib/cyclone-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,33 +139,33 @@ where

async fn execute_ping(&mut self) -> result::Result<PingExecution<Strm>, ClientError>;

async fn execute_resolver(
async fn prepare_resolver_execution(
&mut self,
request: CycloneRequest<ResolverFunctionRequest>,
) -> result::Result<
Execution<Strm, ResolverFunctionRequest, ResolverFunctionResultSuccess>,
ClientError,
>;

async fn execute_action_run(
async fn prepare_action_run_execution(
&mut self,
request: CycloneRequest<ActionRunRequest>,
) -> result::Result<Execution<Strm, ActionRunRequest, ActionRunResultSuccess>, ClientError>;

async fn execute_reconciliation(
async fn prepare_reconciliation_execution(
&mut self,
request: CycloneRequest<ReconciliationRequest>,
) -> result::Result<
Execution<Strm, ReconciliationRequest, ReconciliationResultSuccess>,
ClientError,
>;

async fn execute_validation(
async fn prepare_validation_execution(
&mut self,
request: CycloneRequest<ValidationRequest>,
) -> result::Result<Execution<Strm, ValidationRequest, ValidationResultSuccess>, ClientError>;

async fn execute_schema_variant_definition(
async fn prepare_schema_variant_definition_execution(
&mut self,
request: CycloneRequest<SchemaVariantDefinitionRequest>,
) -> result::Result<
Expand Down Expand Up @@ -284,53 +284,53 @@ where
Ok(ping::execute(stream))
}

async fn execute_resolver(
async fn prepare_resolver_execution(
&mut self,
request: CycloneRequest<ResolverFunctionRequest>,
) -> Result<Execution<Strm, ResolverFunctionRequest, ResolverFunctionResultSuccess>> {
let stream = self.websocket_stream("/execute/resolver").await?;
Ok(execution::execute(stream, request))
Ok(execution::new_unstarted_execution(stream, request))
}

async fn execute_action_run(
async fn prepare_action_run_execution(
&mut self,
request: CycloneRequest<ActionRunRequest>,
) -> result::Result<Execution<Strm, ActionRunRequest, ActionRunResultSuccess>, ClientError>
{
let stream = self.websocket_stream("/execute/command").await?;
Ok(execution::execute(stream, request))
Ok(execution::new_unstarted_execution(stream, request))
}

async fn execute_reconciliation(
async fn prepare_reconciliation_execution(
&mut self,
request: CycloneRequest<ReconciliationRequest>,
) -> result::Result<
Execution<Strm, ReconciliationRequest, ReconciliationResultSuccess>,
ClientError,
> {
let stream = self.websocket_stream("/execute/reconciliation").await?;
Ok(execution::execute(stream, request))
Ok(execution::new_unstarted_execution(stream, request))
}

async fn execute_validation(
async fn prepare_validation_execution(
&mut self,
request: CycloneRequest<ValidationRequest>,
) -> result::Result<Execution<Strm, ValidationRequest, ValidationResultSuccess>, ClientError>
{
Ok(execution::execute(
Ok(execution::new_unstarted_execution(
self.websocket_stream("/execute/validation").await?,
request,
))
}

async fn execute_schema_variant_definition(
async fn prepare_schema_variant_definition_execution(
&mut self,
request: CycloneRequest<SchemaVariantDefinitionRequest>,
) -> result::Result<
Execution<Strm, SchemaVariantDefinitionRequest, SchemaVariantDefinitionResultSuccess>,
ClientError,
> {
Ok(execution::execute(
Ok(execution::new_unstarted_execution(
self.websocket_stream("/execute/schema_variant_definition")
.await?,
request,
Expand Down Expand Up @@ -839,7 +839,7 @@ mod tests {

// Start the protocol
let mut progress = client
.execute_resolver(CycloneRequest::from_parts(req, Default::default()))
.prepare_resolver_execution(CycloneRequest::from_parts(req, Default::default()))
.await
.expect("failed to establish websocket stream")
.start()
Expand Down Expand Up @@ -940,7 +940,7 @@ mod tests {

// Start the protocol
let mut progress = client
.execute_resolver(CycloneRequest::from_parts(req, Default::default()))
.prepare_resolver_execution(CycloneRequest::from_parts(req, Default::default()))
.await
.expect("failed to establish websocket stream")
.start()
Expand Down Expand Up @@ -1012,7 +1012,7 @@ mod tests {
before: vec![],
};
let mut progress = client
.execute_validation(CycloneRequest::from_parts(req, Default::default()))
.prepare_validation_execution(CycloneRequest::from_parts(req, Default::default()))
.await
.expect("failed to establish websocket stream")
.start()
Expand Down Expand Up @@ -1082,7 +1082,7 @@ mod tests {

// Start the protocol
let mut progress = client
.execute_action_run(CycloneRequest::from_parts(req, Default::default()))
.prepare_action_run_execution(CycloneRequest::from_parts(req, Default::default()))
.await
.expect("failed to establish websocket stream")
.start()
Expand Down Expand Up @@ -1168,7 +1168,7 @@ mod tests {

// Start the protocol
let mut progress = client
.execute_action_run(CycloneRequest::from_parts(req, Default::default()))
.prepare_action_run_execution(CycloneRequest::from_parts(req, Default::default()))
.await
.expect("failed to establish websocket stream")
.start()
Expand Down Expand Up @@ -1253,7 +1253,7 @@ mod tests {

// Start the protocol
let mut progress = client
.execute_reconciliation(CycloneRequest::from_parts(req, Default::default()))
.prepare_reconciliation_execution(CycloneRequest::from_parts(req, Default::default()))
.await
.expect("failed to establish websocket stream")
.start()
Expand Down Expand Up @@ -1328,7 +1328,7 @@ mod tests {

// Start the protocol
let mut progress = client
.execute_reconciliation(CycloneRequest::from_parts(req, Default::default()))
.prepare_reconciliation_execution(CycloneRequest::from_parts(req, Default::default()))
.await
.expect("failed to establish websocket stream")
.start()
Expand Down Expand Up @@ -1405,7 +1405,10 @@ mod tests {

// Start the protocol
let mut progress = client
.execute_schema_variant_definition(CycloneRequest::from_parts(req, Default::default()))
.prepare_schema_variant_definition_execution(CycloneRequest::from_parts(
req,
Default::default(),
))
.await
.expect("failed to establish websocket stream")
.start()
Expand Down Expand Up @@ -1482,7 +1485,10 @@ mod tests {

// Start the protocol
let mut progress = client
.execute_schema_variant_definition(CycloneRequest::from_parts(req, Default::default()))
.prepare_schema_variant_definition_execution(CycloneRequest::from_parts(
req,
Default::default(),
))
.await
.expect("failed to establish websocket stream")
.start()
Expand Down
4 changes: 3 additions & 1 deletion lib/cyclone-client/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_tungstenite::WebSocketStream;

pub use tokio_tungstenite::tungstenite::Message as WebSocketMessage;

pub fn execute<T, Request, Success>(
pub fn new_unstarted_execution<T, Request, Success>(
stream: WebSocketStream<T>,
request: CycloneRequest<Request>,
) -> Execution<T, Request, Success> {
Expand Down Expand Up @@ -69,6 +69,7 @@ where
Request: Serialize,
{
pub async fn start(mut self) -> Result<ExecutionStarted<T, Success>, ExecutionError<Success>> {
// As soon as we see the "start" message, we are good to go.
match self.stream.next().await {
Some(Ok(WebSocketMessage::Text(json_str))) => {
let msg = Message::deserialize_from_str(&json_str)
Expand All @@ -85,6 +86,7 @@ where
None => return Err(ExecutionError::WSClosedBeforeStart),
}

// Once the start message has been seen on the stream, we can send the request.
let msg = serde_json::to_string(&self.request).map_err(ExecutionError::JSONSerialize)?;
self.stream
.send(WebSocketMessage::Text(msg))
Expand Down
9 changes: 9 additions & 0 deletions lib/cyclone-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ pub struct Config {
#[builder(default)]
lang_server_function_timeout: Option<usize>,

#[builder(default)]
lang_server_process_timeout: Option<u64>,

#[builder(setter(into), default)]
limit_requests: Option<u32>,
}
Expand Down Expand Up @@ -126,6 +129,12 @@ impl Config {
self.lang_server_function_timeout
}

/// Gets a reference to the config's lang server process timeout optional override.
#[must_use]
pub fn lang_server_process_timeout(&self) -> Option<u64> {
self.lang_server_process_timeout
}

/// Gets a reference to the config's limit requests.
#[must_use]
pub fn limit_requests(&self) -> Option<u32> {
Expand Down
23 changes: 17 additions & 6 deletions lib/cyclone-server/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@ use tokio_util::codec::{Decoder, FramedRead, FramedWrite};
use crate::WebSocketMessage;

const TX_TIMEOUT_SECS: Duration = Duration::from_secs(5);
const LANG_SERVER_PROCESS_TIMEOUT: Duration = Duration::from_secs(32 * 60);
const DEFAULT_LANG_SERVER_PROCESS_TIMEOUT: Duration = Duration::from_secs(32 * 60);

pub fn new<Request, LangServerSuccess, Success>(
lang_server_path: impl Into<PathBuf>,
lang_server_debugging: bool,
lang_server_function_timeout: Option<usize>,
lang_server_process_timeout: Option<u64>,
command: String,
) -> Execution<Request, LangServerSuccess, Success> {
Execution {
lang_server_path: lang_server_path.into(),
lang_server_debugging,
lang_server_function_timeout,
lang_server_process_timeout: match lang_server_process_timeout {
Some(timeout) => Duration::from_secs(timeout),
None => DEFAULT_LANG_SERVER_PROCESS_TIMEOUT,
},
command,
request_marker: PhantomData,
lang_server_success_marker: PhantomData,
Expand All @@ -64,8 +69,8 @@ pub enum ExecutionError {
ChildShutdown(#[from] ShutdownError),
#[error("failed to spawn child process; program={0}")]
ChildSpawn(#[source] io::Error, PathBuf),
#[error("child ran for to long")]
ChildTimeout,
#[error("child process timed out: {0:?}")]
ChildTimeout(Duration),
#[error("failed to decode string as utf8")]
FromUtf8(#[from] FromUtf8Error),
#[error("failed to deserialize json message")]
Expand Down Expand Up @@ -93,6 +98,7 @@ pub struct Execution<Request, LangServerSuccess, Success> {
lang_server_path: PathBuf,
lang_server_debugging: bool,
lang_server_function_timeout: Option<usize>,
lang_server_process_timeout: Duration,
command: String,
request_marker: PhantomData<Request>,
lang_server_success_marker: PhantomData<LangServerSuccess>,
Expand Down Expand Up @@ -161,6 +167,7 @@ where
stderr,
sensitive_strings: Arc::new(sensitive_strings),
success_marker: self.success_marker,
lang_server_process_timeout: self.lang_server_process_timeout,
})
}

Expand Down Expand Up @@ -219,6 +226,7 @@ pub struct ExecutionStarted<LangServerSuccess, Success> {
stderr: FramedRead<ChildStderr, BytesLinesCodec>,
sensitive_strings: Arc<SensitiveStrings>,
success_marker: PhantomData<Success>,
lang_server_process_timeout: Duration,
}

// TODO: implement shutdown oneshot
Expand Down Expand Up @@ -289,15 +297,18 @@ where
Result::<_>::Ok(())
};

match timeout(LANG_SERVER_PROCESS_TIMEOUT, receive_loop).await {
match timeout(self.lang_server_process_timeout, receive_loop).await {
Ok(execution) => execution?,
Err(_) => {
Err(err) => {
// Exceeded timeout, shutdown child process
process::child_shutdown(&mut self.child, Some(process::Signal::SIGTERM), None)
.await?;
drop(self.child);

return Err(ExecutionError::ChildTimeout);
error!(?err, "shutdown child process due to timeout");
return Err(ExecutionError::ChildTimeout(
self.lang_server_process_timeout,
));
}
};

Expand Down
Loading

0 comments on commit b717360

Please sign in to comment.