Skip to content

Commit

Permalink
SNOW-987122 Upgrade JDBC to 3.14.5 and Catch new exception type for r… (
Browse files Browse the repository at this point in the history
#677)

* SNOW-987122 Upgrade JDBC to 3.14.5 and Catch new exception type for renewing expired S3 token

* Modify enforcer rule for duplicate classes

* Fix test and set UTC TZ

* Temporary fix for failing long variant, object and array test

* Do not commit this - Long running test

* Remove isCredentialExpired exception and refresh metadata on first failure of upload

* Long running fips e2e test- lukas suggestion

* Revert long running tests

* Remove unused function and formatter

* Add stacktrace in logs
  • Loading branch information
sfc-gh-japatel authored Feb 21, 2024
1 parent 63eda37 commit f8c8e3f
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 61 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ authentication. Currently, we support ingestion through the following APIs:

The Snowflake Ingest Service SDK depends on the following libraries:

* snowflake-jdbc (3.13.30 to 3.13.33)
* snowflake-jdbc (3.13.30 to 3.14.5)
* slf4j-api
* com.github.luben:zstd-jni (1.5.0-1)

Expand All @@ -24,7 +24,7 @@ using a build system, please make sure these dependencies are on the classpath.

# Prerequisites

**If your project depends on the Snowflake JDBC driver, as well, please make sure the JDBC driver version is 3.13.30 to 3.13.33. JDBC driver version 3.14.0 or higher is currently not supported. **
**If your project depends on the Snowflake JDBC driver, as well, please make sure the JDBC driver version is 3.13.30 to 3.14.5.

## Java 8+

Expand Down
5 changes: 4 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
<shadeBase>net.snowflake.ingest.internal</shadeBase>
<slf4j.version>1.7.36</slf4j.version>
<snappy.version>1.1.10.4</snappy.version>
<snowjdbc.version>3.13.30</snowjdbc.version>
<snowjdbc.version>3.14.5</snowjdbc.version>
<yetus.version>0.13.0</yetus.version>
</properties>

Expand Down Expand Up @@ -711,6 +711,9 @@
<configuration>
<rules>
<banDuplicateClasses>
<ignoreClasses>
<ignoreClass>META-INF/versions/*/org/bouncycastle/*</ignoreClass>
</ignoreClasses>
<findAllDuplicates>true</findAllDuplicates>
<ignoreWhenIdentical>true</ignoreWhenIdentical>
</banDuplicateClasses>
Expand Down
2 changes: 1 addition & 1 deletion public_pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.30</version>
<version>3.14.5</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.client.core.Constants.CLOUD_STORAGE_CREDENTIALS_EXPIRED;
import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE;
import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries;
import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT;
import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS;
import static net.snowflake.ingest.utils.HttpUtil.generateProxyPropertiesForJDBC;
import static net.snowflake.ingest.utils.Utils.getStackTrace;

import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
Expand All @@ -34,7 +34,6 @@
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
import net.snowflake.client.jdbc.internal.google.cloud.storage.StorageException;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.utils.ErrorCode;
Expand Down Expand Up @@ -193,6 +192,13 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
.setDestFileName(fullFilePath)
.build());
} catch (Exception e) {
if (retryCount == 0) {
// for the first exception, we always perform a metadata refresh.
logger.logInfo(
"Stage metadata need to be refreshed due to upload error: {} on first retry attempt",
e.getMessage());
this.refreshSnowflakeMetadata();
}
if (retryCount >= maxUploadRetries) {
logger.logError(
"Failed to upload to stage, retry attempts exhausted ({}), client={}, message={}",
Expand All @@ -201,39 +207,18 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
e.getMessage());
throw new SFException(e, ErrorCode.IO_ERROR);
}

if (isCredentialsExpiredException(e)) {
logger.logInfo(
"Stage metadata need to be refreshed due to upload error: {}", e.getMessage());
this.refreshSnowflakeMetadata();
}
retryCount++;
StreamingIngestUtils.sleepForRetry(retryCount);
logger.logInfo(
"Retrying upload, attempt {}/{} {}", retryCount, maxUploadRetries, e.getMessage());
"Retrying upload, attempt {}/{} msg: {}, stackTrace:{}",
retryCount,
maxUploadRetries,
e.getMessage(),
getStackTrace(e));
this.putRemote(fullFilePath, data, retryCount);
}
}

/**
* @return Whether the passed exception means that credentials expired and the stage metadata
* should be refreshed from Snowflake. The reasons for refresh is SnowflakeSQLException with
* error code 240001 (thrown by the JDBC driver) or GCP StorageException with HTTP status 401.
*/
static boolean isCredentialsExpiredException(Exception e) {
if (e == null || e.getClass() == null) {
return false;
}

if (e instanceof SnowflakeSQLException) {
return ((SnowflakeSQLException) e).getErrorCode() == CLOUD_STORAGE_CREDENTIALS_EXPIRED;
} else if (e instanceof StorageException) {
return ((StorageException) e).getCode() == 401;
}

return false;
}

SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata()
throws SnowflakeSQLException, IOException {
logger.logInfo("Refresh Snowflake metadata, client={}", clientName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.DecoderException;
Expand Down Expand Up @@ -269,6 +270,7 @@ public void testValidateAndParseTimestamp() throws ParseException {

// Test integer-stored time and scale guessing
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("UTC"));
assertEquals(
BigInteger.valueOf(df.parse("1971-01-01 00:00:00.001").getTime())
.multiply(BigInteger.valueOf(1000000)),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.client.core.Constants.CLOUD_STORAGE_CREDENTIALS_EXPIRED;
import static net.snowflake.ingest.streaming.internal.StreamingIngestStage.isCredentialsExpiredException;
import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_PASSWORD;
import static net.snowflake.ingest.utils.HttpUtil.HTTP_PROXY_USER;
import static net.snowflake.ingest.utils.HttpUtil.NON_PROXY_HOSTS;
Expand Down Expand Up @@ -40,7 +39,6 @@
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.jdbc.internal.google.cloud.storage.StorageException;
import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder;
import net.snowflake.ingest.TestUtils;
import net.snowflake.ingest.connection.RequestBuilder;
Expand Down Expand Up @@ -486,20 +484,66 @@ public void testShouldBypassProxy() {
}

@Test
public void testIsCredentialExpiredException() {
Assert.assertTrue(
isCredentialsExpiredException(
new SnowflakeSQLException("Error", CLOUD_STORAGE_CREDENTIALS_EXPIRED)));
Assert.assertTrue(isCredentialsExpiredException(new StorageException(401, "unauthorized")));

Assert.assertFalse(isCredentialsExpiredException(new StorageException(400, "bad request")));
Assert.assertFalse(isCredentialsExpiredException(null));
Assert.assertFalse(isCredentialsExpiredException(new RuntimeException()));
Assert.assertFalse(
isCredentialsExpiredException(
new RuntimeException(String.valueOf(CLOUD_STORAGE_CREDENTIALS_EXPIRED))));
Assert.assertFalse(
isCredentialsExpiredException(
new SnowflakeSQLException("Error", CLOUD_STORAGE_CREDENTIALS_EXPIRED + 1)));
public void testRefreshMetadataOnFirstPutException() throws Exception {
int maxUploadRetryCount = 2;
JsonNode exampleJson = mapper.readTree(exampleRemoteMeta);
SnowflakeFileTransferMetadataV1 originalMetadata =
(SnowflakeFileTransferMetadataV1)
SnowflakeFileTransferAgent.getFileTransferMetadatas(exampleJson).get(0);

byte[] dataBytes = "Hello Upload".getBytes(StandardCharsets.UTF_8);

StreamingIngestStage stage =
new StreamingIngestStage(
true,
"role",
null,
null,
"clientName",
new StreamingIngestStage.SnowflakeFileTransferMetadataWithAge(
originalMetadata, Optional.of(System.currentTimeMillis())),
maxUploadRetryCount);
PowerMockito.mockStatic(SnowflakeFileTransferAgent.class);
SnowflakeSQLException e =
new SnowflakeSQLException(
"Fake bad creds", CLOUD_STORAGE_CREDENTIALS_EXPIRED, "S3 credentials have expired");
PowerMockito.doAnswer(
new org.mockito.stubbing.Answer() {
private boolean firstInvocation = true;

public Object answer(org.mockito.invocation.InvocationOnMock invocation)
throws Throwable {
if (firstInvocation) {
firstInvocation = false;
throw e; // Throw the exception only for the first invocation
}
return null; // Do nothing on subsequent invocations
}
})
.when(SnowflakeFileTransferAgent.class);
SnowflakeFileTransferAgent.uploadWithoutConnection(Mockito.any());
final ArgumentCaptor<SnowflakeFileTransferConfig> captor =
ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class);

stage.putRemote("test/path", dataBytes);

PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class, times(maxUploadRetryCount));
SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture());
SnowflakeFileTransferConfig capturedConfig = captor.getValue();

Assert.assertFalse(capturedConfig.getRequireCompress());
Assert.assertEquals(OCSPMode.FAIL_OPEN, capturedConfig.getOcspMode());

SnowflakeFileTransferMetadataV1 capturedMetadata =
(SnowflakeFileTransferMetadataV1) capturedConfig.getSnowflakeFileTransferMetadata();
Assert.assertEquals("test/path", capturedMetadata.getPresignedUrlFileName());
Assert.assertEquals(originalMetadata.getCommandType(), capturedMetadata.getCommandType());
Assert.assertEquals(originalMetadata.getPresignedUrl(), capturedMetadata.getPresignedUrl());
Assert.assertEquals(
originalMetadata.getStageInfo().getStageType(),
capturedMetadata.getStageInfo().getStageType());

InputStream capturedInput = capturedConfig.getUploadStream();
Assert.assertEquals("Hello Upload", IOUtils.toString(capturedInput));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,21 @@ <STREAMING_INGEST_WRITE> void assertVariant(
String expectedValue,
String expectedType)
throws Exception {
assertVariant(
dataType,
streamingIngestWriteValue,
expectedValue,
expectedType,
false /*Not max variant or max array*/);
}

<STREAMING_INGEST_WRITE> void assertVariant(
String dataType,
STREAMING_INGEST_WRITE streamingIngestWriteValue,
String expectedValue,
String expectedType,
boolean isAtMaxValueForDataType)
throws Exception {

String tableName = createTable(dataType);
String offsetToken = UUID.randomUUID().toString();
Expand All @@ -328,24 +343,36 @@ <STREAMING_INGEST_WRITE> void assertVariant(
channel.insertRow(createStreamingIngestRow(streamingIngestWriteValue), offsetToken);
TestUtils.waitForOffset(channel, offsetToken);

String query =
String.format(
"select %s, typeof(%s) from %s", VALUE_COLUMN_NAME, VALUE_COLUMN_NAME, tableName);
String query;
// if added row value is using max possible value, we will not verify its returned type since
// JDBC doesnt parse it properly because of Jackson Databind version 2.15
// we will only verify type of it.
// Check this: https://github.com/snowflakedb/snowflake-sdks-drivers-issues-teamwork/issues/819
// TODO:: SNOW-1051731
if (isAtMaxValueForDataType) {
query = String.format("select typeof(%s) from %s", VALUE_COLUMN_NAME, tableName);
} else {
query =
String.format(
"select typeof(%s), %s from %s", VALUE_COLUMN_NAME, VALUE_COLUMN_NAME, tableName);
}
ResultSet resultSet = conn.createStatement().executeQuery(query);
int counter = 0;
String value = null;
String typeof = null;
while (resultSet.next()) {
counter++;
value = resultSet.getString(1);
typeof = resultSet.getString(2);
typeof = resultSet.getString(1);
if (!isAtMaxValueForDataType) value = resultSet.getString(2);
}

Assert.assertEquals(1, counter);
if (expectedValue == null) {
Assert.assertNull(value);
} else {
Assert.assertEquals(objectMapper.readTree(expectedValue), objectMapper.readTree(value));
if (!isAtMaxValueForDataType) {
if (expectedValue == null) {
Assert.assertNull(value);
} else {
Assert.assertEquals(objectMapper.readTree(expectedValue), objectMapper.readTree(value));
}
}
Assert.assertEquals(expectedType, typeof);
migrateTable(tableName); // migration should always succeed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,31 @@ public void testVariant() throws Exception {
assertVariant("VARIANT", null, null, null);
}

/**
* For JDBC version > 3.13.3 we dont verify the value returned from max variant, max array and max
* object because of
* https://github.com/snowflakedb/snowflake-sdks-drivers-issues-teamwork/issues/819
*
* @throws Exception
*/
@Test
public void testMaxVariantAndObject() throws Exception {
String maxObject = createLargeVariantObject(MAX_ALLOWED_LENGTH);
assertVariant("VARIANT", maxObject, maxObject, "OBJECT");
assertVariant("OBJECT", maxObject, maxObject, "OBJECT");
assertVariant("VARIANT", maxObject, maxObject, "OBJECT", true);
assertVariant("OBJECT", maxObject, maxObject, "OBJECT", true);
}

/**
* For JDBC version > 3.13.3 we dont verify the value returned from max variant, max array and max
* object because of
* https://github.com/snowflakedb/snowflake-sdks-drivers-issues-teamwork/issues/819
*
* @throws Exception
*/
@Test
public void testMaxArray() throws Exception {
String maxArray = "[" + createLargeVariantObject(MAX_ALLOWED_LENGTH - 2) + "]";
assertVariant("ARRAY", maxArray, maxArray, "ARRAY");
assertVariant("ARRAY", maxArray, maxArray, "ARRAY", true);
}

@Test
Expand Down

0 comments on commit f8c8e3f

Please sign in to comment.