diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java index e8e56f383..ed73e3774 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java @@ -17,6 +17,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Paths; +import java.time.Duration; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -47,6 +49,11 @@ class StreamingIngestStage { private static final long REFRESH_THRESHOLD_IN_MS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); + // Stage credential refresh interval, currently the token will expire in 1hr for GCS and 2hr for + // AWS/Azure, so set it a bit smaller than 1hr + private static final Duration refreshDuration = Duration.ofMinutes(58); + private static Instant prevRefresh = Instant.EPOCH; + private static final Logging logger = new Logging(StreamingIngestStage.class); /** @@ -180,6 +187,12 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount) InputStream inStream = new ByteArrayInputStream(data); try { + // Proactively refresh the credential if it's going to expire, to avoid the token expiration + // error from JDBC which confuses customer + if (Instant.now().isAfter(prevRefresh.plus(refreshDuration))) { + refreshSnowflakeMetadata(); + } + SnowflakeFileTransferAgent.uploadWithoutConnection( SnowflakeFileTransferConfig.Builder.newInstance() .setSnowflakeFileTransferMetadata(fileTransferMetadataCopy) @@ -194,9 +207,6 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount) } catch (Exception e) { if (retryCount == 0) { // for the first exception, we always perform a metadata refresh. - logger.logInfo( - "Stage metadata need to be refreshed due to upload error: {} on first retry attempt", - e.getMessage()); this.refreshSnowflakeMetadata(); } if (retryCount >= maxUploadRetries) { @@ -281,6 +291,8 @@ synchronized SnowflakeFileTransferMetadataWithAge refreshSnowflakeMetadata(boole SnowflakeFileTransferAgent.getFileTransferMetadatas(responseNode).get(0), Optional.of(System.currentTimeMillis())); } + + prevRefresh = Instant.now(); return this.fileTransferMetadataWithAge; }