From 46f56a502e3357b1e46e63aa828c5056c190b5f3 Mon Sep 17 00:00:00 2001
From: da3dsoul
Date: Wed, 10 Jan 2024 10:09:00 -0500
Subject: [PATCH] More work on scheduling

---
 .../Repositories/BaseCachedRepository.cs      |  47 +++
 .../Concurrency/ConcurrencyGroups.cs          |   7 +
 .../DisallowConcurrencyGroupAttribute.cs      |  17 ++
 .../LimitConcurrencyAttribute.cs              |   5 +-
 .../Scheduling/Jobs/AniDB/AniDBGetFileJob.cs  | 279 ++++++++++++++++++
 Shoko.Server/Scheduling/Jobs/BaseJob.cs       |  15 +-
 .../Scheduling/Jobs/Shoko/DiscoverFileJob.cs  | 146 +++++----
 .../Scheduling/Jobs/Shoko/HashFileJob.cs      | 152 +++++-----
 .../Scheduling/ThreadPooledJobStore.cs        |  41 ++-
 9 files changed, 535 insertions(+), 174 deletions(-)
 create mode 100644 Shoko.Server/Scheduling/Concurrency/ConcurrencyGroups.cs
 create mode 100644 Shoko.Server/Scheduling/Concurrency/DisallowConcurrencyGroupAttribute.cs
 rename Shoko.Server/Scheduling/{ => Concurrency}/LimitConcurrencyAttribute.cs (69%)
 create mode 100644 Shoko.Server/Scheduling/Jobs/AniDB/AniDBGetFileJob.cs

diff --git a/Shoko.Server/Repositories/BaseCachedRepository.cs b/Shoko.Server/Repositories/BaseCachedRepository.cs
index eedb6c3a7..2b910b93d 100644
--- a/Shoko.Server/Repositories/BaseCachedRepository.cs
+++ b/Shoko.Server/Repositories/BaseCachedRepository.cs
@@ -2,6 +2,7 @@
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
+using System.Threading.Tasks;
 using NHibernate;
 using NutzCode.InMemoryIndex;
 using Shoko.Commons.Properties;
@@ -166,6 +167,16 @@ public virtual void DeleteWithOpenTransaction(ISession session, T cr)
         WriteLock(() => DeleteFromCacheUnsafe(cr));
     }
 
+    public virtual async Task DeleteWithOpenTransactionAsync(ISession session, T cr)
+    {
+        if (cr == null) return;
+
+        DeleteWithOpenTransactionCallback?.Invoke(session, cr);
+        await Lock(async () => await session.DeleteAsync(cr));
+
+        WriteLock(() => DeleteFromCacheUnsafe(cr));
+    }
+
     //This function do not run the BeginDeleteCallback and the EndDeleteCallback
     public void DeleteWithOpenTransaction(ISession session, List<T> objs)
     {
@@ -182,6 +193,19 @@ public void DeleteWithOpenTransaction(ISession session, List<T> objs)
 
         WriteLock(() => objs.ForEach(DeleteFromCacheUnsafe));
     }
+
+    public async Task DeleteWithOpenTransactionAsync(ISession session, List<T> objs)
+    {
+        if (objs.Count == 0) return;
+
+        foreach (var cr in objs)
+        {
+            DeleteWithOpenTransactionCallback?.Invoke(session, cr);
+            await Lock(async () => await session.DeleteAsync(cr));
+        }
+
+        WriteLock(() => objs.ForEach(DeleteFromCacheUnsafe));
+    }
 
     public virtual void Save(T obj)
     {
@@ -304,6 +328,29 @@ public void SaveWithOpenTransaction(ISession session, List<T> objs)
             WriteLock(() => UpdateCacheUnsafe(obj));
         }
     }
+
+    public async Task SaveWithOpenTransactionAsync(ISession session, List<T> objs)
+    {
+        if (objs.Count == 0) return;
+
+        await Lock(async () =>
+        {
+            foreach (var obj in objs)
+            {
+                await session.SaveOrUpdateAsync(obj);
+            }
+        });
+
+        foreach (var obj in objs)
+        {
+            SaveWithOpenTransactionCallback?.Invoke(session.Wrap(), obj);
+        }
+
+        foreach (var obj in objs)
+        {
+            WriteLock(() => UpdateCacheUnsafe(obj));
+        }
+    }
 
     protected void ReadLock(Action action)
     {
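The new async variants mirror the existing synchronous DeleteWithOpenTransaction/SaveWithOpenTransaction helpers: callbacks run as before, the NHibernate work happens under the repository lock, and the in-memory cache is updated afterwards. A minimal sketch of the intended call pattern, matching how the AniDB job later in this patch uses them (the oldXrefs/newXrefs lists and the surrounding method are placeholders, not code from the patch):

    // Sketch only: assumes an open NHibernate session and placeholder xref lists.
    using var session = DatabaseFactory.SessionFactory.OpenSession();
    using var trans = session.BeginTransaction();

    // Replace stale cross references in one transaction, then commit.
    await RepoFactory.CrossRef_Languages_AniDB_File.DeleteWithOpenTransactionAsync(session, oldXrefs);
    await RepoFactory.CrossRef_Languages_AniDB_File.SaveWithOpenTransactionAsync(session, newXrefs);
    await trans.CommitAsync();
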
diff --git a/Shoko.Server/Scheduling/Concurrency/ConcurrencyGroups.cs b/Shoko.Server/Scheduling/Concurrency/ConcurrencyGroups.cs
new file mode 100644
index 000000000..134ad969a
--- /dev/null
+++ b/Shoko.Server/Scheduling/Concurrency/ConcurrencyGroups.cs
@@ -0,0 +1,7 @@
+namespace Shoko.Server.Scheduling.Concurrency;
+
+public static class ConcurrencyGroups
+{
+    public const string AniDB_UDP = "AniDB_UDP";
+    public const string AniDB_HTTP = "AniDB_HTTP";
+}
diff --git a/Shoko.Server/Scheduling/Concurrency/DisallowConcurrencyGroupAttribute.cs b/Shoko.Server/Scheduling/Concurrency/DisallowConcurrencyGroupAttribute.cs
new file mode 100644
index 000000000..d57d9e890
--- /dev/null
+++ b/Shoko.Server/Scheduling/Concurrency/DisallowConcurrencyGroupAttribute.cs
@@ -0,0 +1,17 @@
+using System;
+
+namespace Shoko.Server.Scheduling.Concurrency;
+
+[AttributeUsage(AttributeTargets.Class)]
+public class DisallowConcurrencyGroupAttribute : Attribute
+{
+    /// <summary>
+    /// The group to consider concurrency with. More than one Job in the same group will be disallowed from running concurrently
+    /// </summary>
+    public string Group { get; set; }
+
+    public DisallowConcurrencyGroupAttribute(string group = null)
+    {
+        Group = group;
+    }
+}
diff --git a/Shoko.Server/Scheduling/LimitConcurrencyAttribute.cs b/Shoko.Server/Scheduling/Concurrency/LimitConcurrencyAttribute.cs
similarity index 69%
rename from Shoko.Server/Scheduling/LimitConcurrencyAttribute.cs
rename to Shoko.Server/Scheduling/Concurrency/LimitConcurrencyAttribute.cs
index b6396cd1a..bd7d7feb2 100644
--- a/Shoko.Server/Scheduling/LimitConcurrencyAttribute.cs
+++ b/Shoko.Server/Scheduling/Concurrency/LimitConcurrencyAttribute.cs
@@ -1,6 +1,6 @@
 using System;
 
-namespace Shoko.Server.Scheduling;
+namespace Shoko.Server.Scheduling.Concurrency;
 
 public class LimitConcurrencyAttribute : Attribute
 {
@@ -14,8 +14,9 @@ public class LimitConcurrencyAttribute : Attribute
     /// </summary>
     public int MaxAllowedConcurrentJobs { get; set; }
 
-    public LimitConcurrencyAttribute(int maxConcurrentJobs = 4)
+    public LimitConcurrencyAttribute(int maxConcurrentJobs = 4, int maxAllowedConcurrentJobs = 0)
     {
         MaxConcurrentJobs = maxConcurrentJobs;
+        MaxAllowedConcurrentJobs = maxAllowedConcurrentJobs;
     }
 }
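Taken together, the two attributes let a job declare either a named mutual-exclusion group or a numeric concurrency cap. A sketch of how a job class opts in; the class, its members, and the specific numbers are hypothetical (AniDBGetFileJob below is the real in-patch example of the group form), and the usual usings for Quartz, logging, and the queue types are assumed:

    // Only one job in the AniDB_UDP group may run at a time.
    // The alternative form, [LimitConcurrency(maxConcurrentJobs: 4, maxAllowedConcurrentJobs: 8)],
    // allows up to 4 by default and lets settings raise that, but never above 8 (see InitConcurrencyCache).
    [DisallowConcurrencyGroup(ConcurrencyGroups.AniDB_UDP)]
    public class ExampleUdpJob : BaseJob
    {
        public ExampleUdpJob(ILoggerFactory loggerFactory) : base(loggerFactory) { }
        protected ExampleUdpJob() { }

        public override QueueStateStruct Description => new()
        {
            message = "Example UDP work",
            queueState = QueueStateEnum.GetFileInfo,
            extraParams = Array.Empty<string>()
        };

        protected override Task Process(IJobExecutionContext context) => Task.CompletedTask;
    }
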
"Getting file info from UDP API: {0}", + queueState = QueueStateEnum.GetFileInfo, + extraParams = new[] { VideoLocalID.ToString() } + }; + } + } + + protected override async Task Process(IJobExecutionContext context) + { + Logger.LogInformation("Get AniDB file info: {VideoLocalID}", VideoLocalID); + + if (_handler.IsBanned) + { + throw new AniDBBannedException + { + BanType = UpdateType.UDPBan, BanExpires = _handler.BanTime?.AddHours(_handler.BanTimerResetLength) + }; + } + + _vlocal ??= RepoFactory.VideoLocal.GetByID(VideoLocalID); + if (_vlocal == null) + { + return; + } + + var aniFile = RepoFactory.AniDB_File.GetByHashAndFileSize(_vlocal.Hash, _vlocal.FileSize); + + UDPResponse response = null; + if (aniFile == null || ForceAniDB) + { + var request = _requestFactory.Create( + r => + { + r.Hash = _vlocal.Hash; + r.Size = _vlocal.FileSize; + } + ); + response = request.Execute(); + } + + if (response?.Response == null) + { + Logger.LogInformation("File {VideoLocalID} ({Ed2kHash} | {FileName}) could not be found on AniDB", + _vlocal.VideoLocalID, _vlocal.ED2KHash, _vlocal.GetBestVideoLocalPlace()?.FileName); + return; + } + + // remap if the hash brought up the wrong file + var tempAniDBFile = RepoFactory.AniDB_File.GetByFileID(response.Response.FileID); + if (aniFile != null && tempAniDBFile != null && aniFile != tempAniDBFile) + { + RepoFactory.AniDB_File.Delete(aniFile); + aniFile = tempAniDBFile; + } + + // save to the database + aniFile ??= new SVR_AniDB_File(); + aniFile.Hash = _vlocal.Hash; + aniFile.FileSize = _vlocal.FileSize; + + aniFile.DateTimeUpdated = DateTime.Now; + aniFile.File_Description = response.Response.Description; + aniFile.File_Source = response.Response.Source.ToString(); + aniFile.FileID = response.Response.FileID; + aniFile.FileName = response.Response.Filename; + aniFile.GroupID = response.Response.GroupID ?? 
0; + + aniFile.FileVersion = response.Response.Version; + aniFile.IsCensored = response.Response.Censored; + aniFile.IsDeprecated = response.Response.Deprecated; + aniFile.IsChaptered = response.Response.Chaptered; + aniFile.InternalVersion = 3; + + RepoFactory.AniDB_File.Save(aniFile, false); + await CreateLanguages(response.Response); + await CreateEpisodes(_vlocal.FileName, response.Response); + + var anime = RepoFactory.AniDB_Anime.GetByAnimeID(response.Response.AnimeID); + if (anime != null) + { + RepoFactory.AniDB_Anime.Save(anime, false); + } + + var series = RepoFactory.AnimeSeries.GetByAnimeID(response.Response.AnimeID); + series?.UpdateStats(true, true); + series?.AnimeGroup?.TopLevelAnimeGroup?.UpdateStatsFromTopLevel(true, true); + Result = RepoFactory.AniDB_File.GetByFileID(aniFile.FileID); + } + + private static async Task CreateLanguages(ResponseGetFile response) + { + using var session = DatabaseFactory.SessionFactory.OpenSession(); + // playing with async + await BaseRepository.Lock(session, async s => + { + using var trans = s.BeginTransaction(); + // Only update languages if we got a response + if (response?.AudioLanguages is not null) + { + // Delete old + var toDelete = RepoFactory.CrossRef_Languages_AniDB_File.GetByFileID(response.FileID); + await RepoFactory.CrossRef_Languages_AniDB_File.DeleteWithOpenTransactionAsync(s, toDelete); + + // Save new + var toSave = response.AudioLanguages.Select(language => language.Trim().ToLower()) + .Where(lang => lang.Length > 0) + .Select(lang => new CrossRef_Languages_AniDB_File + { + LanguageName = lang, FileID = response.FileID + }) + .ToList(); + await RepoFactory.CrossRef_Languages_AniDB_File.SaveWithOpenTransactionAsync(s, toSave); + } + + if (response?.SubtitleLanguages is not null) + { + // Delete old + var toDelete = RepoFactory.CrossRef_Subtitles_AniDB_File.GetByFileID(response.FileID); + await RepoFactory.CrossRef_Subtitles_AniDB_File.DeleteWithOpenTransactionAsync(s, toDelete); + + // Save new + var toSave = response.SubtitleLanguages.Select(language => language.Trim().ToLower()) + .Where(lang => lang.Length > 0) + .Select(lang => new CrossRef_Subtitles_AniDB_File + { + LanguageName = lang, FileID = response.FileID + }) + .ToList(); + await RepoFactory.CrossRef_Subtitles_AniDB_File.SaveWithOpenTransactionAsync(s, toSave); + } + + await trans.CommitAsync(); + }); + } + + private async Task CreateEpisodes(string filename, ResponseGetFile response) + { + if (response.EpisodeIDs.Count <= 0) return; + + var fileEps = RepoFactory.CrossRef_File_Episode.GetByHash(_vlocal.Hash); + + // Use a single session A. for efficiency and B. 
to prevent regenerating stats + + await BaseRepository.Lock(fileEps, async x => + { + using var session = DatabaseFactory.SessionFactory.OpenSession(); + using var trans = session.BeginTransaction(); + await RepoFactory.CrossRef_File_Episode.DeleteWithOpenTransactionAsync(session, x); + await trans.CommitAsync(); + }); + + fileEps = response.EpisodeIDs + .Select( + (ep, x) => new CrossRef_File_Episode + { + Hash = _vlocal.Hash, + CrossRefSource = (int)CrossRefSource.AniDB, + AnimeID = response.AnimeID, + EpisodeID = ep.EpisodeID, + Percentage = ep.Percentage, + EpisodeOrder = x + 1, + FileName = filename, + FileSize = _vlocal.FileSize + } + ) + .ToList(); + + if (response.OtherEpisodes.Count > 0) + { + Logger.LogInformation("Found {Count} additional episodes for file", response.OtherEpisodes.Count); + var epOrder = fileEps.Max(a => a.EpisodeOrder); + foreach (var episode in response.OtherEpisodes) + { + var epAnimeID = RepoFactory.AniDB_Episode.GetByEpisodeID(episode.EpisodeID)?.AnimeID; + if (epAnimeID == null) + { + Logger.LogInformation("Could not get AnimeID for episode {EpisodeID}, downloading more info", episode.EpisodeID); + var epRequest = _requestFactory.Create(r => r.EpisodeID = episode.EpisodeID); + try + { + var epResponse = epRequest.Execute(); + epAnimeID = epResponse.Response?.AnimeID; + } + catch (Exception e) + { + Logger.LogError(e, "Could not get Episode Info for {EpisodeID}", episode.EpisodeID); + } + } + + if (epAnimeID == null) continue; + + epOrder++; + fileEps.Add(new CrossRef_File_Episode + { + Hash = _vlocal.Hash, + CrossRefSource = (int)CrossRefSource.AniDB, + AnimeID = epAnimeID.Value, + EpisodeID = episode.EpisodeID, + Percentage = episode.Percentage, + EpisodeOrder = epOrder, + FileName = filename, + FileSize = _vlocal.FileSize + }); + } + } + + + // There is a chance that AniDB returned a dup, however unlikely + await BaseRepository.Lock(fileEps, async x => + { + using var session = DatabaseFactory.SessionFactory.OpenSession(); + using var trans = session.BeginTransaction(); + await RepoFactory.CrossRef_File_Episode.SaveWithOpenTransactionAsync(session, x.DistinctBy(a => (a.Hash, a.EpisodeID)).ToList()); + await trans.CommitAsync(); + }); + } + + public AniDBGetFileJob(ILoggerFactory loggerFactory, IUDPConnectionHandler handler, + IRequestFactory requestFactory) : base(loggerFactory) + { + _handler = handler; + _requestFactory = requestFactory; + } + + protected AniDBGetFileJob() + { + } +} diff --git a/Shoko.Server/Scheduling/Jobs/BaseJob.cs b/Shoko.Server/Scheduling/Jobs/BaseJob.cs index 89788ad19..ef65e52e6 100644 --- a/Shoko.Server/Scheduling/Jobs/BaseJob.cs +++ b/Shoko.Server/Scheduling/Jobs/BaseJob.cs @@ -1,3 +1,4 @@ +using System; using System.Threading.Tasks; using System.Xml.Serialization; using Microsoft.Extensions.Logging; @@ -12,7 +13,19 @@ namespace Shoko.Server.Scheduling.Jobs; /// public abstract class BaseJob : IJob { - public abstract Task Execute(IJobExecutionContext context); + public async Task Execute(IJobExecutionContext context) + { + try + { + await Process(context); + } + catch (Exception ex) + { + throw new JobExecutionException(msg: ex.Message, refireImmediately: false, cause: ex); + } + } + + protected abstract Task Process(IJobExecutionContext context); [XmlIgnore][JsonIgnore] protected readonly ILogger Logger; [XmlIgnore][JsonIgnore] public abstract QueueStateStruct Description { get; } diff --git a/Shoko.Server/Scheduling/Jobs/Shoko/DiscoverFileJob.cs b/Shoko.Server/Scheduling/Jobs/Shoko/DiscoverFileJob.cs index 
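Execute is now a thin wrapper that funnels any exception from Process into a JobExecutionException, which is why the per-job try/catch blocks are removed in the two jobs below. Queuing a job is unchanged from the caller's point of view; a sketch of enqueueing one of these jobs through Quartz, mirroring what DiscoverFileJob does in the next file (the _schedulerFactory field and the path variable are assumed to exist at the call site):

    // Sketch: enqueue a HashFileJob with typed job data and a generated identity.
    var scheduler = await _schedulerFactory.GetScheduler();
    await scheduler.StartJob(JobBuilder<HashFileJob>.Create()
        .UsingJobData(j =>
        {
            j.FileName = path;     // placeholder path
            j.SkipMyList = false;
        })
        .WithGeneratedIdentity()
        .Build());
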
diff --git a/Shoko.Server/Scheduling/Jobs/Shoko/DiscoverFileJob.cs b/Shoko.Server/Scheduling/Jobs/Shoko/DiscoverFileJob.cs
index 6036f805e..447abba41 100644
--- a/Shoko.Server/Scheduling/Jobs/Shoko/DiscoverFileJob.cs
+++ b/Shoko.Server/Scheduling/Jobs/Shoko/DiscoverFileJob.cs
@@ -7,6 +7,7 @@
 using QuartzJobFactory;
 using Shoko.Commons.Queue;
 using Shoko.Models.Queue;
+using Shoko.Server.Databases;
 using Shoko.Server.Models;
 using Shoko.Server.Repositories;
 using Shoko.Server.Repositories.Cached;
@@ -38,83 +39,73 @@ public DiscoverFileJob(ILoggerFactory loggerFactory, ISettingsProvider settingsP
     protected DiscoverFileJob() { }
 
-    public override async Task Execute(IJobExecutionContext context)
+    protected override async Task Process(IJobExecutionContext context)
     {
-        try
+        // The flow has changed.
+        // Check for previous existence, merge info if needed
+        // If it's a new file or info is missing, queue a hash
+        // HashFileJob will create the records for a new file, so don't save an empty record.
+        Logger.LogTrace("Checking File For Hashes: {Filename}", FileName);
+
+        var (shouldSave, vlocal, vlocalplace) = GetVideoLocal();
+        if (vlocal == null || vlocalplace == null)
         {
-            // The flow has changed.
-            // Check for previous existence, merge info if needed
-            // If it's a new file or info is missing, queue a hash
-            // HashFileJob will create the records for a new file, so don't save an empty record.
-            Logger.LogTrace("Checking File For Hashes: {Filename}", FileName);
-
-            var (shouldSave, vlocal, vlocalplace) = GetVideoLocal();
-            if (vlocal == null || vlocalplace == null)
-            {
-                Logger.LogTrace("Could not get or create VideoLocal. exiting");
-                return;
-            }
+            Logger.LogTrace("Could not get or create VideoLocal. exiting");
+            return;
+        }
 
-            var filename = vlocalplace.FileName;
+        var filename = vlocalplace.FileName;
 
-            Logger.LogTrace("No existing hash in VideoLocal (or forced), checking XRefs");
-            if (vlocal.HasAnyEmptyHashes())
+        Logger.LogTrace("No existing hash in VideoLocal (or forced), checking XRefs");
+        if (vlocal.HasAnyEmptyHashes())
+        {
+            // try getting the hash from the CrossRef
+            if (TrySetHashFromXrefs(filename, vlocal))
             {
-                // try getting the hash from the CrossRef
-                if (TrySetHashFromXrefs(filename, vlocal))
-                {
-                    shouldSave = true;
-                    Logger.LogTrace("Found Hash in CrossRef_File_Episode: {Hash}", vlocal.Hash);
-                }
-                else if (TrySetHashFromFileNameHash(filename, vlocal))
-                {
-                    shouldSave = true;
-                    Logger.LogTrace("Found Hash in FileNameHash: {Hash}", vlocal.Hash);
-                }
-
-                if (string.IsNullOrEmpty(vlocal.Hash) || string.IsNullOrEmpty(vlocal.CRC32) || string.IsNullOrEmpty(vlocal.MD5) ||
-                    string.IsNullOrEmpty(vlocal.SHA1))
-                    shouldSave |= FillHashesAgainstVideoLocalRepo(vlocal);
+                shouldSave = true;
+                Logger.LogTrace("Found Hash in CrossRef_File_Episode: {Hash}", vlocal.Hash);
             }
-
-            var (needEd2k, needCRC32, needMD5, needSHA1) = ShouldHash(vlocal);
-            var shouldHash = needEd2k || needCRC32 || needMD5 || needSHA1;
-            if (!shouldHash && !shouldSave)
+            else if (TrySetHashFromFileNameHash(filename, vlocal))
             {
-                Logger.LogTrace("Hashes were not necessary for file, so exiting: {File}, Hash: {Hash}", FileName, vlocal.Hash);
-                return;
+                shouldSave = true;
+                Logger.LogTrace("Found Hash in FileNameHash: {Hash}", vlocal.Hash);
             }
 
-            if (shouldSave)
-            {
-                Logger.LogTrace("Saving VideoLocal: Filename: {FileName}, Hash: {Hash}", FileName, vlocal.Hash);
-                RepoFactory.VideoLocal.Save(vlocal, true);
-
-                Logger.LogTrace("Saving VideoLocal_Place: Path: {Path}", FileName);
-                vlocalplace.VideoLocalID = vlocal.VideoLocalID;
-                RepoFactory.VideoLocalPlace.Save(vlocalplace);
-
-                var duplicateRemoved = ProcessDuplicates(vlocal, vlocalplace);
-                // it was removed. Don't try to hash
-                if (duplicateRemoved) return;
-            }
+            if (string.IsNullOrEmpty(vlocal.Hash) || string.IsNullOrEmpty(vlocal.CRC32) || string.IsNullOrEmpty(vlocal.MD5) ||
+                string.IsNullOrEmpty(vlocal.SHA1))
+                shouldSave |= FillHashesAgainstVideoLocalRepo(vlocal);
+        }
 
-            // TODO queue HashFileJob
-            if (shouldHash)
-            {
-                var scheduler = await _schedulerFactory.GetScheduler();
-                await scheduler.StartJob(JobBuilder<HashFileJob>.Create().UsingJobData(a =>
-                {
-                    a.FileName = FileName;
-                    a.SkipMyList = SkipMyList;
-                }).WithGeneratedIdentity().Build());
-            }
+        var (needEd2k, needCRC32, needMD5, needSHA1) = ShouldHash(vlocal);
+        var shouldHash = needEd2k || needCRC32 || needMD5 || needSHA1;
+        if (!shouldHash && !shouldSave)
+        {
+            Logger.LogTrace("Hashes were not necessary for file, so exiting: {File}, Hash: {Hash}", FileName, vlocal.Hash);
+            return;
+        }
+
+        if (shouldSave)
+        {
+            Logger.LogTrace("Saving VideoLocal: Filename: {FileName}, Hash: {Hash}", FileName, vlocal.Hash);
+            RepoFactory.VideoLocal.Save(vlocal, true);
+
+            Logger.LogTrace("Saving VideoLocal_Place: Path: {Path}", FileName);
+            vlocalplace.VideoLocalID = vlocal.VideoLocalID;
+            RepoFactory.VideoLocalPlace.Save(vlocalplace);
+
+            var duplicateRemoved = await ProcessDuplicates(vlocal, vlocalplace);
+            // it was removed. Don't try to hash
+            if (duplicateRemoved) return;
         }
-        catch (Exception ex)
+
+        if (shouldHash)
         {
-            //logger.Error(ex, ex.ToString());
-            // do you want the job to refire?
-            throw new JobExecutionException(msg: ex.Message, refireImmediately: false, cause: ex);
+            var scheduler = await _schedulerFactory.GetScheduler();
+            await scheduler.StartJob(JobBuilder<HashFileJob>.Create().UsingJobData(a =>
+            {
+                a.FileName = FileName;
+                a.SkipMyList = SkipMyList;
+            }).WithGeneratedIdentity().Build());
         }
     }
 
@@ -342,26 +333,23 @@ private static bool FillHashesAgainstVideoLocalRepo(SVR_VideoLocal v)
     /// <param name="vlocal"></param>
    /// <param name="vlocalplace"></param>
     /// <returns>true if the file was removed</returns>
-    private bool ProcessDuplicates(SVR_VideoLocal vlocal, SVR_VideoLocal_Place vlocalplace)
+    private async Task<bool> ProcessDuplicates(SVR_VideoLocal vlocal, SVR_VideoLocal_Place vlocalplace)
     {
         if (vlocal == null) return false;
         // If the VideoLocalID == 0, then it's a new file that wasn't merged after hashing, so it can't be a dupe
         if (vlocal.VideoLocalID == 0) return false;
 
-        var preps = vlocal.Places.Where(a => !vlocalplace.FullServerPath.Equals(a.FullServerPath)).ToList();
-        foreach (var prep in preps)
+        var preps = vlocal.Places.Where(a =>
         {
-            if (prep == null) continue;
-
-            // clean up, if there is a 'duplicate file' that is invalid, remove it.
-            if (prep.FullServerPath == null)
-            {
-                RepoFactory.VideoLocalPlace.Delete(prep);
-                continue;
-            }
-
-            if (File.Exists(prep.FullServerPath)) continue;
-            RepoFactory.VideoLocalPlace.Delete(prep);
+            if (vlocalplace.FullServerPath.Equals(a.FullServerPath)) return false;
+            if (a.FullServerPath == null) return true;
+            return !File.Exists(a.FullServerPath);
+        }).ToList();
+        using (var session = DatabaseFactory.SessionFactory.OpenSession())
+        {
+            using var transaction = session.BeginTransaction();
+            await RepoFactory.VideoLocalPlace.DeleteWithOpenTransactionAsync(session, preps);
+            await transaction.CommitAsync();
         }
 
         var dupPlace = vlocal.Places.FirstOrDefault(a => !vlocalplace.FullServerPath.Equals(a.FullServerPath));
Exiting"); + return; + } - if (tlocal != null) - { - Logger.LogTrace("Found existing VideoLocal with hash, merging info from it"); - // Merge hashes and save, regardless of duplicate file - shouldSave |= MergeInfoFrom(tlocal, vlocal); - vlocal = tlocal; - } + var (newFile, needEd2K, needCRC32, needMD5, needSHA1) = ShouldHash(vlocal); + if (!needEd2K && !ForceHash) return; + shouldSave |= !newFile; + FillMissingHashes(vlocal, needMD5, needSHA1, needCRC32, ForceHash); - if (shouldSave) - { - Logger.LogTrace("Saving VideoLocal: Filename: {FileName}, Hash: {Hash}", FileName, vlocal.Hash); - RepoFactory.VideoLocal.Save(vlocal, true); - } + // We should have a hash by now + // before we save it, lets make sure there is not any other record with this hash (possible duplicate file) + // TODO change this back to lookup by hash and filesize, but it'll need database migration and changes to other lookups + var tlocal = RepoFactory.VideoLocal.GetByHash(vlocal.Hash); - vlocalplace.VideoLocalID = vlocal.VideoLocalID; - RepoFactory.VideoLocalPlace.Save(vlocalplace); + if (tlocal != null) + { + Logger.LogTrace("Found existing VideoLocal with hash, merging info from it"); + // Merge hashes and save, regardless of duplicate file + shouldSave |= MergeInfoFrom(tlocal, vlocal); + vlocal = tlocal; + } - var duplicate = ProcessDuplicates(vlocal, vlocalplace); - if (duplicate) - { - var crProcfile3 = _commandFactory.Create( - c => - { - c.VideoLocalID = vlocal.VideoLocalID; - c.ForceAniDB = false; - } - ); - _commandFactory.Save(crProcfile3); - return; - } + if (shouldSave) + { + Logger.LogTrace("Saving VideoLocal: Filename: {FileName}, Hash: {Hash}", FileName, vlocal.Hash); + RepoFactory.VideoLocal.Save(vlocal, true); + } - SaveFileNameHash(filename, vlocal); + vlocalplace.VideoLocalID = vlocal.VideoLocalID; + RepoFactory.VideoLocalPlace.Save(vlocalplace); - if ((vlocal.Media?.GeneralStream?.Duration ?? 0) == 0 || vlocal.MediaVersion < SVR_VideoLocal.MEDIA_VERSION) - { - if (vlocalplace.RefreshMediaInfo()) + var duplicate = await ProcessDuplicates(vlocal, vlocalplace); + if (duplicate) + { + var crProcfile3 = _commandFactory.Create( + c => { - RepoFactory.VideoLocal.Save(vlocalplace.VideoLocal, true); + c.VideoLocalID = vlocal.VideoLocalID; + c.ForceAniDB = false; } - } + ); + _commandFactory.Save(crProcfile3); + return; + } - ShokoEventHandler.Instance.OnFileHashed(folder, vlocalplace); + SaveFileNameHash(filename, vlocal); - // now add a command to process the file - _commandFactory.CreateAndSave(c => - { - c.VideoLocalID = vlocal.VideoLocalID; - c.ForceAniDB = ForceHash; - c.SkipMyList = SkipMyList; - }); - } - catch (Exception ex) + if ((vlocal.Media?.GeneralStream?.Duration ?? 0) == 0 || vlocal.MediaVersion < SVR_VideoLocal.MEDIA_VERSION) { - //logger.Error(ex, ex.ToString()); - // do you want the job to refire? 
- throw new JobExecutionException(msg: ex.Message, refireImmediately: false, cause: ex); + if (vlocalplace.RefreshMediaInfo()) + { + RepoFactory.VideoLocal.Save(vlocalplace.VideoLocal, true); + } } + + ShokoEventHandler.Instance.OnFileHashed(folder, vlocalplace); + + // now add a command to process the file + _commandFactory.CreateAndSave(c => + { + c.VideoLocalID = vlocal.VideoLocalID; + c.ForceAniDB = ForceHash; + c.SkipMyList = SkipMyList; + }); } private (bool existing, SVR_VideoLocal, SVR_VideoLocal_Place, SVR_ImportFolder) GetVideoLocal() @@ -456,26 +449,23 @@ private static bool MergeInfoFrom(SVR_VideoLocal target, SVR_VideoLocal source) return changed; } - private bool ProcessDuplicates(SVR_VideoLocal vlocal, SVR_VideoLocal_Place vlocalplace) + private async Task ProcessDuplicates(SVR_VideoLocal vlocal, SVR_VideoLocal_Place vlocalplace) { if (vlocal == null) return false; // If the VideoLocalID == 0, then it's a new file that wasn't merged after hashing, so it can't be a dupe if (vlocal.VideoLocalID == 0) return false; - var preps = vlocal.Places.Where(a => !vlocalplace.FullServerPath.Equals(a.FullServerPath)).ToList(); - foreach (var prep in preps) + var preps = vlocal.Places.Where(a => { - if (prep == null) continue; - - // clean up, if there is a 'duplicate file' that is invalid, remove it. - if (prep.FullServerPath == null) - { - RepoFactory.VideoLocalPlace.Delete(prep); - continue; - } - - if (File.Exists(prep.FullServerPath)) continue; - RepoFactory.VideoLocalPlace.Delete(prep); + if (vlocalplace.FullServerPath.Equals(a.FullServerPath)) return false; + if (a.FullServerPath == null) return true; + return !File.Exists(a.FullServerPath); + }).ToList(); + using (var session = DatabaseFactory.SessionFactory.OpenSession()) + { + using var transaction = session.BeginTransaction(); + await RepoFactory.VideoLocalPlace.DeleteWithOpenTransactionAsync(session, preps); + await transaction.CommitAsync(); } var dupPlace = vlocal.Places.FirstOrDefault(a => !vlocalplace.FullServerPath.Equals(a.FullServerPath)); diff --git a/Shoko.Server/Scheduling/ThreadPooledJobStore.cs b/Shoko.Server/Scheduling/ThreadPooledJobStore.cs index 0ba577520..efa0a789f 100644 --- a/Shoko.Server/Scheduling/ThreadPooledJobStore.cs +++ b/Shoko.Server/Scheduling/ThreadPooledJobStore.cs @@ -9,6 +9,7 @@ using Quartz.Impl.AdoJobStore; using Quartz.Spi; using Quartz.Util; +using Shoko.Server.Scheduling.Concurrency; using Shoko.Server.Scheduling.Delegates; using Shoko.Server.Utilities; @@ -169,22 +170,27 @@ private Type[] GetTypesToExclude() private bool JobAllowed(TriggerAcquisitionContext context) { var jobType = context.CurrentJobType; - var acquiredJobTypesWithLimitedConcurrency = context.AcquiredJobTypesWithLimitedConcurrency; + var acquiredJobTypesWithLimitedConcurrency = context.AcquiredJobsWithLimitedConcurrency; if (ObjectUtils.IsAttributePresent(jobType, typeof(DisallowConcurrentExecutionAttribute))) { - if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType, out var number) && number >= 1) return false; - acquiredJobTypesWithLimitedConcurrency[jobType] = number + 1; + if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType.Name, out var number) && number >= 1) return false; + acquiredJobTypesWithLimitedConcurrency[jobType.Name] = number + 1; } - else if (jobType.GetCustomAttributes().FirstOrDefault(a => a is LimitConcurrencyAttribute) is LimitConcurrencyAttribute attribute) + else if (jobType.GetCustomAttributes().FirstOrDefault(a => a is DisallowConcurrencyGroupAttribute) is 
DisallowConcurrencyGroupAttribute concurrencyAttribute) { - if (!_typeConcurrencyCache.TryGetValue(jobType, out var maxConcurrentJobs)) maxConcurrentJobs = attribute.MaxConcurrentJobs; - if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType, out var number) && number >= maxConcurrentJobs) return false; - acquiredJobTypesWithLimitedConcurrency[jobType] = number + 1; + if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(concurrencyAttribute.Group, out var number)) return false; + acquiredJobTypesWithLimitedConcurrency[concurrencyAttribute.Group] = number + 1; } else if (_typeConcurrencyCache.TryGetValue(jobType, out var maxJobs) && maxJobs > 0) { - if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType, out var number) && number >= maxJobs) return false; - acquiredJobTypesWithLimitedConcurrency[jobType] = number + 1; + if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType.Name, out var number) && number >= maxJobs) return false; + acquiredJobTypesWithLimitedConcurrency[jobType.Name] = number + 1; + } + else if (jobType.GetCustomAttributes().FirstOrDefault(a => a is LimitConcurrencyAttribute) is LimitConcurrencyAttribute limitConcurrencyAttribute) + { + if (!_typeConcurrencyCache.TryGetValue(jobType, out var maxConcurrentJobs)) maxConcurrentJobs = limitConcurrencyAttribute.MaxConcurrentJobs; + if (acquiredJobTypesWithLimitedConcurrency.TryGetValue(jobType.Name, out var number) && number >= maxConcurrentJobs) return false; + acquiredJobTypesWithLimitedConcurrency[jobType.Name] = number + 1; } return true; @@ -211,7 +217,7 @@ private void InitConcurrencyCache() if (type == null) continue; var value = kv.Value; var attribute = type.GetCustomAttribute(); - if (attribute != null && attribute.MaxAllowedConcurrentJobs < kv.Value) value = attribute.MaxAllowedConcurrentJobs; + if (attribute is { MaxAllowedConcurrentJobs: > 0 } && attribute.MaxAllowedConcurrentJobs < kv.Value) value = attribute.MaxAllowedConcurrentJobs; _typeConcurrencyCache[type] = value; } } @@ -220,7 +226,20 @@ private class TriggerAcquisitionContext { public readonly int MaxDoLoopRetry = 3; public int CurrentLoopCount { get; set; } - public Dictionary AcquiredJobTypesWithLimitedConcurrency { get; }= new(); + public Dictionary AcquiredJobsWithLimitedConcurrency { get; } = new(); public Type CurrentJobType { get; set; } } + + protected override async Task TriggerFired(ConnectionAndTransactionHolder conn, IOperableTrigger trigger, CancellationToken cancellationToken = new CancellationToken()) + { + // notify some sort of callback listener for UI to add a job as running + return await base.TriggerFired(conn, trigger, cancellationToken); + } + + protected override async Task TriggeredJobComplete(ConnectionAndTransactionHolder conn, IOperableTrigger trigger, IJobDetail jobDetail, SchedulerInstruction triggerInstCode, + CancellationToken cancellationToken = new CancellationToken()) + { + // notify some sort of callback listener for UI to set a job as finished + await base.TriggeredJobComplete(conn, trigger, jobDetail, triggerInstCode, cancellationToken); + } }
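
The acquisition gate in JobAllowed is easier to see in isolation. The following is a reduced sketch of the same counting idea, not the code in the store: it ignores the DisallowConcurrentExecution branch and the settings-backed cache, and assumes System, System.Collections.Generic, and System.Reflection are in scope.

    // Reduced model of JobAllowed: keys are either a concurrency group name or a job type name,
    // values are how many matching jobs have already been acquired in this pass.
    var acquired = new Dictionary<string, int>();

    bool Allowed(Type jobType, int maxConcurrent)
    {
        var group = jobType.GetCustomAttribute<DisallowConcurrencyGroupAttribute>()?.Group;
        if (group != null)
        {
            // Any previously acquired job in the group blocks the rest of the group.
            if (acquired.ContainsKey(group)) return false;
            acquired[group] = 1;
            return true;
        }

        // Otherwise enforce the numeric cap from LimitConcurrency or the settings cache.
        acquired.TryGetValue(jobType.Name, out var count);
        if (count >= maxConcurrent) return false;
        acquired[jobType.Name] = count + 1;
        return true;
    }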