Skip to content

Commit

Permalink
[HUDI-7407] Making clean optional in standalone compaction and cluste…
Browse files Browse the repository at this point in the history
…ring jobs (#10668)

* Making clean optional in standalone compaction and clustering jobs
  • Loading branch information
nsivabalan authored May 30, 2024
1 parent cd62c31 commit f0c1a88
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public static class Config implements Serializable {
public String sparkMemory = null;
@Parameter(names = {"--retry", "-rt"}, description = "number of retries")
public int retry = 0;
@Parameter(names = {"--skip-clean", "-sc"}, description = "do not trigger clean after compaction", required = false)
public Boolean skipClean = true;

@Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering @desperate soon please use \"--mode schedule\" instead")
public Boolean runSchedule = false;
Expand Down Expand Up @@ -131,6 +133,7 @@ public String toString() {
+ " --spark-master " + sparkMaster + ", \n"
+ " --spark-memory " + sparkMemory + ", \n"
+ " --retry " + retry + ", \n"
+ " --skipClean " + skipClean + ", \n"
+ " --schedule " + runSchedule + ", \n"
+ " --retry-last-failed-clustering-job " + retryLastFailedClusteringJob + ", \n"
+ " --mode " + runningMode + ", \n"
Expand Down Expand Up @@ -297,7 +300,7 @@ private int doPurgePendingInstant(JavaSparkContext jsc) throws Exception {
}

private void clean(SparkRDDWriteClient<?> client) {
if (client.getConfig().isAutoClean()) {
if (!cfg.skipClean && client.getConfig().isAutoClean()) {
client.clean();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public static class Config implements Serializable {
public String sparkMemory = null;
@Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
public int retry = 0;
@Parameter(names = {"--skip-clean", "-sc"}, description = "do not trigger clean after compaction", required = false)
public Boolean skipClean = true;
@Parameter(names = {"--schedule", "-sc"}, description = "Schedule compaction", required = false)
public Boolean runSchedule = false;
@Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a compact plan; "
Expand Down Expand Up @@ -124,6 +126,7 @@ public String toString() {
+ " --schema-file " + schemaFile + ", \n"
+ " --spark-master " + sparkMaster + ", \n"
+ " --spark-memory " + sparkMemory + ", \n"
+ " --skipClean " + skipClean + ", \n"
+ " --retry " + retry + ", \n"
+ " --schedule " + runSchedule + ", \n"
+ " --mode " + runningMode + ", \n"
Expand All @@ -150,6 +153,7 @@ public boolean equals(Object o) {
&& Objects.equals(sparkMaster, config.sparkMaster)
&& Objects.equals(sparkMemory, config.sparkMemory)
&& Objects.equals(retry, config.retry)
&& Objects.equals(skipClean, config.skipClean)
&& Objects.equals(runSchedule, config.runSchedule)
&& Objects.equals(runningMode, config.runningMode)
&& Objects.equals(strategyClassName, config.strategyClassName)
Expand All @@ -160,7 +164,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(basePath, tableName, compactionInstantTime, schemaFile,
sparkMaster, parallelism, sparkMemory, retry, runSchedule, runningMode, strategyClassName, propsFilePath, configs, help);
sparkMaster, parallelism, sparkMemory, retry, skipClean, runSchedule, runningMode, strategyClassName, propsFilePath, configs, help);
}
}

Expand Down Expand Up @@ -292,7 +296,7 @@ private String getSchemaFromLatestInstant() throws Exception {
}

private void clean(SparkRDDWriteClient<?> client) {
if (client.getConfig().isAutoClean()) {
if (!cfg.skipClean && client.getConfig().isAutoClean()) {
client.clean();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void run() {
HoodieCleaner.Config cleanConfig = new HoodieCleaner.Config();
cleanConfig.basePath = basePath;
UtilHelpers.retry(retry, () -> {
// HoodieWriteClient within HoodieCleaner is closed internally. not closing HoodieCleaner here is not leaking any resources.
new HoodieCleaner(cleanConfig, jsc, props).run();
return 0;
}, "Clean Failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void run() {
clusteringConfig.basePath = basePath;
clusteringConfig.parallelism = parallelism;
clusteringConfig.runningMode = clusteringMode;
// HoodieWriteClient within HoodieClusteringJob is closed internally. not closing HoodieClusteringJob here is not leaking any resources.
new HoodieClusteringJob(jsc, clusteringConfig, props, metaClient).cluster(retry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ void run() {
compactionCfg.runningMode = compactionRunningMode;
compactionCfg.parallelism = parallelism;
compactionCfg.retry = retry;
// HoodieWriteClient within HoodieCompactor is closed internally. not closing HoodieCompactor here is not leaking any resources.
new HoodieCompactor(jsc, compactionCfg, props, metaClient).compact(retry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.hudi.utilities.HoodieClusteringJob;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.Properties;
Expand All @@ -49,9 +51,10 @@
*/
public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase {

@Test
public void testHoodieClusteringJobWithClean() throws Exception {
String tableBasePath = basePath + "/asyncClustering";
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieClusteringJobWithClean(boolean skipClean) throws Exception {
String tableBasePath = basePath + "/asyncClustering_" + skipClean;
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = getWriteConfig(tableBasePath);
props.putAll(config.getProps());
Expand All @@ -70,7 +73,7 @@ public void testHoodieClusteringJobWithClean() throws Exception {

// offline clustering execute without clean
HoodieClusteringJob hoodieCluster =
init(tableBasePath, true, "scheduleAndExecute", false);
init(tableBasePath, true, "scheduleAndExecute", false, skipClean);
hoodieCluster.cluster(0);
HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath);
HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath);
Expand All @@ -80,10 +83,12 @@ public void testHoodieClusteringJobWithClean() throws Exception {

// offline clustering execute; clean behavior is controlled by the skipClean parameter
hoodieCluster =
init(tableBasePath, true, "scheduleAndExecute", true);
init(tableBasePath, true, "scheduleAndExecute", false, skipClean);
hoodieCluster.cluster(0);
HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(2, tableBasePath);
HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath);
if (!skipClean) {
HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath);
}
}

@Test
Expand All @@ -107,7 +112,7 @@ public void testPurgePendingInstants() throws Exception {

// offline clustering execute without clean
HoodieClusteringJob hoodieCluster =
init(tableBasePath, true, "scheduleAndExecute", false);
init(tableBasePath, true, "scheduleAndExecute", false, false);
hoodieCluster.cluster(0);
HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath);
HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath);
Expand All @@ -120,7 +125,7 @@ public void testPurgePendingInstants() throws Exception {

// trigger purge.
hoodieCluster =
getClusteringConfigForPurge(tableBasePath, true, PURGE_PENDING_INSTANT, false, latestClusteringInstant.getTimestamp());
getClusteringConfigForPurge(tableBasePath, true, PURGE_PENDING_INSTANT, latestClusteringInstant.getTimestamp());
hoodieCluster.cluster(0);
// validate that there are no clustering commits in timeline.
HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(0, tableBasePath);
Expand All @@ -143,27 +148,28 @@ private void deleteCommitMetaFile(String instantTime, String suffix) throws IOEx
// Utilities
// -------------------------------------------------------------------------

private HoodieClusteringJob init(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean) {
HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
private HoodieClusteringJob init(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean, boolean skipClean) {
HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, skipClean);
clusterConfig.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
return new HoodieClusteringJob(jsc, clusterConfig);
}

private HoodieClusteringJob getClusteringConfigForPurge(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean,
private HoodieClusteringJob getClusteringConfigForPurge(String tableBasePath, boolean runSchedule, String scheduleAndExecute,
String pendingInstant) {
HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
HoodieClusteringJob.Config clusterConfig = buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, false);
clusterConfig.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
clusterConfig.clusteringInstantTime = pendingInstant;
return new HoodieClusteringJob(jsc, clusterConfig);
}

private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, boolean runSchedule, String runningMode, boolean isAutoClean) {
private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, boolean runSchedule, String runningMode,
boolean skipClean) {
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
config.basePath = basePath;
config.runSchedule = runSchedule;
config.runningMode = runningMode;
config.configs.add("hoodie.metadata.enable=false");
config.configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), isAutoClean));
config.skipClean = skipClean;
config.configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), 1));
config.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), 1));
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.utilities.HoodieCompactor;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Properties;

Expand All @@ -47,9 +48,10 @@
*/
public class TestHoodieCompactorJob extends HoodieOfflineJobTestBase {

@Test
public void testHoodieCompactorWithClean() throws Exception {
String tableBasePath = basePath + "/asyncCompaction";
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieCompactorWithOptionalClean(boolean skipClean) throws Exception {
String tableBasePath = basePath + "/asyncCompaction_" + skipClean;
Properties props = getPropertiesForKeyGen(true);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("asyncCompaction")
Expand Down Expand Up @@ -86,38 +88,40 @@ public void testHoodieCompactorWithClean() throws Exception {

// offline compaction schedule
HoodieCompactor hoodieCompactorSchedule =
init(tableBasePath, true, "SCHEDULE", false);
init(tableBasePath, true, "SCHEDULE", skipClean);
hoodieCompactorSchedule.compact(0);
TestHelpers.assertNCompletedCommits(2, tableBasePath);
TestHelpers.assertNCleanCommits(0, tableBasePath);

writeData(true, client.createNewInstantTime(), 100, true);
writeData(true, client.createNewInstantTime(), 100, true);

// offline compaction execute with sync clean
// offline compaction execute with optional clean
HoodieCompactor hoodieCompactorExecute =
init(tableBasePath, false, "EXECUTE", true);
init(tableBasePath, false, "EXECUTE", skipClean);
hoodieCompactorExecute.compact(0);
TestHelpers.assertNCompletedCommits(5, tableBasePath);
TestHelpers.assertNCleanCommits(1, tableBasePath);
if (!skipClean) {
TestHelpers.assertNCleanCommits(1, tableBasePath);
}
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

private HoodieCompactor init(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean isAutoClean) {
HoodieCompactor.Config compactionConfig = buildCompactionConfig(tableBasePath, runSchedule, scheduleAndExecute, isAutoClean);
private HoodieCompactor init(String tableBasePath, boolean runSchedule, String scheduleAndExecute, boolean skipClean) {
HoodieCompactor.Config compactionConfig = buildCompactionConfig(tableBasePath, runSchedule, scheduleAndExecute, skipClean);
return new HoodieCompactor(jsc, compactionConfig);
}

private HoodieCompactor.Config buildCompactionConfig(String basePath, boolean runSchedule, String runningMode, boolean isAutoClean) {
private HoodieCompactor.Config buildCompactionConfig(String basePath, boolean runSchedule, String runningMode, boolean skipClean) {
HoodieCompactor.Config config = new HoodieCompactor.Config();
config.basePath = basePath;
config.runSchedule = runSchedule;
config.runningMode = runningMode;
config.configs.add("hoodie.metadata.enable=false");
config.configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), isAutoClean));
config.skipClean = skipClean;
config.configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), 1));
config.configs.add(String.format("%s=%s", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 1));
return config;
Expand Down

0 comments on commit f0c1a88

Please sign in to comment.