Skip to content

Commit

Permalink
Further optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-pbulawa committed Dec 18, 2023
1 parent d62389c commit d2e075b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 41 deletions.
103 changes: 67 additions & 36 deletions Snowflake.Data/Core/ArrowResultChunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
using System;
using System.Collections.Generic;
using Apache.Arrow;
using Snowflake.Data.Log;

namespace Snowflake.Data.Core
{
internal class ArrowResultChunk : BaseResultChunk
{
private static readonly SFLogger s_logger = SFLoggerFactory.GetLogger<ArrowResultChunk>();

internal override ResultFormat ResultFormat => ResultFormat.ARROW;

private static readonly DateTimeOffset s_epochDate = SFDataConverter.UnixEpoch;
Expand All @@ -37,20 +34,22 @@ internal class ArrowResultChunk : BaseResultChunk
private sbyte[][] _sbyte;
private short[][] _short;
private int[][] _int;
private int[][] _fraction;
private long[][] _long;

private byte[][] _byte;
private double[][] _double;


private int _currentBatchIndex = 0;

private int _currentBatchIndex;
private int _currentRecordIndex = -1;

private void ResetTempTables()
{
_sbyte = new sbyte[ColumnCount][];
_short = new short[ColumnCount][];
_int = new int[ColumnCount][];
_fraction = new int[ColumnCount][];
_long = new long[ColumnCount][];
_byte = new byte[ColumnCount][];
_double = new double[ColumnCount][];
Expand Down Expand Up @@ -139,19 +138,12 @@ public override UTF8Buffer ExtractCell(int rowIndex, int columnIndex)
throw new NotSupportedException();
}

[Obsolete("ExtractCell with columnIndex is deprecated", false)]
public override UTF8Buffer ExtractCell(int columnIndex)

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Linux (net6.0, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on MAC (net6.0, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Linux (net6.0, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on MAC (net6.0, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on MAC (net6.0, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Linux (net6.0, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net6.0, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net6.0, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net472, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net472, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net471, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net6.0, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net471, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net471, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net472, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Linux (net6.0, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on MAC (net6.0, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Linux (net6.0, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Linux (net6.0, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net6.0, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on MAC (net6.0, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on MAC (net6.0, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net6.0, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net6.0, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net471, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net472, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net471, GCP)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net471, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net472, AWS)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'

Check warning on line 142 in Snowflake.Data/Core/ArrowResultChunk.cs

View workflow job for this annotation

GitHub Actions / Tests on Windows (net472, AZURE)

Obsolete member 'ArrowResultChunk.ExtractCell(int)' overrides non-obsolete member 'BaseResultChunk.ExtractCell(int)'
{
throw new NotSupportedException();
}

/// <summary>
/// Builds a short, human-readable summary of <c>RecordBatch</c> for diagnostic output.
/// </summary>
/// <returns>
/// <c>"No data"</c> when <c>RecordBatch.Count</c> is zero; otherwise
/// <c>"batches: N"</c>, where N is <c>RecordBatch.Count</c>.
/// </returns>
public string GetMetadata()
{
    var batchCount = RecordBatch.Count;

    // The empty case has nothing to interpolate, so a plain literal is used.
    return batchCount == 0 ? "No data" : $"batches: {batchCount}";
}

public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
{
var column = RecordBatch[_currentBatchIndex].Column(columnIndex);
Expand All @@ -171,32 +163,28 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
_sbyte[columnIndex] = array.Values.ToArray();
if (scale == 0)
return _sbyte[columnIndex][_currentRecordIndex];
else
return _sbyte[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];
return _sbyte[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Int16Array array:
if (_short[columnIndex] == null)
_short[columnIndex] = array.Values.ToArray();
if (scale == 0)
return _short[columnIndex][_currentRecordIndex];
else
return _short[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];
return _short[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Int32Array array:
if (_int[columnIndex] == null)
_int[columnIndex] = array.Values.ToArray();
if (scale == 0)
return _int[columnIndex][_currentRecordIndex];
else
return _int[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];
return _int[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Int64Array array:
if (_long[columnIndex] == null)
_long[columnIndex] = array.Values.ToArray();
if (scale == 0)
return _long[columnIndex][_currentRecordIndex];
else
return _long[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];
return _long[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Decimal128Array array:
return array.GetValue(_currentRecordIndex);
Expand Down Expand Up @@ -237,9 +225,27 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)

case SFDataType.TIME:
{
var value = column.GetType() == typeof(Int32Array)
? ((Int32Array)column).Values[_currentRecordIndex]
: ((Int64Array)column).Values[_currentRecordIndex];
long value;

if (column.GetType() == typeof(Int32Array))
{
if (_int[columnIndex] == null)
{
_int[columnIndex] = ((Int32Array)column).Values.ToArray();
}

value = _int[columnIndex][_currentRecordIndex];
}
else
{
if (_long[columnIndex] == null)
{
_long[columnIndex] = ((Int64Array)column).Values.ToArray();
}

value = _long[columnIndex][_currentRecordIndex];
}

if (scale == 0)
return DateTimeOffset.FromUnixTimeSeconds(value).DateTime;
if (scale <= 3)
Expand All @@ -250,32 +256,50 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
return s_epochDate.AddTicks(value / s_powersOf10[scale - 7]).DateTime;
}
case SFDataType.TIMESTAMP_TZ:
if (((StructArray)column).Fields.Count == 2)
var structCol = (StructArray)column;
if (_long[columnIndex] == null)
_long[columnIndex] = ((Int64Array)structCol.Fields[0]).Values.ToArray();

if (structCol.Fields.Count == 2)
{
var value = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
var timezone = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
if (_int[columnIndex] == null)
_int[columnIndex] = ((Int32Array)structCol.Fields[1]).Values.ToArray();
var value = _long[columnIndex][_currentRecordIndex];
var timezone = _int[columnIndex][_currentRecordIndex];
var epoch = ExtractEpoch(value, scale);
var fraction = ExtractFraction(value, scale);
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440));
}
else
{
var epoch = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
var fraction = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
var timezone = ((Int32Array)((StructArray)column).Fields[2]).Values[_currentRecordIndex];
if (_fraction[columnIndex] == null)
_fraction[columnIndex] = ((Int32Array)structCol.Fields[1]).Values.ToArray();
if (_int[columnIndex] == null)
_int[columnIndex] = ((Int32Array)structCol.Fields[2]).Values.ToArray();

var epoch = _long[columnIndex][_currentRecordIndex];
var fraction = _fraction[columnIndex][_currentRecordIndex];
var timezone = _int[columnIndex][_currentRecordIndex];
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440));
}

case SFDataType.TIMESTAMP_LTZ:
if (column.GetType() == typeof(StructArray))
{
var epoch = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
var fraction = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
if (_long[columnIndex] == null)
_long[columnIndex] = ((Int64Array)((StructArray)column).Fields[0]).Values.ToArray();
if (_fraction[columnIndex] == null)
_fraction[columnIndex] = ((Int32Array)((StructArray)column).Fields[1]).Values.ToArray();
var epoch = _long[columnIndex][_currentRecordIndex];
var fraction = _fraction[columnIndex][_currentRecordIndex];
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime();
}
else
{
var value = ((Int64Array)column).Values[_currentRecordIndex];
if (_long[columnIndex] == null)
_long[columnIndex] = ((Int64Array)column).Values.ToArray();

var value = _long[columnIndex][_currentRecordIndex];
var epoch = ExtractEpoch(value, scale);
var fraction = ExtractFraction(value, scale);
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime();
Expand All @@ -284,13 +308,20 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
case SFDataType.TIMESTAMP_NTZ:
if (column.GetType() == typeof(StructArray))
{
var epoch = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
var fraction = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
if (_long[columnIndex] == null)
_long[columnIndex] = ((Int64Array)((StructArray)column).Fields[0]).Values.ToArray();
if (_fraction[columnIndex] == null)
_fraction[columnIndex] = ((Int32Array)((StructArray)column).Fields[1]).Values.ToArray();
var epoch = _long[columnIndex][_currentRecordIndex];
var fraction = _fraction[columnIndex][_currentRecordIndex];
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime;
}
else
{
var value = ((Int64Array)column).Values[_currentRecordIndex];
if (_long[columnIndex] == null)
_long[columnIndex] = ((Int64Array)column).Values.ToArray();

var value = _long[columnIndex][_currentRecordIndex];
var epoch = ExtractEpoch(value, scale);
var fraction = ExtractFraction(value, scale);
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime;
Expand Down
9 changes: 4 additions & 5 deletions Snowflake.Data/Core/ArrowResultSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
*/

using System;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
Expand All @@ -24,7 +23,7 @@ class ArrowResultSet : SFBaseResultSet
private BaseResultChunk _currentChunk;
private readonly IChunkDownloader _chunkDownloader;

public ArrowResultSet(QueryExecResponseData responseData, SFStatement sfStatement, CancellationToken cancellationToken) : base()
public ArrowResultSet(QueryExecResponseData responseData, SFStatement sfStatement, CancellationToken cancellationToken)
{
columnCount = responseData.rowType.Count;
try
Expand Down Expand Up @@ -66,7 +65,7 @@ public ArrowResultSet(QueryExecResponseData responseData, SFStatement sfStatemen
_currentChunk = new ArrowResultChunk(columnCount);
}
}
catch(System.Exception ex)
catch(Exception ex)

Check warning on line 68 in Snowflake.Data/Core/ArrowResultSet.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/ArrowResultSet.cs#L68

Added line #L68 was not covered by tests
{
s_logger.Error("Result set error queryId="+responseData.queryId, ex);
throw;
Expand All @@ -84,7 +83,7 @@ internal override async Task<bool> NextAsync()
{
s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" +
$" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," +
$" size uncompressed: {_currentChunk.UncompressedSize}, " + ((ArrowResultChunk)_currentChunk).GetMetadata());
$" size uncompressed: {_currentChunk.UncompressedSize}");

Check warning on line 86 in Snowflake.Data/Core/ArrowResultSet.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/ArrowResultSet.cs#L84-L86

Added lines #L84 - L86 were not covered by tests
_currentChunk = await _chunkDownloader.GetNextChunkAsync().ConfigureAwait(false);
return _currentChunk?.Next() ?? false;
}
Expand All @@ -103,7 +102,7 @@ internal override bool Next()
{
s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" +
$" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," +
$" size uncompressed: {_currentChunk.UncompressedSize}, " + ((ArrowResultChunk)_currentChunk).GetMetadata());
$" size uncompressed: {_currentChunk.UncompressedSize}");

Check warning on line 105 in Snowflake.Data/Core/ArrowResultSet.cs

View check run for this annotation

Codecov / codecov/patch

Snowflake.Data/Core/ArrowResultSet.cs#L103-L105

Added lines #L103 - L105 were not covered by tests
_currentChunk = Task.Run(async() => await (_chunkDownloader.GetNextChunkAsync()).ConfigureAwait(false)).Result;

return _currentChunk?.Next() ?? false;
Expand Down

0 comments on commit d2e075b

Please sign in to comment.