fix(veritech): When errors occur that don't cause us to retry the function, inform subscribers

This change ensures that for all function types, when errors occur (such as killing a function, or timing out), we inform subscribers of the failure so they can handle it appropriately.

The main issue this fixes shows up during Pinga jobs: if a function times out or is killed, the job would otherwise continue to run until it hit its own timeout, exacerbating hung change sets.
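
To illustrate the failure mode, here is a minimal sketch with hypothetical types (the real flow runs over NATS between Veritech and Pinga): a subscriber waiting on a result that is never published blocks until its own, much longer timeout, whereas an explicit failure result lets it react immediately.

// Minimal sketch, not the real Pinga/Veritech API: shows why publishing a
// failure result matters. The subscriber returns as soon as any result
// arrives instead of blocking until its own job-level timeout.
use std::time::Duration;
use tokio::sync::oneshot;

#[derive(Debug)]
enum FunctionResult {
    Success(String),
    Failure(String),
}

async fn subscriber(result_rx: oneshot::Receiver<FunctionResult>) {
    // The job-level timeout is deliberately long; before this fix, a killed
    // or timed-out function published nothing, so we waited out the full
    // duration and the change set appeared hung.
    match tokio::time::timeout(Duration::from_secs(60), result_rx).await {
        Ok(Ok(FunctionResult::Success(v))) => println!("function succeeded: {v}"),
        Ok(Ok(FunctionResult::Failure(e))) => println!("function failed, handle it now: {e}"),
        Ok(Err(_)) => println!("result channel closed"),
        Err(_) => println!("job-level timeout hit"),
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    // With this change, the Veritech handler publishes a Failure instead of
    // going silent, so the subscriber falls into the "handle it now" branch.
    tx.send(FunctionResult::Failure("execution killed".into())).ok();
    subscriber(rx).await;
}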
britmyerss committed Jan 16, 2025
1 parent 8ada592 commit 746f098
Showing 2 changed files with 79 additions and 26 deletions.
31 changes: 28 additions & 3 deletions lib/dal/tests/integration_test/func/kill_execution.rs
@@ -11,7 +11,7 @@ use dal_test::helpers::{
 };
 use dal_test::test;
 use pretty_assertions_sorted::assert_eq;
-use si_events::FuncRunState;
+use si_events::{ActionResultState, FuncRunState};
 
 #[test]
 async fn kill_execution_works(ctx: &mut DalContext) {
@@ -96,6 +96,18 @@ async fn kill_execution_works(ctx: &mut DalContext) {
     FuncRunner::kill_execution(ctx, func_run_id)
         .await
         .expect("could not kill execution");
+    ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx)
+        .await
+        .expect("could not commit and update snapshot to visibility");
+    let action_result_state = ctx
+        .layer_db()
+        .func_run()
+        .read(func_run_id)
+        .await
+        .expect("could not get func run")
+        .expect("no func run found")
+        .action_result_state()
+        .expect("action result state found");
     let func_run_state = ctx
         .layer_db()
         .func_run()
@@ -104,8 +116,21 @@
         .expect("could not get func run")
         .expect("no func run found")
         .state();
+
+    // This should be [`FuncRunState::Killed`]
+    // but because this is getting bubbled back to the Pinga Job running the
+    // action, and the error is now embedded in a successful response,
+    // (because we did get a response from Veritech)
+    // Pinga is setting the func run state to success and the
+    // action result state to failure.
+    // make the madness end pls.
+    assert_eq!(
+        FuncRunState::Success, // expected
+        func_run_state // actual
+    );
+
     assert_eq!(
-        FuncRunState::Killed, // expected
-        func_run_state // actual
+        ActionResultState::Failure, // expected
+        action_result_state // actual
     );
 }
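
For context on the assertion pair above: per the in-test comment, Veritech did return a response (so the func run is marked Success), but the payload it carried reports a failure (so the action result is marked Failure). A rough sketch of that mapping, with hypothetical stand-ins for the real Pinga job logic and the si_events enums:

// Hypothetical sketch, not the actual Pinga code: just the state mapping the
// test above asserts. A failure payload inside a successful Veritech response
// marks the run Success but the action result Failure.
#[derive(Debug, PartialEq)]
enum FuncRunState { Success, Failure, Killed }

#[derive(Debug, PartialEq)]
enum ActionResultState { Success, Failure }

fn states_for(veritech_responded: bool, payload_ok: bool) -> (FuncRunState, ActionResultState) {
    match (veritech_responded, payload_ok) {
        // We got a response back from Veritech, so the run itself "succeeded",
        // even though the payload it carried reports a failure.
        (true, false) => (FuncRunState::Success, ActionResultState::Failure),
        (true, true) => (FuncRunState::Success, ActionResultState::Success),
        // No response at all: the run itself failed.
        (false, _) => (FuncRunState::Failure, ActionResultState::Failure),
    }
}

fn main() {
    // The killed-execution case exercised by the test above.
    assert_eq!(
        states_for(true, false),
        (FuncRunState::Success, ActionResultState::Failure)
    );
}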
74 changes: 51 additions & 23 deletions lib/veritech-server/src/handlers.rs
@@ -10,12 +10,12 @@ use si_data_nats::{InnerMessage, Subject};
 // seems strange to get these cyclone_core types from si_pool_noodle?
 use si_pool_noodle::{
     ActionRunResultSuccess, CycloneClient, CycloneRequest, CycloneRequestable, ExecutionError,
-    ManagementResultSuccess, ProgressMessage, ResolverFunctionResultSuccess,
-    SchemaVariantDefinitionResultSuccess, SensitiveStrings, ValidationResultSuccess,
+    FunctionResultFailure, FunctionResultFailureError, ManagementResultSuccess, ProgressMessage,
+    ResolverFunctionResultSuccess, SchemaVariantDefinitionResultSuccess, SensitiveStrings,
+    ValidationResultSuccess,
 };
 use std::{collections::HashMap, result, str::Utf8Error, sync::Arc, time::Duration};
 use telemetry::prelude::*;
-use telemetry_utils::metric;
 use thiserror::Error;
 use tokio::sync::{oneshot, Mutex};
 use veritech_core::{
@@ -186,7 +186,6 @@ where
     let nats_for_publisher = state.nats.clone();
     let publisher = Publisher::new(&nats_for_publisher, &reply_mailbox);
     let execution_id = request.execution_id().to_owned();
-
     let cyclone_request = CycloneRequest::from_parts(request.clone(), sensitive_strings);
 
     let (kill_sender, kill_receiver) = oneshot::channel::<()>();
@@ -224,7 +223,7 @@
                 trace!("received heartbeat message");
             }
             Err(err) => {
-                warn!(error = ?err, "next progress message was an error, bailing out");
+                warn!(si.error.message = ?err, "next progress message was an error, bailing out");
                 break;
             }
         }
@@ -242,47 +241,76 @@
         HandlerResult::Ok(function_result)
     };
 
-    // we do not want to return errors at this point as it will retry functions that may have
-    // failed for legitimate reasons and should not be retried
+    // we do not want to return errors at this point as it will Nack the message and end up auto-retrying
+    // functions that may have failed for legitimate reasons and should not be retried
     let timeout = state.cyclone_client_execution_timeout;
     let result = tokio::select! {
         _ = tokio::time::sleep(timeout) => {
-            error!("hit timeout for communicating with cyclone server");
-            kill_sender_remove_blocking(&state.kill_senders, execution_id).await?;
+            error!("hit timeout for communicating with cyclone server:{:?}", &timeout);
+            kill_sender_remove_blocking(&state.kill_senders, execution_id.to_owned()).await?;
             Err(HandlerError::CycloneTimeout(
                 timeout,
             ))
         },
         Ok(_) = kill_receiver => {
-            Err(HandlerError::Killed(execution_id))
+            Err(HandlerError::Killed(execution_id.clone()))
         }
         func_result = progress_loop => {
-            kill_sender_remove_blocking(&state.kill_senders, execution_id).await?;
+            kill_sender_remove_blocking(&state.kill_senders, execution_id.to_owned()).await?;
             func_result
         },
     };
 
     match result {
+        // Got an Ok - let anyone subscribing to a reply know
         Ok(function_result) => {
             if let Err(err) = publisher.publish_result(&function_result).await {
-                metric!(counter.function_run.action = -1);
-                error!(error = ?err, "failed to publish errored result");
+                error!(si.error.message = ?err, "failed to publish errored result");
             }
 
             request.dec_run_metric();
             span.record_ok();
         }
-        Err(HandlerError::CycloneTimeout(timeout)) => {
-            request.dec_run_metric();
-            warn!(error = ?timeout, "timed out trying to run function to completion");
-        }
-        Err(HandlerError::Killed(execution_id)) => {
-            request.dec_run_metric();
-            info!(error = ?execution_id, "function killed during execution via signal");
-        }
-        Err(err) => {
+        // Got an error that we don't want to recover from here - need to let anyone subscribing know we're done
+        // so they're not waiting forever and can decide how to proceed
+        // Construct the Error result to propagate to subscribers
+        Err(ref err) => {
+            let func_result_error = match err {
+                HandlerError::CycloneTimeout(ref timeout) => {
+                    warn!(si.error.message = ?err, "timed out trying to run function to completion: {:?}", timeout);
+                    let func_res_failure = FunctionResultFailure::new_for_veritech_server_error(
+                        execution_id.to_owned(),
+                        "timed out trying to run function to completion",
+                        timestamp(),
+                    );
+                    si_pool_noodle::FunctionResult::Failure::<Request>(func_res_failure)
+                }
+                HandlerError::Killed(ref execution_id) => {
+                    warn!(si.error.message = ?err, si.func_run.id = ?execution_id, "function killed during execution: {:?} via signal", execution_id);
+                    let func_res_failure = FunctionResultFailure::new(
+                        execution_id.to_owned(),
+                        FunctionResultFailureError {
+                            kind: si_pool_noodle::FunctionResultFailureErrorKind::KilledExecution,
+                            message: "function execution terminated".to_owned(),
+                        },
+                        timestamp(),
+                    );
+                    si_pool_noodle::FunctionResult::Failure::<Request>(func_res_failure)
+                }
+                err => {
+                    error!(si.error.message = ?err, si.func_run.id = ?execution_id.to_owned(), "failure trying to run function to completion");
+                    let func_res_failure = FunctionResultFailure::new_for_veritech_server_error(
+                        execution_id.to_owned(),
+                        "timed out trying to run function to completion",
+                        timestamp(),
+                    );
+                    si_pool_noodle::FunctionResult::Failure::<Request>(func_res_failure)
+                }
+            };
             request.dec_run_metric();
-            error!(error = ?err, "failure trying to run function to completion");
+            if let Err(err) = publisher.publish_result(&func_result_error).await {
+                error!(error = ?err, "failed to publish errored result");
+            }
         }
     }
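
Taken together, the select! above and the new failure publication give the handler a no-silent-exits contract: the timeout, kill, and generic error paths all publish a result. A condensed, self-contained sketch of that pattern with hypothetical types (the real handler races a Cyclone execution and publishes over NATS):

// Sketch of the select pattern used above, not the real handler: race the
// work against a deadline and a kill signal, and turn every losing branch
// into an explicit Failure so subscribers always hear something.
use std::time::Duration;
use tokio::sync::oneshot;

// Stand-in for the published result type; the real code publishes a
// si_pool_noodle::FunctionResult over NATS.
#[derive(Debug)]
enum FunctionResult {
    Success(String),
    Failure(String),
}

async fn run_to_result(
    work: impl std::future::Future<Output = String>,
    kill: oneshot::Receiver<()>,
    deadline: Duration,
) -> FunctionResult {
    tokio::select! {
        _ = tokio::time::sleep(deadline) => {
            FunctionResult::Failure("timed out trying to run function to completion".into())
        }
        Ok(_) = kill => {
            FunctionResult::Failure("function execution terminated".into())
        }
        output = work => FunctionResult::Success(output),
    }
}

#[tokio::main]
async fn main() {
    let (kill_tx, kill_rx) = oneshot::channel();
    // Simulate an operator killing the execution mid-flight.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        kill_tx.send(()).ok();
    });
    let work = async {
        tokio::time::sleep(Duration::from_secs(5)).await;
        "done".to_string()
    };
    // Before this commit the Failure branches were only logged; now they are
    // published, so a subscriber sees this value instead of silence.
    let result = run_to_result(work, kill_rx, Duration::from_secs(1)).await;
    println!("{result:?}");
}

Returning Err from the handler instead would Nack the NATS message and trigger redelivery, which is exactly the auto-retry that the comment at the top of this hunk is avoiding for non-retryable failures.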
