SNOW-1640968 chunk downloader fix #1022

Merged
10 commits merged on Oct 9, 2024
56 changes: 28 additions & 28 deletions Snowflake.Data/Core/ArrowResultChunk.cs
@@ -14,16 +14,16 @@ internal class ArrowResultChunk : BaseResultChunk
internal override ResultFormat ResultFormat => ResultFormat.ARROW;

private static readonly DateTimeOffset s_epochDate = SFDataConverter.UnixEpoch;
private static readonly long[] s_powersOf10 = {
1,
10,
100,
1000,
10000,
100000,
1000000,
10000000,

private static readonly long[] s_powersOf10 = {
1,
10,
100,
1000,
10000,
100000,
1000000,
10000000,
100000000,
1000000000
};
@@ -62,7 +62,7 @@ public ArrowResultChunk(RecordBatch recordBatch)
RowCount = recordBatch.Length;
ColumnCount = recordBatch.ColumnCount;
ChunkIndex = -1;

ResetTempTables();
}

@@ -81,11 +81,11 @@ public void AddRecordBatch(RecordBatch recordBatch)
{
RecordBatch.Add(recordBatch);
}

internal override void Reset(ExecResponseChunk chunkInfo, int chunkIndex)
{
base.Reset(chunkInfo, chunkIndex);

_currentBatchIndex = 0;
_currentRecordIndex = -1;
RecordBatch.Clear();
@@ -97,7 +97,7 @@ internal override bool Next()
{
if (_currentBatchIndex >= RecordBatch.Count)
return false;

_currentRecordIndex += 1;
if (_currentRecordIndex < RecordBatch[_currentBatchIndex].Length)
return true;
@@ -149,7 +149,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)

if (column.IsNull(_currentRecordIndex))
return DBNull.Value;

switch (srcType)
{
case SFDataType.FIXED:
@@ -170,7 +170,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
if (scale == 0)
return _short[columnIndex][_currentRecordIndex];
return _short[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Int32Array array:
if (_int[columnIndex] == null)
_int[columnIndex] = array.Values.ToArray();
@@ -184,7 +184,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
if (scale == 0)
return _long[columnIndex][_currentRecordIndex];
return _long[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Decimal128Array array:
return array.GetValue(_currentRecordIndex);
}
@@ -210,8 +210,8 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
_int[columnIndex] = ((StringArray)column).ValueOffsets.ToArray();
}
return StringArray.DefaultEncoding.GetString(
_byte[columnIndex],
_int[columnIndex][_currentRecordIndex],
_byte[columnIndex],
_int[columnIndex][_currentRecordIndex],
_int[columnIndex][_currentRecordIndex + 1] - _int[columnIndex][_currentRecordIndex]);

case SFDataType.VECTOR:
Expand Down Expand Up @@ -250,16 +250,16 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)

case SFDataType.BINARY:
return ((BinaryArray)column).GetBytes(_currentRecordIndex).ToArray();

case SFDataType.DATE:
if (_int[columnIndex] == null)
_int[columnIndex] = ((Date32Array)column).Values.ToArray();
return SFDataConverter.UnixEpoch.AddTicks(_int[columnIndex][_currentRecordIndex] * TicksPerDay);

case SFDataType.TIME:
{
long value;

if (column.GetType() == typeof(Int32Array))
{
if (_int[columnIndex] == null)
@@ -278,7 +278,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)

value = _long[columnIndex][_currentRecordIndex];
}

if (scale == 0)
return DateTimeOffset.FromUnixTimeSeconds(value).DateTime;
if (scale <= 3)
@@ -292,7 +292,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
var structCol = (StructArray)column;
if (_long[columnIndex] == null)
_long[columnIndex] = ((Int64Array)structCol.Fields[0]).Values.ToArray();

if (structCol.Fields.Count == 2)
{
if (_int[columnIndex] == null)
@@ -309,7 +309,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
_fraction[columnIndex] = ((Int32Array)structCol.Fields[1]).Values.ToArray();
if (_int[columnIndex] == null)
_int[columnIndex] = ((Int32Array)structCol.Fields[2]).Values.ToArray();

var epoch = _long[columnIndex][_currentRecordIndex];
var fraction = _fraction[columnIndex][_currentRecordIndex];
var timezone = _int[columnIndex][_currentRecordIndex];
@@ -331,7 +331,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
{
if (_long[columnIndex] == null)
_long[columnIndex] = ((Int64Array)column).Values.ToArray();

var value = _long[columnIndex][_currentRecordIndex];
var epoch = ExtractEpoch(value, scale);
var fraction = ExtractFraction(value, scale);
@@ -353,7 +353,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
{
if (_long[columnIndex] == null)
_long[columnIndex] = ((Int64Array)column).Values.ToArray();

var value = _long[columnIndex][_currentRecordIndex];
var epoch = ExtractEpoch(value, scale);
var fraction = ExtractFraction(value, scale);
@@ -362,7 +362,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
}
throw new NotSupportedException($"Type {srcType} is not supported.");
}

private long ExtractEpoch(long value, long scale)
{
return value / s_powersOf10[scale];
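
Note on the fixed-point conversions in the hunks above: the Arrow chunk stores NUMBER and timestamp columns as scaled integers and recovers decimal or seconds/fraction values by dividing through s_powersOf10. Below is a minimal standalone sketch of that arithmetic; the helper names echo ExtractEpoch/ExtractFraction from the diff, but the nanosecond rescaling in SplitEpoch is an assumption for illustration, not the connector's exact code.

    using System;

    internal static class ScaleConversionSketch
    {
        private static readonly long[] s_powersOf10 =
            { 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000 };

        // Scaled integer -> decimal, as in the FIXED branches of ExtractCell above.
        internal static decimal ToDecimal(long rawValue, long scale) =>
            scale == 0 ? rawValue : rawValue / (decimal)s_powersOf10[scale];

        // Split a scaled epoch value into whole seconds and a fractional part,
        // assumed here to be rescaled to nanoseconds (9 - scale trailing digits).
        internal static (long Seconds, long Nanoseconds) SplitEpoch(long value, long scale)
        {
            var seconds = value / s_powersOf10[scale];                         // mirrors ExtractEpoch
            var nanos = value % s_powersOf10[scale] * s_powersOf10[9 - scale]; // assumed ExtractFraction shape
            return (seconds, nanos);
        }
    }

For example, a scale-3 timestamp value of 1696848000123 splits into 1696848000 whole seconds and 123000000 nanoseconds.
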
25 changes: 17 additions & 8 deletions Snowflake.Data/Core/BaseResultChunk.cs
@@ -9,21 +9,21 @@
public abstract class BaseResultChunk : IResultChunk
{
internal abstract ResultFormat ResultFormat { get; }

public int RowCount { get; protected set; }

public int ColumnCount { get; protected set; }

public int ChunkIndex { get; protected set; }

internal int CompressedSize;

internal int UncompressedSize;

internal string Url { get; set; }

internal string[,] RowSet { get; set; }

public int GetRowCount() => RowCount;

public int GetChunkIndex() => ChunkIndex;
@@ -32,11 +32,11 @@
public abstract UTF8Buffer ExtractCell(int rowIndex, int columnIndex);

public abstract UTF8Buffer ExtractCell(int columnIndex);

internal abstract bool Next();

internal abstract bool Rewind();

internal virtual void Reset(ExecResponseChunk chunkInfo, int chunkIndex)
{
RowCount = chunkInfo.rowCount;
@@ -46,6 +46,15 @@
UncompressedSize = chunkInfo.uncompressedSize;
}

internal virtual void Reset()
{
RowCount = 0;
Url = null;
ChunkIndex = 0;
CompressedSize = 0;
UncompressedSize = 0;
}

Check warning (Codecov / codecov/patch) on Snowflake.Data/Core/BaseResultChunk.cs#L50-L56: added lines #L50 - L56 were not covered by tests.

internal virtual void ResetForRetry()
{
}
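
The Codecov annotation above flags the new parameterless Reset() (added lines 50-56) as not covered by tests. A minimal NUnit sketch of a test that would cover it is shown below; StubResultChunk is a hypothetical subclass written only for this illustration (assuming BaseResultChunk has no abstract members beyond those visible in this diff, and that ExecResponseChunk.rowCount and uncompressedSize are settable as they appear here), and is not part of the PR.

    using NUnit.Framework;
    using Snowflake.Data.Core;

    // Hypothetical stub: derives from BaseResultChunk only so the base Reset overloads
    // can be exercised in isolation; every abstract member gets a trivial body.
    internal class StubResultChunk : BaseResultChunk
    {
        internal override ResultFormat ResultFormat => ResultFormat.ARROW;
        public override UTF8Buffer ExtractCell(int rowIndex, int columnIndex) => null;
        public override UTF8Buffer ExtractCell(int columnIndex) => null;
        internal override bool Next() => false;
        internal override bool Rewind() => false;
    }

    [TestFixture]
    public class BaseResultChunkResetTest
    {
        [Test]
        public void TestParameterlessResetClearsChunkState()
        {
            var chunk = new StubResultChunk();
            chunk.Reset(new ExecResponseChunk { rowCount = 3, uncompressedSize = 100 }, 2);

            chunk.Reset();           // the newly added overload

            Assert.AreEqual(0, chunk.GetRowCount());
            Assert.AreEqual(0, chunk.GetChunkIndex());
        }
    }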