
Commit

Revert "Revert cache for time"
This reverts commit 5c7a278.
sfc-gh-pbulawa committed Dec 20, 2023
1 parent 5c7a278 commit df23ba5
Showing 1 changed file with 81 additions and 115 deletions.
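At a glance, the revert below drops the per-column value caches (built once per batch with `Values.ToArray()` and then indexed by row) in favor of Apache.Arrow's per-cell `GetValue(index)` accessors, which return nullable values and honor the column's null bitmap. A minimal sketch of the two access patterns, assuming an `Int32Array` column; the `ReadCached`/`ReadDirect` names are illustrative, not from the connector:

using System;
using Apache.Arrow;

static class AccessPatterns
{
    // Old pattern (deleted below): materialize the column's backing buffer once,
    // then index it per row. Values exposes raw slots, so null slots still need
    // a separate IsNull check by the caller.
    public static int ReadCached(Int32Array array, ref int[] cache, int row)
    {
        if (cache == null)
            cache = array.Values.ToArray(); // one full copy per column per batch
        return cache[row];
    }

    // New pattern (added below): read a single cell; nulls surface as Nullable<int>.
    public static int? ReadDirect(Int32Array array, int row)
    {
        return array.GetValue(row);
    }
}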
196 changes: 81 additions & 115 deletions Snowflake.Data/Core/ArrowResultChunk.cs
@@ -4,7 +4,9 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using Apache.Arrow;
+using Google.Apis.Logging;
 
 namespace Snowflake.Data.Core
 {
@@ -31,37 +33,16 @@ internal class ArrowResultChunk : BaseResultChunk
 
         public List<RecordBatch> RecordBatch { get; set; }
 
-        private sbyte[][] _sbyte;
-        private short[][] _short;
-        private int[][] _int;
-        private long[][] _long;
-
-        private byte[][] _byte;
-        private double[][] _double;
-
-
         private int _currentBatchIndex;
         private int _currentRecordIndex = -1;
 
-        private void ResetTempTables()
-        {
-            _sbyte = new sbyte[ColumnCount][];
-            _short = new short[ColumnCount][];
-            _int = new int[ColumnCount][];
-            _long = new long[ColumnCount][];
-            _byte = new byte[ColumnCount][];
-            _double = new double[ColumnCount][];
-        }
-
         public ArrowResultChunk(RecordBatch recordBatch)
         {
            RecordBatch = new List<RecordBatch> { recordBatch };
 
            RowCount = recordBatch.Length;
            ColumnCount = recordBatch.ColumnCount;
            ChunkIndex = -1;
-
-            ResetTempTables();
         }
 
         public ArrowResultChunk(int columnCount)
@@ -71,8 +52,6 @@ public ArrowResultChunk(int columnCount)
             RowCount = 0;
             ColumnCount = columnCount;
             ChunkIndex = -1;
-
-            ResetTempTables();
         }
 
         public void AddRecordBatch(RecordBatch recordBatch)
@@ -87,8 +66,6 @@ internal override void Reset(ExecResponseChunk chunkInfo, int chunkIndex)
             _currentBatchIndex = 0;
             _currentRecordIndex = -1;
             RecordBatch.Clear();
-
-            ResetTempTables();
         }
 
         internal override bool Next()
@@ -102,9 +79,7 @@ internal override bool Next()
 
             _currentBatchIndex += 1;
             _currentRecordIndex = 0;
-
-            ResetTempTables();
-
+
             return _currentBatchIndex < RecordBatch.Count;
         }
 
@@ -123,7 +98,6 @@ internal override bool Rewind()
         {
             _currentRecordIndex = RecordBatch[_currentBatchIndex].Length - 1;
 
-            ResetTempTables();
             return true;
         }
 
@@ -148,7 +122,10 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
 
             if (column.IsNull(_currentRecordIndex))
                 return DBNull.Value;
 
+            long? value;
+            long? epoch;
+            long? fraction;
             switch (srcType)
             {
                 case SFDataType.FIXED:
@@ -157,36 +134,24 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
                     switch (column)
                     {
                         case Int8Array array:
-                            if (_sbyte[columnIndex] == null)
-                                _sbyte[columnIndex] = array.Values.ToArray();
                             if (scale == 0)
-                                return _sbyte[columnIndex][_currentRecordIndex];
-                            else
-                                return _sbyte[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];
+                                return array.GetValue(_currentRecordIndex);
+                            return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale];
 
                         case Int16Array array:
-                            if (_short[columnIndex] == null)
-                                _short[columnIndex] = array.Values.ToArray();
                             if (scale == 0)
-                                return _short[columnIndex][_currentRecordIndex];
-                            else
-                                return _short[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];
+                                return array.GetValue(_currentRecordIndex);
+                            return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale];
 
                         case Int32Array array:
-                            if (_int[columnIndex] == null)
-                                _int[columnIndex] = array.Values.ToArray();
                             if (scale == 0)
-                                return _int[columnIndex][_currentRecordIndex];
-                            else
-                                return _int[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];
+                                return array.GetValue(_currentRecordIndex);
+                            return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale];
 
                         case Int64Array array:
-                            if (_long[columnIndex] == null)
-                                _long[columnIndex] = array.Values.ToArray();
                             if (scale == 0)
-                                return _long[columnIndex][_currentRecordIndex];
-                            else
-                                return _long[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];
+                                return array.GetValue(_currentRecordIndex);
+                            return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale];
 
                         case Decimal128Array array:
                             return array.GetValue(_currentRecordIndex);
@@ -199,104 +164,105 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
                 case SFDataType.REAL:
                     // Snowflake data types that are floating-point numbers will fall in this category
                     // e.g. FLOAT/REAL/DOUBLE
-                    if (_double[columnIndex] == null)
-                        _double[columnIndex] = ((DoubleArray)column).Values.ToArray();
-                    return _double[columnIndex][_currentRecordIndex];
+                    return ((DoubleArray)column).GetValue(_currentRecordIndex);
 
                 case SFDataType.TEXT:
                 case SFDataType.ARRAY:
                 case SFDataType.VARIANT:
                 case SFDataType.OBJECT:
-                    if (_byte[columnIndex] == null || _int[columnIndex] == null)
-                    {
-                        _byte[columnIndex] = ((StringArray)column).Values.ToArray();
-                        _int[columnIndex] = ((StringArray)column).ValueOffsets.ToArray();
-                    }
-                    return StringArray.DefaultEncoding.GetString(
-                        _byte[columnIndex],
-                        _int[columnIndex][_currentRecordIndex],
-                        _int[columnIndex][_currentRecordIndex + 1] - _int[columnIndex][_currentRecordIndex]);
+                    return ((StringArray)column).GetString(_currentRecordIndex);
 
                 case SFDataType.BINARY:
                     return ((BinaryArray)column).GetBytes(_currentRecordIndex).ToArray();
 
                 case SFDataType.DATE:
-                    if (_int[columnIndex] == null)
-                        _int[columnIndex] = ((Date32Array)column).Values.ToArray();
-                    return SFDataConverter.UnixEpoch.AddTicks(_int[columnIndex][_currentRecordIndex] * TicksPerDay);
+                    var val = ((Date32Array)column).GetValue(_currentRecordIndex);
+                    if (val.HasValue)
+                        return SFDataConverter.UnixEpoch.AddTicks(val.Value * TicksPerDay);
+                    return null;
 
Check warning (Codecov / codecov/patch) on line 182 in Snowflake.Data/Core/ArrowResultChunk.cs: Added line #L182 was not covered by tests.

                 case SFDataType.TIME:
-                {
-                    var value = column.GetType() == typeof(Int32Array)
-                        ? ((Int32Array)column).Values[_currentRecordIndex]
-                        : ((Int64Array)column).Values[_currentRecordIndex];
-                    if (scale == 0)
-                        return DateTimeOffset.FromUnixTimeSeconds(value).DateTime;
-                    if (scale <= 3)
-                        return DateTimeOffset.FromUnixTimeMilliseconds(value * s_powersOf10[3 - scale])
-                            .DateTime;
-                    if (scale <= 7)
-                        return s_epochDate.AddTicks(value * s_powersOf10[7 - scale]).DateTime;
-                    return s_epochDate.AddTicks(value / s_powersOf10[scale - 7]).DateTime;
-                }
-                case SFDataType.TIMESTAMP_TZ:
-                    if (((StructArray)column).Fields.Count == 2)
+                    switch (column)
                     {
-                        var value = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
-                        var timezone = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
-                        var epoch = ExtractEpoch(value, scale);
-                        var fraction = ExtractFraction(value, scale);
-                        return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440));
+                        case Int32Array array:
+                            return GetTime(array.GetValue(_currentRecordIndex), scale);
+                        case Int64Array array:
+                            return GetTime(array.GetValue(_currentRecordIndex), scale);
                     }
-                    else
+
+                    return null;

Check warning (Codecov / codecov/patch) on line 192 in Snowflake.Data/Core/ArrowResultChunk.cs: Added line #L192 was not covered by tests.

+                case SFDataType.TIMESTAMP_TZ:
+                    var structArray = (StructArray)column;
+                    int? timezone;
+                    if (structArray.Fields.Count == 2)
                     {
-                        var epoch = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
-                        var fraction = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
-                        var timezone = ((Int32Array)((StructArray)column).Fields[2]).Values[_currentRecordIndex];
-                        return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440));
+                        value = ((Int64Array)structArray.Fields[0]).GetValue(_currentRecordIndex);
+                        timezone = ((Int32Array)structArray.Fields[1]).GetValue(_currentRecordIndex);
+                        epoch = ExtractEpoch(value.Value, scale);
+                        fraction = ExtractFraction(value.Value, scale);
+                        return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToOffset(TimeSpan.FromMinutes(timezone.Value - 1440));
                     }
+
+                    epoch = ((Int64Array)structArray.Fields[0]).GetValue(_currentRecordIndex);
+                    fraction = ((Int32Array)structArray.Fields[1]).GetValue(_currentRecordIndex);
+                    timezone = ((Int32Array)structArray.Fields[2]).GetValue(_currentRecordIndex);
+                    return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToOffset(TimeSpan.FromMinutes(timezone.Value - 1440));
                 case SFDataType.TIMESTAMP_LTZ:
-                    if (column.GetType() == typeof(StructArray))
-                    {
-                        var epoch = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
-                        var fraction = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
-                        return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime();
-                    }
-                    else
+                    switch (column)
                     {
-                        var value = ((Int64Array)column).Values[_currentRecordIndex];
-                        var epoch = ExtractEpoch(value, scale);
-                        var fraction = ExtractFraction(value, scale);
-                        return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime();
+                        case StructArray array:
+                            epoch = ((Int64Array)array.Fields[0]).GetValue(_currentRecordIndex);
+                            fraction = ((Int32Array)array.Fields[1]).GetValue(_currentRecordIndex);
+                            return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToLocalTime();
+                        case Int64Array array:
+                            value = array.GetValue(_currentRecordIndex);
+                            epoch = ExtractEpoch(value.Value, scale);
+                            fraction = ExtractFraction(value.Value, scale);
+                            return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToLocalTime();
                     }
 
+                    return null;

Check warning (Codecov / codecov/patch) on line 224 in Snowflake.Data/Core/ArrowResultChunk.cs: Added line #L224 was not covered by tests.
                 case SFDataType.TIMESTAMP_NTZ:
-                    if (column.GetType() == typeof(StructArray))
-                    {
-                        var epoch = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
-                        var fraction = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
-                        return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime;
-                    }
-                    else
+                    switch (column)
                     {
-                        var value = ((Int64Array)column).Values[_currentRecordIndex];
-                        var epoch = ExtractEpoch(value, scale);
-                        var fraction = ExtractFraction(value, scale);
-                        return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime;
+                        case StructArray array:
+                            epoch = ((Int64Array)array.Fields[0]).GetValue(_currentRecordIndex);
+                            fraction = ((Int32Array)array.Fields[1]).GetValue(_currentRecordIndex);
+                            return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).DateTime;
+                        case Int64Array array:
+                            value = array.GetValue(_currentRecordIndex);
+                            epoch = ExtractEpoch(value.Value, scale);
+                            fraction = ExtractFraction(value.Value, scale);
+                            return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).DateTime;
                     }
 
+                    return null;

Check warning (Codecov / codecov/patch) on line 239 in Snowflake.Data/Core/ArrowResultChunk.cs: Added line #L239 was not covered by tests.
             }
             throw new NotSupportedException($"Type {srcType} is not supported.");
         }
 
-        private long ExtractEpoch(long value, long scale)
+        private static long ExtractEpoch(long value, long scale)
         {
             return value / s_powersOf10[scale];
         }
 
-        private long ExtractFraction(long value, long scale)
+        private static long ExtractFraction(long value, long scale)
         {
             return ((value % s_powersOf10[scale]) * s_powersOf10[9 - scale]);
         }
 
+        private static DateTime? GetTime(long? value, long scale)
+        {
+            if (!value.HasValue)
+                return null;
+
Check warning (Codecov / codecov/patch) on line 257 in Snowflake.Data/Core/ArrowResultChunk.cs: Added line #L257 was not covered by tests.
+            if (scale == 0)
+                return DateTimeOffset.FromUnixTimeSeconds(value.Value).DateTime;
+            if (scale <= 3)
+                return DateTimeOffset.FromUnixTimeMilliseconds(value.Value * s_powersOf10[3 - scale])
+                    .DateTime;
+            if (scale <= 7)
+                return s_epochDate.AddTicks(value.Value * s_powersOf10[7 - scale]).DateTime;
+            return s_epochDate.AddTicks(value.Value / s_powersOf10[scale - 7]).DateTime;
+        }
     }
 }

0 comments on commit df23ba5
