From 0394e7086bcff81d862ba4ce71675ae987ba9e19 Mon Sep 17 00:00:00 2001 From: bezzad Date: Sun, 17 Dec 2023 14:27:43 +0330 Subject: [PATCH] added logger to streams and packages --- .../UnitTests/ChunkDownloaderOnMemoryTest.cs | 2 +- .../UnitTests/StorageTestOnMemory.cs | 2 +- src/Downloader/ConcurrentPacketBuffer.cs | 6 +++--- src/Downloader/ConcurrentStream.cs | 16 +++++++++------- src/Downloader/DownloadPackage.cs | 9 +++++---- src/Downloader/TaskStateManagement.cs | 13 +++++++++++-- 6 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/Downloader.Test/UnitTests/ChunkDownloaderOnMemoryTest.cs b/src/Downloader.Test/UnitTests/ChunkDownloaderOnMemoryTest.cs index 402487a..07be0a8 100644 --- a/src/Downloader.Test/UnitTests/ChunkDownloaderOnMemoryTest.cs +++ b/src/Downloader.Test/UnitTests/ChunkDownloaderOnMemoryTest.cs @@ -12,6 +12,6 @@ public ChunkDownloaderOnMemoryTest() MinimumSizeOfChunking = 16, Timeout = 100, }; - Storage = new ConcurrentStream(); + Storage = new ConcurrentStream(null); } } diff --git a/src/Downloader.Test/UnitTests/StorageTestOnMemory.cs b/src/Downloader.Test/UnitTests/StorageTestOnMemory.cs index 65d0420..aff69d0 100644 --- a/src/Downloader.Test/UnitTests/StorageTestOnMemory.cs +++ b/src/Downloader.Test/UnitTests/StorageTestOnMemory.cs @@ -7,7 +7,7 @@ public class StorageTestOnMemory : StorageTest { protected override void CreateStorage(int initialSize) { - Storage = new ConcurrentStream(); + Storage = new ConcurrentStream(null); } [Fact] diff --git a/src/Downloader/ConcurrentPacketBuffer.cs b/src/Downloader/ConcurrentPacketBuffer.cs index ac5ec15..17a75b3 100644 --- a/src/Downloader/ConcurrentPacketBuffer.cs +++ b/src/Downloader/ConcurrentPacketBuffer.cs @@ -103,7 +103,7 @@ private void StopAddingIfLimitationExceeded(long packetSize) { if (BufferSize < packetSize * Count) { - _logger.LogDebug($"ConcurrentPacketBuffer: Stop writing packets to the queue on size {packetSize * Count}bytes until the memory is free"); + _logger?.LogDebug($"ConcurrentPacketBuffer: Stop writing packets to the queue on size {packetSize * Count}bytes until the memory is free"); StopAdding(); } } @@ -124,13 +124,13 @@ public async Task WaitToComplete() public void StopAdding() { - _logger.LogDebug("ConcurrentPacketBuffer: stop writing new items to the list by blocking writer threads"); + _logger?.LogDebug("ConcurrentPacketBuffer: stop writing new items to the list by blocking writer threads"); _addingBlocker.Pause(); } public void ResumeAdding() { - _logger.LogDebug("ConcurrentPacketBuffer: resume writing new item to the list"); + _logger?.LogDebug("ConcurrentPacketBuffer: resume writing new item to the list"); _addingBlocker.Resume(); } diff --git a/src/Downloader/ConcurrentStream.cs b/src/Downloader/ConcurrentStream.cs index b5be4dc..38652f8 100644 --- a/src/Downloader/ConcurrentStream.cs +++ b/src/Downloader/ConcurrentStream.cs @@ -1,4 +1,5 @@ -using System; +using Downloader.Extensions.Logging; +using System; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -65,21 +66,21 @@ public long MaxMemoryBufferBytes } // parameterless constructor for deserialization - public ConcurrentStream() : this(0) { } + public ConcurrentStream(ILogger logger = null) : this(0, logger) { } - public ConcurrentStream(long maxMemoryBufferBytes = 0) + public ConcurrentStream(long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger) { _stream = new MemoryStream(); Initial(maxMemoryBufferBytes); } - public ConcurrentStream(Stream stream, long maxMemoryBufferBytes = 0) + public ConcurrentStream(Stream stream, long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger) { _stream = stream; Initial(maxMemoryBufferBytes); } - public ConcurrentStream(string filename, long initSize, long maxMemoryBufferBytes = 0) + public ConcurrentStream(string filename, long initSize, long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger) { _path = filename; _stream = new FileStream(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read); @@ -90,9 +91,9 @@ public ConcurrentStream(string filename, long initSize, long maxMemoryBufferByte Initial(maxMemoryBufferBytes); } - private void Initial(long maxMemoryBufferBytes) + private void Initial(long maxMemoryBufferBytes, ILogger logger = null) { - _inputBuffer = new ConcurrentPacketBuffer(maxMemoryBufferBytes); + _inputBuffer = new ConcurrentPacketBuffer(maxMemoryBufferBytes, logger); _watcherCancelSource = new CancellationTokenSource(); Task task = Task.Factory.StartNew( @@ -145,6 +146,7 @@ private async Task Watcher() } catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { + Logger?.LogError(ex, "ConcurrentStream: Call CancelState()"); CancelState(); } catch (Exception ex) diff --git a/src/Downloader/DownloadPackage.cs b/src/Downloader/DownloadPackage.cs index 56599e0..f1e5c15 100644 --- a/src/Downloader/DownloadPackage.cs +++ b/src/Downloader/DownloadPackage.cs @@ -1,4 +1,5 @@ -using System; +using Downloader.Extensions.Logging; +using System; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -61,12 +62,12 @@ public void Validate() } } - public void BuildStorage(bool reserveFileSize, long maxMemoryBufferBytes = 0) + public void BuildStorage(bool reserveFileSize, long maxMemoryBufferBytes = 0, ILogger logger = null) { if (string.IsNullOrWhiteSpace(FileName)) - Storage = new ConcurrentStream(maxMemoryBufferBytes); + Storage = new ConcurrentStream(maxMemoryBufferBytes, logger); else - Storage = new ConcurrentStream(FileName, reserveFileSize ? TotalFileSize : 0, maxMemoryBufferBytes); + Storage = new ConcurrentStream(FileName, reserveFileSize ? TotalFileSize : 0, maxMemoryBufferBytes, logger); } public void Dispose() diff --git a/src/Downloader/TaskStateManagement.cs b/src/Downloader/TaskStateManagement.cs index 7841614..715327e 100644 --- a/src/Downloader/TaskStateManagement.cs +++ b/src/Downloader/TaskStateManagement.cs @@ -1,5 +1,7 @@ -using System; +using Downloader.Extensions.Logging; +using System; using System.Collections.Concurrent; +using System.Runtime.CompilerServices; using System.Threading.Tasks; namespace Downloader; @@ -7,6 +9,7 @@ namespace Downloader; public class TaskStateManagement { private readonly ConcurrentQueue _exceptions = new ConcurrentQueue(); + protected readonly ILogger Logger; /// /// Gets the System.AggregateException that caused the ConcurrentStream @@ -47,11 +50,17 @@ public class TaskStateManagement /// public TaskStatus Status { get; private set; } = TaskStatus.Created; + public TaskStateManagement(ILogger logger = null) + { + Logger = logger; + } + internal void StartState() => Status = TaskStatus.Running; internal void CompleteState() => Status = TaskStatus.RanToCompletion; internal void CancelState() => Status = TaskStatus.Canceled; - internal void SetException(Exception exp) + internal void SetException(Exception exp, [CallerMemberName] string callerName = null) { + Logger?.LogCritical(exp, $"TaskStateManagement: SetException catch an exception on {callerName}"); Status = TaskStatus.Faulted; _exceptions.Enqueue(exp); Exception = new AggregateException(_exceptions);