Skip to content

Commit

Permalink
Revert cache for time
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-pbulawa committed Dec 19, 2023
1 parent 7df8f1b commit 5c7a278
Showing 1 changed file with 115 additions and 81 deletions.
196 changes: 115 additions & 81 deletions Snowflake.Data/Core/ArrowResultChunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using Apache.Arrow;
using Google.Apis.Logging;

namespace Snowflake.Data.Core
{
Expand All @@ -33,16 +31,37 @@ internal class ArrowResultChunk : BaseResultChunk

public List<RecordBatch> RecordBatch { get; set; }

private sbyte[][] _sbyte;
private short[][] _short;
private int[][] _int;
private long[][] _long;

private byte[][] _byte;
private double[][] _double;


private int _currentBatchIndex;
private int _currentRecordIndex = -1;

private void ResetTempTables()
{
_sbyte = new sbyte[ColumnCount][];
_short = new short[ColumnCount][];
_int = new int[ColumnCount][];
_long = new long[ColumnCount][];
_byte = new byte[ColumnCount][];
_double = new double[ColumnCount][];
}

public ArrowResultChunk(RecordBatch recordBatch)
{
RecordBatch = new List<RecordBatch> { recordBatch };

RowCount = recordBatch.Length;
ColumnCount = recordBatch.ColumnCount;
ChunkIndex = -1;

ResetTempTables();
}

public ArrowResultChunk(int columnCount)
Expand All @@ -52,6 +71,8 @@ public ArrowResultChunk(int columnCount)
RowCount = 0;
ColumnCount = columnCount;
ChunkIndex = -1;

ResetTempTables();
}

public void AddRecordBatch(RecordBatch recordBatch)
Expand All @@ -66,6 +87,8 @@ internal override void Reset(ExecResponseChunk chunkInfo, int chunkIndex)
_currentBatchIndex = 0;
_currentRecordIndex = -1;
RecordBatch.Clear();

ResetTempTables();
}

internal override bool Next()
Expand All @@ -79,7 +102,9 @@ internal override bool Next()

_currentBatchIndex += 1;
_currentRecordIndex = 0;


ResetTempTables();

return _currentBatchIndex < RecordBatch.Count;
}

Expand All @@ -98,6 +123,7 @@ internal override bool Rewind()
{
_currentRecordIndex = RecordBatch[_currentBatchIndex].Length - 1;

ResetTempTables();
return true;
}

Expand All @@ -122,10 +148,7 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)

if (column.IsNull(_currentRecordIndex))
return DBNull.Value;

long? value;
long? epoch;
long? fraction;

switch (srcType)
{
case SFDataType.FIXED:
Expand All @@ -134,24 +157,36 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
switch (column)
{
case Int8Array array:
if (_sbyte[columnIndex] == null)
_sbyte[columnIndex] = array.Values.ToArray();
if (scale == 0)
return array.GetValue(_currentRecordIndex);
return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale];
return _sbyte[columnIndex][_currentRecordIndex];
else
return _sbyte[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Int16Array array:
if (_short[columnIndex] == null)
_short[columnIndex] = array.Values.ToArray();
if (scale == 0)
return array.GetValue(_currentRecordIndex);
return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale];
return _short[columnIndex][_currentRecordIndex];
else
return _short[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Int32Array array:
if (_int[columnIndex] == null)
_int[columnIndex] = array.Values.ToArray();
if (scale == 0)
return array.GetValue(_currentRecordIndex);
return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale];
return _int[columnIndex][_currentRecordIndex];
else
return _int[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Int64Array array:
if (_long[columnIndex] == null)
_long[columnIndex] = array.Values.ToArray();
if (scale == 0)
return array.GetValue(_currentRecordIndex);
return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale];
return _long[columnIndex][_currentRecordIndex];
else
return _long[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale];

case Decimal128Array array:
return array.GetValue(_currentRecordIndex);
Expand All @@ -164,105 +199,104 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale)
case SFDataType.REAL:
// Snowflake data types that are floating-point numbers will fall in this category
// e.g. FLOAT/REAL/DOUBLE
return ((DoubleArray)column).GetValue(_currentRecordIndex);
if (_double[columnIndex] == null)
_double[columnIndex] = ((DoubleArray)column).Values.ToArray();
return _double[columnIndex][_currentRecordIndex];

case SFDataType.TEXT:
case SFDataType.ARRAY:
case SFDataType.VARIANT:
case SFDataType.OBJECT:
return ((StringArray)column).GetString(_currentRecordIndex);
if (_byte[columnIndex] == null || _int[columnIndex] == null)
{
_byte[columnIndex] = ((StringArray)column).Values.ToArray();
_int[columnIndex] = ((StringArray)column).ValueOffsets.ToArray();
}
return StringArray.DefaultEncoding.GetString(
_byte[columnIndex],
_int[columnIndex][_currentRecordIndex],
_int[columnIndex][_currentRecordIndex + 1] - _int[columnIndex][_currentRecordIndex]);

case SFDataType.BINARY:
return ((BinaryArray)column).GetBytes(_currentRecordIndex).ToArray();

case SFDataType.DATE:
var val = ((Date32Array)column).GetValue(_currentRecordIndex);
if (val.HasValue)
return SFDataConverter.UnixEpoch.AddTicks(val.Value * TicksPerDay);
return null;
if (_int[columnIndex] == null)
_int[columnIndex] = ((Date32Array)column).Values.ToArray();
return SFDataConverter.UnixEpoch.AddTicks(_int[columnIndex][_currentRecordIndex] * TicksPerDay);

case SFDataType.TIME:
switch (column)
{
var value = column.GetType() == typeof(Int32Array)
? ((Int32Array)column).Values[_currentRecordIndex]
: ((Int64Array)column).Values[_currentRecordIndex];
if (scale == 0)
return DateTimeOffset.FromUnixTimeSeconds(value).DateTime;
if (scale <= 3)
return DateTimeOffset.FromUnixTimeMilliseconds(value * s_powersOf10[3 - scale])
.DateTime;
if (scale <= 7)
return s_epochDate.AddTicks(value * s_powersOf10[7 - scale]).DateTime;
return s_epochDate.AddTicks(value / s_powersOf10[scale - 7]).DateTime;
}
case SFDataType.TIMESTAMP_TZ:
if (((StructArray)column).Fields.Count == 2)
{
case Int32Array array:
return GetTime(array.GetValue(_currentRecordIndex), scale);
case Int64Array array:
return GetTime(array.GetValue(_currentRecordIndex), scale);
var value = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
var timezone = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
var epoch = ExtractEpoch(value, scale);
var fraction = ExtractFraction(value, scale);
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440));
}
return null;

case SFDataType.TIMESTAMP_TZ:
var structArray = (StructArray)column;
int? timezone;
if (structArray.Fields.Count == 2)
else
{
value = ((Int64Array)structArray.Fields[0]).GetValue(_currentRecordIndex);
timezone = ((Int32Array)structArray.Fields[1]).GetValue(_currentRecordIndex);
epoch = ExtractEpoch(value.Value, scale);
fraction = ExtractFraction(value.Value, scale);
return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToOffset(TimeSpan.FromMinutes(timezone.Value - 1440));
var epoch = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
var fraction = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
var timezone = ((Int32Array)((StructArray)column).Fields[2]).Values[_currentRecordIndex];
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440));
}

epoch = ((Int64Array)structArray.Fields[0]).GetValue(_currentRecordIndex);
fraction = ((Int32Array)structArray.Fields[1]).GetValue(_currentRecordIndex);
timezone = ((Int32Array)structArray.Fields[2]).GetValue(_currentRecordIndex);
return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToOffset(TimeSpan.FromMinutes(timezone.Value - 1440));

case SFDataType.TIMESTAMP_LTZ:
switch (column)
if (column.GetType() == typeof(StructArray))
{
case StructArray array:
epoch = ((Int64Array)array.Fields[0]).GetValue(_currentRecordIndex);
fraction = ((Int32Array)array.Fields[1]).GetValue(_currentRecordIndex);
return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToLocalTime();
case Int64Array array:
value = array.GetValue(_currentRecordIndex);
epoch = ExtractEpoch(value.Value, scale);
fraction = ExtractFraction(value.Value, scale);
return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToLocalTime();
var epoch = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
var fraction = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime();
}
else
{
var value = ((Int64Array)column).Values[_currentRecordIndex];
var epoch = ExtractEpoch(value, scale);
var fraction = ExtractFraction(value, scale);
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime();
}

return null;
case SFDataType.TIMESTAMP_NTZ:
switch (column)
if (column.GetType() == typeof(StructArray))
{
case StructArray array:
epoch = ((Int64Array)array.Fields[0]).GetValue(_currentRecordIndex);
fraction = ((Int32Array)array.Fields[1]).GetValue(_currentRecordIndex);
return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).DateTime;
case Int64Array array:
value = array.GetValue(_currentRecordIndex);
epoch = ExtractEpoch(value.Value, scale);
fraction = ExtractFraction(value.Value, scale);
return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).DateTime;
var epoch = ((Int64Array)((StructArray)column).Fields[0]).Values[_currentRecordIndex];
var fraction = ((Int32Array)((StructArray)column).Fields[1]).Values[_currentRecordIndex];
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime;
}
else
{
var value = ((Int64Array)column).Values[_currentRecordIndex];
var epoch = ExtractEpoch(value, scale);
var fraction = ExtractFraction(value, scale);
return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime;
}

return null;
}
throw new NotSupportedException($"Type {srcType} is not supported.");
}

private static long ExtractEpoch(long value, long scale)
private long ExtractEpoch(long value, long scale)
{
return value / s_powersOf10[scale];
}

private static long ExtractFraction(long value, long scale)
private long ExtractFraction(long value, long scale)
{
return ((value % s_powersOf10[scale]) * s_powersOf10[9 - scale]);
}

private static DateTime? GetTime(long? value, long scale)
{
if (!value.HasValue)
return null;
if (scale == 0)
return DateTimeOffset.FromUnixTimeSeconds(value.Value).DateTime;
if (scale <= 3)
return DateTimeOffset.FromUnixTimeMilliseconds(value.Value * s_powersOf10[3 - scale])
.DateTime;
if (scale <= 7)
return s_epochDate.AddTicks(value.Value * s_powersOf10[7 - scale]).DateTime;
return s_epochDate.AddTicks(value.Value / s_powersOf10[scale - 7]).DateTime;
}
}
}

0 comments on commit 5c7a278

Please sign in to comment.