Skip to content

Commit

Permalink
* fix not invoking the action returned by `ReplyContentImageSaver.Sav…
Browse files Browse the repository at this point in the history
…e()` to release its locked images, by manually disposing it and recreating another instance of the saver after `DbContext.SaveChanges()` @ `DoWork()`

* rename primary ctor param `replyContentImageSaver` to `replyContentImageSaverFactory`, as its original type is now wrapped in `Func<Owned<>>`
@ ProcessImagesInAllReplyContentsWorker.cs

* instead of throwing `InvalidOperationException`, log the failure when removing a globally locked image
@ `ReplyContentImageSaver.Dispose()`
@ crawler

+ param `afterSaveChangesAction` @ `TransformEntityWorker.Transform()`
@ shared
@ c#
  • Loading branch information
n0099 committed Jun 12, 2024
1 parent 55c529f commit 9d319a9
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
9 changes: 6 additions & 3 deletions c#/crawler/src/Tieba/Crawl/Saver/ReplyContentImageSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ public void Dispose()
{
try
{
if (_newlyLockedImages != null && _newlyLockedImages.Any(pair =>
!GlobalLockedImagesInReplyKeyByUrlFilename.TryRemove(pair)))
throw new InvalidOperationException();
_newlyLockedImages?.ForEach(pair =>
{
if (!GlobalLockedImagesInReplyKeyByUrlFilename.TryRemove(pair))
logger.LogError("Previously locked image {} already removed from the global locks",
SharedHelper.UnescapedJsonSerialize(pair));
});
}
finally
{
Expand Down
21 changes: 14 additions & 7 deletions c#/crawler/src/Worker/ProcessImagesInAllReplyContentsWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public class ProcessImagesInAllReplyContentsWorker(
IConfiguration config,
Func<Owned<CrawlerDbContext.NewDefault>> dbContextDefaultFactory,
Func<Owned<CrawlerDbContext.New>> dbContextFactory,
ReplyContentImageSaver replyContentImageSaver)
Func<Owned<ReplyContentImageSaver>> replyContentImageSaverFactory)
: TransformEntityWorker<CrawlerDbContext, ReplyContent, ReplyContent, Pid>(logger)
{
protected override async Task DoWork(CancellationToken stoppingToken)
Expand All @@ -21,6 +21,7 @@ protected override async Task DoWork(CancellationToken stoppingToken)
{
logger.LogInformation("Simplify images in reply contents of fid {} started", fid);
var replyContentsKeyByPid = new Dictionary<Pid, RepeatedField<Content>>(saveWritingEntitiesBatchSize);
var replyContentImageSaver = replyContentImageSaverFactory();
await using var dbFactory = dbContextFactory();
await Transform(
() => dbFactory.Value(fid),
Expand Down Expand Up @@ -53,13 +54,19 @@ await Transform(
var p = ee.Property(e => e.ProtoBufBytes);
p.IsModified = !ByteArrayEqualityComparer.Instance.Equals(p.OriginalValue, p.CurrentValue);
});
replyContentImageSaver.Save(writingDb, replyContentsKeyByPid.Select(pair => new ReplyPost
{
Pid = pair.Key,
Content = null!,
ContentsProtoBuf = pair.Value
}));
_ = replyContentImageSaver.Value.Save(writingDb,
replyContentsKeyByPid.Select(pair => new ReplyPost
{
Pid = pair.Key,
Content = null!,
ContentsProtoBuf = pair.Value
}));
},
() =>
{
replyContentsKeyByPid.Clear();
replyContentImageSaver.Dispose();
replyContentImageSaver = replyContentImageSaverFactory();
},
stoppingToken);
logger.LogInformation("Simplify images in reply contents of fid {} finished after {:F2}s",
Expand Down
2 changes: 2 additions & 0 deletions c#/shared/src/TransformEntityWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ protected async Task Transform(
Func<TReadingEntity, TWritingEntity> writingEntityFactory,
Action<TReadingEntity, TWritingEntity> writingEntityMutator,
Action<TDbContext, IReadOnlyCollection<EntityEntry<TWritingEntity>>> writingEntityEntriesAction,
Action afterSaveChangesAction,
CancellationToken stoppingToken = default)
{
var processedEntityCount = 0;
Expand All @@ -52,6 +53,7 @@ async Task SaveThenLog(int processedCount, Process currentProcess)
var updatedEntityCount = await writingDb.SaveChangesAsync(stoppingToken);
writingDb.ChangeTracker.Clear();
writingEntityEntries.Clear();
afterSaveChangesAction();

logger.LogTrace("processedEntityCount:{} updatedEntityCount:{} elapsed:{}ms processMemory:{:F2}MiB exceptions:{}",
processedCount, updatedEntityCount,
Expand Down

0 comments on commit 9d319a9

Please sign in to comment.