Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(veritech): When errors occur that don't cause us to retry the function, inform subscribers #5263

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions lib/dal/tests/integration_test/func/kill_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use dal_test::helpers::{
};
use dal_test::test;
use pretty_assertions_sorted::assert_eq;
use si_events::FuncRunState;
use si_events::{ActionResultState, FuncRunState};

#[test]
async fn kill_execution_works(ctx: &mut DalContext) {
Expand Down Expand Up @@ -96,6 +96,18 @@ async fn kill_execution_works(ctx: &mut DalContext) {
FuncRunner::kill_execution(ctx, func_run_id)
.await
.expect("could not kill execution");
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx)
.await
.expect("could not commit and update snapshot to visibility");
let action_result_state = ctx
.layer_db()
.func_run()
.read(func_run_id)
.await
.expect("could not get func run")
.expect("no func run found")
.action_result_state()
.expect("action result state found");
let func_run_state = ctx
.layer_db()
.func_run()
Expand All @@ -104,8 +116,21 @@ async fn kill_execution_works(ctx: &mut DalContext) {
.expect("could not get func run")
.expect("no func run found")
.state();

// This should be [`FuncRunState::Killed`]
// but because this is getting bubbled back to the Pinga Job running the
// action, and the error is now embedded in a successful response,
// (because we did get a response from Veritech)
Comment on lines +120 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UGH! So sorry about this... that explains a lot.

// Pinga is setting the func run state to success and the
// action result state to failure.
// make the madness end pls.
assert_eq!(
FuncRunState::Success, // expected
func_run_state // actual
);

assert_eq!(
FuncRunState::Killed, // expected
func_run_state // actual
ActionResultState::Failure, // expected
action_result_state // actual
);
}
74 changes: 51 additions & 23 deletions lib/veritech-server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use si_data_nats::{InnerMessage, Subject};
// seems strange to get these cyclone_core types from si_pool_noodle?
use si_pool_noodle::{
ActionRunResultSuccess, CycloneClient, CycloneRequest, CycloneRequestable, ExecutionError,
ManagementResultSuccess, ProgressMessage, ResolverFunctionResultSuccess,
SchemaVariantDefinitionResultSuccess, SensitiveStrings, ValidationResultSuccess,
FunctionResultFailure, FunctionResultFailureError, ManagementResultSuccess, ProgressMessage,
ResolverFunctionResultSuccess, SchemaVariantDefinitionResultSuccess, SensitiveStrings,
ValidationResultSuccess,
};
use std::{collections::HashMap, result, str::Utf8Error, sync::Arc, time::Duration};
use telemetry::prelude::*;
use telemetry_utils::metric;
use thiserror::Error;
use tokio::sync::{oneshot, Mutex};
use veritech_core::{
Expand Down Expand Up @@ -186,7 +186,6 @@ where
let nats_for_publisher = state.nats.clone();
let publisher = Publisher::new(&nats_for_publisher, &reply_mailbox);
let execution_id = request.execution_id().to_owned();

let cyclone_request = CycloneRequest::from_parts(request.clone(), sensitive_strings);

let (kill_sender, kill_receiver) = oneshot::channel::<()>();
Expand Down Expand Up @@ -224,7 +223,7 @@ where
trace!("received heartbeat message");
}
Err(err) => {
warn!(error = ?err, "next progress message was an error, bailing out");
warn!(si.error.message = ?err, "next progress message was an error, bailing out");
break;
}
}
Expand All @@ -242,47 +241,76 @@ where
HandlerResult::Ok(function_result)
};

// we do not want to return errors at this point as it will retry functions that may have
// failed for legitimate reasons and should not be retried
// we do not want to return errors at this point as it will Nack the message and end up auto-retrying
// functions that may have failed for legitimate reasons and should not be retried
let timeout = state.cyclone_client_execution_timeout;
let result = tokio::select! {
_ = tokio::time::sleep(timeout) => {
error!("hit timeout for communicating with cyclone server");
kill_sender_remove_blocking(&state.kill_senders, execution_id).await?;
error!("hit timeout for communicating with cyclone server:{:?}", &timeout);
kill_sender_remove_blocking(&state.kill_senders, execution_id.to_owned()).await?;
Err(HandlerError::CycloneTimeout(
timeout,
))
},
Ok(_) = kill_receiver => {
Err(HandlerError::Killed(execution_id))
Err(HandlerError::Killed(execution_id.clone()))
}
func_result = progress_loop => {
kill_sender_remove_blocking(&state.kill_senders, execution_id).await?;
kill_sender_remove_blocking(&state.kill_senders, execution_id.to_owned()).await?;
func_result
},
};

match result {
// Got an Ok - let anyone subscribing to a reply know
Ok(function_result) => {
if let Err(err) = publisher.publish_result(&function_result).await {
metric!(counter.function_run.action = -1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is scooped up by the dec call below, yeah?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah were we double decrementing here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think just leftover from pre-refactor

error!(error = ?err, "failed to publish errored result");
error!(si.error.message = ?err, "failed to publish errored result");
}

request.dec_run_metric();
span.record_ok();
}
Err(HandlerError::CycloneTimeout(timeout)) => {
request.dec_run_metric();
warn!(error = ?timeout, "timed out trying to run function to completion");
}
Err(HandlerError::Killed(execution_id)) => {
request.dec_run_metric();
info!(error = ?execution_id, "function killed during execution via signal");
}
Err(err) => {
// Got an error that we don't want to recover from here - need to let anyone subscribing know we're done
// so they're not waiting forever and can decide how to proceed
// Construct the Error result to propagate to subscribers
Err(ref err) => {
let func_result_error = match err {
HandlerError::CycloneTimeout(ref timeout) => {
warn!(si.error.message = ?err, "timed out trying to run function to completion: {:?}", timeout);
let func_res_failure = FunctionResultFailure::new_for_veritech_server_error(
execution_id.to_owned(),
"timed out trying to run function to completion",
timestamp(),
);
si_pool_noodle::FunctionResult::Failure::<Request>(func_res_failure)
}
HandlerError::Killed(ref execution_id) => {
warn!(si.error.message = ?err, si.func_run.id = ?execution_id, "function killed during execution: {:?} via signal", execution_id);
let func_res_failure = FunctionResultFailure::new(
execution_id.to_owned(),
FunctionResultFailureError {
kind: si_pool_noodle::FunctionResultFailureErrorKind::KilledExecution,
message: "function execution terminated".to_owned(),
},
timestamp(),
);
si_pool_noodle::FunctionResult::Failure::<Request>(func_res_failure)
}
err => {
error!(si.error.message = ?err, si.func_run.id = ?execution_id.to_owned(), "failure trying to run function to completion");
let func_res_failure = FunctionResultFailure::new_for_veritech_server_error(
execution_id.to_owned(),
"timed out trying to run function to completion",
timestamp(),
);
si_pool_noodle::FunctionResult::Failure::<Request>(func_res_failure)
}
};
request.dec_run_metric();
error!(error = ?err, "failure trying to run function to completion");
if let Err(err) = publisher.publish_result(&func_result_error).await {
error!(error = ?err, "failed to publish errored result");
}
}
}

Expand Down
Loading