diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 8e017152407d..0a0b1f3b886d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -92,6 +92,8 @@ public static class Config implements Serializable {
     public String sparkMemory = null;
     @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
     public int retry = 0;
+    @Parameter(names = {"--skip-clean"}, description = "do not trigger clean after clustering", required = false)
+    public Boolean skipClean = true;
     @Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering @desperate soon please use \"--mode schedule\" instead")
     public Boolean runSchedule = false;
@@ -131,6 +133,7 @@ public String toString() {
         + " --spark-master " + sparkMaster + ", \n"
         + " --spark-memory " + sparkMemory + ", \n"
         + " --retry " + retry + ", \n"
+        + " --skip-clean " + skipClean + ", \n"
         + " --schedule " + runSchedule + ", \n"
         + " --retry-last-failed-clustering-job " + retryLastFailedClusteringJob + ", \n"
         + " --mode " + runningMode + ", \n"
@@ -297,7 +300,7 @@ private int doPurgePendingInstant(JavaSparkContext jsc) throws Exception {
   }

   private void clean(SparkRDDWriteClient client) {
-    if (client.getConfig().isAutoClean()) {
+    if (!cfg.skipClean && client.getConfig().isAutoClean()) {
       client.clean();
     }
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 42633ee55582..e8e941261183 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -94,6 +94,8 @@ public static class Config implements Serializable {
     public String sparkMemory = null;
     @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
     public int retry = 0;
+    @Parameter(names = {"--skip-clean"}, description = "do not trigger clean after compaction", required = false)
+    public Boolean skipClean = true;
     @Parameter(names = {"--schedule", "-sc"}, description = "Schedule compaction", required = false)
     public Boolean runSchedule = false;
     @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a compact plan; "
@@ -124,6 +126,7 @@ public String toString() {
         + " --schema-file " + schemaFile + ", \n"
         + " --spark-master " + sparkMaster + ", \n"
         + " --spark-memory " + sparkMemory + ", \n"
+        + " --skip-clean " + skipClean + ", \n"
         + " --retry " + retry + ", \n"
         + " --schedule " + runSchedule + ", \n"
         + " --mode " + runningMode + ", \n"
@@ -150,6 +153,7 @@ public boolean equals(Object o) {
           && Objects.equals(sparkMaster, config.sparkMaster)
           && Objects.equals(sparkMemory, config.sparkMemory)
           && Objects.equals(retry, config.retry)
+          && Objects.equals(skipClean, config.skipClean)
           && Objects.equals(runSchedule, config.runSchedule)
           && Objects.equals(runningMode, config.runningMode)
           && Objects.equals(strategyClassName, config.strategyClassName)
@@ -160,7 +164,7 @@

     @Override
     public int hashCode() {
       return Objects.hash(basePath, tableName, compactionInstantTime, schemaFile,
-          sparkMaster, parallelism, sparkMemory, retry, runSchedule, runningMode, strategyClassName, propsFilePath, configs, help);
+          sparkMaster, parallelism, sparkMemory, retry, skipClean, runSchedule, runningMode, strategyClassName, propsFilePath, configs, help);
     }
   }
@@ -292,7 +296,7 @@ private String getSchemaFromLatestInstant() throws Exception {
   }

   private void clean(SparkRDDWriteClient client) {
-    if (client.getConfig().isAutoClean()) {
+    if (!cfg.skipClean && client.getConfig().isAutoClean()) {
       client.clean();
     }
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CleanTask.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CleanTask.java
index 785628a30023..45e0cc8d02b1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CleanTask.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CleanTask.java
@@ -37,6 +37,7 @@ void run() {
     HoodieCleaner.Config cleanConfig = new HoodieCleaner.Config();
     cleanConfig.basePath = basePath;
     UtilHelpers.retry(retry, () -> {
+      // The HoodieWriteClient within HoodieCleaner is closed internally; not closing HoodieCleaner here does not leak any resources.
       new HoodieCleaner(cleanConfig, jsc, props).run();
       return 0;
     }, "Clean Failed");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
index e20d71e8cc9c..66efbd475dc4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
@@ -55,6 +55,7 @@ void run() {
     clusteringConfig.basePath = basePath;
     clusteringConfig.parallelism = parallelism;
     clusteringConfig.runningMode = clusteringMode;
+    // The HoodieWriteClient within HoodieClusteringJob is closed internally; not closing HoodieClusteringJob here does not leak any resources.
     new HoodieClusteringJob(jsc, clusteringConfig, props, metaClient).cluster(retry);
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
index 25b80e7cd45d..0b946b8cc3ad 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/CompactionTask.java
@@ -62,6 +62,7 @@ void run() {
     compactionCfg.runningMode = compactionRunningMode;
     compactionCfg.parallelism = parallelism;
     compactionCfg.retry = retry;
+    // The HoodieWriteClient within HoodieCompactor is closed internally; not closing HoodieCompactor here does not leak any resources.
     new HoodieCompactor(jsc, compactionCfg, props, metaClient).compact(retry);
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
index d56cc3102f80..ca285257efab 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
@@ -34,6 +34,8 @@
 import org.apache.hudi.utilities.HoodieClusteringJob;

 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;

 import java.io.IOException;
 import java.util.Properties;
@@ -49,9 +51,10 @@
  */
 public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {

-  @Test
-  public void testHoodieClusteringJobWithClean() throws Exception {
-    String tableBasePath = basePath + "/asyncClustering";
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodieClusteringJobWithClean(boolean skipClean) throws Exception {
+    String tableBasePath = basePath + "/asyncClustering_" + skipClean;
     Properties props = getPropertiesForKeyGen(true);
     HoodieWriteConfig config = getWriteConfig(tableBasePath);
     props.putAll(config.getProps());
@@ -70,7 +73,7 @@ public void testHoodieClusteringJobWithClean() throws Exception {

     // offline clustering execute without clean
     HoodieClusteringJob hoodieCluster =
-        init(tableBasePath, true, "scheduleAndExecute", false);
+        init(tableBasePath, true, "scheduleAndExecute", false, skipClean);
     hoodieCluster.cluster(0);
     HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath);
     HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath);
@@ -80,10 +83,12 @@

     // offline clustering execute with sync clean
     hoodieCluster =
-        init(tableBasePath, true, "scheduleAndExecute", true);
+        init(tableBasePath, true, "scheduleAndExecute", false, skipClean);
     hoodieCluster.cluster(0);
     HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(2, tableBasePath);
-    HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath);
+    if (!skipClean) {
+      HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath);
+    }
   }

   @Test
@@ -107,7 +112,7 @@ public void testPurgePendingInstants() throws Exception {

     // offline clustering execute without clean
     HoodieClusteringJob hoodieCluster =
-        init(tableBasePath, true, "scheduleAndExecute", false);
+        init(tableBasePath, true, "scheduleAndExecute", false, false);
     hoodieCluster.cluster(0);
     HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath);
     HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath);
@@ -120,7 +125,7 @@

     // trigger purge.
     hoodieCluster =
-        getClusteringConfigForPurge(tableBasePath, true, PURGE_PENDING_INSTANT, false, latestClusteringInstant.getTimestamp());
+        getClusteringConfigForPurge(tableBasePath, true, PURGE_PENDING_INSTANT, latestClusteringInstant.getTimestamp());
     hoodieCluster.cluster(0);

     // validate that there are no clustering commits in timeline.
     HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(0, tableBasePath);
@@ -143,27 +148,28 @@ private void deleteCommitMetaFile(String instantTime, String suffix) throws IOEx
   // Utilities
   // -------------------------------------------------------------------------

-  private HoodieClusteringJob init(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean) {
-    HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
+  private HoodieClusteringJob init(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean, boolean skipClean) {
+    HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, skipClean);
     clusterConfig.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
     return new HoodieClusteringJob(jsc, clusterConfig);
   }

-  private HoodieClusteringJob getClusteringConfigForPurge(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean,
+  private HoodieClusteringJob getClusteringConfigForPurge(String tableBasePath, boolean runSchedule, String scheduleAndExecute,
                                                           String pendingInstant) {
-    HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
+    HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, false);
     clusterConfig.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
     clusterConfig.clusteringInstantTime = pendingInstant;
     return new HoodieClusteringJob(jsc, clusterConfig);
   }

-  private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, boolean runSchedule, String runningMode, boolean isAutoClean) {
+  private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, boolean runSchedule, String runningMode,
+                                                                     boolean skipClean) {
     HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
     config.basePath = basePath;
     config.runSchedule = runSchedule;
     config.runningMode = runningMode;
     config.configs.add("hoodie.metadata.enable=false");
-    config.configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), isAutoClean));
+    config.skipClean = skipClean;
     config.configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), 1));
     config.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), 1));
     return config;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java
index be5866292148..bd0000efd005 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java
@@ -36,7 +36,8 @@
 import org.apache.hudi.table.storage.HoodieStorageLayout;
 import org.apache.hudi.utilities.HoodieCompactor;

-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;

 import java.util.Properties;
@@ -47,9 +48,10 @@
  */
 public class TestHoodieCompactorJob extends HoodieOfflineJobTestBase {

-  @Test
-  public void testHoodieCompactorWithClean() throws Exception {
-    String tableBasePath = basePath + "/asyncCompaction";
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodieCompactorWithOptionalClean(boolean skipClean) throws Exception {
+    String tableBasePath = basePath + "/asyncCompaction_" + skipClean;
     Properties props = getPropertiesForKeyGen(true);
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .forTable("asyncCompaction")
@@ -86,7 +88,7 @@ public void testHoodieCompactorWithClean() throws Exception {

     // offline compaction schedule
     HoodieCompactor hoodieCompactorSchedule =
-        init(tableBasePath, true, "SCHEDULE", false);
+        init(tableBasePath, true, "SCHEDULE", skipClean);
     hoodieCompactorSchedule.compact(0);
     TestHelpers.assertNCompletedCommits(2, tableBasePath);
     TestHelpers.assertNCleanCommits(0, tableBasePath);
@@ -94,30 +96,32 @@
     writeData(true, client.createNewInstantTime(), 100, true);
     writeData(true, client.createNewInstantTime(), 100, true);

-    // offline compaction execute with sync clean
+    // offline compaction execute with optional clean
     HoodieCompactor hoodieCompactorExecute =
-        init(tableBasePath, false, "EXECUTE", true);
+        init(tableBasePath, false, "EXECUTE", skipClean);
     hoodieCompactorExecute.compact(0);
     TestHelpers.assertNCompletedCommits(5, tableBasePath);
-    TestHelpers.assertNCleanCommits(1, tableBasePath);
+    if (!skipClean) {
+      TestHelpers.assertNCleanCommits(1, tableBasePath);
+    }
   }

   // -------------------------------------------------------------------------
   // Utilities
   // -------------------------------------------------------------------------

-  private HoodieCompactor init(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean) {
-    HoodieCompactor.Config compactionConfig = buildCompactionConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
+  private HoodieCompactor init(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean skipClean) {
+    HoodieCompactor.Config compactionConfig = buildCompactionConfig(tableBasePath, runSchedule, scheduleAndExecute, skipClean);
     return new HoodieCompactor(jsc, compactionConfig);
   }

-  private HoodieCompactor.Config buildCompactionConfig(String basePath, boolean runSchedule, String runningMode, boolean isAutoClean) {
+  private HoodieCompactor.Config buildCompactionConfig(String basePath, boolean runSchedule, String runningMode, boolean skipClean) {
     HoodieCompactor.Config config = new HoodieCompactor.Config();
     config.basePath = basePath;
     config.runSchedule = runSchedule;
     config.runningMode = runningMode;
     config.configs.add("hoodie.metadata.enable=false");
-    config.configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), isAutoClean));
+    config.skipClean = skipClean;
     config.configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), 1));
     config.configs.add(String.format("%s=%s", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 1));
     return config;
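
For reference, a minimal sketch of driving the new skipClean switch programmatically, mirroring the buildCompactionConfig(...) helper in TestHoodieCompactorJob above. The wrapper class name, the table path, and the already-created JavaSparkContext are assumptions for illustration, not part of this change.

import org.apache.hudi.utilities.HoodieCompactor;

import org.apache.spark.api.java.JavaSparkContext;

public class SkipCleanExample {
  // Hypothetical helper: runs one offline compaction and keeps the pre-change behavior of
  // cleaning right after it. jsc is assumed to be an already-configured JavaSparkContext.
  static void compactThenClean(JavaSparkContext jsc, String tableBasePath) {
    HoodieCompactor.Config config = new HoodieCompactor.Config();
    config.basePath = tableBasePath;
    config.runSchedule = false;
    config.runningMode = "EXECUTE";
    // skipClean now defaults to true; set it to false to trigger clean after compaction.
    // clean() additionally requires auto clean to be enabled on the write config (it is by default).
    config.skipClean = false;
    new HoodieCompactor(jsc, config).compact(0);
  }
}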