From b805b82557de20a1c360b704d6c056799f4b38dd Mon Sep 17 00:00:00 2001 From: Patrick Date: Thu, 14 Nov 2024 12:21:56 -0500 Subject: [PATCH] adding custom message w/ workflow execution ID when duration exceeds 15 minutes (#15241) * adding custom message w/ workflow execution ID when duration exceeds 15 minutes * adding logging for internal evs * fixing Info --> Infof --- core/services/workflows/engine.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index e20af85540d..69c36c1c174 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -27,6 +27,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) +const fifteenMinutesMs = 15 * 60 * 1000 + type stepRequest struct { stepRef string state store.WorkflowExecution @@ -622,7 +624,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow // the async nature of the workflow engine would provide no guarantees. } logCustMsg(ctx, cma, "execution status: "+status, l) - return e.finishExecution(ctx, state.ExecutionID, status) + return e.finishExecution(ctx, cma, state.ExecutionID, status) } // Finally, since the workflow hasn't timed out or completed, let's @@ -669,9 +671,12 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) { } } -func (e *Engine) finishExecution(ctx context.Context, executionID string, status string) error { - e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status).Info("finishing execution") +func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter, executionID string, status string) error { + l := e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status) metrics := e.metrics.with("status", status) + + l.Info("finishing execution") + err := e.executionStates.UpdateStatus(ctx, executionID, status) if err != nil { return err @@ -687,6 +692,13 @@ func (e *Engine) finishExecution(ctx context.Context, executionID string, status e.stepUpdatesChMap.remove(executionID) metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len()) metrics.updateWorkflowExecutionLatencyGauge(ctx, executionDuration) + + if executionDuration > fifteenMinutesMs { + logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d", executionDuration), l) + l.Warnf("execution duration exceeded 15 minutes: %d", executionDuration) + } + logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d", executionDuration), l) + l.Infof("execution duration: %d", executionDuration) e.onExecutionFinished(executionID) return nil }