From 014f5d6b6b3b2ee04986394d489338c7ebd9f2f8 Mon Sep 17 00:00:00 2001 From: da3dsoul Date: Tue, 9 Jan 2024 18:05:21 -0500 Subject: [PATCH] Some Changes to Quartz to Allow Filtering based on JobType --- .../Delegates/IFilteredDriverDelegate.cs | 13 +++ .../Scheduling/Delegates/MySQLDelegate.cs | 99 +++++++++++++++++++ .../Scheduling/Delegates/SQLiteDelegate.cs | 99 +++++++++++++++++++ .../Scheduling/Delegates/SqlServerDelegate.cs | 98 ++++++++++++++++++ Shoko.Server/Scheduling/QuartzStartup.cs | 7 +- .../Scheduling/ThreadPooledJobStore.cs | 19 ++-- 6 files changed, 323 insertions(+), 12 deletions(-) create mode 100644 Shoko.Server/Scheduling/Delegates/IFilteredDriverDelegate.cs create mode 100644 Shoko.Server/Scheduling/Delegates/MySQLDelegate.cs create mode 100644 Shoko.Server/Scheduling/Delegates/SQLiteDelegate.cs create mode 100644 Shoko.Server/Scheduling/Delegates/SqlServerDelegate.cs diff --git a/Shoko.Server/Scheduling/Delegates/IFilteredDriverDelegate.cs b/Shoko.Server/Scheduling/Delegates/IFilteredDriverDelegate.cs new file mode 100644 index 000000000..1fd32bae2 --- /dev/null +++ b/Shoko.Server/Scheduling/Delegates/IFilteredDriverDelegate.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Quartz.Impl.AdoJobStore; + +namespace Shoko.Server.Scheduling.Delegates; + +public interface IFilteredDriverDelegate : IDriverDelegate +{ + Task> SelectTriggerToAcquire(ConnectionAndTransactionHolder conn, DateTimeOffset noLaterThan, + DateTimeOffset noEarlierThan, int maxCount, Type[] jobTypesToExclude, CancellationToken cancellationToken = default); +} diff --git a/Shoko.Server/Scheduling/Delegates/MySQLDelegate.cs b/Shoko.Server/Scheduling/Delegates/MySQLDelegate.cs new file mode 100644 index 000000000..95d364d28 --- /dev/null +++ b/Shoko.Server/Scheduling/Delegates/MySQLDelegate.cs @@ -0,0 +1,99 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Quartz.Impl.AdoJobStore; + +namespace Shoko.Server.Scheduling.Delegates; + +public class MySQLDelegate : Quartz.Impl.AdoJobStore.MySQLDelegate, IFilteredDriverDelegate +{ + private string _schedulerName; + + protected override string GetSelectNextTriggerToAcquireSql(int maxCount) + { + return GetSelectNextTriggerToAcquireSql(maxCount, null); + } + + protected string GetSelectNextTriggerToAcquireSql(int maxCount, Type[] jobTypesToExclude) + { + return $@"SELECT + t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass} + FROM + {TablePrefixSubst}{TableTriggers} t + JOIN + {TablePrefixSubst}{TableJobDetails} jd ON (jd.{ColumnSchedulerName} = t.{ColumnSchedulerName} AND jd.{ColumnJobGroup} = t.{ColumnJobGroup} AND jd.{ColumnJobName} = t.{ColumnJobName}) + WHERE + t.{ColumnSchedulerName} = @schedulerName AND {ColumnTriggerState} = @state AND {ColumnNextFireTime} <= @noLaterThan AND ({ColumnMifireInstruction} = -1 OR ({ColumnMifireInstruction} <> -1 AND {ColumnNextFireTime} >= @noEarlierThan)) + {(jobTypesToExclude == null || jobTypesToExclude.Length == 0 ? "" : $"AND jd.{ColumnJobClass} NOT IN ({string.Join(",", jobTypesToExclude.Select(a => $"'{GetStorableJobTypeName(a)}'"))})")} + ORDER BY + {ColumnNextFireTime} ASC, {ColumnPriority} DESC + LIMIT {maxCount};"; + } + + public override void Initialize(DelegateInitializationArgs args) + { + base.Initialize(args); + _schedulerName = args.InstanceName; + } + + public override async Task> SelectTriggerToAcquire( + ConnectionAndTransactionHolder conn, + DateTimeOffset noLaterThan, + DateTimeOffset noEarlierThan, + int maxCount, + CancellationToken cancellationToken = default) + { + return await SelectTriggerToAcquire(conn, noLaterThan, noEarlierThan, maxCount, null, cancellationToken); + } + + public async Task> SelectTriggerToAcquire( + ConnectionAndTransactionHolder conn, + DateTimeOffset noLaterThan, + DateTimeOffset noEarlierThan, + int maxCount, + Type[] jobTypesToExclude, + CancellationToken cancellationToken = default) + { + if (maxCount < 1) + { + maxCount = 1; // we want at least one trigger back. + } + + using var cmd = PrepareCommand(conn, ReplaceTablePrefix(GetSelectNextTriggerToAcquireSql(maxCount, jobTypesToExclude))); + List nextTriggers = new(); + + AddCommandParameter(cmd, "schedulerName", _schedulerName); + AddCommandParameter(cmd, "state", StateWaiting); + AddCommandParameter(cmd, "noLaterThan", GetDbDateTimeValue(noLaterThan)); + AddCommandParameter(cmd, "noEarlierThan", GetDbDateTimeValue(noEarlierThan)); + + using var rs = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false); + // signal cancel, otherwise ADO.NET might have trouble handling partial reads from open reader + var shouldStop = false; + while (await rs.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + if (shouldStop) + { + cmd.Cancel(); + break; + } + + if (nextTriggers.Count < maxCount) + { + var result = new TriggerAcquireResult( + (string)rs[ColumnTriggerName], + (string)rs[ColumnTriggerGroup], + (string)rs[ColumnJobClass]); + nextTriggers.Add(result); + } + else + { + shouldStop = true; + } + } + + return nextTriggers; + } +} diff --git a/Shoko.Server/Scheduling/Delegates/SQLiteDelegate.cs b/Shoko.Server/Scheduling/Delegates/SQLiteDelegate.cs new file mode 100644 index 000000000..fd5f9e1ec --- /dev/null +++ b/Shoko.Server/Scheduling/Delegates/SQLiteDelegate.cs @@ -0,0 +1,99 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Quartz.Impl.AdoJobStore; + +namespace Shoko.Server.Scheduling.Delegates; + +public class SQLiteDelegate : Quartz.Impl.AdoJobStore.SQLiteDelegate, IFilteredDriverDelegate +{ + private string _schedulerName; + + protected override string GetSelectNextTriggerToAcquireSql(int maxCount) + { + return GetSelectNextTriggerToAcquireSql(maxCount, null); + } + + protected string GetSelectNextTriggerToAcquireSql(int maxCount, Type[] jobTypesToExclude) + { + return $@"SELECT + t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass} + FROM + {TablePrefixSubst}{TableTriggers} t + JOIN + {TablePrefixSubst}{TableJobDetails} jd ON (jd.{ColumnSchedulerName} = t.{ColumnSchedulerName} AND jd.{ColumnJobGroup} = t.{ColumnJobGroup} AND jd.{ColumnJobName} = t.{ColumnJobName}) + WHERE + t.{ColumnSchedulerName} = @schedulerName AND {ColumnTriggerState} = @state AND {ColumnNextFireTime} <= @noLaterThan AND ({ColumnMifireInstruction} = -1 OR ({ColumnMifireInstruction} <> -1 AND {ColumnNextFireTime} >= @noEarlierThan)) + {(jobTypesToExclude == null || jobTypesToExclude.Length == 0 ? "" : $"AND jd.{ColumnJobClass} NOT IN ({string.Join(",", jobTypesToExclude.Select(a => $"'{GetStorableJobTypeName(a)}'"))})")} + ORDER BY + {ColumnNextFireTime} ASC, {ColumnPriority} DESC + LIMIT {maxCount};"; + } + + public override void Initialize(DelegateInitializationArgs args) + { + base.Initialize(args); + _schedulerName = args.InstanceName; + } + + public override async Task> SelectTriggerToAcquire( + ConnectionAndTransactionHolder conn, + DateTimeOffset noLaterThan, + DateTimeOffset noEarlierThan, + int maxCount, + CancellationToken cancellationToken = default) + { + return await SelectTriggerToAcquire(conn, noLaterThan, noEarlierThan, maxCount, null, cancellationToken); + } + + public async Task> SelectTriggerToAcquire( + ConnectionAndTransactionHolder conn, + DateTimeOffset noLaterThan, + DateTimeOffset noEarlierThan, + int maxCount, + Type[] jobTypesToExclude, + CancellationToken cancellationToken = default) + { + if (maxCount < 1) + { + maxCount = 1; // we want at least one trigger back. + } + + using var cmd = PrepareCommand(conn, ReplaceTablePrefix(GetSelectNextTriggerToAcquireSql(maxCount, jobTypesToExclude))); + List nextTriggers = new(); + + AddCommandParameter(cmd, "schedulerName", _schedulerName); + AddCommandParameter(cmd, "state", StateWaiting); + AddCommandParameter(cmd, "noLaterThan", GetDbDateTimeValue(noLaterThan)); + AddCommandParameter(cmd, "noEarlierThan", GetDbDateTimeValue(noEarlierThan)); + + using var rs = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false); + // signal cancel, otherwise ADO.NET might have trouble handling partial reads from open reader + var shouldStop = false; + while (await rs.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + if (shouldStop) + { + cmd.Cancel(); + break; + } + + if (nextTriggers.Count < maxCount) + { + var result = new TriggerAcquireResult( + (string)rs[ColumnTriggerName], + (string)rs[ColumnTriggerGroup], + (string)rs[ColumnJobClass]); + nextTriggers.Add(result); + } + else + { + shouldStop = true; + } + } + + return nextTriggers; + } +} diff --git a/Shoko.Server/Scheduling/Delegates/SqlServerDelegate.cs b/Shoko.Server/Scheduling/Delegates/SqlServerDelegate.cs new file mode 100644 index 000000000..ea471f0b1 --- /dev/null +++ b/Shoko.Server/Scheduling/Delegates/SqlServerDelegate.cs @@ -0,0 +1,98 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Quartz.Impl.AdoJobStore; + +namespace Shoko.Server.Scheduling.Delegates; + +public class SqlServerDelegate : Quartz.Impl.AdoJobStore.SqlServerDelegate, IFilteredDriverDelegate +{ + private string _schedulerName; + + protected override string GetSelectNextTriggerToAcquireSql(int maxCount) + { + return GetSelectNextTriggerToAcquireSql(maxCount, null); + } + + protected string GetSelectNextTriggerToAcquireSql(int maxCount, Type[] jobTypesToExclude) + { + return $@"SELECT TOP {maxCount} + t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass} + FROM + {TablePrefixSubst}{TableTriggers} t + JOIN + {TablePrefixSubst}{TableJobDetails} jd ON (jd.{ColumnSchedulerName} = t.{ColumnSchedulerName} AND jd.{ColumnJobGroup} = t.{ColumnJobGroup} AND jd.{ColumnJobName} = t.{ColumnJobName}) + WHERE + t.{ColumnSchedulerName} = @schedulerName AND {ColumnTriggerState} = @state AND {ColumnNextFireTime} <= @noLaterThan AND ({ColumnMifireInstruction} = -1 OR ({ColumnMifireInstruction} <> -1 AND {ColumnNextFireTime} >= @noEarlierThan)) + {(jobTypesToExclude == null || jobTypesToExclude.Length == 0 ? "" : $"AND jd.{ColumnJobClass} NOT IN ({string.Join(",", jobTypesToExclude.Select(a => $"'{GetStorableJobTypeName(a)}'"))})")} + ORDER BY + {ColumnNextFireTime} ASC, {ColumnPriority} DESC;"; + } + + public override void Initialize(DelegateInitializationArgs args) + { + base.Initialize(args); + _schedulerName = args.InstanceName; + } + + public override async Task> SelectTriggerToAcquire( + ConnectionAndTransactionHolder conn, + DateTimeOffset noLaterThan, + DateTimeOffset noEarlierThan, + int maxCount, + CancellationToken cancellationToken = default) + { + return await SelectTriggerToAcquire(conn, noLaterThan, noEarlierThan, maxCount, null, cancellationToken); + } + + public async Task> SelectTriggerToAcquire( + ConnectionAndTransactionHolder conn, + DateTimeOffset noLaterThan, + DateTimeOffset noEarlierThan, + int maxCount, + Type[] jobTypesToExclude, + CancellationToken cancellationToken = default) + { + if (maxCount < 1) + { + maxCount = 1; // we want at least one trigger back. + } + + using var cmd = PrepareCommand(conn, ReplaceTablePrefix(GetSelectNextTriggerToAcquireSql(maxCount, jobTypesToExclude))); + List nextTriggers = new(); + + AddCommandParameter(cmd, "schedulerName", _schedulerName); + AddCommandParameter(cmd, "state", StateWaiting); + AddCommandParameter(cmd, "noLaterThan", GetDbDateTimeValue(noLaterThan)); + AddCommandParameter(cmd, "noEarlierThan", GetDbDateTimeValue(noEarlierThan)); + + using var rs = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false); + // signal cancel, otherwise ADO.NET might have trouble handling partial reads from open reader + var shouldStop = false; + while (await rs.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + if (shouldStop) + { + cmd.Cancel(); + break; + } + + if (nextTriggers.Count < maxCount) + { + var result = new TriggerAcquireResult( + (string)rs[ColumnTriggerName], + (string)rs[ColumnTriggerGroup], + (string)rs[ColumnJobClass]); + nextTriggers.Add(result); + } + else + { + shouldStop = true; + } + } + + return nextTriggers; + } +} diff --git a/Shoko.Server/Scheduling/QuartzStartup.cs b/Shoko.Server/Scheduling/QuartzStartup.cs index 70c598de2..c209e2507 100644 --- a/Shoko.Server/Scheduling/QuartzStartup.cs +++ b/Shoko.Server/Scheduling/QuartzStartup.cs @@ -7,6 +7,7 @@ using Quartz; using Quartz.AspNetCore; using QuartzJobFactory; +using Shoko.Server.Scheduling.Delegates; using Shoko.Server.Scheduling.Jobs.Actions; using Shoko.Server.Scheduling.Jobs.Shoko; using Shoko.Server.Server; @@ -58,17 +59,17 @@ private static void UseDatabase(this IServiceCollectionQuartzConfigurator q) if (settings.Quartz.DatabaseType.Trim().Equals(Constants.DatabaseType.SqlServer, StringComparison.InvariantCultureIgnoreCase)) { EnsureQuartzDatabaseExists_SQLServer(settings.Quartz.ConnectionString); - options.UseSqlServer(c => c.ConnectionString = settings.Quartz.ConnectionString); + options.UseGenericDatabase("SqlServer", c => c.ConnectionString = settings.Quartz.ConnectionString); } else if (settings.Quartz.DatabaseType.Trim().Equals(Constants.DatabaseType.MySQL, StringComparison.InvariantCultureIgnoreCase)) { EnsureQuartzDatabaseExists_MySQL(settings.Quartz.ConnectionString); - options.UseMySqlConnector(c => c.ConnectionString = settings.Quartz.ConnectionString); + options.UseGenericDatabase("MySqlConnector", c => c.ConnectionString = settings.Quartz.ConnectionString); } else if (settings.Quartz.DatabaseType.Trim().Equals(Constants.DatabaseType.Sqlite, StringComparison.InvariantCultureIgnoreCase)) { EnsureQuartzDatabaseExists_SQLite(settings.Quartz.ConnectionString); - options.UseMicrosoftSQLite(c => c.ConnectionString = settings.Quartz.ConnectionString); + options.UseGenericDatabase("SQLite-Microsoft", c => c.ConnectionString = settings.Quartz.ConnectionString); } options.UseNewtonsoftJsonSerializer(); }); diff --git a/Shoko.Server/Scheduling/ThreadPooledJobStore.cs b/Shoko.Server/Scheduling/ThreadPooledJobStore.cs index b095f8ac0..0ba577520 100644 --- a/Shoko.Server/Scheduling/ThreadPooledJobStore.cs +++ b/Shoko.Server/Scheduling/ThreadPooledJobStore.cs @@ -9,6 +9,7 @@ using Quartz.Impl.AdoJobStore; using Quartz.Spi; using Quartz.Util; +using Shoko.Server.Scheduling.Delegates; using Shoko.Server.Utilities; namespace Shoko.Server.Scheduling; @@ -54,8 +55,9 @@ protected override async Task> AcquireNext context.CurrentLoopCount++; try { - var results = await Delegate.SelectTriggerToAcquire(conn, noLaterThan + timeWindow, MisfireTime, maxCount, cancellationToken) - .ConfigureAwait(false); + var typesToExclude = GetTypesToExclude(); + var results = await (Delegate as IFilteredDriverDelegate)! + .SelectTriggerToAcquire(conn, noLaterThan + timeWindow, MisfireTime, maxCount, typesToExclude, cancellationToken).ConfigureAwait(false); // No trigger is ready to fire yet. if (results.Count == 0) @@ -96,13 +98,6 @@ protected override async Task> AcquireNext continue; } - // We can choose to not select a trigger for whatever reason, like it's running, and it's set to not allow concurrency - // rate limiting, database not available, etc - // TODO we might move some of this to the delegated SQL for performance reasons. - // Concurrency changes fast enough to not be worth it, but network availability and whatnot could result in many queries - // We could do this by building a map of types like the current queue does and filtering on QRTZ_JOB_DETAILS.JobClass - // We would need to override SQLiteDelegate, SqlServerDelegate, and MySQLDelegate - // QuartzStartup needs to have UseGenericDatabase for each instead of UseSqlServer, etc if (!JobAllowed(context)) continue; var nextFireTimeUtc = nextTrigger.GetNextFireTimeUtc(); @@ -165,6 +160,12 @@ protected override async Task> AcquireNext return acquiredTriggers; } + private Type[] GetTypesToExclude() + { + // TODO We can get the status of things and add things to exclude + return Array.Empty(); + } + private bool JobAllowed(TriggerAcquisitionContext context) { var jobType = context.CurrentJobType;