diff --git a/Snowflake.Data/Core/ArrowResultChunk.cs b/Snowflake.Data/Core/ArrowResultChunk.cs index 4617f1731..3a330bb5b 100755 --- a/Snowflake.Data/Core/ArrowResultChunk.cs +++ b/Snowflake.Data/Core/ArrowResultChunk.cs @@ -4,7 +4,9 @@ using System; using System.Collections.Generic; +using System.Linq; using Apache.Arrow; +using Google.Apis.Logging; namespace Snowflake.Data.Core { @@ -31,85 +33,9 @@ internal class ArrowResultChunk : BaseResultChunk public List RecordBatch { get; set; } - private sbyte[][] _sbyte; - private short[][] _short; - private int[][] _int; - private int[][] _fraction; - private long[][] _long; - - private byte[][] _byte; - private double[][] _double; - - private int[] _structFieldCount; - private int _currentBatchIndex; private int _currentRecordIndex = -1; - private void PrepareCache() - { - if (RowCount == 0) - { - return; - } - - for (var i = 0; i < ColumnCount; i++) - { - var column = RecordBatch[_currentBatchIndex].Column(i); - switch (column) - { - case Int8Array array: - _sbyte[i] = array.Values.ToArray(); - break; - case Int16Array array: - _short[i] = array.Values.ToArray(); - break; - case Int32Array array: - _int[i] = array.Values.ToArray(); - break; - case Int64Array array: - _long[i] = array.Values.ToArray(); - break; - case DoubleArray array: - _double[i] = array.Values.ToArray(); - break; - case StringArray array: - _byte[i] = array.Values.ToArray(); - _int[i] = array.ValueOffsets.ToArray(); - break; - case Date32Array array: - _int[i] = array.Values.ToArray(); - break; - case StructArray array: - _long[i] = ((Int64Array)array.Fields[0]).Values.ToArray(); - switch (array.Fields.Count) - { - case 2: - _int[i] = ((Int32Array)array.Fields[1]).Values.ToArray(); - _structFieldCount[i] = 2; - break; - case 3: - _fraction[i] = ((Int32Array)array.Fields[1]).Values.ToArray(); - _int[i] = ((Int32Array)array.Fields[2]).Values.ToArray(); - _structFieldCount[i] = 3; - break; - } - break; - } - } - } - - private void ResetTempTables() - { - _sbyte = new sbyte[ColumnCount][]; - _short = new short[ColumnCount][]; - _int = new int[ColumnCount][]; - _fraction = new int[ColumnCount][]; - _long = new long[ColumnCount][]; - _byte = new byte[ColumnCount][]; - _double = new double[ColumnCount][]; - _structFieldCount = new int[ColumnCount]; - } - public ArrowResultChunk(RecordBatch recordBatch) { RecordBatch = new List { recordBatch }; @@ -117,9 +43,6 @@ public ArrowResultChunk(RecordBatch recordBatch) RowCount = recordBatch.Length; ColumnCount = recordBatch.ColumnCount; ChunkIndex = -1; - - ResetTempTables(); - PrepareCache(); } public ArrowResultChunk(int columnCount) @@ -129,9 +52,6 @@ public ArrowResultChunk(int columnCount) RowCount = 0; ColumnCount = columnCount; ChunkIndex = -1; - - ResetTempTables(); - PrepareCache(); } public void AddRecordBatch(RecordBatch recordBatch) @@ -146,9 +66,6 @@ internal override void Reset(ExecResponseChunk chunkInfo, int chunkIndex) _currentBatchIndex = 0; _currentRecordIndex = -1; RecordBatch.Clear(); - - ResetTempTables(); - PrepareCache(); } internal override bool Next() @@ -162,18 +79,8 @@ internal override bool Next() _currentBatchIndex += 1; _currentRecordIndex = 0; - - if (_currentBatchIndex < RecordBatch.Count) - { - ResetTempTables(); - PrepareCache(); - - return true; - } - ResetTempTables(); - - return false; + return _currentBatchIndex < RecordBatch.Count; } internal override bool Rewind() @@ -191,8 +98,6 @@ internal override bool Rewind() { _currentRecordIndex = RecordBatch[_currentBatchIndex].Length - 1; - ResetTempTables(); - PrepareCache(); return true; } @@ -217,7 +122,10 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale) if (column.IsNull(_currentRecordIndex)) return DBNull.Value; - + + long? value; + long? epoch; + long? fraction; switch (srcType) { case SFDataType.FIXED: @@ -225,25 +133,25 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale) // e.g. NUMBER, DECIMAL/NUMERIC, INT/INTEGER switch (column) { - case Int8Array _: + case Int8Array array: if (scale == 0) - return _sbyte[columnIndex][_currentRecordIndex]; - return _sbyte[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale]; + return array.GetValue(_currentRecordIndex); + return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale]; - case Int16Array _: + case Int16Array array: if (scale == 0) - return _short[columnIndex][_currentRecordIndex]; - return _short[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale]; + return array.GetValue(_currentRecordIndex); + return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale]; - case Int32Array _: + case Int32Array array: if (scale == 0) - return _int[columnIndex][_currentRecordIndex]; - return _int[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale]; + return array.GetValue(_currentRecordIndex); + return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale]; - case Int64Array _: + case Int64Array array: if (scale == 0) - return _long[columnIndex][_currentRecordIndex]; - return _long[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale]; + return array.GetValue(_currentRecordIndex); + return array.GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale]; case Decimal128Array array: return array.GetValue(_currentRecordIndex); @@ -256,82 +164,79 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale) case SFDataType.REAL: // Snowflake data types that are floating-point numbers will fall in this category // e.g. FLOAT/REAL/DOUBLE - return _double[columnIndex][_currentRecordIndex]; + return ((DoubleArray)column).GetValue(_currentRecordIndex); case SFDataType.TEXT: case SFDataType.ARRAY: case SFDataType.VARIANT: case SFDataType.OBJECT: - return StringArray.DefaultEncoding.GetString( - _byte[columnIndex], - _int[columnIndex][_currentRecordIndex], - _int[columnIndex][_currentRecordIndex + 1] - _int[columnIndex][_currentRecordIndex]); + return ((StringArray)column).GetString(_currentRecordIndex); case SFDataType.BINARY: return ((BinaryArray)column).GetBytes(_currentRecordIndex).ToArray(); case SFDataType.DATE: - return SFDataConverter.UnixEpoch.AddTicks(_int[columnIndex][_currentRecordIndex] * TicksPerDay); + var val = ((Date32Array)column).GetValue(_currentRecordIndex); + if (val.HasValue) + return SFDataConverter.UnixEpoch.AddTicks(val.Value * TicksPerDay); + return null; case SFDataType.TIME: - { - var value = column.GetType() == typeof(Int32Array) ? _int[columnIndex][_currentRecordIndex] : _long[columnIndex][_currentRecordIndex]; - - if (scale == 0) - return DateTimeOffset.FromUnixTimeSeconds(value).DateTime; - if (scale <= 3) - return DateTimeOffset.FromUnixTimeMilliseconds(value * s_powersOf10[3 - scale]) - .DateTime; - if (scale <= 7) - return s_epochDate.AddTicks(value * s_powersOf10[7 - scale]).DateTime; - return s_epochDate.AddTicks(value / s_powersOf10[scale - 7]).DateTime; - } - case SFDataType.TIMESTAMP_TZ: - if (_structFieldCount[columnIndex] == 2) + switch (column) { - var value = _long[columnIndex][_currentRecordIndex]; - var timezone = _int[columnIndex][_currentRecordIndex]; - var epoch = ExtractEpoch(value, scale); - var fraction = ExtractFraction(value, scale); - return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440)); + case Int32Array array: + return GetTime(array.GetValue(_currentRecordIndex), scale); + case Int64Array array: + return GetTime(array.GetValue(_currentRecordIndex), scale); } - else + return null; + + case SFDataType.TIMESTAMP_TZ: + var structArray = (StructArray)column; + int? timezone; + if (structArray.Fields.Count == 2) { - var epoch = _long[columnIndex][_currentRecordIndex]; - var fraction = _fraction[columnIndex][_currentRecordIndex]; - var timezone = _int[columnIndex][_currentRecordIndex]; - return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440)); + value = ((Int64Array)structArray.Fields[0]).GetValue(_currentRecordIndex); + timezone = ((Int32Array)structArray.Fields[1]).GetValue(_currentRecordIndex); + epoch = ExtractEpoch(value.Value, scale); + fraction = ExtractFraction(value.Value, scale); + return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToOffset(TimeSpan.FromMinutes(timezone.Value - 1440)); } - + + epoch = ((Int64Array)structArray.Fields[0]).GetValue(_currentRecordIndex); + fraction = ((Int32Array)structArray.Fields[1]).GetValue(_currentRecordIndex); + timezone = ((Int32Array)structArray.Fields[2]).GetValue(_currentRecordIndex); + return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToOffset(TimeSpan.FromMinutes(timezone.Value - 1440)); case SFDataType.TIMESTAMP_LTZ: - if (_structFieldCount[columnIndex] == 2) - { - var epoch = _long[columnIndex][_currentRecordIndex]; - var fraction = _int[columnIndex][_currentRecordIndex]; - return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime(); - } - else + switch (column) { - var value = _long[columnIndex][_currentRecordIndex]; - var epoch = ExtractEpoch(value, scale); - var fraction = ExtractFraction(value, scale); - return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime(); + case StructArray array: + epoch = ((Int64Array)array.Fields[0]).GetValue(_currentRecordIndex); + fraction = ((Int32Array)array.Fields[1]).GetValue(_currentRecordIndex); + return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToLocalTime(); + case Int64Array array: + value = array.GetValue(_currentRecordIndex); + epoch = ExtractEpoch(value.Value, scale); + fraction = ExtractFraction(value.Value, scale); + return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToLocalTime(); } + return null; case SFDataType.TIMESTAMP_NTZ: - if (_structFieldCount[columnIndex] == 2) - { - var epoch = _long[columnIndex][_currentRecordIndex]; - var fraction = _int[columnIndex][_currentRecordIndex]; - return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime; - } - else + switch (column) { - var value = _long[columnIndex][_currentRecordIndex]; - var epoch = ExtractEpoch(value, scale); - var fraction = ExtractFraction(value, scale); - return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime; + case StructArray array: + epoch = ((Int64Array)array.Fields[0]).GetValue(_currentRecordIndex); + fraction = ((Int32Array)array.Fields[1]).GetValue(_currentRecordIndex); + return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).DateTime; + case Int64Array array: + value = array.GetValue(_currentRecordIndex); + epoch = ExtractEpoch(value.Value, scale); + fraction = ExtractFraction(value.Value, scale); + return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).DateTime; } + + return null; } throw new NotSupportedException($"Type {srcType} is not supported."); } @@ -345,5 +250,19 @@ private static long ExtractFraction(long value, long scale) { return ((value % s_powersOf10[scale]) * s_powersOf10[9 - scale]); } + + private static DateTime? GetTime(long? value, long scale) + { + if (!value.HasValue) + return null; + if (scale == 0) + return DateTimeOffset.FromUnixTimeSeconds(value.Value).DateTime; + if (scale <= 3) + return DateTimeOffset.FromUnixTimeMilliseconds(value.Value * s_powersOf10[3 - scale]) + .DateTime; + if (scale <= 7) + return s_epochDate.AddTicks(value.Value * s_powersOf10[7 - scale]).DateTime; + return s_epochDate.AddTicks(value.Value / s_powersOf10[scale - 7]).DateTime; + } } }