diff --git a/parent-pom.xml b/parent-pom.xml index d58003cba..28412d161 100644 --- a/parent-pom.xml +++ b/parent-pom.xml @@ -94,7 +94,7 @@ 3.0.1 3.1.1 0.8.8 - 0.17.2 + 0.23.0 3.3.0 3.5.0 3.4.2 diff --git a/src/main/java/net/snowflake/client/core/arrow/ArrowVectorConverter.java b/src/main/java/net/snowflake/client/core/arrow/ArrowVectorConverter.java index f61e9954d..788de87e0 100644 --- a/src/main/java/net/snowflake/client/core/arrow/ArrowVectorConverter.java +++ b/src/main/java/net/snowflake/client/core/arrow/ArrowVectorConverter.java @@ -7,8 +7,22 @@ import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.Map; import java.util.TimeZone; +import net.snowflake.client.core.DataConversionContext; +import net.snowflake.client.core.SFBaseSession; import net.snowflake.client.core.SFException; +import net.snowflake.client.jdbc.ErrorCode; +import net.snowflake.client.jdbc.SnowflakeSQLException; +import net.snowflake.client.jdbc.SnowflakeSQLLoggedException; +import net.snowflake.client.jdbc.SnowflakeType; +import net.snowflake.common.core.SqlState; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.complex.FixedSizeListVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.types.Types; /** Interface to convert from arrow vector values into java data types. */ public interface ArrowVectorConverter { @@ -163,4 +177,201 @@ public interface ArrowVectorConverter { * @param isUTC true or false value of whether NTZ timestamp should be set to UTC */ void setTreatNTZAsUTC(boolean isUTC); + + /** + * Given an arrow vector (a single column in a single record batch), return an arrow vector + * converter. Note, converter is built on top of arrow vector, so that arrow data can be converted + * back to java data + * + *

+ * + *

Arrow converter mappings for Snowflake fixed-point numbers + * ----------------------------------------------------------------------------------------- Max + * position and scale Converter + * ----------------------------------------------------------------------------------------- + * number(3,0) {@link TinyIntToFixedConverter} number(3,2) {@link TinyIntToScaledFixedConverter} + * number(5,0) {@link SmallIntToFixedConverter} number(5,4) {@link SmallIntToScaledFixedConverter} + * number(10,0) {@link IntToFixedConverter} number(10,9) {@link IntToScaledFixedConverter} + * number(19,0) {@link BigIntToFixedConverter} number(19,18) {@link BigIntToFixedConverter} + * number(38,37) {@link DecimalToScaledFixedConverter} + * ------------------------------------------------------------------------------------------ + * + * @param vector an arrow vector + * @param context data conversion context + * @param session SFBaseSession for purposes of logging + * @param idx the index of the vector in its batch + * @return A converter on top og the vector + */ + static ArrowVectorConverter initConverter( + ValueVector vector, DataConversionContext context, SFBaseSession session, int idx) + throws SnowflakeSQLException { + // arrow minor type + Types.MinorType type = Types.getMinorTypeForArrowType(vector.getField().getType()); + + // each column's metadata + Map customMeta = vector.getField().getMetadata(); + if (type == Types.MinorType.DECIMAL) { + // Note: Decimal vector is different from others + return new DecimalToScaledFixedConverter(vector, idx, context); + } else if (!customMeta.isEmpty()) { + SnowflakeType st = SnowflakeType.valueOf(customMeta.get("logicalType")); + switch (st) { + case ANY: + case CHAR: + case TEXT: + case VARIANT: + return new VarCharConverter(vector, idx, context); + + case MAP: + if (vector instanceof MapVector) { + return new MapConverter((MapVector) vector, idx, context); + } else { + return new VarCharConverter(vector, idx, context); + } + + case VECTOR: + return new VectorTypeConverter((FixedSizeListVector) vector, idx, context); + + case ARRAY: + if (vector instanceof ListVector) { + return new ArrayConverter((ListVector) vector, idx, context); + } else { + return new VarCharConverter(vector, idx, context); + } + + case OBJECT: + if (vector instanceof StructVector) { + return new StructConverter((StructVector) vector, idx, context); + } else { + return new VarCharConverter(vector, idx, context); + } + + case BINARY: + return new VarBinaryToBinaryConverter(vector, idx, context); + + case BOOLEAN: + return new BitToBooleanConverter(vector, idx, context); + + case DATE: + boolean getFormatDateWithTimeZone = false; + if (context.getSession() != null) { + getFormatDateWithTimeZone = context.getSession().getFormatDateWithTimezone(); + } + return new DateConverter(vector, idx, context, getFormatDateWithTimeZone); + + case FIXED: + String scaleStr = vector.getField().getMetadata().get("scale"); + int sfScale = Integer.parseInt(scaleStr); + switch (type) { + case TINYINT: + if (sfScale == 0) { + return new TinyIntToFixedConverter(vector, idx, context); + } else { + return new TinyIntToScaledFixedConverter(vector, idx, context, sfScale); + } + case SMALLINT: + if (sfScale == 0) { + return new SmallIntToFixedConverter(vector, idx, context); + } else { + return new SmallIntToScaledFixedConverter(vector, idx, context, sfScale); + } + case INT: + if (sfScale == 0) { + return new IntToFixedConverter(vector, idx, context); + } else { + return new IntToScaledFixedConverter(vector, idx, context, sfScale); + } + case BIGINT: + if (sfScale == 0) { + return new BigIntToFixedConverter(vector, idx, context); + } else { + return new BigIntToScaledFixedConverter(vector, idx, context, sfScale); + } + } + break; + + case REAL: + return new DoubleToRealConverter(vector, idx, context); + + case TIME: + switch (type) { + case INT: + return new IntToTimeConverter(vector, idx, context); + case BIGINT: + return new BigIntToTimeConverter(vector, idx, context); + default: + throw new SnowflakeSQLLoggedException( + session, + ErrorCode.INTERNAL_ERROR.getMessageCode(), + SqlState.INTERNAL_ERROR, + "Unexpected Arrow Field for ", + st.name()); + } + + case TIMESTAMP_LTZ: + if (vector.getField().getChildren().isEmpty()) { + // case when the scale of the timestamp is equal or smaller than millisecs since epoch + return new BigIntToTimestampLTZConverter(vector, idx, context); + } else if (vector.getField().getChildren().size() == 2) { + // case when the scale of the timestamp is larger than millisecs since epoch, e.g., + // nanosecs + return new TwoFieldStructToTimestampLTZConverter(vector, idx, context); + } else { + throw new SnowflakeSQLLoggedException( + session, + ErrorCode.INTERNAL_ERROR.getMessageCode(), + SqlState.INTERNAL_ERROR, + "Unexpected Arrow Field for ", + st.name()); + } + + case TIMESTAMP_NTZ: + if (vector.getField().getChildren().isEmpty()) { + // case when the scale of the timestamp is equal or smaller than 7 + return new BigIntToTimestampNTZConverter(vector, idx, context); + } else if (vector.getField().getChildren().size() == 2) { + // when the timestamp is represent in two-field struct + return new TwoFieldStructToTimestampNTZConverter(vector, idx, context); + } else { + throw new SnowflakeSQLLoggedException( + session, + ErrorCode.INTERNAL_ERROR.getMessageCode(), + SqlState.INTERNAL_ERROR, + "Unexpected Arrow Field for ", + st.name()); + } + + case TIMESTAMP_TZ: + if (vector.getField().getChildren().size() == 2) { + // case when the scale of the timestamp is equal or smaller than millisecs since epoch + return new TwoFieldStructToTimestampTZConverter(vector, idx, context); + } else if (vector.getField().getChildren().size() == 3) { + // case when the scale of the timestamp is larger than millisecs since epoch, e.g., + // nanosecs + return new ThreeFieldStructToTimestampTZConverter(vector, idx, context); + } else { + throw new SnowflakeSQLLoggedException( + session, + ErrorCode.INTERNAL_ERROR.getMessageCode(), + SqlState.INTERNAL_ERROR, + "Unexpected SnowflakeType ", + st.name()); + } + + default: + throw new SnowflakeSQLLoggedException( + session, + ErrorCode.INTERNAL_ERROR.getMessageCode(), + SqlState.INTERNAL_ERROR, + "Unexpected Arrow Field for ", + st.name()); + } + } + throw new SnowflakeSQLLoggedException( + session, + ErrorCode.INTERNAL_ERROR.getMessageCode(), + SqlState.INTERNAL_ERROR, + "Unexpected Arrow Field for ", + type.toString()); + } } diff --git a/src/main/java/net/snowflake/client/jdbc/ArrowResultChunk.java b/src/main/java/net/snowflake/client/jdbc/ArrowResultChunk.java index 0f7bd7c77..dca895464 100644 --- a/src/main/java/net/snowflake/client/jdbc/ArrowResultChunk.java +++ b/src/main/java/net/snowflake/client/jdbc/ArrowResultChunk.java @@ -3,44 +3,18 @@ */ package net.snowflake.client.jdbc; +import static net.snowflake.client.core.arrow.ArrowVectorConverter.initConverter; + import java.io.IOException; import java.io.InputStream; import java.nio.channels.ClosedByInterruptException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Map; import net.snowflake.client.core.DataConversionContext; import net.snowflake.client.core.SFBaseSession; import net.snowflake.client.core.SFException; -import net.snowflake.client.core.arrow.ArrayConverter; import net.snowflake.client.core.arrow.ArrowResultChunkIndexSorter; import net.snowflake.client.core.arrow.ArrowVectorConverter; -import net.snowflake.client.core.arrow.BigIntToFixedConverter; -import net.snowflake.client.core.arrow.BigIntToScaledFixedConverter; -import net.snowflake.client.core.arrow.BigIntToTimeConverter; -import net.snowflake.client.core.arrow.BigIntToTimestampLTZConverter; -import net.snowflake.client.core.arrow.BigIntToTimestampNTZConverter; -import net.snowflake.client.core.arrow.BitToBooleanConverter; -import net.snowflake.client.core.arrow.DateConverter; -import net.snowflake.client.core.arrow.DecimalToScaledFixedConverter; -import net.snowflake.client.core.arrow.DoubleToRealConverter; -import net.snowflake.client.core.arrow.IntToFixedConverter; -import net.snowflake.client.core.arrow.IntToScaledFixedConverter; -import net.snowflake.client.core.arrow.IntToTimeConverter; -import net.snowflake.client.core.arrow.MapConverter; -import net.snowflake.client.core.arrow.SmallIntToFixedConverter; -import net.snowflake.client.core.arrow.SmallIntToScaledFixedConverter; -import net.snowflake.client.core.arrow.StructConverter; -import net.snowflake.client.core.arrow.ThreeFieldStructToTimestampTZConverter; -import net.snowflake.client.core.arrow.TinyIntToFixedConverter; -import net.snowflake.client.core.arrow.TinyIntToScaledFixedConverter; -import net.snowflake.client.core.arrow.TwoFieldStructToTimestampLTZConverter; -import net.snowflake.client.core.arrow.TwoFieldStructToTimestampNTZConverter; -import net.snowflake.client.core.arrow.TwoFieldStructToTimestampTZConverter; -import net.snowflake.client.core.arrow.VarBinaryToBinaryConverter; -import net.snowflake.client.core.arrow.VarCharConverter; -import net.snowflake.client.core.arrow.VectorTypeConverter; import net.snowflake.client.log.SFLogger; import net.snowflake.client.log.SFLoggerFactory; import net.snowflake.common.core.SqlState; @@ -58,12 +32,8 @@ import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.complex.FixedSizeListVector; -import org.apache.arrow.vector.complex.ListVector; -import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.util.TransferPair; public class ArrowResultChunk extends SnowflakeResultChunk { @@ -84,7 +54,7 @@ public class ArrowResultChunk extends SnowflakeResultChunk { private boolean enableSortFirstResultChunk; private IntVector firstResultChunkSortedIndices; private VectorSchemaRoot root; - private static SFBaseSession session; + private SFBaseSession session; public ArrowResultChunk( String url, @@ -167,233 +137,15 @@ public void freeData() { } } - /** - * Given a list of arrow vectors (all columns in a single record batch), return list of arrow - * vector converter. Note, converter is built on top of arrow vector, so that arrow data can be - * converted back to java data - * - *

- * - *

Arrow converter mappings for Snowflake fixed-point numbers - * ----------------------------------------------------------------------------------------- Max - * position & scale Converter - * ----------------------------------------------------------------------------------------- - * number(3,0) {@link TinyIntToFixedConverter} number(3,2) {@link TinyIntToScaledFixedConverter} - * number(5,0) {@link SmallIntToFixedConverter} number(5,4) {@link SmallIntToScaledFixedConverter} - * number(10,0) {@link IntToFixedConverter} number(10,9) {@link IntToScaledFixedConverter} - * number(19,0) {@link BigIntToFixedConverter} number(19,18) {@link BigIntToFixedConverter} - * number(38,37) {@link DecimalToScaledFixedConverter} - * ------------------------------------------------------------------------------------------ - * - * @param vectors list of arrow vectors - * @return list of converters on top of each converters - */ - private static List initConverters( - List vectors, DataConversionContext context) throws SnowflakeSQLException { - List converters = new ArrayList<>(); - for (int i = 0; i < vectors.size(); i++) { - ValueVector vector = vectors.get(i); - // arrow minor type - Types.MinorType type = Types.getMinorTypeForArrowType(vector.getField().getType()); - - // each column's metadata - Map customMeta = vector.getField().getMetadata(); - if (type == Types.MinorType.DECIMAL) { - // Note: Decimal vector is different from others - converters.add(new DecimalToScaledFixedConverter(vector, i, context)); - } else if (!customMeta.isEmpty()) { - SnowflakeType st = SnowflakeType.valueOf(customMeta.get("logicalType")); - switch (st) { - case ANY: - case CHAR: - case TEXT: - case VARIANT: - converters.add(new VarCharConverter(vector, i, context)); - break; - - case MAP: - if (vector instanceof MapVector) { - converters.add(new MapConverter((MapVector) vector, i, context)); - } else { - converters.add(new VarCharConverter(vector, i, context)); - } - break; - - case VECTOR: - converters.add(new VectorTypeConverter((FixedSizeListVector) vector, i, context)); - break; - - case ARRAY: - if (vector instanceof ListVector) { - converters.add(new ArrayConverter((ListVector) vector, i, context)); - } else { - converters.add(new VarCharConverter(vector, i, context)); - } - break; - - case OBJECT: - if (vector instanceof StructVector) { - converters.add(new StructConverter((StructVector) vector, i, context)); - } else { - converters.add(new VarCharConverter(vector, i, context)); - } - break; - - case BINARY: - converters.add(new VarBinaryToBinaryConverter(vector, i, context)); - break; - - case BOOLEAN: - converters.add(new BitToBooleanConverter(vector, i, context)); - break; - - case DATE: - boolean getFormatDateWithTimeZone = false; - if (context.getSession() != null) { - getFormatDateWithTimeZone = context.getSession().getFormatDateWithTimezone(); - } - converters.add(new DateConverter(vector, i, context, getFormatDateWithTimeZone)); - break; - - case FIXED: - String scaleStr = vector.getField().getMetadata().get("scale"); - int sfScale = Integer.parseInt(scaleStr); - switch (type) { - case TINYINT: - if (sfScale == 0) { - converters.add(new TinyIntToFixedConverter(vector, i, context)); - } else { - converters.add(new TinyIntToScaledFixedConverter(vector, i, context, sfScale)); - } - break; - case SMALLINT: - if (sfScale == 0) { - converters.add(new SmallIntToFixedConverter(vector, i, context)); - } else { - converters.add(new SmallIntToScaledFixedConverter(vector, i, context, sfScale)); - } - break; - case INT: - if (sfScale == 0) { - converters.add(new IntToFixedConverter(vector, i, context)); - } else { - converters.add(new IntToScaledFixedConverter(vector, i, context, sfScale)); - } - break; - case BIGINT: - if (sfScale == 0) { - converters.add(new BigIntToFixedConverter(vector, i, context)); - } else { - converters.add(new BigIntToScaledFixedConverter(vector, i, context, sfScale)); - } - break; - } - break; - - case REAL: - converters.add(new DoubleToRealConverter(vector, i, context)); - break; - - case TIME: - switch (type) { - case INT: - converters.add(new IntToTimeConverter(vector, i, context)); - break; - case BIGINT: - converters.add(new BigIntToTimeConverter(vector, i, context)); - break; - default: - throw new SnowflakeSQLLoggedException( - session, - ErrorCode.INTERNAL_ERROR.getMessageCode(), - SqlState.INTERNAL_ERROR, - "Unexpected Arrow Field for ", - st.name()); - } - break; - - case TIMESTAMP_LTZ: - if (vector.getField().getChildren().isEmpty()) { - // case when the scale of the timestamp is equal or smaller than millisecs since epoch - converters.add(new BigIntToTimestampLTZConverter(vector, i, context)); - } else if (vector.getField().getChildren().size() == 2) { - // case when the scale of the timestamp is larger than millisecs since epoch, e.g., - // nanosecs - converters.add(new TwoFieldStructToTimestampLTZConverter(vector, i, context)); - } else { - throw new SnowflakeSQLLoggedException( - session, - ErrorCode.INTERNAL_ERROR.getMessageCode(), - SqlState.INTERNAL_ERROR, - "Unexpected Arrow Field for ", - st.name()); - } - break; - - case TIMESTAMP_NTZ: - if (vector.getField().getChildren().isEmpty()) { - // case when the scale of the timestamp is equal or smaller than 7 - converters.add(new BigIntToTimestampNTZConverter(vector, i, context)); - } else if (vector.getField().getChildren().size() == 2) { - // when the timestamp is represent in two-field struct - converters.add(new TwoFieldStructToTimestampNTZConverter(vector, i, context)); - } else { - throw new SnowflakeSQLLoggedException( - session, - ErrorCode.INTERNAL_ERROR.getMessageCode(), - SqlState.INTERNAL_ERROR, - "Unexpected Arrow Field for ", - st.name()); - } - break; - - case TIMESTAMP_TZ: - if (vector.getField().getChildren().size() == 2) { - // case when the scale of the timestamp is equal or smaller than millisecs since epoch - converters.add(new TwoFieldStructToTimestampTZConverter(vector, i, context)); - } else if (vector.getField().getChildren().size() == 3) { - // case when the scale of the timestamp is larger than millisecs since epoch, e.g., - // nanosecs - converters.add(new ThreeFieldStructToTimestampTZConverter(vector, i, context)); - } else { - throw new SnowflakeSQLLoggedException( - session, - ErrorCode.INTERNAL_ERROR.getMessageCode(), - SqlState.INTERNAL_ERROR, - "Unexpected SnowflakeType ", - st.name()); - } - break; - - default: - throw new SnowflakeSQLLoggedException( - session, - ErrorCode.INTERNAL_ERROR.getMessageCode(), - SqlState.INTERNAL_ERROR, - "Unexpected Arrow Field for ", - st.name()); - } - } else { - throw new SnowflakeSQLLoggedException( - session, - ErrorCode.INTERNAL_ERROR.getMessageCode(), - SqlState.INTERNAL_ERROR, - "Unexpected Arrow Field for ", - type.toString()); - } - } - return converters; - } - /** * @return an iterator to iterate over current chunk */ public ArrowChunkIterator getIterator(DataConversionContext dataConversionContext) { - return new ArrowChunkIterator(this, dataConversionContext); + return new ArrowChunkIterator(dataConversionContext); } public static ArrowChunkIterator getEmptyChunkIterator() { - return new ArrowChunkIterator(new EmptyArrowResultChunk()); + return new EmptyArrowResultChunk().new ArrowChunkIterator(null); } public void enableSortFirstResultChunk() { @@ -401,10 +153,7 @@ public void enableSortFirstResultChunk() { } /** Iterator class used to go through the arrow chunk row by row */ - public static class ArrowChunkIterator { - /** chunk that iterator will iterate through */ - private ArrowResultChunk resultChunk; - + public class ArrowChunkIterator { /** index of record batch that iterator currently points to */ private int currentRecordBatchIndex; @@ -426,22 +175,29 @@ public static class ArrowChunkIterator { /** formatters to each data type */ private DataConversionContext dataConversionContext; - ArrowChunkIterator(ArrowResultChunk resultChunk, DataConversionContext dataConversionContext) { - this.resultChunk = resultChunk; + ArrowChunkIterator(DataConversionContext dataConversionContext) { this.currentRecordBatchIndex = -1; - this.totalRecordBatch = resultChunk.batchOfVectors.size(); + this.totalRecordBatch = batchOfVectors.size(); this.currentRowInRecordBatch = -1; this.rowCountInCurrentRecordBatch = 0; this.dataConversionContext = dataConversionContext; } - ArrowChunkIterator(EmptyArrowResultChunk emptyArrowResultChunk) { - this.resultChunk = emptyArrowResultChunk; - this.currentRecordBatchIndex = 0; - this.totalRecordBatch = 0; - this.currentRowInRecordBatch = -1; - this.rowCountInCurrentRecordBatch = 0; - this.currentConverters = Collections.emptyList(); + /** + * Given a list of arrow vectors (all columns in a single record batch), return list of arrow + * vector converter. Note, converter is built on top of arrow vector, so that arrow data can be + * converted back to java data + * + * @param vectors list of arrow vectors + * @return list of converters on top of each converters + */ + private List initConverters(List vectors) + throws SnowflakeSQLException { + List converters = new ArrayList<>(); + for (int i = 0; i < vectors.size(); i++) { + converters.add(initConverter(vectors.get(i), dataConversionContext, session, i)); + } + return converters; } /** advance to next row */ @@ -454,26 +210,22 @@ public boolean next() throws SnowflakeSQLException { currentRecordBatchIndex++; if (currentRecordBatchIndex < totalRecordBatch) { this.currentRowInRecordBatch = 0; - if (currentRecordBatchIndex == 0 && resultChunk.sortFirstResultChunkEnabled()) { + if (currentRecordBatchIndex == 0 && sortFirstResultChunkEnabled()) { // perform client-side sorting for the first chunk (only used in Snowflake internal // regression tests) // if first chunk has multiple record batches, merge them into one and sort it - if (resultChunk.batchOfVectors.size() > 1) { - resultChunk.mergeBatchesIntoOne(); + if (batchOfVectors.size() > 1) { + mergeBatchesIntoOne(); totalRecordBatch = 1; } this.rowCountInCurrentRecordBatch = - resultChunk.batchOfVectors.get(currentRecordBatchIndex).get(0).getValueCount(); - currentConverters = - initConverters( - resultChunk.batchOfVectors.get(currentRecordBatchIndex), dataConversionContext); - resultChunk.sortFirstResultChunk(currentConverters); + batchOfVectors.get(currentRecordBatchIndex).get(0).getValueCount(); + currentConverters = initConverters(batchOfVectors.get(currentRecordBatchIndex)); + sortFirstResultChunk(currentConverters); } else { this.rowCountInCurrentRecordBatch = - resultChunk.batchOfVectors.get(currentRecordBatchIndex).get(0).getValueCount(); - currentConverters = - initConverters( - resultChunk.batchOfVectors.get(currentRecordBatchIndex), dataConversionContext); + batchOfVectors.get(currentRecordBatchIndex).get(0).getValueCount(); + currentConverters = initConverters(batchOfVectors.get(currentRecordBatchIndex)); } return true; } @@ -492,7 +244,7 @@ public boolean isAfterLast() { } public ArrowResultChunk getChunk() { - return resultChunk; + return ArrowResultChunk.this; } public ArrowVectorConverter getCurrentConverter(int columnIdx) throws SFException { @@ -507,8 +259,8 @@ public ArrowVectorConverter getCurrentConverter(int columnIdx) throws SFExceptio * @return index of row in current record batch */ public int getCurrentRowInRecordBatch() { - if (resultChunk.sortFirstResultChunkEnabled() && currentRecordBatchIndex == 0) { - return resultChunk.firstResultChunkSortedIndices.get(currentRowInRecordBatch); + if (sortFirstResultChunkEnabled() && currentRecordBatchIndex == 0) { + return firstResultChunkSortedIndices.get(currentRowInRecordBatch); } else { return currentRowInRecordBatch; }