diff --git a/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs b/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs index 3cb3e6c748..93a2f2419d 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs @@ -101,13 +101,13 @@ public override async Task SendAsync( ((CosmosTraceDiagnostics)response.Diagnostics).Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions); } - if (ConfigurationManager.IsBinaryEncodingEnabled() - && RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request) - && response.Content != null - && response.Content is not CloneableStream) - { - response.Content = await StreamExtension.AsClonableStreamAsync(response.Content, default); - } + //if (ConfigurationManager.IsBinaryEncodingEnabled() + // && RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request) + // && response.Content != null + // && response.Content is not CloneableStream) + //{ + // response.Content = await StreamExtension.AsClonableStreamAsync(response.Content, default); + //} return response; } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs index ca7565cd2f..4c058267ab 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs @@ -926,7 +926,7 @@ private async Task ProcessItemStreamAsync( // Convert Text to Binary Stream. streamPayload = CosmosSerializationUtil.TrySerializeStreamToTargetFormat( targetSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(), - inputStream: streamPayload == null ? null : await StreamExtension.AsClonableStreamAsync(streamPayload)); + inputStream: streamPayload ?? null); ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync( resourceUri: resourceUri, @@ -943,12 +943,11 @@ private async Task ProcessItemStreamAsync( // Convert Binary Stream to Text. if (targetResponseSerializationFormat.HasValue - && (requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations) - && responseMessage?.Content is CloneableStream outputCloneableStream) + && (requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)) { responseMessage.Content = CosmosSerializationUtil.TrySerializeStreamToTargetFormat( targetSerializationFormat: targetResponseSerializationFormat.Value, - inputStream: outputCloneableStream); + inputStream: responseMessage?.Content); } return responseMessage; diff --git a/Microsoft.Azure.Cosmos/src/Serializer/CosmosBufferedStreamWrapper.cs b/Microsoft.Azure.Cosmos/src/Serializer/CosmosBufferedStreamWrapper.cs index f59f14931a..3fb0b72965 100644 --- a/Microsoft.Azure.Cosmos/src/Serializer/CosmosBufferedStreamWrapper.cs +++ b/Microsoft.Azure.Cosmos/src/Serializer/CosmosBufferedStreamWrapper.cs @@ -5,8 +5,10 @@ namespace Microsoft.Azure.Cosmos.Serializer { using System; + using System.Diagnostics; using System.IO; - using System.Linq; + using System.Threading; + using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Json; using Microsoft.Azure.Documents; @@ -18,7 +20,7 @@ internal class CosmosBufferedStreamWrapper : Stream /// /// The inner stream being wrapped. /// - private readonly CloneableStream innerStream; + private readonly Stream innerStream; /// /// Indicates whether the inner stream should be disposed. @@ -41,9 +43,13 @@ internal class CosmosBufferedStreamWrapper : Stream /// The input stream to wrap. /// Indicates whether the inner stream should be disposed. public CosmosBufferedStreamWrapper( - CloneableStream inputStream, + Stream inputStream, bool shouldDisposeInnerStream) { + Debug.Assert( + inputStream is CloneableStream || inputStream is MemoryStream, + "The inner stream is neither a memory stream nor a cloneable stream."); + this.innerStream = inputStream ?? throw new ArgumentNullException(nameof(inputStream)); this.shouldDisposeInnerStream = shouldDisposeInnerStream; } @@ -93,7 +99,33 @@ public override int Read(byte[] buffer, int offset, int count) throw new ArgumentNullException(nameof(buffer)); } - return this.innerStream.Read(buffer, offset, count); + if (offset < 0 + || count < 0 + || (buffer.Length - offset) < count + || this.innerStream.Position == this.innerStream.Length) + { + return 0; + } + + int bytesRead = 0; + if (this.hasReadFirstByte + && this.innerStream.Position == 1 + && offset == 0 + && count > 0) + { + buffer[0] = this.firstByteBuffer[0]; + bytesRead = 1; + offset++; + count--; + } + + if (count > 0) + { + int innerBytesRead = this.innerStream.Read(buffer, offset, count); + bytesRead += innerBytesRead; + } + + return bytesRead; } /// @@ -114,7 +146,10 @@ protected override void Dispose(bool disposing) } else { - this.ResetStreamPosition(); + if (this.innerStream.CanSeek) + { + this.innerStream.Position = 0; + } } } @@ -129,11 +164,48 @@ protected override void Dispose(bool disposing) /// public byte[] ReadAll() { - ArraySegment byteSegment = this.innerStream.GetBuffer(); + int count, totalBytes = 0, offset = (int)this.Position, length = (int)this.Length; + byte[] bytes = new byte[length]; - return byteSegment.Array.Length == byteSegment.Count - ? byteSegment.Array - : byteSegment.ToArray(); + while ((count = this.innerStream.Read(bytes, offset, length - offset)) > 0) + { + offset += count; + totalBytes += count; + } + + if (this.hasReadFirstByte) + { + bytes[0] = this.firstByteBuffer[0]; + totalBytes += 1; + } + + return totalBytes > 0 ? bytes : default; + } + + /// + /// Asynchronously reads all bytes from the current position to the end of the stream. + /// + /// + /// A task that represents the asynchronous read operation. The value of the TResult parameter contains a byte array with all the bytes read from the stream, or null if no bytes were read. + /// + public async Task ReadAllAsync(CancellationToken cancellationToken = default) + { + int count, totalBytes = 0, offset = (int)this.Position, length = (int)this.Length; + byte[] bytes = new byte[length]; + + while ((count = await this.innerStream.ReadAsync(bytes, offset, length - offset, cancellationToken)) > 0) + { + offset += count; + totalBytes += count; + } + + if (this.hasReadFirstByte) + { + bytes[0] = this.firstByteBuffer[0]; + totalBytes += 1; + } + + return totalBytes > 0 ? bytes : default; } /// @@ -144,40 +216,31 @@ public byte[] ReadAll() /// public JsonSerializationFormat GetJsonSerializationFormat() { - this.ReadFirstByteAndResetStream(); - - return this.firstByteBuffer[0] switch + this.ReadFirstByte(); + if (this.firstByteBuffer[0] == (byte)JsonSerializationFormat.Binary) { - (byte)JsonSerializationFormat.Binary => JsonSerializationFormat.Binary, - (byte)JsonSerializationFormat.HybridRow => JsonSerializationFormat.HybridRow, - _ => JsonSerializationFormat.Text, - }; + return JsonSerializationFormat.Binary; + } + else + { + return this.firstByteBuffer[0] == (byte)JsonSerializationFormat.HybridRow + ? JsonSerializationFormat.HybridRow + : JsonSerializationFormat.Text; + } } /// - /// Reads the first byte from the inner stream and stores it in the buffer. It also resets the stream position to zero. + /// Reads the first byte from the inner stream and stores it in the buffer. /// /// /// This method sets the flag to true if the first byte is successfully read. /// - private void ReadFirstByteAndResetStream() + private void ReadFirstByte() { if (!this.hasReadFirstByte && this.innerStream.Read(this.firstByteBuffer, 0, 1) > 0) { this.hasReadFirstByte = true; - this.ResetStreamPosition(); - } - } - - /// - /// Resets the inner stream position to zero. - /// - private void ResetStreamPosition() - { - if (this.innerStream.CanSeek) - { - this.innerStream.Position = 0; } } } diff --git a/Microsoft.Azure.Cosmos/src/Serializer/CosmosSerializationUtil.cs b/Microsoft.Azure.Cosmos/src/Serializer/CosmosSerializationUtil.cs index f316b1c18f..6330db006d 100644 --- a/Microsoft.Azure.Cosmos/src/Serializer/CosmosSerializationUtil.cs +++ b/Microsoft.Azure.Cosmos/src/Serializer/CosmosSerializationUtil.cs @@ -50,7 +50,7 @@ internal static string GetStringWithPropertyNamingPolicy(CosmosPropertyNamingPol /// Returns true if the input stream is successfully serialized to the target format, otherwise false. internal static Stream TrySerializeStreamToTargetFormat( JsonSerializationFormat targetSerializationFormat, - CloneableStream inputStream) + Stream inputStream) { if (inputStream == null) {