Skip to content

Commit

Permalink
Hopefully fix the queue flickering
Browse files Browse the repository at this point in the history
  • Loading branch information
da3dsoul committed Mar 14, 2024
1 parent 1971c87 commit e4f14e8
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 43 deletions.
47 changes: 33 additions & 14 deletions Shoko.Server/Scheduling/Delegates/MySQLDelegate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class MySQLDelegate : Quartz.Impl.AdoJobStore.MySQLDelegate, IFilteredDri
WHERE t.{ColumnSchedulerName} = @schedulerName AND {ColumnTriggerState} = @state AND {ColumnNextFireTime} <= @noLaterThan AND ({ColumnMifireInstruction} = -1 OR ({ColumnMifireInstruction} <> -1 AND {ColumnNextFireTime} >= @noEarlierThan))
AND jd.{ColumnJobClass} NOT IN (@types)";

private static string GetSelectPartLimitType(int index)
private static string GetSelectPartOfType(int index)
{
return @$"SELECT t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass}, t.{ColumnPriority}, t.{ColumnNextFireTime}, @limitBlocked{index} as {Blocked}
FROM {TablePrefixSubst}{TableTriggers} t
Expand All @@ -46,7 +46,7 @@ private static string GetSelectPartLimitType(int index)
LIMIT @limit{index} OFFSET @offset{index}";
}

private static string GetSelectPartConcurrencyGroup(int index)
private static string GetSelectPartInTypes(int index)
{
return @$"SELECT t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass}, t.{ColumnPriority}, t.{ColumnNextFireTime}, @groupBlocked{index} as {Blocked}
FROM {TablePrefixSubst}{TableTriggers} t
Expand Down Expand Up @@ -105,14 +105,14 @@ public async Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcqu
for (index = 0; index < jobTypes.TypesToLimit.Count; index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartLimitType(index));
commandText.Append(GetSelectPartOfType(index));
commandText.Append("\n)");
}

for (index = 0; index < jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartConcurrencyGroup(index));
commandText.Append(GetSelectPartInTypes(index));
commandText.Append("\n)");
}

Expand Down Expand Up @@ -188,14 +188,14 @@ public virtual async Task<int> SelectWaitingTriggerCount(ConnectionAndTransactio
for (index = 0; index < jobTypes.TypesToLimit.Count; index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartLimitType(index));
commandText.Append(GetSelectPartOfType(index));
commandText.Append("\n)");
}

for (index = 0; index < jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartConcurrencyGroup(index));
commandText.Append(GetSelectPartInTypes(index));
commandText.Append("\n)");
}

Expand Down Expand Up @@ -302,40 +302,51 @@ public virtual async Task<Dictionary<Type, int>> SelectJobTypeCounts(ConnectionA
subquery.Append(hasExcludeTypes ? GetSelectPartExcludingTypes : GetSelectPartNoExclusions);

int index;
var startIndex = 0;
// not blocked
for (index = 0; index < jobTypes.TypesToLimit.Count; index++)
for (index = 0; index < startIndex + jobTypes.TypesToLimit.Count; index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartLimitType(index));
subquery.Append(GetSelectPartOfType(index));
subquery.Append("\n)");
}

// blocked
if (!excludeBlocked)
{
for (; index < 2 * jobTypes.TypesToLimit.Count; index++)
startIndex = index;
for (; index < startIndex + jobTypes.TypesToLimit.Count; index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartLimitType(index));
subquery.Append(GetSelectPartOfType(index));
subquery.Append("\n)");
}
}

// not blocked
for (index = 0; index < jobTypes.AvailableConcurrencyGroups.Count(); index++)
startIndex = 0;
for (index = 0; index < startIndex + jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartConcurrencyGroup(index));
subquery.Append(GetSelectPartInTypes(index));
subquery.Append("\n)");
}

// blocked
if (!excludeBlocked)
{
for (; index < 2 * jobTypes.AvailableConcurrencyGroups.Count(); index++)
startIndex = index;
for (; index < startIndex + jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartInTypes(index));
subquery.Append("\n)");
}

if (hasExcludeTypes)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartConcurrencyGroup(index));
subquery.Append(GetSelectPartInTypes(index));
subquery.Append("\n)");
}
}
Expand Down Expand Up @@ -398,6 +409,14 @@ public virtual async Task<Dictionary<Type, int>> SelectJobTypeCounts(ConnectionA
AddCommandParameter(cmd, $"groupOffset{index}", 1);
index++;
}

if (hasExcludeTypes)
{
AddCommandParameter(cmd, $"groupBlocked{index}", 1);
cmd.AddArrayParameters($"groupLimit{index}Types", GetJobClasses(jobTypes.TypesToExclude));
AddCommandParameter(cmd, $"groupLimit{index}", -1);
AddCommandParameter(cmd, $"groupOffset{index}", 0);
}
}

await using var rs = await cmd.ExecuteReaderAsync(System.Data.CommandBehavior.SequentialAccess, cancellationToken).ConfigureAwait(false);
Expand Down
47 changes: 33 additions & 14 deletions Shoko.Server/Scheduling/Delegates/SQLiteDelegate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class SQLiteDelegate : Quartz.Impl.AdoJobStore.SQLiteDelegate, IFilteredD
WHERE t.{ColumnSchedulerName} = @schedulerName AND {ColumnTriggerState} = @state AND {ColumnNextFireTime} <= @noLaterThan AND ({ColumnMifireInstruction} = -1 OR ({ColumnMifireInstruction} <> -1 AND {ColumnNextFireTime} >= @noEarlierThan))
AND jd.{ColumnJobClass} NOT IN (@types)";

private static string GetSelectPartLimitType(int index)
private static string GetSelectPartOfType(int index)
{
return @$"SELECT t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass}, t.{ColumnPriority}, t.{ColumnNextFireTime}, @limitBlocked{index} as {Blocked}
FROM {TablePrefixSubst}{TableTriggers} t
Expand All @@ -47,7 +47,7 @@ private static string GetSelectPartLimitType(int index)
LIMIT @limit{index} OFFSET @offset{index}";
}

private static string GetSelectPartConcurrencyGroup(int index)
private static string GetSelectPartInTypes(int index)
{
return @$"SELECT t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass}, t.{ColumnPriority}, t.{ColumnNextFireTime}, @groupBlocked{index} as {Blocked}
FROM {TablePrefixSubst}{TableTriggers} t
Expand Down Expand Up @@ -110,14 +110,14 @@ public async Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcqu
for (index = 0; index < jobTypes.TypesToLimit.Count; index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartLimitType(index));
commandText.Append(GetSelectPartOfType(index));
commandText.Append("\n)");
}

for (index = 0; index < jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartConcurrencyGroup(index));
commandText.Append(GetSelectPartInTypes(index));
commandText.Append("\n)");
}

Expand Down Expand Up @@ -193,14 +193,14 @@ public virtual async Task<int> SelectWaitingTriggerCount(ConnectionAndTransactio
for (index = 0; index < jobTypes.TypesToLimit.Count; index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartLimitType(index));
commandText.Append(GetSelectPartOfType(index));
commandText.Append("\n)");
}

for (index = 0; index < jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartConcurrencyGroup(index));
commandText.Append(GetSelectPartInTypes(index));
commandText.Append("\n)");
}

Expand Down Expand Up @@ -306,40 +306,51 @@ public virtual async Task<Dictionary<Type, int>> SelectJobTypeCounts(ConnectionA
subquery.Append(hasExcludeTypes ? GetSelectPartExcludingTypes : GetSelectPartNoExclusions);

int index;
var startIndex = 0;
// not blocked
for (index = 0; index < jobTypes.TypesToLimit.Count; index++)
for (index = 0; index < startIndex + jobTypes.TypesToLimit.Count; index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartLimitType(index));
subquery.Append(GetSelectPartOfType(index));
subquery.Append("\n)");
}

// blocked
if (!excludeBlocked)
{
for (; index < 2 * jobTypes.TypesToLimit.Count; index++)
startIndex = index;
for (; index < startIndex + jobTypes.TypesToLimit.Count; index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartLimitType(index));
subquery.Append(GetSelectPartOfType(index));
subquery.Append("\n)");
}
}

// not blocked
for (index = 0; index < jobTypes.AvailableConcurrencyGroups.Count(); index++)
startIndex = 0;
for (index = 0; index < startIndex + jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartConcurrencyGroup(index));
subquery.Append(GetSelectPartInTypes(index));
subquery.Append("\n)");
}

// blocked
if (!excludeBlocked)
{
for (; index < 2 * jobTypes.AvailableConcurrencyGroups.Count(); index++)
startIndex = index;
for (; index < startIndex + jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartInTypes(index));
subquery.Append("\n)");
}

if (hasExcludeTypes)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartConcurrencyGroup(index));
subquery.Append(GetSelectPartInTypes(index));
subquery.Append("\n)");
}
}
Expand Down Expand Up @@ -402,6 +413,14 @@ public virtual async Task<Dictionary<Type, int>> SelectJobTypeCounts(ConnectionA
AddCommandParameter(cmd, $"groupOffset{index}", 1);
index++;
}

if (hasExcludeTypes)
{
AddCommandParameter(cmd, $"groupBlocked{index}", 1);
cmd.AddArrayParameters($"groupLimit{index}Types", GetJobClasses(jobTypes.TypesToExclude));
AddCommandParameter(cmd, $"groupLimit{index}", -1);
AddCommandParameter(cmd, $"groupOffset{index}", 0);
}
}

await using var rs = await cmd.ExecuteReaderAsync(System.Data.CommandBehavior.SequentialAccess, cancellationToken).ConfigureAwait(false);
Expand Down
47 changes: 33 additions & 14 deletions Shoko.Server/Scheduling/Delegates/SqlServerDelegate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class SqlServerDelegate : Quartz.Impl.AdoJobStore.SqlServerDelegate, IFil
WHERE t.{ColumnSchedulerName} = @schedulerName AND {ColumnTriggerState} = @state AND {ColumnNextFireTime} <= @noLaterThan AND ({ColumnMifireInstruction} = -1 OR ({ColumnMifireInstruction} <> -1 AND {ColumnNextFireTime} >= @noEarlierThan))
AND jd.{ColumnJobClass} NOT IN (@types)";

private static string GetSelectPartLimitType(int index)
private static string GetSelectPartOfType(int index)
{
return @$"SELECT t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass}, t.{ColumnPriority}, t.{ColumnNextFireTime}, @limitBlocked{index} as {Blocked}
FROM {TablePrefixSubst}{TableTriggers} t
Expand All @@ -46,7 +46,7 @@ private static string GetSelectPartLimitType(int index)
OFFSET @offset{index} ROWS FETCH NEXT @limit{index} ROWS ONLY";
}

private static string GetSelectPartConcurrencyGroup(int index)
private static string GetSelectPartInTypes(int index)
{
return @$"SELECT t.{ColumnTriggerName}, t.{ColumnTriggerGroup}, jd.{ColumnJobClass}, t.{ColumnPriority}, t.{ColumnNextFireTime}, @groupBlocked{index} as {Blocked}
FROM {TablePrefixSubst}{TableTriggers} t
Expand Down Expand Up @@ -109,14 +109,14 @@ public async Task<IReadOnlyCollection<TriggerAcquireResult>> SelectTriggerToAcqu
for (index = 0; index < jobTypes.TypesToLimit.Count; index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartLimitType(index));
commandText.Append(GetSelectPartOfType(index));
commandText.Append("\n)");
}

for (index = 0; index < jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartConcurrencyGroup(index));
commandText.Append(GetSelectPartInTypes(index));
commandText.Append("\n)");
}

Expand Down Expand Up @@ -192,14 +192,14 @@ public virtual async Task<int> SelectWaitingTriggerCount(ConnectionAndTransactio
for (index = 0; index < jobTypes.TypesToLimit.Count; index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartLimitType(index));
commandText.Append(GetSelectPartOfType(index));
commandText.Append("\n)");
}

for (index = 0; index < jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
commandText.Append("\nUNION SELECT * FROM (\n");
commandText.Append(GetSelectPartConcurrencyGroup(index));
commandText.Append(GetSelectPartInTypes(index));
commandText.Append("\n)");
}

Expand Down Expand Up @@ -306,40 +306,51 @@ public virtual async Task<Dictionary<Type, int>> SelectJobTypeCounts(ConnectionA
subquery.Append(hasExcludeTypes ? GetSelectPartExcludingTypes : GetSelectPartNoExclusions);

int index;
var startIndex = 0;
// not blocked
for (index = 0; index < jobTypes.TypesToLimit.Count; index++)
for (index = 0; index < startIndex + jobTypes.TypesToLimit.Count; index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartLimitType(index));
subquery.Append(GetSelectPartOfType(index));
subquery.Append("\n)");
}

// blocked
if (!excludeBlocked)
{
for (; index < 2 * jobTypes.TypesToLimit.Count; index++)
startIndex = index;
for (; index < startIndex + jobTypes.TypesToLimit.Count; index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartLimitType(index));
subquery.Append(GetSelectPartOfType(index));
subquery.Append("\n)");
}
}

// not blocked
for (index = 0; index < jobTypes.AvailableConcurrencyGroups.Count(); index++)
startIndex = 0;
for (index = 0; index < startIndex + jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartConcurrencyGroup(index));
subquery.Append(GetSelectPartInTypes(index));
subquery.Append("\n)");
}

// blocked
if (!excludeBlocked)
{
for (; index < 2 * jobTypes.AvailableConcurrencyGroups.Count(); index++)
startIndex = index;
for (; index < startIndex + jobTypes.AvailableConcurrencyGroups.Count(); index++)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartInTypes(index));
subquery.Append("\n)");
}

if (hasExcludeTypes)
{
subquery.Append("\nUNION SELECT * FROM (\n");
subquery.Append(GetSelectPartConcurrencyGroup(index));
subquery.Append(GetSelectPartInTypes(index));
subquery.Append("\n)");
}
}
Expand Down Expand Up @@ -402,6 +413,14 @@ public virtual async Task<Dictionary<Type, int>> SelectJobTypeCounts(ConnectionA
AddCommandParameter(cmd, $"groupOffset{index}", 1);
index++;
}

if (hasExcludeTypes)
{
AddCommandParameter(cmd, $"groupBlocked{index}", 1);
cmd.AddArrayParameters($"groupLimit{index}Types", GetJobClasses(jobTypes.TypesToExclude));
AddCommandParameter(cmd, $"groupLimit{index}", -1);
AddCommandParameter(cmd, $"groupOffset{index}", 0);
}
}

await using var rs = await cmd.ExecuteReaderAsync(System.Data.CommandBehavior.SequentialAccess, cancellationToken).ConfigureAwait(false);
Expand Down
1 change: 0 additions & 1 deletion Shoko.Server/Scheduling/ThreadPooledJobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
using Shoko.Server.Scheduling.Delegates;
using Shoko.Server.Settings;
using Shoko.Server.Utilities;
using SQLiteDelegate = Shoko.Server.Scheduling.Delegates.SQLiteDelegate;

namespace Shoko.Server.Scheduling;

Expand Down

0 comments on commit e4f14e8

Please sign in to comment.