Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Users/konstantin tyukalov/split exec context #4759

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,31 +91,32 @@ public interface IExecutionContext : IAgentService, IKnobValueContext
void EmitHostNode20FallbackTelemetry(bool node20ResultsInGlibCErrorHost);
}

public sealed class ExecutionContext : AgentService, IExecutionContext, IDisposable
public class ExecutionContext : AgentService, IExecutionContext, IDisposable
{
private const int _maxIssueCount = 10;

private readonly TimelineRecord _record = new TimelineRecord();
protected readonly TimelineRecord _record = new();
protected ExecutionTargetInfo _defaultStepTarget;
protected ExecutionTargetInfo _currentStepTarget;
protected IPagingLogger _logger;
protected bool _outputForward = false;
protected Guid _mainTimelineId;
protected int _childTimelineRecordOrder = 0;
protected CancellationTokenSource _cancellationTokenSource;
protected IExecutionContext _parentExecutionContext;

private readonly Dictionary<Guid, TimelineRecord> _detailRecords = new Dictionary<Guid, TimelineRecord>();
private readonly object _loggerLock = new object();
private readonly List<IAsyncCommandContext> _asyncCommands = new List<IAsyncCommandContext>();
private readonly HashSet<string> _outputvariables = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
private readonly List<TaskRestrictions> _restrictions = new List<TaskRestrictions>();
private readonly string _buildLogsFolderName = "buildlogs";
private IAgentLogPlugin _logPlugin;
private IPagingLogger _logger;
private IJobServerQueue _jobServerQueue;
private IExecutionContext _parentExecutionContext;
private bool _outputForward = false;
private Guid _mainTimelineId;
private Guid _detailTimelineId;
private int _childTimelineRecordOrder = 0;
private CancellationTokenSource _cancellationTokenSource;
private CancellationTokenSource _forceCompleteCancellationTokenSource = new CancellationTokenSource();
private TaskCompletionSource<int> _forceCompleted = new TaskCompletionSource<int>();
private bool _throttlingReported = false;
private ExecutionTargetInfo _defaultStepTarget;
private ExecutionTargetInfo _currentStepTarget;
private bool _disableLogUploads;
private string _buildLogsFolderPath;
private string _buildLogsFile;
Expand All @@ -130,17 +131,17 @@ public sealed class ExecutionContext : AgentService, IExecutionContext, IDisposa
public Task ForceCompleted => _forceCompleted.Task;
public CancellationToken CancellationToken => _cancellationTokenSource.Token;
public CancellationToken ForceCompleteCancellationToken => _forceCompleteCancellationTokenSource.Token;
public List<ServiceEndpoint> Endpoints { get; private set; }
public List<SecureFile> SecureFiles { get; private set; }
public List<Pipelines.RepositoryResource> Repositories { get; private set; }
public Dictionary<string, string> JobSettings { get; private set; }
public Variables Variables { get; private set; }
public Variables TaskVariables { get; private set; }
public List<ServiceEndpoint> Endpoints { get; protected set; }
public List<SecureFile> SecureFiles { get; protected set; }
public List<Pipelines.RepositoryResource> Repositories { get; protected set; }
public Dictionary<string, string> JobSettings { get; protected set; }
public Variables Variables { get; protected set; }
public Variables TaskVariables { get; protected set; }
public HashSet<string> OutputVariables => _outputvariables;
public bool WriteDebug { get; private set; }
public List<string> PrependPath { get; private set; }
public List<ContainerInfo> Containers { get; private set; }
public List<ContainerInfo> SidecarContainers { get; private set; }
public bool WriteDebug { get; protected set; }
public List<string> PrependPath { get; protected set; }
public List<ContainerInfo> Containers { get; protected set; }
public List<ContainerInfo> SidecarContainers { get; protected set; }
public List<TaskRestrictions> Restrictions => _restrictions;
public List<IAsyncCommandContext> AsyncCommands => _asyncCommands;

Expand Down Expand Up @@ -172,7 +173,7 @@ public string ResultCode
}
}

public PlanFeatures Features { get; private set; }
public PlanFeatures Features { get; protected set; }

public override void Initialize(IHostContext hostContext)
{
Expand Down Expand Up @@ -761,7 +762,7 @@ public ITraceWriter GetTraceWriter()
return Trace;
}

private void InitializeTimelineRecord(Guid timelineId, Guid timelineRecordId, Guid? parentTimelineRecordId, string recordType, string displayName, string refName, int? order)
protected internal void InitializeTimelineRecord(Guid timelineId, Guid timelineRecordId, Guid? parentTimelineRecordId, string recordType, string displayName, string refName, int? order)
{
_mainTimelineId = timelineId;
_record.Id = timelineRecordId;
Expand Down Expand Up @@ -937,13 +938,23 @@ private void PublishTelemetry(

public void Dispose()
{
_cancellationTokenSource?.Dispose();
_forceCompleteCancellationTokenSource?.Dispose();
Dispose(true);

GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_cancellationTokenSource?.Dispose();
_forceCompleteCancellationTokenSource?.Dispose();

_buildLogsWriter?.Dispose();
_buildLogsWriter = null;
_buildLogsData?.Dispose();
_buildLogsData = null;
_buildLogsWriter?.Dispose();
_buildLogsWriter = null;
_buildLogsData?.Dispose();
_buildLogsData = null;
}
}
}

Expand Down
65 changes: 65 additions & 0 deletions src/Agent.Worker/ExecutionContext/JobExecutionContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.TeamFoundation.DistributedTask.WebApi;

namespace Microsoft.VisualStudio.Services.Agent.Worker;

public interface IJobExecutionContext : IExecutionContext
{
public ITaskExecutionContext CreateTaskExecutionContext(
Guid recordId,
string displayName,
string refName,
Variables taskVariables = null,
bool outputForward = false,
List<TaskRestrictions> taskRestrictions = null);
}

public sealed class JobExecutionContext : ExecutionContext, IJobExecutionContext
{
public ITaskExecutionContext CreateTaskExecutionContext(
Guid recordId,
string displayName,
string refName,
Variables taskVariables = null,
bool outputForward = false,
List<TaskRestrictions> taskRestrictions = null)
{
Trace.Entering();

var taskContext = new TaskExecutionContext();
taskContext.Initialize(HostContext);
taskContext.InitContextProperties(
jobContext: this,
features: Features,
variables: Variables,
endpoints: Endpoints,
repositories: Repositories,
jobSettings: JobSettings,
secureFiles: SecureFiles,
taskVariables: taskVariables,
writeDebug: WriteDebug,
prependPath: PrependPath,
containers: Containers,
sidecarContainers: SidecarContainers,
outputForward: outputForward,
defaultStepTarget: _defaultStepTarget,
currentStepTarget: _currentStepTarget,
cancellationTokenSource: new CancellationTokenSource());

if (taskRestrictions != null)
{
taskContext.Restrictions.AddRange(taskRestrictions);
}

taskContext.InitializeTimelineRecord(_mainTimelineId, recordId, _record.Id, ExecutionContextType.Task, displayName, refName, ++_childTimelineRecordOrder);

taskContext.InitLogger(_mainTimelineId, recordId);

return taskContext;
}
}
87 changes: 87 additions & 0 deletions src/Agent.Worker/ExecutionContext/TaskExecutionContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Threading;
using System.Collections.Generic;
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Microsoft.VisualStudio.Services.Agent.Util;
using Microsoft.TeamFoundation.DistributedTask.Pipelines;
using Agent.Sdk;

namespace Microsoft.VisualStudio.Services.Agent.Worker;

public interface ITaskExecutionContext : IExecutionContext
{
public IJobExecutionContext JobContext { get; }

public void InitLogger(Guid mainTimelineId, Guid recordId);

public void InitContextProperties(
IJobExecutionContext jobContext,
PlanFeatures features,
Variables variables,
List<ServiceEndpoint> endpoints,
List<RepositoryResource> repositories,
Dictionary<string, string> jobSettings,
List<SecureFile> secureFiles,
Variables taskVariables,
bool writeDebug,
List<string> prependPath,
List<ContainerInfo> containers,
List<ContainerInfo> sidecarContainers,
bool outputForward,
ExecutionTargetInfo defaultStepTarget,
ExecutionTargetInfo currentStepTarget,
CancellationTokenSource cancellationTokenSource);
}

public sealed class TaskExecutionContext : ExecutionContext, ITaskExecutionContext
{
public IJobExecutionContext JobContext { get; private set; }

public TaskExecutionContext() { }

public void InitContextProperties(
IJobExecutionContext jobContext,
PlanFeatures features,
Variables variables,
List<ServiceEndpoint> endpoints,
List<RepositoryResource> repositories,
Dictionary<string,string> jobSettings,
List<SecureFile> secureFiles,
Variables taskVariables,
bool writeDebug,
List<string> prependPath,
List<ContainerInfo> containers,
List<ContainerInfo> sidecarContainers,
bool outputForward,
ExecutionTargetInfo defaultStepTarget,
ExecutionTargetInfo currentStepTarget,
CancellationTokenSource cancellationTokenSource)
{
JobContext = jobContext;
_parentExecutionContext = jobContext;
Features = features;
Variables = variables;
Endpoints = endpoints;
Repositories = repositories;
JobSettings = jobSettings;
SecureFiles = secureFiles;
TaskVariables = taskVariables;
_cancellationTokenSource = cancellationTokenSource;
WriteDebug = writeDebug;
PrependPath = prependPath;
Containers = containers;
SidecarContainers = sidecarContainers;
_outputForward = outputForward;
_defaultStepTarget = defaultStepTarget;
_currentStepTarget = currentStepTarget;
}

public void InitLogger(Guid mainTimelineId, Guid recordId)
{
_logger = HostContext.CreateService<IPagingLogger>();
_logger.Setup(mainTimelineId, recordId);
}
}
18 changes: 9 additions & 9 deletions src/Agent.Worker/JobExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace Microsoft.VisualStudio.Services.Agent.Worker
public interface IJobExtension : IExtension
{
HostTypes HostType { get; }
Task<List<IStep>> InitializeJob(IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message);
Task FinalizeJob(IExecutionContext jobContext);
Task<List<IStep>> InitializeJob(IJobExecutionContext jobContext, Pipelines.AgentJobRequestMessage message);
Task FinalizeJob(IJobExecutionContext jobContext);
string GetRootedPath(IExecutionContext context, string path);
void ConvertLocalPath(IExecutionContext context, string localPath, out string repoName, out string sourcePath);
}
Expand Down Expand Up @@ -55,14 +55,14 @@ public abstract class JobExtension : AgentService, IJobExtension
// download all required tasks.
// make sure all task's condition inputs are valid.
// build up three list of steps for jobrunner. (pre-job, job, post-job)
public async Task<List<IStep>> InitializeJob(IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message)
public async Task<List<IStep>> InitializeJob(IJobExecutionContext jobContext, Pipelines.AgentJobRequestMessage message)
{
Trace.Entering();
ArgUtil.NotNull(jobContext, nameof(jobContext));
ArgUtil.NotNull(message, nameof(message));

// create a new timeline record node for 'Initialize job'
IExecutionContext context = jobContext.CreateChild(Guid.NewGuid(), StringUtil.Loc("InitializeJob"), $"{nameof(JobExtension)}_Init");
ITaskExecutionContext context = jobContext.CreateTaskExecutionContext(Guid.NewGuid(), StringUtil.Loc("InitializeJob"), $"{nameof(JobExtension)}_Init");

List<IStep> preJobSteps = new List<IStep>();
List<IStep> jobSteps = new List<IStep>();
Expand Down Expand Up @@ -373,7 +373,7 @@ public async Task<List<IStep>> InitializeJob(IExecutionContext jobContext, Pipel
{
ITaskRunner taskStep = step as ITaskRunner;
ArgUtil.NotNull(taskStep, step.DisplayName);
taskStep.ExecutionContext = jobContext.CreateChild(
taskStep.ExecutionContext = jobContext.CreateTaskExecutionContext(
Guid.NewGuid(),
StringUtil.Loc("PreJob", taskStep.DisplayName),
taskStep.Task.Name,
Expand All @@ -388,7 +388,7 @@ public async Task<List<IStep>> InitializeJob(IExecutionContext jobContext, Pipel
{
ITaskRunner taskStep = step as ITaskRunner;
ArgUtil.NotNull(taskStep, step.DisplayName);
taskStep.ExecutionContext = jobContext.CreateChild(
taskStep.ExecutionContext = jobContext.CreateTaskExecutionContext(
taskStep.Task.Id,
taskStep.DisplayName,
taskStep.Task.Name,
Expand Down Expand Up @@ -418,7 +418,7 @@ public async Task<List<IStep>> InitializeJob(IExecutionContext jobContext, Pipel
{
ITaskRunner taskStep = step as ITaskRunner;
ArgUtil.NotNull(taskStep, step.DisplayName);
taskStep.ExecutionContext = jobContext.CreateChild(
taskStep.ExecutionContext = jobContext.CreateTaskExecutionContext(
Guid.NewGuid(),
StringUtil.Loc("PostJob", taskStep.DisplayName),
taskStep.Task.Name,
Expand Down Expand Up @@ -515,13 +515,13 @@ public async Task<List<IStep>> InitializeJob(IExecutionContext jobContext, Pipel
}
}

public async Task FinalizeJob(IExecutionContext jobContext)
public async Task FinalizeJob(IJobExecutionContext jobContext)
{
Trace.Entering();
ArgUtil.NotNull(jobContext, nameof(jobContext));

// create a new timeline record node for 'Finalize job'
IExecutionContext context = jobContext.CreateChild(Guid.NewGuid(), StringUtil.Loc("FinalizeJob"), $"{nameof(JobExtension)}_Final");
ITaskExecutionContext context = jobContext.CreateTaskExecutionContext(Guid.NewGuid(), StringUtil.Loc("FinalizeJob"), $"{nameof(JobExtension)}_Final");
using (var register = jobContext.CancellationToken.Register(() => { context.CancelToken(); }))
{
try
Expand Down
4 changes: 2 additions & 2 deletions src/Agent.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
_jobServerQueue.Start(message);
HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}");

IExecutionContext jobContext = null;
IJobExecutionContext jobContext = null;
CancellationTokenRegistration? agentShutdownRegistration = null;
VssConnection taskConnection = null;
VssConnection legacyTaskConnection = null;
IResourceMetricsManager resourceDiagnosticManager = null;
try
{
// Create the job execution context.
jobContext = HostContext.CreateService<IExecutionContext>();
jobContext = HostContext.CreateService<IJobExecutionContext>();
jobContext.InitializeJob(message, jobRequestCancellationToken);

Trace.Info("Starting the job execution context.");
Expand Down
Loading