diff --git a/src/BUTR.Site.NexusMods.Server/Jobs/CrashReportProcessorJob.cs b/src/BUTR.Site.NexusMods.Server/Jobs/CrashReportProcessorJob.cs index c026f572..818fd27e 100644 --- a/src/BUTR.Site.NexusMods.Server/Jobs/CrashReportProcessorJob.cs +++ b/src/BUTR.Site.NexusMods.Server/Jobs/CrashReportProcessorJob.cs @@ -44,10 +44,9 @@ public async Task Execute(IJobExecutionContext context) var client = scope.ServiceProvider.GetRequiredService(); await using var crashReportBatchedHandler = scope.ServiceProvider.GetRequiredService(); - await foreach (var batch in client.GetNewCrashReportMetadatasAsync(DateTime.UtcNow.AddDays(-2), ct).OfType().ChunkAsync(1000).WithCancellation(ct)) + await foreach (var batch in client.GetNewCrashReportMetadatasAsync(DateTime.UtcNow.AddDays(-20), ct).OfType().ChunkAsync(1000).WithCancellation(ct)) { - await crashReportBatchedHandler.HandleBatchAsync(batch, ct); - processed += batch.Count; + processed += await crashReportBatchedHandler.HandleBatchAsync(batch, ct); } } diff --git a/src/BUTR.Site.NexusMods.Server/Services/CrashReportBatchedHandler.cs b/src/BUTR.Site.NexusMods.Server/Services/CrashReportBatchedHandler.cs index a1cf20ad..d4b84e97 100644 --- a/src/BUTR.Site.NexusMods.Server/Services/CrashReportBatchedHandler.cs +++ b/src/BUTR.Site.NexusMods.Server/Services/CrashReportBatchedHandler.cs @@ -59,24 +59,31 @@ public CrashReportBatchedHandler(ILogger logger, IOpt _client = client; } - public async Task HandleBatchAsync(IEnumerable requests, CancellationToken ct) + public async Task HandleBatchAsync(IEnumerable requests, CancellationToken ct) { await _lock.WaitAsync(ct); - _toDownloadChannel = Channel.CreateBounded(ParallelCount * 2); - _httpResultChannel = Channel.CreateBounded(ParallelCount * 2); - _linkedCrashReportsChannel = Channel.CreateUnbounded(); - _ignoredCrashReportsChannel = Channel.CreateUnbounded(); + try + { + _toDownloadChannel = Channel.CreateBounded(ParallelCount * 2); + _httpResultChannel = Channel.CreateBounded(ParallelCount * 2); + _linkedCrashReportsChannel = Channel.CreateUnbounded(); + _ignoredCrashReportsChannel = Channel.CreateUnbounded(); - var filterTask = FilterCrashReportsAsync(requests, ct); - var downloadTask = DownloadCrashReportsAsync(ct); - var writeTask = WriteCrashReportsToDatabaseAsync(ct); - await Task.WhenAll(filterTask, downloadTask, writeTask); + var filterTask = FilterCrashReportsAsync(requests, ct); + var downloadTask = DownloadCrashReportsAsync(ct); + var writeTask = WriteCrashReportsToDatabaseAsync(ct); + await Task.WhenAll(filterTask, downloadTask, writeTask); - if (filterTask.IsFaulted || downloadTask.IsFaulted || writeTask.IsFaulted) - throw new AggregateException(new[] { filterTask.Exception, downloadTask.Exception, writeTask.Exception }.OfType()); + if (filterTask.IsFaulted || downloadTask.IsFaulted || writeTask.IsFaulted) + throw new AggregateException(new[] { filterTask.Exception, downloadTask.Exception, writeTask.Exception }.OfType()); - _lock.Release(); + return await writeTask; + } + finally + { + _lock.Release(); + } } private async Task FilterCrashReportsAsync(IEnumerable crashReports, CancellationToken ct) @@ -192,7 +199,7 @@ await Parallel.ForEachAsync(_toDownloadChannel.Reader.ReadAllAsync(ct), options, } } - private async Task WriteCrashReportsToDatabaseAsync(CancellationToken ct) + private async Task WriteCrashReportsToDatabaseAsync(CancellationToken ct) { var tenant = _tenantContextAccessor.Current; @@ -213,7 +220,7 @@ private async Task WriteCrashReportsToDatabaseAsync(CancellationToken ct) if (report is null) { failedCrashReportFileIds.Add(fileId); - return; + continue; } var crashReportId = CrashReportId.From(report.Id); @@ -284,6 +291,8 @@ private async Task WriteCrashReportsToDatabaseAsync(CancellationToken ct) await dbContextWrite.CrashReportToFileIds.UpsertOnSaveAsync(linkedCrashReports); await dbContextWrite.CrashReportIgnoredFileIds.UpsertOnSaveAsync(ignoredCrashReports); // Disposing the DBContext will save the data + + return crashReportsBuilder.Count; } public ValueTask DisposeAsync()