From ec9bdfc1b956fac6c1498295022529bc406bd3aa Mon Sep 17 00:00:00 2001 From: Tingluo Huang Date: Wed, 4 Sep 2024 15:57:23 -0400 Subject: [PATCH] Improve reliability for waiting job to start before running steps. --- src/Runner.Common/JobServerQueue.cs | 39 +++++++++++++++++++++++++++++ src/Runner.Worker/JobRunner.cs | 10 ++++++-- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs index 74c12bea28b..31206a1d4c2 100644 --- a/src/Runner.Common/JobServerQueue.cs +++ b/src/Runner.Common/JobServerQueue.cs @@ -77,6 +77,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue private int _resultsServiceExceptionsCount = 0; private Stopwatch _resultsUploadTimer = new(); private Stopwatch _actionsUploadTimer = new(); + private Stopwatch _jobRecordUpdatedTimer = new(); public TaskCompletionSource JobRecordUpdated => _jobRecordUpdated; @@ -96,6 +97,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue private bool _resultsClientInitiated = false; private bool _enableTelemetry = false; + private bool _enableJobRecordUpdatedTelemetry = false; + private bool _enableAutoRetry = false; private delegate Task ResultsFileUploadHandler(ResultsUploadFileInfo file); public override void Initialize(IHostContext hostContext) @@ -180,6 +183,23 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServi _allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask }; _queueInProcess = true; + + + if (jobRequest.Variables.TryGetValue("DistributedTask.EnableJobRecordUpdatedTelemetry", out VariableValue enableTelemetry)) + { + _enableJobRecordUpdatedTelemetry = StringUtil.ConvertToBoolean(enableTelemetry?.Value); + if (_enableJobRecordUpdatedTelemetry) + { + Trace.Info("Enable telemetry for first job record update."); + _jobRecordUpdatedTimer.Start(); + } + } + + if (jobRequest.Variables.TryGetValue("DistributedTask.EnableRecordUpdateAutoRetry", out VariableValue enableAutoRetry)) + { + Trace.Info("Enable auto retry for timeline record update."); + _enableAutoRetry = StringUtil.ConvertToBoolean(enableAutoRetry?.Value); + } } // WebConsoleLine queue and FileUpload queue are always best effort @@ -232,6 +252,11 @@ public async Task ShutdownAsync() Trace.Info(uploadTimeComparison); _jobTelemetries.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = uploadTimeComparison }); } + + if (_enableJobRecordUpdatedTelemetry) + { + _jobTelemetries.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = $"First job record updated time after: {_jobRecordUpdatedTimer.ElapsedMilliseconds} ms" }); + } } public void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber) @@ -729,6 +754,11 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) // We have changed the state of the job Trace.Info("Job timeline record has been updated for the first time."); _jobRecordUpdated.TrySetResult(0); + + if (_enableJobRecordUpdatedTelemetry) + { + _jobRecordUpdatedTimer.Stop(); + } } } catch (Exception ex) @@ -740,6 +770,15 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) { mainTimelineRecordsUpdateErrors.Add(ex); } + + if (!runOnce && _enableAutoRetry) + { + foreach (var retryRecordId in update.PendingRecords.DistinctBy(x => x.Id).Select(r => r.Id)) + { + Trace.Verbose("Enqueue timeline record {0} update for retry.", retryRecordId); + _timelineUpdateQueue[update.TimelineId].Enqueue(new TimelineRecord() { Id = retryRecordId }); + } + } } } } diff --git a/src/Runner.Worker/JobRunner.cs b/src/Runner.Worker/JobRunner.cs index ad265ecaf68..7a435f481c4 100644 --- a/src/Runner.Worker/JobRunner.cs +++ b/src/Runner.Worker/JobRunner.cs @@ -210,8 +210,14 @@ public async Task RunAsync(AgentJobRequestMessage message, Cancellat // Server won't issue ID_TOKEN for non-inprogress job. // If the job is trying to use OIDC feature, we want the job to be marked as in-progress before running any customer's steps as much as we can. // Timeline record update background process runs every 500ms, so delay 1000ms is enough for most of the cases - Trace.Info($"Waiting for job to be marked as started."); - await Task.WhenAny(_jobServerQueue.JobRecordUpdated.Task, Task.Delay(1000)); + var maxWaitTimeInSeconds = jobContext.Global.Variables.GetInt("DistributedTask.FirstJobRecordUpdateWaitTimeInSeconds"); + if (maxWaitTimeInSeconds == null) + { + maxWaitTimeInSeconds = 1; + } + + Trace.Info($"Waiting {maxWaitTimeInSeconds.Value} seconds for job to be marked as started."); + await Task.WhenAny(_jobServerQueue.JobRecordUpdated.Task, Task.Delay(maxWaitTimeInSeconds.Value * 1000)); } // Run all job steps