Skip to content

Commit

Permalink
Merge pull request #4271 from systeminit/brit/error-events-on-errors
Browse files Browse the repository at this point in the history
Add error events to SDF routes and some other helpful tracing
  • Loading branch information
stack72 authored Aug 1, 2024
2 parents 20e1373 + 92f1032 commit 038a95e
Show file tree
Hide file tree
Showing 29 changed files with 141 additions and 105 deletions.
4 changes: 4 additions & 0 deletions lib/dal/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ impl Action {
Ok(action)
}

#[instrument(level = "info", skip_all, fields(si.action.id = ?id, si.action.state = ?state))]
pub async fn set_state(ctx: &DalContext, id: ActionId, state: ActionState) -> ActionResult<()> {
let idx = ctx.workspace_snapshot()?.get_node_index_by_id(id).await?;
let node_weight = ctx
Expand Down Expand Up @@ -614,6 +615,9 @@ impl Action {
Ok(result)
}

#[instrument(name = "workspace_snapshot.dispatch_action", level = "info", skip_all, fields(
si.action.id = ?action_id,
))]
pub async fn dispatch_action(ctx: &DalContext, action_id: ActionId) -> ActionResult<()> {
Action::set_state(ctx, action_id, ActionState::Dispatched).await?;

Expand Down
6 changes: 3 additions & 3 deletions lib/dal/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl ChangeSet {
skip_all,
fields(
si.change_set.id = %change_set_id,
si.workspace.pk = Empty,
si.workspace.id = Empty,
),
)]
pub async fn find(
Expand All @@ -360,7 +360,7 @@ impl ChangeSet {
let change_set = Self::try_from(row)?;

if let Some(workspace_id) = change_set.workspace_id {
span.record("si.workspace.pk", workspace_id.to_string());
span.record("si.workspace.id", workspace_id.to_string());
}
Ok(Some(change_set))
}
Expand Down Expand Up @@ -609,7 +609,7 @@ impl ChangeSet {
skip_all,
fields(
si.workspace_snapshot_address = %workspace_snapshot_address,
si.workspace.pk = Empty,
si.workspace.id = Empty,
),
)]
pub async fn workspace_snapshot_address_in_use(
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/component/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct Frame;

impl Frame {
/// Provides the ability to remove the existing [`Component`]'s parent.
#[instrument(level = "info", skip_all)]
#[instrument(level = "info", skip(ctx))]
pub async fn orphan_child(ctx: &DalContext, child_id: ComponentId) -> FrameResult<()> {
// Normally, we'd call `Component::get_parent_by_id` to get the parent's ID, but that
// returns a hard error if there are multiple parents. Since we want to be able to use this
Expand Down
36 changes: 27 additions & 9 deletions lib/dal/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,14 @@ impl DalContext {
) -> Result<(), TransactionsError> {
if let Some(conflicts) = rebase(&self.tenancy, &self.layer_db(), rebase_request).await? {
let conflict_count = &conflicts.conflicts_found.len();
let updates_found_and_skipped = &conflicts.updates_found_and_skipped.clone();
let err = TransactionsError::ConflictsOccurred(conflicts);
error!(error= ?err, si.conflicts.count={conflict_count}, "conflicts found on commit");
error!(
si.error.message = ?err,
si.conflicts.count = { conflict_count },
si.updates_found_and_skipped = format!("{:?}", updates_found_and_skipped),
"conflicts found on commit"
);
return Err(err);
}
Ok(())
Expand Down Expand Up @@ -459,8 +465,13 @@ impl DalContext {
self.commit_internal(rebase_request).await?
} {
let conflict_count = &conflicts.conflicts_found.len();
let err = TransactionsError::ConflictsOccurred(conflicts);
error!(error= ?err, conflict.count={conflict_count}, "conflicts found on commit");
let err = TransactionsError::ConflictsOccurred(conflicts.clone());
error!(
si.error.message = ?err,
si.conflicts.count = { conflict_count },
si.conflicts = ?conflicts.conflicts_found.clone(),
"conflicts found on commit"
);
return Err(err);
}
Ok(())
Expand Down Expand Up @@ -1225,7 +1236,7 @@ pub struct Updates {
skip_all,
fields(
si.change_set.id = Empty,
si.workspace.pk = Empty,
si.workspace.id = Empty,
si.conflicts = Empty,
si.updates = Empty,
si.conflicts.count = Empty,
Expand All @@ -1252,7 +1263,7 @@ async fn rebase(
rebase_request.to_rebase_change_set_id.to_string(),
);
span.record(
"si.workspace.pk",
"si.workspace.id",
tenancy
.workspace_pk()
.unwrap_or(WorkspacePk::NONE)
Expand Down Expand Up @@ -1345,7 +1356,7 @@ impl Transactions {
skip_all,
fields(
si.change_set.id = Empty,
si.workspace.pk = Empty,
si.workspace.id = Empty,
)
)]
pub async fn commit_into_conns(
Expand All @@ -1356,7 +1367,7 @@ impl Transactions {
) -> Result<(Connections, Option<Conflicts>), TransactionsError> {
let span = Span::current();
span.record(
"si.workspace.pk",
"si.workspace.id",
tenancy
.workspace_pk()
.unwrap_or(WorkspacePk::NONE)
Expand Down Expand Up @@ -1410,8 +1421,15 @@ impl Transactions {
} else {
None
};
if conflicts.is_some() {
error!("conflicts found");
if let Some(ref conflicts) = conflicts {
let conflict_count = conflicts.conflicts_found.len();
let err = TransactionsError::ConflictsOccurred(conflicts.clone());
error!(
si.error.message = ?err,
si.conflicts.count = { conflict_count },
si.conflicts = ?conflicts.conflicts_found.clone(),
"conflicts found on blocking commit"
);
}

let nats_conn = self.nats_txn.commit_into_conn().await?;
Expand Down
12 changes: 10 additions & 2 deletions lib/dal/src/func/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,11 @@ impl FuncRunnerLogsTask {

async fn run(self) {
if let Err(err) = self.try_run().await {
error!(task = Self::NAME, error = ?err, "error while processing function logs");
error!(
si.error.message = ?err,
task = Self::NAME,
"error while processing function logs"
);
}
}

Expand Down Expand Up @@ -1390,7 +1394,11 @@ impl FuncRunnerExecutionTask {
parent_span.record_err(err)
})
{
error!(task = Self::NAME, error = ?err, "error while dispatching and running function");
error!(
si.error.message = ?err,
task = Self::NAME,
"error while dispatching and running function"
);
}
}

Expand Down
14 changes: 10 additions & 4 deletions lib/dal/src/job/definition/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ impl JobConsumer for ActionJob {
)]
async fn run(&self, ctx: &mut DalContext) -> JobConsumerResult<JobCompletionState> {
if let Err(err) = inner_run(ctx, self.id).await {
error!(error = ?err, si.action.id = %self.id, "unable to finish action");
error!(si.error.message = ?err, si.action.id = %self.id, "unable to finish action");
if let Err(err) = process_failed_action(ctx, self.id).await {
error!(error = ?err, "failed to process action failure");
error!(si.error.message = ?err, "failed to process action failure");
}
}

Expand Down Expand Up @@ -210,6 +210,9 @@ async fn prepare_for_execution(
Ok((prototype_id, component_id))
}

#[instrument(name = "action_job.process_and_record_execution",
skip_all, level = "info", fields(
si.action.id = ?action_id))]
async fn process_and_record_execution(
mut ctx: DalContext,
maybe_resource: Option<&ActionRunResultSuccess>,
Expand Down Expand Up @@ -282,13 +285,16 @@ async fn process_and_record_execution(
.await?
.publish_on_commit(&ctx)
.await?;

ctx.commit().await?;

Ok(())
}

#[instrument(name = "action_job.process_failed_action", skip_all, level = "info")]
#[instrument(
name = "action_job.process_failed_action",
skip_all,
level = "info",
fields(si.action.id = ?action_id))]
async fn process_failed_action(ctx: &DalContext, action_id: ActionId) -> JobConsumerResult<()> {
info!(%action_id, "processing action failed");

Expand Down
8 changes: 4 additions & 4 deletions lib/dal/src/job/definition/dependent_values_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ impl JobConsumer for DependentValuesUpdate {
skip_all,
fields(
si.change_set.id = Empty,
si.workspace.pk = Empty,
si.workspace.id = Empty,
),
)]
async fn run(&self, ctx: &mut DalContext) -> JobConsumerResult<JobCompletionState> {
let span = Span::current();
span.record("si.change_set.id", ctx.change_set_id().to_string());
span.record(
"si.workspace.pk",
"si.workspace.id",
ctx.tenancy()
.workspace_pk()
.unwrap_or(WorkspacePk::NONE)
Expand Down Expand Up @@ -261,7 +261,7 @@ impl DependentValuesUpdate {
)
.await
{
error!("status update finished event send failed for AttributeValue {finished_value_id}: {err}");
error!(si.error.message = ?err, "status update finished event send failed for AttributeValue {finished_value_id}");
};
}
}
Expand Down Expand Up @@ -301,7 +301,7 @@ async fn execution_error(
fallback
};

error!("{}", error_message);
error!(si.error.message = error_message, %attribute_value_id);
}

async fn execution_error_detail(
Expand Down
2 changes: 1 addition & 1 deletion lib/rebaser-server/src/change_set_requests/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Result<T> = result::Result<T, HandlerError>;

impl IntoResponse for HandlerError {
fn into_response(self) -> Response {
error!(error = ?self, "failed to process message");
error!(si.error.message = ?self, "failed to process message");
Response::server_error()
}
}
Expand Down
32 changes: 27 additions & 5 deletions lib/rebaser-server/src/rebase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,27 @@ pub enum RebaseError {

type RebaseResult<T> = Result<T, RebaseError>;

#[instrument(name = "perform_rebase", level = "debug", skip_all, fields())]
#[instrument(name = "rebase.perform_rebase", level = "info", skip_all, fields(
si.change_set.id = Empty,
si.workspace.id = Empty,
si.conflicts = Empty,
si.updates = Empty,
si.conflicts.count = Empty,
si.updates.count = Empty,
))]
pub async fn perform_rebase(
ctx: &mut DalContext,
message: &ActivityRebaseRequest,
) -> RebaseResult<RebaseStatus> {
let span = Span::current();
span.record(
"si.change_set.id",
&message.metadata.tenancy.change_set_id.to_string(),
);
span.record(
"si.workspace.id",
&message.metadata.tenancy.workspace_pk.to_string(),
);
let start = Instant::now();
// Gather everything we need to detect conflicts and updates from the inbound message.
let mut to_rebase_change_set =
Expand Down Expand Up @@ -153,13 +169,19 @@ pub async fn perform_rebase(
.await?;
info!("pointer updated: {:?}", start.elapsed());
}
let updates_count = conflicts_and_updates.updates.len();
let updates_performed = serde_json::to_value(conflicts_and_updates.updates)?.to_string();

RebaseStatus::Success {
updates_performed: serde_json::to_value(conflicts_and_updates.updates)?.to_string(),
}
span.record("si.updates", updates_performed.clone());
span.record("si.updates.count", updates_count.to_string());
RebaseStatus::Success { updates_performed }
} else {
let conflicts_count = conflicts_and_updates.conflicts.len();
let conflicts_found = serde_json::to_value(conflicts_and_updates.conflicts)?.to_string();
span.record("si.conflicts", conflicts_found.clone());
span.record("si.conflicts.count", conflicts_count.to_string());
RebaseStatus::ConflictsFound {
conflicts_found: serde_json::to_value(conflicts_and_updates.conflicts)?.to_string(),
conflicts_found,
updates_found_and_skipped: serde_json::to_value(conflicts_and_updates.updates)?
.to_string(),
}
Expand Down
2 changes: 1 addition & 1 deletion lib/rebaser-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl Server {
skip_all,
fields(
si.change_set.id = %change_set_id,
si.workspace.pk = %workspace_id,
si.workspace.id = %workspace_id,
)
)]
async fn launch_change_set_task(
Expand Down
2 changes: 2 additions & 0 deletions lib/sdf-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use routes::{routes, AppError};
pub use server::{build_service, build_service_for_tests, Server};
pub use si_data_pg::PgPool;
pub use si_layer_cache::LayerDb;
pub use telemetry::prelude::*;
pub use uds::{UdsIncomingStream, UdsIncomingStreamError};

mod config;
Expand Down Expand Up @@ -38,6 +39,7 @@ macro_rules! impl_default_error_into_response {
let body = Json(
serde_json::json!({ "error": { "message": error_message, "code": 42, "statusCode": status.as_u16() } }),
);
telemetry::prelude::tracing::error!(si.error.message = error_message);

(status, body).into_response()
}
Expand Down
3 changes: 2 additions & 1 deletion lib/sdf-server/src/server/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use hyper::StatusCode;
use serde_json::{json, Value};
use si_data_nats::NatsError;
use si_data_pg::PgError;
use telemetry::prelude::*;
use thiserror::Error;
use tower_http::cors::CorsLayer;
use tower_http::{compression::CompressionLayer, cors::AllowOrigin};
Expand Down Expand Up @@ -127,7 +128,7 @@ impl IntoResponse for AppError {
"statusCode": status.as_u16(),
},
}));

error!(si.error.message = error_message);
(status, body).into_response()
}
}
2 changes: 2 additions & 0 deletions lib/sdf-server/src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use axum::{
Json,
};
use serde::{Serialize, Serializer};
use telemetry::prelude::*;

pub mod action;
pub mod async_route;
Expand Down Expand Up @@ -49,6 +50,7 @@ impl ApiError {

impl IntoResponse for ApiError {
fn into_response(self) -> Response {
error!(err=?self, ?self.error.status_code, ?self.error.status_code, self.error.message );
(self.error.status_code, Json(self)).into_response()
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/sdf-server/src/server/service/async_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ pub async fn handle_error(
match WsEvent::async_error(ctx, task_id, err_string).await {
Ok(event) => {
if let Err(commit_err) = event.publish_immediately(ctx).await {
error!("Unable to publish ws event for async error: {commit_err}");
error!(si.error.message = ?commit_err.to_string(), "Unable to publish ws event for async error");
}
}
Err(creation_err) => {
error!("Unable to create ws event for async error: {creation_err}")
error!( si.error.message = ?creation_err.to_string(), "Unable to create ws event for async error");
}
}
}
1 change: 1 addition & 0 deletions lib/sdf-server/src/server/service/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl IntoResponse for ChangeSetError {
serde_json::json!({ "error": { "message": error_message, "code": 42, "statusCode": status.as_u16() } }),
);

error!(si.error.message = error_message);
(status, body).into_response()
}
}
Expand Down
2 changes: 2 additions & 0 deletions lib/sdf-server/src/server/service/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use dal::{
};
use dal::{attribute::value::AttributeValueError, component::debug::ComponentDebugViewError};
use dal::{ChangeSetError, TransactionsError};
use telemetry::prelude::*;
use thiserror::Error;

use crate::server::state::AppState;
Expand Down Expand Up @@ -125,6 +126,7 @@ impl IntoResponse for ComponentError {
serde_json::json!({ "error": { "message": error_message, "code": 42, "statusCode": status.as_u16() } }),
);

error!(si.error.message = error_message);
(status, body).into_response()
}
}
Expand Down
Loading

0 comments on commit 038a95e

Please sign in to comment.