Skip to content

Commit

Permalink
More work on scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
da3dsoul committed Jan 10, 2024
1 parent 014f5d6 commit 46f56a5
Show file tree
Hide file tree
Showing 9 changed files with 535 additions and 174 deletions.
47 changes: 47 additions & 0 deletions Shoko.Server/Repositories/BaseCachedRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NHibernate;
using NutzCode.InMemoryIndex;
using Shoko.Commons.Properties;
Expand Down Expand Up @@ -166,6 +167,16 @@ public virtual void DeleteWithOpenTransaction(ISession session, T cr)
WriteLock(() => DeleteFromCacheUnsafe(cr));
}

/// <summary>
/// Asynchronously deletes a single entity using an already-open transaction,
/// firing the per-entity delete callback first and evicting the entity from
/// the in-memory cache afterwards.
/// </summary>
/// <param name="session">The open NHibernate session that owns the transaction.</param>
/// <param name="cr">The entity to delete; a null value is silently ignored.</param>
public virtual async Task DeleteWithOpenTransactionAsync(ISession session, T cr)
{
    if (cr == null)
    {
        return;
    }

    // Notify interested code before the row is removed.
    var callback = DeleteWithOpenTransactionCallback;
    callback?.Invoke(session, cr);

    // Serialize database access through the repository lock, then keep the
    // in-memory cache consistent with the database.
    await Lock(async () => await session.DeleteAsync(cr));
    WriteLock(() => DeleteFromCacheUnsafe(cr));
}

// This function does not run the BeginDeleteCallback and the EndDeleteCallback
public void DeleteWithOpenTransaction(ISession session, List<T> objs)
{
Expand All @@ -182,6 +193,19 @@ public void DeleteWithOpenTransaction(ISession session, List<T> objs)

WriteLock(() => objs.ForEach(DeleteFromCacheUnsafe));
}

/// <summary>
/// Asynchronously deletes a batch of entities using an already-open transaction.
/// Runs the per-entity delete callback for each entity, deletes the whole batch
/// under a single lock acquisition, then evicts the entities from the cache.
/// This function does not run the BeginDeleteCallback and the EndDeleteCallback.
/// </summary>
/// <param name="session">The open NHibernate session that owns the transaction.</param>
/// <param name="objs">Entities to delete; null or empty lists are no-ops.</param>
public async Task DeleteWithOpenTransactionAsync(ISession session, List<T> objs)
{
    // Guard against null as well as empty — callers may pass an unmaterialized result.
    if (objs == null || objs.Count == 0) return;

    foreach (var cr in objs)
    {
        DeleteWithOpenTransactionCallback?.Invoke(session, cr);
    }

    // Take the lock once for the whole batch (mirrors SaveWithOpenTransactionAsync)
    // instead of acquiring and releasing it per entity. Nothing is visible to other
    // sessions until the caller commits, so batching the deletes is safe.
    await Lock(async () =>
    {
        foreach (var cr in objs)
        {
            await session.DeleteAsync(cr);
        }
    });

    WriteLock(() => objs.ForEach(DeleteFromCacheUnsafe));
}

public virtual void Save(T obj)
{
Expand Down Expand Up @@ -304,6 +328,29 @@ public void SaveWithOpenTransaction(ISession session, List<T> objs)
WriteLock(() => UpdateCacheUnsafe(obj));
}
}

/// <summary>
/// Asynchronously saves (inserts or updates) a batch of entities using an
/// already-open transaction: persists the whole batch under a single lock
/// acquisition, runs the per-entity save callback, then refreshes the cache.
/// </summary>
/// <param name="session">The open NHibernate session that owns the transaction.</param>
/// <param name="objs">Entities to save; null or empty lists are no-ops.</param>
public async Task SaveWithOpenTransactionAsync(ISession session, List<T> objs)
{
    // Guard against null as well as empty — callers may pass an unmaterialized result.
    if (objs == null || objs.Count == 0) return;

    await Lock(async () =>
    {
        foreach (var obj in objs)
        {
            await session.SaveOrUpdateAsync(obj);
        }
    });

    // Hoisted out of the loop: Wrap() does not depend on the loop variable.
    var wrappedSession = session.Wrap();
    foreach (var obj in objs)
    {
        SaveWithOpenTransactionCallback?.Invoke(wrappedSession, obj);
    }

    // One write-lock acquisition for the whole batch instead of one per entity.
    WriteLock(() =>
    {
        foreach (var obj in objs)
        {
            UpdateCacheUnsafe(obj);
        }
    });
}

protected void ReadLock(Action action)
{
Expand Down
7 changes: 7 additions & 0 deletions Shoko.Server/Scheduling/Concurrency/ConcurrencyGroups.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Shoko.Server.Scheduling.Concurrency;

/// <summary>
/// Well-known concurrency-group names, used with
/// <see cref="DisallowConcurrencyGroupAttribute"/> so that jobs sharing the
/// same external resource are not scheduled to run at the same time.
/// </summary>
public static class ConcurrencyGroups
{
    // Jobs that talk to AniDB over its UDP API.
    public const string AniDB_UDP = "AniDB_UDP";
    // Jobs that talk to AniDB over its HTTP API.
    public const string AniDB_HTTP = "AniDB_HTTP";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace Shoko.Server.Scheduling.Concurrency;

/// <summary>
/// Marks a job class as belonging to a named concurrency group. More than one
/// job in the same group will be disallowed from running concurrently.
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class DisallowConcurrencyGroupAttribute : Attribute
{
    /// <summary>
    /// The group to consider concurrency with.
    /// </summary>
    public string Group { get; set; }

    /// <summary>
    /// Creates the attribute, optionally naming the concurrency group.
    /// </summary>
    /// <param name="group">The group name, or null for no named group.</param>
    public DisallowConcurrencyGroupAttribute(string group = null) => Group = group;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;

namespace Shoko.Server.Scheduling;
namespace Shoko.Server.Scheduling.Concurrency;

public class LimitConcurrencyAttribute : Attribute
{
Expand All @@ -14,8 +14,9 @@ public class LimitConcurrencyAttribute : Attribute
/// </summary>
public int MaxAllowedConcurrentJobs { get; set; }

public LimitConcurrencyAttribute(int maxConcurrentJobs = 4)
public LimitConcurrencyAttribute(int maxConcurrentJobs = 4, int maxAllowedConcurrentJobs = 0)
{
MaxConcurrentJobs = maxConcurrentJobs;
MaxAllowedConcurrentJobs = maxAllowedConcurrentJobs;
}
}
279 changes: 279 additions & 0 deletions Shoko.Server/Scheduling/Jobs/AniDB/AniDBGetFileJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
using System;
using System.Linq;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Quartz;
using Shoko.Commons.Queue;
using Shoko.Models.Enums;
using Shoko.Models.Queue;
using Shoko.Models.Server;
using Shoko.Server.Commands.Attributes;
using Shoko.Server.Databases;
using Shoko.Server.Models;
using Shoko.Server.Providers.AniDB;
using Shoko.Server.Providers.AniDB.Interfaces;
using Shoko.Server.Providers.AniDB.UDP.Generic;
using Shoko.Server.Providers.AniDB.UDP.Info;
using Shoko.Server.Repositories;
using Shoko.Server.Scheduling.Concurrency;
using Shoko.Server.Server;

namespace Shoko.Server.Scheduling.Jobs.AniDB;

/// <summary>
/// Quartz job that fetches metadata for a local video file from the AniDB UDP
/// API and persists it: the AniDB file record, the audio/subtitle language
/// cross-references, and the file-to-episode cross-references. Serialized with
/// other UDP jobs via the AniDB_UDP concurrency group.
/// </summary>
[DisallowConcurrencyGroup(ConcurrencyGroups.AniDB_UDP)]
[Command(CommandRequestType.AniDB_GetFileUDP)]
public class AniDBGetFileJob : BaseJob
{
    // Exposes AniDB UDP connection/ban state.
    private readonly IUDPConnectionHandler _handler;
    // Builds AniDB UDP requests.
    private readonly IRequestFactory _requestFactory;
    // Resolved lazily from VideoLocalID in Process.
    private SVR_VideoLocal _vlocal;

    // ID of the local video record to look up on AniDB.
    public virtual int VideoLocalID { get; set; }
    // When true, query AniDB even if a cached AniDB_File record already exists.
    public virtual bool ForceAniDB { get; set; }
    // The saved AniDB file record after a successful run; excluded from job serialization.
    [JsonIgnore] public virtual SVR_AniDB_File Result { get; set; }

    /// <summary>
    /// Queue display state: shows the file name once the video record has been
    /// resolved, otherwise falls back to the numeric VideoLocalID.
    /// </summary>
    public override QueueStateStruct Description
    {
        get
        {
            if (_vlocal != null)
            {
                return new QueueStateStruct
                {
                    message = "Getting file info from UDP API: {0}",
                    queueState = QueueStateEnum.GetFileInfo,
                    extraParams = new[] { _vlocal.FileName }
                };
            }

            return new QueueStateStruct
            {
                message = "Getting file info from UDP API: {0}",
                queueState = QueueStateEnum.GetFileInfo,
                extraParams = new[] { VideoLocalID.ToString() }
            };
        }
    }

    /// <summary>
    /// Fetches the file info from AniDB (unless a cached record exists and
    /// <see cref="ForceAniDB"/> is false), saves the record, rebuilds language
    /// and episode cross-references, and refreshes series/group statistics.
    /// </summary>
    /// <param name="context">Quartz execution context (unused here).</param>
    /// <exception cref="AniDBBannedException">Thrown when the UDP handler reports a ban.</exception>
    protected override async Task Process(IJobExecutionContext context)
    {
        Logger.LogInformation("Get AniDB file info: {VideoLocalID}", VideoLocalID);

        // Fail fast while UDP-banned so the scheduler can back off until the ban expires.
        if (_handler.IsBanned)
        {
            throw new AniDBBannedException
            {
                BanType = UpdateType.UDPBan, BanExpires = _handler.BanTime?.AddHours(_handler.BanTimerResetLength)
            };
        }

        // Resolve the video record once; nothing to do if it no longer exists.
        _vlocal ??= RepoFactory.VideoLocal.GetByID(VideoLocalID);
        if (_vlocal == null)
        {
            return;
        }

        var aniFile = RepoFactory.AniDB_File.GetByHashAndFileSize(_vlocal.Hash, _vlocal.FileSize);

        // Only hit the UDP API when there is no cached record or a refresh was forced.
        UDPResponse<ResponseGetFile> response = null;
        if (aniFile == null || ForceAniDB)
        {
            var request = _requestFactory.Create<RequestGetFile>(
                r =>
                {
                    r.Hash = _vlocal.Hash;
                    r.Size = _vlocal.FileSize;
                }
            );
            // NOTE(review): Execute() looks synchronous inside an async method —
            // confirm whether an async variant of the request exists.
            response = request.Execute();
        }

        // NOTE(review): when aniFile exists and ForceAniDB is false, response stays
        // null, so this logs "could not be found" and returns without updating
        // anything — confirm that log message is intended for that path.
        if (response?.Response == null)
        {
            Logger.LogInformation("File {VideoLocalID} ({Ed2kHash} | {FileName}) could not be found on AniDB",
                _vlocal.VideoLocalID, _vlocal.ED2KHash, _vlocal.GetBestVideoLocalPlace()?.FileName);
            return;
        }

        // remap if the hash brought up the wrong file
        var tempAniDBFile = RepoFactory.AniDB_File.GetByFileID(response.Response.FileID);
        if (aniFile != null && tempAniDBFile != null && aniFile != tempAniDBFile)
        {
            RepoFactory.AniDB_File.Delete(aniFile);
            aniFile = tempAniDBFile;
        }

        // save to the database
        aniFile ??= new SVR_AniDB_File();
        aniFile.Hash = _vlocal.Hash;
        aniFile.FileSize = _vlocal.FileSize;

        aniFile.DateTimeUpdated = DateTime.Now;
        aniFile.File_Description = response.Response.Description;
        aniFile.File_Source = response.Response.Source.ToString();
        aniFile.FileID = response.Response.FileID;
        aniFile.FileName = response.Response.Filename;
        aniFile.GroupID = response.Response.GroupID ?? 0;

        aniFile.FileVersion = response.Response.Version;
        aniFile.IsCensored = response.Response.Censored;
        aniFile.IsDeprecated = response.Response.Deprecated;
        aniFile.IsChaptered = response.Response.Chaptered;
        // Marker for the schema/population version of this record.
        aniFile.InternalVersion = 3;

        RepoFactory.AniDB_File.Save(aniFile, false);
        await CreateLanguages(response.Response);
        await CreateEpisodes(_vlocal.FileName, response.Response);

        var anime = RepoFactory.AniDB_Anime.GetByAnimeID(response.Response.AnimeID);
        if (anime != null)
        {
            RepoFactory.AniDB_Anime.Save(anime, false);
        }

        // Refresh series/group statistics now that the episode cross-refs changed.
        var series = RepoFactory.AnimeSeries.GetByAnimeID(response.Response.AnimeID);
        series?.UpdateStats(true, true);
        series?.AnimeGroup?.TopLevelAnimeGroup?.UpdateStatsFromTopLevel(true, true);
        Result = RepoFactory.AniDB_File.GetByFileID(aniFile.FileID);
    }

    /// <summary>
    /// Replaces the audio and subtitle language cross-references for the file in
    /// one transaction: deletes the existing rows, then saves the normalized
    /// (trimmed, lower-cased, non-empty) language names from the response.
    /// A null language list in the response leaves that category untouched.
    /// </summary>
    /// <param name="response">The AniDB file response supplying the language lists.</param>
    private static async Task CreateLanguages(ResponseGetFile response)
    {
        using var session = DatabaseFactory.SessionFactory.OpenSession();
        // playing with async
        await BaseRepository.Lock(session, async s =>
        {
            using var trans = s.BeginTransaction();
            // Only update languages if we got a response
            if (response?.AudioLanguages is not null)
            {
                // Delete old
                var toDelete = RepoFactory.CrossRef_Languages_AniDB_File.GetByFileID(response.FileID);
                await RepoFactory.CrossRef_Languages_AniDB_File.DeleteWithOpenTransactionAsync(s, toDelete);

                // Save new
                var toSave = response.AudioLanguages.Select(language => language.Trim().ToLower())
                    .Where(lang => lang.Length > 0)
                    .Select(lang => new CrossRef_Languages_AniDB_File
                    {
                        LanguageName = lang, FileID = response.FileID
                    })
                    .ToList();
                await RepoFactory.CrossRef_Languages_AniDB_File.SaveWithOpenTransactionAsync(s, toSave);
            }

            if (response?.SubtitleLanguages is not null)
            {
                // Delete old
                var toDelete = RepoFactory.CrossRef_Subtitles_AniDB_File.GetByFileID(response.FileID);
                await RepoFactory.CrossRef_Subtitles_AniDB_File.DeleteWithOpenTransactionAsync(s, toDelete);

                // Save new
                var toSave = response.SubtitleLanguages.Select(language => language.Trim().ToLower())
                    .Where(lang => lang.Length > 0)
                    .Select(lang => new CrossRef_Subtitles_AniDB_File
                    {
                        LanguageName = lang, FileID = response.FileID
                    })
                    .ToList();
                await RepoFactory.CrossRef_Subtitles_AniDB_File.SaveWithOpenTransactionAsync(s, toSave);
            }

            await trans.CommitAsync();
        });
    }

    /// <summary>
    /// Rebuilds the file-to-episode cross-references from the AniDB response:
    /// deletes the existing rows for this file's hash, recreates one row per
    /// episode (including OtherEpisodes, fetching any missing episode info from
    /// AniDB), then saves the de-duplicated list.
    /// </summary>
    /// <param name="filename">File name stored on each cross-reference row.</param>
    /// <param name="response">The AniDB file response supplying the episode lists.</param>
    private async Task CreateEpisodes(string filename, ResponseGetFile response)
    {
        if (response.EpisodeIDs.Count <= 0) return;

        var fileEps = RepoFactory.CrossRef_File_Episode.GetByHash(_vlocal.Hash);

        // Use a single session A. for efficiency and B. to prevent regenerating stats

        // Clear the old cross-refs in their own transaction before rebuilding.
        await BaseRepository.Lock(fileEps, async x =>
        {
            using var session = DatabaseFactory.SessionFactory.OpenSession();
            using var trans = session.BeginTransaction();
            await RepoFactory.CrossRef_File_Episode.DeleteWithOpenTransactionAsync(session, x);
            await trans.CommitAsync();
        });

        // Recreate the cross-refs from the response; EpisodeOrder is 1-based.
        fileEps = response.EpisodeIDs
            .Select(
                (ep, x) => new CrossRef_File_Episode
                {
                    Hash = _vlocal.Hash,
                    CrossRefSource = (int)CrossRefSource.AniDB,
                    AnimeID = response.AnimeID,
                    EpisodeID = ep.EpisodeID,
                    Percentage = ep.Percentage,
                    EpisodeOrder = x + 1,
                    FileName = filename,
                    FileSize = _vlocal.FileSize
                }
            )
            .ToList();

        if (response.OtherEpisodes.Count > 0)
        {
            Logger.LogInformation("Found {Count} additional episodes for file", response.OtherEpisodes.Count);
            var epOrder = fileEps.Max(a => a.EpisodeOrder);
            foreach (var episode in response.OtherEpisodes)
            {
                // Look up the episode's AnimeID locally first; fall back to an
                // AniDB episode request when it is not cached.
                var epAnimeID = RepoFactory.AniDB_Episode.GetByEpisodeID(episode.EpisodeID)?.AnimeID;
                if (epAnimeID == null)
                {
                    Logger.LogInformation("Could not get AnimeID for episode {EpisodeID}, downloading more info", episode.EpisodeID);
                    var epRequest = _requestFactory.Create<RequestGetEpisode>(r => r.EpisodeID = episode.EpisodeID);
                    try
                    {
                        var epResponse = epRequest.Execute();
                        epAnimeID = epResponse.Response?.AnimeID;
                    }
                    catch (Exception e)
                    {
                        // Best effort: log and fall through; the episode is skipped
                        // below if the AnimeID is still unknown.
                        Logger.LogError(e, "Could not get Episode Info for {EpisodeID}", episode.EpisodeID);
                    }
                }

                if (epAnimeID == null) continue;

                epOrder++;
                fileEps.Add(new CrossRef_File_Episode
                {
                    Hash = _vlocal.Hash,
                    CrossRefSource = (int)CrossRefSource.AniDB,
                    AnimeID = epAnimeID.Value,
                    EpisodeID = episode.EpisodeID,
                    Percentage = episode.Percentage,
                    EpisodeOrder = epOrder,
                    FileName = filename,
                    FileSize = _vlocal.FileSize
                });
            }
        }

        // There is a chance that AniDB returned a dup, however unlikely
        await BaseRepository.Lock(fileEps, async x =>
        {
            using var session = DatabaseFactory.SessionFactory.OpenSession();
            using var trans = session.BeginTransaction();
            await RepoFactory.CrossRef_File_Episode.SaveWithOpenTransactionAsync(session, x.DistinctBy(a => (a.Hash, a.EpisodeID)).ToList());
            await trans.CommitAsync();
        });
    }

    /// <summary>
    /// DI constructor wiring the UDP handler and request factory.
    /// </summary>
    public AniDBGetFileJob(ILoggerFactory loggerFactory, IUDPConnectionHandler handler,
        IRequestFactory requestFactory) : base(loggerFactory)
    {
        _handler = handler;
        _requestFactory = requestFactory;
    }

    // NOTE(review): parameterless constructor — presumably required for job
    // deserialization or proxy generation; confirm before removing.
    protected AniDBGetFileJob()
    {
    }
}
Loading

0 comments on commit 46f56a5

Please sign in to comment.