Skip to content

Commit

Permalink
SNOW-995369 GCS token refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-lsembera committed Jan 15, 2024
1 parent bb014fc commit 77cd647
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<!-- Arifact name and version information -->
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.0.5-SNAPSHOT</version>
<version>2.0.999</version>
<packaging>jar</packaging>
<name>Snowflake Ingest SDK</name>
<description>Snowflake Ingest SDK</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public class RequestBuilder {
// Don't change!
public static final String CLIENT_NAME = "SnowpipeJavaSDK";

public static final String DEFAULT_VERSION = "2.0.5-SNAPSHOT";
public static final String DEFAULT_VERSION = "2.0.999";

public static final String JAVA_USER_AGENT = "JAVA";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import net.snowflake.client.core.OCSPMode;
Expand All @@ -46,7 +49,13 @@ class StreamingIngestStage {
private static final ObjectMapper mapper = new ObjectMapper();
private static final long REFRESH_THRESHOLD_IN_MS =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
static final int MAX_RETRY_COUNT = 1;
static final int MAX_RETRY_COUNT = 10;

private static final Set<String> GCP_STORAGE_EXCEPTION_CLASS_NAMES =
new HashSet<>(
Arrays.asList(
"net.snowflake.client.jdbc.internal.google.cloud.storage.StorageException",
"com.google.cloud.storage.StorageException"));

private static final Logging logger = new Logging(StreamingIngestStage.class);

Expand Down Expand Up @@ -187,19 +196,36 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
.setProxyProperties(this.proxyProperties)
.setDestFileName(fullFilePath)
.build());
} catch (SnowflakeSQLException e) {
if (e.getErrorCode() != CLOUD_STORAGE_CREDENTIALS_EXPIRED || retryCount >= MAX_RETRY_COUNT) {
} catch (Exception e) {
if (retryCount >= MAX_RETRY_COUNT) {
logger.logError(
"Failed to upload to stage, client={}, message={}", clientName, e.getMessage());
throw e;
"Failed to upload to stage, retry attempts exhausted ({}), client={}, message={}", MAX_RETRY_COUNT, clientName, e.getMessage());
throw new SFException(e, ErrorCode.IO_ERROR);
}
this.refreshSnowflakeMetadata();
this.putRemote(fullFilePath, data, ++retryCount);
} catch (Exception e) {
throw new SFException(e, ErrorCode.IO_ERROR);

if (isCredentialsExpiredException(e)) {
logger.logInfo("Stage metadata need to be refreshed due to upload error: {}", e.getMessage());
this.refreshSnowflakeMetadata();
}
retryCount++;
StreamingIngestUtils.sleepForRetry(retryCount);
logger.logInfo("Retrying upload, attempt {}/{}", retryCount, MAX_RETRY_COUNT);
this.putRemote(fullFilePath, data, retryCount);
}
}

boolean isCredentialsExpiredException(Exception e) {
if (e == null || e.getClass() == null) return false;

if (e instanceof SnowflakeSQLException) {
return ((SnowflakeSQLException) e).getErrorCode() == CLOUD_STORAGE_CREDENTIALS_EXPIRED;
}

return e.getMessage() != null
&& GCP_STORAGE_EXCEPTION_CLASS_NAMES.contains(e.getClass().getName())
&& e.getMessage().contains("401 Unauthorized");
}

SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata()
throws SnowflakeSQLException, IOException {
logger.logInfo("Refresh Snowflake metadata, client={}", clientName);
Expand Down Expand Up @@ -399,7 +425,6 @@ void putLocal(String fullFilePath, byte[] data) {
String stageLocation = this.fileTransferMetadataWithAge.localLocation;
File destFile = Paths.get(stageLocation, fullFilePath).toFile();
FileUtils.copyInputStreamToFile(input, destFile);
System.out.println("Filename: " + destFile); // TODO @rcheng - remove this before merge
} catch (Exception ex) {
throw new SFException(ex, ErrorCode.BLOB_UPLOAD_FAILURE);
}
Expand Down

0 comments on commit 77cd647

Please sign in to comment.