diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 569cf908e..6d7366193 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -535,7 +535,10 @@ public void dropStage(final String stageName) public void purgeStage(final String stageName, final List files) { InternalUtils.assertNotEmpty("stageName", stageName); - removeFile(stageName, files); + for (String fileName : files) + { + removeFile(stageName, fileName); + } logInfo("purge {} files from stage: {}", files.size(), stageName); } @@ -568,11 +571,11 @@ public void moveToTableStage(final String tableName, final String stageName { throw SnowflakeErrors.ERROR_2003.getException(e); } + //remove + removeFile(stageName, name); logInfo("moved file: {} from stage: {} to table stage: {}", name, stageName, tableName); } - //remove - removeFile(stageName, files); } @Override @@ -774,36 +777,27 @@ private String pipeDefinition(String tableName, String stageName) * Remove one file from given stage * * @param stageName stage name - * @param fileList file names list + * @param fileName file name */ - private void removeFile(String stageName, List fileList) + private void removeFile(String stageName, String fileName) { - if (fileList.size() == 0) - { - return; - } InternalUtils.assertNotEmpty("stageName", stageName); + String query = "rm @" + stageName + "/" + fileName; + try { InternalUtils.backoffAndRetry(telemetry, - () -> - { - String query = ""; - for (String fileName : fileList) - { - query = query + "rm @" + stageName + "/" + fileName + "; "; - } - Statement stmt = conn.createStatement(); - stmt.unwrap(SnowflakeStatement.class).setParameter("MULTI_STATEMENT_COUNT", fileList.size()); - stmt.execute(query); - stmt.close(); - return true; - } - ); + () -> + { + PreparedStatement stmt = conn.prepareStatement(query); + stmt.execute(); + stmt.close(); + return true; + }); } catch (Exception e) { throw SnowflakeErrors.ERROR_2001.getException(e); } - logDebug("deleted {} files from stage {}", fileList.size(), stageName); + logDebug("deleted {} from stage {}", fileName, stageName); } }