diff --git a/pom.xml b/pom.xml index b771ac80b..017c1f673 100644 --- a/pom.xml +++ b/pom.xml @@ -426,6 +426,10 @@ commons-codec commons-codec + + commons-io + commons-io + diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java index 162e56145..092982d3c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java @@ -6,11 +6,16 @@ import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.unicodeCharactersCount; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.StandardCharsets; @@ -39,6 +44,7 @@ import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.io.output.StringBuilderWriter; /** Utility class for parsing and validating inputs based on Snowflake types */ class DataValidationUtil { @@ -70,6 +76,8 @@ class DataValidationUtil { private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final JsonFactory factory = new JsonFactory(); + // The version of Jackson we are using does not support serialization of date objects from the // java.time package. Here we define a module with custom java.time serializers. 
Additionally, we // define custom serializer for byte[] because the Jackson default is to serialize it as @@ -103,21 +111,38 @@ private static BigDecimal[] makePower10Table() { * see {@link DataValidationUtil#isAllowedSemiStructuredType}. * * @param input Object to validate - * @return JSON tree representing the input + * @return Minified JSON string */ - private static JsonNode validateAndParseSemiStructuredAsJsonTree( + private static String validateAndParseSemiStructured( String columnName, Object input, String snowflakeType, final long insertRowIndex) { if (input instanceof String) { - String stringInput = (String) input; + final String stringInput = (String) input; verifyValidUtf8(stringInput, columnName, snowflakeType, insertRowIndex); - try { - return objectMapper.readTree(stringInput); - } catch (JsonProcessingException e) { + final StringBuilderWriter resultWriter = new StringBuilderWriter(stringInput.length()); + try (final JsonParser parser = factory.createParser(stringInput); + final JsonGenerator generator = factory.createGenerator(resultWriter)) { + while (parser.nextToken() != null) { + final JsonToken token = parser.currentToken(); + if (token.isNumeric()) { + // If the current token is a number, we cannot just copy the current event because it + // would write the token from double (or big decimal), whose scientific notation + // may have been altered during deserialization. We want to preserve the scientific + // notation from the user input, so we write the current number as text. 
+ generator.writeNumber(parser.getText()); + } else { + generator.copyCurrentEvent(parser); + } + } + } catch (JsonParseException e) { throw valueFormatNotAllowedException( columnName, snowflakeType, "Not a valid JSON", insertRowIndex); + } catch (IOException e) { + throw new SFException(e, ErrorCode.IO_ERROR, "Cannot create JSON Parser or JSON generator"); } + return resultWriter.toString(); } else if (isAllowedSemiStructuredType(input)) { - return objectMapper.valueToTree(input); + JsonNode node = objectMapper.valueToTree(input); + return node.toString(); } throw typeNotAllowedException( @@ -144,16 +169,14 @@ private static JsonNode validateAndParseSemiStructuredAsJsonTree( * @return JSON string representing the input */ static String validateAndParseVariant(String columnName, Object input, long insertRowIndex) { - JsonNode node = - validateAndParseSemiStructuredAsJsonTree(columnName, input, "VARIANT", insertRowIndex); + final String result = + validateAndParseSemiStructured(columnName, input, "VARIANT", insertRowIndex); - // Missing nodes are not valid json, ingest them as NULL instead - if (node.isMissingNode()) { + // Empty json strings are ingested as nulls + if (result.isEmpty()) { return null; } - - String output = node.toString(); - int stringLength = output.getBytes(StandardCharsets.UTF_8).length; + int stringLength = result.getBytes(StandardCharsets.UTF_8).length; if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) { throw valueFormatNotAllowedException( columnName, @@ -162,7 +185,7 @@ static String validateAndParseVariant(String columnName, Object input, long inse "Variant too long: length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH), insertRowIndex); } - return output; + return result; } /** @@ -277,17 +300,20 @@ static boolean isAllowedSemiStructuredType(Object o) { * @return JSON array representing the input */ static String validateAndParseArray(String columnName, Object input, long insertRowIndex) { - JsonNode jsonNode = - 
validateAndParseSemiStructuredAsJsonTree(columnName, input, "ARRAY", insertRowIndex); - - // Non-array values are ingested as single-element arrays, mimicking the Worksheets behavior - if (!jsonNode.isArray()) { - jsonNode = objectMapper.createArrayNode().add(jsonNode); + String result = validateAndParseSemiStructured(columnName, input, "ARRAY", insertRowIndex); + if (result.isEmpty()) { + // Empty input is ingested as an array of null + result = + JsonToken.START_ARRAY.asString() + + JsonToken.VALUE_NULL.asString() + + JsonToken.END_ARRAY.asString(); + } else if (!result.startsWith(JsonToken.START_ARRAY.asString())) { + // Non-array values are ingested as single-element arrays, mimicking the Worksheets behavior + result = JsonToken.START_ARRAY.asString() + result + JsonToken.END_ARRAY.asString(); } - String output = jsonNode.toString(); // Throw an exception if the size is too large - int stringLength = output.getBytes(StandardCharsets.UTF_8).length; + int stringLength = result.getBytes(StandardCharsets.UTF_8).length; if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) { throw valueFormatNotAllowedException( columnName, @@ -296,7 +322,7 @@ static String validateAndParseArray(String columnName, Object input, long insert "Array too large. 
length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH), insertRowIndex); } - return output; + return result; } /** @@ -309,15 +335,13 @@ static String validateAndParseArray(String columnName, Object input, long insert * @return JSON object representing the input */ static String validateAndParseObject(String columnName, Object input, long insertRowIndex) { - JsonNode jsonNode = - validateAndParseSemiStructuredAsJsonTree(columnName, input, "OBJECT", insertRowIndex); - if (!jsonNode.isObject()) { + final String result = + validateAndParseSemiStructured(columnName, input, "OBJECT", insertRowIndex); + if (!result.startsWith(JsonToken.START_OBJECT.asString())) { throw valueFormatNotAllowedException(columnName, "OBJECT", "Not an object", insertRowIndex); } - - String output = jsonNode.toString(); // Throw an exception if the size is too large - int stringLength = output.getBytes(StandardCharsets.UTF_8).length; + int stringLength = result.getBytes(StandardCharsets.UTF_8).length; if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) { throw valueFormatNotAllowedException( columnName, @@ -326,7 +350,7 @@ static String validateAndParseObject(String columnName, Object input, long inser "Object too large. 
length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH), insertRowIndex); } - return output; + return result; } /** diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java index 8ab22619f..3187bf34b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java @@ -493,9 +493,38 @@ public void testValidateAndParseVariant() throws Exception { assertEquals("1", validateAndParseVariant("COL", 1, 0)); assertEquals("1", validateAndParseVariant("COL", "1", 0)); assertEquals("1", validateAndParseVariant("COL", " 1 ", 0)); - String stringVariant = "{\"key\":1}"; - assertEquals(stringVariant, validateAndParseVariant("COL", stringVariant, 0)); - assertEquals(stringVariant, validateAndParseVariant("COL", " " + stringVariant + " \t\n", 0)); + assertEquals("{\"key\":1}", validateAndParseVariant("COL", "{\"key\":1}", 0)); + assertEquals("{\"key\":1}", validateAndParseVariant("COL", " { \"key\" : 1 } \t\n", 0)); + + // Variants should preserve input format of numbers + assertEquals( + "{\"key\":1111111.1111111}", + validateAndParseObject("COL", " {\"key\": 1111111.1111111} \t\n", 0)); + assertEquals( + "{\"key\":11.111111111111e8}", + validateAndParseObject("COL", " {\"key\": 11.111111111111e8 } \t\n", 0)); + assertEquals( + "{\"key\":11.111111111111e-8}", + validateAndParseObject("COL", " {\"key\": 11.111111111111e-8 } \t\n", 0)); + assertEquals( + "{\"key\":11.111111111111E8}", + validateAndParseObject("COL", " {\"key\": 11.111111111111E8 } \t\n", 0)); + assertEquals( + "{\"key\":11.111111111111E-8}", + validateAndParseObject("COL", " {\"key\": 11.111111111111E-8 } \t\n", 0)); + + assertEquals( + "{\"key\":11111111111111e8}", + validateAndParseObject("COL", " {\"key\": 11111111111111e8 } \t\n", 0)); + assertEquals( + 
"{\"key\":11111111111111e-8}", + validateAndParseObject("COL", " {\"key\": 11111111111111e-8 } \t\n", 0)); + assertEquals( + "{\"key\":11111111111111E8}", + validateAndParseObject("COL", " {\"key\": 11111111111111E8 } \t\n", 0)); + assertEquals( + "{\"key\":11111111111111E-8}", + validateAndParseObject("COL", " {\"key\": 11111111111111E-8 } \t\n", 0)); // Test custom serializers assertEquals( @@ -559,7 +588,8 @@ public void testValidateAndParseVariant() throws Exception { public void testValidateAndParseArray() throws Exception { assertEquals("[1]", validateAndParseArray("COL", 1, 0)); assertEquals("[1]", validateAndParseArray("COL", "1", 0)); - assertEquals("[1]", validateAndParseArray("COL", " 1 ", 0)); + assertEquals( + "[1.1e10]", validateAndParseArray("COL", " 1.1e10 ", 0)); assertEquals("[1,2,3]", validateAndParseArray("COL", "[1, 2, 3]", 0)); assertEquals("[1,2,3]", validateAndParseArray("COL", " [1, 2, 3] \t\n", 0)); int[] intArray = new int[] {1, 2, 3}; @@ -623,9 +653,8 @@ public void testValidateAndParseArray() throws Exception { @Test public void testValidateAndParseObject() throws Exception { - String stringObject = "{\"key\":1}"; - assertEquals(stringObject, validateAndParseObject("COL", stringObject, 0)); - assertEquals(stringObject, validateAndParseObject("COL", " " + stringObject + " \t\n", 0)); + assertEquals("{\"key\":1}", validateAndParseObject("COL", "{\"key\":1}", 0)); + assertEquals("{\"key\":1}", validateAndParseObject("COL", " {\"key\": 1} \t\n", 0)); String badObject = "foo"; try { diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index ec3017ad5..c39ffe967 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -378,6 +378,33 @@ void assertVariant( 
migrateTable(tableName); // migration should always succeed } + void assertVariantLiterally( + String dataType, String writeValue, String expectedValue, String expectedType) + throws Exception { + + String tableName = createTable(dataType); + String offsetToken = UUID.randomUUID().toString(); + + // Ingest using streaming ingest + SnowflakeStreamingIngestChannel channel = openChannel(tableName); + channel.insertRow(createStreamingIngestRow(writeValue), offsetToken); + TestUtils.waitForOffset(channel, offsetToken); + + final String query = + String.format( + "select %s as v1, typeof(v1) as v1_type, parse_json('%s') as v2, typeof(v2) as v2_type" + + " from %s", + VALUE_COLUMN_NAME, writeValue, tableName); + try (ResultSet resultSet = conn.createStatement().executeQuery(query)) { + resultSet.next(); + Assert.assertEquals(expectedValue, resultSet.getString("V1")); + Assert.assertEquals(expectedValue, resultSet.getString("V2")); + Assert.assertEquals(expectedType, resultSet.getString("V1_TYPE")); + Assert.assertEquals(expectedType, resultSet.getString("V2_TYPE")); + } + ; + } + protected void migrateTable(String tableName) throws SQLException { conn.createStatement().execute(String.format("alter table %s migrate;", tableName)); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java index a3c9d2365..30b4b349b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java @@ -222,6 +222,24 @@ public void testArray() throws Exception { assertVariant("ARRAY", Collections.singletonMap("1", "2"), "[{\"1\": \"2\"}]", "ARRAY"); } + @Test + public void testNumberScientificNotation() throws Exception { + assertVariantLiterally("VARIANT", " 12.34\t\n", "12.34", "DECIMAL"); + assertVariantLiterally("VARIANT", " 1.234e1\t\n", 
"1.234000000000000e+01", "DOUBLE"); + assertVariantLiterally("VARIANT", " 1.234E1\t\n", "1.234000000000000e+01", "DOUBLE"); + assertVariantLiterally("VARIANT", " 123.4e-1\t\n", "1.234000000000000e+01", "DOUBLE"); + assertVariantLiterally("VARIANT", " 123.4E-1\t\n", "1.234000000000000e+01", "DOUBLE"); + + assertVariantLiterally( + "OBJECT", + " {\"key\": 1.234E1\t\n}", + "{\n" + " \"key\": 1.234000000000000e+01\n" + "}", + "OBJECT"); + + assertVariantLiterally( + "ARRAY", " [1.234E1\t\n]\n", "[\n" + " 1.234000000000000e+01\n" + "]", "ARRAY"); + } + private String createLargeVariantObject(int size) throws JsonProcessingException { char[] stringContent = new char[size - 17]; // {"a":"11","b":""} Arrays.fill(stringContent, 'c');