Skip to content

Commit

Permalink
add IAsyncDisposable implementation to Download.cs and DownloadPackag…
Browse files Browse the repository at this point in the history
…e and ConcurrentStream.cs and IDownload.cs
  • Loading branch information
bezzad committed Sep 18, 2024
1 parent 1e7591a commit 5061c5a
Show file tree
Hide file tree
Showing 4 changed files with 313 additions and 38 deletions.
133 changes: 128 additions & 5 deletions src/Downloader/ConcurrentStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@

namespace Downloader;

public class ConcurrentStream : TaskStateManagement, IDisposable
/// <summary>
/// Represents a stream that supports concurrent read and write operations with optional memory buffering.
/// </summary>
public class ConcurrentStream : TaskStateManagement, IDisposable, IAsyncDisposable
{
private ConcurrentPacketBuffer<Packet> _inputBuffer;
private volatile bool _disposed;
private Stream _stream;
private string _path;
private CancellationTokenSource _watcherCancelSource;

/// <summary>
/// Gets or sets the path of the file associated with the stream.
/// </summary>
public string Path
{
get => _path;
Expand All @@ -26,6 +32,10 @@ public string Path
}
}
}

/// <summary>
/// Gets the data of the stream as a byte array if the stream is a MemoryStream.
/// </summary>
public byte[] Data
{
get
Expand All @@ -49,38 +59,86 @@ public byte[] Data
}
}
}

/// <summary>
/// Gets a value indicating whether the stream supports reading.
/// </summary>
public bool CanRead => _stream?.CanRead == true;

/// <summary>
/// Gets a value indicating whether the stream supports seeking.
/// </summary>
public bool CanSeek => _stream?.CanSeek == true;

/// <summary>
/// Gets a value indicating whether the stream supports writing.
/// </summary>
public bool CanWrite => _stream?.CanWrite == true;

/// <summary>
/// Gets the length of the stream in bytes.
/// </summary>
public long Length => _stream?.Length ?? 0;

/// <summary>
/// Gets or sets the current position within the stream.
/// </summary>
public long Position
{
get => _stream?.Position ?? 0;
set => _stream.Position = value;
}

/// <summary>
/// Gets or sets the maximum amount of memory, in bytes, that the stream is allowed to allocate for buffering.
/// </summary>
public long MaxMemoryBufferBytes
{
get => _inputBuffer.BufferSize;
set => _inputBuffer.BufferSize = value;
}

// parameterless constructor for deserialization
public ConcurrentStream() : this(0, null) { }
/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentStream"/> class with default settings.
/// </summary>
public ConcurrentStream() : this(0) { }

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentStream"/> class with the specified logger.
/// </summary>
/// <param name="logger">The logger to use for logging.</param>
public ConcurrentStream(ILogger logger) : this(0, logger) { }

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentStream"/> class with the specified maximum memory buffer size and logger.
/// </summary>
/// <param name="maxMemoryBufferBytes">The maximum amount of memory, in bytes, that the stream is allowed to allocate for buffering.</param>
/// <param name="logger">The logger to use for logging.</param>
public ConcurrentStream(long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger)
{
_stream = new MemoryStream();
Initial(maxMemoryBufferBytes);
}

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentStream"/> class with the specified stream and maximum memory buffer size.
/// </summary>
/// <param name="stream">The stream to use.</param>
/// <param name="maxMemoryBufferBytes">The maximum amount of memory, in bytes, that the stream is allowed to allocate for buffering.</param>
/// <param name="logger">The logger to use for logging.</param>
public ConcurrentStream(Stream stream, long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger)
{
_stream = stream;
Initial(maxMemoryBufferBytes);
}

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentStream"/> class with the specified file path, initial size, and maximum memory buffer size.
/// </summary>
/// <param name="filename">The path of the file to use.</param>
/// <param name="initSize">The initial size of the file.</param>
/// <param name="maxMemoryBufferBytes">The maximum amount of memory, in bytes, that the stream is allowed to allocate for buffering.</param>
/// <param name="logger">The logger to use for logging.</param>
public ConcurrentStream(string filename, long initSize, long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger)
{
_path = filename;
Expand All @@ -92,6 +150,11 @@ public ConcurrentStream(string filename, long initSize, long maxMemoryBufferByte
Initial(maxMemoryBufferBytes);
}

/// <summary>
/// Initializes the stream with the specified maximum memory buffer size.
/// </summary>
/// <param name="maxMemoryBufferBytes">The maximum amount of memory, in bytes, that the stream is allowed to allocate for buffering.</param>
/// <param name="logger">The logger to use for logging.</param>
private void Initial(long maxMemoryBufferBytes, ILogger logger = null)
{
_inputBuffer = new ConcurrentPacketBuffer<Packet>(maxMemoryBufferBytes, logger);
Expand All @@ -106,18 +169,37 @@ private void Initial(long maxMemoryBufferBytes, ILogger logger = null)
task.Unwrap();
}

/// <summary>
/// Opens the stream for reading.
/// </summary>
/// <returns>The stream for reading.</returns>
public Stream OpenRead()
{
Seek(0, SeekOrigin.Begin);
return _stream;
}

/// <summary>
/// Reads a sequence of bytes from the stream and advances the position within the stream by the number of bytes read.
/// </summary>
/// <param name="buffer">An array of bytes to store the read data.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data read from the stream.</param>
/// <param name="count">The maximum number of bytes to be read from the stream.</param>
/// <returns>The total number of bytes read into the buffer.</returns>
public int Read(byte[] buffer, int offset, int count)
{
var stream = OpenRead();
return stream.Read(buffer, offset, count);
}

/// <summary>
/// Writes a sequence of bytes to the stream asynchronously at the specified position.
/// </summary>
/// <param name="position">The position within the stream to write the data.</param>
/// <param name="bytes">The data to write to the stream.</param>
/// <param name="length">The number of bytes to write.</param>
/// <param name="fireAndForget">A value indicating whether to wait for the write operation to complete.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteAsync(long position, byte[] bytes, int length, bool fireAndForget = true)
{
if (bytes.Length < length)
Expand All @@ -135,6 +217,10 @@ public async Task WriteAsync(long position, byte[] bytes, int length, bool fireA
}
}

/// <summary>
/// Watches for incoming packets and writes them to the stream.
/// </summary>
/// <returns>A task that represents the asynchronous watch operation.</returns>
private async Task Watcher()
{
try
Expand All @@ -161,6 +247,12 @@ private async Task Watcher()
}
}

/// <summary>
/// Sets the position within the current stream.
/// </summary>
/// <param name="offset">A byte offset relative to the origin parameter.</param>
/// <param name="origin">A value of type SeekOrigin indicating the reference point used to obtain the new position.</param>
/// <returns>The new position within the current stream.</returns>
public long Seek(long offset, SeekOrigin origin)
{
if (offset != Position && CanSeek)
Expand All @@ -171,11 +263,20 @@ public long Seek(long offset, SeekOrigin origin)
return Position;
}

/// <summary>
/// Sets the length of the current stream.
/// </summary>
/// <param name="value">The desired length of the current stream in bytes.</param>
public void SetLength(long value)
{
_stream.SetLength(value);
}

/// <summary>
/// Writes a packet to the stream.
/// </summary>
/// <param name="packet">The packet to write.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
private async Task WritePacketOnFile(Packet packet)
{
// seek with SeekOrigin.Begin is so faster than SeekOrigin.Current
Expand All @@ -184,6 +285,10 @@ private async Task WritePacketOnFile(Packet packet)
packet.Dispose();
}

/// <summary>
/// Flushes the stream asynchronously.
/// </summary>
/// <returns>A task that represents the asynchronous flush operation.</returns>
public async Task FlushAsync()
{
await _inputBuffer.WaitToComplete().ConfigureAwait(false);
Expand All @@ -196,14 +301,32 @@ public async Task FlushAsync()
GC.Collect();
}

/// <summary>
/// Releases the unmanaged resources used by the ConcurrentStream and optionally releases the managed resources.
/// </summary>
public void Dispose()
{
if (_disposed == false)
if (!_disposed)
{
_disposed = true;
_watcherCancelSource.Cancel(); // request the cancellation
_stream.Dispose();
_inputBuffer.Dispose();
}
}
}

/// <summary>
/// Asynchronously releases the unmanaged resources used by the ConcurrentStream.
/// </summary>
/// <returns>A task that represents the asynchronous dispose operation.</returns>
public async ValueTask DisposeAsync()
{
if (!_disposed)
{
_disposed = true;
await _watcherCancelSource.CancelAsync().ConfigureAwait(false); // request the cancellation
await _stream.DisposeAsync().ConfigureAwait(false);
_inputBuffer.Dispose();
}
}
}
52 changes: 29 additions & 23 deletions src/Downloader/Download.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,26 @@ internal class Download : IDownload

public event EventHandler<DownloadProgressChangedEventArgs> ChunkDownloadProgressChanged
{
add { _downloadService.ChunkDownloadProgressChanged += value; }
remove { _downloadService.ChunkDownloadProgressChanged -= value; }
add => _downloadService.ChunkDownloadProgressChanged += value;
remove => _downloadService.ChunkDownloadProgressChanged -= value;
}

public event EventHandler<AsyncCompletedEventArgs> DownloadFileCompleted
{
add { _downloadService.DownloadFileCompleted += value; }
remove { _downloadService.DownloadFileCompleted -= value; }
add => _downloadService.DownloadFileCompleted += value;
remove => _downloadService.DownloadFileCompleted -= value;
}

public event EventHandler<DownloadProgressChangedEventArgs> DownloadProgressChanged
{
add { _downloadService.DownloadProgressChanged += value; }
remove { _downloadService.DownloadProgressChanged -= value; }
add => _downloadService.DownloadProgressChanged += value;
remove => _downloadService.DownloadProgressChanged -= value;
}

public event EventHandler<DownloadStartedEventArgs> DownloadStarted
{
add { _downloadService.DownloadStarted += value; }
remove { _downloadService.DownloadStarted -= value; }
add => _downloadService.DownloadStarted += value;
remove => _downloadService.DownloadStarted -= value;
}

public Download(string url, string path, string filename, DownloadConfiguration configuration)
Expand Down Expand Up @@ -72,26 +72,26 @@ public async Task<Stream> StartAsync(CancellationToken cancellationToken = defau
{
return await _downloadService.DownloadFileTaskAsync(Url, cancellationToken).ConfigureAwait(false);
}
else if (string.IsNullOrWhiteSpace(Filename))
{
await _downloadService.DownloadFileTaskAsync(Url, new DirectoryInfo(Folder), cancellationToken).ConfigureAwait(false);
return null;
}
else

if (string.IsNullOrWhiteSpace(Filename))
{
// with Folder and Filename
await _downloadService.DownloadFileTaskAsync(Url, Path.Combine(Folder, Filename), cancellationToken).ConfigureAwait(false);
await _downloadService.DownloadFileTaskAsync(Url, new DirectoryInfo(Folder!), cancellationToken)
.ConfigureAwait(false);
return null;
}

// with Folder and Filename
await _downloadService.DownloadFileTaskAsync(Url, Path.Combine(Folder!, Filename), cancellationToken)
.ConfigureAwait(false);
return null;
}
else if(string.IsNullOrWhiteSpace(Url))

if (string.IsNullOrWhiteSpace(Url))
{
return await _downloadService.DownloadFileTaskAsync(Package, cancellationToken).ConfigureAwait(false);
}
else
{
return await _downloadService.DownloadFileTaskAsync(Package, Url, cancellationToken).ConfigureAwait(false);
}

return await _downloadService.DownloadFileTaskAsync(Package, Url, cancellationToken).ConfigureAwait(false);
}

public void Stop()
Expand Down Expand Up @@ -123,9 +123,15 @@ public override int GetHashCode()
return hashCode;
}

public async void Dispose()
public async ValueTask DisposeAsync()
{
await _downloadService.Clear().ConfigureAwait(false);
Package = null;
}
}

public void Dispose()
{
_downloadService.Clear().Wait();
Package = null;
}
}
Loading

0 comments on commit 5061c5a

Please sign in to comment.