
Commit 2931107

merge
sfc-gh-tzhang committed Sep 30, 2024
2 parents 856fc25 + 579cbf3
Showing 12 changed files with 134 additions and 669 deletions.
pom.xml: 24 changes (12 additions, 12 deletions)
@@ -450,10 +450,6 @@
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-    </dependency>
 
     <!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
     <dependency>
@@ -784,8 +780,9 @@
           <ignoreNonCompile>true</ignoreNonCompile>
           <ignoredDependencies>
             <!-- We defined these as direct dependencies (as opposed to just declaring it in dependencyManagement)
-            to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
-            the dependency is unused, so we ignore it here-->
+            to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency
+            analyzer complains that
+            the dependency is unused, so we ignore it here-->
             <ignoredDependency>org.apache.commons:commons-compress</ignoredDependency>
             <ignoredDependency>org.apache.commons:commons-configuration2</ignoredDependency>
           </ignoredDependencies>
@@ -880,9 +877,10 @@
           <configuration>
             <errorRemedy>failFast</errorRemedy>
             <!--
-            The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please
-            check your dependencies", verify the conditions of the license and add the reference to it here.
-            -->
+            The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses
+            used, please
+            check your dependencies", verify the conditions of the license and add the reference to it here.
+            -->
             <includedLicenses>
               <includedLicense>Apache License 2.0</includedLicense>
               <includedLicense>BSD 2-Clause License</includedLicense>
@@ -1195,9 +1193,11 @@
         </executions>
       </plugin>
       <!--
-      Plugin executes license processing Python script, which copies third party license files into the directory
-      target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR.
-      -->
+      Plugin executes license processing Python script, which copies third party license files into the
+      directory
+      target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded
+      JAR.
+      -->
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>exec-maven-plugin</artifactId>
ClientBufferParameters.java
@@ -16,8 +16,6 @@ public class ClientBufferParameters {
 
   private long maxAllowedRowSizeInBytes;
 
-  private final boolean enableNewJsonParsingLogic;
-
   private Constants.BdecParquetCompression bdecParquetCompression;
 
   /**
@@ -32,13 +30,11 @@ private ClientBufferParameters(
       boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
-      Constants.BdecParquetCompression bdecParquetCompression,
-      boolean enableNewJsonParsingLogic) {
+      Constants.BdecParquetCompression bdecParquetCompression) {
     this.enableParquetInternalBuffering = enableParquetInternalBuffering;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
     this.bdecParquetCompression = bdecParquetCompression;
-    this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
   }
 
   /** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -59,11 +55,6 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
         clientInternal != null
             ? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
             : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;
-
-    this.enableNewJsonParsingLogic =
-        clientInternal != null
-            ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
-            : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
   }
 
   /**
@@ -77,14 +68,12 @@ public static ClientBufferParameters test_createClientBufferParameters(
       boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
-      Constants.BdecParquetCompression bdecParquetCompression,
-      boolean enableNewJsonParsingLogic) {
+      Constants.BdecParquetCompression bdecParquetCompression) {
     return new ClientBufferParameters(
         enableParquetInternalBuffering,
         maxChunkSizeInBytes,
         maxAllowedRowSizeInBytes,
-        bdecParquetCompression,
-        enableNewJsonParsingLogic);
+        bdecParquetCompression);
   }
 
   public boolean getEnableParquetInternalBuffering() {
@@ -102,8 +91,4 @@ public long getMaxAllowedRowSizeInBytes() {
   public Constants.BdecParquetCompression getBdecParquetCompression() {
     return bdecParquetCompression;
   }
-
-  public boolean isEnableNewJsonParsingLogic() {
-    return enableNewJsonParsingLogic;
-  }
 }
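Aside, not part of the diff: with the flag gone, call sites simply drop the last argument. A minimal sketch of the updated test factory call; the class name, the sizes, and the GZIP compression choice below are illustrative assumptions, not values from this commit:

import net.snowflake.ingest.streaming.internal.ClientBufferParameters;
import net.snowflake.ingest.utils.Constants;

public class BufferParamsSketch {
  public static void main(String[] args) {
    // The trailing enableNewJsonParsingLogic argument removed by this commit
    // no longer exists; all values here are illustrative.
    ClientBufferParameters params =
        ClientBufferParameters.test_createClientBufferParameters(
            false, // enableParquetInternalBuffering
            128L * 1024 * 1024, // maxChunkSizeInBytes
            64L * 1024 * 1024, // maxAllowedRowSizeInBytes
            Constants.BdecParquetCompression.GZIP); // assumed available algorithm
    System.out.println(params.getMaxAllowedRowSizeInBytes());
  }
}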
DataValidationUtil.java
@@ -6,17 +6,11 @@
 
 import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.unicodeCharactersCount;
 
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
@@ -45,7 +39,6 @@
 import net.snowflake.ingest.utils.SFException;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.io.output.StringBuilderWriter;
 
 /** Utility class for parsing and validating inputs based on Snowflake types */
 class DataValidationUtil {
@@ -77,8 +70,6 @@ class DataValidationUtil {
 
   private static final ObjectMapper objectMapper = new ObjectMapper();
 
-  private static final JsonFactory factory = new JsonFactory();
-
   // The version of Jackson we are using does not support serialization of date objects from the
   // java.time package. Here we define a module with custom java.time serializers. Additionally, we
   // define custom serializer for byte[] because the Jackson default is to serialize it as
@@ -144,61 +135,6 @@ private static JsonNode validateAndParseSemiStructuredAsJsonTree(
         insertRowIndex);
   }
 
-  /**
-   * Validates and parses input as JSON. All types in the object tree must be valid variant types,
-   * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
-   *
-   * @param input Object to validate
-   * @return Minified JSON string
-   */
-  private static String validateAndParseSemiStructured(
-      String columnName, Object input, String snowflakeType, final long insertRowIndex) {
-    if (input instanceof String) {
-      final String stringInput = (String) input;
-      verifyValidUtf8(stringInput, columnName, snowflakeType, insertRowIndex);
-      final StringBuilderWriter resultWriter = new StringBuilderWriter(stringInput.length());
-      try (final JsonParser parser = factory.createParser(stringInput);
-          final JsonGenerator generator = factory.createGenerator(resultWriter)) {
-        while (parser.nextToken() != null) {
-          final JsonToken token = parser.currentToken();
-          if (token.isNumeric()) {
-            // If the current token is a number, we cannot just copy the current event because it
-            // would write the token from double (or big decimal), whose scientific notation
-            // may have been altered during deserialization. We want to preserve the scientific
-            // notation from the user input, so we write the current number as text.
-            generator.writeNumber(parser.getText());
-          } else {
-            generator.copyCurrentEvent(parser);
-          }
-        }
-      } catch (JsonParseException e) {
-        throw valueFormatNotAllowedException(
-            columnName, snowflakeType, "Not a valid JSON", insertRowIndex);
-      } catch (IOException e) {
-        throw new SFException(e, ErrorCode.IO_ERROR, "Cannot create JSON Parser or JSON generator");
-      }
-      // We return the minified string from the result writer
-      return resultWriter.toString();
-    } else if (isAllowedSemiStructuredType(input)) {
-      JsonNode node = objectMapper.valueToTree(input);
-      return node.toString();
-    }
-
-    throw typeNotAllowedException(
-        columnName,
-        input.getClass(),
-        snowflakeType,
-        new String[] {
-          "String",
-          "Primitive data types and their arrays",
-          "java.time.*",
-          "List<T>",
-          "Map<String, T>",
-          "T[]"
-        },
-        insertRowIndex);
-  }
-
   /**
    * Validates and parses input as JSON. All types in the object tree must be valid variant types,
    * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
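Aside, not part of the diff: the string path of the method removed above minified JSON with Jackson's streaming API, copying every event except numeric tokens, which it wrote back as raw text so that user-supplied scientific notation survives the round trip (copying the event would re-serialize 1.2e3 from its decoded double as 1200.0). A self-contained sketch of that technique; the class name and sample input are illustrative:

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import java.io.StringWriter;

public class MinifyDemo {
  public static void main(String[] args) throws Exception {
    JsonFactory factory = new JsonFactory();
    String input = "{ \"a\": 1.2e3, \"b\": [ true, null ] }";
    StringWriter out = new StringWriter();
    try (JsonParser parser = factory.createParser(input);
        JsonGenerator generator = factory.createGenerator(out)) {
      while (parser.nextToken() != null) {
        // Numbers are written from the parser's raw text to preserve notation;
        // everything else is copied event by event, which drops whitespace.
        if (parser.currentToken().isNumeric()) {
          generator.writeNumber(parser.getText());
        } else {
          generator.copyCurrentEvent(parser);
        }
      }
    }
    System.out.println(out); // prints {"a":1.2e3,"b":[true,null]}
  }
}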
@@ -229,34 +165,6 @@ static String validateAndParseVariant(String columnName, Object input, long inse
     return output;
   }
 
-  /**
-   * Validates and parses input as JSON. All types in the object tree must be valid variant types,
-   * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
-   *
-   * @param input Object to validate
-   * @param insertRowIndex
-   * @return JSON string representing the input
-   */
-  static String validateAndParseVariantNew(String columnName, Object input, long insertRowIndex) {
-    final String result =
-        validateAndParseSemiStructured(columnName, input, "VARIANT", insertRowIndex);
-
-    // Empty json strings are ingested as nulls
-    if (result.isEmpty()) {
-      return null;
-    }
-    int stringLength = result.getBytes(StandardCharsets.UTF_8).length;
-    if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) {
-      throw valueFormatNotAllowedException(
-          columnName,
-          "VARIANT",
-          String.format(
-              "Variant too long: length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH),
-          insertRowIndex);
-    }
-    return result;
-  }
-
   /**
    * Validates that passed object is allowed data type for semi-structured columns (i.e. VARIANT,
    * ARRAY, OBJECT). For non-trivial types like maps, arrays or lists, it recursively traverses the
@@ -391,41 +299,6 @@ static String validateAndParseArray(String columnName, Object input, long insert
     return output;
   }
 
-  /**
-   * Validates and parses JSON array. Non-array types are converted into single-element arrays. All
-   * types in the array tree must be valid variant types, see {@link
-   * DataValidationUtil#isAllowedSemiStructuredType}.
-   *
-   * @param input Object to validate
-   * @param insertRowIndex
-   * @return JSON array representing the input
-   */
-  static String validateAndParseArrayNew(String columnName, Object input, long insertRowIndex) {
-    String result = validateAndParseSemiStructured(columnName, input, "ARRAY", insertRowIndex);
-    if (result.isEmpty()) {
-      // Empty input is ingested as an array of null
-      result =
-          JsonToken.START_ARRAY.asString()
-              + JsonToken.VALUE_NULL.asString()
-              + JsonToken.END_ARRAY.asString();
-    } else if (!result.startsWith(JsonToken.START_ARRAY.asString())) {
-      // Non-array values are ingested as single-element arrays, mimicking the Worksheets behavior
-      result = JsonToken.START_ARRAY.asString() + result + JsonToken.END_ARRAY.asString();
-    }
-
-    // Throw an exception if the size is too large
-    int stringLength = result.getBytes(StandardCharsets.UTF_8).length;
-    if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) {
-      throw valueFormatNotAllowedException(
-          columnName,
-          "ARRAY",
-          String.format(
-              "Array too large. length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH),
-          insertRowIndex);
-    }
-    return result;
-  }
-
   /**
    * Validates and parses JSON object. Input is rejected if the value does not represent JSON object
    * (e.g. String '{}' or Map<String, T>). All types in the object tree must be valid variant types,
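Aside, not part of the diff: the removed validateAndParseArrayNew normalized any minified JSON value into an array before applying the length check. A standalone sketch of just that rule; the class and helper names are hypothetical:

public class ArrayNormalizationSketch {
  // Mirrors the removed rule: empty input becomes [null], non-array values are
  // wrapped into single-element arrays, and arrays pass through unchanged.
  static String normalizeToJsonArray(String minifiedJson) {
    if (minifiedJson.isEmpty()) {
      return "[null]";
    }
    if (!minifiedJson.startsWith("[")) {
      return "[" + minifiedJson + "]";
    }
    return minifiedJson;
  }

  public static void main(String[] args) {
    System.out.println(normalizeToJsonArray(""));          // [null]
    System.out.println(normalizeToJsonArray("{\"a\":1}")); // [{"a":1}]
    System.out.println(normalizeToJsonArray("[1,2]"));     // [1,2]
  }
}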
@@ -456,34 +329,6 @@ static String validateAndParseObject(String columnName, Object input, long inser
     return output;
   }
 
-  /**
-   * Validates and parses JSON object. Input is rejected if the value does not represent JSON object
-   * (e.g. String '{}' or Map<String, T>). All types in the object tree must be valid variant types,
-   * see {@link DataValidationUtil#isAllowedSemiStructuredType}.
-   *
-   * @param input Object to validate
-   * @param insertRowIndex
-   * @return JSON object representing the input
-   */
-  static String validateAndParseObjectNew(String columnName, Object input, long insertRowIndex) {
-    final String result =
-        validateAndParseSemiStructured(columnName, input, "OBJECT", insertRowIndex);
-    if (!result.startsWith(JsonToken.START_OBJECT.asString())) {
-      throw valueFormatNotAllowedException(columnName, "OBJECT", "Not an object", insertRowIndex);
-    }
-    // Throw an exception if the size is too large
-    int stringLength = result.getBytes(StandardCharsets.UTF_8).length;
-    if (stringLength > MAX_SEMI_STRUCTURED_LENGTH) {
-      throw valueFormatNotAllowedException(
-          columnName,
-          "OBJECT",
-          String.format(
-              "Object too large. length=%d maxLength=%d", stringLength, MAX_SEMI_STRUCTURED_LENGTH),
-          insertRowIndex);
-    }
-    return result;
-  }
-
   /**
    * Converts user input to offset date time, which is the canonical representation of dates and
    * timestamps.
ParquetRowBuffer.java
@@ -206,13 +206,7 @@ private float addRow(
       ColumnMetadata column = parquetColumn.columnMetadata;
       ParquetValueParser.ParquetBufferValue valueWithSize =
           ParquetValueParser.parseColumnValueToParquet(
-              value,
-              column,
-              parquetColumn.type,
-              forkedStats,
-              defaultTimezone,
-              insertRowsCurrIndex,
-              clientBufferParameters.isEnableNewJsonParsingLogic());
+              value, column, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex);
       indexedRow[colIndex] = valueWithSize.getValue();
       size += valueWithSize.getSize();
     }
