diff --git a/pom.xml b/pom.xml index 360e70c38..dbf156690 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.snowflake snowflake-ingest-sdk - 2.0.3 + 2.0.4-SNAPSHOT jar Snowflake Ingest SDK Snowflake Ingest SDK @@ -63,7 +63,7 @@ 3.19.6 net.snowflake.ingest.internal 1.7.36 - 1.1.10.1 + 1.1.10.4 3.13.30 0.13.0 diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java index a8018954c..6936bc52a 100644 --- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java +++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java @@ -110,7 +110,7 @@ public class RequestBuilder { // Don't change! public static final String CLIENT_NAME = "SnowpipeJavaSDK"; - public static final String DEFAULT_VERSION = "2.0.3"; + public static final String DEFAULT_VERSION = "2.0.4-SNAPSHOT"; public static final String JAVA_USER_AGENT = "JAVA"; diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java index 5fdadc90b..a8bb5db16 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestChannel.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import javax.annotation.Nullable; +import net.snowflake.ingest.streaming.internal.ColumnProperties; /** * A logical partition that represents a connection to a single Snowflake table, data will be @@ -17,7 +18,7 @@ * at the same time. When a new channel is opened, all previously opened channels with the same name * are invalidated (this applies for the table globally. not just in a single JVM). In order to * ingest data from multiple threads/clients/applications, we recommend opening multiple channels, - * each with a different name. There is no limit on the number of channels that can be opened. + * each with a different name. * *

Thread safety note: Implementations of this interface are required to be thread safe. */ @@ -253,4 +254,13 @@ InsertValidationResponse insertRows( */ @Nullable String getLatestCommittedOffsetToken(); + + /** + * Gets the table schema associated with this channel. Note that this is the table schema at the + * time of a channel open event. The schema may be changed on the Snowflake side in which case + * this will continue to show an old schema version until the channel is re-opened. + * + * @return map representing Column Name -> Column Properties + */ + Map getTableSchema(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index d91ddf14a..840284406 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -236,10 +236,11 @@ public float getSize() { * * @param row the input row * @param error the insert error that we return to the customer + * @param rowIndex the index of the current row in the input batch * @return the set of input column names */ Set verifyInputColumns( - Map row, InsertValidationResponse.InsertError error) { + Map row, InsertValidationResponse.InsertError error, int rowIndex) { // Map of unquoted column name -> original column name Map inputColNamesMap = row.keySet().stream() @@ -260,7 +261,8 @@ Set verifyInputColumns( throw new SFException( ErrorCode.INVALID_FORMAT_ROW, "Extra columns: " + extraCols, - "Columns not present in the table shouldn't be specified."); + String.format( + "Columns not present in the table shouldn't be specified, rowIndex:%d", rowIndex)); } // Check for missing columns in the row @@ -278,7 +280,8 @@ Set verifyInputColumns( throw new SFException( ErrorCode.INVALID_FORMAT_ROW, "Missing columns: " + missingCols, - "Values for all non-nullable columns must be specified."); + String.format( + "Values for all non-nullable columns must be specified, rowIndex:%d", rowIndex)); } return inputColNamesMap.keySet(); @@ -305,12 +308,12 @@ public InsertValidationResponse insertRows( if (onErrorOption == OpenChannelRequest.OnErrorOption.CONTINUE || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_AT_FIRST_ERROR) { // Used to map incoming row(nth row) to InsertError(for nth row) in response - long rowIndex = 0; + int rowIndex = 0; for (Map row : rows) { InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(row, rowIndex); try { - Set inputColumnNames = verifyInputColumns(row, error); + Set inputColumnNames = verifyInputColumns(row, error, rowIndex); rowsSizeInBytes += addRow(row, this.bufferedRowCount, this.statsMap, inputColumnNames, rowIndex); this.bufferedRowCount++; @@ -338,7 +341,7 @@ public InsertValidationResponse insertRows( float tempRowsSizeInBytes = 0F; int tempRowCount = 0; for (Map row : rows) { - Set inputColumnNames = verifyInputColumns(row, null); + Set inputColumnNames = verifyInputColumns(row, null, tempRowCount); tempRowsSizeInBytes += addTempRow(row, tempRowCount, this.tempStatsMap, inputColumnNames, tempRowCount); checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java new file mode 100644 index 000000000..856170c57 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnProperties.java @@ -0,0 +1,60 @@ +package net.snowflake.ingest.streaming.internal; + +/** + * Class that encapsulates column properties. These are the same properties showed in the output of + * SHOW COLUMNS. Note + * that this is slightly different than the internal column metadata used elsewhere in this SDK. + */ +public class ColumnProperties { + private String type; + + private String logicalType; + + private Integer precision; + + private Integer scale; + + private Integer byteLength; + + private Integer length; + + private boolean nullable; + + ColumnProperties(ColumnMetadata columnMetadata) { + this.type = columnMetadata.getType(); + this.logicalType = columnMetadata.getLogicalType(); + this.precision = columnMetadata.getPrecision(); + this.scale = columnMetadata.getScale(); + this.byteLength = columnMetadata.getByteLength(); + this.length = columnMetadata.getLength(); + this.nullable = columnMetadata.getNullable(); + } + + public String getType() { + return type; + } + + public String getLogicalType() { + return logicalType; + } + + public Integer getPrecision() { + return precision; + } + + public Integer getScale() { + return scale; + } + + public Integer getByteLength() { + return byteLength; + } + + public Integer getLength() { + return length; + } + + public boolean isNullable() { + return nullable; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java index 1bdfc2095..a1831f829 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java @@ -457,7 +457,10 @@ static TimestampWrapper validateAndParseTimestamp( if (offsetDateTime.getYear() < 1 || offsetDateTime.getYear() > 9999) { throw new SFException( ErrorCode.INVALID_VALUE_ROW, - "Timestamp out of representable inclusive range of years between 1 and 9999"); + String.format( + "Timestamp out of representable inclusive range of years between 1 and 9999," + + " rowIndex:%d", + insertRowIndex)); } return new TimestampWrapper(offsetDateTime, scale); } @@ -588,7 +591,10 @@ static int validateAndParseDate(String columnName, Object input, long insertRowI if (offsetDateTime.getYear() < -9999 || offsetDateTime.getYear() > 9999) { throw new SFException( ErrorCode.INVALID_VALUE_ROW, - "Date out of representable inclusive range of years between -9999 and 9999"); + String.format( + "Date out of representable inclusive range of years between -9999 and 9999," + + " rowIndex:%d", + insertRowIndex)); } return Math.toIntExact(offsetDateTime.toLocalDate().toEpochDay()); @@ -814,7 +820,7 @@ static void checkValueInRange( throw new SFException( ErrorCode.INVALID_FORMAT_ROW, String.format( - "Number out of representable exclusive range of (-1e%s..1e%s), Row Index:%s", + "Number out of representable exclusive range of (-1e%s..1e%s), rowIndex:%d", precision - scale, precision - scale, insertRowIndex)); } } @@ -858,8 +864,7 @@ private static SFException typeNotAllowedException( return new SFException( ErrorCode.INVALID_FORMAT_ROW, String.format( - "Object of type %s cannot be ingested into Snowflake column %s of type %s, Row" - + " Index:%s", + "Object of type %s cannot be ingested into Snowflake column %s of type %s, rowIndex:%d", javaType.getName(), columnName, snowflakeType, insertRowIndex), String.format( String.format("Allowed Java types: %s", String.join(", ", allowedJavaTypes)))); @@ -882,7 +887,7 @@ private static SFException valueFormatNotAllowedException( return new SFException( ErrorCode.INVALID_VALUE_ROW, String.format( - "Value cannot be ingested into Snowflake column %s of type %s, Row Index: %s, reason:" + "Value cannot be ingested into Snowflake column %s of type %s, rowIndex:%d, reason:" + " %s", columnName, snowflakeType, rowIndex, reason)); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 8016f9b5c..ff879b8d6 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -174,7 +173,7 @@ List>> getData() { this.isNeedFlush = false; this.lastFlushTime = System.currentTimeMillis(); this.isTestMode = isTestMode; - this.latencyTimerContextMap = new HashMap<>(); + this.latencyTimerContextMap = new ConcurrentHashMap<>(); this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion(); createWorkers(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 289b8a983..75966eb35 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -210,8 +210,8 @@ private float addRow( throw new SFException( ErrorCode.MAX_ROW_SIZE_EXCEEDED, String.format( - "rowSizeInBytes=%.3f maxAllowedRowSizeInBytes=%d", - size, clientBufferParameters.getMaxAllowedRowSizeInBytes())); + "rowSizeInBytes:%.3f, maxAllowedRowSizeInBytes:%d, rowIndex:%d", + size, clientBufferParameters.getMaxAllowedRowSizeInBytes(), insertRowsCurrIndex)); } out.accept(Arrays.asList(indexedRow)); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 4a27aba5d..756f9b5e8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -12,6 +12,7 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -51,6 +52,9 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn // State of the channel that will be shared with its underlying buffer private final ChannelRuntimeState channelState; + // Internal map of column name -> column properties + private final Map tableColumns; + /** * Constructor for TESTING ONLY which allows us to set the test mode * @@ -122,6 +126,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn this::collectRowSize, channelState, new ClientBufferParameters(owningClient)); + this.tableColumns = new HashMap<>(); logger.logInfo( "Channel={} created for table={}", this.channelFlushContext.getName(), @@ -298,6 +303,7 @@ public CompletableFuture close() { void setupSchema(List columns) { logger.logDebug("Setup schema for channel={}, schema={}", getFullyQualifiedName(), columns); this.rowBuffer.setupSchema(columns); + columns.forEach(c -> tableColumns.putIfAbsent(c.getName(), new ColumnProperties(c))); } /** @@ -391,6 +397,12 @@ public String getLatestCommittedOffsetToken() { return response.getPersistedOffsetToken(); } + /** Returns a map of column name -> datatype for the table the channel is bound to */ + @Override + public Map getTableSchema() { + return this.tableColumns; + } + /** Check whether we need to throttle the insertRows API */ void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) { int retry = 0; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java index f6f9e36af..86706fcf2 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java @@ -676,19 +676,19 @@ public void testTooLargeMultiByteSemiStructuredValues() { expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type VARIANT, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type VARIANT, rowIndex:0, reason:" + " Variant too long: length=18874376 maxLength=16777152", () -> validateAndParseVariant("COL", m, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type ARRAY, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type ARRAY, rowIndex:0, reason:" + " Array too large. length=18874378 maxLength=16777152", () -> validateAndParseArray("COL", m, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type OBJECT, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type OBJECT, rowIndex:0, reason:" + " Object too large. length=18874376 maxLength=16777152", () -> validateAndParseObject("COL", m, 0)); } @@ -1005,13 +1005,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type BOOLEAN, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type BOOLEAN, rowIndex:0. Allowed" + " Java types: boolean, Number, String", () -> validateAndParseBoolean("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type BOOLEAN, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type BOOLEAN, rowIndex:0, reason:" + " Not a valid boolean, see" + " https://docs.snowflake.com/en/sql-reference/data-types-logical.html#conversion-to-boolean" + " for the list of supported formats", @@ -1021,13 +1021,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type TIME, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type TIME, rowIndex:0. Allowed" + " Java types: String, LocalTime, OffsetTime", () -> validateAndParseTime("COL", new Object(), 10, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type TIME, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type TIME, rowIndex:0, reason:" + " Not a valid time, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1037,13 +1037,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type DATE, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type DATE, rowIndex:0. Allowed" + " Java types: String, LocalDate, LocalDateTime, ZonedDateTime, OffsetDateTime", () -> validateAndParseDate("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type DATE, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type DATE, rowIndex:0, reason:" + " Not a valid value, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1053,14 +1053,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index:0." + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0." + " Allowed Java types: String, LocalDate, LocalDateTime, ZonedDateTime," + " OffsetDateTime", () -> validateAndParseTimestamp("COL", new Object(), 3, UTC, true, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index: 0," + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0," + " reason: Not a valid value, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1070,14 +1070,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index:0." + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0." + " Allowed Java types: String, LocalDate, LocalDateTime, ZonedDateTime," + " OffsetDateTime", () -> validateAndParseTimestamp("COL", new Object(), 3, UTC, false, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index: 0," + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0," + " reason: Not a valid value, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1087,14 +1087,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index:0." + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0." + " Allowed Java types: String, LocalDate, LocalDateTime, ZonedDateTime," + " OffsetDateTime", () -> validateAndParseTimestamp("COL", new Object(), 3, UTC, false, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index: 0," + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0," + " reason: Not a valid value, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1104,13 +1104,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type NUMBER, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type NUMBER, rowIndex:0. Allowed" + " Java types: int, long, byte, short, float, double, BigDecimal, BigInteger, String", () -> validateAndParseBigDecimal("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type NUMBER, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type NUMBER, rowIndex:0, reason:" + " Not a valid number", () -> validateAndParseBigDecimal("COL", "abc", 0)); @@ -1118,13 +1118,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type REAL, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type REAL, rowIndex:0. Allowed" + " Java types: Number, String", () -> validateAndParseReal("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type REAL, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type REAL, rowIndex:0, reason:" + " Not a valid decimal number", () -> validateAndParseReal("COL", "abc", 0)); @@ -1132,13 +1132,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type STRING, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type STRING, rowIndex:0. Allowed" + " Java types: String, Number, boolean, char", () -> validateAndParseString("COL", new Object(), Optional.empty(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type STRING, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type STRING, rowIndex:0, reason:" + " String too long: length=3 characters maxLength=2 characters", () -> validateAndParseString("COL", "abc", Optional.of(2), 0)); @@ -1146,19 +1146,19 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type BINARY, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type BINARY, rowIndex:0. Allowed" + " Java types: byte[], String", () -> validateAndParseBinary("COL", new Object(), Optional.empty(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type BINARY, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type BINARY, rowIndex:0, reason:" + " Binary too long: length=2 maxLength=1", () -> validateAndParseBinary("COL", new byte[] {1, 2}, Optional.of(1), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type BINARY, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type BINARY, rowIndex:0, reason:" + " Not a valid hex string", () -> validateAndParseBinary("COL", "ghi", Optional.empty(), 0)); @@ -1166,14 +1166,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type VARIANT, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type VARIANT, rowIndex:0. Allowed" + " Java types: String, Primitive data types and their arrays, java.time.*, List," + " Map, T[]", () -> validateAndParseVariant("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type VARIANT, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type VARIANT, rowIndex:0, reason:" + " Not a valid JSON", () -> validateAndParseVariant("COL", "][", 0)); @@ -1181,14 +1181,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type ARRAY, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type ARRAY, rowIndex:0. Allowed" + " Java types: String, Primitive data types and their arrays, java.time.*, List," + " Map, T[]", () -> validateAndParseArray("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type ARRAY, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type ARRAY, rowIndex:0, reason:" + " Not a valid JSON", () -> validateAndParseArray("COL", "][", 0)); @@ -1196,14 +1196,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type OBJECT, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type OBJECT, rowIndex:0. Allowed" + " Java types: String, Primitive data types and their arrays, java.time.*, List," + " Map, T[]", () -> validateAndParseObject("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type OBJECT, Row Index: 0, reason:" + + " cannot be ingested into Snowflake column COL of type OBJECT, rowIndex:0, reason:" + " Not a valid JSON", () -> validateAndParseObject("COL", "}{", 0)); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 63340e25a..8d71d9a44 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -307,8 +307,8 @@ public void testRowIndexWithMultipleRowsWithError() { .getMessage() .equalsIgnoreCase( "The given row cannot be converted to the internal format due to invalid value:" - + " Value cannot be ingested into Snowflake column COLCHAR of type STRING, Row" - + " Index: 1, reason: String too long: length=22 characters maxLength=11" + + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," + + " rowIndex:1, reason: String too long: length=22 characters maxLength=11" + " characters")); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 62f2efdce..8fbf67264 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -495,6 +495,22 @@ public void testOpenChannelSuccessResponse() throws Exception { Assert.assertEquals(dbName, channel.getDBName()); Assert.assertEquals(schemaName, channel.getSchemaName()); Assert.assertEquals(tableName, channel.getTableName()); + + Assert.assertTrue(channel.getTableSchema().containsKey("C1")); + Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C1").getType()); + Assert.assertEquals("FIXED", channel.getTableSchema().get("C1").getLogicalType()); + Assert.assertEquals(38, (int) channel.getTableSchema().get("C1").getPrecision()); + Assert.assertEquals(0, (int) channel.getTableSchema().get("C1").getScale()); + Assert.assertNull(channel.getTableSchema().get("C1").getByteLength()); + Assert.assertTrue(channel.getTableSchema().get("C1").isNullable()); + + Assert.assertTrue(channel.getTableSchema().containsKey("C2")); + Assert.assertEquals("NUMBER(38,0)", channel.getTableSchema().get("C2").getType()); + Assert.assertEquals("FIXED", channel.getTableSchema().get("C2").getLogicalType()); + Assert.assertEquals(38, (int) channel.getTableSchema().get("C2").getPrecision()); + Assert.assertEquals(0, (int) channel.getTableSchema().get("C2").getScale()); + Assert.assertNull(channel.getTableSchema().get("C2").getByteLength()); + Assert.assertTrue(channel.getTableSchema().get("C2").isNullable()); } @Test @@ -572,8 +588,8 @@ public void testInsertTooLargeRow() { .collect(Collectors.toList()); String expectedMessage = - "The given row exceeds the maximum allowed row size rowSizeInBytes=67109128.000" - + " maxAllowedRowSizeInBytes=67108864"; + "The given row exceeds the maximum allowed row size rowSizeInBytes:67109128.000," + + " maxAllowedRowSizeInBytes:67108864, rowIndex:0"; Map row = new HashMap<>(); schema.forEach(x -> row.put(x.getName(), byteArrayOneMb));