Skip to content

Commit

Permalink
Cleanup in ThreadPooledJobStore.cs
Browse files Browse the repository at this point in the history
Planning for more advanced logic later
  • Loading branch information
da3dsoul committed Jan 9, 2024
1 parent df5257b commit 800fe9f
Showing 1 changed file with 50 additions and 30 deletions.
80 changes: 50 additions & 30 deletions Shoko.Server/Scheduling/ThreadPooledJobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Quartz;
using Quartz.Impl.AdoJobStore;
Expand All @@ -16,13 +15,13 @@ namespace Shoko.Server.Scheduling;

public class ThreadPooledJobStore : JobStoreTX
{
private ILogger<ThreadPooledJobStore> Logger { get; set; }
private readonly ILogger<ThreadPooledJobStore> _logger;
private ITypeLoadHelper _typeLoadHelper = null!;
private Dictionary<Type, int> _typeConcurrencyCache;

public ThreadPooledJobStore()
public ThreadPooledJobStore(ILogger<ThreadPooledJobStore> logger)
{
Logger = Utils.ServiceContainer.GetService<ILogger<ThreadPooledJobStore>>();
_logger = logger;
InitConcurrencyCache();
}

Expand All @@ -48,13 +47,11 @@ protected override async Task<IReadOnlyCollection<IOperableTrigger>> AcquireNext
}

var acquiredTriggers = new List<IOperableTrigger>();
var acquiredJobKeysForNoConcurrentExec = new Dictionary<Type, int>();
const int MaxDoLoopRetry = 3;
var currentLoopCount = 0;
var context = new TriggerAcquisitionContext();

do
{
currentLoopCount++;
context.CurrentLoopCount++;
try
{
var results = await Delegate.SelectTriggerToAcquire(conn, noLaterThan + timeWindow, MisfireTime, maxCount, cancellationToken)
Expand All @@ -81,41 +78,32 @@ protected override async Task<IReadOnlyCollection<IOperableTrigger>> AcquireNext

// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
// put it back into the timeTriggers set and continue to search for next trigger.
Type jobType;
try
{
jobType = _typeLoadHelper.LoadType(result.JobType)!;
context.CurrentJobType = _typeLoadHelper.LoadType(result.JobType)!;
}
catch (JobPersistenceException jpe)
{
try
{
Logger.LogError(jpe, "Error retrieving job, setting trigger state to ERROR");
_logger.LogError(jpe, "Error retrieving job, setting trigger state to ERROR");
await Delegate.UpdateTriggerState(conn, triggerKey, StateError, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.LogError(ex, "Unable to set trigger state to ERROR");
_logger.LogError(ex, "Unable to set trigger state to ERROR");
}
continue;
}

if (ObjectUtils.IsAttributePresent(jobType, typeof(DisallowConcurrentExecutionAttribute)))
{
if (acquiredJobKeysForNoConcurrentExec.TryGetValue(jobType, out var number) && number >= 1) continue;
acquiredJobKeysForNoConcurrentExec[jobType] = number + 1;
}
else if (jobType.GetCustomAttributes().FirstOrDefault(a => a is LimitConcurrencyAttribute) is LimitConcurrencyAttribute attribute)
{
if (!_typeConcurrencyCache.TryGetValue(jobType, out var maxConcurrentJobs)) maxConcurrentJobs = attribute.MaxConcurrentJobs;
if (acquiredJobKeysForNoConcurrentExec.TryGetValue(jobType, out var number) && number >= maxConcurrentJobs) continue;
acquiredJobKeysForNoConcurrentExec[jobType] = number + 1;
}
else if (_typeConcurrencyCache.TryGetValue(jobType, out var maxJobs) && maxJobs > 0)
{
if (acquiredJobKeysForNoConcurrentExec.TryGetValue(jobType, out var number) && number >= maxJobs) continue;
acquiredJobKeysForNoConcurrentExec[jobType] = number + 1;
}
// We can choose to not select a trigger for whatever reason, like it's running, and it's set to not allow concurrency
// rate limiting, database not available, etc
// TODO we might move some of this to the delegated SQL for performance reasons.
// Concurrency changes fast enough to not be worth it, but network availability and whatnot could result in many queries
// We could do this by building a map of types like the current queue does and filtering on QRTZ_JOB_DETAILS.JobClass
// We would need to override SQLiteDelegate, SqlServerDelegate, and MySQLDelegate
// QuartzStartup needs to have UseGenericDatabase<Delegate> for each instead of UseSqlServer, etc
if (!JobAllowed(context)) continue;

var nextFireTimeUtc = nextTrigger.GetNextFireTimeUtc();

Expand All @@ -126,7 +114,7 @@ protected override async Task<IReadOnlyCollection<IOperableTrigger>> AcquireNext
// able to be clean up by Quartz since we are not returning it to be processed.
if (nextFireTimeUtc == null)
{
Logger.LogWarning("Trigger {NextTriggerKey} returned null on nextFireTime and yet still exists in DB!", nextTrigger.Key);
_logger.LogWarning("Trigger {NextTriggerKey} returned null on nextFireTime and yet still exists in DB!", nextTrigger.Key);
continue;
}

Expand Down Expand Up @@ -159,7 +147,7 @@ protected override async Task<IReadOnlyCollection<IOperableTrigger>> AcquireNext

// if we didn't end up with any trigger to fire from that first
// batch, try again for another batch. We allow with a max retry count.
if (acquiredTriggers.Count == 0 && currentLoopCount < MaxDoLoopRetry)
if (acquiredTriggers.Count == 0 && context.CurrentLoopCount < context.MaxDoLoopRetry)
{
continue;
}
Expand All @@ -177,6 +165,30 @@ protected override async Task<IReadOnlyCollection<IOperableTrigger>> AcquireNext
return acquiredTriggers;
}

private bool JobAllowed(TriggerAcquisitionContext context)
{
var jobType = context.CurrentJobType;
var acquiredJobTypesWithLimitedConcurrency = context.AcquiredJobTypesWithLimitedConcurrency;
if (ObjectUtils.IsAttributePresent(jobType, typeof(DisallowConcurrentExecutionAttribute)))
{
if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType, out var number) && number >= 1) return false;
acquiredJobTypesWithLimitedConcurrency[jobType] = number + 1;
}
else if (jobType.GetCustomAttributes().FirstOrDefault(a => a is LimitConcurrencyAttribute) is LimitConcurrencyAttribute attribute)
{
if (!_typeConcurrencyCache.TryGetValue(jobType, out var maxConcurrentJobs)) maxConcurrentJobs = attribute.MaxConcurrentJobs;
if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType, out var number) && number >= maxConcurrentJobs) return false;
acquiredJobTypesWithLimitedConcurrency[jobType] = number + 1;
}
else if (_typeConcurrencyCache.TryGetValue(jobType, out var maxJobs) && maxJobs > 0)
{
if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType, out var number) && number >= maxJobs) return false;
acquiredJobTypesWithLimitedConcurrency[jobType] = number + 1;
}

return true;
}

private void InitConcurrencyCache()
{
_typeConcurrencyCache = new Dictionary<Type, int>();
Expand All @@ -202,4 +214,12 @@ private void InitConcurrencyCache()
_typeConcurrencyCache[type] = value;
}
}

private class TriggerAcquisitionContext
{
public readonly int MaxDoLoopRetry = 3;
public int CurrentLoopCount { get; set; }
public Dictionary<Type, int> AcquiredJobTypesWithLimitedConcurrency { get; }= new();
public Type CurrentJobType { get; set; }
}
}

0 comments on commit 800fe9f

Please sign in to comment.