diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java index d6c06b910..cf4996715 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java @@ -87,7 +87,7 @@ public class SnowflakeFileTransferAgent extends SFBaseFileTransferAgent { private String localLocation; // Query ID of PUT or GET statement - private String queryID = ""; + private String queryID = null; // default parallelism private int parallel = DEFAULT_PARALLEL; @@ -295,7 +295,7 @@ static class InputStreamWithMetadata { * @throws SnowflakeSQLException if encountered exception when compressing */ private static InputStreamWithMetadata compressStreamWithGZIP( - InputStream inputStream, SFBaseSession session) throws SnowflakeSQLException { + InputStream inputStream, SFBaseSession session, String queryId) throws SnowflakeSQLException { FileBackedOutputStream tempStream = new FileBackedOutputStream(MAX_BUFFER_SIZE, true); try { @@ -335,6 +335,7 @@ private static InputStreamWithMetadata compressStreamWithGZIP( logger.error("Exception compressing input stream", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -353,7 +354,7 @@ private static InputStreamWithMetadata compressStreamWithGZIP( */ @Deprecated private static InputStreamWithMetadata compressStreamWithGZIPNoDigest( - InputStream inputStream, SFBaseSession session) throws SnowflakeSQLException { + InputStream inputStream, SFBaseSession session, String queryId) throws SnowflakeSQLException { try { FileBackedOutputStream tempStream = new FileBackedOutputStream(MAX_BUFFER_SIZE, true); @@ -383,6 +384,7 @@ private static InputStreamWithMetadata compressStreamWithGZIPNoDigest( logger.error("Exception compressing input stream", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -425,6 +427,9 @@ private static InputStreamWithMetadata computeDigest(InputStream is, boolean res * *

The callable does compression if needed and upload the result to the table's staging area. * + * @deprecated use {@link #getUploadFileCallable(StageInfo, String, FileMetadata, + * SnowflakeStorageClient, SFSession, String, InputStream, boolean, int, File, + * RemoteStoreFileEncryptionMaterial, String)} * @param stage information about the stage * @param srcFilePath source file path * @param metadata file metadata @@ -438,6 +443,7 @@ private static InputStreamWithMetadata computeDigest(InputStream is, boolean res * @param encMat not null if encryption is required * @return a callable that uploading file to the remote store */ + @Deprecated public static Callable getUploadFileCallable( final StageInfo stage, final String srcFilePath, @@ -450,6 +456,53 @@ public static Callable getUploadFileCallable( final int parallel, final File srcFile, final RemoteStoreFileEncryptionMaterial encMat) { + return getUploadFileCallable( + stage, + srcFilePath, + metadata, + client, + session, + command, + inputStream, + sourceFromStream, + parallel, + srcFile, + encMat, + null); + } + + /** + * A callable that can be executed in a separate thread using executor service. + * + *

The callable does compression if needed and upload the result to the table's staging area. + * + * @param stage information about the stage + * @param srcFilePath source file path + * @param metadata file metadata + * @param client client object used to communicate with c3 + * @param session session object + * @param command command string + * @param inputStream null if upload source is file + * @param sourceFromStream whether upload source is file or stream + * @param parallel number of threads for parallel uploading + * @param srcFile source file name + * @param encMat not null if encryption is required + * @param queryId last executed query id (for forwarding in possible exceptions) + * @return a callable that uploading file to the remote store + */ + public static Callable getUploadFileCallable( + final StageInfo stage, + final String srcFilePath, + final FileMetadata metadata, + final SnowflakeStorageClient client, + final SFSession session, + final String command, + final InputStream inputStream, + final boolean sourceFromStream, + final int parallel, + final File srcFile, + final RemoteStoreFileEncryptionMaterial encMat, + final String queryId) { return new Callable() { public Void call() throws Exception { @@ -484,6 +537,7 @@ public Void call() throws Exception { // this shouldn't happen if (metadata == null) { throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -507,8 +561,8 @@ public Void call() throws Exception { if (metadata.requireCompress) { InputStreamWithMetadata compressedSizeAndStream = (encMat == null - ? compressStreamWithGZIPNoDigest(uploadStream, session) - : compressStreamWithGZIP(uploadStream, session)); + ? compressStreamWithGZIPNoDigest(uploadStream, session, queryId) + : compressStreamWithGZIP(uploadStream, session, queryId)); fileBackedOutputStream = compressedSizeAndStream.fileBackedOutputStream; @@ -572,7 +626,8 @@ public Void call() throws Exception { destFileName, uploadStream, fileBackedOutputStream, - session); + session, + queryId); break; case S3: @@ -594,7 +649,8 @@ public Void call() throws Exception { (fileToUpload == null), encMat, null, - null); + null, + queryId); metadata.isEncrypted = encMat != null; break; } @@ -640,6 +696,9 @@ public Void call() throws Exception { * *

The callable download files from a stage location to a local location * + * @deprecated use {@link #getDownloadFileCallable(StageInfo, String, String, Map, + * SnowflakeStorageClient, SFSession, String, int, RemoteStoreFileEncryptionMaterial, String, + * String)} * @param stage stage information * @param srcFilePath path that stores the downloaded file * @param localLocation local location @@ -652,6 +711,7 @@ public Void call() throws Exception { * @param presignedUrl Presigned URL for file download * @return a callable responsible for downloading files */ + @Deprecated public static Callable getDownloadFileCallable( final StageInfo stage, final String srcFilePath, @@ -663,6 +723,49 @@ public static Callable getDownloadFileCallable( final int parallel, final RemoteStoreFileEncryptionMaterial encMat, final String presignedUrl) { + return getDownloadFileCallable( + stage, + srcFilePath, + localLocation, + fileMetadataMap, + client, + session, + command, + parallel, + encMat, + presignedUrl, + null); + } + + /** + * A callable that can be executed in a separate thread using executor service. + * + *

The callable download files from a stage location to a local location + * + * @param stage stage information + * @param srcFilePath path that stores the downloaded file + * @param localLocation local location + * @param fileMetadataMap file metadata map + * @param client remote store client + * @param session session object + * @param command command string + * @param encMat remote store encryption material + * @param parallel number of parallel threads for downloading + * @param presignedUrl Presigned URL for file download + * @return a callable responsible for downloading files + */ + public static Callable getDownloadFileCallable( + final StageInfo stage, + final String srcFilePath, + final String localLocation, + final Map fileMetadataMap, + final SnowflakeStorageClient client, + final SFSession session, + final String command, + final int parallel, + final RemoteStoreFileEncryptionMaterial encMat, + final String presignedUrl, + final String queryId) { return new Callable() { public Void call() throws Exception { @@ -676,6 +779,7 @@ public Void call() throws Exception { // this shouldn't happen if (metadata == null) { throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -695,7 +799,7 @@ public Void call() throws Exception { switch (stage.getStageType()) { case LOCAL_FS: pullFileFromLocal( - stage.getLocation(), srcFilePath, localLocation, destFileName, session); + stage.getLocation(), srcFilePath, localLocation, destFileName, session, queryId); break; case AZURE: @@ -711,7 +815,8 @@ public Void call() throws Exception { command, parallel, encMat, - presignedUrl); + presignedUrl, + queryId); metadata.isEncrypted = encMat != null; break; } @@ -800,6 +905,7 @@ private void parseCommand() throws SnowflakeSQLException { initPresignedUrls(commandType, jsonNode); } catch (Exception ex) { throw new SnowflakeSQLException( + queryID, ex, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -826,7 +932,7 @@ private void parseCommand() throws SnowflakeSQLException { localFilePathFromGS = src_locations[0]; } - sourceFiles = expandFileNames(src_locations); + sourceFiles = expandFileNames(src_locations, queryID); autoCompress = jsonNode.path("data").path("autoCompress").asBoolean(true); @@ -867,6 +973,7 @@ private void parseCommand() throws SnowflakeSQLException { // it should not start with any ~ after the above replacement if (localLocation.startsWith("~")) { throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.PATH_NOT_DIRECTORY.getMessageCode(), SqlState.IO_ERROR, @@ -890,6 +997,7 @@ private void parseCommand() throws SnowflakeSQLException { // local location should be a directory if ((new File(localLocation)).isFile()) { throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.PATH_NOT_DIRECTORY.getMessageCode(), SqlState.IO_ERROR, @@ -938,6 +1046,7 @@ private void parseCommand() throws SnowflakeSQLException { * @throws SnowflakeSQLException */ static StageInfo getStageInfo(JsonNode jsonNode, SFSession session) throws SnowflakeSQLException { + String queryId = jsonNode.path("data").path("queryId").asText(); // more parameters common to upload/download String stageLocation = jsonNode.path("data").path("stageInfo").path("location").asText(); @@ -1005,7 +1114,7 @@ static StageInfo getStageInfo(JsonNode jsonNode, SFSession session) throws Snowf } } - Map stageCredentials = extractStageCreds(jsonNode); + Map stageCredentials = extractStageCreds(jsonNode, queryId); StageInfo stageInfo = StageInfo.createStageInfo( @@ -1048,6 +1157,7 @@ static StageInfo getStageInfo(JsonNode jsonNode, SFSession session) throws Snowf return stageInfo; } + /** * A helper method to verify if the local file path from GS matches what's parsed locally. This is * for security purpose as documented in SNOW-15153. @@ -1061,6 +1171,7 @@ private void verifyLocalFilePath(String localFilePathFromGS) throws SnowflakeSQL if (!localFilePath.isEmpty() && !localFilePath.equals(localFilePathFromGS)) { throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -1155,7 +1266,8 @@ private static JsonNode parseCommandInGS(SFStatement statement, String command) false, // async new ExecTimeTelemetryData()); // OOB telemetry timing queries } catch (SFException ex) { - throw new SnowflakeSQLException(ex, ex.getSqlState(), ex.getVendorCode(), ex.getParams()); + throw new SnowflakeSQLException( + ex.getQueryId(), ex, ex.getSqlState(), ex.getVendorCode(), ex.getParams()); } JsonNode jsonNode = (JsonNode) result; @@ -1169,7 +1281,8 @@ private static JsonNode parseCommandInGS(SFStatement statement, String command) * @param rootNode JSON doc returned by GS * @throws SnowflakeSQLException Will be thrown if we fail to parse the stage credentials */ - private static Map extractStageCreds(JsonNode rootNode) throws SnowflakeSQLException { + private static Map extractStageCreds(JsonNode rootNode, String queryId) + throws SnowflakeSQLException { JsonNode credsNode = rootNode.path("data").path("stageInfo").path("creds"); Map stageCredentials = null; @@ -1180,6 +1293,7 @@ private static JsonNode parseCommandInGS(SFStatement statement, String command) } catch (Exception ex) { throw new SnowflakeSQLException( + queryId, ex, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -1207,6 +1321,7 @@ public List getFileTransferMetadatas() && stageInfo.getStageType() != StageInfo.StageType.AZURE && stageInfo.getStageType() != StageInfo.StageType.S3) { throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -1215,6 +1330,7 @@ public List getFileTransferMetadatas() if (commandType != CommandType.UPLOAD) { throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -1250,19 +1366,37 @@ public List getFileTransferMetadatas() */ public static List getFileTransferMetadatas(JsonNode jsonNode) throws SnowflakeSQLException { + return getFileTransferMetadatas(jsonNode, null); + } + + /** + * This is API function to parse the File Transfer Metadatas from a supplied PUT call response. + * + *

NOTE: It only supports PUT on S3/AZURE/GCS (i.e. NOT LOCAL_FS) + * + *

It also assumes there is no active SFSession + * + * @param jsonNode JSON doc returned by GS from PUT call + * @param queryId String last executed query id if available + * @return The file transfer metadatas for to-be-transferred files. + * @throws SnowflakeSQLException if any error occurs + */ + public static List getFileTransferMetadatas( + JsonNode jsonNode, String queryId) throws SnowflakeSQLException { CommandType commandType = !jsonNode.path("data").path("command").isMissingNode() ? CommandType.valueOf(jsonNode.path("data").path("command").asText()) : CommandType.UPLOAD; if (commandType != CommandType.UPLOAD) { throw new SnowflakeSQLException( - ErrorCode.INTERNAL_ERROR, "This API only supports PUT commands"); + queryId, ErrorCode.INTERNAL_ERROR, "This API only supports PUT commands"); } JsonNode locationsNode = jsonNode.path("data").path("src_locations"); if (!locationsNode.isArray()) { - throw new SnowflakeSQLException(ErrorCode.INTERNAL_ERROR, "src_locations must be an array"); + throw new SnowflakeSQLException( + queryId, ErrorCode.INTERNAL_ERROR, "src_locations must be an array"); } final String[] srcLocations; @@ -1271,13 +1405,16 @@ public static List getFileTransferMetadatas(JsonN srcLocations = mapper.readValue(locationsNode.toString(), String[].class); } catch (Exception ex) { throw new SnowflakeSQLException( - ErrorCode.INTERNAL_ERROR, "Failed to parse the locations due to: " + ex.getMessage()); + queryId, + ErrorCode.INTERNAL_ERROR, + "Failed to parse the locations due to: " + ex.getMessage()); } try { encryptionMaterial = getEncryptionMaterial(commandType, jsonNode); } catch (Exception ex) { throw new SnowflakeSQLException( + queryId, ErrorCode.INTERNAL_ERROR, "Failed to parse encryptionMaterial due to: " + ex.getMessage()); } @@ -1285,7 +1422,7 @@ public static List getFileTransferMetadatas(JsonN // For UPLOAD we expect encryptionMaterial to have length 1 assert encryptionMaterial.size() == 1; - final Set sourceFiles = expandFileNames(srcLocations); + final Set sourceFiles = expandFileNames(srcLocations, queryId); StageInfo stageInfo = getStageInfo(jsonNode, null /*SFSession*/); @@ -1294,6 +1431,7 @@ public static List getFileTransferMetadatas(JsonN && stageInfo.getStageType() != StageInfo.StageType.AZURE && stageInfo.getStageType() != StageInfo.StageType.S3) { throw new SnowflakeSQLException( + queryId, ErrorCode.INTERNAL_ERROR, "This API only supports S3/AZURE/GCS, received=" + stageInfo.getStageType()); } @@ -1431,10 +1569,11 @@ private void uploadStream() throws SnowflakeSQLException { true, parallel, null, - encMat)); + encMat, + queryID)); } else if (commandType == CommandType.DOWNLOAD) { throw new SnowflakeSQLLoggedException( - session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR); + queryID, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR); } threadExecutor.shutdown(); @@ -1451,10 +1590,13 @@ private void uploadStream() throws SnowflakeSQLException { uploadTask.get(); } catch (InterruptedException ex) { throw new SnowflakeSQLLoggedException( - session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED); + queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED); } catch (ExecutionException ex) { throw new SnowflakeSQLException( - ex.getCause(), SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode()); + queryID, + ex.getCause(), + SqlState.INTERNAL_ERROR, + ErrorCode.INTERNAL_ERROR.getMessageCode()); } logger.debug("Done with uploading from a stream"); } finally { @@ -1472,6 +1614,7 @@ public InputStream downloadStream(String fileName) throws SnowflakeSQLException logger.error("downloadStream function doesn't support local file system", false); throw new SnowflakeSQLException( + queryID, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), session, @@ -1498,7 +1641,8 @@ public InputStream downloadStream(String fileName) throws SnowflakeSQLException remoteLocation.location, stageFilePath, stageInfo.getRegion(), - presignedUrl); + presignedUrl, + queryID); } /** Helper to download files from remote */ @@ -1535,7 +1679,8 @@ private void downloadFiles() throws SnowflakeSQLException { command, parallel, encMat, - presignedUrl)); + presignedUrl, + queryID)); logger.debug("submitted download job for: {}", srcFile); } @@ -1547,7 +1692,7 @@ private void downloadFiles() throws SnowflakeSQLException { threadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); } catch (InterruptedException ex) { throw new SnowflakeSQLLoggedException( - session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED); + queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED); } logger.debug("Done with downloading"); } finally { @@ -1630,7 +1775,8 @@ private void uploadFiles(Set fileList, int parallel) throws SnowflakeSQL false, (parallel > 1 ? 1 : this.parallel), srcFileObj, - encryptionMaterial.get(0))); + encryptionMaterial.get(0), + queryID)); logger.debug("submitted copy job for: {}", srcFile); } @@ -1643,7 +1789,7 @@ private void uploadFiles(Set fileList, int parallel) throws SnowflakeSQL threadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); } catch (InterruptedException ex) { throw new SnowflakeSQLLoggedException( - session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED); + queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED); } logger.debug("Done with uploading"); @@ -1692,7 +1838,8 @@ public void cancel() { * @return a set of file names that is matched * @throws SnowflakeSQLException if cannot find the file */ - static Set expandFileNames(String[] filePathList) throws SnowflakeSQLException { + static Set expandFileNames(String[] filePathList, String queryId) + throws SnowflakeSQLException { Set result = new HashSet(); // a location to file pattern map so that we only need to list the @@ -1773,6 +1920,7 @@ static Set expandFileNames(String[] filePathList) throws SnowflakeSQLExc } } catch (Exception ex) { throw new SnowflakeSQLException( + queryId, ex, SqlState.DATA_EXCEPTION, ErrorCode.FAIL_LIST_FILES.getMessageCode(), @@ -1800,7 +1948,8 @@ private static boolean pushFileToLocal( String destFileName, InputStream inputStream, FileBackedOutputStream fileBackedOutStr, - SFBaseSession session) + SFBaseSession session, + String queryId) throws SQLException { // replace ~ with user home @@ -1821,6 +1970,7 @@ private static boolean pushFileToLocal( FileUtils.copyInputStreamToFile(inputStream, destFile); } catch (Exception ex) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -1836,7 +1986,8 @@ private static boolean pullFileFromLocal( String filePath, String destLocation, String destFileName, - SFBaseSession session) + SFBaseSession session, + String queryId) throws SQLException { try { logger.debug( @@ -1851,6 +2002,7 @@ private static boolean pullFileFromLocal( FileUtils.copyFileToDirectory(srcFile, new File(destLocation)); } catch (Exception ex) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -1877,7 +2029,8 @@ private static void pushFileToRemoteStore( boolean uploadFromStream, RemoteStoreFileEncryptionMaterial encMat, String streamingIngestClientName, - String streamingIngestClientKey) + String streamingIngestClientKey, + String queryId) throws SQLException, IOException { remoteLocation remoteLocation = extractLocationAndPath(stage.getLocation()); @@ -1936,7 +2089,8 @@ private static void pushFileToRemoteStore( fileBackedOutStr, meta, stage.getRegion(), - presignedUrl); + presignedUrl, + queryId); } finally { if (uploadFromStream && inputStream != null) { inputStream.close(); @@ -1994,8 +2148,8 @@ public static void uploadWithoutConnection(SnowflakeFileTransferConfig config) t if (requireCompress) { InputStreamWithMetadata compressedSizeAndStream = (encMat == null - ? compressStreamWithGZIPNoDigest(uploadStream, /* session = */ null) - : compressStreamWithGZIP(uploadStream, /* session = */ null)); + ? compressStreamWithGZIPNoDigest(uploadStream, /* session= */ null, null) + : compressStreamWithGZIP(uploadStream, /* session= */ null, encMat.getQueryId())); fileBackedOutputStream = compressedSizeAndStream.fileBackedOutputStream; @@ -2031,13 +2185,14 @@ public static void uploadWithoutConnection(SnowflakeFileTransferConfig config) t uploadSize); SnowflakeStorageClient initialClient = - storageFactory.createClient(stageInfo, 1, encMat, /*session = */ null); + storageFactory.createClient(stageInfo, 1, encMat, /* session= */ null); // Normal flow will never hit here. This is only for testing purposes if (isInjectedFileTransferExceptionEnabled()) { throw (Exception) SnowflakeFileTransferAgent.injectedFileTransferException; } + String queryId = encMat != null && encMat.getQueryId() != null ? encMat.getQueryId() : null; switch (stageInfo.getStageType()) { case S3: case AZURE: @@ -2057,7 +2212,8 @@ public static void uploadWithoutConnection(SnowflakeFileTransferConfig config) t (fileToUpload == null), encMat, streamingIngestClientName, - streamingIngestClientKey); + streamingIngestClientKey, + queryId); break; case GCS: // If the down-scoped token is used to upload file, one metadata may be used to upload @@ -2084,7 +2240,8 @@ public static void uploadWithoutConnection(SnowflakeFileTransferConfig config) t encMat, metadata.getPresignedUrl(), streamingIngestClientName, - streamingIngestClientKey); + streamingIngestClientKey, + queryId); break; } } catch (Exception ex) { @@ -2124,7 +2281,8 @@ private static void pushFileToRemoteStoreWithPresignedUrl( RemoteStoreFileEncryptionMaterial encMat, String presignedUrl, String streamingIngestClientName, - String streamingIngestClientKey) + String streamingIngestClientKey, + String queryId) throws SQLException, IOException { remoteLocation remoteLocation = extractLocationAndPath(stage.getLocation()); @@ -2169,7 +2327,8 @@ private static void pushFileToRemoteStoreWithPresignedUrl( fileBackedOutStr, meta, stage.getRegion(), - presignedUrl); + presignedUrl, + queryId); } finally { if (uploadFromStream && inputStream != null) { inputStream.close(); @@ -2192,7 +2351,8 @@ public static void renewExpiredToken( throws SnowflakeSQLException { SFStatement statement = new SFStatement(session); JsonNode jsonNode = parseCommandInGS(statement, command); - Map stageCredentials = extractStageCreds(jsonNode); + String queryId = jsonNode.path("data").path("queryId").asText(); + Map stageCredentials = extractStageCreds(jsonNode, queryId); // renew client with the fresh token logger.debug("Renewing expired access token"); @@ -2209,7 +2369,8 @@ private static void pullFileFromRemoteStore( String command, int parallel, RemoteStoreFileEncryptionMaterial encMat, - String presignedUrl) + String presignedUrl, + String queryId) throws SQLException { remoteLocation remoteLocation = extractLocationAndPath(stage.getLocation()); @@ -2236,7 +2397,8 @@ private static void pullFileFromRemoteStore( remoteLocation.location, stageFilePath, stage.getRegion(), - presignedUrl); + presignedUrl, + queryId); } /** @@ -2339,7 +2501,8 @@ private void filterExistingFiles() throws SnowflakeSQLException { (Exception) ex.getCause(); // Cause of StorageProviderException is always an Exception } - storageClient.handleStorageException(ex, ++retryCount, "listObjects", session, command); + storageClient.handleStorageException( + ex, ++retryCount, "listObjects", session, command, queryID); continue; } @@ -2359,7 +2522,7 @@ private void filterExistingFiles() throws SnowflakeSQLException { ex.getCause(); // Cause of StorageProviderException is always an Exception } storageClient.handleStorageException( - ex, ++retryCount, "compareRemoteFiles", session, command); + ex, ++retryCount, "compareRemoteFiles", session, command, queryID); } } while (retryCount <= storageClient.getMaxRetries()); } else if (stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS) { @@ -2418,7 +2581,7 @@ private void filterExistingFiles() throws SnowflakeSQLException { if (fileMetadataMap.get(mappedSrcFile).requireCompress) { logger.debug("Compressing stream for digest check"); - InputStreamWithMetadata res = compressStreamWithGZIP(localFileStream, session); + InputStreamWithMetadata res = compressStreamWithGZIP(localFileStream, session, queryID); fileBackedOutputStreams.add(res.fileBackedOutputStream); localFileStream = res.fileBackedOutputStream.asByteSource().openStream(); @@ -2429,6 +2592,7 @@ private void filterExistingFiles() throws SnowflakeSQLException { fileBackedOutputStreams.add(res.fileBackedOutputStream); } catch (IOException | NoSuchAlgorithmException ex) { throw new SnowflakeSQLLoggedException( + queryID, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -2459,6 +2623,7 @@ private void filterExistingFiles() throws SnowflakeSQLException { } catch (IOException | NoSuchAlgorithmException ex) { throw new SnowflakeSQLLoggedException( + queryID, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -2600,7 +2765,7 @@ private void compareAndSkipRemoteFiles( if (fileMetadataMap.get(mappedSrcFile).requireCompress) { logger.debug("Compressing stream for digest check"); - InputStreamWithMetadata res = compressStreamWithGZIP(fileStream, session); + InputStreamWithMetadata res = compressStreamWithGZIP(fileStream, session, queryID); fileStream = res.fileBackedOutputStream.asByteSource().openStream(); fileBackedOutputStreams.add(res.fileBackedOutputStream); @@ -2655,6 +2820,7 @@ private void compareAndSkipRemoteFiles( } } catch (IOException | NoSuchAlgorithmException ex) { throw new SnowflakeSQLLoggedException( + queryID, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -2713,6 +2879,7 @@ private void initFileMetadata() throws SnowflakeSQLException { logger.debug("File doesn't exist: {}", sourceFile); throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.FILE_NOT_FOUND.getMessageCode(), SqlState.DATA_EXCEPTION, @@ -2721,6 +2888,7 @@ private void initFileMetadata() throws SnowflakeSQLException { logger.debug("Not a file, but directory: {}", sourceFile); throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.FILE_IS_DIRECTORY.getMessageCode(), SqlState.DATA_EXCEPTION, @@ -2786,6 +2954,7 @@ private void processFileCompressionTypes() throws SnowflakeSQLException { FileCompressionType.lookupByMimeSubType(sourceCompression.toLowerCase()); if (!foundCompType.isPresent()) { throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.COMPRESSION_TYPE_NOT_KNOWN.getMessageCode(), SqlState.FEATURE_NOT_SUPPORTED, @@ -2795,6 +2964,7 @@ private void processFileCompressionTypes() throws SnowflakeSQLException { if (!userSpecifiedSourceCompression.isSupported()) { throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.COMPRESSION_TYPE_NOT_SUPPORTED.getMessageCode(), SqlState.FEATURE_NOT_SUPPORTED, @@ -2880,6 +3050,7 @@ private void processFileCompressionTypes() throws SnowflakeSQLException { } else { // error if not supported throw new SnowflakeSQLLoggedException( + queryID, session, ErrorCode.COMPRESSION_TYPE_NOT_SUPPORTED.getMessageCode(), SqlState.FEATURE_NOT_SUPPORTED, @@ -3073,15 +3244,30 @@ public CommandType getCommandType() { return commandType; } - /* - * Handles an InvalidKeyException which indicates that the JCE component - * is not installed properly + /** + * Handles an InvalidKeyException which indicates that the JCE component is not installed properly + * + * @deprecated use {@link #throwJCEMissingError(String, Exception, String)} * @param operation a string indicating the operation type, e.g. upload/download * @param ex The exception to be handled - * @throws throws the error as a SnowflakeSQLException + * @throws SnowflakeSQLException throws the error as a SnowflakeSQLException */ + @Deprecated public static void throwJCEMissingError(String operation, Exception ex) throws SnowflakeSQLException { + throwJCEMissingError(operation, ex, null); + } + + /** + * Handles an InvalidKeyException which indicates that the JCE component is not installed properly + * + * @param operation a string indicating the operation type, e.g. upload/download + * @param ex The exception to be handled + * @param queryId last query id if available + * @throws SnowflakeSQLException throws the error as a SnowflakeSQLException + */ + public static void throwJCEMissingError(String operation, Exception ex, String queryId) + throws SnowflakeSQLException { // Most likely cause: Unlimited strength policy files not installed String msg = "Strong encryption with Java JRE requires JCE " @@ -3101,23 +3287,46 @@ public static void throwJCEMissingError(String operation, Exception ex) logger.error(msg); } throw new SnowflakeSQLException( - ex, SqlState.SYSTEM_ERROR, ErrorCode.AWS_CLIENT_ERROR.getMessageCode(), operation, msg); + queryId, + ex, + SqlState.SYSTEM_ERROR, + ErrorCode.AWS_CLIENT_ERROR.getMessageCode(), + operation, + msg); } /** * For handling IOException: No space left on device when attempting to download a file to a * location where there is not enough space. We don't want to retry on this exception. * + * @deprecated use {@link #throwNoSpaceLeftError(SFSession, String, Exception, String)} * @param session the current session * @param operation the operation i.e. GET * @param ex the exception caught * @throws SnowflakeSQLLoggedException */ + @Deprecated public static void throwNoSpaceLeftError(SFSession session, String operation, Exception ex) throws SnowflakeSQLLoggedException { + throwNoSpaceLeftError(session, operation, ex, null); + } + + /** + * For handling IOException: No space left on device when attempting to download a file to a + * location where there is not enough space. We don't want to retry on this exception. + * + * @param session the current session + * @param operation the operation i.e. GET + * @param ex the exception caught + * @throws SnowflakeSQLLoggedException + */ + public static void throwNoSpaceLeftError( + SFSession session, String operation, Exception ex, String queryId) + throws SnowflakeSQLLoggedException { String exMessage = SnowflakeUtil.getRootCause(ex).getMessage(); if (exMessage != null && exMessage.equals(NO_SPACE_LEFT_ON_DEVICE_ERR)) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeSQLException.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeSQLException.java index 6d5ca18ed..00e8a3b64 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeSQLException.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeSQLException.java @@ -51,18 +51,24 @@ public SnowflakeSQLException(String queryId, String reason, String sqlState, int queryId); } - public SnowflakeSQLException(String reason, String SQLState) { - super(reason, SQLState); + public SnowflakeSQLException(String reason, String sqlState) { + super(reason, sqlState); // log user error from GS at fine level - logger.debug("Snowflake exception: {}, sqlState:{}", reason, SQLState); + logger.debug("Snowflake exception: {}, sqlState:{}", reason, sqlState); } + /** use {@link SnowflakeSQLException#SnowflakeSQLException(String, String, int)} */ + @Deprecated public SnowflakeSQLException(String sqlState, int vendorCode) { + this((String) null, sqlState, vendorCode); + } + + public SnowflakeSQLException(String queryId, String sqlState, int vendorCode) { super( errorResourceBundleManager.getLocalizedMessage(String.valueOf(vendorCode)), sqlState, vendorCode); - + this.queryId = queryId; logger.debug( "Snowflake exception: {}, sqlState:{}, vendorCode:{}", errorResourceBundleManager.getLocalizedMessage(String.valueOf(vendorCode)), @@ -70,12 +76,18 @@ public SnowflakeSQLException(String sqlState, int vendorCode) { vendorCode); } + /** use {@link SnowflakeSQLException#SnowflakeSQLException(String, String, int, Object...)} */ + @Deprecated public SnowflakeSQLException(String sqlState, int vendorCode, Object... params) { + this((String) null, sqlState, vendorCode, params); + } + + public SnowflakeSQLException(String queryId, String sqlState, int vendorCode, Object... params) { super( errorResourceBundleManager.getLocalizedMessage(String.valueOf(vendorCode), params), sqlState, vendorCode); - + this.queryId = queryId; logger.debug( "Snowflake exception: {}, sqlState:{}, vendorCode:{}", errorResourceBundleManager.getLocalizedMessage(String.valueOf(vendorCode), params), @@ -100,12 +112,23 @@ public SnowflakeSQLException(Throwable ex, ErrorCode errorCode, Object... params this(ex, errorCode.getSqlState(), errorCode.getMessageCode(), params); } + /** + * @deprecated use {@link SnowflakeSQLException#SnowflakeSQLException(String, Throwable, String, + * int, Object...)} + */ + @Deprecated public SnowflakeSQLException(Throwable ex, String sqlState, int vendorCode, Object... params) { + this(null, ex, sqlState, vendorCode, params); + } + + public SnowflakeSQLException( + String queryId, Throwable ex, String sqlState, int vendorCode, Object... params) { super( errorResourceBundleManager.getLocalizedMessage(String.valueOf(vendorCode), params), sqlState, vendorCode, ex); + this.queryId = queryId; logger.debug( "Snowflake exception: " @@ -121,6 +144,15 @@ public SnowflakeSQLException(ErrorCode errorCode, Object... params) { errorCode.getMessageCode()); } + public SnowflakeSQLException(String queryId, ErrorCode errorCode, Object... params) { + super( + errorResourceBundleManager.getLocalizedMessage( + String.valueOf(errorCode.getMessageCode()), params), + errorCode.getSqlState(), + errorCode.getMessageCode()); + this.queryId = queryId; + } + public SnowflakeSQLException( ErrorCode errorCode, int retryCount, boolean issocketTimeoutNoBackoff, long elapsedSeconds) { super( diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeSQLLoggedException.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeSQLLoggedException.java index b776be26c..d9d741a8c 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeSQLLoggedException.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeSQLLoggedException.java @@ -232,6 +232,12 @@ public SnowflakeSQLLoggedException(SFBaseSession session, int vendorCode, String sendTelemetryData(null, SQLState, vendorCode, session, this); } + public SnowflakeSQLLoggedException( + String queryId, SFBaseSession session, int vendorCode, String SQLState) { + super(queryId, SQLState, vendorCode); + sendTelemetryData(queryId, SQLState, vendorCode, session, this); + } + public SnowflakeSQLLoggedException(SFBaseSession session, String SQLState, String reason) { super(reason, SQLState); sendTelemetryData(null, SQLState, -1, session, this); @@ -239,40 +245,45 @@ public SnowflakeSQLLoggedException(SFBaseSession session, String SQLState, Strin public SnowflakeSQLLoggedException( SFBaseSession session, int vendorCode, String SQLState, Object... params) { - super(SQLState, vendorCode, params); - String reason = - errorResourceBundleManager.getLocalizedMessage(String.valueOf(vendorCode), params); - sendTelemetryData(null, SQLState, vendorCode, session, this); + this(null, session, vendorCode, SQLState, params); + } + + public SnowflakeSQLLoggedException( + String queryId, SFBaseSession session, int vendorCode, String SQLState, Object... params) { + super(queryId, SQLState, vendorCode, params); + sendTelemetryData(queryId, SQLState, vendorCode, session, this); } public SnowflakeSQLLoggedException( SFBaseSession session, ErrorCode errorCode, Throwable ex, Object... params) { super(ex, errorCode, params); - // add telemetry sendTelemetryData(null, errorCode.getSqlState(), errorCode.getMessageCode(), session, this); } public SnowflakeSQLLoggedException( SFBaseSession session, String SQLState, int vendorCode, Throwable ex, Object... params) { super(ex, SQLState, vendorCode, params); - // add telemetry - String reason = - errorResourceBundleManager.getLocalizedMessage(String.valueOf(vendorCode), params); sendTelemetryData(null, SQLState, vendorCode, session, this); } + public SnowflakeSQLLoggedException( + String queryId, + SFBaseSession session, + String SQLState, + int vendorCode, + Throwable ex, + Object... params) { + super(queryId, ex, SQLState, vendorCode, params); + sendTelemetryData(queryId, SQLState, vendorCode, session, this); + } + public SnowflakeSQLLoggedException(SFBaseSession session, ErrorCode errorCode, Object... params) { super(errorCode, params); - // add telemetry - String reason = - errorResourceBundleManager.getLocalizedMessage( - String.valueOf(errorCode.getMessageCode()), params); sendTelemetryData(null, null, -1, session, this); } public SnowflakeSQLLoggedException(SFBaseSession session, SFException e) { super(e); - // add telemetry sendTelemetryData(null, null, -1, session, this); } diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/QueryIdHelper.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/QueryIdHelper.java new file mode 100644 index 000000000..34520cabd --- /dev/null +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/QueryIdHelper.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2023 Snowflake Computing Inc. All rights reserved. + */ +package net.snowflake.client.jdbc.cloud.storage; + +import net.snowflake.common.core.RemoteStoreFileEncryptionMaterial; + +class QueryIdHelper { + static String queryIdFromEncMatOr(RemoteStoreFileEncryptionMaterial encMat, String queryId) { + return encMat != null && encMat.getQueryId() != null ? encMat.getQueryId() : queryId; + } +} diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java index 7ff7f9e41..4622e7cb5 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java @@ -108,6 +108,7 @@ private void setupAzureClient( if (encryptionKeySize != 128 && encryptionKeySize != 192 && encryptionKeySize != 256) { throw new SnowflakeSQLLoggedException( + QueryIdHelper.queryIdFromEncMatOr(encMat, null), session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -272,6 +273,7 @@ public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, Str * @param stageFilePath stage file path * @param stageRegion region name where the stage persists * @param presignedUrl Unused in Azure + * @param queryId last query id * @throws SnowflakeSQLException download failure */ @Override @@ -284,7 +286,8 @@ public void download( String remoteStorageLocation, String stageFilePath, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { int retryCount = 0; do { @@ -305,7 +308,7 @@ public void download( // Get the user-defined BLOB metadata Map userDefinedMetadata = blob.getMetadata(); AbstractMap.SimpleEntry encryptionData = - parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP)); + parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId); String key = encryptionData.getKey(); String iv = encryptionData.getValue(); @@ -313,6 +316,7 @@ public void download( if (this.isEncrypting() && this.getEncryptionKeySize() <= 256) { if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -331,11 +335,12 @@ public void download( } catch (Exception ex) { logger.debug("Download unsuccessful {}", ex); - handleAzureException(ex, ++retryCount, "download", session, command, this); + handleAzureException(ex, ++retryCount, "download", session, command, this, queryId); } } while (retryCount <= getMaxRetries()); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -352,6 +357,7 @@ public void download( * @param stageFilePath stage file path * @param stageRegion region name where the stage persists * @param presignedUrl Unused in Azure + * @param queryId last query id * @return input file stream * @throws SnowflakeSQLException when download failure */ @@ -363,7 +369,8 @@ public InputStream downloadToStream( String remoteStorageLocation, String stageFilePath, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { int retryCount = 0; @@ -378,7 +385,7 @@ public InputStream downloadToStream( Map userDefinedMetadata = blob.getMetadata(); AbstractMap.SimpleEntry encryptionData = - parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP)); + parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId); String key = encryptionData.getKey(); @@ -387,6 +394,7 @@ public InputStream downloadToStream( if (this.isEncrypting() && this.getEncryptionKeySize() <= 256) { if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -408,11 +416,12 @@ public InputStream downloadToStream( } catch (Exception ex) { logger.debug("Downloading unsuccessful {}", ex); - handleAzureException(ex, ++retryCount, "download", session, command, this); + handleAzureException(ex, ++retryCount, "download", session, command, this, queryId); } } while (retryCount < getMaxRetries()); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -434,6 +443,7 @@ public InputStream downloadToStream( * @param meta object meta data * @param stageRegion region name where the stage persists * @param presignedUrl Unused in Azure + * @param queryId last query id * @throws SnowflakeSQLException if upload failed even after retry */ @Override @@ -449,7 +459,8 @@ public void upload( FileBackedOutputStream fileBackedOutputStream, StorageObjectMetadata meta, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { final List toClose = new ArrayList<>(); long originalContentLength = meta.getContentLength(); @@ -462,7 +473,8 @@ public void upload( meta, originalContentLength, fileBackedOutputStream, - toClose); + toClose, + queryId); if (!(meta instanceof CommonObjectMetadata)) { throw new IllegalArgumentException("Unexpected metadata object type"); @@ -497,10 +509,11 @@ public void upload( return; } catch (Exception ex) { - handleAzureException(ex, ++retryCount, "upload", session, command, this); + handleAzureException(ex, ++retryCount, "upload", session, command, this, queryId); if (uploadFromStream && fileBackedOutputStream == null) { throw new SnowflakeSQLException( + queryId, ex, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), @@ -516,7 +529,8 @@ public void upload( meta, originalContentLength, fileBackedOutputStream, - toClose); + toClose, + queryId); } } while (retryCount <= getMaxRetries()); @@ -524,6 +538,7 @@ public void upload( for (FileInputStream is : toClose) IOUtils.closeQuietly(is); throw new SnowflakeSQLException( + queryId, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), "Unexpected: upload unsuccessful without exception!"); @@ -538,13 +553,19 @@ public void upload( * exception was raised, for example "upload" * @param session the current SFSession object used by the client * @param command the command attempted at the time of the exception + * @param queryId last query id * @throws SnowflakeSQLException exceptions not handled */ @Override public void handleStorageException( - Exception ex, int retryCount, String operation, SFSession session, String command) + Exception ex, + int retryCount, + String operation, + SFSession session, + String command, + String queryId) throws SnowflakeSQLException { - handleAzureException(ex, retryCount, operation, session, command, this); + handleAzureException(ex, retryCount, operation, session, command, this, queryId); } private SFPair createUploadStream( @@ -554,7 +575,8 @@ private SFPair createUploadStream( StorageObjectMetadata meta, long originalContentLength, FileBackedOutputStream fileBackedOutputStream, - List toClose) + List toClose, + String queryId) throws SnowflakeSQLException { logger.debug( "createUploadStream({}, {}, {}, {}, {}, {})", @@ -586,6 +608,7 @@ private SFPair createUploadStream( } catch (Exception ex) { logger.error("Failed to encrypt input", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -609,6 +632,7 @@ private SFPair createUploadStream( } catch (FileNotFoundException ex) { logger.error("Failed to open input file", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -618,6 +642,7 @@ private SFPair createUploadStream( } catch (IOException ex) { logger.error("Failed to open input stream", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -648,20 +673,21 @@ private static void handleAzureException( String operation, SFSession session, String command, - SnowflakeAzureClient azClient) + SnowflakeAzureClient azClient, + String queryId) throws SnowflakeSQLException { // no need to retry if it is invalid key exception if (ex.getCause() instanceof InvalidKeyException) { // Most likely cause is that the unlimited strength policy files are not installed // Log the error and throw a message that explains the cause - SnowflakeFileTransferAgent.throwJCEMissingError(operation, ex); + SnowflakeFileTransferAgent.throwJCEMissingError(operation, ex, queryId); } // If there is no space left in the download location, java.io.IOException is thrown. // Don't retry. if (SnowflakeUtil.getRootCause(ex) instanceof IOException) { - SnowflakeFileTransferAgent.throwNoSpaceLeftError(session, operation, ex); + SnowflakeFileTransferAgent.throwNoSpaceLeftError(session, operation, ex, queryId); } if (ex instanceof StorageException) { @@ -675,6 +701,7 @@ private static void handleAzureException( } else { // If session is null we cannot renew the token so throw the ExpiredToken exception throw new SnowflakeSQLException( + queryId, se.getErrorCode(), CLOUD_STORAGE_CREDENTIALS_EXPIRED, "Azure credentials may have expired"); @@ -685,6 +712,7 @@ private static void handleAzureException( if (retryCount > azClient.getMaxRetries() || ((StorageException) ex).getHttpStatusCode() == 404) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.AZURE_SERVICE_ERROR.getMessageCode(), @@ -728,6 +756,7 @@ private static void handleAzureException( || SnowflakeUtil.getRootCause(ex) instanceof SocketTimeoutException) { if (retryCount > azClient.getMaxRetries()) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), @@ -742,6 +771,7 @@ private static void handleAzureException( } } else { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), @@ -822,7 +852,7 @@ private String buildEncryptionMetadataJSON(String iv64, String key64) { * Takes the json string in the encryptiondata metadata field of the encrypted * blob and parses out the key and iv. Returns the pair as key = key, iv = value. */ - private SimpleEntry parseEncryptionData(String jsonEncryptionData) + private SimpleEntry parseEncryptionData(String jsonEncryptionData, String queryId) throws SnowflakeSQLException { ObjectMapper mapper = ObjectMapperFactory.getObjectMapper(); JsonFactory factory = mapper.getFactory(); @@ -836,6 +866,7 @@ private SimpleEntry parseEncryptionData(String jsonEncryptionDat return new SimpleEntry(key, iv); } catch (Exception ex) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeGCSClient.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeGCSClient.java index 74435a571..40e8582cc 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeGCSClient.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeGCSClient.java @@ -199,6 +199,7 @@ public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, Str * @param stageFilePath stage file path * @param stageRegion region name where the stage persists * @param presignedUrl Credential to use for download + * @param queryId last query id * @throws SnowflakeSQLException download failure */ @Override @@ -211,7 +212,8 @@ public void download( String remoteStorageLocation, String stageFilePath, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { int retryCount = 0; String localFilePath = localLocation + localFileSep + destFileName; @@ -271,7 +273,7 @@ public void download( .getName() .equalsIgnoreCase(GCS_METADATA_PREFIX + GCS_ENCRYPTIONDATAPROP)) { AbstractMap.SimpleEntry encryptionData = - parseEncryptionData(header.getValue()); + parseEncryptionData(header.getValue(), queryId); key = encryptionData.getKey(); iv = encryptionData.getValue(); @@ -282,14 +284,14 @@ public void download( logger.debug("Download successful", false); } catch (IOException ex) { logger.debug("Download unsuccessful {}", ex); - handleStorageException(ex, ++retryCount, "download", session, command); + handleStorageException(ex, ++retryCount, "download", session, command, queryId); } } else { Exception ex = new HttpResponseException( response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity())); - handleStorageException(ex, ++retryCount, "download", session, command); + handleStorageException(ex, ++retryCount, "download", session, command, queryId); } } else { BlobId blobId = BlobId.of(remoteStorageLocation, stageFilePath); @@ -311,7 +313,7 @@ public void download( if (isEncrypting()) { if (userDefinedMetadata != null) { AbstractMap.SimpleEntry encryptionData = - parseEncryptionData(userDefinedMetadata.get(GCS_ENCRYPTIONDATAPROP)); + parseEncryptionData(userDefinedMetadata.get(GCS_ENCRYPTIONDATAPROP), queryId); key = encryptionData.getKey(); iv = encryptionData.getValue(); @@ -325,6 +327,7 @@ public void download( && this.getEncryptionKeySize() <= 256) { if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -337,6 +340,7 @@ public void download( } catch (Exception ex) { logger.error("Error decrypting file", ex); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -346,11 +350,12 @@ public void download( return; } catch (Exception ex) { logger.debug("Download unsuccessful {}", ex); - handleStorageException(ex, ++retryCount, "download", session, command); + handleStorageException(ex, ++retryCount, "download", session, command, queryId); } } while (retryCount <= getMaxRetries()); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -367,6 +372,7 @@ public void download( * @param stageFilePath stage file path * @param stageRegion region name where the stage persists * @param presignedUrl Signed credential for download + * @param queryId last query id * @return input file stream * @throws SnowflakeSQLException when download failure */ @@ -378,7 +384,8 @@ public InputStream downloadToStream( String remoteStorageLocation, String stageFilePath, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { int retryCount = 0; InputStream inputStream = null; @@ -429,7 +436,7 @@ public InputStream downloadToStream( .getName() .equalsIgnoreCase(GCS_METADATA_PREFIX + GCS_ENCRYPTIONDATAPROP)) { AbstractMap.SimpleEntry encryptionData = - parseEncryptionData(header.getValue()); + parseEncryptionData(header.getValue(), queryId); key = encryptionData.getKey(); iv = encryptionData.getValue(); @@ -440,14 +447,14 @@ public InputStream downloadToStream( logger.debug("Download successful", false); } catch (IOException ex) { logger.debug("Download unsuccessful {}", ex); - handleStorageException(ex, ++retryCount, "download", session, command); + handleStorageException(ex, ++retryCount, "download", session, command, queryId); } } else { Exception ex = new HttpResponseException( response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity())); - handleStorageException(ex, ++retryCount, "download", session, command); + handleStorageException(ex, ++retryCount, "download", session, command, queryId); } } else { BlobId blobId = BlobId.of(remoteStorageLocation, stageFilePath); @@ -463,7 +470,7 @@ public InputStream downloadToStream( // Get the user-defined BLOB metadata Map userDefinedMetadata = blob.getMetadata(); AbstractMap.SimpleEntry encryptionData = - parseEncryptionData(userDefinedMetadata.get(GCS_ENCRYPTIONDATAPROP)); + parseEncryptionData(userDefinedMetadata.get(GCS_ENCRYPTIONDATAPROP), queryId); key = encryptionData.getKey(); iv = encryptionData.getValue(); @@ -473,6 +480,7 @@ public InputStream downloadToStream( if (this.isEncrypting() && this.getEncryptionKeySize() <= 256) { if (key == null || iv == null) { throw new SnowflakeSQLException( + queryId, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), "File metadata incomplete"); @@ -487,6 +495,7 @@ public InputStream downloadToStream( } catch (Exception ex) { logger.error("Error decrypting file", ex); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -495,11 +504,12 @@ public InputStream downloadToStream( } } catch (Exception ex) { logger.debug("Download unsuccessful {}", ex); - handleStorageException(ex, ++retryCount, "download", session, command); + handleStorageException(ex, ++retryCount, "download", session, command, queryId); } } while (retryCount <= getMaxRetries()); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -521,6 +531,7 @@ public InputStream downloadToStream( * @param meta object meta data * @param stageRegion region name where the stage persists * @param presignedUrl presigned URL for upload. Used by GCP. + * @param queryId last query id * @throws SnowflakeSQLException if upload failed */ @Override @@ -536,7 +547,8 @@ public void uploadWithPresignedUrlWithoutConnection( FileBackedOutputStream fileBackedOutputStream, StorageObjectMetadata meta, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { final List toClose = new ArrayList<>(); long originalContentLength = meta.getContentLength(); @@ -549,7 +561,8 @@ public void uploadWithPresignedUrlWithoutConnection( meta, originalContentLength, fileBackedOutputStream, - toClose); + toClose, + queryId); if (!(meta instanceof CommonObjectMetadata)) { throw new IllegalArgumentException("Unexpected metadata object type"); @@ -575,7 +588,8 @@ public void uploadWithPresignedUrlWithoutConnection( meta.getUserMetadata(), uploadStreamInfo.left, presignedUrl, - ocspModeAndProxyKey); + ocspModeAndProxyKey, + queryId); logger.debug("Upload successfully with presigned url"); } @@ -600,6 +614,7 @@ public void uploadWithPresignedUrlWithoutConnection( * @param meta object meta data * @param stageRegion region name where the stage persists * @param presignedUrl Credential used for upload of a file + * @param queryId last query id * @throws SnowflakeSQLException if upload failed even after retry */ @Override @@ -615,7 +630,8 @@ public void upload( FileBackedOutputStream fileBackedOutputStream, StorageObjectMetadata meta, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { final List toClose = new ArrayList<>(); long originalContentLength = meta.getContentLength(); @@ -628,7 +644,8 @@ public void upload( meta, originalContentLength, fileBackedOutputStream, - toClose); + toClose, + queryId); if (!(meta instanceof CommonObjectMetadata)) { throw new IllegalArgumentException("Unexpected metadata object type"); @@ -644,7 +661,8 @@ public void upload( meta.getUserMetadata(), uploadStreamInfo.left, presignedUrl, - session.getHttpClientKey()); + session.getHttpClientKey(), + queryId); logger.debug("Upload successful", false); // close any open streams in the "toClose" list and return @@ -673,10 +691,11 @@ public void upload( return; } catch (Exception ex) { - handleStorageException(ex, ++retryCount, "upload", session, command); + handleStorageException(ex, ++retryCount, "upload", session, command, queryId); if (uploadFromStream && fileBackedOutputStream == null) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), @@ -693,7 +712,8 @@ public void upload( meta, originalContentLength, fileBackedOutputStream, - toClose); + toClose, + queryId); } } while (retryCount <= getMaxRetries()); @@ -701,6 +721,7 @@ public void upload( for (FileInputStream is : toClose) IOUtils.closeQuietly(is); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -752,7 +773,8 @@ private void uploadWithPresignedUrl( Map metadata, InputStream content, String presignedUrl, - HttpClientSettingsKey ocspAndProxyKey) + HttpClientSettingsKey ocspAndProxyKey, + String queryId) throws SnowflakeSQLException { try { URIBuilder uriBuilder = new URIBuilder(presignedUrl); @@ -806,16 +828,18 @@ private void uploadWithPresignedUrl( new HttpResponseException( response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity())); - handleStorageException(ex, 0, "upload", session, null); + handleStorageException(ex, 0, "upload", session, null, queryId); } } catch (URISyntaxException e) { throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: upload presigned URL invalid"); } catch (Exception e) { throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -845,7 +869,8 @@ private SFPair createUploadStream( StorageObjectMetadata meta, long originalContentLength, FileBackedOutputStream fileBackedOutputStream, - List toClose) + List toClose, + String queryId) throws SnowflakeSQLException { logger.debug( "createUploadStream({}, {}, {}, {}, {}, {})", @@ -877,6 +902,7 @@ private SFPair createUploadStream( } catch (Exception ex) { logger.error("Failed to encrypt input", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -900,6 +926,7 @@ private SFPair createUploadStream( } catch (FileNotFoundException ex) { logger.error("Failed to open input file", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -909,6 +936,7 @@ private SFPair createUploadStream( } catch (IOException ex) { logger.error("Failed to open input stream", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -922,19 +950,24 @@ private SFPair createUploadStream( @Override public void handleStorageException( - Exception ex, int retryCount, String operation, SFSession session, String command) + Exception ex, + int retryCount, + String operation, + SFSession session, + String command, + String queryId) throws SnowflakeSQLException { // no need to retry if it is invalid key exception if (ex.getCause() instanceof InvalidKeyException) { // Most likely cause is that the unlimited strength policy files are not installed // Log the error and throw a message that explains the cause - SnowflakeFileTransferAgent.throwJCEMissingError(operation, ex); + SnowflakeFileTransferAgent.throwJCEMissingError(operation, ex, queryId); } // If there is no space left in the download location, java.io.IOException is thrown. // Don't retry. if (SnowflakeUtil.getRootCause(ex) instanceof IOException) { - SnowflakeFileTransferAgent.throwNoSpaceLeftError(session, operation, ex); + SnowflakeFileTransferAgent.throwNoSpaceLeftError(session, operation, ex, queryId); } if (ex instanceof StorageException) { @@ -946,6 +979,7 @@ public void handleStorageException( // If we have exceeded the max number of retries, propagate the error if (retryCount > getMaxRetries()) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.GCP_SERVICE_ERROR.getMessageCode(), @@ -984,7 +1018,10 @@ public void handleStorageException( SnowflakeFileTransferAgent.renewExpiredToken(session, command, this); } else { throw new SnowflakeSQLException( - se.getMessage(), CLOUD_STORAGE_CREDENTIALS_EXPIRED, "GCS credentials have expired"); + queryId, + se.getMessage(), + CLOUD_STORAGE_CREDENTIALS_EXPIRED, + "GCS credentials have expired"); } } } @@ -992,6 +1029,7 @@ public void handleStorageException( || SnowflakeUtil.getRootCause(ex) instanceof SocketTimeoutException) { if (retryCount > getMaxRetries()) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), @@ -1006,6 +1044,7 @@ public void handleStorageException( } } else { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), @@ -1059,8 +1098,8 @@ private String buildEncryptionMetadataJSON(String iv64, String key64) { * Takes the json string in the encryptiondata metadata field of the encrypted * blob and parses out the key and iv. Returns the pair as key = key, iv = value. */ - private AbstractMap.SimpleEntry parseEncryptionData(String jsonEncryptionData) - throws SnowflakeSQLException { + private AbstractMap.SimpleEntry parseEncryptionData( + String jsonEncryptionData, String queryId) throws SnowflakeSQLException { ObjectMapper mapper = ObjectMapperFactory.getObjectMapper(); JsonFactory factory = mapper.getFactory(); try { @@ -1073,6 +1112,7 @@ private AbstractMap.SimpleEntry parseEncryptionData(String jsonE return new AbstractMap.SimpleEntry<>(key, iv); } catch (Exception ex) { throw new SnowflakeSQLException( + queryId, ex, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), @@ -1140,6 +1180,7 @@ private void setupGCSClient( if (encryptionKeySize != 128 && encryptionKeySize != 192 && encryptionKeySize != 256) { throw new SnowflakeSQLException( + QueryIdHelper.queryIdFromEncMatOr(encMat, null), SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), "unsupported key size", diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeS3Client.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeS3Client.java index 6aed5d317..86c571ee2 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeS3Client.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeS3Client.java @@ -174,6 +174,7 @@ private void setupSnowflakeS3Client( .withClientConfiguration(clientConfig); } else { throw new SnowflakeSQLLoggedException( + QueryIdHelper.queryIdFromEncMatOr(encMat, null), session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -294,6 +295,7 @@ public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, Str * @param stageFilePath stage file path * @param stageRegion region name where the stage persists * @param presignedUrl Not used in S3 + * @param queryId last query id * @throws SnowflakeSQLException if download failed without an exception * @throws SnowflakeSQLException if failed to decrypt downloaded file * @throws SnowflakeSQLException if file metadata is incomplete @@ -308,7 +310,8 @@ public void download( String remoteStorageLocation, String stageFilePath, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { TransferManager tx = null; int retryCount = 0; @@ -347,6 +350,7 @@ public ExecutorService newExecutor() { if (this.isEncrypting() && this.getEncryptionKeySize() < 256) { if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -365,7 +369,7 @@ public ExecutorService newExecutor() { return; } catch (Exception ex) { - handleS3Exception(ex, ++retryCount, "download", session, command, this); + handleS3Exception(ex, ++retryCount, "download", session, command, this, queryId); } finally { if (tx != null) { @@ -375,6 +379,7 @@ public ExecutorService newExecutor() { } while (retryCount <= getMaxRetries()); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -391,6 +396,7 @@ public ExecutorService newExecutor() { * @param stageFilePath stage file path * @param stageRegion region name where the stage persists * @param presignedUrl Not used in S3 + * @param queryId last query id * @return input file stream * @throws SnowflakeSQLException when download failure */ @@ -402,7 +408,8 @@ public InputStream downloadToStream( String remoteStorageLocation, String stageFilePath, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { int retryCount = 0; do { @@ -421,6 +428,7 @@ public InputStream downloadToStream( if (this.isEncrypting() && this.getEncryptionKeySize() < 256) { if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -439,11 +447,12 @@ public InputStream downloadToStream( return stream; } } catch (Exception ex) { - handleS3Exception(ex, ++retryCount, "download", session, command, this); + handleS3Exception(ex, ++retryCount, "download", session, command, this, queryId); } } while (retryCount <= getMaxRetries()); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -465,6 +474,7 @@ public InputStream downloadToStream( * @param meta object meta data * @param stageRegion region name where the stage persists * @param presignedUrl Not used in S3 + * @param queryId last query id * @throws SnowflakeSQLException if upload failed even after retry */ @Override @@ -480,7 +490,8 @@ public void upload( FileBackedOutputStream fileBackedOutputStream, StorageObjectMetadata meta, String stageRegion, - String presignedUrl) + String presignedUrl, + String queryId) throws SnowflakeSQLException { final long originalContentLength = meta.getContentLength(); final List toClose = new ArrayList<>(); @@ -492,7 +503,8 @@ public void upload( fileBackedOutputStream, ((S3ObjectMetadata) meta).getS3ObjectMetadata(), originalContentLength, - toClose); + toClose, + queryId); ObjectMetadata s3Meta; if (meta instanceof S3ObjectMetadata) { @@ -548,9 +560,10 @@ public ExecutorService newExecutor() { return; } catch (Exception ex) { - handleS3Exception(ex, ++retryCount, "upload", session, command, this); + handleS3Exception(ex, ++retryCount, "upload", session, command, this, queryId); if (uploadFromStream && fileBackedOutputStream == null) { throw new SnowflakeSQLException( + queryId, ex, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), @@ -566,7 +579,8 @@ public ExecutorService newExecutor() { fileBackedOutputStream, s3Meta, originalContentLength, - toClose); + toClose, + queryId); } finally { if (tx != null) { tx.shutdownNow(false); @@ -577,6 +591,7 @@ public ExecutorService newExecutor() { for (FileInputStream is : toClose) IOUtils.closeQuietly(is); throw new SnowflakeSQLLoggedException( + queryId, session, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -590,7 +605,8 @@ private SFPair createUploadStream( FileBackedOutputStream fileBackedOutputStream, ObjectMetadata meta, long originalContentLength, - List toClose) + List toClose, + String queryId) throws SnowflakeSQLException { logger.debug( "createUploadStream({}, {}, {}, {}, {}, {}, {}) " + "keySize={}", @@ -623,6 +639,7 @@ private SFPair createUploadStream( } catch (Exception ex) { logger.error("Failed to encrypt input", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -649,6 +666,7 @@ private SFPair createUploadStream( } catch (FileNotFoundException ex) { logger.error("Failed to open input file", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -658,6 +676,7 @@ private SFPair createUploadStream( } catch (IOException ex) { logger.error("Failed to open input stream", ex); throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), @@ -671,9 +690,14 @@ private SFPair createUploadStream( @Override public void handleStorageException( - Exception ex, int retryCount, String operation, SFSession session, String command) + Exception ex, + int retryCount, + String operation, + SFSession session, + String command, + String queryId) throws SnowflakeSQLException { - handleS3Exception(ex, retryCount, operation, session, command, this); + handleS3Exception(ex, retryCount, operation, session, command, this, queryId); } private static void handleS3Exception( @@ -682,19 +706,20 @@ private static void handleS3Exception( String operation, SFSession session, String command, - SnowflakeS3Client s3Client) + SnowflakeS3Client s3Client, + String queryId) throws SnowflakeSQLException { // no need to retry if it is invalid key exception if (ex.getCause() instanceof InvalidKeyException) { // Most likely cause is that the unlimited strength policy files are not installed // Log the error and throw a message that explains the cause - SnowflakeFileTransferAgent.throwJCEMissingError(operation, ex); + SnowflakeFileTransferAgent.throwJCEMissingError(operation, ex, queryId); } // If there is no space left in the download location, java.io.IOException is thrown. // Don't retry. if (SnowflakeUtil.getRootCause(ex) instanceof IOException) { - SnowflakeFileTransferAgent.throwNoSpaceLeftError(session, operation, ex); + SnowflakeFileTransferAgent.throwNoSpaceLeftError(session, operation, ex, queryId); } // Don't retry if max retries has been reached or the error code is 404/400 @@ -718,6 +743,7 @@ private static void handleS3Exception( SnowflakeFileTransferAgent.renewExpiredToken(session, command, s3Client); } else { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.S3_OPERATION_ERROR.getMessageCode(), @@ -732,6 +758,7 @@ private static void handleS3Exception( } else { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.AWS_CLIENT_ERROR.getMessageCode(), @@ -772,6 +799,7 @@ private static void handleS3Exception( SnowflakeFileTransferAgent.renewExpiredToken(session, command, s3Client); } else { throw new SnowflakeSQLException( + queryId, s3ex.getErrorCode(), CLOUD_STORAGE_CREDENTIALS_EXPIRED, "S3 credentials have expired"); @@ -784,6 +812,7 @@ private static void handleS3Exception( || SnowflakeUtil.getRootCause(ex) instanceof SocketTimeoutException) { if (retryCount > s3Client.getMaxRetries()) { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), @@ -798,6 +827,7 @@ private static void handleS3Exception( } } else { throw new SnowflakeSQLLoggedException( + queryId, session, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeStorageClient.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeStorageClient.java index 90eea4cd0..1bd4127f6 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeStorageClient.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeStorageClient.java @@ -90,6 +90,8 @@ StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, String pre /** * Download a file from remote storage. * + * @deprecated use {@link #download(SFSession, String, String, String, int, String, String, + * String, String, String)} * @param connection connection object * @param command command to download file * @param localLocation local file path @@ -101,7 +103,8 @@ StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, String pre * @param presignedUrl presigned URL for download. Used by GCP. * @throws SnowflakeSQLException download failure */ - void download( + @Deprecated + default void download( SFSession connection, String command, String localLocation, @@ -111,11 +114,53 @@ void download( String stageFilePath, String stageRegion, String presignedUrl) + throws SnowflakeSQLException { + download( + connection, + command, + localLocation, + destFileName, + parallelism, + remoteStorageLocation, + stageFilePath, + stageRegion, + presignedUrl, + null); + } + + /** + * Download a file from remote storage. + * + * @param connection connection object + * @param command command to download file + * @param localLocation local file path + * @param destFileName destination file name + * @param parallelism number of threads for parallel downloading + * @param remoteStorageLocation remote storage location, i.e. bucket for S3 + * @param stageFilePath stage file path + * @param stageRegion region name where the stage persists + * @param presignedUrl presigned URL for download. Used by GCP. + * @param queryId last query id + * @throws SnowflakeSQLException download failure + */ + void download( + SFSession connection, + String command, + String localLocation, + String destFileName, + int parallelism, + String remoteStorageLocation, + String stageFilePath, + String stageRegion, + String presignedUrl, + String queryId) throws SnowflakeSQLException; /** * Download a file from remote storage * + * @deprecated use {@link #download(SFSession, String, String, String, int, String, String, + * String, String, String)} * @param connection connection object * @param command command to download file * @param parallelism number of threads for parallel downloading @@ -126,7 +171,8 @@ void download( * @return input file stream * @throws SnowflakeSQLException when download failure */ - InputStream downloadToStream( + @Deprecated + default InputStream downloadToStream( SFSession connection, String command, int parallelism, @@ -134,11 +180,48 @@ InputStream downloadToStream( String stageFilePath, String stageRegion, String presignedUrl) + throws SnowflakeSQLException { + return downloadToStream( + connection, + command, + parallelism, + remoteStorageLocation, + stageFilePath, + stageRegion, + presignedUrl, + null); + } + + /** + * Download a file from remote storage + * + * @param connection connection object + * @param command command to download file + * @param parallelism number of threads for parallel downloading + * @param remoteStorageLocation remote storage location, i.e. bucket for s3 + * @param stageFilePath stage file path + * @param stageRegion region name where the stage persists + * @param presignedUrl presigned URL for download. Used by GCP. + * @param queryId last query id + * @return input file stream + * @throws SnowflakeSQLException when download failure + */ + InputStream downloadToStream( + SFSession connection, + String command, + int parallelism, + String remoteStorageLocation, + String stageFilePath, + String stageRegion, + String presignedUrl, + String queryId) throws SnowflakeSQLException; /** * Upload a file (-stream) to remote storage * + * @deprecated user {@link #upload(SFSession, String, int, boolean, String, File, String, + * InputStream, FileBackedOutputStream, StorageObjectMetadata, String, String, String)} * @param connection connection object * @param command upload command * @param parallelism number of threads do parallel uploading @@ -153,7 +236,8 @@ InputStream downloadToStream( * @param presignedUrl presigned URL for upload. Used by GCP. * @throws SnowflakeSQLException if upload failed even after retry */ - void upload( + @Deprecated + default void upload( SFSession connection, String command, int parallelism, @@ -166,6 +250,55 @@ void upload( StorageObjectMetadata meta, String stageRegion, String presignedUrl) + throws SnowflakeSQLException { + upload( + connection, + command, + parallelism, + uploadFromStream, + remoteStorageLocation, + srcFile, + destFileName, + inputStream, + fileBackedOutputStream, + meta, + stageRegion, + presignedUrl, + null); + } + + /** + * Upload a file (-stream) to remote storage + * + * @param connection connection object + * @param command upload command + * @param parallelism number of threads do parallel uploading + * @param uploadFromStream true if upload source is stream + * @param remoteStorageLocation s3 bucket name + * @param srcFile source file if not uploading from a stream + * @param destFileName file name on remote storage after upload + * @param inputStream stream used for uploading if fileBackedOutputStream is null + * @param fileBackedOutputStream stream used for uploading if not null + * @param meta object meta data + * @param stageRegion region name where the stage persists + * @param presignedUrl presigned URL for upload. Used by GCP. + * @param queryId last query id + * @throws SnowflakeSQLException if upload failed even after retry + */ + void upload( + SFSession connection, + String command, + int parallelism, + boolean uploadFromStream, + String remoteStorageLocation, + File srcFile, + String destFileName, + InputStream inputStream, + FileBackedOutputStream fileBackedOutputStream, + StorageObjectMetadata meta, + String stageRegion, + String presignedUrl, + String queryId) throws SnowflakeSQLException; /** @@ -173,6 +306,10 @@ void upload( * *

NOTE: This function is only supported when pre-signed URL is used. * + * @deprecated use {@link #uploadWithPresignedUrlWithoutConnection(int, HttpClientSettingsKey, + * int, boolean, String, File, String, InputStream, FileBackedOutputStream, + * StorageObjectMetadata, String, String, String)} This method was left to keep backward + * compatibility * @param networkTimeoutInMilli Network timeout for the upload * @param ocspModeAndProxyKey OCSP mode and proxy settings for the upload. * @param parallelism number of threads do parallel uploading @@ -187,6 +324,7 @@ void upload( * @param presignedUrl presigned URL for upload. Used by GCP. * @throws SnowflakeSQLException if upload failed even after retry */ + @Deprecated default void uploadWithPresignedUrlWithoutConnection( int networkTimeoutInMilli, HttpClientSettingsKey ocspModeAndProxyKey, @@ -201,8 +339,60 @@ default void uploadWithPresignedUrlWithoutConnection( String stageRegion, String presignedUrl) throws SnowflakeSQLException { + uploadWithPresignedUrlWithoutConnection( + networkTimeoutInMilli, + ocspModeAndProxyKey, + parallelism, + uploadFromStream, + remoteStorageLocation, + srcFile, + destFileName, + inputStream, + fileBackedOutputStream, + meta, + stageRegion, + presignedUrl, + null); + } + + /** + * Upload a file (-stream) to remote storage with Pre-signed URL without JDBC connection. + * + *

NOTE: This function is only supported when pre-signed URL is used. + * + * @param networkTimeoutInMilli Network timeout for the upload + * @param ocspModeAndProxyKey OCSP mode and proxy settings for the upload. + * @param parallelism number of threads do parallel uploading + * @param uploadFromStream true if upload source is stream + * @param remoteStorageLocation s3 bucket name + * @param srcFile source file if not uploading from a stream + * @param destFileName file name on remote storage after upload + * @param inputStream stream used for uploading if fileBackedOutputStream is null + * @param fileBackedOutputStream stream used for uploading if not null + * @param meta object meta data + * @param stageRegion region name where the stage persists + * @param presignedUrl presigned URL for upload. Used by GCP. + * @param queryId last query id + * @throws SnowflakeSQLException if upload failed even after retry + */ + default void uploadWithPresignedUrlWithoutConnection( + int networkTimeoutInMilli, + HttpClientSettingsKey ocspModeAndProxyKey, + int parallelism, + boolean uploadFromStream, + String remoteStorageLocation, + File srcFile, + String destFileName, + InputStream inputStream, + FileBackedOutputStream fileBackedOutputStream, + StorageObjectMetadata meta, + String stageRegion, + String presignedUrl, + String queryId) + throws SnowflakeSQLException { if (!requirePresignedUrl()) { throw new SnowflakeSQLLoggedException( + queryId, null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, @@ -214,6 +404,8 @@ default void uploadWithPresignedUrlWithoutConnection( /** * Handles exceptions thrown by the remote storage provider * + * @deprecated use {@link #handleStorageException(Exception, int, String, SFSession, String, + * String)} * @param ex the exception to handle * @param retryCount current number of retries, incremented by the caller before each call * @param operation string that indicates the function/operation that was taking place, when the @@ -223,8 +415,33 @@ default void uploadWithPresignedUrlWithoutConnection( * @throws SnowflakeSQLException exceptions that were not handled, or retried past what the retry * policy allows, are propagated */ - void handleStorageException( + @Deprecated + default void handleStorageException( Exception ex, int retryCount, String operation, SFSession connection, String command) + throws SnowflakeSQLException { + handleStorageException(ex, retryCount, operation, connection, command, null); + } + + /** + * Handles exceptions thrown by the remote storage provider + * + * @param ex the exception to handle + * @param retryCount current number of retries, incremented by the caller before each call + * @param operation string that indicates the function/operation that was taking place, when the + * exception was raised, for example "upload" + * @param connection the current SFSession object used by the client + * @param command the command attempted at the time of the exception + * @param queryId last query id + * @throws SnowflakeSQLException exceptions that were not handled, or retried past what the retry + * policy allows, are propagated + */ + void handleStorageException( + Exception ex, + int retryCount, + String operation, + SFSession connection, + String command, + String queryId) throws SnowflakeSQLException; /** diff --git a/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java b/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java index 5a983fc18..ae896b477 100644 --- a/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/ConnectionLatestIT.java @@ -194,16 +194,18 @@ public void putGetStatementsHaveQueryIDEvenWhenFail() throws Throwable { try { statement.executeQuery("PUT file://" + sourceFilePath + " @not_existing_state"); fail("PUT statement should fail"); - } catch (Exception __) { + } catch (SnowflakeSQLException e) { TestUtil.assertValidQueryId(snowflakeStatement.getQueryID()); + assertEquals(snowflakeStatement.getQueryID(), e.getQueryId()); } String putQueryId = snowflakeStatement.getQueryID(); try { statement.executeQuery( "GET @not_existing_state 'file://" + destFolderCanonicalPath + "' parallel=8"); fail("GET statement should fail"); - } catch (Exception __) { + } catch (SnowflakeSQLException e) { TestUtil.assertValidQueryId(snowflakeStatement.getQueryID()); + assertEquals(snowflakeStatement.getQueryID(), e.getQueryId()); } String getQueryId = snowflakeStatement.getQueryID(); assertNotEquals("put and get query id should be different", putQueryId, getQueryId); @@ -213,8 +215,9 @@ public void putGetStatementsHaveQueryIDEvenWhenFail() throws Throwable { try { statement.executeQuery("PUT file://not_existing_file @" + stageName); fail("PUT statement should fail"); - } catch (Exception __) { - assertNull(snowflakeStatement.getQueryID()); + } catch (SnowflakeSQLException e) { + TestUtil.assertValidQueryId(snowflakeStatement.getQueryID()); + assertEquals(snowflakeStatement.getQueryID(), e.getQueryId()); } } } diff --git a/src/test/java/net/snowflake/client/jdbc/FileUploaderExpandFileNamesTest.java b/src/test/java/net/snowflake/client/jdbc/FileUploaderExpandFileNamesTest.java index fefd55e13..9f329b01c 100644 --- a/src/test/java/net/snowflake/client/jdbc/FileUploaderExpandFileNamesTest.java +++ b/src/test/java/net/snowflake/client/jdbc/FileUploaderExpandFileNamesTest.java @@ -36,7 +36,7 @@ public void testProcessFileNames() throws Exception { folderName + "/TestFileE~" }; - Set files = SnowflakeFileTransferAgent.expandFileNames(locations); + Set files = SnowflakeFileTransferAgent.expandFileNames(locations, null); assertTrue(files.contains(folderName + "/TestFileA")); assertTrue(files.contains(folderName + "/TestFileB")); @@ -52,7 +52,7 @@ public void testProcessFileNamesException() { String[] locations = {"/Tes*Fil*A", "/TestFil?B", "~/TestFileC", "TestFileD"}; try { - SnowflakeFileTransferAgent.expandFileNames(locations); + SnowflakeFileTransferAgent.expandFileNames(locations, null); } catch (SnowflakeSQLException err) { Assert.assertEquals(200007, err.getErrorCode()); Assert.assertEquals("22000", err.getSQLState()); diff --git a/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java b/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java index 9d59a39f9..d562cff34 100644 --- a/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/SnowflakeAzureClientHandleExceptionLatestIT.java @@ -65,7 +65,8 @@ public void error403RenewExpired() throws SQLException, InterruptedException { 0, "upload", sfSession, - command); + command, + null); Mockito.verify(spyingClient, Mockito.times(2)).renew(Mockito.anyMap()); // Unauthenticated, backoff with interrupt, renew is called @@ -86,7 +87,8 @@ public void run() { maxRetry, "upload", sfSession, - command); + command, + null); } catch (SnowflakeSQLException e) { exceptionContainer[0] = e; } @@ -108,7 +110,8 @@ public void error403OverMaxRetryThrow() throws SQLException { overMaxRetry, "upload", sfSession, - command); + command, + null); } @Test(expected = SnowflakeSQLException.class) @@ -120,14 +123,15 @@ public void error403NullSession() throws SQLException { 0, "upload", null, - command); + command, + null); } @Test(expected = SnowflakeSQLException.class) @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void errorInvalidKey() throws SQLException { spyingClient.handleStorageException( - new Exception(new InvalidKeyException()), 0, "upload", sfSession, command); + new Exception(new InvalidKeyException()), 0, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -136,13 +140,13 @@ public void errorInterruptedException() throws SQLException { // Can still retry, no error thrown try { spyingClient.handleStorageException( - new InterruptedException(), 0, "upload", sfSession, command); + new InterruptedException(), 0, "upload", sfSession, command, null); } catch (Exception e) { Assert.fail("Should not have exception here"); } Mockito.verify(spyingClient, Mockito.never()).renew(Mockito.anyMap()); spyingClient.handleStorageException( - new InterruptedException(), 26, "upload", sfSession, command); + new InterruptedException(), 26, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -151,19 +155,19 @@ public void errorSocketTimeoutException() throws SQLException { // Can still retry, no error thrown try { spyingClient.handleStorageException( - new SocketTimeoutException(), 0, "upload", sfSession, command); + new SocketTimeoutException(), 0, "upload", sfSession, command, null); } catch (Exception e) { Assert.fail("Should not have exception here"); } Mockito.verify(spyingClient, Mockito.never()).renew(Mockito.anyMap()); spyingClient.handleStorageException( - new SocketTimeoutException(), 26, "upload", sfSession, command); + new SocketTimeoutException(), 26, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void errorUnknownException() throws SQLException { - spyingClient.handleStorageException(new Exception(), 0, "upload", sfSession, command); + spyingClient.handleStorageException(new Exception(), 0, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -181,7 +185,8 @@ public void errorNoSpaceLeftOnDevice() throws SQLException, IOException { 0, "download", null, - getCommand); + getCommand, + null); } @After diff --git a/src/test/java/net/snowflake/client/jdbc/SnowflakeDriverLatestIT.java b/src/test/java/net/snowflake/client/jdbc/SnowflakeDriverLatestIT.java index da29b287e..93ec59580 100644 --- a/src/test/java/net/snowflake/client/jdbc/SnowflakeDriverLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/SnowflakeDriverLatestIT.java @@ -1529,7 +1529,8 @@ public void testNoSpaceLeftOnDeviceException() throws SQLException { client.getMaxRetries(), "download", null, - command); + command, + null); } finally { if (connection != null) { connection.createStatement().execute("DROP STAGE if exists testPutGet_stage"); diff --git a/src/test/java/net/snowflake/client/jdbc/SnowflakeGcsClientHandleExceptionLatestIT.java b/src/test/java/net/snowflake/client/jdbc/SnowflakeGcsClientHandleExceptionLatestIT.java index 92763f083..effa3c9c1 100644 --- a/src/test/java/net/snowflake/client/jdbc/SnowflakeGcsClientHandleExceptionLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/SnowflakeGcsClientHandleExceptionLatestIT.java @@ -59,12 +59,12 @@ public void setup() throws SQLException { public void error401RenewExpired() throws SQLException, InterruptedException { // Unauthenticated, renew is called. spyingClient.handleStorageException( - new StorageException(401, "Unauthenticated"), 0, "upload", sfSession, command); + new StorageException(401, "Unauthenticated"), 0, "upload", sfSession, command, null); Mockito.verify(spyingClient, Mockito.times(1)).renew(Mockito.anyMap()); // Unauthenticated, command null, not renew, renew called remaining 1 spyingClient.handleStorageException( - new StorageException(401, "Unauthenticated"), 0, "upload", sfSession, null); + new StorageException(401, "Unauthenticated"), 0, "upload", sfSession, null, null); Mockito.verify(spyingClient, Mockito.times(1)).renew(Mockito.anyMap()); // Unauthenticated, backoff with interrupt, renew is called @@ -80,7 +80,8 @@ public void run() { maxRetry, "upload", sfSession, - command); + command, + null); } catch (SnowflakeSQLException e) { exceptionContainer[0] = e; } @@ -97,7 +98,12 @@ public void run() { @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void error401OverMaxRetryThrow() throws SQLException { spyingClient.handleStorageException( - new StorageException(401, "Unauthenticated"), overMaxRetry, "upload", sfSession, command); + new StorageException(401, "Unauthenticated"), + overMaxRetry, + "upload", + sfSession, + command, + null); } @Test(expected = SnowflakeSQLException.class) @@ -105,7 +111,7 @@ public void error401OverMaxRetryThrow() throws SQLException { public void errorInvalidKey() throws SQLException { // Unauthenticated, renew is called. spyingClient.handleStorageException( - new Exception(new InvalidKeyException()), 0, "upload", sfSession, command); + new Exception(new InvalidKeyException()), 0, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -114,13 +120,13 @@ public void errorInterruptedException() throws SQLException { // Can still retry, no error thrown try { spyingClient.handleStorageException( - new InterruptedException(), 0, "upload", sfSession, command); + new InterruptedException(), 0, "upload", sfSession, command, null); } catch (Exception e) { Assert.fail("Should not have exception here"); } Mockito.verify(spyingClient, Mockito.never()).renew(Mockito.anyMap()); spyingClient.handleStorageException( - new InterruptedException(), 26, "upload", sfSession, command); + new InterruptedException(), 26, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -129,27 +135,27 @@ public void errorSocketTimeoutException() throws SQLException { // Can still retry, no error thrown try { spyingClient.handleStorageException( - new SocketTimeoutException(), 0, "upload", sfSession, command); + new SocketTimeoutException(), 0, "upload", sfSession, command, null); } catch (Exception e) { Assert.fail("Should not have exception here"); } Mockito.verify(spyingClient, Mockito.never()).renew(Mockito.anyMap()); spyingClient.handleStorageException( - new SocketTimeoutException(), 26, "upload", sfSession, command); + new SocketTimeoutException(), 26, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void errorUnknownException() throws SQLException { // Unauthenticated, renew is called. - spyingClient.handleStorageException(new Exception(), 0, "upload", sfSession, command); + spyingClient.handleStorageException(new Exception(), 0, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void errorWithNullSession() throws SQLException { spyingClient.handleStorageException( - new StorageException(401, "Unauthenticated"), 0, "upload", null, command); + new StorageException(401, "Unauthenticated"), 0, "upload", null, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -167,7 +173,8 @@ public void errorNoSpaceLeftOnDevice() throws SQLException, IOException { 0, "download", null, - getCommand); + getCommand, + null); } @After diff --git a/src/test/java/net/snowflake/client/jdbc/SnowflakeS3ClientHandleExceptionLatestIT.java b/src/test/java/net/snowflake/client/jdbc/SnowflakeS3ClientHandleExceptionLatestIT.java index 5cbdfc4dd..523475251 100644 --- a/src/test/java/net/snowflake/client/jdbc/SnowflakeS3ClientHandleExceptionLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/SnowflakeS3ClientHandleExceptionLatestIT.java @@ -75,7 +75,7 @@ public void setup() throws SQLException { public void errorRenewExpired() throws SQLException, InterruptedException { AmazonS3Exception ex = new AmazonS3Exception("unauthenticated"); ex.setErrorCode(EXPIRED_AWS_TOKEN_ERROR_CODE); - spyingClient.handleStorageException(ex, 0, "upload", sfSession, command); + spyingClient.handleStorageException(ex, 0, "upload", sfSession, command, null); Mockito.verify(spyingClient, Mockito.times(1)).renew(Mockito.anyMap()); // Unauthenticated, backoff with interrupt, renew is called @@ -86,7 +86,8 @@ public void errorRenewExpired() throws SQLException, InterruptedException { @Override public void run() { try { - spyingClient.handleStorageException(ex, maxRetry, "upload", sfSession, command); + spyingClient.handleStorageException( + ex, maxRetry, "upload", sfSession, command, null); } catch (SnowflakeSQLException e) { exceptionContainer[0] = e; } @@ -103,7 +104,7 @@ public void run() { @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void errorNotFound() throws SQLException { spyingClient.handleStorageException( - new AmazonS3Exception("Not found"), overMaxRetry, "upload", sfSession, command); + new AmazonS3Exception("Not found"), overMaxRetry, "upload", sfSession, command, null); } @Test @@ -115,7 +116,7 @@ public void errorBadRequestTokenExpired() throws SQLException { ex.setErrorCode("400 Bad Request"); ex.setErrorType(AmazonServiceException.ErrorType.Client); Mockito.doReturn(true).when(spyingClient).isClientException400Or404(ex); - spyingClient.handleStorageException(ex, 0, "download", sfSession, command); + spyingClient.handleStorageException(ex, 0, "download", sfSession, command, null); // renew token Mockito.verify(spyingClient, Mockito.times(1)).isClientException400Or404(ex); Mockito.verify(spyingClient, Mockito.times(1)).renew(Mockito.anyMap()); @@ -129,7 +130,8 @@ public void errorClientUnknown() throws SQLException { overMaxRetry, "upload", sfSession, - command); + command, + null); } @Test(expected = SnowflakeSQLException.class) @@ -137,7 +139,7 @@ public void errorClientUnknown() throws SQLException { public void errorInvalidKey() throws SQLException { // Unauthenticated, renew is called. spyingClient.handleStorageException( - new Exception(new InvalidKeyException()), 0, "upload", sfSession, command); + new Exception(new InvalidKeyException()), 0, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -146,13 +148,13 @@ public void errorInterruptedException() throws SQLException { // Can still retry, no error thrown try { spyingClient.handleStorageException( - new InterruptedException(), 0, "upload", sfSession, command); + new InterruptedException(), 0, "upload", sfSession, command, null); } catch (Exception e) { Assert.fail("Should not have exception here"); } Mockito.verify(spyingClient, Mockito.never()).renew(Mockito.anyMap()); spyingClient.handleStorageException( - new InterruptedException(), 26, "upload", sfSession, command); + new InterruptedException(), 26, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -161,19 +163,19 @@ public void errorSocketTimeoutException() throws SQLException { // Can still retry, no error thrown try { spyingClient.handleStorageException( - new SocketTimeoutException(), 0, "upload", sfSession, command); + new SocketTimeoutException(), 0, "upload", sfSession, command, null); } catch (Exception e) { Assert.fail("Should not have exception here"); } Mockito.verify(spyingClient, Mockito.never()).renew(Mockito.anyMap()); spyingClient.handleStorageException( - new SocketTimeoutException(), 26, "upload", sfSession, command); + new SocketTimeoutException(), 26, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @ConditionalIgnoreRule.ConditionalIgnore(condition = RunningOnGithubAction.class) public void errorUnknownException() throws SQLException { - spyingClient.handleStorageException(new Exception(), 0, "upload", sfSession, command); + spyingClient.handleStorageException(new Exception(), 0, "upload", sfSession, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -182,7 +184,7 @@ public void errorRenewExpiredNullSession() throws SQLException { // Unauthenticated, renew is called. AmazonS3Exception ex = new AmazonS3Exception("unauthenticated"); ex.setErrorCode(EXPIRED_AWS_TOKEN_ERROR_CODE); - spyingClient.handleStorageException(ex, 0, "upload", null, command); + spyingClient.handleStorageException(ex, 0, "upload", null, command, null); } @Test(expected = SnowflakeSQLException.class) @@ -200,7 +202,8 @@ public void errorNoSpaceLeftOnDevice() throws SQLException, IOException { 0, "download", null, - getCommand); + getCommand, + null); } @After diff --git a/src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java b/src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java index d05b767a3..35d2a65d5 100644 --- a/src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java @@ -248,9 +248,10 @@ public void testQueryIdIsSetOnFailedQueryExecute() throws SQLException { try { stmt.execute("use database not_existing_database"); fail("Statement should fail with exception"); - } catch (Exception __) { + } catch (SnowflakeSQLException e) { String queryID = stmt.unwrap(SnowflakeStatement.class).getQueryID(); TestUtil.assertValidQueryId(queryID); + assertEquals(queryID, e.getQueryId()); } } } @@ -265,9 +266,10 @@ public void testQueryIdIsSetOnFailedExecuteUpdate() throws SQLException { try { stmt.executeUpdate("update not_existing_table set a = 1 where id = 42"); fail("Statement should fail with exception"); - } catch (Exception __) { + } catch (SnowflakeSQLException e) { String queryID = stmt.unwrap(SnowflakeStatement.class).getQueryID(); TestUtil.assertValidQueryId(queryID); + assertEquals(queryID, e.getQueryId()); } } } @@ -282,9 +284,10 @@ public void testQueryIdIsSetOnFailedExecuteQuery() throws SQLException { try { stmt.executeQuery("select * from not_existing_table"); fail("Statement should fail with exception"); - } catch (Exception __) { + } catch (SnowflakeSQLException e) { String queryID = stmt.unwrap(SnowflakeStatement.class).getQueryID(); TestUtil.assertValidQueryId(queryID); + assertEquals(queryID, e.getQueryId()); } } } diff --git a/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeS3ClientLatestIT.java b/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeS3ClientLatestIT.java index df5bf635a..fb3d2005e 100644 --- a/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeS3ClientLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeS3ClientLatestIT.java @@ -158,12 +158,14 @@ public void testPutGetMaxRetries() throws SQLException { // Should retry one time, then throw error try { - spy.handleStorageException(new InterruptedException(), 0, "download", sfSession, command); + spy.handleStorageException( + new InterruptedException(), 0, "download", sfSession, command, null); } catch (Exception e) { Assert.fail("Should not have exception here"); } Mockito.verify(spy, Mockito.never()).renew(Mockito.anyMap()); - spy.handleStorageException(new InterruptedException(), 1, "download", sfSession, command); + spy.handleStorageException( + new InterruptedException(), 1, "download", sfSession, command, null); } } }