diff --git a/pom.xml b/pom.xml index 61f876c75..cb4b2ed0b 100644 --- a/pom.xml +++ b/pom.xml @@ -426,6 +426,10 @@ commons-codec commons-codec + + commons-io + commons-io + diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 278d4abea..ac05c814e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -16,6 +16,8 @@ public class ClientBufferParameters { private long maxAllowedRowSizeInBytes; + private final boolean enableNewJsonParsingLogic; + private Constants.BdecParquetCompression bdecParquetCompression; /** @@ -30,11 +32,13 @@ private ClientBufferParameters( boolean enableParquetInternalBuffering, long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, - Constants.BdecParquetCompression bdecParquetCompression) { + Constants.BdecParquetCompression bdecParquetCompression, + boolean enableNewJsonParsingLogic) { this.enableParquetInternalBuffering = enableParquetInternalBuffering; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes; this.bdecParquetCompression = bdecParquetCompression; + this.enableNewJsonParsingLogic = enableNewJsonParsingLogic; } /** @param clientInternal reference to the client object where the relevant parameters are set */ @@ -55,6 +59,11 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm() : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT; + + this.enableNewJsonParsingLogic = + clientInternal != null + ? 
clientInternal.getParameterProvider().isEnableNewJsonParsingLogic() + : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; } /** @@ -68,12 +77,14 @@ public static ClientBufferParameters test_createClientBufferParameters( boolean enableParquetInternalBuffering, long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, - Constants.BdecParquetCompression bdecParquetCompression) { + Constants.BdecParquetCompression bdecParquetCompression, + boolean enableNewJsonParsingLogic) { return new ClientBufferParameters( enableParquetInternalBuffering, maxChunkSizeInBytes, maxAllowedRowSizeInBytes, - bdecParquetCompression); + bdecParquetCompression, + enableNewJsonParsingLogic); } public boolean getEnableParquetInternalBuffering() { @@ -91,4 +102,8 @@ public long getMaxAllowedRowSizeInBytes() { public Constants.BdecParquetCompression getBdecParquetCompression() { return bdecParquetCompression; } + + public boolean isEnableNewJsonParsingLogic() { + return enableNewJsonParsingLogic; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java index 162e56145..310a711d0 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java @@ -6,11 +6,17 @@ import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.unicodeCharactersCount; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import 
com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.StandardCharsets; @@ -39,6 +45,7 @@ import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.io.output.StringBuilderWriter; /** Utility class for parsing and validating inputs based on Snowflake types */ class DataValidationUtil { @@ -70,6 +77,8 @@ class DataValidationUtil { private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final JsonFactory factory = new JsonFactory(); + // The version of Jackson we are using does not support serialization of date objects from the // java.time package. Here we define a module with custom java.time serializers. Additionally, we // define custom serializer for byte[] because the Jackson default is to serialize it as @@ -135,6 +144,61 @@ private static JsonNode validateAndParseSemiStructuredAsJsonTree( insertRowIndex); } + /** + * Validates and parses input as JSON. All types in the object tree must be valid variant types, + * see {@link DataValidationUtil#isAllowedSemiStructuredType}. 
+ * + * @param input Object to validate + * @return Minified JSON string + */ + private static String validateAndParseSemiStructured( + String columnName, Object input, String snowflakeType, final long insertRowIndex) { + if (input instanceof String) { + final String stringInput = (String) input; + verifyValidUtf8(stringInput, columnName, snowflakeType, insertRowIndex); + final StringBuilderWriter resultWriter = new StringBuilderWriter(stringInput.length()); + try (final JsonParser parser = factory.createParser(stringInput); + final JsonGenerator generator = factory.createGenerator(resultWriter)) { + while (parser.nextToken() != null) { + final JsonToken token = parser.currentToken(); + if (token.isNumeric()) { + // If the current token is a number, we cannot just copy the current event because it + // would write the token from double (or big decimal), whose scientific notation + // may have been altered during deserialization. We want to preserve the scientific + // notation from the user input, so we write the current number as text. + generator.writeNumber(parser.getText()); + } else { + generator.copyCurrentEvent(parser); + } + } + } catch (JsonParseException e) { + throw valueFormatNotAllowedException( + columnName, snowflakeType, "Not a valid JSON", insertRowIndex); + } catch (IOException e) { + throw new SFException(e, ErrorCode.IO_ERROR, "Cannot create JSON Parser or JSON generator"); + } + // We return the minified string from the result writer + return resultWriter.toString(); + } else if (isAllowedSemiStructuredType(input)) { + JsonNode node = objectMapper.valueToTree(input); + return node.toString(); + } + + throw typeNotAllowedException( + columnName, + input.getClass(), + snowflakeType, + new String[] { + "String", + "Primitive data types and their arrays", + "java.time.*", + "List", + "Map", + "T[]" + }, + insertRowIndex); + } + + /** * Validates and parses input as JSON. 
All types in the object tree must be valid variant types, * see {@link DataValidationUtil#isAllowedSemiStructuredType}. @@ -165,6 +229,34 @@ static String validateAndParseVariant(String columnName, Object input, long inse return output; } + /** + * Validates and parses input as JSON. All types in the object tree must be valid variant types, + * see {@link DataValidationUtil#isAllowedSemiStructuredType}. + * + * @param input Object to validate + * @param insertRowIndex + * @return JSON string representing the input + */ + static String validateAndParseVariantNew(String columnName, Object input, long insertRowIndex) { + final String result = + validateAndParseSemiStructured(columnName, input, "VARIANT", insertRowIndex); + + // Empty json strings are ingested as nulls + if (result.isEmpty()) { + return null; + } + int stringLength = result.getBytes(StandardCharsets.UTF_8).length; + if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) { + throw valueFormatNotAllowedException( + columnName, + "VARIANT", + String.format( + "Variant too long: length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH), + insertRowIndex); + } + return result; + } + /** * Validates that passed object is allowed data type for semi-structured columns (i.e. VARIANT, * ARRAY, OBJECT). For non-trivial types like maps, arrays or lists, it recursively traverses the @@ -299,6 +391,41 @@ static String validateAndParseArray(String columnName, Object input, long insert return output; } + /** + * Validates and parses JSON array. Non-array types are converted into single-element arrays. All + * types in the array tree must be valid variant types, see {@link + * DataValidationUtil#isAllowedSemiStructuredType}. 
+ * + * @param input Object to validate + * @param insertRowIndex + * @return JSON array representing the input + */ + static String validateAndParseArrayNew(String columnName, Object input, long insertRowIndex) { + String result = validateAndParseSemiStructured(columnName, input, "ARRAY", insertRowIndex); + if (result.isEmpty()) { + // Empty input is ingested as an array of null + result = + JsonToken.START_ARRAY.asString() + + JsonToken.VALUE_NULL.asString() + + JsonToken.END_ARRAY.asString(); + } else if (!result.startsWith(JsonToken.START_ARRAY.asString())) { + // Non-array values are ingested as single-element arrays, mimicking the Worksheets behavior + result = JsonToken.START_ARRAY.asString() + result + JsonToken.END_ARRAY.asString(); + } + + // Throw an exception if the size is too large + int stringLength = result.getBytes(StandardCharsets.UTF_8).length; + if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) { + throw valueFormatNotAllowedException( + columnName, + "ARRAY", + String.format( + "Array too large. length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH), + insertRowIndex); + } + return result; + } + /** * Validates and parses JSON object. Input is rejected if the value does not represent JSON object * (e.g. String '{}' or Map). All types in the object tree must be valid variant types, @@ -329,6 +456,34 @@ static String validateAndParseObject(String columnName, Object input, long inser return output; } + /** + * Validates and parses JSON object. Input is rejected if the value does not represent JSON object + * (e.g. String '{}' or Map). All types in the object tree must be valid variant types, + * see {@link DataValidationUtil#isAllowedSemiStructuredType}. 
+ * + * @param input Object to validate + * @param insertRowIndex + * @return JSON object representing the input + */ + static String validateAndParseObjectNew(String columnName, Object input, long insertRowIndex) { + final String result = + validateAndParseSemiStructured(columnName, input, "OBJECT", insertRowIndex); + if (!result.startsWith(JsonToken.START_OBJECT.asString())) { + throw valueFormatNotAllowedException(columnName, "OBJECT", "Not an object", insertRowIndex); + } + // Throw an exception if the size is too large + int stringLength = result.getBytes(StandardCharsets.UTF_8).length; + if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) { + throw valueFormatNotAllowedException( + columnName, + "OBJECT", + String.format( + "Object too large. length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH), + insertRowIndex); + } + return result; + } + /** * Converts user input to offset date time, which is the canonical representation of dates and * timestamps. diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 17aaa9136..627478bca 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -207,7 +207,13 @@ private float addRow( ColumnMetadata column = parquetColumn.columnMetadata; ParquetValueParser.ParquetBufferValue valueWithSize = ParquetValueParser.parseColumnValueToParquet( - value, column, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex); + value, + column, + parquetColumn.type, + forkedStats, + defaultTimezone, + insertRowsCurrIndex, + clientBufferParameters.isEnableNewJsonParsingLogic()); indexedRow[colIndex] = valueWithSize.getValue(); size += valueWithSize.getSize(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java 
b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java index 282a007d4..298ec2ba2 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java @@ -85,7 +85,8 @@ static ParquetBufferValue parseColumnValueToParquet( PrimitiveType.PrimitiveTypeName typeName, RowBufferStats stats, ZoneId defaultTimezone, - long insertRowsCurrIndex) { + long insertRowsCurrIndex, + boolean enableNewJsonParsingLogic) { Utils.assertNotNull("Parquet column stats", stats); float estimatedParquetSize = 0F; estimatedParquetSize += DEFINITION_LEVEL_ENCODING_BYTE_LEN; @@ -147,7 +148,9 @@ static ParquetBufferValue parseColumnValueToParquet( getBinaryValueForLogicalBinary(value, stats, columnMetadata, insertRowsCurrIndex); length = ((byte[]) value).length; } else { - String str = getBinaryValue(value, stats, columnMetadata, insertRowsCurrIndex); + String str = + getBinaryValue( + value, stats, columnMetadata, insertRowsCurrIndex, enableNewJsonParsingLogic); value = str; if (str != null) { length = str.getBytes().length; @@ -365,7 +368,8 @@ private static String getBinaryValue( Object value, RowBufferStats stats, ColumnMetadata columnMetadata, - final long insertRowsCurrIndex) { + final long insertRowsCurrIndex, + boolean enableNewJsonParsingLogic) { AbstractRowBuffer.ColumnLogicalType logicalType = AbstractRowBuffer.ColumnLogicalType.valueOf(columnMetadata.getLogicalType()); String str; @@ -373,18 +377,27 @@ private static String getBinaryValue( switch (logicalType) { case OBJECT: str = - DataValidationUtil.validateAndParseObject( - columnMetadata.getName(), value, insertRowsCurrIndex); + enableNewJsonParsingLogic + ? 
DataValidationUtil.validateAndParseObjectNew( + columnMetadata.getName(), value, insertRowsCurrIndex) + : DataValidationUtil.validateAndParseObject( + columnMetadata.getName(), value, insertRowsCurrIndex); break; case VARIANT: str = - DataValidationUtil.validateAndParseVariant( - columnMetadata.getName(), value, insertRowsCurrIndex); + enableNewJsonParsingLogic + ? DataValidationUtil.validateAndParseVariantNew( + columnMetadata.getName(), value, insertRowsCurrIndex) + : DataValidationUtil.validateAndParseVariant( + columnMetadata.getName(), value, insertRowsCurrIndex); break; case ARRAY: str = - DataValidationUtil.validateAndParseArray( - columnMetadata.getName(), value, insertRowsCurrIndex); + enableNewJsonParsingLogic + ? DataValidationUtil.validateAndParseArrayNew( + columnMetadata.getName(), value, insertRowsCurrIndex) + : DataValidationUtil.validateAndParseArray( + columnMetadata.getName(), value, insertRowsCurrIndex); break; default: throw new SFException( diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index bd769f6cc..0525737a3 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -46,6 +46,9 @@ public class ParameterProvider { public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM = "BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase(); + public static final String ENABLE_NEW_JSON_PARSING_LOGIC = + "ENABLE_NEW_JSON_PARSING_LOGIC".toLowerCase(); + // Default values public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100; public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000; @@ -80,6 +83,8 @@ public class ParameterProvider { It reduces memory consumption compared to using Java Objects for buffering.*/ public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false; + public static final boolean 
ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true; + /** Map of parameter name to parameter value. This will be set by client/configure API Call. */ private final Map parameterMap = new HashMap<>(); @@ -252,6 +257,13 @@ private void setParameterMap( props, false); + this.checkAndUpdate( + ENABLE_NEW_JSON_PARSING_LOGIC, + ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT, + parameterOverrides, + props, + false); + if (getMaxChunksInBlob() > getMaxChunksInRegistrationRequest()) { throw new IllegalArgumentException( String.format( @@ -486,6 +498,14 @@ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { return Constants.BdecParquetCompression.fromName((String) val); } + /** @return Whether the new JSON parsing logic, which preserves the number format of string input, is enabled */ + public boolean isEnableNewJsonParsingLogic() { + Object val = + this.parameterMap.getOrDefault( + ENABLE_NEW_JSON_PARSING_LOGIC, ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); + return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; + } + @Override public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java index 8ab22619f..b92cc6e5e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java @@ -6,16 +6,19 @@ import static net.snowflake.ingest.streaming.internal.DataValidationUtil.BYTES_8_MB; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.isAllowedSemiStructuredType; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseArray; +import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseArrayNew; import static 
net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBigDecimal; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBinary; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBoolean; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseDate; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseObject; +import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseObjectNew; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseReal; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseString; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseTime; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseTimestamp; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseVariant; +import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseVariantNew; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -41,7 +44,6 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TimeZone; @@ -49,6 +51,7 @@ import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.StringUtils; import org.junit.Assert; import org.junit.Test; @@ -490,56 +493,121 @@ public void testValidateAndParseString() { @Test public void testValidateAndParseVariant() throws Exception { - assertEquals("1", validateAndParseVariant("COL", 1, 0)); - assertEquals("1", validateAndParseVariant("COL", "1", 
0)); - assertEquals("1", validateAndParseVariant("COL", " 1 ", 0)); - String stringVariant = "{\"key\":1}"; - assertEquals(stringVariant, validateAndParseVariant("COL", stringVariant, 0)); - assertEquals(stringVariant, validateAndParseVariant("COL", " " + stringVariant + " \t\n", 0)); + assertJson("variant", "1", 1); + assertJson("variant", "1", "1"); + assertJson("variant", "1", " 1 \n"); + assertJson("variant", "{\"key\":1}", "{\"key\":1}"); + assertJson("variant", "{\"key\":1}", " { \"key\": 1 } "); + + // Variants should preserve input format of numbers + assertJson( + "variant", "{\"key\":1111111.1111111}", " {\"key\": 1111111.1111111} \t\n", false); + assertJson( + "variant", + "{\"key\":11.111111111111e8}", + " {\"key\": 11.111111111111e8 } \t\n", + false); + assertJson( + "variant", + "{\"key\":11.111111111111e-8}", + " {\"key\": 11.111111111111e-8 } \t\n", + false); + assertJson( + "variant", + "{\"key\":11.111111111111E8}", + " {\"key\": 11.111111111111E8 } \t\n", + false); + assertJson( + "variant", + "{\"key\":11.111111111111E-8}", + " {\"key\": 11.111111111111E-8 } \t\n", + false); + assertJson( + "variant", + "{\"key\":11111111111111e8}", + " {\"key\": 11111111111111e8 } \t\n", + false); + assertJson( + "variant", + "{\"key\":11111111111111e-8}", + " {\"key\": 11111111111111e-8 } \t\n", + false); + assertJson( + "variant", + "{\"key\":11111111111111E8}", + " {\"key\": 11111111111111E8 } \t\n", + false); + assertJson( + "variant", + "{\"key\":11111111111111E-8}", + " {\"key\": 11111111111111E-8 } \t\n", + false); // Test custom serializers - assertEquals( - "[-128,0,127]", - validateAndParseVariant("COL", new byte[] {Byte.MIN_VALUE, 0, Byte.MAX_VALUE}, 0)); - assertEquals( + assertJson("variant", "[-128,0,127]", new byte[] {Byte.MIN_VALUE, 0, Byte.MAX_VALUE}); + assertJson( + "variant", "\"2022-09-28T03:04:12.123456789-07:00\"", - validateAndParseVariant( - "COL", - ZonedDateTime.of(2022, 9, 28, 3, 4, 12, 123456789, ZoneId.of("America/Los_Angeles")), - 
0)); + ZonedDateTime.of(2022, 9, 28, 3, 4, 12, 123456789, ZoneId.of("America/Los_Angeles"))); // Test valid JSON tokens - assertEquals("null", validateAndParseVariant("COL", null, 0)); - assertEquals("null", validateAndParseVariant("COL", "null", 0)); - assertEquals("true", validateAndParseVariant("COL", true, 0)); - assertEquals("true", validateAndParseVariant("COL", "true", 0)); - assertEquals("false", validateAndParseVariant("COL", false, 0)); - assertEquals("false", validateAndParseVariant("COL", "false", 0)); - assertEquals("{}", validateAndParseVariant("COL", "{}", 0)); - assertEquals("[]", validateAndParseVariant("COL", "[]", 0)); - assertEquals("[\"foo\",1,null]", validateAndParseVariant("COL", "[\"foo\",1,null]", 0)); - assertEquals("\"\"", validateAndParseVariant("COL", "\"\"", 0)); + + assertJson("variant", "null", null); + assertJson("variant", "null", "null"); + assertJson("variant", "true", true); + assertJson("variant", "true", "true"); + assertJson("variant", "false", false); + assertJson("variant", "false", "false"); + + assertJson("variant", "[]", "[]"); + assertJson("variant", "{}", "{}"); + assertJson("variant", "[\"foo\",1,null]", "[\"foo\",1,null]"); + assertJson("variant", "\"\"", "\"\""); // Test missing values are null instead of empty string assertNull(validateAndParseVariant("COL", "", 0)); + assertNull(validateAndParseVariantNew("COL", "", 0)); assertNull(validateAndParseVariant("COL", " ", 0)); + assertNull(validateAndParseVariantNew("COL", " ", 0)); // Test that invalid UTF-8 strings cannot be ingested expectError( ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseVariant("COL", "\"foo\uD800bar\"", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseVariantNew("COL", "\"foo\uD800bar\"", 0)); // Test forbidden values expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseVariant("COL", "{null}", 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseVariantNew("COL", "{null}", 0)); + 
expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseVariant("COL", "}{", 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseVariantNew("COL", "}{", 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseVariant("COL", readTree("{}"), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseVariantNew("COL", readTree("{}"), 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseVariant("COL", new Object(), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseVariantNew("COL", new Object(), 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseVariant("COL", "foo", 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseVariantNew("COL", "foo", 0)); + expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseVariant("COL", new Date(), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseVariantNew("COL", new Date(), 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseVariant("COL", Collections.singletonList(new Object()), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseVariantNew("COL", Collections.singletonList(new Object()), 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> @@ -547,65 +615,103 @@ public void testValidateAndParseVariant() throws Exception { "COL", Collections.singletonList(Collections.singletonMap("foo", new Object())), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> + validateAndParseVariantNew( + "COL", + Collections.singletonList(Collections.singletonMap("foo", new Object())), + 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseVariant("COL", Collections.singletonMap(new Object(), "foo"), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseVariantNew("COL", Collections.singletonMap(new Object(), "foo"), 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> 
validateAndParseVariant("COL", Collections.singletonMap("foo", new Object()), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseVariantNew("COL", Collections.singletonMap("foo", new Object()), 0)); } - @Test - public void testValidateAndParseArray() throws Exception { - assertEquals("[1]", validateAndParseArray("COL", 1, 0)); - assertEquals("[1]", validateAndParseArray("COL", "1", 0)); - assertEquals("[1]", validateAndParseArray("COL", " 1 ", 0)); - assertEquals("[1,2,3]", validateAndParseArray("COL", "[1, 2, 3]", 0)); - assertEquals("[1,2,3]", validateAndParseArray("COL", " [1, 2, 3] \t\n", 0)); - int[] intArray = new int[] {1, 2, 3}; - assertEquals("[1,2,3]", validateAndParseArray("COL", intArray, 0)); - - String[] stringArray = new String[] {"a", "b", "c"}; - assertEquals("[\"a\",\"b\",\"c\"]", validateAndParseArray("COL", stringArray, 0)); - - Object[] objectArray = new Object[] {1, 2, 3}; - assertEquals("[1,2,3]", validateAndParseArray("COL", objectArray, 0)); - - Object[] ObjectArrayWithNull = new Object[] {1, null, 3}; - assertEquals("[1,null,3]", validateAndParseArray("COL", ObjectArrayWithNull, 0)); - - Object[][] nestedArray = new Object[][] {{1, 2, 3}, null, {4, 5, 6}}; - assertEquals("[[1,2,3],null,[4,5,6]]", validateAndParseArray("COL", nestedArray, 0)); - - List intList = Arrays.asList(1, 2, 3); - assertEquals("[1,2,3]", validateAndParseArray("COL", intList, 0)); + private void assertJson(String colType, String expectedValue, Object value) { + assertJson(colType, expectedValue, value, true); + } - List objectList = Arrays.asList(1, 2, 3); - assertEquals("[1,2,3]", validateAndParseArray("COL", objectList, 0)); + private void assertJson( + String colType, String expectedValue, Object value, boolean alsoTestOldApproach) { + if (colType.equalsIgnoreCase("variant")) { + assertEquals(expectedValue, validateAndParseVariantNew("COL", value, 0)); + if (alsoTestOldApproach) { + assertEquals(expectedValue, validateAndParseVariant("COL", 
value, 0)); + } + } else if (colType.equalsIgnoreCase("array")) { + assertEquals(expectedValue, validateAndParseArrayNew("COL", value, 0)); + if (alsoTestOldApproach) { + assertEquals(expectedValue, validateAndParseArray("COL", value, 0)); + } + } else if (colType.equalsIgnoreCase("object")) { + assertEquals(expectedValue, validateAndParseObjectNew("COL", value, 0)); + if (alsoTestOldApproach) { + assertEquals(expectedValue, validateAndParseObject("COL", value, 0)); + } + } else { + Assert.fail("Unexpected colType " + colType); + } + } - List nestedList = Arrays.asList(Arrays.asList(1, 2, 3), 2, 3); - assertEquals("[[1,2,3],2,3]", validateAndParseArray("COL", nestedList, 0)); + @Test + public void testValidateAndParseArray() throws Exception { + assertJson("array", "[1]", 1); + assertJson("array", "[1]", "1"); + assertJson("array", "[\"1\"]", "\"1\""); + assertJson("array", "[1.1e10]", " 1.1e10 ", false); + assertJson("array", "[1,2,3]", " [1, 2, 3] \t\n"); + assertJson("array", "[1,2,3]", new int[] {1, 2, 3}); + assertJson("array", "[\"a\",\"b\",\"c\"]", new String[] {"a", "b", "c"}); + assertJson("array", "[1,2,3]", new Object[] {1, 2, 3}); + assertJson("array", "[1,null,3]", new Object[] {1, null, 3}); + assertJson("array", "[[1,2,3],null,[4,5,6]]", new Object[][] {{1, 2, 3}, null, {4, 5, 6}}); + assertJson("array", "[1,2,3]", Arrays.asList(1, 2, 3)); + assertJson("array", "[[1,2,3],2,3]", Arrays.asList(Arrays.asList(1, 2, 3), 2, 3)); // Test null values - assertEquals("[null]", validateAndParseArray("COL", "", 0)); - assertEquals("[null]", validateAndParseArray("COL", " ", 0)); - assertEquals("[null]", validateAndParseArray("COL", "null", 0)); - assertEquals("[null]", validateAndParseArray("COL", null, 0)); + assertJson("array", "[null]", ""); + assertJson("array", "[null]", " "); + assertJson("array", "[null]", "null"); + assertJson("array", "[null]", null); // Test that invalid UTF-8 strings cannot be ingested expectError( ErrorCode.INVALID_VALUE_ROW, () -> 
validateAndParseArray("COL", "\"foo\uD800bar\"", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseArrayNew("COL", "\"foo\uD800bar\"", 0)); // Test forbidden values expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseArray("COL", readTree("[]"), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseArrayNew("COL", readTree("[]"), 0)); expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseArray("COL", new Object(), 0)); expectError( - ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseArray("COL", "foo", 0)); // invalid JSO)N + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseArrayNew("COL", new Object(), 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseArray("COL", "foo", 0)); // invalid JSON + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseArrayNew("COL", "foo", 0)); // invalid JSON expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseArray("COL", new Date(), 0)); + expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseArrayNew("COL", new Date(), 0)); expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseArray("COL", Collections.singletonList(new Object()), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseArrayNew("COL", Collections.singletonList(new Object()), 0)); expectError( ErrorCode.INVALID_FORMAT_ROW, () -> @@ -613,60 +719,96 @@ public void testValidateAndParseArray() throws Exception { "COL", Collections.singletonList(Collections.singletonMap("foo", new Object())), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> + validateAndParseArrayNew( + "COL", + Collections.singletonList(Collections.singletonMap("foo", new Object())), + 0)); expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseArray("COL", Collections.singletonMap(new Object(), "foo"), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseArrayNew("COL", Collections.singletonMap(new 
Object(), "foo"), 0)); expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseArray("COL", Collections.singletonMap("foo", new Object()), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseArrayNew("COL", Collections.singletonMap("foo", new Object()), 0)); } @Test public void testValidateAndParseObject() throws Exception { - String stringObject = "{\"key\":1}"; - assertEquals(stringObject, validateAndParseObject("COL", stringObject, 0)); - assertEquals(stringObject, validateAndParseObject("COL", " " + stringObject + " \t\n", 0)); - - String badObject = "foo"; - try { - validateAndParseObject("COL", badObject, 0); - Assert.fail("Expected INVALID_ROW error"); - } catch (SFException err) { - assertEquals(ErrorCode.INVALID_VALUE_ROW.getMessageCode(), err.getVendorCode()); - } + assertJson("object", "{}", " { } "); + assertJson("object", "{\"key\":1}", "{\"key\":1}"); + assertJson("object", "{\"key\":1}", " { \"key\" : 1 } "); + assertJson("object", "{\"key\":111.111}", " { \"key\" : 111.111 } "); + assertJson("object", "{\"key\":111.111e6}", " { \"key\" : 111.111e6 } ", false); + assertJson("object", "{\"key\":111.111E6}", " { \"key\" : 111.111E6 } ", false); + assertJson("object", "{\"key\":111.111e-6}", " { \"key\" : 111.111e-6 } ", false); + assertJson("object", "{\"key\":111.111E-6}", " { \"key\" : 111.111E-6 } ", false); - char[] data = new char[20000000]; - Arrays.fill(data, 'a'); - String stringVal = new String(data); - Map mapVal = new HashMap<>(); - mapVal.put("key", stringVal); - String tooLargeObject = objectMapper.writeValueAsString(mapVal); - try { - validateAndParseObject("COL", tooLargeObject, 0); - Assert.fail("Expected INVALID_ROW error"); - } catch (SFException err) { - assertEquals(ErrorCode.INVALID_VALUE_ROW.getMessageCode(), err.getVendorCode()); - } + final String tooLargeObject = + objectMapper.writeValueAsString( + Collections.singletonMap("key", StringUtils.repeat('a', 20000000))); + expectError( + 
ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", tooLargeObject, 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObjectNew("COL", tooLargeObject, 0)); // Test that invalid UTF-8 strings cannot be ingested expectError( ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", "{\"foo\": \"foo\uD800bar\"}", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseObjectNew("COL", "{\"foo\": \"foo\uD800bar\"}", 0)); // Test forbidden values + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", "", 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObjectNew("COL", "", 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseObject("COL", readTree("{}"), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseObjectNew("COL", readTree("{}"), 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", "[]", 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObjectNew("COL", "[]", 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", "1", 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObjectNew("COL", "1", 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", 1, 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObjectNew("COL", 1, 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", 1.5, 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObjectNew("COL", 1.5, 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", false, 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObjectNew("COL", false, 0)); + expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseObject("COL", new Object(), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseObjectNew("COL", new 
Object(), 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObject("COL", "foo", 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseObjectNew("COL", "foo", 0)); + expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseObject("COL", new Date(), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseObjectNew("COL", new Date(), 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseObject("COL", Collections.singletonList(new Object()), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseObjectNew("COL", Collections.singletonList(new Object()), 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> @@ -674,12 +816,34 @@ public void testValidateAndParseObject() throws Exception { "COL", Collections.singletonList(Collections.singletonMap("foo", new Object())), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> + validateAndParseObjectNew( + "COL", + Collections.singletonList(Collections.singletonMap("foo", new Object())), + 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseObject("COL", Collections.singletonMap(new Object(), "foo"), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseObjectNew("COL", Collections.singletonMap(new Object(), "foo"), 0)); + + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseObject("COL", Collections.singletonMap(new Object(), "foo"), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseObjectNew("COL", Collections.singletonMap(new Object(), "foo"), 0)); + expectError( ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseObject("COL", Collections.singletonMap("foo", new Object()), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, + () -> validateAndParseObjectNew("COL", Collections.singletonMap("foo", new Object()), 0)); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java 
b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index a0478bd8a..2bc4e1cdc 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -403,4 +403,22 @@ public void testInvalidCompressionAlgorithm() { e.getMessage()); } } + + @Test + public void EnableNewJsonParsingLogicAsBool() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + Assert.assertFalse(parameterProvider.isEnableNewJsonParsingLogic()); + } + + @Test + public void EnableNewJsonParsingLogicAsString() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC, "false"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + Assert.assertFalse(parameterProvider.isEnableNewJsonParsingLogic()); + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java index 6878478e2..e7c74e3b7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java @@ -10,6 +10,7 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.schema.PrimitiveType; import org.junit.Assert; @@ -31,7 +32,13 @@ public void parseValueFixedSB1ToInt32() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); 
ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - 12, testCol, PrimitiveType.PrimitiveTypeName.INT32, rowBufferStats, UTC, 0); + 12, + testCol, + PrimitiveType.PrimitiveTypeName.INT32, + rowBufferStats, + UTC, + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -57,7 +64,13 @@ public void parseValueFixedSB2ToInt32() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - 1234, testCol, PrimitiveType.PrimitiveTypeName.INT32, rowBufferStats, UTC, 0); + 1234, + testCol, + PrimitiveType.PrimitiveTypeName.INT32, + rowBufferStats, + UTC, + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -83,7 +96,13 @@ public void parseValueFixedSB4ToInt32() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - 123456789, testCol, PrimitiveType.PrimitiveTypeName.INT32, rowBufferStats, UTC, 0); + 123456789, + testCol, + PrimitiveType.PrimitiveTypeName.INT32, + rowBufferStats, + UTC, + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -114,7 +133,8 @@ public void parseValueFixedSB8ToInt64() { PrimitiveType.PrimitiveTypeName.INT64, rowBufferStats, UTC, - 0); + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -145,7 +165,8 @@ public void parseValueFixedSB16ToByteArray() { PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, rowBufferStats, UTC, - 0); + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -178,7 +199,8 @@ 
public void parseValueFixedDecimalToInt32() { PrimitiveType.PrimitiveTypeName.DOUBLE, rowBufferStats, UTC, - 0); + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -202,7 +224,13 @@ public void parseValueDouble() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - 12345.54321d, testCol, PrimitiveType.PrimitiveTypeName.DOUBLE, rowBufferStats, UTC, 0); + 12345.54321d, + testCol, + PrimitiveType.PrimitiveTypeName.DOUBLE, + rowBufferStats, + UTC, + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -226,7 +254,13 @@ public void parseValueBoolean() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - true, testCol, PrimitiveType.PrimitiveTypeName.BOOLEAN, rowBufferStats, UTC, 0); + true, + testCol, + PrimitiveType.PrimitiveTypeName.BOOLEAN, + rowBufferStats, + UTC, + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -255,7 +289,8 @@ public void parseValueBinary() { PrimitiveType.PrimitiveTypeName.BINARY, rowBufferStats, UTC, - 0); + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -270,15 +305,17 @@ public void parseValueBinary() { @Test public void parseValueVariantToBinary() { - testJsonWithLogicalType("VARIANT"); + testJsonWithLogicalType("VARIANT", true); + testJsonWithLogicalType("VARIANT", false); } @Test public void parseValueObjectToBinary() { - testJsonWithLogicalType("OBJECT"); + testJsonWithLogicalType("OBJECT", true); + testJsonWithLogicalType("OBJECT", false); } - private void testJsonWithLogicalType(String 
logicalType) { + private void testJsonWithLogicalType(String logicalType, boolean enableNewJsonParsingLogic) { ColumnMetadata testCol = ColumnMetadataBuilder.newBuilder() .logicalType(logicalType) @@ -292,7 +329,13 @@ private void testJsonWithLogicalType(String logicalType) { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - var, testCol, PrimitiveType.PrimitiveTypeName.BINARY, rowBufferStats, UTC, 0); + var, + testCol, + PrimitiveType.PrimitiveTypeName.BINARY, + rowBufferStats, + UTC, + 0, + enableNewJsonParsingLogic); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -309,20 +352,23 @@ private void testJsonWithLogicalType(String logicalType) { @Test public void parseValueNullVariantToBinary() { - testNullJsonWithLogicalType(null); + testNullJsonWithLogicalType(null, true); + testNullJsonWithLogicalType(null, false); } @Test public void parseValueEmptyStringVariantToBinary() { - testNullJsonWithLogicalType(""); + testNullJsonWithLogicalType("", true); + testNullJsonWithLogicalType("", false); } @Test public void parseValueEmptySpaceStringVariantToBinary() { - testNullJsonWithLogicalType(" "); + testNullJsonWithLogicalType(" ", true); + testNullJsonWithLogicalType(" ", false); } - private void testNullJsonWithLogicalType(String var) { + private void testNullJsonWithLogicalType(String var, boolean enableNewJsonParsingLogic) { ColumnMetadata testCol = ColumnMetadataBuilder.newBuilder() .logicalType("VARIANT") @@ -333,7 +379,13 @@ private void testNullJsonWithLogicalType(String var) { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - var, testCol, PrimitiveType.PrimitiveTypeName.BINARY, rowBufferStats, UTC, 0); + var, + testCol, + PrimitiveType.PrimitiveTypeName.BINARY, + rowBufferStats, + UTC, + 0, + enableNewJsonParsingLogic); 
ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -348,6 +400,11 @@ private void testNullJsonWithLogicalType(String var) { @Test public void parseValueArrayToBinary() { + parseValueArrayToBinaryInternal(false); + parseValueArrayToBinaryInternal(true); + } + + public void parseValueArrayToBinaryInternal(boolean enableNewJsonParsingLogic) { ColumnMetadata testCol = ColumnMetadataBuilder.newBuilder() .logicalType("ARRAY") @@ -363,7 +420,13 @@ public void parseValueArrayToBinary() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - input, testCol, PrimitiveType.PrimitiveTypeName.BINARY, rowBufferStats, UTC, 0); + input, + testCol, + PrimitiveType.PrimitiveTypeName.BINARY, + rowBufferStats, + UTC, + 0, + enableNewJsonParsingLogic); String resultArray = "[{\"a\":\"1\",\"b\":\"2\",\"c\":\"3\"}]"; @@ -395,7 +458,13 @@ public void parseValueTextToBinary() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - text, testCol, PrimitiveType.PrimitiveTypeName.BINARY, rowBufferStats, UTC, 0); + text, + testCol, + PrimitiveType.PrimitiveTypeName.BINARY, + rowBufferStats, + UTC, + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); String result = text; @@ -434,7 +503,8 @@ public void parseValueTimestampNtzSB4Error() { PrimitiveType.PrimitiveTypeName.INT32, rowBufferStats, UTC, - 0)); + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT)); Assert.assertEquals( "Unknown data type for logical: TIMESTAMP_NTZ, physical: SB4.", exception.getMessage()); } @@ -458,7 +528,8 @@ public void parseValueTimestampNtzSB8ToINT64() { PrimitiveType.PrimitiveTypeName.INT64, rowBufferStats, UTC, - 0); + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -488,7 +559,8 @@ 
public void parseValueTimestampNtzSB16ToByteArray() { PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, rowBufferStats, UTC, - 0); + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -514,7 +586,13 @@ public void parseValueDateToInt32() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - "2021-01-01", testCol, PrimitiveType.PrimitiveTypeName.INT32, rowBufferStats, UTC, 0); + "2021-01-01", + testCol, + PrimitiveType.PrimitiveTypeName.INT32, + rowBufferStats, + UTC, + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -539,7 +617,13 @@ public void parseValueTimeSB4ToInt32() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - "01:00:00", testCol, PrimitiveType.PrimitiveTypeName.INT32, rowBufferStats, UTC, 0); + "01:00:00", + testCol, + PrimitiveType.PrimitiveTypeName.INT32, + rowBufferStats, + UTC, + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -564,7 +648,13 @@ public void parseValueTimeSB8ToInt64() { RowBufferStats rowBufferStats = new RowBufferStats("COL1"); ParquetValueParser.ParquetBufferValue pv = ParquetValueParser.parseColumnValueToParquet( - "01:00:00.123", testCol, PrimitiveType.PrimitiveTypeName.INT64, rowBufferStats, UTC, 0); + "01:00:00.123", + testCol, + PrimitiveType.PrimitiveTypeName.INT64, + rowBufferStats, + UTC, + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) @@ -597,7 +687,8 @@ public void parseValueTimeSB16Error() { PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, rowBufferStats, UTC, - 
0)); + 0, + ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT)); Assert.assertEquals( "Unknown data type for logical: TIME, physical: SB16.", exception.getMessage()); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index fe4ae4804..41739c267 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -1,6 +1,7 @@ package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; +import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT; import static org.junit.Assert.fail; @@ -129,7 +130,8 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o enableParquetMemoryOptimization, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT, - Constants.BdecParquetCompression.GZIP), + Constants.BdecParquetCompression.GZIP, + ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT), null, null); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index ec3017ad5..c39ffe967 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -378,6 +378,33 @@ void assertVariant( migrateTable(tableName); // migration should always succeed } + void assertVariantLiterally( + String dataType, String writeValue, String expectedValue, String expectedType) + throws Exception { + + String tableName = 
createTable(dataType); + String offsetToken = UUID.randomUUID().toString(); + + // Ingest using streaming ingest + SnowflakeStreamingIngestChannel channel = openChannel(tableName); + channel.insertRow(createStreamingIngestRow(writeValue), offsetToken); + TestUtils.waitForOffset(channel, offsetToken); + + final String query = + String.format( + "select %s as v1, typeof(v1) as v1_type, parse_json('%s') as v2, typeof(v2) as v2_type" + + " from %s", + VALUE_COLUMN_NAME, writeValue, tableName); + try (ResultSet resultSet = conn.createStatement().executeQuery(query)) { + resultSet.next(); + Assert.assertEquals(expectedValue, resultSet.getString("V1")); + Assert.assertEquals(expectedValue, resultSet.getString("V2")); + Assert.assertEquals(expectedType, resultSet.getString("V1_TYPE")); + Assert.assertEquals(expectedType, resultSet.getString("V2_TYPE")); + } + ; + } + protected void migrateTable(String tableName) throws SQLException { conn.createStatement().execute(String.format("alter table %s migrate;", tableName)); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java index a3c9d2365..432312357 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java @@ -222,6 +222,30 @@ public void testArray() throws Exception { assertVariant("ARRAY", Collections.singletonMap("1", "2"), "[{\"1\": \"2\"}]", "ARRAY"); } + @Test + public void testNumberScientificNotation() throws Exception { + assertVariantLiterally("VARIANT", " 12.34\t\n", "12.34", "DECIMAL"); + + assertVariantLiterally("VARIANT", " 1.234e1\t\n", "1.234000000000000e+01", "DOUBLE"); + assertVariantLiterally("VARIANT", " 1.234E1\t\n", "1.234000000000000e+01", "DOUBLE"); + assertVariantLiterally("VARIANT", " 123.4e-1\t\n", "1.234000000000000e+01", "DOUBLE"); + 
assertVariantLiterally("VARIANT", " 123.4E-1\t\n", "1.234000000000000e+01", "DOUBLE"); + + assertVariantLiterally("VARIANT", " 1234e1\t\n", "1.234000000000000e+04", "DOUBLE"); + assertVariantLiterally("VARIANT", " 1234E1\t\n", "1.234000000000000e+04", "DOUBLE"); + assertVariantLiterally("VARIANT", " 1234e-1\t\n", "1.234000000000000e+02", "DOUBLE"); + assertVariantLiterally("VARIANT", " 1234E-1\t\n", "1.234000000000000e+02", "DOUBLE"); + + assertVariantLiterally( + "OBJECT", + " {\"key\": 1.234E1\t\n}", + "{\n" + " \"key\": 1.234000000000000e+01\n" + "}", + "OBJECT"); + + assertVariantLiterally( + "ARRAY", " [1.234E1\t\n]\n", "[\n" + " 1.234000000000000e+01\n" + "]", "ARRAY"); + } + private String createLargeVariantObject(int size) throws JsonProcessingException { char[] stringContent = new char[size - 17]; // {"a":"11","b":""} Arrays.fill(stringContent, 'c');