SNOW-987122 Upgrade JDBC to 3.14.5 and Catch new exception type for r… #677

Merged · 11 commits · Feb 21, 2024
4 changes: 2 additions & 2 deletions README.md
@@ -15,7 +15,7 @@ authentication. Currently, we support ingestion through the following APIs:

The Snowflake Ingest Service SDK depends on the following libraries:

* snowflake-jdbc (3.13.30 to 3.13.33)
* snowflake-jdbc (3.13.30 to 3.14.5)
* slf4j-api
* com.github.luben:zstd-jni (1.5.0-1)

@@ -24,7 +24,7 @@ using a build system, please make sure these dependencies are on the classpath.

# Prerequisites

**If your project depends on the Snowflake JDBC driver, as well, please make sure the JDBC driver version is 3.13.30 to 3.13.33. JDBC driver version 3.14.0 or higher is currently not supported. **
**If your project depends on the Snowflake JDBC driver, as well, please make sure the JDBC driver version is 3.13.30 to 3.14.5.**

## Java 8+

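For projects that also declare the JDBC driver directly, here is a minimal Maven sketch of pinning it inside the supported range. The coordinates are taken from public_pom.xml in this PR; the enclosing `<dependencies>` section is assumed.

```xml
<!-- Sketch: keep snowflake-jdbc within the range the SDK supports (3.13.30 to 3.14.5). -->
<dependency>
  <groupId>net.snowflake</groupId>
  <artifactId>snowflake-jdbc</artifactId>
  <version>3.14.5</version>
</dependency>
```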
5 changes: 4 additions & 1 deletion pom.xml
@@ -65,7 +65,7 @@
<shadeBase>net.snowflake.ingest.internal</shadeBase>
<slf4j.version>1.7.36</slf4j.version>
<snappy.version>1.1.10.4</snappy.version>
<snowjdbc.version>3.13.30</snowjdbc.version>
<snowjdbc.version>3.14.5</snowjdbc.version>
Contributor:

Seems like the fix for https://github.com/snowflakedb/snowflake-sdks-drivers-issues-teamwork/issues/819 is on its way; could we wait for it and not do the tricks with large strings?

Collaborator Author:

Yeah, we can wait now that it is pushed, but we will have to wait until the next release of JDBC.
Are we okay with that, @sfc-gh-xhuang?

Contributor (@sfc-gh-xhuang, Feb 14, 2024):

Even if they release next week, there's no guarantee it will be a simple upgrade either.
The large-strings issue only affects reads and this test; it doesn't affect write behavior, which is the only thing we use.

Contributor:

+1. We are just bystanders to that bug; it is not in our code, and they were able to reproduce it themselves. I support not waiting for that fix to land.

Contributor:

JDBC plans to release next week, but please consider the trade-off of trying to use 3.14.6 vs. today's 3.14.5.

<yetus.version>0.13.0</yetus.version>
</properties>

@@ -711,6 +711,9 @@
<configuration>
<rules>
<banDuplicateClasses>
<ignoreClasses>
<ignoreClass>META-INF/versions/*/org/bouncycastle/*</ignoreClass>
</ignoreClasses>
<findAllDuplicates>true</findAllDuplicates>
<ignoreWhenIdentical>true</ignoreWhenIdentical>
</banDuplicateClasses>
2 changes: 1 addition & 1 deletion public_pom.xml
@@ -39,7 +39,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.30</version>
<version>3.14.5</version>
<scope>compile</scope>
</dependency>
<dependency>
@@ -47,7 +47,6 @@ Builder setOffsetToken(String offsetToken) {
return this;
}


Builder setStartOffsetToken(String startOffsetToken) {
this.startOffsetToken = startOffsetToken;
return this;
@@ -231,7 +231,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FileColumnProperties that = (FileColumnProperties) o;
return Objects.equals(columnOrdinal,that.columnOrdinal)
return Objects.equals(columnOrdinal, that.columnOrdinal)
&& distinctValues == that.distinctValues
&& nullCount == that.nullCount
&& maxLength == that.maxLength
@@ -93,12 +93,14 @@ public void setupSchema(List<ColumnMetadata> columns) {
addNonNullableFieldName(column.getInternalName());
}
this.statsMap.put(
column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal()));
column.getInternalName(),
new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal()));

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
this.tempStatsMap.put(
column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal()));
column.getInternalName(),
new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal()));
}

id++;
@@ -33,7 +33,7 @@ class RowBufferStats {
RowBufferStats(String columnDisplayName, String collationDefinitionString, int ordinal) {
this.columnDisplayName = columnDisplayName;
this.collationDefinitionString = collationDefinitionString;
this.ordinal= ordinal;
this.ordinal = ordinal;
reset();
}

@@ -54,7 +54,8 @@ void reset() {

/** Create new statistics for the same column, with all calculated values set to empty */
RowBufferStats forkEmpty() {
return new RowBufferStats(this.getColumnDisplayName(), this.getCollationDefinitionString(), this.getOrdinal());
return new RowBufferStats(
this.getColumnDisplayName(), this.getCollationDefinitionString(), this.getOrdinal());
}

// TODO performance test this vs in place update
@@ -68,7 +69,8 @@ static RowBufferStats getCombinedStats(RowBufferStats left, RowBufferStats right
left.getCollationDefinitionString(), right.getCollationDefinitionString()));
}
RowBufferStats combined =
new RowBufferStats(left.columnDisplayName, left.getCollationDefinitionString(), left.getOrdinal());
new RowBufferStats(
left.columnDisplayName, left.getCollationDefinitionString(), left.getOrdinal());

if (left.currentMinIntValue != null) {
combined.addIntValue(left.currentMinIntValue);
@@ -4,7 +4,6 @@

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.client.core.Constants.CLOUD_STORAGE_CREDENTIALS_EXPIRED;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE;
import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries;
import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT;
@@ -34,7 +33,6 @@
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
import net.snowflake.client.jdbc.internal.google.cloud.storage.StorageException;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.utils.ErrorCode;
@@ -193,6 +191,13 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
.setDestFileName(fullFilePath)
.build());
} catch (Exception e) {
if (retryCount == 0) {
// for the first exception, we always perform a metadata refresh.
logger.logInfo(
"Stage metadata need to be refreshed due to upload error: {} on first retry attempt",
e.getMessage());
Contributor:

Can you move out the log below? Also, we need to improve the log, since logging only the message might miss some information:

```java
logger.logInfo(
    "Retrying upload, attempt {}/{} {}", retryCount, maxUploadRetries, e.getMessage());
```

Collaborator Author (@sfc-gh-japatel, Feb 21, 2024):

This is preferable since it indicates that the metadata is being refreshed. I could add a stack trace, but that would be too much on every attempt.

We are already printing your suggested log line below.

this.refreshSnowflakeMetadata();
}
if (retryCount >= maxUploadRetries) {
logger.logError(
"Failed to upload to stage, retry attempts exhausted ({}), client={}, message={}",
@@ -201,12 +206,6 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
e.getMessage());
throw new SFException(e, ErrorCode.IO_ERROR);
}

if (isCredentialsExpiredException(e)) {
logger.logInfo(
"Stage metadata need to be refreshed due to upload error: {}", e.getMessage());
this.refreshSnowflakeMetadata();
}
retryCount++;
StreamingIngestUtils.sleepForRetry(retryCount);
logger.logInfo(
@@ -215,25 +214,6 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
}
}

/**
* @return Whether the passed exception means that credentials expired and the stage metadata
* should be refreshed from Snowflake. The reasons for refresh is SnowflakeSQLException with
* error code 240001 (thrown by the JDBC driver) or GCP StorageException with HTTP status 401.
*/
static boolean isCredentialsExpiredException(Exception e) {
if (e == null || e.getClass() == null) {
return false;
}

if (e instanceof SnowflakeSQLException) {
return ((SnowflakeSQLException) e).getErrorCode() == CLOUD_STORAGE_CREDENTIALS_EXPIRED;
} else if (e instanceof StorageException) {
return ((StorageException) e).getCode() == 401;
}

return false;
}

SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata()
throws SnowflakeSQLException, IOException {
logger.logInfo("Refresh Snowflake metadata, client={}", clientName);
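Taken together, this diff replaces the exception-type check (isCredentialsExpiredException) with an unconditional stage-metadata refresh on the first failed upload attempt, while keeping the existing retry budget. Below is a minimal, self-contained sketch of that flow; the names Uploader, refreshMetadata, and MAX_UPLOAD_RETRIES are placeholders, not the SDK's actual classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of the retry flow introduced here: refresh stage metadata on the first
 * failure of any kind, then keep retrying with backoff until the budget is exhausted.
 */
public class PutRemoteRetrySketch {
  interface Uploader {
    void upload(byte[] data) throws Exception;
  }

  static final int MAX_UPLOAD_RETRIES = 5; // stand-in for maxUploadRetries

  static void putRemote(Uploader uploader, byte[] data, int retryCount, Runnable refreshMetadata)
      throws Exception {
    try {
      uploader.upload(data);
    } catch (Exception e) {
      if (retryCount == 0) {
        // First failure: always refresh metadata, regardless of exception type
        // (previously this was gated on isCredentialsExpiredException).
        refreshMetadata.run();
      }
      if (retryCount >= MAX_UPLOAD_RETRIES) {
        throw e; // the SDK wraps this in SFException(ErrorCode.IO_ERROR)
      }
      Thread.sleep(100L * (retryCount + 1)); // stand-in for StreamingIngestUtils.sleepForRetry
      putRemote(uploader, data, retryCount + 1, refreshMetadata);
    }
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger attempts = new AtomicInteger();
    putRemote(
        d -> {
          if (attempts.incrementAndGet() < 3) {
            throw new RuntimeException("transient upload failure");
          }
        },
        new byte[0],
        0,
        () -> System.out.println("refreshing stage metadata"));
    System.out.println("uploaded after " + attempts.get() + " attempts");
  }
}
```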
@@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.DecoderException;
@@ -269,6 +270,7 @@ public void testValidateAndParseTimestamp() throws ParseException {

// Test integer-stored time and scale guessing
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("UTC"));
assertEquals(
BigInteger.valueOf(df.parse("1971-01-01 00:00:00.001").getTime())
.multiply(BigInteger.valueOf(1000000)),
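The added df.setTimeZone(TimeZone.getTimeZone("UTC")) pins the parser's zone so the expected epoch values in the assertions are machine-independent; without it, SimpleDateFormat uses the JVM's default time zone and the test only passes on hosts already running in UTC. A small self-contained illustration (not SDK code):

```java
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class TimeZonePinningSketch {
  public static void main(String[] args) throws Exception {
    String ts = "1971-01-01 00:00:00.001";

    SimpleDateFormat defaultZone = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    SimpleDateFormat utc = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    utc.setTimeZone(TimeZone.getTimeZone("UTC"));

    // The two epoch values differ by the JVM's UTC offset unless the default zone is UTC,
    // which is why the test pins UTC before comparing against fixed expected values.
    System.out.println("default zone: " + defaultZone.parse(ts).getTime());
    System.out.println("UTC:          " + utc.parse(ts).getTime());
  }
}
```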
@@ -20,7 +20,6 @@
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.binary.Hex;
import org.checkerframework.common.value.qual.IntRange;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -105,7 +104,8 @@ static List<ColumnMetadata> createSchema() {
colChar.setLength(11);
colChar.setScale(0);

List<ColumnMetadata> columns = Arrays.asList(
List<ColumnMetadata> columns =
Arrays.asList(
colTinyIntCase, colTinyInt, colSmallInt, colInt, colBigInt, colDecimal, colChar);
for (int i = 0; i < columns.size(); i++) {
columns.get(i).setOrdinal(i + 1);
@@ -1,7 +1,6 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.client.core.Constants.CLOUD_STORAGE_CREDENTIALS_EXPIRED;
import static net.snowflake.ingest.streaming.internal.StreamingIngestStage.isCredentialsExpiredException;
import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_PASSWORD;
import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_USER;
import static net.snowflake.ingest.utils.HttpUtil.NON_PROXY_HOSTS;
@@ -40,7 +39,6 @@
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.jdbc.internal.google.cloud.storage.StorageException;
import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder;
import net.snowflake.ingest.TestUtils;
import net.snowflake.ingest.connection.RequestBuilder;
@@ -486,20 +484,66 @@ public void testShouldBypassProxy() {
}

@Test
public void testIsCredentialExpiredException() {
Assert.assertTrue(
isCredentialsExpiredException(
new SnowflakeSQLException("Error", CLOUD_STORAGE_CREDENTIALS_EXPIRED)));
Assert.assertTrue(isCredentialsExpiredException(new StorageException(401, "unauthorized")));

Assert.assertFalse(isCredentialsExpiredException(new StorageException(400, "bad request")));
Assert.assertFalse(isCredentialsExpiredException(null));
Assert.assertFalse(isCredentialsExpiredException(new RuntimeException()));
Assert.assertFalse(
isCredentialsExpiredException(
new RuntimeException(String.valueOf(CLOUD_STORAGE_CREDENTIALS_EXPIRED))));
Assert.assertFalse(
isCredentialsExpiredException(
new SnowflakeSQLException("Error", CLOUD_STORAGE_CREDENTIALS_EXPIRED + 1)));
public void testRefreshMetadataOnFirstPutException() throws Exception {
int maxUploadRetryCount = 2;
JsonNode exampleJson = mapper.readTree(exampleRemoteMeta);
SnowflakeFileTransferMetadataV1 originalMetadata =
(SnowflakeFileTransferMetadataV1)
SnowflakeFileTransferAgent.getFileTransferMetadatas(exampleJson).get(0);

byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8);

StreamingIngestStage stage =
new StreamingIngestStage(
true,
"role",
null,
null,
"clientName",
new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge(
originalMetadata, Optional.of(System.currentTimeMillis())),
maxUploadRetryCount);
PowerMockito.mockStatic(SnowflakeFileTransferAgent.class);
SnowflakeSQLException e =
new SnowflakeSQLException(
"Fake bad creds", CLOUD_STORAGE_CREDENTIALS_EXPIRED, "S3 credentials have expired");
PowerMockito.doAnswer(
new org.mockito.stubbing.Answer() {
private boolean firstInvocation = true;

public Object answer(org.mockito.invocation.InvocationOnMock invocation)
throws Throwable {
if (firstInvocation) {
firstInvocation = false;
throw e; // Throw the exception only for the first invocation
}
return null; // Do nothing on subsequent invocations
}
})
.when(SnowflakeFileTransferAgent.class);
SnowflakeFileTransferAgent.uploadWithoutConnection(Mockito.any());
final ArgumentCaptor<SnowflakeFileTransferConfig> captor =
ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class);

stage.putRemote("test/path", dataBytes);

PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class, times(maxUploadRetryCount));
SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture());
SnowflakeFileTransferConfig capturedConfig = captor.getValue();

Assert.assertFalse(capturedConfig.getRequireCompress());
Assert.assertEquals(OCSPMode.FAIL_OPEN, capturedConfig.getOcspMode());

SnowflakeFileTransferMetadataV1 capturedMetadata =
(SnowflakeFileTransferMetadataV1) capturedConfig.getSnowflakeFileTransferMetadata();
Assert.assertEquals("test/path", capturedMetadata.getPresignedUrlFileName());
Assert.assertEquals(originalMetadata.getCommandType(), capturedMetadata.getCommandType());
Assert.assertEquals(originalMetadata.getPresignedUrl(), capturedMetadata.getPresignedUrl());
Assert.assertEquals(
originalMetadata.getStageInfo().getStageType(),
capturedMetadata.getStageInfo().getStageType());

InputStream capturedInput = capturedConfig.getUploadStream();
Assert.assertEquals("Hello Upload", IOUtils.toString(capturedInput));
}
}
@@ -319,6 +319,21 @@ <STREAMING_INGEST_WRITE> void assertVariant(
String expectedValue,
String expectedType)
throws Exception {
assertVariant(
dataType,
streamingIngestWriteValue,
expectedValue,
expectedType,
false /*Not max variant or max array*/);
}

<STREAMING_INGEST_WRITE> void assertVariant(
String dataType,
STREAMING_INGEST_WRITE streamingIngestWriteValue,
String expectedValue,
String expectedType,
boolean isAtMaxValueForDataType)
throws Exception {

String tableName = createTable(dataType);
String offsetToken = UUID.randomUUID().toString();
@@ -328,24 +343,36 @@ <STREAMING_INGEST_WRITE> void assertVariant(
channel.insertRow(createStreamingIngestRow(streamingIngestWriteValue), offsetToken);
TestUtils.waitForOffset(channel, offsetToken);

String query =
String.format(
"select %s, typeof(%s) from %s", VALUE_COLUMN_NAME, VALUE_COLUMN_NAME, tableName);
String query;
// If the added row value uses the max possible value, we will not verify the returned value,
// since JDBC doesn't parse it properly because of Jackson Databind version 2.15;
// we will only verify its type.
// See: https://github.com/snowflakedb/snowflake-sdks-drivers-issues-teamwork/issues/819
// TODO: SNOW-1051731
if (isAtMaxValueForDataType) {
query = String.format("select typeof(%s) from %s", VALUE_COLUMN_NAME, tableName);
} else {
query =
String.format(
"select typeof(%s), %s from %s", VALUE_COLUMN_NAME, VALUE_COLUMN_NAME, tableName);
}
ResultSet resultSet = conn.createStatement().executeQuery(query);
int counter = 0;
String value = null;
String typeof = null;
while (resultSet.next()) {
counter++;
value = resultSet.getString(1);
typeof = resultSet.getString(2);
typeof = resultSet.getString(1);
if (!isAtMaxValueForDataType) value = resultSet.getString(2);
}

Assert.assertEquals(1, counter);
if (expectedValue == null) {
Assert.assertNull(value);
} else {
Assert.assertEquals(objectMapper.readTree(expectedValue), objectMapper.readTree(value));
if (!isAtMaxValueForDataType) {
if (expectedValue == null) {
Assert.assertNull(value);
} else {
Assert.assertEquals(objectMapper.readTree(expectedValue), objectMapper.readTree(value));
}
}
Assert.assertEquals(expectedType, typeof);
migrateTable(tableName); // migration should always succeed
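With the new overload, tests that write a value at the maximum size for its data type pass the extra flag so only typeof() is verified, while the original four-argument form keeps its old behavior. A hypothetical call sketch (the literals and the maxSizedJsonString variable are illustrative, not taken from the test suite):

```java
// Normal case: both the returned value and its reported type are verified.
assertVariant("VARIANT", "{\"a\": 1}", "{\"a\": 1}", "OBJECT");

// Max-size case: skip value verification (JDBC cannot read it back yet, see SNOW-1051731)
// and verify only the reported type.
assertVariant("VARIANT", maxSizedJsonString, null, "OBJECT", true /* isAtMaxValueForDataType */);
```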