Skip to content

Commit

Permalink
SNOW-893835 Arrow - support for multi-chunk results (#771)
Browse files Browse the repository at this point in the history
### Description
Arrow - support for multi-chunk results.

### Checklist
- [x] Code compiles correctly
- [x] Code is formatted according to [Coding
Conventions](../CodingConventions.md)
- [x] Created tests which fail without the change (if possible)
- [x] All tests passing (`dotnet test`)
- [x] Extended the README / documentation, if necessary
- [x] Provide JIRA issue id (if possible) or GitHub issue id in PR name
  • Loading branch information
sfc-gh-dstempniak authored Sep 13, 2023
1 parent 4609451 commit 78ddc27
Show file tree
Hide file tree
Showing 29 changed files with 844 additions and 546 deletions.
43 changes: 37 additions & 6 deletions Snowflake.Data.Tests/IntegrationTests/SFDbCommandIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,15 @@ public void TestSimpleLargeResultSet()

IDbCommand cmd = conn.CreateCommand();
cmd.CommandText = "select seq4(), uniform(1, 10, 42) from table(generator(rowcount => 1000000)) v order by 1";
IDataReader reader = cmd.ExecuteReader();
int counter = 0;
while (reader.Read())
using (IDataReader reader = cmd.ExecuteReader())
{
Assert.AreEqual(counter.ToString(), reader.GetString(0));
counter++;
int counter = 0;
while (reader.Read())
{
Assert.AreEqual(counter.ToString(), reader.GetString(0));
// don't test the second column as it has random values just to increase the response size
counter++;
}
}
conn.Close();
}
Expand Down Expand Up @@ -273,6 +276,7 @@ public void TestUseV1ResultParser()
while (reader.Read())
{
Assert.AreEqual(counter.ToString(), reader.GetString(0));
// don't test the second column as it has random values just to increase the response size
counter++;
}
conn.Close();
Expand Down Expand Up @@ -302,6 +306,7 @@ public void TestUseV2ChunkDownloader()
while (reader.Read())
{
Assert.AreEqual(counter.ToString(), reader.GetString(0));
// don't test the second column as it has random values just to increase the response size
counter++;
}
conn.Close();
Expand All @@ -310,6 +315,33 @@ public void TestUseV2ChunkDownloader()
SFConfiguration.Instance().ChunkDownloaderVersion = chunkDownloaderVersion;
}

[Test]
[Parallelizable(ParallelScope.Children)]
public void TestDefaultChunkDownloaderWithPrefetchThreads([Values(1, 2, 4)] int prefetchThreads)
{
    using (SnowflakeDbConnection conn = new SnowflakeDbConnection(ConnectionString))
    {
        conn.Open();

        IDbCommand cmd = conn.CreateCommand();
        cmd.CommandText = $"alter session set CLIENT_PREFETCH_THREADS = {prefetchThreads}";
        cmd.ExecuteNonQuery();

        // 200000 - empirical value to return 3 additional chunks for both JSON and Arrow response
        cmd.CommandText = "select seq4(), uniform(1, 10, 42) from table(generator(rowcount => 200000)) v order by 1";

        // Dispose the reader deterministically, consistent with TestSimpleLargeResultSet.
        using (IDataReader reader = cmd.ExecuteReader())
        {
            int counter = 0;
            while (reader.Read())
            {
                Assert.AreEqual(counter.ToString(), reader.GetString(0));
                // don't test the second column as it has random values just to increase the response size
                counter++;
            }
        }
        conn.Close();
    }
}

[Test]
public void TestDataSourceError()
{
Expand Down Expand Up @@ -517,7 +549,6 @@ public void TestRowsAffected()
Assert.AreEqual(expectedResult[i], rowsAffected);
}
}
conn.Close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public TestChunkParserFactory(int exceptionsToThrow)
_exceptionsThrown = 0;
}

public IChunkParser GetParser(Stream stream)
public IChunkParser GetParser(ResultFormat resultFormat, Stream stream)
{
if (++_exceptionsThrown <= _expectedExceptionsNumber)
return new ThrowingReusableChunkParser();
Expand Down
64 changes: 64 additions & 0 deletions Snowflake.Data.Tests/UnitTests/ArrowChunkParserTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
*/

using System.Linq;
using Apache.Arrow;
using Apache.Arrow.Ipc;

namespace Snowflake.Data.Tests.UnitTests
{
using NUnit.Framework;
using Snowflake.Data.Client;
using Snowflake.Data.Configuration;
using Snowflake.Data.Core;
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

[TestFixture, NonParallelizable]
class ArrowChunkParserTest
{
    [Test]
    [Ignore("ArrowChunkParserTest")]
    public void ArrowChunkParserTestDone()
    {
        // Do nothing - test progress marker
    }

    // Verifies that ArrowChunkParser reads every record batch written to the
    // underlying stream into the target ArrowResultChunk, preserving both the
    // batch count and the per-batch row counts.
    [Test]
    public void TestParseChunkReadsRecordBatches([Values(1, 2, 4)] int numberOfRecordBatch)
    {
        // Arrange
        MemoryStream stream = new MemoryStream();

        // Write each batch with its own writer; batch i carries 10 * i rows,
        // so the first batch is intentionally empty (0 rows).
        for (var i = 0; i < numberOfRecordBatch; i++)
        {
            var numberOfRecordsInBatch = 10 * i;
            var recordBatch = new RecordBatch.Builder()
                .Append("Col_Int32", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(1, numberOfRecordsInBatch))))
                .Build();

            // leaveOpen: true - the shared MemoryStream must survive each writer.
            ArrowStreamWriter writer = new ArrowStreamWriter(stream, recordBatch.Schema, true);
            writer.WriteRecordBatch(recordBatch);
        }
        stream.Position = 0;

        var parser = new ArrowChunkParser(stream);

        // Act
        var chunk = new ArrowResultChunk(1);
        var task = parser.ParseChunk(chunk);
        task.Wait();

        // Assert - one entry per written batch, each with its original length.
        Assert.AreEqual(numberOfRecordBatch, chunk.RecordBatch.Count);
        for (var i = 0; i < numberOfRecordBatch; i++)
        {
            var numberOfRecordsInBatch = 10 * i;
            Assert.AreEqual(numberOfRecordsInBatch, chunk.RecordBatch[i].Length);
        }
    }
}
}
140 changes: 125 additions & 15 deletions Snowflake.Data.Tests/UnitTests/ArrowResultChunkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,52 +13,162 @@ namespace Snowflake.Data.Tests.UnitTests
[TestFixture]
class ArrowResultChunkTest
{
private const int RowCount = 10;
private RecordBatch _recordBatch;
private const int RowCountBatchOne = 10;
private const int RowCountBatchTwo = 20;
private readonly RecordBatch _recordBatchOne = new RecordBatch.Builder()
.Append("Col_Int32", false, col => col.Int32(
array => array.AppendRange(Enumerable.Range(1, RowCountBatchOne))))
.Build();
private readonly RecordBatch _recordBatchTwo = new RecordBatch.Builder()
.Append("Col_Int32", false, col => col.Int32(
array => array.AppendRange(Enumerable.Range(1, RowCountBatchTwo))))
.Build();
private ArrowResultChunk _chunk;


[Test]
[Ignore("ArrowResultChunkTest")]
public void SFArrowResultChunkTestDone()
{
// Do nothing - test progress marker
}

[SetUp]
public void BeforeTest()
[Test]
public void TestAddRecordBatchAddsBatchTwo()
{
    // A chunk built from one batch should hold two after AddRecordBatch.
    _chunk = new ArrowResultChunk(_recordBatchOne);
    _chunk.AddRecordBatch(_recordBatchTwo);

    Assert.AreEqual(2, _chunk.RecordBatch.Count);
}

[Test]
public void TestNextIteratesThroughAllRecordsOfOneBatch()
{
_recordBatch = new RecordBatch.Builder()
.Append("Col_Int32", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(1, RowCount))))
.Build();
_chunk = new ArrowResultChunk(_recordBatch);
_chunk = new ArrowResultChunk(_recordBatchOne);

for (var i = 0; i < RowCountBatchOne; ++i)
{
Assert.IsTrue(_chunk.Next());
}
Assert.IsFalse(_chunk.Next());
}

[Test]
public void TestNextIteratesThroughAllRecordsOfTwoBatches()
{
    // A chunk holding both batches must expose every row exactly once via Next().
    _chunk = new ArrowResultChunk(_recordBatchOne);
    _chunk.AddRecordBatch(_recordBatchTwo);

    var totalRows = RowCountBatchOne + RowCountBatchTwo;
    var visited = 0;
    while (visited < totalRows)
    {
        Assert.IsTrue(_chunk.Next());
        ++visited;
    }

    // After the last row Next() must report exhaustion.
    Assert.IsFalse(_chunk.Next());
}

[Test]
public void TestRewindIteratesThroughAllRecordsOfBatchOne()
{
    _chunk = new ArrowResultChunk(_recordBatchOne);

    // Exhaust the chunk so every row lies behind the cursor.
    while (_chunk.Next()) {}

    var stepsBack = RowCountBatchOne;
    while (stepsBack > 0)
    {
        Assert.IsTrue(_chunk.Rewind());
        --stepsBack;
    }

    // Rewinding past the first row is not possible.
    Assert.IsFalse(_chunk.Rewind());
}

[Test]
public void TestRewindIteratesThroughAllRecordsOfTwoBatches()
{
    _chunk = new ArrowResultChunk(_recordBatchOne);
    _chunk.AddRecordBatch(_recordBatchTwo);

    // Exhaust the chunk so every row of both batches lies behind the cursor.
    while (_chunk.Next()) {}

    var stepsBack = RowCountBatchOne + RowCountBatchTwo;
    while (stepsBack > 0)
    {
        Assert.IsTrue(_chunk.Rewind());
        --stepsBack;
    }

    // Rewinding past the first row of the first batch is not possible.
    Assert.IsFalse(_chunk.Rewind());
}

[Test]
public void TestResetClearsChunkData()
{
    // Reset must repopulate the chunk's metadata from the supplied chunk info.
    _chunk = new ArrowResultChunk(_recordBatchOne);
    var newChunkInfo = new ExecResponseChunk()
    {
        url = "new_url",
        uncompressedSize = 100,
        rowCount = 2
    };

    _chunk.Reset(newChunkInfo, 0);

    Assert.AreEqual(0, _chunk.ChunkIndex);
    Assert.AreEqual(newChunkInfo.url, _chunk.Url);
    Assert.AreEqual(newChunkInfo.rowCount, _chunk.RowCount);
}

[Test]
public void TestExtractCellWithRowParameterReadsAllRows()
{
    // ExtractCell(row, column) must return the value stored in the source record batch.
    _chunk = new ArrowResultChunk(_recordBatchOne);
    var sourceColumn = (Int32Array)_recordBatchOne.Column(0);

    for (var rowIndex = 0; rowIndex < RowCountBatchOne; ++rowIndex)
    {
        var expected = sourceColumn.GetValue(rowIndex).ToString();
        Assert.AreEqual(expected, _chunk.ExtractCell(rowIndex, 0).SafeToString());
    }
}

[Test]
public void TestExtractCellReadsAllRows()
{
var column = (Int32Array)_recordBatch.Column(0);
for (var i = 0; i < RowCount; ++i)
_chunk = new ArrowResultChunk(_recordBatchOne);

var column = (Int32Array)_recordBatchOne.Column(0);
for (var i = 0; i < RowCountBatchOne; ++i)
{
Assert.AreEqual(column.GetValue(i).ToString(), _chunk.ExtractCell(i, 0).SafeToString());
var valueFromRecordBatch = column.GetValue(i).ToString();

_chunk.Next();
Assert.AreEqual(valueFromRecordBatch, _chunk.ExtractCell(0).SafeToString());
}
}

[Test]
public void TestExtractCellThrowsOutOfRangeException()
{
Assert.Throws<ArgumentOutOfRangeException>(() => _chunk.ExtractCell(RowCount, 0).SafeToString());
_chunk = new ArrowResultChunk(_recordBatchOne);

// move to the end of the batch
while (_chunk.Next()) {}

Assert.Throws<ArgumentOutOfRangeException>(() => _chunk.ExtractCell(0).SafeToString());
}

[Test]
public void TestGetRowCountReturnsNumberOfRows()
public void TestRowCountReturnsNumberOfRows()
{
Assert.AreEqual(RowCount, _chunk.GetRowCount());
_chunk = new ArrowResultChunk(_recordBatchOne);

Assert.AreEqual(RowCountBatchOne, _chunk.RowCount);
}

[Test]
public void TestGetChunkIndexReturnsFirstChunk()
{
Assert.AreEqual(0, _chunk.GetChunkIndex());
_chunk = new ArrowResultChunk(_recordBatchOne);

Assert.AreEqual(0, _chunk.ChunkIndex);
}

}
Expand Down
17 changes: 13 additions & 4 deletions Snowflake.Data.Tests/UnitTests/ArrowResultSetTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,34 @@ public void TestHasRowsReturnsFalseIfNoRows()
Assert.IsFalse(_arrowResultSet.HasRows());
}

[Test]
public void TestRewindReturnsFalseBeforeFirstRow()
{
    // Before any Next() call there is no previous row to move back to.
    Assert.IsFalse(_arrowResultSet.Rewind());
}

[Test]
public void TestRewindReturnsFalseForFirstRow()
{
    // On the first row there is no earlier row, so Rewind() reports false.
    _arrowResultSet.Next(); // move to first row
    Assert.IsFalse(_arrowResultSet.Rewind());
}

[Test]
public void TestRewindReturnsTrueForSecondRowAndMovesToFirstRow()
{
_arrowResultSet.Next();
_arrowResultSet.Next(); // move to first row
_arrowResultSet.Next(); // move to second row
Assert.IsTrue(_arrowResultSet.Rewind());
Assert.IsFalse(_arrowResultSet.Rewind());
}

[Test]
public void TestRewindReturnsTrueForThirdRowAndMovesToFirstRow()
{
_arrowResultSet.Next();
_arrowResultSet.Next();
_arrowResultSet.Next(); // move to first row
_arrowResultSet.Next(); // move to second row
_arrowResultSet.Next(); // move to third row
Assert.IsTrue(_arrowResultSet.Rewind());
Assert.IsTrue(_arrowResultSet.Rewind());
Assert.IsFalse(_arrowResultSet.Rewind());
Expand Down Expand Up @@ -140,7 +149,7 @@ private QueryExecResponseData PrepareResponseData(RecordBatch recordBatch)
type = "TEXT"
}).ToList(),
parameters = new List<NameValueParameter>(),
chunks = null, // TODO in SNOW-893835 - add tests with multiple chunks
chunks = null,
queryResultFormat = ResultFormat.ARROW,
rowsetBase64 = ConvertToBase64String(recordBatch)
};
Expand Down
Loading

0 comments on commit 78ddc27

Please sign in to comment.