Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2.2.2 release with behavior change reverted #827

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,6 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ public class ClientBufferParameters {

private long maxAllowedRowSizeInBytes;

private final boolean enableNewJsonParsingLogic;

private Constants.BdecParquetCompression bdecParquetCompression;

/**
Expand All @@ -32,13 +30,11 @@ private ClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
Constants.BdecParquetCompression bdecParquetCompression) {
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
Expand All @@ -59,11 +55,6 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;

this.enableNewJsonParsingLogic =
clientInternal != null
? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
: ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
}

/**
Expand All @@ -77,14 +68,12 @@ public static ClientBufferParameters test_createClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
Constants.BdecParquetCompression bdecParquetCompression) {
return new ClientBufferParameters(
enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic);
bdecParquetCompression);
}

public boolean getEnableParquetInternalBuffering() {
Expand All @@ -102,8 +91,4 @@ public long getMaxAllowedRowSizeInBytes() {
public Constants.BdecParquetCompression getBdecParquetCompression() {
return bdecParquetCompression;
}

public boolean isEnableNewJsonParsingLogic() {
return enableNewJsonParsingLogic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.unicodeCharactersCount;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -45,7 +39,6 @@
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.output.StringBuilderWriter;

/** Utility class for parsing and validating inputs based on Snowflake types */
class DataValidationUtil {
Expand Down Expand Up @@ -77,8 +70,6 @@ class DataValidationUtil {

private static final ObjectMapper objectMapper = new ObjectMapper();

private static final JsonFactory factory = new JsonFactory();

// The version of Jackson we are using does not support serialization of date objects from the
// java.time package. Here we define a module with custom java.time serializers. Additionally, we
// define custom serializer for byte[] because the Jackson default is to serialize it as
Expand Down Expand Up @@ -144,61 +135,6 @@ private static JsonNode validateAndParseSemiStructuredAsJsonTree(
insertRowIndex);
}

/**
 * Validates and parses input as JSON. All types in the object tree must be valid variant types,
 * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
 *
 * @param columnName Column name, used only for constructing error messages
 * @param input Object to validate; a String is parsed as JSON, any other allowed
 *     semi-structured type is serialized to JSON via Jackson
 * @param snowflakeType Snowflake logical type name (e.g. VARIANT, OBJECT, ARRAY), used only in
 *     error messages
 * @param insertRowIndex Index of the row being inserted, used only in error messages
 * @return Minified JSON string
 */
private static String validateAndParseSemiStructured(
    String columnName, Object input, String snowflakeType, final long insertRowIndex) {
  if (input instanceof String) {
    final String stringInput = (String) input;
    verifyValidUtf8(stringInput, columnName, snowflakeType, insertRowIndex);
    // Minified output is never longer than the input, so pre-sizing the writer to the input
    // length avoids internal buffer reallocations.
    final StringBuilderWriter resultWriter = new StringBuilderWriter(stringInput.length());
    try (final JsonParser parser = factory.createParser(stringInput);
        final JsonGenerator generator = factory.createGenerator(resultWriter)) {
      while (parser.nextToken() != null) {
        final JsonToken token = parser.currentToken();
        if (token.isNumeric()) {
          // If the current token is a number, we cannot just copy the current event because it
          // would write the token from double (or big decimal), whose scientific notation
          // may have been altered during deserialization. We want to preserve the scientific
          // notation from the user input, so we write the current number as text.
          generator.writeNumber(parser.getText());
        } else {
          generator.copyCurrentEvent(parser);
        }
      }
    } catch (JsonParseException e) {
      // Malformed JSON from the user is a validation error, not an internal failure
      throw valueFormatNotAllowedException(
          columnName, snowflakeType, "Not a valid JSON", insertRowIndex);
    } catch (IOException e) {
      // IOException from in-memory parsing/generation indicates an internal problem
      throw new SFException(e, ErrorCode.IO_ERROR, "Cannot create JSON Parser or JSON generator");
    }
    // We return the minified string from the result writer
    return resultWriter.toString();
  } else if (isAllowedSemiStructuredType(input)) {
    // Non-string inputs (primitives and their arrays, java.time types, List, Map, T[]) are
    // converted to JSON through Jackson's tree model.
    JsonNode node = objectMapper.valueToTree(input);
    return node.toString();
  }

  // Unsupported input type: report the type together with the list of accepted ones
  throw typeNotAllowedException(
      columnName,
      input.getClass(),
      snowflakeType,
      new String[] {
        "String",
        "Primitive data types and their arrays",
        "java.time.*",
        "List<T>",
        "Map<String, T>",
        "T[]"
      },
      insertRowIndex);
}

/**
* Validates and parses input as JSON. All types in the object tree must be valid variant types,
* see {@link DataValidationUtil#isAllowedSemiStructuredType}.
Expand Down Expand Up @@ -229,34 +165,6 @@ static String validateAndParseVariant(String columnName, Object input, long inse
return output;
}

/**
 * Validates and parses input as JSON. All types in the object tree must be valid variant types,
 * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
 *
 * @param columnName Column name, used only in error messages
 * @param input Object to validate
 * @param insertRowIndex Index of the row being inserted, used only in error messages
 * @return JSON string representing the input, or null when the input parses to empty JSON
 */
static String validateAndParseVariantNew(String columnName, Object input, long insertRowIndex) {
  final String json =
      validateAndParseSemiStructured(columnName, input, "VARIANT", insertRowIndex);

  // Empty json strings are ingested as nulls
  if (json.isEmpty()) {
    return null;
  }

  // Enforce the maximum allowed size, measured on the UTF-8 encoding
  final int utf8Length = json.getBytes(StandardCharsets.UTF_8).length;
  if (utf8Length > MAX_SEMI_STRUCTURED_LENGTH) {
    throw valueFormatNotAllowedException(
        columnName,
        "VARIANT",
        String.format(
            "Variant too long: length=%d maxLength=%d", utf8Length, MAX_SEMI_STRUCTURED_LENGTH),
        insertRowIndex);
  }

  return json;
}

/**
* Validates that passed object is allowed data type for semi-structured columns (i.e. VARIANT,
* ARRAY, OBJECT). For non-trivial types like maps, arrays or lists, it recursively traverses the
Expand Down Expand Up @@ -391,41 +299,6 @@ static String validateAndParseArray(String columnName, Object input, long insert
return output;
}

/**
 * Validates and parses JSON array. Non-array types are converted into single-element arrays. All
 * types in the array tree must be valid variant types, see {@link
 * DataValidationUtil#isAllowedSemiStructuredType}.
 *
 * @param columnName Column name, used only in error messages
 * @param input Object to validate
 * @param insertRowIndex Index of the row being inserted, used only in error messages
 * @return JSON array representing the input
 */
static String validateAndParseArrayNew(String columnName, Object input, long insertRowIndex) {
  final String parsed = validateAndParseSemiStructured(columnName, input, "ARRAY", insertRowIndex);

  final String openBracket = JsonToken.START_ARRAY.asString();
  final String closeBracket = JsonToken.END_ARRAY.asString();

  final String json;
  if (parsed.isEmpty()) {
    // Empty input is ingested as an array of null
    json = openBracket + JsonToken.VALUE_NULL.asString() + closeBracket;
  } else if (parsed.startsWith(openBracket)) {
    json = parsed;
  } else {
    // Non-array values are ingested as single-element arrays, mimicking the Worksheets behavior
    json = openBracket + parsed + closeBracket;
  }

  // Throw an exception if the size is too large
  final int utf8Length = json.getBytes(StandardCharsets.UTF_8).length;
  if (utf8Length > MAX_SEMI_STRUCTURED_LENGTH) {
    throw valueFormatNotAllowedException(
        columnName,
        "ARRAY",
        String.format(
            "Array too large. length=%d maxLength=%d", utf8Length, MAX_SEMI_STRUCTURED_LENGTH),
        insertRowIndex);
  }

  return json;
}

/**
* Validates and parses JSON object. Input is rejected if the value does not represent JSON object
* (e.g. String '{}' or Map<String, T>). All types in the object tree must be valid variant types,
Expand Down Expand Up @@ -456,34 +329,6 @@ static String validateAndParseObject(String columnName, Object input, long inser
return output;
}

/**
 * Validates and parses JSON object. Input is rejected if the value does not represent JSON object
 * (e.g. String '{}' or Map<String, T>). All types in the object tree must be valid variant types,
 * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
 *
 * @param columnName Column name, used only in error messages
 * @param input Object to validate
 * @param insertRowIndex Index of the row being inserted, used only in error messages
 * @return JSON object representing the input
 */
static String validateAndParseObjectNew(String columnName, Object input, long insertRowIndex) {
  final String json = validateAndParseSemiStructured(columnName, input, "OBJECT", insertRowIndex);

  // Anything that did not parse into a JSON object literal is rejected
  if (!json.startsWith(JsonToken.START_OBJECT.asString())) {
    throw valueFormatNotAllowedException(columnName, "OBJECT", "Not an object", insertRowIndex);
  }

  // Throw an exception if the size is too large
  final int utf8Length = json.getBytes(StandardCharsets.UTF_8).length;
  if (utf8Length > MAX_SEMI_STRUCTURED_LENGTH) {
    throw valueFormatNotAllowedException(
        columnName,
        "OBJECT",
        String.format(
            "Object too large. length=%d maxLength=%d", utf8Length, MAX_SEMI_STRUCTURED_LENGTH),
        insertRowIndex);
  }

  return json;
}

/**
* Converts user input to offset date time, which is the canonical representation of dates and
* timestamps.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,7 @@ private float addRow(
ColumnMetadata column = parquetColumn.columnMetadata;
ParquetValueParser.ParquetBufferValue valueWithSize =
ParquetValueParser.parseColumnValueToParquet(
value,
column,
parquetColumn.type,
forkedStats,
defaultTimezone,
insertRowsCurrIndex,
clientBufferParameters.isEnableNewJsonParsingLogic());
value, column, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex);
indexedRow[colIndex] = valueWithSize.getValue();
size += valueWithSize.getSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ static ParquetBufferValue parseColumnValueToParquet(
PrimitiveType.PrimitiveTypeName typeName,
RowBufferStats stats,
ZoneId defaultTimezone,
long insertRowsCurrIndex,
boolean enableNewJsonParsingLogic) {
long insertRowsCurrIndex) {
Utils.assertNotNull("Parquet column stats", stats);
float estimatedParquetSize = 0F;
estimatedParquetSize += DEFINITION_LEVEL_ENCODING_BYTE_LEN;
Expand Down Expand Up @@ -148,9 +147,7 @@ static ParquetBufferValue parseColumnValueToParquet(
getBinaryValueForLogicalBinary(value, stats, columnMetadata, insertRowsCurrIndex);
length = ((byte[]) value).length;
} else {
String str =
getBinaryValue(
value, stats, columnMetadata, insertRowsCurrIndex, enableNewJsonParsingLogic);
String str = getBinaryValue(value, stats, columnMetadata, insertRowsCurrIndex);
value = str;
if (str != null) {
length = str.getBytes().length;
Expand Down Expand Up @@ -368,36 +365,26 @@ private static String getBinaryValue(
Object value,
RowBufferStats stats,
ColumnMetadata columnMetadata,
final long insertRowsCurrIndex,
boolean enableNewJsonParsingLogic) {
final long insertRowsCurrIndex) {
AbstractRowBuffer.ColumnLogicalType logicalType =
AbstractRowBuffer.ColumnLogicalType.valueOf(columnMetadata.getLogicalType());
String str;
if (logicalType.isObject()) {
switch (logicalType) {
case OBJECT:
str =
enableNewJsonParsingLogic
? DataValidationUtil.validateAndParseObjectNew(
columnMetadata.getName(), value, insertRowsCurrIndex)
: DataValidationUtil.validateAndParseObject(
columnMetadata.getName(), value, insertRowsCurrIndex);
DataValidationUtil.validateAndParseObject(
columnMetadata.getName(), value, insertRowsCurrIndex);
break;
case VARIANT:
str =
enableNewJsonParsingLogic
? DataValidationUtil.validateAndParseVariantNew(
columnMetadata.getName(), value, insertRowsCurrIndex)
: DataValidationUtil.validateAndParseVariant(
columnMetadata.getName(), value, insertRowsCurrIndex);
DataValidationUtil.validateAndParseVariant(
columnMetadata.getName(), value, insertRowsCurrIndex);
break;
case ARRAY:
str =
enableNewJsonParsingLogic
? DataValidationUtil.validateAndParseArrayNew(
columnMetadata.getName(), value, insertRowsCurrIndex)
: DataValidationUtil.validateAndParseArray(
columnMetadata.getName(), value, insertRowsCurrIndex);
DataValidationUtil.validateAndParseArray(
columnMetadata.getName(), value, insertRowsCurrIndex);
break;
default:
throw new SFException(
Expand Down
Loading
Loading