From 8b2f27e627639848dc176dc1520c50e21ee265c4 Mon Sep 17 00:00:00 2001 From: bezzad Date: Wed, 18 Sep 2024 15:55:19 +0330 Subject: [PATCH] implement EnableLiveStreaming config to don't copy downloaded data every time raise progress events by default --- .../UnitTests/ChunkDownloaderTest.cs | 5 +-- src/Downloader/ChunkDownloader.cs | 32 +++++++++---------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/Downloader.Test/UnitTests/ChunkDownloaderTest.cs b/src/Downloader.Test/UnitTests/ChunkDownloaderTest.cs index 1810a8e..7e7512b 100644 --- a/src/Downloader.Test/UnitTests/ChunkDownloaderTest.cs +++ b/src/Downloader.Test/UnitTests/ChunkDownloaderTest.cs @@ -81,6 +81,7 @@ public async Task ReadStreamProgressEventsTest() var source = DummyData.GenerateRandomBytes(Size); using var sourceMemoryStream = new MemoryStream(source); var chunk = new Chunk(0, Size - 1) { Timeout = 100 }; + Configuration.EnableLiveStreaming = true; var chunkDownloader = new ChunkDownloader(chunk, Configuration, Storage); chunkDownloader.DownloadProgressChanged += (s, e) => { eventCount++; @@ -126,7 +127,7 @@ public async Task ReadStreamTimeoutExceptionTest() var chunk = new Chunk(0, Size - 1) { Timeout = 0 }; var chunkDownloader = new ChunkDownloader(chunk, Configuration, Storage); using var memoryStream = new MemoryStream(randomlyBytes); - using var slowStream = new ThrottledStream(memoryStream, Configuration.BufferBlockSize); + await using var slowStream = new ThrottledStream(memoryStream, Configuration.BufferBlockSize); // act async Task CallReadStream() => await chunkDownloader @@ -171,7 +172,7 @@ async Task act() // act await Assert.ThrowsAnyAsync(act); - using var chunkStream = Storage.OpenRead(); + await using var chunkStream = Storage.OpenRead(); // assert Assert.False(memoryStream.CanRead); // stream has been closed diff --git a/src/Downloader/ChunkDownloader.cs b/src/Downloader/ChunkDownloader.cs index b173c8b..3b95087 100644 --- a/src/Downloader/ChunkDownloader.cs +++ b/src/Downloader/ChunkDownloader.cs @@ -79,7 +79,7 @@ public async Task Download(Request downloadRequest, PauseToken pause, Can private async Task ContinueWithDelay(Request request, PauseToken pause, CancellationToken cancelToken) { _logger?.LogDebug($"ContinueWithDelay of the chunk {Chunk.Id}"); - await request.ThrowIfIsNotSupportDownloadInRange(); + await request.ThrowIfIsNotSupportDownloadInRange().ConfigureAwait(false); await Task.Delay(Chunk.Timeout, cancelToken).ConfigureAwait(false); // Increasing reading timeout to reduce stress and conflicts Chunk.Timeout += _timeoutIncrement; @@ -96,26 +96,24 @@ private async Task DownloadChunk(Request downloadRequest, PauseToken pauseToken, HttpWebRequest request = downloadRequest.GetRequest(); SetRequestRange(request); using HttpWebResponse downloadResponse = request.GetResponse() as HttpWebResponse; - if (downloadResponse.StatusCode == HttpStatusCode.OK || - downloadResponse.StatusCode == HttpStatusCode.PartialContent || - downloadResponse.StatusCode == HttpStatusCode.Created || - downloadResponse.StatusCode == HttpStatusCode.Accepted || - downloadResponse.StatusCode == HttpStatusCode.ResetContent) + if (downloadResponse?.StatusCode == HttpStatusCode.OK || + downloadResponse?.StatusCode == HttpStatusCode.PartialContent || + downloadResponse?.StatusCode == HttpStatusCode.Created || + downloadResponse?.StatusCode == HttpStatusCode.Accepted || + downloadResponse?.StatusCode == HttpStatusCode.ResetContent) { _logger?.LogDebug($"DownloadChunk of the chunk {Chunk.Id} with response status: {downloadResponse.StatusCode}"); _configuration.RequestConfiguration.CookieContainer = request.CookieContainer; - using Stream responseStream = downloadResponse?.GetResponseStream(); - if (responseStream != null) + await using Stream responseStream = downloadResponse.GetResponseStream(); + await using (_sourceStream = new ThrottledStream(responseStream, _configuration.MaximumSpeedPerChunk)) { - using (_sourceStream = new ThrottledStream(responseStream, _configuration.MaximumSpeedPerChunk)) - { - await ReadStream(_sourceStream, pauseToken, cancelToken).ConfigureAwait(false); - } + await ReadStream(_sourceStream, pauseToken, cancelToken).ConfigureAwait(false); } } else { - throw new WebException($"Download response status of the chunk {Chunk.Id} was {downloadResponse.StatusCode}: {downloadResponse.StatusDescription}"); + throw new WebException($"Download response status of the chunk {Chunk.Id} was {downloadResponse?.StatusCode}: " + + downloadResponse?.StatusDescription); } } } @@ -143,7 +141,7 @@ internal async Task ReadStream(Stream stream, PauseToken pauseToken, Cancellatio try { // close stream on cancellation because, it's not work on .Net Framework - using var _ = cancelToken.Register(stream.Close); + await using var _ = cancelToken.Register(stream.Close); while (readSize > 0 && Chunk.CanWrite) { cancelToken.ThrowIfCancellationRequested(); @@ -152,7 +150,7 @@ internal async Task ReadStream(Stream stream, PauseToken pauseToken, Cancellatio using var innerCts = CancellationTokenSource.CreateLinkedTokenSource(cancelToken); innerToken = innerCts.Token; innerCts.CancelAfter(Chunk.Timeout); - using (innerToken.Value.Register(stream.Close)) + await using (innerToken.Value.Register(stream.Close)) { // if innerToken timeout occurs, close the stream just during the reading stream readSize = await stream.ReadAsync(buffer, 0, buffer.Length, innerToken.Value).ConfigureAwait(false); @@ -171,7 +169,9 @@ internal async Task ReadStream(Stream stream, PauseToken pauseToken, Cancellatio TotalBytesToReceive = Chunk.Length, ReceivedBytesSize = Chunk.Position, ProgressedByteSize = readSize, - ReceivedBytes = buffer.Take(readSize).ToArray() + ReceivedBytes = _configuration.EnableLiveStreaming + ? buffer.Take(readSize).ToArray() + : [] }); } }