SNOW-1566045 Fix double scientific format
sfc-gh-lsembera committed Sep 3, 2024
1 parent b9d31b6 commit 6e5234e
Showing 12 changed files with 661 additions and 122 deletions.
pom.xml: 4 additions & 0 deletions
@@ -426,6 +426,10 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>

<!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
<dependency>
ClientBufferParameters.java
@@ -16,6 +16,8 @@ public class ClientBufferParameters {

private long maxAllowedRowSizeInBytes;

+  private final boolean enableNewJsonParsingLogic;
+
private Constants.BdecParquetCompression bdecParquetCompression;

/**
@@ -30,11 +32,13 @@ private ClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
-      Constants.BdecParquetCompression bdecParquetCompression) {
+      Constants.BdecParquetCompression bdecParquetCompression,
+      boolean enableNewJsonParsingLogic) {
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
+    this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -55,6 +59,11 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;

+    this.enableNewJsonParsingLogic =
+        clientInternal != null
+            ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
+            : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
}

/**
@@ -68,12 +77,14 @@ public static ClientBufferParameters test_createClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
-      Constants.BdecParquetCompression bdecParquetCompression) {
+      Constants.BdecParquetCompression bdecParquetCompression,
+      boolean enableNewJsonParsingLogic) {
return new ClientBufferParameters(
enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
-        bdecParquetCompression);
+        bdecParquetCompression,
+        enableNewJsonParsingLogic);
}

public boolean getEnableParquetInternalBuffering() {
@@ -91,4 +102,8 @@ public long getMaxAllowedRowSizeInBytes() {
public Constants.BdecParquetCompression getBdecParquetCompression() {
return bdecParquetCompression;
}

+  public boolean isEnableNewJsonParsingLogic() {
+    return enableNewJsonParsingLogic;
+  }
}
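
For illustration, here is a sketch of how a test could enable the new flag through the extended factory method. The buffer sizes and the compression constant below are placeholders chosen for the example, not values taken from this commit:

```java
// Hypothetical test usage; all constant values here are illustrative.
ClientBufferParameters params =
    ClientBufferParameters.test_createClientBufferParameters(
        false,              // enableParquetInternalBuffering
        32 * 1024 * 1024L,  // maxChunkSizeInBytes
        16 * 1024 * 1024L,  // maxAllowedRowSizeInBytes
        Constants.BdecParquetCompression.GZIP, // assumed enum constant
        true);              // enableNewJsonParsingLogic: opt into the new JSON parsing path

assert params.isEnableNewJsonParsingLogic();
```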
DataValidationUtil.java
@@ -6,11 +6,17 @@

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.unicodeCharactersCount;

+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
+import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
@@ -39,6 +45,7 @@
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.io.output.StringBuilderWriter;

/** Utility class for parsing and validating inputs based on Snowflake types */
class DataValidationUtil {
@@ -70,6 +77,8 @@ class DataValidationUtil {

private static final ObjectMapper objectMapper = new ObjectMapper();

+  private static final JsonFactory factory = new JsonFactory();

// The version of Jackson we are using does not support serialization of date objects from the
// java.time package. Here we define a module with custom java.time serializers. Additionally, we
// define custom serializer for byte[] because the Jackson default is to serialize it as
@@ -135,6 +144,61 @@ private static JsonNode validateAndParseSemiStructuredAsJsonTree(
insertRowIndex);
}

+  /**
+   * Validates and parses input as JSON. All types in the object tree must be valid variant types,
+   * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
+   *
+   * @param input Object to validate
+   * @return Minified JSON string
+   */
+  private static String validateAndParseSemiStructured(
+      String columnName, Object input, String snowflakeType, final long insertRowIndex) {
+    if (input instanceof String) {
+      final String stringInput = (String) input;
+      verifyValidUtf8(stringInput, columnName, snowflakeType, insertRowIndex);
+      final StringBuilderWriter resultWriter = new StringBuilderWriter(stringInput.length());
+      try (final JsonParser parser = factory.createParser(stringInput);
+          final JsonGenerator generator = factory.createGenerator(resultWriter)) {
+        while (parser.nextToken() != null) {
+          final JsonToken token = parser.currentToken();
+          if (token.isNumeric()) {
+            // If the current token is a number, we cannot just copy the current event because it
+            // would write the token from double (or big decimal), whose scientific notation
+            // may have been altered during deserialization. We want to preserve the scientific
+            // notation from the user input, so we write the current number as text.
+            generator.writeNumber(parser.getText());
+          } else {
+            generator.copyCurrentEvent(parser);
+          }
+        }
+      } catch (JsonParseException e) {
+        throw valueFormatNotAllowedException(
+            columnName, snowflakeType, "Not a valid JSON", insertRowIndex);
+      } catch (IOException e) {
+        throw new SFException(e, ErrorCode.IO_ERROR, "Cannot create JSON Parser or JSON generator");
+      }
+      // We return the minified string from the result writer
+      return resultWriter.toString();
+    } else if (isAllowedSemiStructuredType(input)) {
+      JsonNode node = objectMapper.valueToTree(input);
+      return node.toString();
+    }
+
+    throw typeNotAllowedException(
+        columnName,
+        input.getClass(),
+        snowflakeType,
+        new String[] {
+          "String",
+          "Primitive data types and their arrays",
+          "java.time.*",
+          "List<T>",
+          "Map<String, T>",
+          "T[]"
+        },
+        insertRowIndex);
+  }

/**
* Validates and parses input as JSON. All types in the object tree must be valid variant types,
* see {@link DataValidationUtil#isAllowedSemiStructuredType}.
Expand Down Expand Up @@ -165,6 +229,34 @@ static String validateAndParseVariant(String columnName, Object input, long inse
return output;
}

+  /**
+   * Validates and parses input as JSON. All types in the object tree must be valid variant types,
+   * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
+   *
+   * @param input Object to validate
+   * @param insertRowIndex
+   * @return JSON string representing the input
+   */
+  static String validateAndParseVariantNew(String columnName, Object input, long insertRowIndex) {
+    final String result =
+        validateAndParseSemiStructured(columnName, input, "VARIANT", insertRowIndex);
+
+    // Empty json strings are ingested as nulls
+    if (result.isEmpty()) {
+      return null;
+    }
+    int stringLength = result.getBytes(StandardCharsets.UTF_8).length;
+    if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) {
+      throw valueFormatNotAllowedException(
+          columnName,
+          "VARIANT",
+          String.format(
+              "Variant too long: length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH),
+          insertRowIndex);
+    }
+    return result;
+  }
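
A few hypothetical calls (e.g. from a test in the same package) illustrating the behavior encoded above; the column name and row index are arbitrary:

```java
validateAndParseVariantNew("COL", "1e10", 0);         // -> "1e10" (scientific notation preserved)
validateAndParseVariantNew("COL", "{ \"a\": 1 }", 0); // -> {"a":1} (minified)
validateAndParseVariantNew("COL", "", 0);             // -> null (empty JSON string ingested as NULL)
```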

/**
* Validates that passed object is allowed data type for semi-structured columns (i.e. VARIANT,
* ARRAY, OBJECT). For non-trivial types like maps, arrays or lists, it recursively traverses the
@@ -299,6 +391,41 @@ static String validateAndParseArray(String columnName, Object input, long insertRowIndex) {
return output;
}

+  /**
+   * Validates and parses JSON array. Non-array types are converted into single-element arrays. All
+   * types in the array tree must be valid variant types, see {@link
+   * DataValidationUtil#isAllowedSemiStructuredType}.
+   *
+   * @param input Object to validate
+   * @param insertRowIndex
+   * @return JSON array representing the input
+   */
+  static String validateAndParseArrayNew(String columnName, Object input, long insertRowIndex) {
+    String result = validateAndParseSemiStructured(columnName, input, "ARRAY", insertRowIndex);
+    if (result.isEmpty()) {
+      // Empty input is ingested as an array of null
+      result =
+          JsonToken.START_ARRAY.asString()
+              + JsonToken.VALUE_NULL.asString()
+              + JsonToken.END_ARRAY.asString();
+    } else if (!result.startsWith(JsonToken.START_ARRAY.asString())) {
+      // Non-array values are ingested as single-element arrays, mimicking the Worksheets behavior
+      result = JsonToken.START_ARRAY.asString() + result + JsonToken.END_ARRAY.asString();
+    }
+
+    // Throw an exception if the size is too large
+    int stringLength = result.getBytes(StandardCharsets.UTF_8).length;
+    if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) {
+      throw valueFormatNotAllowedException(
+          columnName,
+          "ARRAY",
+          String.format(
+              "Array too large. length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH),
+          insertRowIndex);
+    }
+    return result;
+  }
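
Hypothetical calls illustrating the wrapping rules above; the column name and row index are arbitrary:

```java
validateAndParseArrayNew("COL", "[1, 2e2]", 0); // -> [1,2e2] (minified, notation preserved)
validateAndParseArrayNew("COL", "7", 0);        // -> [7]     (scalar wrapped in a one-element array)
validateAndParseArrayNew("COL", "", 0);         // -> [null]  (empty input becomes an array of null)
```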

/**
* Validates and parses JSON object. Input is rejected if the value does not represent JSON object
* (e.g. String '{}' or Map<String, T>). All types in the object tree must be valid variant types,
@@ -329,6 +456,34 @@ static String validateAndParseObject(String columnName, Object input, long insertRowIndex) {
return output;
}

+  /**
+   * Validates and parses JSON object. Input is rejected if the value does not represent JSON object
+   * (e.g. String '{}' or Map<String, T>). All types in the object tree must be valid variant types,
+   * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
+   *
+   * @param input Object to validate
+   * @param insertRowIndex
+   * @return JSON object representing the input
+   */
+  static String validateAndParseObjectNew(String columnName, Object input, long insertRowIndex) {
+    final String result =
+        validateAndParseSemiStructured(columnName, input, "OBJECT", insertRowIndex);
+    if (!result.startsWith(JsonToken.START_OBJECT.asString())) {
+      throw valueFormatNotAllowedException(columnName, "OBJECT", "Not an object", insertRowIndex);
+    }
+    // Throw an exception if the size is too large
+    int stringLength = result.getBytes(StandardCharsets.UTF_8).length;
+    if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) {
+      throw valueFormatNotAllowedException(
+          columnName,
+          "OBJECT",
+          String.format(
+              "Object too large. length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH),
+          insertRowIndex);
+    }
+    return result;
+  }
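
Hypothetical calls illustrating the object check above; the column name and row index are arbitrary:

```java
validateAndParseObjectNew("COL", "{\"k\": 1e5}", 0); // -> {"k":1e5} (minified, notation preserved)
validateAndParseObjectNew("COL", "[1, 2]", 0);       // throws: "Not an object"
```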

/**
* Converts user input to offset date time, which is the canonical representation of dates and
* timestamps.
ParquetRowBuffer.java
@@ -207,7 +207,13 @@ private float addRow(
ColumnMetadata column = parquetColumn.columnMetadata;
ParquetValueParser.ParquetBufferValue valueWithSize =
ParquetValueParser.parseColumnValueToParquet(
-          value, column, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex);
+          value,
+          column,
+          parquetColumn.type,
+          forkedStats,
+          defaultTimezone,
+          insertRowsCurrIndex,
+          clientBufferParameters.isEnableNewJsonParsingLogic());
indexedRow[colIndex] = valueWithSize.getValue();
size += valueWithSize.getSize();
}
ParquetValueParser.java
@@ -85,7 +85,8 @@ static ParquetBufferValue parseColumnValueToParquet(
PrimitiveType.PrimitiveTypeName typeName,
RowBufferStats stats,
ZoneId defaultTimezone,
-      long insertRowsCurrIndex) {
+      long insertRowsCurrIndex,
+      boolean enableNewJsonParsingLogic) {
Utils.assertNotNull("Parquet column stats", stats);
float estimatedParquetSize = 0F;
estimatedParquetSize += DEFINITION_LEVEL_ENCODING_BYTE_LEN;
@@ -147,7 +148,9 @@ static ParquetBufferValue parseColumnValueToParquet(
getBinaryValueForLogicalBinary(value, stats, columnMetadata, insertRowsCurrIndex);
length = ((byte[]) value).length;
} else {
-      String str = getBinaryValue(value, stats, columnMetadata, insertRowsCurrIndex);
+      String str =
+          getBinaryValue(
+              value, stats, columnMetadata, insertRowsCurrIndex, enableNewJsonParsingLogic);
value = str;
if (str != null) {
length = str.getBytes().length;
@@ -365,26 +368,36 @@ private static String getBinaryValue(
Object value,
RowBufferStats stats,
ColumnMetadata columnMetadata,
-      final long insertRowsCurrIndex) {
+      final long insertRowsCurrIndex,
+      boolean enableNewJsonParsingLogic) {
AbstractRowBuffer.ColumnLogicalType logicalType =
AbstractRowBuffer.ColumnLogicalType.valueOf(columnMetadata.getLogicalType());
String str;
if (logicalType.isObject()) {
switch (logicalType) {
case OBJECT:
str =
-          DataValidationUtil.validateAndParseObject(
-              columnMetadata.getName(), value, insertRowsCurrIndex);
+          enableNewJsonParsingLogic
+              ? DataValidationUtil.validateAndParseObjectNew(
+                  columnMetadata.getName(), value, insertRowsCurrIndex)
+              : DataValidationUtil.validateAndParseObject(
+                  columnMetadata.getName(), value, insertRowsCurrIndex);
break;
case VARIANT:
str =
-          DataValidationUtil.validateAndParseVariant(
-              columnMetadata.getName(), value, insertRowsCurrIndex);
+          enableNewJsonParsingLogic
+              ? DataValidationUtil.validateAndParseVariantNew(
+                  columnMetadata.getName(), value, insertRowsCurrIndex)
+              : DataValidationUtil.validateAndParseVariant(
+                  columnMetadata.getName(), value, insertRowsCurrIndex);
break;
case ARRAY:
str =
-          DataValidationUtil.validateAndParseArray(
-              columnMetadata.getName(), value, insertRowsCurrIndex);
+          enableNewJsonParsingLogic
+              ? DataValidationUtil.validateAndParseArrayNew(
+                  columnMetadata.getName(), value, insertRowsCurrIndex)
+              : DataValidationUtil.validateAndParseArray(
+                  columnMetadata.getName(), value, insertRowsCurrIndex);
break;
default:
throw new SFException(
