DO NOT MERGE: Make binary string encoding configurable, add support for base64 #770

Open · wants to merge 20 commits into master

Changes shown below are from 2 of the 20 commits.

Commits:
776e52d  Make binary string encoding configurable, add support for base64 (sfc-gh-kgaputis, Jun 11, 2024)
0e3e5f1  Merge branch 'master' into kgaputis-binarystringencoding (sfc-gh-kgaputis, Jun 11, 2024)
0ea5339  SNOW-1357377 Add request Id in all streaming ingest APIs (#759) (sfc-gh-japatel, Jun 13, 2024)
8d89112  no-snow Fix flaky test (#772) (sfc-gh-lsembera, Jun 14, 2024)
93b8280  SNOW-1457523: Fix CVE for snowflake-ingest-java io.airlift:aircompres… (sfc-gh-tzhang, Jun 19, 2024)
b54555c  Various performance improvements in the `insertRows` path (#782) (sfc-gh-psaha, Jun 26, 2024)
3519904  SNOW-1457523: Upgrade Parquet dependencies version to fix CVE (#779) (sfc-gh-tzhang, Jun 26, 2024)
b838014  SNOW-1465503 Check row count in Parquet footer before committing (#784) (sfc-gh-lsembera, Jul 8, 2024)
c98252e  SNOW-1512935 Retry SocketTimeoutException (#789) (sfc-gh-lsembera, Jul 9, 2024)
f0100f1  NO-SNOW Fix flaky test using multiple threads (#790) (sfc-gh-lsembera, Jul 9, 2024)
252d4c5  SNOW-1373151: Proactively refresh token to avoid token expiration exc… (sfc-gh-tzhang, Jul 16, 2024)
f9b5b80  Reject new stage metadata if the deployment id does not match what th… (sfc-gh-psaha, Jul 16, 2024)
5ffbc0b  SNOW-1512935 Reduce client socket timeout from 5 minutes to 1 minute … (sfc-gh-lsembera, Jul 22, 2024)
d1be60a  SNOW-1545879 Reduce the max channel/chunk sizes (#796) (sfc-gh-lsembera, Jul 22, 2024)
30468e7  NO-SNOW Upgrade protobuf package (#793) (sfc-gh-xhuang, Jul 22, 2024)
ecaec0f  V2.1.2 Release (#797) (sfc-gh-asen, Jul 22, 2024)
334767d  Code refactor for Iceberg support (#792) (sfc-gh-alhuang, Jul 23, 2024)
9656537  SNOW-1512047 Introduce independent per-table flushes when interleavin… (sfc-gh-alhuang, Jul 26, 2024)
985b396  SNOW-1618257 Fix PRIMARY_FILE_ID_KEY (#807) (sfc-gh-kkloudas, Aug 8, 2024)
c2162b9  V2.2.0 Release (#808) (sfc-gh-tzhang, Aug 9, 2024)

Diff view

@@ -18,6 +18,8 @@ public class ClientBufferParameters {

   private Constants.BdecParquetCompression bdecParquetCompression;

+  private Constants.BinaryStringEncoding binaryStringEncoding;
+
   /**
    * Private constructor used for test methods
    *
@@ -30,11 +32,13 @@ private ClientBufferParameters(
       boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
-      Constants.BdecParquetCompression bdecParquetCompression) {
+      Constants.BdecParquetCompression bdecParquetCompression,
+      Constants.BinaryStringEncoding binaryStringEncoding) {
     this.enableParquetInternalBuffering = enableParquetInternalBuffering;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
     this.bdecParquetCompression = bdecParquetCompression;
+    this.binaryStringEncoding = binaryStringEncoding;
   }

/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -55,6 +59,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
         clientInternal != null
             ? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
             : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;
+    this.binaryStringEncoding =
+        clientInternal != null
+            ? clientInternal.getParameterProvider().getBinaryStringEncoding()
+            : ParameterProvider.BINARY_STRING_ENCODING_DEFAULT;
   }

/**
@@ -68,12 +76,14 @@ public static ClientBufferParameters test_createClientBufferParameters(
       boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
-      Constants.BdecParquetCompression bdecParquetCompression) {
+      Constants.BdecParquetCompression bdecParquetCompression,
+      Constants.BinaryStringEncoding binaryStringEncoding) {
     return new ClientBufferParameters(
         enableParquetInternalBuffering,
         maxChunkSizeInBytes,
         maxAllowedRowSizeInBytes,
-        bdecParquetCompression);
+        bdecParquetCompression,
+        binaryStringEncoding);
}

public boolean getEnableParquetInternalBuffering() {
@@ -91,4 +101,8 @@ public long getMaxAllowedRowSizeInBytes() {
   public Constants.BdecParquetCompression getBdecParquetCompression() {
     return bdecParquetCompression;
   }
+
+  public Constants.BinaryStringEncoding getBinaryStringEncoding() {
+    return binaryStringEncoding;
+  }
}
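
A quick aside for reviewers: with this change, constructing buffer parameters in a unit test would presumably look like the sketch below. Only the signature comes from this diff; the numeric values and the assertion are illustrative.

```java
// Sketch: exercising the extended test helper. Values are placeholders.
ClientBufferParameters params =
    ClientBufferParameters.test_createClientBufferParameters(
        false,                                  // enableParquetInternalBuffering
        32 * 1024 * 1024,                       // maxChunkSizeInBytes (placeholder)
        16 * 1024 * 1024,                       // maxAllowedRowSizeInBytes (placeholder)
        Constants.BdecParquetCompression.GZIP,  // existing compression parameter
        Constants.BinaryStringEncoding.BASE64); // new encoding parameter
assert params.getBinaryStringEncoding() == Constants.BinaryStringEncoding.BASE64;
```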
@@ -24,17 +24,14 @@
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeParseException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;

Review comment (Contributor): no wildcard imports.

import java.util.function.Supplier;
import net.snowflake.client.jdbc.internal.google.common.collect.Sets;
import net.snowflake.client.jdbc.internal.snowflake.common.core.SnowflakeDateTimeFormat;
import net.snowflake.client.jdbc.internal.snowflake.common.util.Power10;
import net.snowflake.ingest.streaming.internal.serialization.ByteArraySerializer;
import net.snowflake.ingest.streaming.internal.serialization.ZonedDateTimeSerializer;
+import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.DecoderException;
@@ -615,7 +612,7 @@ static int validateAndParseDate(String columnName, Object input, long insertRowIndex)
    * @return Validated array
    */
   static byte[] validateAndParseBinary(
-      String columnName, Object input, Optional<Integer> maxLengthOptional, long insertRowIndex) {
+      String columnName, Object input, Optional<Integer> maxLengthOptional, long insertRowIndex, Constants.BinaryStringEncoding binaryStringEncoding) {
byte[] output;
if (input instanceof byte[]) {
// byte[] is a mutable object, we need to create a defensive copy to protect against
@@ -625,12 +622,30 @@
output = new byte[originalInputArray.length];
System.arraycopy(originalInputArray, 0, output, 0, originalInputArray.length);
     } else if (input instanceof String) {
-      try {
-        String stringInput = ((String) input).trim();
-        output = Hex.decodeHex(stringInput);
-      } catch (DecoderException e) {
+      if (binaryStringEncoding == Constants.BinaryStringEncoding.BASE64) {
+        try {
+          String stringInput = ((String) input).trim();
+          // Remove double quotes if present

Review comment (Contributor): We should not strip surrounding quotes. We don't do it for hex and neither does it work in Snowflake Worksheets.

+          if (stringInput.length() >= 2 && stringInput.startsWith("\"") && stringInput.endsWith("\"")) {
+            stringInput = stringInput.substring(1, stringInput.length() - 1);
+          }
+          Base64.Decoder decoder = Base64.getDecoder();
+          output = decoder.decode(stringInput);
+        } catch (IllegalArgumentException e) {
+          throw valueFormatNotAllowedException(
+              columnName, "BINARY", "Not a valid base64 string", insertRowIndex);
+        }
+      } else if (binaryStringEncoding == Constants.BinaryStringEncoding.HEX) {
+        try {
+          String stringInput = ((String) input).trim();
+          output = Hex.decodeHex(stringInput);
+        } catch (DecoderException e) {
+          throw valueFormatNotAllowedException(
+              columnName, "BINARY", "Not a valid hex string", insertRowIndex);
+        }
+      } else {
         throw valueFormatNotAllowedException(
-            columnName, "BINARY", "Not a valid hex string", insertRowIndex);
+            columnName, "BINARY", "Unsupported binary string format " + binaryStringEncoding.name(), insertRowIndex);
       }
     } else {
       throw typeNotAllowedException(
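To make the two string paths concrete, here is a small self-contained sketch using the same decoding calls as this hunk (commons-codec `Hex` and `java.util.Base64`); the sample values are illustrative:

```java
import java.util.Arrays;
import java.util.Base64;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;

public class BinaryStringEncodingDemo {
  public static void main(String[] args) throws DecoderException {
    // The same three bytes ("foo") expressed under each supported encoding:
    byte[] fromHex = Hex.decodeHex("666f6f");               // HEX path
    byte[] fromBase64 = Base64.getDecoder().decode("Zm9v"); // BASE64 path
    System.out.println(Arrays.equals(fromHex, fromBase64)); // true

    // The quote-stripping branch above would also accept "\"Zm9v\"";
    // the hex path has no such allowance and rejects quoted input,
    // which is the asymmetry the reviewer flags:
    try {
      Hex.decodeHex("\"666f6f\"");
    } catch (DecoderException e) {
      System.out.println("hex rejects quoted input");
    }
  }
}
```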
@@ -207,7 +207,7 @@ private float addRow(
       ColumnMetadata column = parquetColumn.columnMetadata;
       ParquetValueParser.ParquetBufferValue valueWithSize =
           ParquetValueParser.parseColumnValueToParquet(
-              value, column, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex);
+              value, column, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex, clientBufferParameters.getBinaryStringEncoding());
       indexedRow[colIndex] = valueWithSize.getValue();
       size += valueWithSize.getSize();
     }
@@ -10,6 +10,8 @@
 import java.time.ZoneId;
 import java.util.Optional;
 import javax.annotation.Nullable;
+
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
 import net.snowflake.ingest.utils.Utils;
@@ -80,12 +82,12 @@ float getSize() {
    * @return parsed value and byte size of Parquet internal representation
    */
   static ParquetBufferValue parseColumnValueToParquet(
       Object value,
       ColumnMetadata columnMetadata,
       PrimitiveType.PrimitiveTypeName typeName,
       RowBufferStats stats,
       ZoneId defaultTimezone,
-      long insertRowsCurrIndex) {
+      long insertRowsCurrIndex, Constants.BinaryStringEncoding binaryStringEncoding) {
Utils.assertNotNull("Parquet column stats", stats);
float estimatedParquetSize = 0F;
estimatedParquetSize += DEFINITION_LEVEL_ENCODING_BYTE_LEN;
@@ -144,7 +146,7 @@ static ParquetBufferValue parseColumnValueToParquet(
       int length = 0;
       if (logicalType == AbstractRowBuffer.ColumnLogicalType.BINARY) {
         value =
-            getBinaryValueForLogicalBinary(value, stats, columnMetadata, insertRowsCurrIndex);
+            getBinaryValueForLogicalBinary(value, stats, columnMetadata, insertRowsCurrIndex, binaryStringEncoding);
         length = ((byte[]) value).length;
       } else {
         String str = getBinaryValue(value, stats, columnMetadata, insertRowsCurrIndex);
@@ -414,14 +416,16 @@ private static byte[] getBinaryValueForLogicalBinary(
       Object value,
       RowBufferStats stats,
       ColumnMetadata columnMetadata,
-      final long insertRowsCurrIndex) {
+      final long insertRowsCurrIndex,
+      final Constants.BinaryStringEncoding binaryStringEncoding) {
     String maxLengthString = columnMetadata.getByteLength().toString();
     byte[] bytes =
         DataValidationUtil.validateAndParseBinary(
             columnMetadata.getName(),
             value,
             Optional.of(maxLengthString).map(Integer::parseInt),
-            insertRowsCurrIndex);
+            insertRowsCurrIndex, binaryStringEncoding);
     stats.addBinaryValue(bytes);
     return bytes;
   }
18 changes: 18 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -141,6 +141,24 @@ public static BdecParquetCompression fromName(String name) {
               name, Arrays.asList(BdecParquetCompression.values())));
     }
   }
+
+  public enum BinaryStringEncoding {
+    HEX,
+    BASE64;
+
+    public static BinaryStringEncoding fromName(String name) {
+      for (BinaryStringEncoding e : BinaryStringEncoding.values()) {
+        if (e.name().toLowerCase().equals(name.toLowerCase())) {
+          return e;
+        }
+      }
+      throw new IllegalArgumentException(
+          String.format(
+              "Unsupported BinaryStringEncoding = '%s', allowed values are %s",
+              name, Arrays.asList(BinaryStringEncoding.values())));
+    }
+  }
+
   // Parameters
   public static final boolean DISABLE_BACKGROUND_FLUSH = false;
   public static final boolean COMPRESS_BLOB_TWICE = false;
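For reference, the lookup added here is case-insensitive and fails fast; a short usage sketch (inputs illustrative):

```java
// Both resolve regardless of case:
Constants.BinaryStringEncoding hex = Constants.BinaryStringEncoding.fromName("hex");    // HEX
Constants.BinaryStringEncoding b64 = Constants.BinaryStringEncoding.fromName("Base64"); // BASE64

// Unknown names throw, listing the allowed values:
// IllegalArgumentException: Unsupported BinaryStringEncoding = 'utf8',
// allowed values are [HEX, BASE64]
Constants.BinaryStringEncoding bad = Constants.BinaryStringEncoding.fromName("utf8");
```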
25 changes: 25 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -39,6 +39,9 @@ public class ParameterProvider {
   public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM =
       "BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase();

+  public static final String BINARY_STRING_ENCODING =
+      "BINARY_STRING_ENCODING".toLowerCase();
+
   // Default values
   public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
   public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000;
@@ -64,6 +67,9 @@ public class ParameterProvider {
   public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
       Constants.BdecParquetCompression.GZIP;

+  public static final Constants.BinaryStringEncoding BINARY_STRING_ENCODING_DEFAULT =

Review comment (Contributor): Please add tests to ParameterProviderTest.

+      Constants.BinaryStringEncoding.HEX;

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
@@ -188,6 +194,13 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties props) {
           BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
           parameterOverrides,
           props);
+
+    this.updateValue(
+        BINARY_STRING_ENCODING,
+        BINARY_STRING_ENCODING_DEFAULT,
+        parameterOverrides,
+        props);
+
   }

/** @return Longest interval in milliseconds between buffer flushes */
@@ -407,6 +420,18 @@ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
     return Constants.BdecParquetCompression.fromName((String) val);
   }

+  /** @return binary string encoding */
+  public Constants.BinaryStringEncoding getBinaryStringEncoding() {
+    Object val =
+        this.parameterMap.getOrDefault(
+            BINARY_STRING_ENCODING, BINARY_STRING_ENCODING_DEFAULT);
+    if (val instanceof Constants.BinaryStringEncoding) {
+      return (Constants.BinaryStringEncoding) val;
+    }
+    return Constants.BinaryStringEncoding.fromName((String) val);
+  }
+
@Override
public String toString() {
return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
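End to end, the new knob should be settable like any other client parameter override. A hedged sketch follows — the property key comes from this diff, but the builder calls are assumed to match the existing SDK surface and may differ:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

// Assumed wiring: parameter overrides flow into ParameterProvider, whose
// getBinaryStringEncoding() is then read by ClientBufferParameters above.
Map<String, Object> parameterOverrides = new HashMap<>();
parameterOverrides.put("binary_string_encoding", "BASE64"); // keys are lower-cased

Properties props = new Properties(); // account/auth properties omitted here
SnowflakeStreamingIngestClient client =
    SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT")
        .setProperties(props)
        .setParameterOverrides(parameterOverrides)
        .build();
```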
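And responding to the review ask above, the requested ParameterProviderTest coverage could be sketched roughly as follows (constructor and assertion style assumed from typical tests in this repo, not taken from the PR):

```java
@Test
public void testBinaryStringEncodingOverrideAndDefault() {
  // Override path: a string value should be parsed case-insensitively.
  Map<String, Object> overrides = new HashMap<>();
  overrides.put(ParameterProvider.BINARY_STRING_ENCODING, "base64");
  ParameterProvider provider = new ParameterProvider(overrides, null);
  Assert.assertEquals(
      Constants.BinaryStringEncoding.BASE64, provider.getBinaryStringEncoding());

  // Default path: no override falls back to HEX.
  ParameterProvider defaultProvider = new ParameterProvider(null, null);
  Assert.assertEquals(
      Constants.BinaryStringEncoding.HEX, defaultProvider.getBinaryStringEncoding());
}
```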
@@ -45,6 +45,8 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.TimeZone;
+
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
 import org.apache.commons.codec.DecoderException;
@@ -888,76 +890,76 @@ public void testValidateAndParseBinary() throws DecoderException {
     assertArrayEquals(
         "honk".getBytes(StandardCharsets.UTF_8),
         validateAndParseBinary(
-            "COL", "honk".getBytes(StandardCharsets.UTF_8), Optional.empty(), 0));
+            "COL", "honk".getBytes(StandardCharsets.UTF_8), Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));

Review comment (Contributor): We need the same test coverage for base64, as we have for hex.

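A sketch of what that parity could look like, mirroring the hex assertions in this test (the encoded strings are the base64 forms of the same byte sequences; illustrative only, not part of the PR):

```java
// "honk" encoded as base64:
assertArrayEquals(
    "honk".getBytes(StandardCharsets.UTF_8),
    validateAndParseBinary(
        "COL", "aG9uaw==", Optional.empty(), 0, Constants.BinaryStringEncoding.BASE64));
// Surrounding whitespace is trimmed before decoding, as in the hex path:
assertArrayEquals(
    Hex.decodeHex("1234567890abcdef"),
    validateAndParseBinary(
        "COL", " EjRWeJCrze8= \t\n", Optional.empty(), 0, Constants.BinaryStringEncoding.BASE64));
// Malformed base64 is rejected as INVALID_VALUE_ROW:
expectError(
    ErrorCode.INVALID_VALUE_ROW,
    () -> validateAndParseBinary("COL", "!!!", Optional.empty(), 0, Constants.BinaryStringEncoding.BASE64));
```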
     assertArrayEquals(
         new byte[] {-1, 0, 1},
-        validateAndParseBinary("COL", new byte[] {-1, 0, 1}, Optional.empty(), 0));
+        validateAndParseBinary("COL", new byte[] {-1, 0, 1}, Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     assertArrayEquals(
         Hex.decodeHex("1234567890abcdef"), // pragma: allowlist secret NOT A SECRET
         validateAndParseBinary(
             "COL",
             "1234567890abcdef", // pragma: allowlist secret NOT A SECRET
             Optional.empty(),
-            0)); // pragma: allowlist secret NOT A SECRET
+            0, Constants.BinaryStringEncoding.HEX)); // pragma: allowlist secret NOT A SECRET
     assertArrayEquals(
         Hex.decodeHex("1234567890abcdef"), // pragma: allowlist secret NOT A SECRET
         validateAndParseBinary(
             "COL",
             " 1234567890abcdef \t\n",
             Optional.empty(),
-            0)); // pragma: allowlist secret NOT A SECRET
+            0, Constants.BinaryStringEncoding.HEX)); // pragma: allowlist secret NOT A SECRET

     assertArrayEquals(
-        maxAllowedArray, validateAndParseBinary("COL", maxAllowedArray, Optional.empty(), 0));
+        maxAllowedArray, validateAndParseBinary("COL", maxAllowedArray, Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     assertArrayEquals(
         maxAllowedArrayMinusOne,
-        validateAndParseBinary("COL", maxAllowedArrayMinusOne, Optional.empty(), 0));
+        validateAndParseBinary("COL", maxAllowedArrayMinusOne, Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));

     // Too large arrays should be rejected
     expectError(
         ErrorCode.INVALID_VALUE_ROW,
-        () -> validateAndParseBinary("COL", new byte[1], Optional.of(0), 0));
+        () -> validateAndParseBinary("COL", new byte[1], Optional.of(0), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
         ErrorCode.INVALID_VALUE_ROW,
-        () -> validateAndParseBinary("COL", new byte[BYTES_8_MB + 1], Optional.empty(), 0));
+        () -> validateAndParseBinary("COL", new byte[BYTES_8_MB + 1], Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
         ErrorCode.INVALID_VALUE_ROW,
-        () -> validateAndParseBinary("COL", new byte[8], Optional.of(7), 0));
+        () -> validateAndParseBinary("COL", new byte[8], Optional.of(7), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
         ErrorCode.INVALID_VALUE_ROW,
-        () -> validateAndParseBinary("COL", "aabb", Optional.of(1), 0));
+        () -> validateAndParseBinary("COL", "aabb", Optional.of(1), 0, Constants.BinaryStringEncoding.HEX));

     // unsupported data types should fail
     expectError(
         ErrorCode.INVALID_VALUE_ROW,
-        () -> validateAndParseBinary("COL", "000", Optional.empty(), 0));
+        () -> validateAndParseBinary("COL", "000", Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
         ErrorCode.INVALID_VALUE_ROW,
-        () -> validateAndParseBinary("COL", "abcg", Optional.empty(), 0));
+        () -> validateAndParseBinary("COL", "abcg", Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
-        ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseBinary("COL", "c", Optional.empty(), 0));
+        ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseBinary("COL", "c", Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
         ErrorCode.INVALID_FORMAT_ROW,
         () ->
             validateAndParseBinary(
-                "COL", Arrays.asList((byte) 1, (byte) 2, (byte) 3), Optional.empty(), 0));
+                "COL", Arrays.asList((byte) 1, (byte) 2, (byte) 3), Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
-        ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseBinary("COL", 1, Optional.empty(), 0));
+        ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseBinary("COL", 1, Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
-        ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseBinary("COL", 12, Optional.empty(), 0));
+        ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseBinary("COL", 12, Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
         ErrorCode.INVALID_FORMAT_ROW,
-        () -> validateAndParseBinary("COL", 1.5, Optional.empty(), 0));
+        () -> validateAndParseBinary("COL", 1.5, Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
         ErrorCode.INVALID_FORMAT_ROW,
-        () -> validateAndParseBinary("COL", BigInteger.ONE, Optional.empty(), 0));
+        () -> validateAndParseBinary("COL", BigInteger.ONE, Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
         ErrorCode.INVALID_FORMAT_ROW,
-        () -> validateAndParseBinary("COL", false, Optional.empty(), 0));
+        () -> validateAndParseBinary("COL", false, Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectError(
         ErrorCode.INVALID_FORMAT_ROW,
-        () -> validateAndParseBinary("COL", new Object(), Optional.empty(), 0));
+        () -> validateAndParseBinary("COL", new Object(), Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
}

@Test
@@ -1179,19 +1181,19 @@ public void testExceptionMessages() {
         "The given row cannot be converted to the internal format: Object of type java.lang.Object"
             + " cannot be ingested into Snowflake column COL of type BINARY, rowIndex:0. Allowed"
             + " Java types: byte[], String",
-        () -> validateAndParseBinary("COL", new Object(), Optional.empty(), 0));
+        () -> validateAndParseBinary("COL", new Object(), Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));
     expectErrorCodeAndMessage(
         ErrorCode.INVALID_VALUE_ROW,
         "The given row cannot be converted to the internal format due to invalid value: Value"
             + " cannot be ingested into Snowflake column COL of type BINARY, rowIndex:0, reason:"
             + " Binary too long: length=2 maxLength=1",
-        () -> validateAndParseBinary("COL", new byte[] {1, 2}, Optional.of(1), 0));
+        () -> validateAndParseBinary("COL", new byte[] {1, 2}, Optional.of(1), 0, Constants.BinaryStringEncoding.HEX));
     expectErrorCodeAndMessage(
         ErrorCode.INVALID_VALUE_ROW,
         "The given row cannot be converted to the internal format due to invalid value: Value"
             + " cannot be ingested into Snowflake column COL of type BINARY, rowIndex:0, reason:"
             + " Not a valid hex string",
-        () -> validateAndParseBinary("COL", "ghi", Optional.empty(), 0));
+        () -> validateAndParseBinary("COL", "ghi", Optional.empty(), 0, Constants.BinaryStringEncoding.HEX));

// VARIANT
expectErrorCodeAndMessage(