From 800fe9f611036e024449af5005874b96286fd041 Mon Sep 17 00:00:00 2001 From: da3dsoul Date: Tue, 9 Jan 2024 00:29:57 -0500 Subject: [PATCH] Cleanup in ThreadPooledJobStore.cs Planning for more advanced logic later --- .../Scheduling/ThreadPooledJobStore.cs | 80 ++++++++++++------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/Shoko.Server/Scheduling/ThreadPooledJobStore.cs b/Shoko.Server/Scheduling/ThreadPooledJobStore.cs index 769b17c24..b095f8ac0 100644 --- a/Shoko.Server/Scheduling/ThreadPooledJobStore.cs +++ b/Shoko.Server/Scheduling/ThreadPooledJobStore.cs @@ -4,7 +4,6 @@ using System.Reflection; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Quartz; using Quartz.Impl.AdoJobStore; @@ -16,13 +15,13 @@ namespace Shoko.Server.Scheduling; public class ThreadPooledJobStore : JobStoreTX { - private ILogger Logger { get; set; } + private readonly ILogger _logger; private ITypeLoadHelper _typeLoadHelper = null!; private Dictionary _typeConcurrencyCache; - public ThreadPooledJobStore() + public ThreadPooledJobStore(ILogger logger) { - Logger = Utils.ServiceContainer.GetService>(); + _logger = logger; InitConcurrencyCache(); } @@ -48,13 +47,11 @@ protected override async Task> AcquireNext } var acquiredTriggers = new List(); - var acquiredJobKeysForNoConcurrentExec = new Dictionary(); - const int MaxDoLoopRetry = 3; - var currentLoopCount = 0; + var context = new TriggerAcquisitionContext(); do { - currentLoopCount++; + context.CurrentLoopCount++; try { var results = await Delegate.SelectTriggerToAcquire(conn, noLaterThan + timeWindow, MisfireTime, maxCount, cancellationToken) @@ -81,41 +78,32 @@ protected override async Task> AcquireNext // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then // put it back into the timeTriggers set and continue to search for next trigger. - Type jobType; try { - jobType = _typeLoadHelper.LoadType(result.JobType)!; + context.CurrentJobType = _typeLoadHelper.LoadType(result.JobType)!; } catch (JobPersistenceException jpe) { try { - Logger.LogError(jpe, "Error retrieving job, setting trigger state to ERROR"); + _logger.LogError(jpe, "Error retrieving job, setting trigger state to ERROR"); await Delegate.UpdateTriggerState(conn, triggerKey, StateError, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { - Logger.LogError(ex, "Unable to set trigger state to ERROR"); + _logger.LogError(ex, "Unable to set trigger state to ERROR"); } continue; } - if (ObjectUtils.IsAttributePresent(jobType, typeof(DisallowConcurrentExecutionAttribute))) - { - if (acquiredJobKeysForNoConcurrentExec.TryGetValue(jobType, out var number) && number >= 1) continue; - acquiredJobKeysForNoConcurrentExec[jobType] = number + 1; - } - else if (jobType.GetCustomAttributes().FirstOrDefault(a => a is LimitConcurrencyAttribute) is LimitConcurrencyAttribute attribute) - { - if (!_typeConcurrencyCache.TryGetValue(jobType, out var maxConcurrentJobs)) maxConcurrentJobs = attribute.MaxConcurrentJobs; - if (acquiredJobKeysForNoConcurrentExec.TryGetValue(jobType, out var number) && number >= maxConcurrentJobs) continue; - acquiredJobKeysForNoConcurrentExec[jobType] = number + 1; - } - else if (_typeConcurrencyCache.TryGetValue(jobType, out var maxJobs) && maxJobs > 0) - { - if (acquiredJobKeysForNoConcurrentExec.TryGetValue(jobType, out var number) && number >= maxJobs) continue; - acquiredJobKeysForNoConcurrentExec[jobType] = number + 1; - } + // We can choose to not select a trigger for whatever reason, like it's running, and it's set to not allow concurrency + // rate limiting, database not available, etc + // TODO we might move some of this to the delegated SQL for performance reasons. + // Concurrency changes fast enough to not be worth it, but network availability and whatnot could result in many queries + // We could do this by building a map of types like the current queue does and filtering on QRTZ_JOB_DETAILS.JobClass + // We would need to override SQLiteDelegate, SqlServerDelegate, and MySQLDelegate + // QuartzStartup needs to have UseGenericDatabase for each instead of UseSqlServer, etc + if (!JobAllowed(context)) continue; var nextFireTimeUtc = nextTrigger.GetNextFireTimeUtc(); @@ -126,7 +114,7 @@ protected override async Task> AcquireNext // able to be clean up by Quartz since we are not returning it to be processed. if (nextFireTimeUtc == null) { - Logger.LogWarning("Trigger {NextTriggerKey} returned null on nextFireTime and yet still exists in DB!", nextTrigger.Key); + _logger.LogWarning("Trigger {NextTriggerKey} returned null on nextFireTime and yet still exists in DB!", nextTrigger.Key); continue; } @@ -159,7 +147,7 @@ protected override async Task> AcquireNext // if we didn't end up with any trigger to fire from that first // batch, try again for another batch. We allow with a max retry count. - if (acquiredTriggers.Count == 0 && currentLoopCount < MaxDoLoopRetry) + if (acquiredTriggers.Count == 0 && context.CurrentLoopCount < context.MaxDoLoopRetry) { continue; } @@ -177,6 +165,30 @@ protected override async Task> AcquireNext return acquiredTriggers; } + private bool JobAllowed(TriggerAcquisitionContext context) + { + var jobType = context.CurrentJobType; + var acquiredJobTypesWithLimitedConcurrency = context.AcquiredJobTypesWithLimitedConcurrency; + if (ObjectUtils.IsAttributePresent(jobType, typeof(DisallowConcurrentExecutionAttribute))) + { + if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType, out var number) && number >= 1) return false; + acquiredJobTypesWithLimitedConcurrency[jobType] = number + 1; + } + else if (jobType.GetCustomAttributes().FirstOrDefault(a => a is LimitConcurrencyAttribute) is LimitConcurrencyAttribute attribute) + { + if (!_typeConcurrencyCache.TryGetValue(jobType, out var maxConcurrentJobs)) maxConcurrentJobs = attribute.MaxConcurrentJobs; + if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType, out var number) && number >= maxConcurrentJobs) return false; + acquiredJobTypesWithLimitedConcurrency[jobType] = number + 1; + } + else if (_typeConcurrencyCache.TryGetValue(jobType, out var maxJobs) && maxJobs > 0) + { + if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType, out var number) && number >= maxJobs) return false; + acquiredJobTypesWithLimitedConcurrency[jobType] = number + 1; + } + + return true; + } + private void InitConcurrencyCache() { _typeConcurrencyCache = new Dictionary(); @@ -202,4 +214,12 @@ private void InitConcurrencyCache() _typeConcurrencyCache[type] = value; } } + + private class TriggerAcquisitionContext + { + public readonly int MaxDoLoopRetry = 3; + public int CurrentLoopCount { get; set; } + public Dictionary AcquiredJobTypesWithLimitedConcurrency { get; }= new(); + public Type CurrentJobType { get; set; } + } }