Skip to content

Commit

Permalink
Clearing Queue Support
Browse files Browse the repository at this point in the history
  • Loading branch information
da3dsoul committed Feb 16, 2024
1 parent 91cca30 commit ff2e497
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 14 deletions.
4 changes: 2 additions & 2 deletions Shoko.Server/API/v3/Controllers/QueueController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ public async Task<ActionResult> Pause()
/// <returns>Void.</returns>
[Authorize("admin")]
[HttpPost("Clear")]
public ActionResult Clear()
public async Task<ActionResult> Clear()
{
// TODO clear queue
await _queueHandler.Clear();
return Ok();
}

Expand Down
34 changes: 29 additions & 5 deletions Shoko.Server/Scheduling/QuartzStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.DependencyInjection;
using MySqlConnector;
using Quartz;
using Quartz.AspNetCore;
using QuartzJobFactory;
using QuartzJobFactory.Attributes;
using Shoko.Server.Scheduling.Acquisition.Filters;
using Shoko.Server.Scheduling.Delegates;
using Shoko.Server.Scheduling.Jobs;
Expand All @@ -21,9 +23,36 @@ namespace Shoko.Server.Scheduling;

public static class QuartzStartup
{
public static async Task ScheduleRecurringJobs(bool replace)
{
// this needs to run immediately upon scheduling, so it replaces always. Others will run on other schedules
await ScheduleRecurringJob<CheckNetworkAvailabilityJob>(
triggerConfig: t => t.WithSimpleSchedule(tr => tr.WithIntervalInMinutes(5).RepeatForever()).StartNow(), replace: true);

// TODO the other schedule-based jobs that are on timers
}

private static async Task ScheduleRecurringJob<T>(Action<T> jobConfig = null, Func<TriggerBuilder, TriggerBuilder> triggerConfig = null, bool replace = false) where T : class, IJob
{
var scheduler = await Utils.ServiceContainer.GetRequiredService<ISchedulerFactory>().GetScheduler();
jobConfig ??= _ => {};
triggerConfig ??= t => t;
var groupName = typeof(T).GetCustomAttribute<JobKeyGroupAttribute>()?.GroupName;
var jobKey = JobKeyBuilder<T>.Create().WithGroup(groupName).UsingJobData(jobConfig).Build();
if (!await scheduler.CheckExists(jobKey))
{
await scheduler.ScheduleJob(JobBuilder<T>.Create().UsingJobData(jobConfig).WithGeneratedIdentity().Build(),
triggerConfig(TriggerBuilder.Create()).Build());
} else if (replace)
{
await scheduler.RescheduleJob(new TriggerKey(jobKey.Name, jobKey.Group), triggerConfig(TriggerBuilder.Create()).Build());
}
}

internal static void AddQuartz(this IServiceCollection services)
{
// this lets us inject the shoko JobFactory explicitly, instead of only IJobFactory
ShokoEventHandler.Instance.Starting += (_, _) => ScheduleRecurringJobs(false).GetAwaiter().GetResult();
services.AddSingleton<JobFactory>();
services.AddSingleton<ThreadPooledJobStore>();
services.AddSingleton<QueueHandler>();
Expand All @@ -39,11 +68,6 @@ internal static void AddQuartz(this IServiceCollection services)
q.MaxBatchSize = (int) Math.Round(Environment.ProcessorCount * 1.25D, MidpointRounding.AwayFromZero);
q.BatchTriggerAcquisitionFireAheadTimeWindow = TimeSpan.FromSeconds(30);
q.UseJobFactory<JobFactory>();

// Register the connectivity monitor job with a trigger that executes every 5 minutes
q.ScheduleJob<CheckNetworkAvailabilityJob>(
trigger => trigger.WithIdentity("UptimeMonitor", "System").WithSimpleSchedule(tr => tr.WithIntervalInMinutes(5).RepeatForever()).StartNow(),
j => j.WithGeneratedIdentity());
});

services.AddQuartzServer(options =>
Expand Down
8 changes: 8 additions & 0 deletions Shoko.Server/Scheduling/QueueHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ public async Task Resume()
await scheduler.Start();
}

public async Task Clear()
{
var scheduler = await _schedulerFactory.GetScheduler();
if (scheduler.IsShutdown || !scheduler.IsStarted) return;
await scheduler.Clear();
await QuartzStartup.ScheduleRecurringJobs(false);
}

public bool Paused
{
get
Expand Down
8 changes: 4 additions & 4 deletions Shoko.Server/Scheduling/ThreadPooledJobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ public ThreadPooledJobStore(ILogger<ThreadPooledJobStore> logger, IEnumerable<IA
_queueStateEventHandler = queueStateEventHandler;
_jobFactory = jobFactory;
_acquisitionFilters = acquisitionFilters.ToArray();
foreach (var filter in _acquisitionFilters)
{
filter.StateChanged += FilterOnStateChanged;
}
InitConcurrencyCache();
}

public override async Task Initialize(ITypeLoadHelper loadHelper, ISchedulerSignaler signaler, CancellationToken cancellationToken = default)
{
_signaler = signaler;
foreach (var filter in _acquisitionFilters)
{
filter.StateChanged += FilterOnStateChanged;
}
_typeLoadHelper = loadHelper;
await base.Initialize(loadHelper, signaler, cancellationToken);
}
Expand Down
12 changes: 9 additions & 3 deletions Shoko.Server/Server/ShokoEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public class ShokoEventHandler : IShokoEventHandler
public event EventHandler<SettingsSavedEventArgs> SettingsSaved;
public event EventHandler<AVDumpEventArgs> AVDumpEvent;

public event EventHandler Start;
public event EventHandler Starting;
public event EventHandler Started;
public event EventHandler<CancelEventArgs> Shutdown;


Expand Down Expand Up @@ -224,9 +225,14 @@ public void OnAVDumpGenericException(AVDumpHelper.AVDumpSession session, Excepti
});
}

public void OnStart()
public void OnStarting()
{
Start?.Invoke(null, EventArgs.Empty);
Starting?.Invoke(null, EventArgs.Empty);
}

public void OnStarted()
{
Started?.Invoke(null, EventArgs.Empty);
}

public bool OnShutdown()
Expand Down
4 changes: 4 additions & 0 deletions Shoko.Server/Server/ShokoServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public bool StartUpServer()
// run rotator once and set 24h delay
Utils.ServiceContainer.GetRequiredService<LogRotator>().Start();

ShokoEventHandler.Instance.OnStarting();

// for log readability, this will simply init the singleton
Utils.ServiceContainer.GetService<IUDPConnectionHandler>();
return true;
Expand Down Expand Up @@ -165,6 +167,7 @@ private bool CheckBlockedFiles()

#region Database settings and initial start up

public event EventHandler ServerStarting;
public event EventHandler LoginFormNeeded;
public event EventHandler DatabaseSetup;
public event EventHandler DBSetupCompleted;
Expand Down Expand Up @@ -203,6 +206,7 @@ private void WorkerSetupDB_ReportProgress()
}

DBSetupCompleted?.Invoke(this, EventArgs.Empty);
ShokoEventHandler.Instance.OnStarted();
}

private void ShowDatabaseSetup()
Expand Down

0 comments on commit ff2e497

Please sign in to comment.