Skip to content

Commit

Permalink
implement EnableLiveStreaming config to don't copy downloaded data ev…
Browse files Browse the repository at this point in the history
…ery time raise progress events by default
  • Loading branch information
bezzad committed Sep 18, 2024
1 parent 49da7d9 commit 8b2f27e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
5 changes: 3 additions & 2 deletions src/Downloader.Test/UnitTests/ChunkDownloaderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public async Task ReadStreamProgressEventsTest()
var source = DummyData.GenerateRandomBytes(Size);
using var sourceMemoryStream = new MemoryStream(source);
var chunk = new Chunk(0, Size - 1) { Timeout = 100 };
Configuration.EnableLiveStreaming = true;
var chunkDownloader = new ChunkDownloader(chunk, Configuration, Storage);
chunkDownloader.DownloadProgressChanged += (s, e) => {
eventCount++;
Expand Down Expand Up @@ -126,7 +127,7 @@ public async Task ReadStreamTimeoutExceptionTest()
var chunk = new Chunk(0, Size - 1) { Timeout = 0 };
var chunkDownloader = new ChunkDownloader(chunk, Configuration, Storage);
using var memoryStream = new MemoryStream(randomlyBytes);
using var slowStream = new ThrottledStream(memoryStream, Configuration.BufferBlockSize);
await using var slowStream = new ThrottledStream(memoryStream, Configuration.BufferBlockSize);

// act
async Task CallReadStream() => await chunkDownloader
Expand Down Expand Up @@ -171,7 +172,7 @@ async Task act()

// act
await Assert.ThrowsAnyAsync<OperationCanceledException>(act);
using var chunkStream = Storage.OpenRead();
await using var chunkStream = Storage.OpenRead();

// assert
Assert.False(memoryStream.CanRead); // stream has been closed
Expand Down
32 changes: 16 additions & 16 deletions src/Downloader/ChunkDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public async Task<Chunk> Download(Request downloadRequest, PauseToken pause, Can
private async Task<Chunk> ContinueWithDelay(Request request, PauseToken pause, CancellationToken cancelToken)
{
_logger?.LogDebug($"ContinueWithDelay of the chunk {Chunk.Id}");
await request.ThrowIfIsNotSupportDownloadInRange();
await request.ThrowIfIsNotSupportDownloadInRange().ConfigureAwait(false);
await Task.Delay(Chunk.Timeout, cancelToken).ConfigureAwait(false);
// Increasing reading timeout to reduce stress and conflicts
Chunk.Timeout += _timeoutIncrement;
Expand All @@ -96,26 +96,24 @@ private async Task DownloadChunk(Request downloadRequest, PauseToken pauseToken,
HttpWebRequest request = downloadRequest.GetRequest();
SetRequestRange(request);
using HttpWebResponse downloadResponse = request.GetResponse() as HttpWebResponse;
if (downloadResponse.StatusCode == HttpStatusCode.OK ||
downloadResponse.StatusCode == HttpStatusCode.PartialContent ||
downloadResponse.StatusCode == HttpStatusCode.Created ||
downloadResponse.StatusCode == HttpStatusCode.Accepted ||
downloadResponse.StatusCode == HttpStatusCode.ResetContent)
if (downloadResponse?.StatusCode == HttpStatusCode.OK ||
downloadResponse?.StatusCode == HttpStatusCode.PartialContent ||
downloadResponse?.StatusCode == HttpStatusCode.Created ||
downloadResponse?.StatusCode == HttpStatusCode.Accepted ||
downloadResponse?.StatusCode == HttpStatusCode.ResetContent)
{
_logger?.LogDebug($"DownloadChunk of the chunk {Chunk.Id} with response status: {downloadResponse.StatusCode}");
_configuration.RequestConfiguration.CookieContainer = request.CookieContainer;
using Stream responseStream = downloadResponse?.GetResponseStream();
if (responseStream != null)
await using Stream responseStream = downloadResponse.GetResponseStream();
await using (_sourceStream = new ThrottledStream(responseStream, _configuration.MaximumSpeedPerChunk))
{
using (_sourceStream = new ThrottledStream(responseStream, _configuration.MaximumSpeedPerChunk))
{
await ReadStream(_sourceStream, pauseToken, cancelToken).ConfigureAwait(false);
}
await ReadStream(_sourceStream, pauseToken, cancelToken).ConfigureAwait(false);
}
}
else
{
throw new WebException($"Download response status of the chunk {Chunk.Id} was {downloadResponse.StatusCode}: {downloadResponse.StatusDescription}");
throw new WebException($"Download response status of the chunk {Chunk.Id} was {downloadResponse?.StatusCode}: " +
downloadResponse?.StatusDescription);
}
}
}
Expand Down Expand Up @@ -143,7 +141,7 @@ internal async Task ReadStream(Stream stream, PauseToken pauseToken, Cancellatio
try
{
// close stream on cancellation because, it's not work on .Net Framework
using var _ = cancelToken.Register(stream.Close);
await using var _ = cancelToken.Register(stream.Close);
while (readSize > 0 && Chunk.CanWrite)
{
cancelToken.ThrowIfCancellationRequested();
Expand All @@ -152,7 +150,7 @@ internal async Task ReadStream(Stream stream, PauseToken pauseToken, Cancellatio
using var innerCts = CancellationTokenSource.CreateLinkedTokenSource(cancelToken);
innerToken = innerCts.Token;
innerCts.CancelAfter(Chunk.Timeout);
using (innerToken.Value.Register(stream.Close))
await using (innerToken.Value.Register(stream.Close))
{
// if innerToken timeout occurs, close the stream just during the reading stream
readSize = await stream.ReadAsync(buffer, 0, buffer.Length, innerToken.Value).ConfigureAwait(false);
Expand All @@ -171,7 +169,9 @@ internal async Task ReadStream(Stream stream, PauseToken pauseToken, Cancellatio
TotalBytesToReceive = Chunk.Length,
ReceivedBytesSize = Chunk.Position,
ProgressedByteSize = readSize,
ReceivedBytes = buffer.Take(readSize).ToArray()
ReceivedBytes = _configuration.EnableLiveStreaming
? buffer.Take(readSize).ToArray()
: []
});
}
}
Expand Down

0 comments on commit 8b2f27e

Please sign in to comment.