Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Binary Encoding: Fixes Performance Regression #4884

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,6 @@ public override async Task<ResponseMessage> SendAsync(
((CosmosTraceDiagnostics)response.Diagnostics).Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions);
}

if (ConfigurationManager.IsBinaryEncodingEnabled()
&& RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request)
&& response.Content != null
&& response.Content is not CloneableStream)
{
response.Content = await StreamExtension.AsClonableStreamAsync(response.Content, default);
}

return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
// Convert Text to Binary Stream.
streamPayload = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
inputStream: streamPayload == null ? null : await StreamExtension.AsClonableStreamAsync(streamPayload));
inputStream: streamPayload ?? null);

ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: resourceUri,
Expand All @@ -943,12 +943,11 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(

// Convert Binary Stream to Text.
if (targetResponseSerializationFormat.HasValue
&& (requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
&& responseMessage?.Content is CloneableStream outputCloneableStream)
&& (requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations))
{
responseMessage.Content = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: targetResponseSerializationFormat.Value,
inputStream: outputCloneableStream);
inputStream: responseMessage?.Content);
}

return responseMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
namespace Microsoft.Azure.Cosmos.Serializer
{
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Documents;

Expand All @@ -18,7 +20,7 @@ internal class CosmosBufferedStreamWrapper : Stream
/// <summary>
/// The inner stream being wrapped.
/// </summary>
private readonly CloneableStream innerStream;
private readonly Stream innerStream;

/// <summary>
/// Indicates whether the inner stream should be disposed.
Expand All @@ -41,7 +43,7 @@ internal class CosmosBufferedStreamWrapper : Stream
/// <param name="inputStream">The input stream to wrap.</param>
/// <param name="shouldDisposeInnerStream">Indicates whether the inner stream should be disposed.</param>
public CosmosBufferedStreamWrapper(
CloneableStream inputStream,
Stream inputStream,
bool shouldDisposeInnerStream)
{
this.innerStream = inputStream ?? throw new ArgumentNullException(nameof(inputStream));
Expand Down Expand Up @@ -93,7 +95,33 @@ public override int Read(byte[] buffer, int offset, int count)
throw new ArgumentNullException(nameof(buffer));
}

return this.innerStream.Read(buffer, offset, count);
if (offset < 0
|| count < 0
|| (buffer.Length - offset) < count
|| this.innerStream.Position == this.innerStream.Length)
{
return 0;
}

int bytesRead = 0;
if (this.hasReadFirstByte
&& this.innerStream.Position == 1
&& offset == 0
&& count > 0)
{
buffer[0] = this.firstByteBuffer[0];
bytesRead = 1;
offset++;
count--;
}

if (count > 0)
{
int innerBytesRead = this.innerStream.Read(buffer, offset, count);
bytesRead += innerBytesRead;
}

return bytesRead;
}

/// <inheritdoc />
Expand Down Expand Up @@ -129,11 +157,48 @@ protected override void Dispose(bool disposing)
/// </returns>
public byte[] ReadAll()
{
ArraySegment<byte> byteSegment = this.innerStream.GetBuffer();
int count, totalBytes = 0, offset = (int)this.Position, length = (int)this.Length;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT - I admit personal opinion - but I find it unreadable when multiple variables are declared in one line.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.Length is not save when the inner stream is not buffered. And the API contract does not ensure that - it would thorw an exception if the inner stream is not buffered.

byte[] bytes = new byte[length];

while ((count = this.innerStream.Read(bytes, offset, length - offset)) > 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation here is functionally incorrect when you first use Stream.Read and subsequently Stream.ReadAll - for example you would copy the first byte again in the buffer for ReadAll - while it has already been processed in that case.

{
offset += count;
totalBytes += count;
}

if (this.hasReadFirstByte)
{
bytes[0] = this.firstByteBuffer[0];
totalBytes += 1;
}

return totalBytes > 0 ? bytes : default;
}

/// <summary>
/// Asynchronously reads all bytes from the current position to the end of the stream.
/// </summary>
/// <returns>
/// A task that represents the asynchronous read operation. The value of the TResult parameter contains a byte array with all the bytes read from the stream, or <c>null</c> if no bytes were read.
/// </returns>
public async Task<byte[]> ReadAllAsync(CancellationToken cancellationToken = default)
{
int count, totalBytes = 0, offset = (int)this.Position, length = (int)this.Length;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments in sync PAI. In general I think Stream implementations should implement the async variation and just await it in the sync overload. The perf overhead of doing it even if the asnyc overload completes synchronously is relatively small.

byte[] bytes = new byte[length];

while ((count = await this.innerStream.ReadAsync(bytes, offset, length - offset, cancellationToken)) > 0)
{
offset += count;
totalBytes += count;
}

if (this.hasReadFirstByte)
{
bytes[0] = this.firstByteBuffer[0];
totalBytes += 1;
}

return byteSegment.Array.Length == byteSegment.Count
? byteSegment.Array
: byteSegment.ToArray();
return totalBytes > 0 ? bytes : default;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ internal CosmosJsonDotNetSerializer(
/// <returns>The object representing the deserialized stream</returns>
public override T FromStream<T>(Stream stream)
{
using (stream)
using (CosmosBufferedStreamWrapper bufferedStream = new (stream, shouldDisposeInnerStream: true))
{
if (typeof(Stream).IsAssignableFrom(typeof(T)))
{
Expand All @@ -98,29 +98,25 @@ public override T FromStream<T>(Stream stream)

JsonSerializer jsonSerializer = this.GetSerializer();

if (stream is CloneableStream cloneableStream)
if (bufferedStream.GetJsonSerializationFormat() == Json.JsonSerializationFormat.Binary)
{
using (CosmosBufferedStreamWrapper bufferedStream = new (cloneableStream, shouldDisposeInnerStream: false))
{
if (bufferedStream.GetJsonSerializationFormat() == Json.JsonSerializationFormat.Binary)
{
byte[] content = bufferedStream.ReadAll();
byte[] content = bufferedStream.ReadAll();

using Json.Interop.CosmosDBToNewtonsoftReader reader = new (
jsonReader: Json.JsonReader.Create(
jsonSerializationFormat: Json.JsonSerializationFormat.Binary,
buffer: content));
using Json.Interop.CosmosDBToNewtonsoftReader reader = new (
jsonReader: Json.JsonReader.Create(
jsonSerializationFormat: Json.JsonSerializationFormat.Binary,
buffer: content));

return jsonSerializer.Deserialize<T>(reader);
}
}
return jsonSerializer.Deserialize<T>(reader);
}

using (StreamReader sr = new (stream))
else
{
using (JsonTextReader jsonTextReader = new (sr))
using (StreamReader sr = new (bufferedStream))
{
return jsonSerializer.Deserialize<T>(jsonTextReader);
using (JsonTextReader jsonTextReader = new (sr))
{
return jsonSerializer.Deserialize<T>(jsonTextReader);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal static string GetStringWithPropertyNamingPolicy(CosmosPropertyNamingPol
/// <returns>Returns true if the input stream is successfully serialized to the target format, otherwise false.</returns>
internal static Stream TrySerializeStreamToTargetFormat(
JsonSerializationFormat targetSerializationFormat,
CloneableStream inputStream)
Stream inputStream)
{
if (inputStream == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,30 +50,24 @@ public override T FromStream<T>(Stream stream)
return default;
}

using (stream)
using (CosmosBufferedStreamWrapper bufferedStream = new (stream, shouldDisposeInnerStream: true))
{
if (stream is Documents.CloneableStream cloneableStream)
if (bufferedStream.GetJsonSerializationFormat() == JsonSerializationFormat.Binary)
{
using (CosmosBufferedStreamWrapper bufferedStream = new (cloneableStream, shouldDisposeInnerStream: false))
byte[] content = bufferedStream.ReadAll();

if (CosmosObject.TryCreateFromBuffer(content, out CosmosObject cosmosObject))
{
return System.Text.Json.JsonSerializer.Deserialize<T>(cosmosObject.ToString(), this.jsonSerializerOptions);
}
else
{
if (bufferedStream.GetJsonSerializationFormat() == JsonSerializationFormat.Binary)
{
byte[] content = bufferedStream.ReadAll();

if (CosmosObject.TryCreateFromBuffer(content, out CosmosObject cosmosObject))
{
return System.Text.Json.JsonSerializer.Deserialize<T>(cosmosObject.ToString(), this.jsonSerializerOptions);
}
else
{
using Stream textStream = CosmosSerializationUtil.ConvertToStreamUsingJsonSerializationFormat(content, JsonSerializationFormat.Text);
return this.DeserializeStream<T>(textStream);
}
}
using Stream textStream = CosmosSerializationUtil.ConvertToStreamUsingJsonSerializationFormat(content, JsonSerializationFormat.Text);
return this.DeserializeStream<T>(textStream);
}
}

return this.DeserializeStream<T>(stream);
return this.DeserializeStream<T>(bufferedStream);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,23 @@ public async Task TestWriteAndRead()
}
}

internal class NonSeekableMemoryStream : Stream
internal class NonSeekableMemoryStream : Stream, IDisposable
{
private readonly byte[] buffer;
private int position;
private bool writable; // Can user write to this stream?
private bool isOpen; // Is this stream open or closed?

public NonSeekableMemoryStream(byte[] data)
{
this.buffer = data;
this.isOpen = true;
this.writable = false;
}

public override bool CanRead => true;
public override bool CanRead => this.isOpen;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override bool CanWrite => this.writable;

public override long Length => this.buffer.Length;

Expand Down Expand Up @@ -225,6 +229,18 @@ public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
this.isOpen = false;
this.writable = false;
this.Flush();
}

base.Dispose(disposing);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ await VerifyItemOperations(
[TestMethod]
[DataRow(true, DisplayName = "Test scenario when binary encoding is enabled at client level.")]
[DataRow(false, DisplayName = "Test scenario when binary encoding is disabled at client level.")]
public async Task CreateItemAsync_WithNonSeekableStream_ShouldConvertToClonnableStream(bool binaryEncodingEnabledInClient)
public async Task CreateItemAsync_WithNonSeekableStream_ShouldCreateSuccessfully(
bool binaryEncodingEnabledInClient)
{
try
{
Expand Down Expand Up @@ -183,7 +184,6 @@ public async Task CreateItemAsync_WithNonSeekableStream_ShouldConvertToClonnable
Assert.AreEqual(options, request.RequestOptions);
Assert.AreEqual(ResourceType.Document, request.ResourceType);
Assert.IsNotNull(request.Headers.PartitionKey);
// Assert.AreEqual("\"[4567.1234]\"", request.Headers.PartitionKey);
testHandlerHitCount++;

bool shouldReturnBinaryResponse = request.Headers[HttpConstants.HttpHeaders.SupportedSerializationFormats] != null
Expand Down