Skip to content

Commit

Permalink
Implement apacheGH-34636: Reduce allocations when using ArrayPool
Browse files (browse the repository at this point in the history)
Implement apacheGH-39144: Consistently pass CancellationToken through async calls.
  • Loading branch information
CurtHagenlocher committed Dec 10, 2023
1 parent 7415ce6 commit 2f5053a
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 77 deletions.
40 changes: 15 additions & 25 deletions csharp/src/Apache.Arrow/Extensions/ArrayPoolExtensions.cs
Original file line number | Diff line number | Diff line change
Expand Up @@ -16,46 +16,36 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace Apache.Arrow
{
internal static class ArrayPoolExtensions
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void RentReturn(this ArrayPool<byte> pool, int length, Action<Memory<byte>> action)
public static ArrayLease RentReturn(this ArrayPool<byte> pool, int length, out Memory<byte> buffer)
{
byte[] array = null;

try
{
array = pool.Rent(length);
action(array.AsMemory(0, length));
}
finally
{
if (array != null)
{
pool.Return(array);
}
}
byte[] array = pool.Rent(length);
buffer = array.AsMemory(0, length);
return new ArrayLease(pool, array);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async ValueTask RentReturnAsync(this ArrayPool<byte> pool, int length, Func<Memory<byte>, ValueTask> action)
internal struct ArrayLease : IDisposable
{
byte[] array = null;
private readonly ArrayPool<byte> _pool;
private byte[] _array;

try
public ArrayLease(ArrayPool<byte> pool, byte[] array)
{
array = pool.Rent(length);
await action(array.AsMemory(0, length));
_pool = pool;
_array = array;
}
finally

public void Dispose()
{
if (array != null)
if (_array != null)
{
pool.Return(array);
_pool.Return(_array);
_array = null;
}
}
}
Expand Down
46 changes: 23 additions & 23 deletions csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
Original file line number | Diff line number | Diff line change
Expand Up @@ -42,47 +42,47 @@ public ArrowFileReaderImplementation(Stream stream, MemoryAllocator allocator, I
{
}

public async ValueTask<int> RecordBatchCountAsync()
public async ValueTask<int> RecordBatchCountAsync(CancellationToken cancellationToken = default)
{
if (!HasReadSchema)
{
await ReadSchemaAsync().ConfigureAwait(false);
await ReadSchemaAsync(cancellationToken).ConfigureAwait(false);
}

return _footer.RecordBatchCount;
}

protected override async ValueTask ReadSchemaAsync()
protected override async ValueTask ReadSchemaAsync(CancellationToken cancellationToken = default)
{
if (HasReadSchema)
{
return;
}

await ValidateFileAsync().ConfigureAwait(false);
await ValidateFileAsync(cancellationToken).ConfigureAwait(false);

int footerLength = 0;
await ArrayPool<byte>.Shared.RentReturnAsync(4, async (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(4, out Memory<byte> buffer))
{
BaseStream.Position = GetFooterLengthPosition();

int bytesRead = await BaseStream.ReadFullBufferAsync(buffer).ConfigureAwait(false);
int bytesRead = await BaseStream.ReadFullBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
EnsureFullRead(buffer, bytesRead);

footerLength = ReadFooterLength(buffer);
}).ConfigureAwait(false);
}

await ArrayPool<byte>.Shared.RentReturnAsync(footerLength, async (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(footerLength, out Memory<byte> buffer))
{
long footerStartPosition = GetFooterLengthPosition() - footerLength;

BaseStream.Position = footerStartPosition;

int bytesRead = await BaseStream.ReadFullBufferAsync(buffer).ConfigureAwait(false);
int bytesRead = await BaseStream.ReadFullBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
EnsureFullRead(buffer, bytesRead);

ReadSchema(buffer);
}).ConfigureAwait(false);
}
}

protected override void ReadSchema()
Expand All @@ -95,17 +95,17 @@ protected override void ReadSchema()
ValidateFile();

int footerLength = 0;
ArrayPool<byte>.Shared.RentReturn(4, (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(4, out Memory<byte> buffer))
{
BaseStream.Position = GetFooterLengthPosition();

int bytesRead = BaseStream.ReadFullBuffer(buffer);
EnsureFullRead(buffer, bytesRead);

footerLength = ReadFooterLength(buffer);
});
}

ArrayPool<byte>.Shared.RentReturn(footerLength, (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(footerLength, out Memory<byte> buffer))
{
long footerStartPosition = GetFooterLengthPosition() - footerLength;

Expand All @@ -115,7 +115,7 @@ protected override void ReadSchema()
EnsureFullRead(buffer, bytesRead);

ReadSchema(buffer);
});
}
}

private long GetFooterLengthPosition()
Expand Down Expand Up @@ -239,14 +239,14 @@ private void ReadDictionaries()
/// <summary>
/// Validates the file format. Once the file has been validated, later calls return without running the validation again.
/// </summary>
private async ValueTask ValidateFileAsync()
private async ValueTask ValidateFileAsync(CancellationToken cancellationToken = default)
{
if (IsFileValid)
{
return;
}

await ValidateMagicAsync().ConfigureAwait(false);
await ValidateMagicAsync(cancellationToken).ConfigureAwait(false);

IsFileValid = true;
}
Expand All @@ -266,31 +266,31 @@ private void ValidateFile()
IsFileValid = true;
}

private async ValueTask ValidateMagicAsync()
private async ValueTask ValidateMagicAsync(CancellationToken cancellationToken = default)
{
long startingPosition = BaseStream.Position;
int magicLength = ArrowFileConstants.Magic.Length;

try
{
await ArrayPool<byte>.Shared.RentReturnAsync(magicLength, async (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(magicLength, out Memory<byte> buffer))
{
// Seek to the beginning of the stream
BaseStream.Position = 0;

// Read beginning of stream
await BaseStream.ReadAsync(buffer).ConfigureAwait(false);
await BaseStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);

VerifyMagic(buffer);

// Move stream position to magic-length bytes away from the end of the stream
BaseStream.Position = BaseStream.Length - magicLength;

// Read the end of the stream
await BaseStream.ReadAsync(buffer).ConfigureAwait(false);
await BaseStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);

VerifyMagic(buffer);
}).ConfigureAwait(false);
}
}
finally
{
Expand All @@ -305,7 +305,7 @@ private void ValidateMagic()

try
{
ArrayPool<byte>.Shared.RentReturn(magicLength, buffer =>
using (ArrayPool<byte>.Shared.RentReturn(magicLength, out Memory<byte> buffer))
{
// Seek to the beginning of the stream
BaseStream.Position = 0;
Expand All @@ -322,7 +322,7 @@ private void ValidateMagic()
BaseStream.Read(buffer);

VerifyMagic(buffer);
});
}
}
finally
{
Expand Down
8 changes: 4 additions & 4 deletions csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
Original file line number | Diff line number | Diff line change
Expand Up @@ -215,7 +215,7 @@ private void WriteFooter(Schema schema)

// Write footer length

Buffers.RentReturn(4, (buffer) =>
using (Buffers.RentReturn(4, out Memory<byte> buffer))
{
int footerLength;
checked
Expand All @@ -226,7 +226,7 @@ private void WriteFooter(Schema schema)
BinaryPrimitives.WriteInt32LittleEndian(buffer.Span, footerLength);

BaseStream.Write(buffer);
});
}

// Write magic

Expand Down Expand Up @@ -286,7 +286,7 @@ private async Task WriteFooterAsync(Schema schema, CancellationToken cancellatio

cancellationToken.ThrowIfCancellationRequested();

await Buffers.RentReturnAsync(4, async (buffer) =>
using (Buffers.RentReturn(4, out Memory<byte> buffer))
{
int footerLength;
checked
Expand All @@ -297,7 +297,7 @@ await Buffers.RentReturnAsync(4, async (buffer) =>
BinaryPrimitives.WriteInt32LittleEndian(buffer.Span, footerLength);

await BaseStream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
}

// Write magic

Expand Down
Loading

0 comments on commit 2f5053a

Please sign in to comment.