Skip to content

Commit

Permalink
Code changes to check perf regression fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
kundadebdatta committed Nov 11, 2024
1 parent 9bb0764 commit 7230e75
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 42 deletions.
14 changes: 7 additions & 7 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ public override async Task<ResponseMessage> SendAsync(
((CosmosTraceDiagnostics)response.Diagnostics).Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions);
}

if (ConfigurationManager.IsBinaryEncodingEnabled()
&& RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request)
&& response.Content != null
&& response.Content is not CloneableStream)
{
response.Content = await StreamExtension.AsClonableStreamAsync(response.Content, default);
}
//if (ConfigurationManager.IsBinaryEncodingEnabled()
// && RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request)
// && response.Content != null
// && response.Content is not CloneableStream)
//{
// response.Content = await StreamExtension.AsClonableStreamAsync(response.Content, default);
//}

return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
// Convert Text to Binary Stream.
streamPayload = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
inputStream: streamPayload == null ? null : await StreamExtension.AsClonableStreamAsync(streamPayload));
inputStream: streamPayload ?? null);

ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: resourceUri,
Expand All @@ -943,12 +943,11 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(

// Convert Binary Stream to Text.
if (targetResponseSerializationFormat.HasValue
&& (requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
&& responseMessage?.Content is CloneableStream outputCloneableStream)
&& (requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations))
{
responseMessage.Content = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: targetResponseSerializationFormat.Value,
inputStream: outputCloneableStream);
inputStream: responseMessage?.Content);
}

return responseMessage;
Expand Down
123 changes: 93 additions & 30 deletions Microsoft.Azure.Cosmos/src/Serializer/CosmosBufferedStreamWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
namespace Microsoft.Azure.Cosmos.Serializer
{
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Documents;

Expand All @@ -18,7 +20,7 @@ internal class CosmosBufferedStreamWrapper : Stream
/// <summary>
/// The inner stream being wrapped.
/// </summary>
private readonly CloneableStream innerStream;
private readonly Stream innerStream;

/// <summary>
/// Indicates whether the inner stream should be disposed.
Expand All @@ -41,9 +43,13 @@ internal class CosmosBufferedStreamWrapper : Stream
/// <param name="inputStream">The input stream to wrap.</param>
/// <param name="shouldDisposeInnerStream">Indicates whether the inner stream should be disposed.</param>
public CosmosBufferedStreamWrapper(
CloneableStream inputStream,
Stream inputStream,
bool shouldDisposeInnerStream)
{
Debug.Assert(
inputStream is CloneableStream || inputStream is MemoryStream,
"The inner stream is neither a memory stream nor a cloneable stream.");

this.innerStream = inputStream ?? throw new ArgumentNullException(nameof(inputStream));
this.shouldDisposeInnerStream = shouldDisposeInnerStream;
}
Expand Down Expand Up @@ -93,7 +99,33 @@ public override int Read(byte[] buffer, int offset, int count)
throw new ArgumentNullException(nameof(buffer));
}

return this.innerStream.Read(buffer, offset, count);
if (offset < 0
|| count < 0
|| (buffer.Length - offset) < count
|| this.innerStream.Position == this.innerStream.Length)
{
return 0;
}

int bytesRead = 0;
if (this.hasReadFirstByte
&& this.innerStream.Position == 1
&& offset == 0
&& count > 0)
{
buffer[0] = this.firstByteBuffer[0];
bytesRead = 1;
offset++;
count--;
}

if (count > 0)
{
int innerBytesRead = this.innerStream.Read(buffer, offset, count);
bytesRead += innerBytesRead;
}

return bytesRead;
}

/// <inheritdoc />
Expand All @@ -114,7 +146,10 @@ protected override void Dispose(bool disposing)
}
else
{
this.ResetStreamPosition();
if (this.innerStream.CanSeek)
{
this.innerStream.Position = 0;
}
}
}

Expand All @@ -129,11 +164,48 @@ protected override void Dispose(bool disposing)
/// </returns>
public byte[] ReadAll()
{
ArraySegment<byte> byteSegment = this.innerStream.GetBuffer();
int count, totalBytes = 0, offset = (int)this.Position, length = (int)this.Length;
byte[] bytes = new byte[length];

return byteSegment.Array.Length == byteSegment.Count
? byteSegment.Array
: byteSegment.ToArray();
while ((count = this.innerStream.Read(bytes, offset, length - offset)) > 0)
{
offset += count;
totalBytes += count;
}

if (this.hasReadFirstByte)
{
bytes[0] = this.firstByteBuffer[0];
totalBytes += 1;
}

return totalBytes > 0 ? bytes : default;
}

/// <summary>
/// Asynchronously reads all bytes from the current position to the end of the stream.
/// </summary>
/// <returns>
/// A task that represents the asynchronous read operation. The value of the TResult parameter contains a byte array with all the bytes read from the stream, or <c>null</c> if no bytes were read.
/// </returns>
public async Task<byte[]> ReadAllAsync(CancellationToken cancellationToken = default)
{
int count, totalBytes = 0, offset = (int)this.Position, length = (int)this.Length;
byte[] bytes = new byte[length];

while ((count = await this.innerStream.ReadAsync(bytes, offset, length - offset, cancellationToken)) > 0)
{
offset += count;
totalBytes += count;
}

if (this.hasReadFirstByte)
{
bytes[0] = this.firstByteBuffer[0];
totalBytes += 1;
}

return totalBytes > 0 ? bytes : default;
}

/// <summary>
Expand All @@ -144,40 +216,31 @@ public byte[] ReadAll()
/// </returns>
public JsonSerializationFormat GetJsonSerializationFormat()
{
this.ReadFirstByteAndResetStream();

return this.firstByteBuffer[0] switch
this.ReadFirstByte();
if (this.firstByteBuffer[0] == (byte)JsonSerializationFormat.Binary)
{
(byte)JsonSerializationFormat.Binary => JsonSerializationFormat.Binary,
(byte)JsonSerializationFormat.HybridRow => JsonSerializationFormat.HybridRow,
_ => JsonSerializationFormat.Text,
};
return JsonSerializationFormat.Binary;
}
else
{
return this.firstByteBuffer[0] == (byte)JsonSerializationFormat.HybridRow
? JsonSerializationFormat.HybridRow
: JsonSerializationFormat.Text;
}
}

/// <summary>
/// Reads the first byte from the inner stream and stores it in the buffer. It also resets the stream position to zero.
/// Reads the first byte from the inner stream and stores it in the buffer.
/// </summary>
/// <remarks>
/// This method sets the <see cref="hasReadFirstByte"/> flag to true if the first byte is successfully read.
/// </remarks>
private void ReadFirstByteAndResetStream()
private void ReadFirstByte()
{
if (!this.hasReadFirstByte
&& this.innerStream.Read(this.firstByteBuffer, 0, 1) > 0)
{
this.hasReadFirstByte = true;
this.ResetStreamPosition();
}
}

/// <summary>
/// Resets the inner stream position to zero.
/// </summary>
private void ResetStreamPosition()
{
if (this.innerStream.CanSeek)
{
this.innerStream.Position = 0;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal static string GetStringWithPropertyNamingPolicy(CosmosPropertyNamingPol
/// <returns>Returns true if the input stream is successfully serialized to the target format, otherwise false.</returns>
internal static Stream TrySerializeStreamToTargetFormat(
JsonSerializationFormat targetSerializationFormat,
CloneableStream inputStream)
Stream inputStream)
{
if (inputStream == null)
{
Expand Down

0 comments on commit 7230e75

Please sign in to comment.