From 81049a5275a94043d7839dc51309fca92f260f89 Mon Sep 17 00:00:00 2001 From: William Sossamon <3278433+WillSoss@users.noreply.github.com> Date: Sat, 17 Feb 2024 10:15:11 -0600 Subject: [PATCH] Use the generic host and application lifetime to handle sigint/sigterm events --- src/Diagnostics/Program.cs | 5 +- src/Runly/Hosting/GetAction.cs | 96 ++-- src/Runly/Hosting/HostedAction.cs | 70 +++ src/Runly/Hosting/IHostAction.cs | 24 - src/Runly/Hosting/ListAction.cs | 74 +-- src/Runly/Hosting/RunAction.cs | 513 +++++++++--------- src/Runly/JobHost.cs | 33 +- src/Runly/Runly.csproj | 4 +- src/Runly/ServiceExtensions.cs | 18 +- test/Runly.Tests.Cli/Program.cs | 5 +- test/Runly.Tests/Jobs.cs | 26 +- test/Runly.Tests/Runly.Tests.csproj | 6 +- .../Applying_config_overrides.cs | 32 +- .../Scenarios/Running/Running_a_job.cs | 18 +- 14 files changed, 508 insertions(+), 416 deletions(-) create mode 100644 src/Runly/Hosting/HostedAction.cs delete mode 100644 src/Runly/Hosting/IHostAction.cs diff --git a/src/Diagnostics/Program.cs b/src/Diagnostics/Program.cs index bcf1a2a..5c33da9 100644 --- a/src/Diagnostics/Program.cs +++ b/src/Diagnostics/Program.cs @@ -1,4 +1,5 @@ -using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using System.Threading.Tasks; namespace Runly.Diagnostics { @@ -8,7 +9,7 @@ static Task Main(string[] args) { return JobHost.CreateDefaultBuilder(args) .Build() - .RunJobAsync(); + .RunAsync(); } } } diff --git a/src/Runly/Hosting/GetAction.cs b/src/Runly/Hosting/GetAction.cs index 9d3e136..dbd6380 100644 --- a/src/Runly/Hosting/GetAction.cs +++ b/src/Runly/Hosting/GetAction.cs @@ -1,71 +1,81 @@ -using System; +using Microsoft.Extensions.Hosting; +using System; using System.IO; using System.Threading; using System.Threading.Tasks; namespace Runly.Hosting { - class GetAction : IHostAction + internal class GetAction : HostedAction { - readonly bool verbose; - string filePath; - readonly string jobType; - readonly JobCache cache; + private readonly bool _verbose; + private string _filePath; + private readonly string _jobType; + private readonly JobCache _cache; + private readonly IHostApplicationLifetime _applciationLifetime; - internal GetAction(bool verbose, string jobType, string filePath, JobCache cache) + internal GetAction(bool verbose, string jobType, string filePath, JobCache cache, IHostApplicationLifetime applicationLifetime) { - this.verbose = verbose; - this.jobType = jobType; - this.filePath = filePath; - this.cache = cache; + _verbose = verbose; + _jobType = jobType; + _filePath = filePath; + _cache = cache; + _applciationLifetime = applicationLifetime; } - public async Task RunAsync(CancellationToken cancel) + protected override async Task RunAsync(CancellationToken cancel) { - TextWriter writer = null; - JobInfo job; - try { - job = cache.Get(jobType); - } - catch (TypeNotFoundException) - { - Console.WriteLine($"Could not find the job type '{jobType}'."); - return; - } + TextWriter writer = null; + JobInfo job; - var config = cache.GetDefaultConfig(job); + try + { + job = _cache.Get(_jobType); + } + catch (TypeNotFoundException) + { + Console.WriteLine($"Could not find the job type '{_jobType}'."); + return; + } - try - { + var config = _cache.GetDefaultConfig(job); - if (filePath == null) + try { - writer = Console.Out; + + if (_filePath == null) + { + writer = Console.Out; + } + else + { + // If path is an existing directory, such as ".", add a file name + if (Directory.Exists(_filePath)) + _filePath = Path.Combine(_filePath, job.JobType.Name + ".json"); + + writer = new StreamWriter(File.Open(_filePath, FileMode.Create)); + } + + await writer.WriteAsync(_verbose ? ConfigWriter.ToJson(config) : ConfigWriter.ToReducedJson(config)); } - else + finally { - // If path is an existing directory, such as ".", add a file name - if (Directory.Exists(filePath)) - filePath = Path.Combine(filePath, job.JobType.Name + ".json"); - - writer = new StreamWriter(File.Open(filePath, FileMode.Create)); + if (_filePath != null && writer != null) + { + await writer.FlushAsync(); + writer.Dispose(); + } } - - await writer.WriteAsync(verbose ? ConfigWriter.ToJson(config) : ConfigWriter.ToReducedJson(config)); + + if (_filePath != null) + Console.WriteLine($"Default config for {job.JobType.FullName} saved to {Path.GetFullPath(_filePath)}"); } finally { - if (filePath != null && writer != null) - { - await writer.FlushAsync(); - writer.Dispose(); - } + _applciationLifetime?.StopApplication(); } - - if (filePath != null) - Console.WriteLine($"Default config for {job.JobType.FullName} saved to {Path.GetFullPath(filePath)}"); } } } diff --git a/src/Runly/Hosting/HostedAction.cs b/src/Runly/Hosting/HostedAction.cs new file mode 100644 index 0000000..febbfcb --- /dev/null +++ b/src/Runly/Hosting/HostedAction.cs @@ -0,0 +1,70 @@ +using Microsoft.Extensions.Hosting; +using System.Threading; +using System.Threading.Tasks; + +namespace Runly.Hosting +{ + internal abstract class HostedAction : IHostedService + { + private Task _run; + private CancellationTokenSource _stoppingCts; + + /// + /// Triggered when the application host is ready to start the service. + /// + /// Indicates that the start process has been aborted. + /// A that represents the asynchronous Start operation. + public virtual Task StartAsync(CancellationToken cancellationToken) + { + // Create linked token to allow cancelling executing task from provided token + _stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + // Store the task we're executing + _run = RunAsync(_stoppingCts.Token); + + // If the task is completed then return it, this will bubble cancellation and failure to the caller + if (_run.IsCompleted) + { + return _run; + } + + // Otherwise it's running + return Task.CompletedTask; + } + + /// + /// Triggered when the application host is performing a graceful shutdown. + /// + /// Indicates that the shutdown process should no longer be graceful. + /// A that represents the asynchronous Stop operation. + public virtual async Task StopAsync(CancellationToken cancellationToken) + { + // Stop called without start + if (_run == null) + return; + + try + { + // Signal cancellation to the executing method + _stoppingCts!.Cancel(); + } + finally + { + // Wait until the task completes or the stop token triggers + var tcs = new TaskCompletionSource(); + using CancellationTokenRegistration registration = cancellationToken.Register(s => ((TaskCompletionSource)s!).SetCanceled(), tcs); + // Do not await the _executeTask because cancelling it will throw an OperationCanceledException which we are explicitly ignoring + await Task.WhenAny(_run, tcs.Task).ConfigureAwait(false); + } + + } + + /// + public virtual void Dispose() + { + _stoppingCts?.Cancel(); + } + + protected abstract Task RunAsync(CancellationToken cancellation); + } +} diff --git a/src/Runly/Hosting/IHostAction.cs b/src/Runly/Hosting/IHostAction.cs deleted file mode 100644 index f66c199..0000000 --- a/src/Runly/Hosting/IHostAction.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Runly.Hosting -{ - interface IHostAction - { - Task RunAsync(CancellationToken cancel); - } - - static class HostActionExtensions - { - public static Task RunAsync(this IHostAction action) - { - return RunAsync(action, new CancellationTokenSource().Token); - } - - public static Task RunAsync(this IHostAction action, CancellationToken cancellation) - { - return action.RunAsync(cancellation); - } - } -} diff --git a/src/Runly/Hosting/ListAction.cs b/src/Runly/Hosting/ListAction.cs index 6ee32a1..706ad54 100644 --- a/src/Runly/Hosting/ListAction.cs +++ b/src/Runly/Hosting/ListAction.cs @@ -1,4 +1,5 @@ -using System; +using Microsoft.Extensions.Hosting; +using System; using System.Collections; using System.IO; using System.Linq; @@ -8,38 +9,47 @@ namespace Runly.Hosting { - class ListAction : IHostAction + internal class ListAction : HostedAction { - readonly bool verbose; - readonly bool json; - readonly JobCache cache; - readonly JsonSchema schema; + private readonly bool _verbose; + private readonly bool _json; + private readonly JobCache _cache; + private readonly JsonSchema _schema; + private readonly IHostApplicationLifetime _applicationLifetime; - public ListAction(bool verbose, bool json, JobCache cache, JsonSchema schema) + public ListAction(bool verbose, bool json, JobCache cache, JsonSchema schema, IHostApplicationLifetime applicationLifetime) { - this.verbose = verbose; - this.json = json; - this.cache = cache ?? throw new ArgumentNullException(nameof(cache)); - this.schema = schema ?? throw new ArgumentNullException(nameof(schema)); + _verbose = verbose; + _json = json; + _cache = cache ?? throw new ArgumentNullException(nameof(cache)); + _schema = schema ?? throw new ArgumentNullException(nameof(schema)); + _applicationLifetime = applicationLifetime; } - public async Task RunAsync(CancellationToken cancel) + protected override async Task RunAsync(CancellationToken cancel) { - string clientVersion = Assembly.GetExecutingAssembly().GetCustomAttribute().InformationalVersion; - - if (json) + try { - WriteJson(Console.Out, clientVersion); - } - else + string clientVersion = Assembly.GetExecutingAssembly().GetCustomAttribute().InformationalVersion; + + if (_json) + { + WriteJson(Console.Out, clientVersion); + } + else + { + WritePlainText(Console.Out, clientVersion); + } + + // Ensure the entire output can be read by the node + Console.WriteLine(); + await Console.Out.FlushAsync(); + } + finally { - WritePlainText(Console.Out, clientVersion); + _applicationLifetime?.StopApplication(); } - - // Ensure the entire output can be read by the node - Console.WriteLine(); - await Console.Out.FlushAsync(); - } + } void WriteJson(TextWriter writer, string clientVersion) { @@ -49,26 +59,26 @@ void WriteJson(TextWriter writer, string clientVersion) IEnumerable GetJobJson() { - if (verbose) + if (_verbose) { - return cache.Jobs.OrderBy(i => i.JobType.FullName).Select(p => new + return _cache.Jobs.OrderBy(i => i.JobType.FullName).Select(p => new { JobType = p.JobType.FullName, ConfigType = p.ConfigType.FullName, - DefaultConfig = cache.GetDefaultConfig(p.JobType.FullName), + DefaultConfig = _cache.GetDefaultConfig(p.JobType.FullName), Assembly = p.JobType.Assembly.GetName().Name, CanRun = p.IsValid, Errors = p.Errors.ToString(), - Schema = schema.Generate(p.ConfigType) + Schema = _schema.Generate(p.ConfigType) }); } // Include the reduced config - return cache.Jobs.OrderBy(i => i.JobType.FullName).Select(p => new + return _cache.Jobs.OrderBy(i => i.JobType.FullName).Select(p => new { JobType = p.JobType.FullName, ConfigType = p.ConfigType.FullName, - DefaultConfig = ConfigWriter.ToReducedJObject(cache.GetDefaultConfig(p.JobType.FullName)), + DefaultConfig = ConfigWriter.ToReducedJObject(_cache.GetDefaultConfig(p.JobType.FullName)), Assembly = p.JobType.Assembly.GetName().Name, CanRun = p.IsValid, Errors = p.Errors.ToString() @@ -80,7 +90,7 @@ void WritePlainText(TextWriter writer, string clientVersion) writer.WriteLine($"Client Version: v{clientVersion}"); writer.WriteLine(); - foreach (var job in cache.Jobs.OrderBy(i => i.JobType.FullName)) + foreach (var job in _cache.Jobs.OrderBy(i => i.JobType.FullName)) { writer.WriteLine(ConsoleFormat.DoubleLine); writer.WriteLine($"Job:\t{job.JobType.FullName} [{job.JobType.Assembly.GetName().Name}]"); @@ -90,7 +100,7 @@ void WritePlainText(TextWriter writer, string clientVersion) if (job.IsValid) { - writer.WriteLine(verbose ? ConfigWriter.ToJson(cache.GetDefaultConfig(job)) : ConfigWriter.ToReducedJson(cache.GetDefaultConfig(job))); + writer.WriteLine(_verbose ? ConfigWriter.ToJson(_cache.GetDefaultConfig(job)) : ConfigWriter.ToReducedJson(_cache.GetDefaultConfig(job))); writer.WriteLine(); } else diff --git a/src/Runly/Hosting/RunAction.cs b/src/Runly/Hosting/RunAction.cs index ca631a4..dddf1c3 100644 --- a/src/Runly/Hosting/RunAction.cs +++ b/src/Runly/Hosting/RunAction.cs @@ -1,4 +1,6 @@ -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Runly.Client; using Runly.Client.Models; using Spectre.Console; @@ -11,322 +13,329 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -using System.Xml; namespace Runly.Hosting { - class RunAction : IHostAction + class RunAction : HostedAction { - readonly Execution execution; - readonly Config config; - readonly ILogger logger; - readonly ResultsChannel api; - readonly bool debug; - TimeSpan processDuration = new TimeSpan(); - - public RunAction(Execution execution, Config config, ILogger logger, ResultsChannel api, Debug debug) + private readonly Config _config; + private readonly ResultsChannel _api; + private readonly IServiceProvider _serviceProvider; + private readonly IHostApplicationLifetime _applicationLifetime; + private readonly ILogger _logger; + private readonly bool _debug; + private TimeSpan _processDuration = new TimeSpan(); + + internal Config Config => _config; + + public RunAction(Config config, IHostApplicationLifetime applicationLifetime, IServiceProvider serviceProvider, ILogger logger, ResultsChannel api, Debug debug) { - this.execution = execution; - this.config = config; - this.logger = logger; - this.api = api; - this.debug = debug?.AttachDebugger ?? config?.Execution?.LaunchDebugger ?? false; + _serviceProvider = serviceProvider; + _applicationLifetime = applicationLifetime; + this._config = config; + this._logger = logger; + this._api = api; + this._debug = debug?.AttachDebugger ?? config?.Execution?.LaunchDebugger ?? false; } - public async Task RunAsync(CancellationToken token) + protected override async Task RunAsync(CancellationToken token) { - Console.OutputEncoding = Encoding.UTF8; - Console.InputEncoding = Encoding.UTF8; - - AnsiConsole.MarkupLine($"Running [blue]{config.Job.Type}[/]"); - AnsiConsole.WriteLine(); - AnsiConsole.MarkupLine($"[dim]Press 'c' to cancel[/]"); - AnsiConsole.WriteLine(); - - if (debug) - AttachDebugger(); - - if (api != null && string.IsNullOrWhiteSpace(config.RunlyApi.OrganizationId)) + try { - var message = "OrganizationId is required when an API token is provided."; - logger.LogError(message); - throw new ConfigException(message, nameof(Config.RunlyApi)); - } + var scope = _serviceProvider.CreateAsyncScope(); + try + { + var execution = scope.ServiceProvider.GetRequiredService(); - if (config.RunlyApi.InstanceId != Guid.Empty && (api == null || string.IsNullOrWhiteSpace(config.RunlyApi.OrganizationId))) - { - var message = "An API token and OrganizationId are required when InstanceId is specified."; - logger.LogError(message); - throw new ConfigException(message, nameof(Config.RunlyApi)); - } + Console.OutputEncoding = Encoding.UTF8; + Console.InputEncoding = Encoding.UTF8; - var jobCancellation = CancellationTokenSource.CreateLinkedTokenSource(token); - var pingCancellation = new CancellationTokenSource(); - var useApi = config.RunlyApi.InstanceId != Guid.Empty; - ResultsChannel.Connection channel = null; - ResultLog log = null; + AnsiConsole.MarkupLine($"Running [blue]{_config.Job.Type}[/]"); + AnsiConsole.WriteLine(); + AnsiConsole.MarkupLine($"[dim]Press Ctrl+C to cancel[/]"); + AnsiConsole.WriteLine(); - try - { - if (useApi) - { - channel = await api.ConnectAsync(config.RunlyApi.InstanceId); + if (_debug) + AttachDebugger(); - channel.CancellationRequested += () => + if (_api != null && string.IsNullOrWhiteSpace(_config.RunlyApi.OrganizationId)) { - jobCancellation.Cancel(); - return Task.CompletedTask; - }; - execution.StateChanged += s => UpdateInstanceState(channel); - execution.StateChanged += s => - { - if (s == ExecutionState.Processing && execution.TotalItemCount.HasValue) - return channel.SetTotal(execution.TotalItemCount.Value); - else if (s == ExecutionState.Finalizing) - return channel.MethodResult(new MethodOutcome(JobMethod.ProcessAsync, processDuration, (Error)null)); - else - return Task.CompletedTask; - }; - execution.MethodCompleted += m => channel.MethodResult(m); - execution.ItemCompleted += i => + var message = "OrganizationId is required when an API token is provided."; + _logger.LogError(message); + throw new ConfigException(message, nameof(Config.RunlyApi)); + } + + if (_config.RunlyApi.InstanceId != Guid.Empty && (_api == null || string.IsNullOrWhiteSpace(_config.RunlyApi.OrganizationId))) { - processDuration += TimeSpan.FromMilliseconds(i.Methods.Sum(m => m.Value.Duration.TotalMilliseconds)); - - if (config.RunlyApi.LogSuccessfulItemResults || !i.IsSuccessful) - return channel.ItemResult(i); - else - return Task.CompletedTask; - }; - execution.Completed += (output, disposition, completedAt) => channel.MarkComplete(disposition, output, execution.ItemCategories.Select(c => new ItemProgress + var message = "An API token and OrganizationId are required when InstanceId is specified."; + _logger.LogError(message); + throw new ConfigException(message, nameof(Config.RunlyApi)); + } + + var jobCancellation = CancellationTokenSource.CreateLinkedTokenSource(token); + var pingCancellation = new CancellationTokenSource(); + var useApi = _config.RunlyApi.InstanceId != Guid.Empty; + ResultsChannel.Connection channel = null; + ResultLog log = null; + + try { - Category = c.Category, - IsSuccessful = c.IsSuccessful, - Count = c.Count - })); - } + if (useApi) + { + channel = await _api.ConnectAsync(_config.RunlyApi.InstanceId); + + channel.CancellationRequested += () => + { + jobCancellation.Cancel(); + return Task.CompletedTask; + }; + execution.StateChanged += s => UpdateInstanceState(channel, execution); + execution.StateChanged += s => + { + if (s == ExecutionState.Processing && execution.TotalItemCount.HasValue) + return channel.SetTotal(execution.TotalItemCount.Value); + else if (s == ExecutionState.Finalizing) + return channel.MethodResult(new MethodOutcome(JobMethod.ProcessAsync, _processDuration, (Error)null)); + else + return Task.CompletedTask; + }; + execution.MethodCompleted += m => channel.MethodResult(m); + execution.ItemCompleted += i => + { + _processDuration += TimeSpan.FromMilliseconds(i.Methods.Sum(m => m.Value.Duration.TotalMilliseconds)); + + if (_config.RunlyApi.LogSuccessfulItemResults || !i.IsSuccessful) + return channel.ItemResult(i); + else + return Task.CompletedTask; + }; + execution.Completed += (output, disposition, completedAt) => channel.MarkComplete(disposition, output, execution.ItemCategories.Select(c => new ItemProgress + { + Category = c.Category, + IsSuccessful = c.IsSuccessful, + Count = c.Count + })); + } - if (config.Execution.ResultsToConsole || config.Execution.ResultsToFile) - log = new ResultLog(execution); + if (_config.Execution.ResultsToConsole || _config.Execution.ResultsToFile) + log = new ResultLog(execution); - var executing = execution.ExecuteAsync(jobCancellation.Token); - var pinging = Task.CompletedTask; + var executing = execution.ExecuteAsync(jobCancellation.Token); + var pinging = Task.CompletedTask; - if (useApi) - pinging = PingApi(channel, pingCancellation.Token); + if (useApi) + pinging = PingApi(channel, execution, pingCancellation.Token); - if (config.Execution.ResultsToConsole) - { - await AnsiConsole.Status() - .Spinner(Spinner.Known.Dots) - .SpinnerStyle(Style.Parse("yellow bold")) - .StartAsync("Running", async ctx => + if (_config.Execution.ResultsToConsole) { - AnsiConsole.Markup("Initializing..."); + await AnsiConsole.Status() + .Spinner(Spinner.Known.Dots) + .SpinnerStyle(Style.Parse("yellow bold")) + .StartAsync("Running", async ctx => + { + AnsiConsole.Markup("Initializing..."); - while (execution.State == ExecutionState.NotStarted || - execution.State == ExecutionState.Initializing) - await Task.Delay(10); + while (execution.State == ExecutionState.NotStarted || + execution.State == ExecutionState.Initializing) + await Task.Delay(10); - if (execution.InitializeAsync.IsSuccessful) - { - AnsiConsole.MarkupLine("[green]Success[/]"); + if (execution.InitializeAsync.IsSuccessful) + { + AnsiConsole.MarkupLine("[green]Success[/]"); - AnsiConsole.Markup("Getting items..."); + AnsiConsole.Markup("Getting items..."); - while (execution.State == ExecutionState.GettingItemsToProcess) - await Task.Delay(10); + while (execution.State == ExecutionState.GettingItemsToProcess) + await Task.Delay(10); - if (execution.GetItemsAsync.IsSuccessful && execution.GetEnumerator.IsSuccessful && execution.Count.IsSuccessful) - { - AnsiConsole.MarkupLine("[green]Success[/]"); - } - else - { - AnsiConsole.MarkupLine("[red]Failed[/]"); - AnsiConsole.WriteLine(); - AnsiConsole.WriteException(execution.GetItemsAsync.Exception ?? execution.GetEnumerator.Exception ?? execution.Count.Exception); - AnsiConsole.WriteLine(); - } - } - else - { - AnsiConsole.MarkupLine("[red]Failed[/]"); - AnsiConsole.WriteLine(); - AnsiConsole.WriteException(execution.InitializeAsync.Exception); - AnsiConsole.WriteLine(); - } + if (execution.GetItemsAsync.IsSuccessful && execution.GetEnumerator.IsSuccessful && execution.Count.IsSuccessful) + { + AnsiConsole.MarkupLine("[green]Success[/]"); + } + else + { + AnsiConsole.MarkupLine("[red]Failed[/]"); + AnsiConsole.WriteLine(); + AnsiConsole.WriteException(execution.GetItemsAsync.Exception ?? execution.GetEnumerator.Exception ?? execution.Count.Exception); + AnsiConsole.WriteLine(); + } + } + else + { + AnsiConsole.MarkupLine("[red]Failed[/]"); + AnsiConsole.WriteLine(); + AnsiConsole.WriteException(execution.InitializeAsync.Exception); + AnsiConsole.WriteLine(); + } - }); + }); - if (execution.Disposition != Disposition.Failed) - { - var isIndeterminate = !execution.TotalItemCount.HasValue; + if (execution.Disposition != Disposition.Failed) + { + var isIndeterminate = !execution.TotalItemCount.HasValue; - var cols = isIndeterminate ? - new ProgressColumn[] - { + var cols = isIndeterminate ? + new ProgressColumn[] + { new TaskDescriptionColumn(), new AverageTimeColumn(execution), new AverageTimeLastMinColumn(execution), new ElapsedTimeColumn() { Style = Style.Parse("dim") }, new SpinnerColumn(Spinner.Known.Dots), - } : - new ProgressColumn[] - { + } : + new ProgressColumn[] + { new TaskDescriptionColumn(), - new AverageTimeColumn(execution), - new AverageTimeLastMinColumn(execution), - new PercentageColumn(), - new ElapsedTimeColumn() { Style = Style.Parse("dim") }, - new SpinnerColumn(Spinner.Known.Dots), - }; - - await AnsiConsole.Progress() - .Columns(cols) - .StartAsync(async ctx => - { - var process = ctx.AddTask(isIndeterminate ? "Processing items:" : $"Processing {execution.TotalItemCount:N0} items:", maxValue: execution.TotalItemCount ?? double.MaxValue); - - process.IsIndeterminate = isIndeterminate; + new AverageTimeColumn(execution), + new AverageTimeLastMinColumn(execution), + new PercentageColumn(), + new ElapsedTimeColumn() { Style = Style.Parse("dim") }, + new SpinnerColumn(Spinner.Known.Dots), + }; - while (execution.State == ExecutionState.Processing) - { - process.Value = execution.CompletedItemCount; + await AnsiConsole.Progress() + .Columns(cols) + .StartAsync(async ctx => + { + var process = ctx.AddTask(isIndeterminate ? "Processing items:" : $"Processing {execution.TotalItemCount:N0} items:", maxValue: execution.TotalItemCount ?? double.MaxValue); - await Task.Delay(100); + process.IsIndeterminate = isIndeterminate; - if (!Console.IsInputRedirected) - { - while (Console.KeyAvailable) + while (execution.State == ExecutionState.Processing) { - var key = Console.ReadKey(); + process.Value = execution.CompletedItemCount; - if (key.KeyChar == 'c' || key.KeyChar == 'C') - jobCancellation.Cancel(); - } - } - } + await Task.Delay(100); + } - await Task.Delay(100); - ctx.Refresh(); + await Task.Delay(100); + ctx.Refresh(); - process.Value = execution.CompletedItemCount; - process.StopTask(); - }); - } - else - { - AnsiConsole.WriteLine(); - } + process.Value = execution.CompletedItemCount; + process.StopTask(); + }); + } + else + { + AnsiConsole.WriteLine(); + } - await AnsiConsole.Status() - .Spinner(Spinner.Known.Dots) - .SpinnerStyle(Style.Parse("yellow bold")) - .StartAsync("Running", async ctx => - { - if (log.Categories.Any()) - { - AnsiConsole.MarkupLine("[dim]Item results:[/]"); - - var grid = new Grid(); - grid.AddColumn(); - grid.AddColumn(); - - foreach (var c in log.Categories) - { - grid.AddRow(new Markup[] - { + await AnsiConsole.Status() + .Spinner(Spinner.Known.Dots) + .SpinnerStyle(Style.Parse("yellow bold")) + .StartAsync("Running", async ctx => + { + if (log.Categories.Any()) + { + AnsiConsole.MarkupLine("[dim]Item results:[/]"); + + var grid = new Grid(); + grid.AddColumn(); + grid.AddColumn(); + + foreach (var c in log.Categories) + { + grid.AddRow(new Markup[] + { new Markup($"{c.Count}"), new Markup($"{(c.IsSuccessful ? "" : "[red]")}{c.Category}{(c.IsSuccessful ? "" : "[/]")}") - }); - } + }); + } - AnsiConsole.Write(grid); - AnsiConsole.WriteLine(); - } + AnsiConsole.Write(grid); + AnsiConsole.WriteLine(); + } - if (log.FailedItemCount > 0) - { - foreach (var failure in log.FailedItemsThatThrewExceptions.Take(10)) - { - AnsiConsole.MarkupLine($"Item '{failure.Id ?? "Unknown"}' threw an exception:"); - AnsiConsole.WriteException(failure.ProcessAsync?.Exception ?? failure.GetItemIdAsync?.Exception ?? failure.EnumeratorCurrent?.Exception ?? failure.EnumeratorMoveNext?.Exception); - } + if (log.FailedItemCount > 0) + { + foreach (var failure in log.FailedItemsThatThrewExceptions.Take(10)) + { + AnsiConsole.MarkupLine($"Item '{failure.Id ?? "Unknown"}' threw an exception:"); + AnsiConsole.WriteException(failure.ProcessAsync?.Exception ?? failure.GetItemIdAsync?.Exception ?? failure.EnumeratorCurrent?.Exception ?? failure.EnumeratorMoveNext?.Exception); + } - AnsiConsole.WriteLine(); - } + AnsiConsole.WriteLine(); + } - AnsiConsole.Markup("Finalizing..."); + AnsiConsole.Markup("Finalizing..."); - while (execution.State == ExecutionState.Finalizing) - await Task.Delay(10); + while (execution.State == ExecutionState.Finalizing) + await Task.Delay(10); - if (execution.FinalizeAsync.IsSuccessful) - { - AnsiConsole.MarkupLine("[green]Success[/]"); - } - else - { - AnsiConsole.MarkupLine("[red]Failed[/]"); - AnsiConsole.WriteLine(); - AnsiConsole.WriteException(execution.FinalizeAsync.Exception); - AnsiConsole.WriteLine(); - } - }); - } + if (execution.FinalizeAsync.IsSuccessful) + { + AnsiConsole.MarkupLine("[green]Success[/]"); + } + else + { + AnsiConsole.MarkupLine("[red]Failed[/]"); + AnsiConsole.WriteLine(); + AnsiConsole.WriteException(execution.FinalizeAsync.Exception); + AnsiConsole.WriteLine(); + } + }); + } + // Execution.RunAsync will ensure all event handlers have completed before exiting + await executing; + pingCancellation.Cancel(); - // Execution.RunAsync will ensure all event handlers have completed before exiting - await executing; + if (_config.Execution.ResultsToConsole) + { + AnsiConsole.WriteLine(); - pingCancellation.Cancel(); + if (execution.Disposition == Disposition.Successful) + AnsiConsole.MarkupLine($"[bold green]JOB SUCCESSFUL[/]"); + else if (execution.Disposition == Disposition.Cancelled) + AnsiConsole.MarkupLine($"[bold darkorange]JOB CANCELLED[/]"); + else + AnsiConsole.MarkupLine($"[bold red invert]JOB FAILED[/]"); - if (config.Execution.ResultsToConsole) - { - AnsiConsole.WriteLine(); + } - if (execution.Disposition == Disposition.Successful) - AnsiConsole.MarkupLine($"[bold green]JOB SUCCESSFUL[/]"); - else if (execution.Disposition == Disposition.Cancelled) - AnsiConsole.MarkupLine($"[bold darkorange]JOB CANCELLED[/]"); - else - AnsiConsole.MarkupLine($"[bold red invert]JOB FAILED[/]"); + if (_config.Execution.ResultsToFile) + { + using var writer = new StreamWriter(File.Open(_config.Execution.ResultsFilePath, FileMode.Create)); - } + writer.WriteJson(log); + await writer.FlushAsync(); + } - if (config.Execution.ResultsToFile) - { - using var writer = new StreamWriter(File.Open(config.Execution.ResultsFilePath, FileMode.Create)); + try + { + await pinging; + } + catch (TaskCanceledException) { } - writer.WriteJson(log); - await writer.FlushAsync(); - } + if (channel != null) + await channel.FlushAsync(); + } + finally + { + if (channel != null) + await channel.DisposeAsync(); + } - try + // Ensure the entire output can be read by the node + await Console.Out.FlushAsync(); + } + finally { - await pinging; + await scope.DisposeAsync(); } - catch (TaskCanceledException) { } - - if (channel != null) - await channel.FlushAsync(); } finally - { - if (channel != null) - await channel.DisposeAsync(); - } - - // Ensure the entire output can be read by the node - await Console.Out.FlushAsync(); + { + _applicationLifetime?.StopApplication(); + } } /// /// Calls /// every 1 to 30 seconds, depending on whether new data is available. /// - private async Task PingApi(ResultsChannel.Connection channel, CancellationToken cancellationToken) + private async Task PingApi(ResultsChannel.Connection channel, Execution execution, CancellationToken cancellationToken) { var MinPingInterval = new TimeSpan(0, 0, 1); var MaxPingInterval = new TimeSpan(0, 0, 30); @@ -352,7 +361,7 @@ async Task Update() { try { - await UpdateInstanceState(channel, (state, categories) => + await UpdateInstanceState(channel, execution, (state, categories) => { // Taking counts from the categories instead of SuccessfulItemCount and FailedItemCount // so that there is no different between these sums and the category sums due to changes @@ -380,14 +389,14 @@ await UpdateInstanceState(channel, (state, categories) => } catch (Exception ex) { - logger.LogError(ex, "Error pinging API"); + _logger.LogError(ex, "Error pinging API"); } } } - async Task UpdateInstanceState(ResultsChannel.Connection channel, Func, bool> shouldUpdate = null) + async Task UpdateInstanceState(ResultsChannel.Connection channel, Execution execution, Func, bool> shouldUpdate = null) { - var state = GetInstanceState(execution.State); + var state = GetInstanceState(execution); var categories = execution.ItemCategories.Select(c => new ItemProgress { Category = c.Category, @@ -401,7 +410,7 @@ async Task UpdateInstanceState(ResultsChannel.Connection channel, Func state switch + private InstanceState GetInstanceState(Execution execution) => execution.State switch { // Not casting ExecutionState to InstanceState since these could diverge in the future ExecutionState.NotStarted => InstanceState.NotStarted, @@ -432,7 +441,7 @@ private void AttachDebugger() } catch (Exception ex) { - logger.LogError(ex, "Failed to launch debugger due to error:\n" + ex.ToString()); + _logger.LogError(ex, "Failed to launch debugger due to error:\n" + ex.ToString()); } } } diff --git a/src/Runly/JobHost.cs b/src/Runly/JobHost.cs index 50caa9f..048f6a2 100644 --- a/src/Runly/JobHost.cs +++ b/src/Runly/JobHost.cs @@ -1,3 +1,4 @@ +using System; using System.IO; using System.Reflection; using System.Runtime.InteropServices; @@ -116,27 +117,21 @@ public static IHostBuilder CreateDefaultBuilder(string[] args, params Assembly[] } /// - /// Runs the job. + /// Deprecated. Runs the job. /// /// The to run. /// The that represents the asynchronous operation. - public static Task RunJobAsync(this IHost host) - { - return host.RunJobAsync(new CancellationTokenSource().Token); - } + [Obsolete("Use IHost.RunAsync() instead.")] + public static Task RunJobAsync(this IHost host) => host.RunAsync(); - /// - /// Runs the job. - /// - /// The to run. - /// The token to trigger cancellation. - /// The that represents the asynchronous operation. - public static Task RunJobAsync(this IHost host, CancellationToken cancellationToken) - { - var scope = host.Services.CreateAsyncScope(); - var action = scope.ServiceProvider.GetRequiredService(); - - return action?.RunAsync(cancellationToken).ContinueWith(async action => await scope.DisposeAsync()) ?? Task.CompletedTask; - } - } + /// + /// Deprected. Runs the job. + /// + /// The to run. + /// The token to trigger cancellation. + /// The that represents the asynchronous operation. + [Obsolete("Use IHost.RunAsync(CancellationToken) instead.")] + public static Task RunJobAsync(this IHost host, CancellationToken cancellationToken) => + host.RunAsync(cancellationToken); + } } diff --git a/src/Runly/Runly.csproj b/src/Runly/Runly.csproj index 9a24d2f..e8378ad 100644 --- a/src/Runly/Runly.csproj +++ b/src/Runly/Runly.csproj @@ -29,8 +29,8 @@ - - + + diff --git a/src/Runly/ServiceExtensions.cs b/src/Runly/ServiceExtensions.cs index c0ce20b..ab483f2 100644 --- a/src/Runly/ServiceExtensions.cs +++ b/src/Runly/ServiceExtensions.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Runly.Client; using Runly.Hosting; @@ -252,21 +253,23 @@ static void AddListAction(this IServiceCollection services, bool verbose, bool j { services.AddTransient(); - services.AddTransient(s => new ListAction( + services.AddTransient(s => new ListAction( verbose, json, s.GetRequiredService(), - s.GetRequiredService() - )); + s.GetRequiredService(), + s.GetRequiredService() + )); } static void AddGetAction(this IServiceCollection services, string type, string filePath, bool verbose) { - services.AddTransient(s => new GetAction( + services.AddTransient(s => new GetAction( verbose, type, string.IsNullOrWhiteSpace(filePath) ? null : filePath, - s.GetRequiredService() + s.GetRequiredService(), + s.GetRequiredService() )); } @@ -350,9 +353,10 @@ static void AddRunAction(this IServiceCollection services, JobCache cache, Confi return job.GetExecution(s.GetRequiredService()); }); - services.AddTransient(s => new RunAction( - s.GetRequiredService(), + services.AddTransient(s => new RunAction( s.GetRequiredService(), + s.GetRequiredService(), + s, s.GetRequiredService>(), s.GetService(), s.GetService() diff --git a/test/Runly.Tests.Cli/Program.cs b/test/Runly.Tests.Cli/Program.cs index e1e0f99..02e8fdf 100644 --- a/test/Runly.Tests.Cli/Program.cs +++ b/test/Runly.Tests.Cli/Program.cs @@ -1,5 +1,6 @@ -using Runly; +using Microsoft.Extensions.Hosting; +using Runly; await JobHost.CreateDefaultBuilder(args) .Build() - .RunJobAsync(); \ No newline at end of file + .RunAsync(); \ No newline at end of file diff --git a/test/Runly.Tests/Jobs.cs b/test/Runly.Tests/Jobs.cs index 48d90a7..5e8c793 100644 --- a/test/Runly.Tests/Jobs.cs +++ b/test/Runly.Tests/Jobs.cs @@ -1,9 +1,15 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Runly.Tests { + public class SignalConfig : Config + { + public bool WaitForSignal { get; set; } + } + public class JobNoConf : Job { public JobNoConf() : base(new Config()) { } @@ -76,18 +82,24 @@ public override Task ProcessAsync(int item, Dep1 arg1, Dep2 arg2) } } - public class Job2WithConstructorDep : Job + public class Job2WithConstructorDep : Job { - public Job2WithConstructorDep(IDep1 dep1, IDep2 dep2) : base(new Config()) { } + readonly AutoResetEvent _signal; - public override IAsyncEnumerable GetItemsAsync() - { - throw new NotImplementedException(); - } + public Job2WithConstructorDep(SignalConfig config, IDep1 dep1, IDep2 dep2, AutoResetEvent signal) + : base(config) + { + _signal = signal; + } + + public override IAsyncEnumerable GetItemsAsync() => new[] { 0 }.ToAsyncEnumerable(); public override Task ProcessAsync(int item) { - throw new NotImplementedException(); + if (Config.WaitForSignal) + _signal.WaitOne(); + + return Task.FromResult(Result.Success()); } } diff --git a/test/Runly.Tests/Runly.Tests.csproj b/test/Runly.Tests/Runly.Tests.csproj index 37b807f..606ccda 100644 --- a/test/Runly.Tests/Runly.Tests.csproj +++ b/test/Runly.Tests/Runly.Tests.csproj @@ -12,9 +12,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers diff --git a/test/Runly.Tests/Scenarios/Configuration/Applying_config_overrides.cs b/test/Runly.Tests/Scenarios/Configuration/Applying_config_overrides.cs index 52086ab..257b6dc 100644 --- a/test/Runly.Tests/Scenarios/Configuration/Applying_config_overrides.cs +++ b/test/Runly.Tests/Scenarios/Configuration/Applying_config_overrides.cs @@ -1,5 +1,6 @@ using FluentAssertions; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Runly.Hosting; using System; using Xunit; @@ -13,13 +14,10 @@ public void Should_apply_overrides() { var args = "CliTestJob --Int32 10 --StringArray will chad --Enum ProcessAsync --IsBool --RunlyApi.Token 1234asdf --Execution.RunAfterId 6B165086-E24D-49B5-A57D-57EBB080C0C1".Split(' '); - var services = new ServiceCollection(); - services.AddRunlyJobs(args, typeof(CliTestJob).Assembly); + var host = JobHost.CreateDefaultBuilder(args).Build(); - var provider = services.BuildServiceProvider(); - - var config = provider.GetRequiredService(); - provider.GetRequiredService().Should().BeOfType(); + var config = host.Services.GetRequiredService(); + host.Services.GetRequiredService().Should().BeOfType(); config.Int32.Should().Be(10); config.StringArray.Should().BeEquivalentTo(new[] { "will", "chad" }); @@ -34,13 +32,10 @@ public void Should_apply_lowercase_overrides() { var args = "CliTestJob --Int32 10 --IsBool --RunlyApi.Token 1234asdf --Execution.RunAfterId 6B165086-E24D-49B5-A57D-57EBB080C0C1".ToLowerInvariant().Split(' '); - var services = new ServiceCollection(); - services.AddRunlyJobs(args, typeof(CliTestJob).Assembly); - - var provider = services.BuildServiceProvider(); - - var config = provider.GetRequiredService(); - provider.GetRequiredService().Should().BeOfType(); + var host = JobHost.CreateDefaultBuilder(args).Build(); + + var config = host.Services.GetRequiredService(); + host.Services.GetRequiredService().Should().BeOfType(); config.Int32.Should().Be(10); config.IsBool.Should().BeTrue(); @@ -53,16 +48,13 @@ public void Should_fail_on_invalid_config_properties() { var args = "CliTestJob --NumberOfItems 10 --isbool --RunlyApi.Token 1234asdf --Execution.RunAfterId 6B165086-E24D-49B5-A57D-57EBB080C0C1".ToLowerInvariant().Split(' '); - var services = new ServiceCollection(); - services.AddRunlyJobs(args, typeof(CliTestJob).Assembly); - - var provider = services.BuildServiceProvider(); + var host = JobHost.CreateDefaultBuilder(args).Build(); - var config = provider.GetService(); - var host = provider.GetService(); + var config = host.Services.GetService(); + var hostedService = host.Services.GetService(); config.Should().BeNull(); - host.Should().BeNull(); + hostedService.Should().BeNull(); } } } diff --git a/test/Runly.Tests/Scenarios/Running/Running_a_job.cs b/test/Runly.Tests/Scenarios/Running/Running_a_job.cs index 126d210..14ccb07 100644 --- a/test/Runly.Tests/Scenarios/Running/Running_a_job.cs +++ b/test/Runly.Tests/Scenarios/Running/Running_a_job.cs @@ -1,8 +1,11 @@ using FluentAssertions; +using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Runly.Hosting; using Runly.Testing; using System; +using System.Threading; using System.Threading.Tasks; using Xunit; @@ -38,20 +41,29 @@ public async Task should_run_a_job_with_scoped_dependency_in_constructor() // CreateDefaultBuilder is more strict with Environment = Dev Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", "Development"); - var action = JobHost.CreateDefaultBuilder(["Job1WithConstructorDep"], typeof(UnitTest).Assembly) + var signal = new AutoResetEvent(false); + + var action = JobHost.CreateDefaultBuilder(["Job2WithConstructorDep"], typeof(UnitTest).Assembly) .ConfigureServices((context, services) => { services.AddScoped(s => new Dep1()); services.AddSingleton(s => new Dep2()); + services.AddSingleton(signal); }) .Build(); - Dep1.IsDisposed.Should().BeFalse(); + var runAction = action.Services.GetRequiredService() as RunAction; - var run = action.RunJobAsync(); + ((SignalConfig)runAction.Config).WaitForSignal = true; + + Dep1.IsDisposed.Should().BeFalse(); + + var run = action.RunAsync(); Dep1.IsDisposed.Should().BeFalse(); + signal.Set(); + await run; Dep1.IsDisposed.Should().BeTrue();