From 88ad83beb3acd255cc0ef5799ac439e646afa5af Mon Sep 17 00:00:00 2001 From: bezzad Date: Sun, 5 Nov 2023 22:56:39 +0330 Subject: [PATCH] fixed strean expandability feature --- src/Downloader.Test/UnitTests/StorageTest.cs | 22 +++++++++++++-- .../UnitTests/StorageTestOnFile.cs | 23 ++++++++------- .../UnitTests/StorageTestOnMemory.cs | 12 ++++++-- src/Downloader/ConcurrentPacketBuffer.cs | 8 ++---- src/Downloader/ConcurrentStream.cs | 28 ++++++++++++++++--- 5 files changed, 66 insertions(+), 27 deletions(-) diff --git a/src/Downloader.Test/UnitTests/StorageTest.cs b/src/Downloader.Test/UnitTests/StorageTest.cs index d658648..556b04b 100644 --- a/src/Downloader.Test/UnitTests/StorageTest.cs +++ b/src/Downloader.Test/UnitTests/StorageTest.cs @@ -11,7 +11,9 @@ public abstract class StorageTest : IDisposable { protected const int DataLength = 2048; protected readonly byte[] Data = DummyData.GenerateRandomBytes(DataLength); - protected virtual ConcurrentStream Storage { get; } + protected ConcurrentStream Storage; + + protected abstract void CreateStorage(int initialSize); public virtual void Dispose() { @@ -22,6 +24,7 @@ public virtual void Dispose() public async Task OpenReadLengthTest() { // arrange + CreateStorage(DataLength); await Storage.WriteAsync(0, Data, DataLength); Storage.Flush(); @@ -36,6 +39,7 @@ public async Task OpenReadLengthTest() public async Task OpenReadStreamTest() { // arrange + CreateStorage(DataLength); await Storage.WriteAsync(0, Data, DataLength); Storage.Flush(); @@ -53,6 +57,7 @@ public async Task OpenReadStreamTest() public async Task SlowWriteTest() { // arrange + CreateStorage(DataLength); var data = new byte[] { 1 }; var size = 1024; @@ -73,6 +78,7 @@ public async Task SlowWriteTest() public async Task WriteAsyncLengthTest() { // arrange + CreateStorage(DataLength); var length = DataLength / 2; // act @@ -87,6 +93,7 @@ public async Task WriteAsyncLengthTest() public async Task WriteAsyncBytesTest() { // arrange + CreateStorage(DataLength); var length = DataLength / 2; // act @@ -105,6 +112,7 @@ public async Task WriteAsyncBytesTest() public async Task WriteAsyncMultipleTimeTest() { // arrange + CreateStorage(DataLength); var count = 128; var size = DataLength / count; @@ -128,6 +136,7 @@ public async Task WriteAsyncMultipleTimeTest() public async Task WriteAsyncOutOfRangeExceptionTest() { // arrange + CreateStorage(DataLength); var length = DataLength + 1; // act @@ -141,19 +150,22 @@ public async Task WriteAsyncOutOfRangeExceptionTest() public async Task TestDispose() { // arrange + CreateStorage(DataLength); await Storage.WriteAsync(0, Data, DataLength); // act Storage.Dispose(); // assert - Assert.Equal(0, Storage.Length); + Assert.ThrowsAny(() => Storage.Length); + Assert.ThrowsAny(() => Storage.Data); } [Fact] public async Task FlushTest() { // arrange + CreateStorage(DataLength); await Storage.WriteAsync(0, Data, DataLength); // act @@ -167,6 +179,7 @@ public async Task FlushTest() public async Task GetLengthTest() { // arrange + CreateStorage(DataLength); var data = new byte[] { 0x0, 0x1, 0x2, 0x3, 0x4 }; await Storage.WriteAsync(0, data, 1); Storage.Flush(); @@ -182,6 +195,7 @@ public async Task GetLengthTest() public async Task TestStreamExpandability() { // arrange + CreateStorage(DataLength); var data = new byte[] { 0x0, 0x1, 0x2, 0x3, 0x4 }; await Storage.WriteAsync(0, data, data.Length); Storage.Flush(); @@ -189,7 +203,7 @@ public async Task TestStreamExpandability() // act var serializedStream = JsonConvert.SerializeObject(Storage); var mutableStream = JsonConvert.DeserializeObject(serializedStream); - await mutableStream.WriteAsync(0, data, data.Length); + await mutableStream.WriteAsync(mutableStream.Position, data, data.Length); mutableStream.Flush(); // assert @@ -200,6 +214,7 @@ public async Task TestStreamExpandability() public async Task TestDynamicBufferData() { // arrange + CreateStorage(DataLength); var size = 1024; // 1KB // act @@ -230,6 +245,7 @@ public async Task TestDynamicBufferData() public async Task TestSerialization() { // arrange + CreateStorage(DataLength); var size = 256; var data = DummyData.GenerateOrderedBytes(size); diff --git a/src/Downloader.Test/UnitTests/StorageTestOnFile.cs b/src/Downloader.Test/UnitTests/StorageTestOnFile.cs index 7ab4246..a8ac28f 100644 --- a/src/Downloader.Test/UnitTests/StorageTestOnFile.cs +++ b/src/Downloader.Test/UnitTests/StorageTestOnFile.cs @@ -7,14 +7,11 @@ namespace Downloader.Test.UnitTests; public class StorageTestOnFile : StorageTest { private string path; - private int size; - private ConcurrentStream _storage; - protected override ConcurrentStream Storage => _storage ??= new ConcurrentStream(path, size); - - public StorageTestOnFile() + + protected override void CreateStorage(int initialSize) { - size = 1024 * 1024; // 1MB path = Path.GetTempFileName(); + Storage = new ConcurrentStream(path, initialSize); } public override void Dispose() @@ -27,18 +24,18 @@ public override void Dispose() public void TestInitialSizeOnFileStream() { // act - var Storage = new ConcurrentStream(path, size); + CreateStorage(DataLength); // assert - Assert.Equal(size, new FileInfo(path).Length); - Assert.Equal(size, Storage.Length); + Assert.Equal(DataLength, new FileInfo(path).Length); + Assert.Equal(DataLength, Storage.Length); } [Fact] public void TestInitialSizeWithNegativeNumberOnFileStream() { // arrange - size = -1; + CreateStorage(-1); // act Storage.Flush(); // create lazy stream @@ -52,9 +49,10 @@ public void TestInitialSizeWithNegativeNumberOnFileStream() public async Task TestWriteSizeOverflowOnFileStream() { // arrange - size = 512; + var size = 512; var actualSize = size * 2; var data = new byte[] { 1 }; + CreateStorage(size); // act for (int i = 0; i < actualSize; i++) @@ -74,11 +72,12 @@ public async Task TestWriteSizeOverflowOnFileStream() public async Task TestAccessMoreThanSizeOnFileStream() { // arrange - size = 10; + var size = 10; var jumpStepCount = 1024; // 1KB var data = new byte[] { 1, 1, 1, 1, 1 }; var selectedDataLen = 3; var actualSize = size + jumpStepCount + selectedDataLen; + CreateStorage(size); // act await Storage.WriteAsync(size + jumpStepCount, data, selectedDataLen); diff --git a/src/Downloader.Test/UnitTests/StorageTestOnMemory.cs b/src/Downloader.Test/UnitTests/StorageTestOnMemory.cs index 9c6f032..65d0420 100644 --- a/src/Downloader.Test/UnitTests/StorageTestOnMemory.cs +++ b/src/Downloader.Test/UnitTests/StorageTestOnMemory.cs @@ -5,13 +5,19 @@ namespace Downloader.Test.UnitTests; public class StorageTestOnMemory : StorageTest { - private ConcurrentStream _storage; - protected override ConcurrentStream Storage => _storage ??= new ConcurrentStream(); + protected override void CreateStorage(int initialSize) + { + Storage = new ConcurrentStream(); + } [Fact] public void TestInitialSizeOnMemoryStream() { + // act + CreateStorage(0); + using var stream = Storage.OpenRead(); + // assert - Assert.IsType(Storage.OpenRead()); + Assert.IsType(stream); } } diff --git a/src/Downloader/ConcurrentPacketBuffer.cs b/src/Downloader/ConcurrentPacketBuffer.cs index 7928763..0045a26 100644 --- a/src/Downloader/ConcurrentPacketBuffer.cs +++ b/src/Downloader/ConcurrentPacketBuffer.cs @@ -74,7 +74,6 @@ public async Task TryAdd(T item) public async Task WaitTryTakeAsync(CancellationToken cancellation) { - ResumeAddingIfEmpty(); await _queueConsumeLocker.WaitAsync(cancellation).ConfigureAwait(false); if (_queue.TryDequeue(out var item)) { @@ -93,19 +92,18 @@ private void StopAddingIfLimitationExceeded(long packetSize) } } - private void ResumeAddingIfEmpty() + public void ResumeAddingIfEmpty() { if (IsEmpty) { - // resume writing packets to the queue + _completionEvent.Set(); ResumeAdding(); - _completionEvent.Set(); } } public void WaitToComplete() { - _completionEvent.Wait(TimeSpan.FromMilliseconds(100)); + _completionEvent.Wait(TimeSpan.FromMilliseconds(1_000)); } public void CompleteAdding() diff --git a/src/Downloader/ConcurrentStream.cs b/src/Downloader/ConcurrentStream.cs index 4576f68..5485e2a 100644 --- a/src/Downloader/ConcurrentStream.cs +++ b/src/Downloader/ConcurrentStream.cs @@ -39,7 +39,12 @@ public byte[] Data set { if (value != null) - _stream = new MemoryStream(value, true); + { + // Don't pass straight value to MemoryStream, + // because causes stream to be an immutable array + _stream = new MemoryStream(); + _stream.Write(value, 0, value.Length); + } } } public bool CanRead => _stream?.CanRead == true; @@ -119,17 +124,33 @@ private async Task Watcher() { while (!_watcherCancelSource.IsCancellationRequested) { + // Warning 1: When the Watcher() is here, at the same time Flush() is called, + // then the flush method checks if the queue is not empty, + // wait until will is empty. But, after checking the queue is not empty, + // immediately the below code is executed, and the last packet is consumed. + // So the last wait becomes deadlock! var packet = await _inputBuffer.WaitTryTakeAsync(_watcherCancelSource.Token).ConfigureAwait(false); if (packet != null) { + // Warning 2: When the Watcher() is here, at the same time Flush() is called, + // then the Flush method checks if the queue is empty, so break workflow. + // but, the stream is not still filled! + await WritePacketOnFile(packet).ConfigureAwait(false); } + // After last packet writing completion, we can check to continue adding + _inputBuffer.ResumeAddingIfEmpty(); } } catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { await Task.Yield(); } + catch (Exception ex) + { + Console.WriteLine(ex.Message.ToString()); + _watcherCancelSource.Cancel(true); + } } public long Seek(long offset, SeekOrigin origin) @@ -151,7 +172,6 @@ private async Task WritePacketOnFile(Packet packet) { // seek with SeekOrigin.Begin is so faster than SeekOrigin.Current Seek(packet.Position, SeekOrigin.Begin); - await _stream.WriteAsync(packet.Data, 0, packet.Length).ConfigureAwait(false); packet.Dispose(); } @@ -159,12 +179,12 @@ private async Task WritePacketOnFile(Packet packet) public void Flush() { _inputBuffer.WaitToComplete(); - + if (_stream?.CanRead == true) { _stream?.Flush(); } - + GC.Collect(); }