Skip to content

Commit

Permalink
Some Changes to Quartz to Allow Filtering based on JobType
Browse files Browse the repository at this point in the history
  • Loading branch information
da3dsoul committed Jan 9, 2024
1 parent 800fe9f commit 014f5d6
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 12 deletions.
13 changes: 13 additions & 0 deletions Shoko.Server/Scheduling/Delegates/IFilteredDriverDelegate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Quartz.Impl.AdoJobStore;

namespace Shoko.Server.Scheduling.Delegates;

public interface IFilteredDriverDelegate : IDriverDelegate
{
Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcquire(ConnectionAndTransactionHolder conn, DateTimeOffset noLaterThan,
DateTimeOffset noEarlierThan, int maxCount, Type[] jobTypesToExclude, CancellationToken cancellationToken = default);
}
99 changes: 99 additions & 0 deletions Shoko.Server/Scheduling/Delegates/MySQLDelegate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Quartz.Impl.AdoJobStore;

namespace Shoko.Server.Scheduling.Delegates;

public class MySQLDelegate : Quartz.Impl.AdoJobStore.MySQLDelegate, IFilteredDriverDelegate
{
private string _schedulerName;

protected override string GetSelectNextTriggerToAcquireSql(int maxCount)
{
return GetSelectNextTriggerToAcquireSql(maxCount, null);
}

protected string GetSelectNextTriggerToAcquireSql(int maxCount, Type[] jobTypesToExclude)
{
return $@"SELECT
t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass}
FROM
{TablePrefixSubst}{TableTriggers} t
JOIN
{TablePrefixSubst}{TableJobDetails} jd ON (jd.{ColumnSchedulerName} = t.{ColumnSchedulerName} AND jd.{ColumnJobGroup} = t.{ColumnJobGroup} AND jd.{ColumnJobName} = t.{ColumnJobName})
WHERE
t.{ColumnSchedulerName} = @schedulerName AND {ColumnTriggerState} = @state AND {ColumnNextFireTime} <= @noLaterThan AND ({ColumnMifireInstruction} = -1 OR ({ColumnMifireInstruction} <> -1 AND {ColumnNextFireTime} >= @noEarlierThan))
{(jobTypesToExclude == null || jobTypesToExclude.Length == 0 ? "" : $"AND jd.{ColumnJobClass} NOT IN ({string.Join(",", jobTypesToExclude.Select(a => $"'{GetStorableJobTypeName(a)}'"))})")}
ORDER BY
{ColumnNextFireTime} ASC, {ColumnPriority} DESC
LIMIT {maxCount};";
}

public override void Initialize(DelegateInitializationArgs args)
{
base.Initialize(args);
_schedulerName = args.InstanceName;
}

public override async Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcquire(
ConnectionAndTransactionHolder conn,
DateTimeOffset noLaterThan,
DateTimeOffset noEarlierThan,
int maxCount,
CancellationToken cancellationToken = default)
{
return await SelectTriggerToAcquire(conn, noLaterThan, noEarlierThan, maxCount, null, cancellationToken);
}

public async Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcquire(
ConnectionAndTransactionHolder conn,
DateTimeOffset noLaterThan,
DateTimeOffset noEarlierThan,
int maxCount,
Type[] jobTypesToExclude,
CancellationToken cancellationToken = default)
{
if (maxCount < 1)
{
maxCount = 1; // we want at least one trigger back.
}

using var cmd = PrepareCommand(conn, ReplaceTablePrefix(GetSelectNextTriggerToAcquireSql(maxCount, jobTypesToExclude)));
List<TriggerAcquireResult> nextTriggers = new();

AddCommandParameter(cmd, "schedulerName", _schedulerName);
AddCommandParameter(cmd, "state", StateWaiting);
AddCommandParameter(cmd, "noLaterThan", GetDbDateTimeValue(noLaterThan));
AddCommandParameter(cmd, "noEarlierThan", GetDbDateTimeValue(noEarlierThan));

using var rs = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
// signal cancel, otherwise ADO.NET might have trouble handling partial reads from open reader
var shouldStop = false;
while (await rs.ReadAsync(cancellationToken).ConfigureAwait(false))
{
if (shouldStop)
{
cmd.Cancel();
break;
}

if (nextTriggers.Count < maxCount)
{
var result = new TriggerAcquireResult(
(string)rs[ColumnTriggerName],
(string)rs[ColumnTriggerGroup],
(string)rs[ColumnJobClass]);
nextTriggers.Add(result);
}
else
{
shouldStop = true;
}
}

return nextTriggers;
}
}
99 changes: 99 additions & 0 deletions Shoko.Server/Scheduling/Delegates/SQLiteDelegate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Quartz.Impl.AdoJobStore;

namespace Shoko.Server.Scheduling.Delegates;

public class SQLiteDelegate : Quartz.Impl.AdoJobStore.SQLiteDelegate, IFilteredDriverDelegate
{
private string _schedulerName;

protected override string GetSelectNextTriggerToAcquireSql(int maxCount)
{
return GetSelectNextTriggerToAcquireSql(maxCount, null);
}

protected string GetSelectNextTriggerToAcquireSql(int maxCount, Type[] jobTypesToExclude)
{
return $@"SELECT
t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass}
FROM
{TablePrefixSubst}{TableTriggers} t
JOIN
{TablePrefixSubst}{TableJobDetails} jd ON (jd.{ColumnSchedulerName} = t.{ColumnSchedulerName} AND jd.{ColumnJobGroup} = t.{ColumnJobGroup} AND jd.{ColumnJobName} = t.{ColumnJobName})
WHERE
t.{ColumnSchedulerName} = @schedulerName AND {ColumnTriggerState} = @state AND {ColumnNextFireTime} <= @noLaterThan AND ({ColumnMifireInstruction} = -1 OR ({ColumnMifireInstruction} <> -1 AND {ColumnNextFireTime} >= @noEarlierThan))
{(jobTypesToExclude == null || jobTypesToExclude.Length == 0 ? "" : $"AND jd.{ColumnJobClass} NOT IN ({string.Join(",", jobTypesToExclude.Select(a => $"'{GetStorableJobTypeName(a)}'"))})")}
ORDER BY
{ColumnNextFireTime} ASC, {ColumnPriority} DESC
LIMIT {maxCount};";
}

public override void Initialize(DelegateInitializationArgs args)
{
base.Initialize(args);
_schedulerName = args.InstanceName;
}

public override async Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcquire(
ConnectionAndTransactionHolder conn,
DateTimeOffset noLaterThan,
DateTimeOffset noEarlierThan,
int maxCount,
CancellationToken cancellationToken = default)
{
return await SelectTriggerToAcquire(conn, noLaterThan, noEarlierThan, maxCount, null, cancellationToken);
}

public async Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcquire(
ConnectionAndTransactionHolder conn,
DateTimeOffset noLaterThan,
DateTimeOffset noEarlierThan,
int maxCount,
Type[] jobTypesToExclude,
CancellationToken cancellationToken = default)
{
if (maxCount < 1)
{
maxCount = 1; // we want at least one trigger back.
}

using var cmd = PrepareCommand(conn, ReplaceTablePrefix(GetSelectNextTriggerToAcquireSql(maxCount, jobTypesToExclude)));
List<TriggerAcquireResult> nextTriggers = new();

AddCommandParameter(cmd, "schedulerName", _schedulerName);
AddCommandParameter(cmd, "state", StateWaiting);
AddCommandParameter(cmd, "noLaterThan", GetDbDateTimeValue(noLaterThan));
AddCommandParameter(cmd, "noEarlierThan", GetDbDateTimeValue(noEarlierThan));

using var rs = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
// signal cancel, otherwise ADO.NET might have trouble handling partial reads from open reader
var shouldStop = false;
while (await rs.ReadAsync(cancellationToken).ConfigureAwait(false))
{
if (shouldStop)
{
cmd.Cancel();
break;
}

if (nextTriggers.Count < maxCount)
{
var result = new TriggerAcquireResult(
(string)rs[ColumnTriggerName],
(string)rs[ColumnTriggerGroup],
(string)rs[ColumnJobClass]);
nextTriggers.Add(result);
}
else
{
shouldStop = true;
}
}

return nextTriggers;
}
}
98 changes: 98 additions & 0 deletions Shoko.Server/Scheduling/Delegates/SqlServerDelegate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Quartz.Impl.AdoJobStore;

namespace Shoko.Server.Scheduling.Delegates;

public class SqlServerDelegate : Quartz.Impl.AdoJobStore.SqlServerDelegate, IFilteredDriverDelegate
{
private string _schedulerName;

protected override string GetSelectNextTriggerToAcquireSql(int maxCount)
{
return GetSelectNextTriggerToAcquireSql(maxCount, null);
}

protected string GetSelectNextTriggerToAcquireSql(int maxCount, Type[] jobTypesToExclude)
{
return $@"SELECT TOP {maxCount}
t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass}
FROM
{TablePrefixSubst}{TableTriggers} t
JOIN
{TablePrefixSubst}{TableJobDetails} jd ON (jd.{ColumnSchedulerName} = t.{ColumnSchedulerName} AND jd.{ColumnJobGroup} = t.{ColumnJobGroup} AND jd.{ColumnJobName} = t.{ColumnJobName})
WHERE
t.{ColumnSchedulerName} = @schedulerName AND {ColumnTriggerState} = @state AND {ColumnNextFireTime} <= @noLaterThan AND ({ColumnMifireInstruction} = -1 OR ({ColumnMifireInstruction} <> -1 AND {ColumnNextFireTime} >= @noEarlierThan))
{(jobTypesToExclude == null || jobTypesToExclude.Length == 0 ? "" : $"AND jd.{ColumnJobClass} NOT IN ({string.Join(",", jobTypesToExclude.Select(a => $"'{GetStorableJobTypeName(a)}'"))})")}
ORDER BY
{ColumnNextFireTime} ASC, {ColumnPriority} DESC;";
}

public override void Initialize(DelegateInitializationArgs args)
{
base.Initialize(args);
_schedulerName = args.InstanceName;
}

public override async Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcquire(
ConnectionAndTransactionHolder conn,
DateTimeOffset noLaterThan,
DateTimeOffset noEarlierThan,
int maxCount,
CancellationToken cancellationToken = default)
{
return await SelectTriggerToAcquire(conn, noLaterThan, noEarlierThan, maxCount, null, cancellationToken);
}

public async Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcquire(
ConnectionAndTransactionHolder conn,
DateTimeOffset noLaterThan,
DateTimeOffset noEarlierThan,
int maxCount,
Type[] jobTypesToExclude,
CancellationToken cancellationToken = default)
{
if (maxCount < 1)
{
maxCount = 1; // we want at least one trigger back.
}

using var cmd = PrepareCommand(conn, ReplaceTablePrefix(GetSelectNextTriggerToAcquireSql(maxCount, jobTypesToExclude)));
List<TriggerAcquireResult> nextTriggers = new();

AddCommandParameter(cmd, "schedulerName", _schedulerName);
AddCommandParameter(cmd, "state", StateWaiting);
AddCommandParameter(cmd, "noLaterThan", GetDbDateTimeValue(noLaterThan));
AddCommandParameter(cmd, "noEarlierThan", GetDbDateTimeValue(noEarlierThan));

using var rs = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
// signal cancel, otherwise ADO.NET might have trouble handling partial reads from open reader
var shouldStop = false;
while (await rs.ReadAsync(cancellationToken).ConfigureAwait(false))
{
if (shouldStop)
{
cmd.Cancel();
break;
}

if (nextTriggers.Count < maxCount)
{
var result = new TriggerAcquireResult(
(string)rs[ColumnTriggerName],
(string)rs[ColumnTriggerGroup],
(string)rs[ColumnJobClass]);
nextTriggers.Add(result);
}
else
{
shouldStop = true;
}
}

return nextTriggers;
}
}
7 changes: 4 additions & 3 deletions Shoko.Server/Scheduling/QuartzStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Quartz;
using Quartz.AspNetCore;
using QuartzJobFactory;
using Shoko.Server.Scheduling.Delegates;
using Shoko.Server.Scheduling.Jobs.Actions;
using Shoko.Server.Scheduling.Jobs.Shoko;
using Shoko.Server.Server;
Expand Down Expand Up @@ -58,17 +59,17 @@ private static void UseDatabase(this IServiceCollectionQuartzConfigurator q)
if (settings.Quartz.DatabaseType.Trim().Equals(Constants.DatabaseType.SqlServer, StringComparison.InvariantCultureIgnoreCase))
{
EnsureQuartzDatabaseExists_SQLServer(settings.Quartz.ConnectionString);
options.UseSqlServer(c => c.ConnectionString = settings.Quartz.ConnectionString);
options.UseGenericDatabase<SqlServerDelegate>("SqlServer", c => c.ConnectionString = settings.Quartz.ConnectionString);
}
else if (settings.Quartz.DatabaseType.Trim().Equals(Constants.DatabaseType.MySQL, StringComparison.InvariantCultureIgnoreCase))
{
EnsureQuartzDatabaseExists_MySQL(settings.Quartz.ConnectionString);
options.UseMySqlConnector(c => c.ConnectionString = settings.Quartz.ConnectionString);
options.UseGenericDatabase<MySQLDelegate>("MySqlConnector", c => c.ConnectionString = settings.Quartz.ConnectionString);
}
else if (settings.Quartz.DatabaseType.Trim().Equals(Constants.DatabaseType.Sqlite, StringComparison.InvariantCultureIgnoreCase))
{
EnsureQuartzDatabaseExists_SQLite(settings.Quartz.ConnectionString);
options.UseMicrosoftSQLite(c => c.ConnectionString = settings.Quartz.ConnectionString);
options.UseGenericDatabase<SQLiteDelegate>("SQLite-Microsoft", c => c.ConnectionString = settings.Quartz.ConnectionString);
}
options.UseNewtonsoftJsonSerializer();
});
Expand Down
19 changes: 10 additions & 9 deletions Shoko.Server/Scheduling/ThreadPooledJobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Quartz.Impl.AdoJobStore;
using Quartz.Spi;
using Quartz.Util;
using Shoko.Server.Scheduling.Delegates;
using Shoko.Server.Utilities;

namespace Shoko.Server.Scheduling;
Expand Down Expand Up @@ -54,8 +55,9 @@ protected override async Task<IReadOnlyCollection<IOperableTrigger>> AcquireNext
context.CurrentLoopCount++;
try
{
var results = await Delegate.SelectTriggerToAcquire(conn, noLaterThan + timeWindow, MisfireTime, maxCount, cancellationToken)
.ConfigureAwait(false);
var typesToExclude = GetTypesToExclude();
var results = await (Delegate as IFilteredDriverDelegate)!
.SelectTriggerToAcquire(conn, noLaterThan + timeWindow, MisfireTime, maxCount, typesToExclude, cancellationToken).ConfigureAwait(false);

// No trigger is ready to fire yet.
if (results.Count == 0)
Expand Down Expand Up @@ -96,13 +98,6 @@ protected override async Task<IReadOnlyCollection<IOperableTrigger>> AcquireNext
continue;
}

// We can choose to not select a trigger for whatever reason, like it's running, and it's set to not allow concurrency
// rate limiting, database not available, etc
// TODO we might move some of this to the delegated SQL for performance reasons.
// Concurrency changes fast enough to not be worth it, but network availability and whatnot could result in many queries
// We could do this by building a map of types like the current queue does and filtering on QRTZ_JOB_DETAILS.JobClass
// We would need to override SQLiteDelegate, SqlServerDelegate, and MySQLDelegate
// QuartzStartup needs to have UseGenericDatabase<Delegate> for each instead of UseSqlServer, etc
if (!JobAllowed(context)) continue;

var nextFireTimeUtc = nextTrigger.GetNextFireTimeUtc();
Expand Down Expand Up @@ -165,6 +160,12 @@ protected override async Task<IReadOnlyCollection<IOperableTrigger>> AcquireNext
return acquiredTriggers;
}

private Type[] GetTypesToExclude()
{
// TODO We can get the status of things and add things to exclude
return Array.Empty<Type>();
}

private bool JobAllowed(TriggerAcquisitionContext context)
{
var jobType = context.CurrentJobType;
Expand Down

0 comments on commit 014f5d6

Please sign in to comment.