Skip to content

Commit

Permalink
SNOW-1476302 Preserve numeric format for variants
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-lsembera committed Aug 26, 2024
1 parent b4b84b8 commit 543b5a7
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 39 deletions.
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,10 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.unicodeCharactersCount;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -39,6 +44,7 @@
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.output.StringBuilderWriter;

/** Utility class for parsing and validating inputs based on Snowflake types */
class DataValidationUtil {
Expand Down Expand Up @@ -70,6 +76,8 @@ class DataValidationUtil {

private static final ObjectMapper objectMapper = new ObjectMapper();

private static final JsonFactory factory = new JsonFactory();

// The version of Jackson we are using does not support serialization of date objects from the
// java.time package. Here we define a module with custom java.time serializers. Additionally, we
// define custom serializer for byte[] because the Jackson default is to serialize it as
Expand Down Expand Up @@ -103,21 +111,38 @@ private static BigDecimal[] makePower10Table() {
* see {@link DataValidationUtil#isAllowedSemiStructuredType}.
*
* @param input Object to validate
* @return JSON tree representing the input
* @return Minified JSON string
*/
private static JsonNode validateAndParseSemiStructuredAsJsonTree(
private static String validateAndParseSemiStructured(
String columnName, Object input, String snowflakeType, final long insertRowIndex) {
if (input instanceof String) {
String stringInput = (String) input;
final String stringInput = (String) input;
verifyValidUtf8(stringInput, columnName, snowflakeType, insertRowIndex);
try {
return objectMapper.readTree(stringInput);
} catch (JsonProcessingException e) {
final StringBuilderWriter resultWriter = new StringBuilderWriter(stringInput.length());
try (final JsonParser parser = factory.createParser(stringInput);
final JsonGenerator generator = factory.createGenerator(resultWriter)) {
while (parser.nextToken() != null) {
final JsonToken token = parser.currentToken();
if (token.isNumeric()) {
// If the current token is a number, we cannot just copy the current event because it
// would write token the token from double (or big decimal), whose scientific notation
// may have been altered during deserialization. We want to preserve the scientific
// notation from the user input, so we write the current numer as text.
generator.writeNumber(parser.getText());
} else {
generator.copyCurrentEvent(parser);
}
}
} catch (JsonParseException e) {
throw valueFormatNotAllowedException(
columnName, snowflakeType, "Not a valid JSON", insertRowIndex);
} catch (IOException e) {
throw new SFException(e, ErrorCode.IO_ERROR, "Cannot create JSON Parser or JSON generator");
}
return resultWriter.toString();
} else if (isAllowedSemiStructuredType(input)) {
return objectMapper.valueToTree(input);
JsonNode node = objectMapper.valueToTree(input);
return node.toString();
}

throw typeNotAllowedException(
Expand All @@ -144,16 +169,14 @@ private static JsonNode validateAndParseSemiStructuredAsJsonTree(
* @return JSON string representing the input
*/
static String validateAndParseVariant(String columnName, Object input, long insertRowIndex) {
JsonNode node =
validateAndParseSemiStructuredAsJsonTree(columnName, input, "VARIANT", insertRowIndex);
final String result =
validateAndParseSemiStructured(columnName, input, "VARIANT", insertRowIndex);

// Missing nodes are not valid json, ingest them as NULL instead
if (node.isMissingNode()) {
// Empty json strings are ingested as nulls
if (result.isEmpty()) {
return null;
}

String output = node.toString();
int stringLength = output.getBytes(StandardCharsets.UTF_8).length;
int stringLength = result.getBytes(StandardCharsets.UTF_8).length;
if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) {
throw valueFormatNotAllowedException(
columnName,
Expand All @@ -162,7 +185,7 @@ static String validateAndParseVariant(String columnName, Object input, long inse
"Variant too long: length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH),
insertRowIndex);
}
return output;
return result;
}

/**
Expand Down Expand Up @@ -277,17 +300,20 @@ static boolean isAllowedSemiStructuredType(Object o) {
* @return JSON array representing the input
*/
static String validateAndParseArray(String columnName, Object input, long insertRowIndex) {
JsonNode jsonNode =
validateAndParseSemiStructuredAsJsonTree(columnName, input, "ARRAY", insertRowIndex);

// Non-array values are ingested as single-element arrays, mimicking the Worksheets behavior
if (!jsonNode.isArray()) {
jsonNode = objectMapper.createArrayNode().add(jsonNode);
String result = validateAndParseSemiStructured(columnName, input, "ARRAY", insertRowIndex);
if (result.isEmpty()) {
// Empty input is ingested as an array of null
result =
JsonToken.START_ARRAY.asString()
+ JsonToken.VALUE_NULL.asString()
+ JsonToken.END_ARRAY.asString();
} else if (!result.startsWith(JsonToken.START_ARRAY.asString())) {
// Non-array values are ingested as single-element arrays, mimicking the Worksheets behavior
result = JsonToken.START_ARRAY.asString() + result + JsonToken.END_ARRAY.asString();
}

String output = jsonNode.toString();
// Throw an exception if the size is too large
int stringLength = output.getBytes(StandardCharsets.UTF_8).length;
int stringLength = result.getBytes(StandardCharsets.UTF_8).length;
if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) {
throw valueFormatNotAllowedException(
columnName,
Expand All @@ -296,7 +322,7 @@ static String validateAndParseArray(String columnName, Object input, long insert
"Array too large. length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH),
insertRowIndex);
}
return output;
return result;
}

/**
Expand All @@ -309,15 +335,13 @@ static String validateAndParseArray(String columnName, Object input, long insert
* @return JSON object representing the input
*/
static String validateAndParseObject(String columnName, Object input, long insertRowIndex) {
JsonNode jsonNode =
validateAndParseSemiStructuredAsJsonTree(columnName, input, "OBJECT", insertRowIndex);
if (!jsonNode.isObject()) {
final String result =
validateAndParseSemiStructured(columnName, input, "OBJECT", insertRowIndex);
if (!result.startsWith(JsonToken.START_OBJECT.asString())) {
throw valueFormatNotAllowedException(columnName, "OBJECT", "Not an object", insertRowIndex);
}

String output = jsonNode.toString();
// Throw an exception if the size is too large
int stringLength = output.getBytes(StandardCharsets.UTF_8).length;
int stringLength = result.getBytes(StandardCharsets.UTF_8).length;
if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) {
throw valueFormatNotAllowedException(
columnName,
Expand All @@ -326,7 +350,7 @@ static String validateAndParseObject(String columnName, Object input, long inser
"Object too large. length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH),
insertRowIndex);
}
return output;
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,9 +493,38 @@ public void testValidateAndParseVariant() throws Exception {
assertEquals("1", validateAndParseVariant("COL", 1, 0));
assertEquals("1", validateAndParseVariant("COL", "1", 0));
assertEquals("1", validateAndParseVariant("COL", " 1 ", 0));
String stringVariant = "{\"key\":1}";
assertEquals(stringVariant, validateAndParseVariant("COL", stringVariant, 0));
assertEquals(stringVariant, validateAndParseVariant("COL", " " + stringVariant + " \t\n", 0));
assertEquals("{\"key\":1}", validateAndParseVariant("COL", "{\"key\":1}", 0));
assertEquals("{\"key\":1}", validateAndParseVariant("COL", " { \"key\" : 1 } \t\n", 0));

// Variants should preserve input format of numbers
assertEquals(
"{\"key\":1111111.1111111}",
validateAndParseObject("COL", " {\"key\": 1111111.1111111} \t\n", 0));
assertEquals(
"{\"key\":11.111111111111e8}",
validateAndParseObject("COL", " {\"key\": 11.111111111111e8 } \t\n", 0));
assertEquals(
"{\"key\":11.111111111111e-8}",
validateAndParseObject("COL", " {\"key\": 11.111111111111e-8 } \t\n", 0));
assertEquals(
"{\"key\":11.111111111111E8}",
validateAndParseObject("COL", " {\"key\": 11.111111111111E8 } \t\n", 0));
assertEquals(
"{\"key\":11.111111111111E-8}",
validateAndParseObject("COL", " {\"key\": 11.111111111111E-8 } \t\n", 0));

assertEquals(
"{\"key\":11111111111111e8}",
validateAndParseObject("COL", " {\"key\": 11111111111111e8 } \t\n", 0));
assertEquals(
"{\"key\":11111111111111e-8}",
validateAndParseObject("COL", " {\"key\": 11111111111111e-8 } \t\n", 0));
assertEquals(
"{\"key\":11111111111111E8}",
validateAndParseObject("COL", " {\"key\": 11111111111111E8 } \t\n", 0));
assertEquals(
"{\"key\":11111111111111E-8}",
validateAndParseObject("COL", " {\"key\": 11111111111111E-8 } \t\n", 0));

// Test custom serializers
assertEquals(
Expand Down Expand Up @@ -559,7 +588,8 @@ public void testValidateAndParseVariant() throws Exception {
public void testValidateAndParseArray() throws Exception {
assertEquals("[1]", validateAndParseArray("COL", 1, 0));
assertEquals("[1]", validateAndParseArray("COL", "1", 0));
assertEquals("[1]", validateAndParseArray("COL", " 1 ", 0));
assertEquals(
"[1.1e10]", validateAndParseArray("COL", " 1.1e10 ", 0));
assertEquals("[1,2,3]", validateAndParseArray("COL", "[1, 2, 3]", 0));
assertEquals("[1,2,3]", validateAndParseArray("COL", " [1, 2, 3] \t\n", 0));
int[] intArray = new int[] {1, 2, 3};
Expand Down Expand Up @@ -623,9 +653,8 @@ public void testValidateAndParseArray() throws Exception {

@Test
public void testValidateAndParseObject() throws Exception {
String stringObject = "{\"key\":1}";
assertEquals(stringObject, validateAndParseObject("COL", stringObject, 0));
assertEquals(stringObject, validateAndParseObject("COL", " " + stringObject + " \t\n", 0));
assertEquals("{\"key\":1}", validateAndParseObject("COL", "{\"key\":1}", 0));
assertEquals("{\"key\":1}", validateAndParseObject("COL", " {\"key\": 1} \t\n", 0));

String badObject = "foo";
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,33 @@ <STREAMING_INGEST_WRITE> void assertVariant(
migrateTable(tableName); // migration should always succeed
}

void assertVariantLiterally(
String dataType, String writeValue, String expectedValue, String expectedType)
throws Exception {

String tableName = createTable(dataType);
String offsetToken = UUID.randomUUID().toString();

// Ingest using streaming ingest
SnowflakeStreamingIngestChannel channel = openChannel(tableName);
channel.insertRow(createStreamingIngestRow(writeValue), offsetToken);
TestUtils.waitForOffset(channel, offsetToken);

final String query =
String.format(
"select %s as v1, typeof(v1) as v1_type, parse_json('%s') as v2, typeof(v2) as v2_type"
+ " from %s",
VALUE_COLUMN_NAME, writeValue, tableName);
try (ResultSet resultSet = conn.createStatement().executeQuery(query)) {
resultSet.next();
Assert.assertEquals(expectedValue, resultSet.getString("V1"));
Assert.assertEquals(expectedValue, resultSet.getString("V2"));
Assert.assertEquals(expectedType, resultSet.getString("V1_TYPE"));
Assert.assertEquals(expectedType, resultSet.getString("V2_TYPE"));
}
;
}

protected void migrateTable(String tableName) throws SQLException {
conn.createStatement().execute(String.format("alter table %s migrate;", tableName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,24 @@ public void testArray() throws Exception {
assertVariant("ARRAY", Collections.singletonMap("1", "2"), "[{\"1\": \"2\"}]", "ARRAY");
}

@Test
public void testNumberScientificNotation() throws Exception {
assertVariantLiterally("VARIANT", " 12.34\t\n", "12.34", "DECIMAL");
assertVariantLiterally("VARIANT", " 1.234e1\t\n", "1.234000000000000e+01", "DOUBLE");
assertVariantLiterally("VARIANT", " 1.234E1\t\n", "1.234000000000000e+01", "DOUBLE");
assertVariantLiterally("VARIANT", " 123.4e-1\t\n", "1.234000000000000e+01", "DOUBLE");
assertVariantLiterally("VARIANT", " 123.4E-1\t\n", "1.234000000000000e+01", "DOUBLE");

assertVariantLiterally(
"OBJECT",
" {\"key\": 1.234E1\t\n}",
"{\n" + " \"key\": 1.234000000000000e+01\n" + "}",
"OBJECT");

assertVariantLiterally(
"ARRAY", " [1.234E1\t\n]\n", "[\n" + " 1.234000000000000e+01\n" + "]", "ARRAY");
}

private String createLargeVariantObject(int size) throws JsonProcessingException {
char[] stringContent = new char[size - 17]; // {"a":"11","b":""}
Arrays.fill(stringContent, 'c');
Expand Down

0 comments on commit 543b5a7

Please sign in to comment.