Skip to content

Commit

Permalink
added logger to streams and packages
Browse files Browse the repository at this point in the history
  • Loading branch information
bezzad committed Dec 17, 2023
1 parent 8e09a94 commit 0394e70
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ public ChunkDownloaderOnMemoryTest()
MinimumSizeOfChunking = 16,
Timeout = 100,
};
Storage = new ConcurrentStream();
Storage = new ConcurrentStream(null);
}
}
2 changes: 1 addition & 1 deletion src/Downloader.Test/UnitTests/StorageTestOnMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class StorageTestOnMemory : StorageTest
{
protected override void CreateStorage(int initialSize)
{
Storage = new ConcurrentStream();
Storage = new ConcurrentStream(null);
}

[Fact]
Expand Down
6 changes: 3 additions & 3 deletions src/Downloader/ConcurrentPacketBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void StopAddingIfLimitationExceeded(long packetSize)
{
if (BufferSize < packetSize * Count)
{
_logger.LogDebug($"ConcurrentPacketBuffer: Stop writing packets to the queue on size {packetSize * Count}bytes until the memory is free");
_logger?.LogDebug($"ConcurrentPacketBuffer: Stop writing packets to the queue on size {packetSize * Count}bytes until the memory is free");
StopAdding();
}
}
Expand All @@ -124,13 +124,13 @@ public async Task WaitToComplete()

public void StopAdding()
{
_logger.LogDebug("ConcurrentPacketBuffer: stop writing new items to the list by blocking writer threads");
_logger?.LogDebug("ConcurrentPacketBuffer: stop writing new items to the list by blocking writer threads");
_addingBlocker.Pause();
}

public void ResumeAdding()
{
_logger.LogDebug("ConcurrentPacketBuffer: resume writing new item to the list");
_logger?.LogDebug("ConcurrentPacketBuffer: resume writing new item to the list");
_addingBlocker.Resume();
}

Expand Down
16 changes: 9 additions & 7 deletions src/Downloader/ConcurrentStream.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Downloader.Extensions.Logging;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -65,21 +66,21 @@ public long MaxMemoryBufferBytes
}

// parameterless constructor for deserialization
public ConcurrentStream() : this(0) { }
public ConcurrentStream(ILogger logger = null) : this(0, logger) { }

public ConcurrentStream(long maxMemoryBufferBytes = 0)
public ConcurrentStream(long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger)
{
_stream = new MemoryStream();
Initial(maxMemoryBufferBytes);
}

public ConcurrentStream(Stream stream, long maxMemoryBufferBytes = 0)
public ConcurrentStream(Stream stream, long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger)
{
_stream = stream;
Initial(maxMemoryBufferBytes);
}

public ConcurrentStream(string filename, long initSize, long maxMemoryBufferBytes = 0)
public ConcurrentStream(string filename, long initSize, long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger)
{
_path = filename;
_stream = new FileStream(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
Expand All @@ -90,9 +91,9 @@ public ConcurrentStream(string filename, long initSize, long maxMemoryBufferByte
Initial(maxMemoryBufferBytes);
}

private void Initial(long maxMemoryBufferBytes)
private void Initial(long maxMemoryBufferBytes, ILogger logger = null)
{
_inputBuffer = new ConcurrentPacketBuffer<Packet>(maxMemoryBufferBytes);
_inputBuffer = new ConcurrentPacketBuffer<Packet>(maxMemoryBufferBytes, logger);
_watcherCancelSource = new CancellationTokenSource();

Task<Task> task = Task.Factory.StartNew(
Expand Down Expand Up @@ -145,6 +146,7 @@ private async Task Watcher()
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException)
{
Logger?.LogError(ex, "ConcurrentStream: Call CancelState()");
CancelState();
}
catch (Exception ex)
Expand Down
9 changes: 5 additions & 4 deletions src/Downloader/DownloadPackage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Downloader.Extensions.Logging;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -61,12 +62,12 @@ public void Validate()
}
}

public void BuildStorage(bool reserveFileSize, long maxMemoryBufferBytes = 0)
public void BuildStorage(bool reserveFileSize, long maxMemoryBufferBytes = 0, ILogger logger = null)
{
if (string.IsNullOrWhiteSpace(FileName))
Storage = new ConcurrentStream(maxMemoryBufferBytes);
Storage = new ConcurrentStream(maxMemoryBufferBytes, logger);
else
Storage = new ConcurrentStream(FileName, reserveFileSize ? TotalFileSize : 0, maxMemoryBufferBytes);
Storage = new ConcurrentStream(FileName, reserveFileSize ? TotalFileSize : 0, maxMemoryBufferBytes, logger);
}

public void Dispose()
Expand Down
13 changes: 11 additions & 2 deletions src/Downloader/TaskStateManagement.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
using System;
using Downloader.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace Downloader;

public class TaskStateManagement
{
private readonly ConcurrentQueue<Exception> _exceptions = new ConcurrentQueue<Exception>();
protected readonly ILogger Logger;

/// <summary>
/// Gets the System.AggregateException that caused the ConcurrentStream
Expand Down Expand Up @@ -47,11 +50,17 @@ public class TaskStateManagement
/// </summary>
public TaskStatus Status { get; private set; } = TaskStatus.Created;

public TaskStateManagement(ILogger logger = null)
{
Logger = logger;
}

internal void StartState() => Status = TaskStatus.Running;
internal void CompleteState() => Status = TaskStatus.RanToCompletion;
internal void CancelState() => Status = TaskStatus.Canceled;
internal void SetException(Exception exp)
internal void SetException(Exception exp, [CallerMemberName] string callerName = null)
{
Logger?.LogCritical(exp, $"TaskStateManagement: SetException catch an exception on {callerName}");
Status = TaskStatus.Faulted;
_exceptions.Enqueue(exp);
Exception = new AggregateException(_exceptions);
Expand Down

0 comments on commit 0394e70

Please sign in to comment.