diff --git a/Snowflake.Data.Tests/IntegrationTests/SFDbDataReaderIT.cs b/Snowflake.Data.Tests/IntegrationTests/SFDbDataReaderIT.cs index e0b51a8be..b0e555185 100755 --- a/Snowflake.Data.Tests/IntegrationTests/SFDbDataReaderIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFDbDataReaderIT.cs @@ -15,8 +15,7 @@ namespace Snowflake.Data.Tests.IntegrationTests { - // TODO: enable tests for Arrow - //[TestFixture(ResultFormat.ARROW)] + [TestFixture(ResultFormat.ARROW)] [TestFixture(ResultFormat.JSON)] class SFDbDataReaderIT : SFBaseTest { @@ -571,7 +570,7 @@ public void TestGetTimestampLTZ() } [Test] - public void TestGetBoolean() + public void TestGetBoolean([Values]bool value) { using (var conn = CreateAndOpenConnection()) { @@ -585,7 +584,7 @@ public void TestGetBoolean() var p1 = cmd.CreateParameter(); p1.ParameterName = "1"; p1.DbType = DbType.Boolean; - p1.Value = true; + p1.Value = value; cmd.Parameters.Add(p1); var count = cmd.ExecuteNonQuery(); @@ -597,7 +596,7 @@ public void TestGetBoolean() ValidateResultFormat(reader); Assert.IsTrue(reader.Read()); - Assert.IsTrue(reader.GetBoolean(0)); + Assert.AreEqual(value, reader.GetBoolean(0)); reader.Close(); CloseConnection(conn); diff --git a/Snowflake.Data.Tests/UnitTests/ArrowResultChunkTest.cs b/Snowflake.Data.Tests/UnitTests/ArrowResultChunkTest.cs index b11486e78..8197409be 100755 --- a/Snowflake.Data.Tests/UnitTests/ArrowResultChunkTest.cs +++ b/Snowflake.Data.Tests/UnitTests/ArrowResultChunkTest.cs @@ -4,6 +4,7 @@ using System; using System.Collections; +using System.Collections.Generic; using System.Linq; using Apache.Arrow; using Apache.Arrow.Types; @@ -141,7 +142,7 @@ public void TestGetChunkIndexReturnsFirstChunk() { var chunk = new ArrowResultChunk(_recordBatchOne); - Assert.AreEqual(0, chunk.ChunkIndex); + Assert.AreEqual(-1, chunk.ChunkIndex); } [Test] @@ -156,10 +157,34 @@ public void TestUnusedExtractCellThrowsNotSupportedException() [Test] public void TestExtractCellReturnsNull() { - var chunk = new ArrowResultChunk(RecordBatchWithNullValue); - chunk.Next(); - - Assert.AreEqual(DBNull.Value, chunk.ExtractCell(0, SFDataType.FIXED, 0)); + var cases = new Dictionary + { + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Int8", false, col => col.Int8(array => array.AppendNull())).Build()), SFDataType.FIXED }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Int16", false, col => col.Int16(array => array.AppendNull())).Build()), SFDataType.FIXED }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Int32", false, col => col.Int32(array => array.AppendNull())).Build()), SFDataType.FIXED }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Int64", false, col => col.Int64(array => array.AppendNull())).Build()), SFDataType.FIXED }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Decimal128", false, col => col.Decimal128(new Decimal128Type(0, 0), array => array.AppendNull())).Build()), SFDataType.FIXED }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Boolean", false, col => col.Boolean(array => array.AppendNull())).Build()), SFDataType.BOOLEAN }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Real", false, col => col.Double(array => array.AppendNull())).Build()), SFDataType.REAL }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Text", false, col => col.String(array => array.AppendNull())).Build()), SFDataType.TEXT }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Array", false, col => col.String(array => array.AppendNull())).Build()), SFDataType.ARRAY }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Variant", false, col => col.String(array => array.AppendNull())).Build()), SFDataType.VARIANT }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Object", false, col => col.String(array => array.AppendNull())).Build()), SFDataType.OBJECT }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Binary", false, col => col.Binary(array => array.AppendNull())).Build()), SFDataType.BINARY }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Date", false, col => col.Date32(array => array.AppendNull())).Build()), SFDataType.DATE }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Time", false, col => col.Int32(array => array.AppendNull())).Build()), SFDataType.TIME }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Timestamp_TZ", false, col => col.Int32(array => array.AppendNull())).Build()), SFDataType.TIMESTAMP_TZ }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Timestamp_LTZ", false, col => col.Int32(array => array.AppendNull())).Build()), SFDataType.TIMESTAMP_LTZ }, + { new ArrowResultChunk(new RecordBatch.Builder().Append("Col_Timestamp_NTZ", false, col => col.Int32(array => array.AppendNull())).Build()), SFDataType.TIMESTAMP_NTZ }, + }; + + foreach (var pair in cases) + { + var chunk = pair.Key; + var type = pair.Value; + chunk.Next(); + Assert.AreEqual(DBNull.Value, chunk.ExtractCell(0, type, 0), $"Expected DBNull.Value for SFDataType: {type}"); + } } [Test] diff --git a/Snowflake.Data/Core/ArrowResultChunk.cs b/Snowflake.Data/Core/ArrowResultChunk.cs index 4b6171d35..4422d1957 100755 --- a/Snowflake.Data/Core/ArrowResultChunk.cs +++ b/Snowflake.Data/Core/ArrowResultChunk.cs @@ -5,7 +5,6 @@ using System; using System.Collections.Generic; using Apache.Arrow; -using Apache.Arrow.Types; namespace Snowflake.Data.Core { @@ -28,18 +27,43 @@ internal class ArrowResultChunk : BaseResultChunk 1000000000 }; + private const long TicksPerDay = (long)24 * 60 * 60 * 1000 * 10000; + public List RecordBatch { get; set; } - private int _currentBatchIndex = 0; + private sbyte[][] _sbyte; + private short[][] _short; + private int[][] _int; + private int[][] _fraction; + private long[][] _long; + + private byte[][] _byte; + private double[][] _double; + + + private int _currentBatchIndex; private int _currentRecordIndex = -1; + private void ResetTempTables() + { + _sbyte = new sbyte[ColumnCount][]; + _short = new short[ColumnCount][]; + _int = new int[ColumnCount][]; + _fraction = new int[ColumnCount][]; + _long = new long[ColumnCount][]; + _byte = new byte[ColumnCount][]; + _double = new double[ColumnCount][]; + } + public ArrowResultChunk(RecordBatch recordBatch) { RecordBatch = new List { recordBatch }; RowCount = recordBatch.Length; ColumnCount = recordBatch.ColumnCount; - ChunkIndex = 0; + ChunkIndex = -1; + + ResetTempTables(); } public ArrowResultChunk(int columnCount) @@ -48,21 +72,25 @@ public ArrowResultChunk(int columnCount) RowCount = 0; ColumnCount = columnCount; - ChunkIndex = 0; + ChunkIndex = -1; + + ResetTempTables(); } public void AddRecordBatch(RecordBatch recordBatch) { RecordBatch.Add(recordBatch); } - + internal override void Reset(ExecResponseChunk chunkInfo, int chunkIndex) { base.Reset(chunkInfo, chunkIndex); - + _currentBatchIndex = 0; _currentRecordIndex = -1; RecordBatch.Clear(); + + ResetTempTables(); } internal override bool Next() @@ -77,6 +105,8 @@ internal override bool Next() _currentBatchIndex += 1; _currentRecordIndex = 0; + ResetTempTables(); + return _currentBatchIndex < RecordBatch.Count; } @@ -94,6 +124,8 @@ internal override bool Rewind() if (_currentBatchIndex >= 0) { _currentRecordIndex = RecordBatch[_currentBatchIndex].Length - 1; + + ResetTempTables(); return true; } @@ -106,6 +138,7 @@ public override UTF8Buffer ExtractCell(int rowIndex, int columnIndex) throw new NotSupportedException(); } + [Obsolete("ExtractCell with columnIndex is deprecated", false)] public override UTF8Buffer ExtractCell(int columnIndex) { throw new NotSupportedException(); @@ -123,106 +156,174 @@ public object ExtractCell(int columnIndex, SFDataType srcType, long scale) case SFDataType.FIXED: // Snowflake data types that are fixed-point numbers will fall into this category // e.g. NUMBER, DECIMAL/NUMERIC, INT/INTEGER - switch (column.Data.DataType.TypeId) + switch (column) { - case ArrowTypeId.Int8: + case Int8Array array: + if (_sbyte[columnIndex] == null) + _sbyte[columnIndex] = array.Values.ToArray(); if (scale == 0) - return ((Int8Array)column).GetValue(_currentRecordIndex); - else - return ((Int8Array)column).GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale]; - case ArrowTypeId.Int16: + return _sbyte[columnIndex][_currentRecordIndex]; + return _sbyte[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale]; + + case Int16Array array: + if (_short[columnIndex] == null) + _short[columnIndex] = array.Values.ToArray(); if (scale == 0) - return ((Int16Array)column).GetValue(_currentRecordIndex); - else - return ((Int16Array)column).GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale]; - case ArrowTypeId.Int32: + return _short[columnIndex][_currentRecordIndex]; + return _short[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale]; + + case Int32Array array: + if (_int[columnIndex] == null) + _int[columnIndex] = array.Values.ToArray(); if (scale == 0) - return ((Int32Array)column).GetValue(_currentRecordIndex); - else - return ((Int32Array)column).GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale]; - case ArrowTypeId.Int64: + return _int[columnIndex][_currentRecordIndex]; + return _int[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale]; + + case Int64Array array: + if (_long[columnIndex] == null) + _long[columnIndex] = array.Values.ToArray(); if (scale == 0) - return ((Int64Array)column).GetValue(_currentRecordIndex); - else - return ((Int64Array)column).GetValue(_currentRecordIndex) / (decimal)s_powersOf10[scale]; - case ArrowTypeId.Decimal128: - return ((Decimal128Array)column).GetValue(_currentRecordIndex); + return _long[columnIndex][_currentRecordIndex]; + return _long[columnIndex][_currentRecordIndex] / (decimal)s_powersOf10[scale]; + + case Decimal128Array array: + return array.GetValue(_currentRecordIndex); } break; case SFDataType.BOOLEAN: return ((BooleanArray)column).GetValue(_currentRecordIndex); + case SFDataType.REAL: // Snowflake data types that are floating-point numbers will fall in this category // e.g. FLOAT/REAL/DOUBLE - return ((DoubleArray)column).GetValue(_currentRecordIndex); + if (_double[columnIndex] == null) + _double[columnIndex] = ((DoubleArray)column).Values.ToArray(); + return _double[columnIndex][_currentRecordIndex]; + case SFDataType.TEXT: case SFDataType.ARRAY: case SFDataType.VARIANT: case SFDataType.OBJECT: - return ((StringArray)column).GetString(_currentRecordIndex); + if (_byte[columnIndex] == null || _int[columnIndex] == null) + { + _byte[columnIndex] = ((StringArray)column).Values.ToArray(); + _int[columnIndex] = ((StringArray)column).ValueOffsets.ToArray(); + } + return StringArray.DefaultEncoding.GetString( + _byte[columnIndex], + _int[columnIndex][_currentRecordIndex], + _int[columnIndex][_currentRecordIndex + 1] - _int[columnIndex][_currentRecordIndex]); + case SFDataType.BINARY: return ((BinaryArray)column).GetBytes(_currentRecordIndex).ToArray(); + case SFDataType.DATE: - return ((Date32Array)column).GetDateTime(_currentRecordIndex); + if (_int[columnIndex] == null) + _int[columnIndex] = ((Date32Array)column).Values.ToArray(); + return SFDataConverter.UnixEpoch.AddTicks(_int[columnIndex][_currentRecordIndex] * TicksPerDay); + case SFDataType.TIME: { - var value = column.Data.DataType.TypeId == ArrowTypeId.Int32 - ? ((Int32Array)column).GetValue(_currentRecordIndex) - : ((Int64Array)column).GetValue(_currentRecordIndex); + long value; + + if (column.GetType() == typeof(Int32Array)) + { + if (_int[columnIndex] == null) + { + _int[columnIndex] = ((Int32Array)column).Values.ToArray(); + } + + value = _int[columnIndex][_currentRecordIndex]; + } + else + { + if (_long[columnIndex] == null) + { + _long[columnIndex] = ((Int64Array)column).Values.ToArray(); + } + + value = _long[columnIndex][_currentRecordIndex]; + } + if (scale == 0) - return DateTimeOffset.FromUnixTimeSeconds(value.Value).DateTime; + return DateTimeOffset.FromUnixTimeSeconds(value).DateTime; if (scale <= 3) - return DateTimeOffset.FromUnixTimeMilliseconds(value.Value * s_powersOf10[3 - scale]) + return DateTimeOffset.FromUnixTimeMilliseconds(value * s_powersOf10[3 - scale]) .DateTime; if (scale <= 7) - return s_epochDate.AddTicks(value.Value * s_powersOf10[7 - scale]).DateTime; - return s_epochDate.AddTicks(value.Value / s_powersOf10[scale - 7]).DateTime; + return s_epochDate.AddTicks(value * s_powersOf10[7 - scale]).DateTime; + return s_epochDate.AddTicks(value / s_powersOf10[scale - 7]).DateTime; } case SFDataType.TIMESTAMP_TZ: - if (((StructArray)column).Fields.Count == 2) + var structCol = (StructArray)column; + if (_long[columnIndex] == null) + _long[columnIndex] = ((Int64Array)structCol.Fields[0]).Values.ToArray(); + + if (structCol.Fields.Count == 2) { - var value = ((Int64Array)((StructArray)column).Fields[0]).GetValue(_currentRecordIndex); - var timezone = ((Int32Array)((StructArray)column).Fields[1]).GetValue(_currentRecordIndex); - var epoch = ExtractEpoch(value.Value, scale); - var fraction = ExtractFraction(value.Value, scale); - return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone.Value - 1440)); + if (_int[columnIndex] == null) + _int[columnIndex] = ((Int32Array)structCol.Fields[1]).Values.ToArray(); + var value = _long[columnIndex][_currentRecordIndex]; + var timezone = _int[columnIndex][_currentRecordIndex]; + var epoch = ExtractEpoch(value, scale); + var fraction = ExtractFraction(value, scale); + return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440)); } else { - var epoch = ((Int64Array)((StructArray)column).Fields[0]).GetValue(_currentRecordIndex); - var fraction = ((Int32Array)((StructArray)column).Fields[1]).GetValue(_currentRecordIndex); - var timezone = ((Int32Array)((StructArray)column).Fields[2]).GetValue(_currentRecordIndex); - return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToOffset(TimeSpan.FromMinutes(timezone.Value - 1440)); + if (_fraction[columnIndex] == null) + _fraction[columnIndex] = ((Int32Array)structCol.Fields[1]).Values.ToArray(); + if (_int[columnIndex] == null) + _int[columnIndex] = ((Int32Array)structCol.Fields[2]).Values.ToArray(); + + var epoch = _long[columnIndex][_currentRecordIndex]; + var fraction = _fraction[columnIndex][_currentRecordIndex]; + var timezone = _int[columnIndex][_currentRecordIndex]; + return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToOffset(TimeSpan.FromMinutes(timezone - 1440)); } case SFDataType.TIMESTAMP_LTZ: - if (column.Data.DataType.TypeId == ArrowTypeId.Struct) + if (column.GetType() == typeof(StructArray)) { - var epoch = ((Int64Array)((StructArray)column).Fields[0]).GetValue(_currentRecordIndex); - var fraction = ((Int32Array)((StructArray)column).Fields[1]).GetValue(_currentRecordIndex); - return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).ToLocalTime(); + if (_long[columnIndex] == null) + _long[columnIndex] = ((Int64Array)((StructArray)column).Fields[0]).Values.ToArray(); + if (_fraction[columnIndex] == null) + _fraction[columnIndex] = ((Int32Array)((StructArray)column).Fields[1]).Values.ToArray(); + var epoch = _long[columnIndex][_currentRecordIndex]; + var fraction = _fraction[columnIndex][_currentRecordIndex]; + return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime(); } else { - var value = ((Int64Array)column).GetValue(_currentRecordIndex); - var epoch = ExtractEpoch(value.Value, scale); - var fraction = ExtractFraction(value.Value, scale); + if (_long[columnIndex] == null) + _long[columnIndex] = ((Int64Array)column).Values.ToArray(); + + var value = _long[columnIndex][_currentRecordIndex]; + var epoch = ExtractEpoch(value, scale); + var fraction = ExtractFraction(value, scale); return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).ToLocalTime(); } case SFDataType.TIMESTAMP_NTZ: - if (column.Data.DataType.TypeId == ArrowTypeId.Struct) + if (column.GetType() == typeof(StructArray)) { - var epoch = ((Int64Array)((StructArray)column).Fields[0]).GetValue(_currentRecordIndex); - var fraction = ((Int32Array)((StructArray)column).Fields[1]).GetValue(_currentRecordIndex); - return s_epochDate.AddSeconds(epoch.Value).AddTicks(fraction.Value / 100).DateTime; + if (_long[columnIndex] == null) + _long[columnIndex] = ((Int64Array)((StructArray)column).Fields[0]).Values.ToArray(); + if (_fraction[columnIndex] == null) + _fraction[columnIndex] = ((Int32Array)((StructArray)column).Fields[1]).Values.ToArray(); + var epoch = _long[columnIndex][_currentRecordIndex]; + var fraction = _fraction[columnIndex][_currentRecordIndex]; + return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime; } else { - var value = ((Int64Array)column).GetValue(_currentRecordIndex); - var epoch = ExtractEpoch(value.Value, scale); - var fraction = ExtractFraction(value.Value, scale); + if (_long[columnIndex] == null) + _long[columnIndex] = ((Int64Array)column).Values.ToArray(); + + var value = _long[columnIndex][_currentRecordIndex]; + var epoch = ExtractEpoch(value, scale); + var fraction = ExtractFraction(value, scale); return s_epochDate.AddSeconds(epoch).AddTicks(fraction / 100).DateTime; } } diff --git a/Snowflake.Data/Core/ArrowResultSet.cs b/Snowflake.Data/Core/ArrowResultSet.cs index ced646c5c..31e0eccca 100755 --- a/Snowflake.Data/Core/ArrowResultSet.cs +++ b/Snowflake.Data/Core/ArrowResultSet.cs @@ -23,27 +23,11 @@ class ArrowResultSet : SFBaseResultSet private BaseResultChunk _currentChunk; private readonly IChunkDownloader _chunkDownloader; - public ArrowResultSet(QueryExecResponseData responseData, SFStatement sfStatement, CancellationToken cancellationToken) : base() + public ArrowResultSet(QueryExecResponseData responseData, SFStatement sfStatement, CancellationToken cancellationToken) { columnCount = responseData.rowType.Count; try { - if (responseData.rowsetBase64.Length > 0) - { - using (var stream = new MemoryStream(Convert.FromBase64String(responseData.rowsetBase64))) - { - using (var reader = new ArrowStreamReader(stream)) - { - var recordBatch = reader.ReadNextRecordBatch(); - _currentChunk = new ArrowResultChunk(recordBatch); - } - } - } - else - { - _currentChunk = new ArrowResultChunk(columnCount); - } - this.sfStatement = sfStatement; UpdateSessionStatus(responseData); @@ -60,14 +44,39 @@ public ArrowResultSet(QueryExecResponseData responseData, SFStatement sfStatemen isClosed = false; queryId = responseData.queryId; + + ReadChunk(responseData); } - catch(System.Exception ex) + catch(Exception ex) { s_logger.Error("Result set error queryId="+responseData.queryId, ex); throw; } } + private void ReadChunk(QueryExecResponseData responseData) + { + if (responseData.rowsetBase64.Length > 0) + { + using (var stream = new MemoryStream(Convert.FromBase64String(responseData.rowsetBase64))) + { + using (var reader = new ArrowStreamReader(stream)) + { + var recordBatch = reader.ReadNextRecordBatch(); + _currentChunk = new ArrowResultChunk(recordBatch); + while ((recordBatch = reader.ReadNextRecordBatch()) != null) + { + ((ArrowResultChunk)_currentChunk).AddRecordBatch(recordBatch); + } + } + } + } + else + { + _currentChunk = new ArrowResultChunk(columnCount); + } + } + internal override async Task NextAsync() { ThrowIfClosed(); @@ -77,14 +86,16 @@ internal override async Task NextAsync() if (_totalChunkCount > 0) { - s_logger.Debug("Get next chunk from chunk downloader"); + s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" + + $" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," + + $" size uncompressed: {_currentChunk.UncompressedSize}"); _currentChunk = await _chunkDownloader.GetNextChunkAsync().ConfigureAwait(false); return _currentChunk?.Next() ?? false; } return false; } - + internal override bool Next() { ThrowIfClosed(); @@ -94,8 +105,11 @@ internal override bool Next() if (_totalChunkCount > 0) { - s_logger.Debug("Get next chunk from chunk downloader"); + s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" + + $" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," + + $" size uncompressed: {_currentChunk.UncompressedSize}"); _currentChunk = Task.Run(async() => await (_chunkDownloader.GetNextChunkAsync()).ConfigureAwait(false)).Result; + return _currentChunk?.Next() ?? false; } @@ -140,7 +154,7 @@ internal override bool Rewind() return false; } - + private object GetObjectInternal(int ordinal) { ThrowIfClosed(); @@ -159,14 +173,39 @@ internal override object GetValue(int ordinal) { var value = GetObjectInternal(ordinal); if (value == DBNull.Value) + { return value; + } - if (value is decimal ret) - return ret; - - var dstType = sfResultSetMetaData.GetCSharpTypeByIndex(ordinal); + object obj; + checked + { + switch (value) + { + case decimal ret: obj = ret; + break; + case long ret: obj = ret; + break; + case int ret: obj = (long)ret; + break; + case short ret: obj = (long)ret; + break; + case sbyte ret: obj = (long)ret; + break; + case string ret: obj = ret; + break; + case bool ret: obj = ret; + break; + default: + { + var dstType = sfResultSetMetaData.GetCSharpTypeByIndex(ordinal); + obj = Convert.ChangeType(value, dstType); + break; + } + } + } - return Convert.ChangeType(value, dstType); + return obj; } internal override bool IsDBNull(int ordinal) diff --git a/Snowflake.Data/Core/BaseResultChunk.cs b/Snowflake.Data/Core/BaseResultChunk.cs index e84fa1602..37e8fa114 100755 --- a/Snowflake.Data/Core/BaseResultChunk.cs +++ b/Snowflake.Data/Core/BaseResultChunk.cs @@ -16,6 +16,10 @@ public abstract class BaseResultChunk : IResultChunk public int ChunkIndex { get; protected set; } + internal int CompressedSize; + + internal int UncompressedSize; + internal string Url { get; set; } internal string[,] RowSet { get; set; } @@ -32,12 +36,14 @@ public abstract class BaseResultChunk : IResultChunk internal abstract bool Next(); internal abstract bool Rewind(); - + internal virtual void Reset(ExecResponseChunk chunkInfo, int chunkIndex) { RowCount = chunkInfo.rowCount; Url = chunkInfo.url; ChunkIndex = chunkIndex; + CompressedSize = chunkInfo.compressedSize; + UncompressedSize = chunkInfo.uncompressedSize; } internal virtual void ResetForRetry() diff --git a/Snowflake.Data/Core/RestResponse.cs b/Snowflake.Data/Core/RestResponse.cs index ae1055c28..97f3f9772 100755 --- a/Snowflake.Data/Core/RestResponse.cs +++ b/Snowflake.Data/Core/RestResponse.cs @@ -307,6 +307,9 @@ internal class ExecResponseChunk [JsonProperty(PropertyName = "uncompressedSize")] internal int uncompressedSize { get; set; } + + [JsonProperty(PropertyName = "compressedSize")] + internal int compressedSize { get; set; } } internal class CloseResponse : BaseRestResponse diff --git a/Snowflake.Data/Core/SFResultChunk.cs b/Snowflake.Data/Core/SFResultChunk.cs index 0917bd8cd..06b158ab3 100755 --- a/Snowflake.Data/Core/SFResultChunk.cs +++ b/Snowflake.Data/Core/SFResultChunk.cs @@ -18,6 +18,7 @@ public SFResultChunk(string[,] rowSet) RowSet = rowSet; RowCount = rowSet.GetLength(0); ColumnCount = rowSet.GetLength(1); + ChunkIndex = -1; } public SFResultChunk(string url, int rowCount, int columnCount, int index) diff --git a/Snowflake.Data/Core/SFResultSet.cs b/Snowflake.Data/Core/SFResultSet.cs index c8ab3b40e..55b069806 100755 --- a/Snowflake.Data/Core/SFResultSet.cs +++ b/Snowflake.Data/Core/SFResultSet.cs @@ -8,6 +8,7 @@ using Snowflake.Data.Log; using Snowflake.Data.Client; using System.Collections.Generic; +using System.Diagnostics; namespace Snowflake.Data.Core { @@ -115,7 +116,9 @@ internal override async Task NextAsync() { // GetNextChunk could be blocked if download result is not done yet. // So put this piece of code in a seperate task - s_logger.Debug("Get next chunk from chunk downloader"); + s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" + + $" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," + + $" size uncompressed: {_currentChunk.UncompressedSize}"); BaseResultChunk nextChunk = await _chunkDownloader.GetNextChunkAsync().ConfigureAwait(false); if (nextChunk != null) { @@ -136,7 +139,9 @@ internal override bool Next() if (_chunkDownloader != null) { - s_logger.Debug("Get next chunk from chunk downloader"); + s_logger.Debug($"Get next chunk from chunk downloader, chunk: {_currentChunk.ChunkIndex + 1}/{_totalChunkCount}" + + $" rows: {_currentChunk.RowCount}, size compressed: {_currentChunk.CompressedSize}," + + $" size uncompressed: {_currentChunk.UncompressedSize}"); BaseResultChunk nextChunk = Task.Run(async() => await (_chunkDownloader.GetNextChunkAsync()).ConfigureAwait(false)).Result; if (nextChunk != null) { @@ -284,7 +289,7 @@ internal override string GetString(int ordinal) return GetObjectInternal(ordinal).SafeToString(); } } - + internal override object GetValue(int ordinal) { UTF8Buffer val = GetObjectInternal(ordinal); diff --git a/Snowflake.Data/Snowflake.Data.csproj b/Snowflake.Data/Snowflake.Data.csproj index f82793477..c8a86434a 100644 --- a/Snowflake.Data/Snowflake.Data.csproj +++ b/Snowflake.Data/Snowflake.Data.csproj @@ -19,7 +19,7 @@ - +