MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE = ConfigProperty
.key("hoodie.merge.allow.duplicate.on.inserts")
- .defaultValue("false")
+ .defaultValue("true")
.markAdvanced()
.withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)."
+ " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained.");
@@ -2238,6 +2239,26 @@ public int getGraphiteReportPeriodSeconds() {
return getInt(HoodieMetricsGraphiteConfig.GRAPHITE_REPORT_PERIOD_IN_SECONDS);
}
+ public String getM3ServerHost() {
+ return getString(HoodieMetricsM3Config.M3_SERVER_HOST_NAME);
+ }
+
+ public int getM3ServerPort() {
+ return getInt(HoodieMetricsM3Config.M3_SERVER_PORT_NUM);
+ }
+
+ public String getM3Tags() {
+ return getString(HoodieMetricsM3Config.M3_TAGS);
+ }
+
+ public String getM3Env() {
+ return getString(HoodieMetricsM3Config.M3_ENV);
+ }
+
+ public String getM3Service() {
+ return getString(HoodieMetricsM3Config.M3_SERVICE);
+ }
+
public String getJmxHost() {
return getString(HoodieMetricsJmxConfig.JMX_HOST_NAME);
}
@@ -2697,6 +2718,29 @@ public boolean isNonBlockingConcurrencyControl() {
return getWriteConcurrencyMode().isNonBlockingConcurrencyControl();
}
+ /**
+ * TTL configs.
+ */
+ public boolean isInlinePartitionTTLEnable() {
+ return getBoolean(HoodieTTLConfig.INLINE_PARTITION_TTL);
+ }
+
+ public String getPartitionTTLStrategyClassName() {
+ return getString(HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME);
+ }
+
+ public Integer getPartitionTTLStrategyDaysRetain() {
+ return getInt(HoodieTTLConfig.DAYS_RETAIN);
+ }
+
+ public String getPartitionTTLPartitionSelected() {
+ return getString(HoodieTTLConfig.PARTITION_SELECTED);
+ }
+
+ public Integer getPartitionTTLMaxPartitionsToDelete() {
+ return getInt(HoodieTTLConfig.MAX_PARTITION_TO_DELETE);
+ }
+
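
These getters read the keys introduced by HoodieTTLConfig. A minimal sketch of supplying them through properties; the strategy class name is an assumption for illustration, values are illustrative:

    import java.util.Properties;
    import org.apache.hudi.config.HoodieTTLConfig;

    Properties props = new Properties();
    props.setProperty(HoodieTTLConfig.INLINE_PARTITION_TTL.key(), "true");
    props.setProperty(HoodieTTLConfig.DAYS_RETAIN.key(), "30");
    // Strategy class name assumed here for illustration:
    props.setProperty(HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME.key(),
        "org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy");
    HoodieTTLConfig ttlConfig = HoodieTTLConfig.newBuilder().fromProperties(props).build();
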
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -2716,10 +2760,13 @@ public static class Builder {
private boolean isCallbackConfigSet = false;
private boolean isPayloadConfigSet = false;
private boolean isMetadataConfigSet = false;
+
+ private boolean isTTLConfigSet = false;
private boolean isLockConfigSet = false;
private boolean isPreCommitValidationConfigSet = false;
private boolean isMetricsJmxConfigSet = false;
private boolean isMetricsGraphiteConfigSet = false;
+ private boolean isMetricsM3ConfigSet = false;
private boolean isLayoutConfigSet = false;
public Builder withEngineType(EngineType engineType) {
@@ -2959,6 +3006,12 @@ public Builder withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig mericsGraph
return this;
}
+ public Builder withMetricsM3Config(HoodieMetricsM3Config metricsM3Config) {
+ writeConfig.getProps().putAll(metricsM3Config.getProps());
+ isMetricsM3ConfigSet = true;
+ return this;
+ }
+
public Builder withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig validatorConfig) {
writeConfig.getProps().putAll(validatorConfig.getProps());
isPreCommitValidationConfigSet = true;
@@ -2995,6 +3048,12 @@ public Builder withMetadataConfig(HoodieMetadataConfig metadataConfig) {
return this;
}
+ public Builder withTTLConfig(HoodieTTLConfig ttlConfig) {
+ writeConfig.getProps().putAll(ttlConfig.getProps());
+ isTTLConfigSet = true;
+ return this;
+ }
+
public Builder withAutoCommit(boolean autoCommit) {
writeConfig.setValue(AUTO_COMMIT_ENABLE, String.valueOf(autoCommit));
return this;
@@ -3262,6 +3321,8 @@ protected void setDefaults() {
final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key());
writeConfig.setDefaultOnCondition(!isLockConfigSet,
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+ writeConfig.setDefaultOnCondition(!isTTLConfigSet,
+ HoodieTTLConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index e1d0afeb6fa4..328619f5e9c8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -220,6 +220,8 @@ public HoodieMetricsConfig build() {
HoodieMetricsGraphiteConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.CLOUDWATCH,
HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
+ hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.M3,
+ HoodieMetricsM3Config.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
return hoodieMetricsConfig;
}
}
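
With this conditional default in place, selecting the reporter type is enough to layer in the M3-specific defaults. A property-level sketch, assuming the standard hoodie.metrics.* keys:

    Properties metricsProps = new Properties();
    metricsProps.setProperty("hoodie.metrics.on", "true");
    metricsProps.setProperty("hoodie.metrics.reporter.type", "M3");
    // build() now also applies the HoodieMetricsM3Config defaults
    // (host "localhost", port 9052, env "production", service "hoodie").
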
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java
new file mode 100644
index 000000000000..cc675eebfbbf
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config.metrics;
+
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+/**
+ * Configs for M3 reporter type.
+ *
+ * {@link org.apache.hudi.metrics.MetricsReporterType#M3}
+ */
+@ConfigClassProperty(name = "Metrics Configurations for M3",
+ groupName = ConfigGroups.Names.METRICS,
+ description = "Enables reporting on Hudi metrics using M3. "
+ + " Hudi publishes metrics on every commit, clean, rollback etc.")
+public class HoodieMetricsM3Config extends HoodieConfig {
+
+ public static final String M3_PREFIX = METRIC_PREFIX + ".m3";
+
+ public static final ConfigProperty<String> M3_SERVER_HOST_NAME = ConfigProperty
+ .key(M3_PREFIX + ".host")
+ .defaultValue("localhost")
+ .withDocumentation("M3 host to connect to.");
+
+ public static final ConfigProperty<Integer> M3_SERVER_PORT_NUM = ConfigProperty
+ .key(M3_PREFIX + ".port")
+ .defaultValue(9052)
+ .withDocumentation("M3 port to connect to.");
+
+ public static final ConfigProperty<String> M3_TAGS = ConfigProperty
+ .key(M3_PREFIX + ".tags")
+ .defaultValue("")
+ .withDocumentation("Optional M3 tags applied to all metrics.");
+
+ public static final ConfigProperty<String> M3_ENV = ConfigProperty
+ .key(M3_PREFIX + ".env")
+ .defaultValue("production")
+ .withDocumentation("M3 tag to label the environment (defaults to 'production'), "
+ + "applied to all metrics.");
+
+ public static final ConfigProperty<String> M3_SERVICE = ConfigProperty
+ .key(M3_PREFIX + ".service")
+ .defaultValue("hoodie")
+ .withDocumentation("M3 tag to label the service name (defaults to 'hoodie'), "
+ + "applied to all metrics.");
+
+ private HoodieMetricsM3Config() {
+ super();
+ }
+
+ public static HoodieMetricsM3Config.Builder newBuilder() {
+ return new HoodieMetricsM3Config.Builder();
+ }
+
+ public static class Builder {
+
+ private final HoodieMetricsM3Config hoodieMetricsM3Config = new HoodieMetricsM3Config();
+
+ public HoodieMetricsM3Config.Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.hoodieMetricsM3Config.getProps().load(reader);
+ return this;
+ }
+ }
+
+ public HoodieMetricsM3Config.Builder fromProperties(Properties props) {
+ this.hoodieMetricsM3Config.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieMetricsM3Config.Builder toM3Host(String host) {
+ hoodieMetricsM3Config.setValue(M3_SERVER_HOST_NAME, host);
+ return this;
+ }
+
+ public HoodieMetricsM3Config.Builder onM3Port(int port) {
+ hoodieMetricsM3Config.setValue(M3_SERVER_PORT_NUM, String.valueOf(port));
+ return this;
+ }
+
+ public HoodieMetricsM3Config.Builder useM3Tags(String tags) {
+ hoodieMetricsM3Config.setValue(M3_TAGS, tags);
+ return this;
+ }
+
+ public HoodieMetricsM3Config.Builder useM3Env(String env) {
+ hoodieMetricsM3Config.setValue(M3_ENV, env);
+ return this;
+ }
+
+ public HoodieMetricsM3Config.Builder useM3Service(String service) {
+ hoodieMetricsM3Config.setValue(M3_SERVICE, service);
+ return this;
+ }
+
+ public HoodieMetricsM3Config build() {
+ hoodieMetricsM3Config.setDefaults(HoodieMetricsM3Config.class.getName());
+ return hoodieMetricsM3Config;
+ }
+ }
+}
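
A usage sketch of the new builder; all values are illustrative:

    HoodieMetricsM3Config m3Config = HoodieMetricsM3Config.newBuilder()
        .toM3Host("m3-agent.local")
        .onM3Port(9052)
        .useM3Env("staging")
        .useM3Service("hudi-ingest")
        .useM3Tags("team=data,dc=phx")
        .build();
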
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index d22e4b21a5ec..0e47d0a688ab 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -210,7 +210,16 @@ private static void createCommitMarker(HoodieTable table, Path fileStatus, Path
if (fs.exists(fullPath)) {
return;
}
- FileIOUtils.createFileInPath(fs, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
+ // Prevent a spurious failure caused by a race: the marker may be created concurrently by another writer.
+ // That is fine, so after catching the exception we re-check for the marker and only fail if it still does not exist.
+ try {
+ FileIOUtils.createFileInPath(fs, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
+ } catch (HoodieIOException e) {
+ if (!fs.exists(fullPath)) {
+ throw e;
+ }
+ LOG.warn("Failed to create marker but " + fullPath + " exists", e);
+ }
}
/***
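
The same idempotent-create idiom in isolation, as a sketch against a generic Hadoop FileSystem (the method name is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static void createMarkerIdempotently(FileSystem fs, Path marker) throws IOException {
      if (fs.exists(marker)) {
        return;                            // another writer already created it
      }
      try {
        fs.create(marker, false).close();  // do not overwrite if it appears concurrently
      } catch (IOException e) {
        if (!fs.exists(marker)) {
          throw e;                         // a real failure, not a benign race
        }
      }
    }
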
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index f8cc77274c2e..1301f046ae29 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -89,7 +89,6 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private final List<HoodieRecord> recordList = new ArrayList<>();
@@ -158,7 +157,6 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
? Option.of(new Schema.Parser().parse(config.getPartialUpdateSchema()))
: Option.empty(),
taskContextSupplier);
- this.fileId = fileId;
this.recordItr = recordItr;
this.sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
@@ -173,50 +171,52 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
}
private void init(HoodieRecord record) {
- if (doInit) {
- String prevCommit = instantTime;
- String baseFile = "";
- List<String> logFiles = new ArrayList<>();
- if (config.isCDCEnabled()) {
- // the cdc reader needs the base file metadata to have deterministic update sequence.
- TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
- Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
- if (fileSlice.isPresent()) {
- prevCommit = fileSlice.get().getBaseInstantTime();
- baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
- logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
- }
+ if (!doInit) {
+ return;
+ }
+
+ String prevCommit = instantTime;
+ String baseFile = "";
+ List<String> logFiles = new ArrayList<>();
+ if (config.isCDCEnabled()) {
+ // the cdc reader needs the base file metadata to have deterministic update sequence.
+ TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
+ Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
+ if (fileSlice.isPresent()) {
+ prevCommit = fileSlice.get().getBaseInstantTime();
+ baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+ logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
}
+ }
- // Prepare the first write status
- HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
- writeStatus.setStat(deltaWriteStat);
- writeStatus.setFileId(fileId);
- writeStatus.setPartitionPath(partitionPath);
- averageRecordSize = sizeEstimator.sizeEstimate(record);
+ // Prepare the first write status
+ HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
+ writeStatus.setStat(deltaWriteStat);
+ writeStatus.setFileId(fileId);
+ writeStatus.setPartitionPath(partitionPath);
+ averageRecordSize = sizeEstimator.sizeEstimate(record);
- deltaWriteStat.setPrevCommit(prevCommit);
- deltaWriteStat.setPartitionPath(partitionPath);
- deltaWriteStat.setFileId(fileId);
- deltaWriteStat.setBaseFile(baseFile);
- deltaWriteStat.setLogFiles(logFiles);
+ deltaWriteStat.setPrevCommit(prevCommit);
+ deltaWriteStat.setPartitionPath(partitionPath);
+ deltaWriteStat.setFileId(fileId);
+ deltaWriteStat.setBaseFile(baseFile);
+ deltaWriteStat.setLogFiles(logFiles);
- try {
- // Save hoodie partition meta in the partition path
- HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
- new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
- hoodieTable.getPartitionMetafileFormat());
- partitionMetadata.trySave(getPartitionId());
-
- this.writer = createLogWriter(getFileInstant(record));
- } catch (Exception e) {
- LOG.error("Error in update task at commit " + instantTime, e);
- writeStatus.setGlobalError(e);
- throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
- + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePathV2() + "/" + partitionPath, e);
- }
- doInit = false;
+ try {
+ // Save hoodie partition meta in the partition path
+ HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
+ new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
+ hoodieTable.getPartitionMetafileFormat());
+ partitionMetadata.trySave(getPartitionId());
+
+ this.writer = createLogWriter(getFileInstant(record));
+ } catch (Exception e) {
+ LOG.error("Error in update task at commit " + instantTime, e);
+ writeStatus.setGlobalError(e);
+ throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
+ + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePathV2() + "/" + partitionPath, e);
}
+ doInit = false;
}
/**
@@ -324,12 +324,7 @@ private MetadataValues populateMetadataFields(HoodieRecord<T> hoodieRecord) {
private void initNewStatus() {
HoodieDeltaWriteStat prevStat = (HoodieDeltaWriteStat) this.writeStatus.getStat();
// Make a new write status and copy basic fields over.
- HoodieDeltaWriteStat stat = new HoodieDeltaWriteStat();
- stat.setFileId(fileId);
- stat.setPartitionPath(partitionPath);
- stat.setPrevCommit(prevStat.getPrevCommit());
- stat.setBaseFile(prevStat.getBaseFile());
- stat.setLogFiles(new ArrayList<>(prevStat.getLogFiles()));
+ HoodieDeltaWriteStat stat = prevStat.copy();
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
hoodieTable.shouldTrackSuccessRecords(), config.getWriteStatusFailureFraction());
@@ -567,7 +562,7 @@ public IOType getIOType() {
return IOType.APPEND;
}
- public List<WriteStatus> writeStatuses() {
+ public List<WriteStatus> getWriteStatuses() {
return statuses;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index bdb35641f268..0a0f3352069a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -34,7 +34,6 @@
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
-import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -115,8 +114,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
TaskContextSupplier taskContextSupplier) {
- // preserveMetadata is disabled by default for MDT but enabled otherwise
- this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, !HoodieTableMetadata.isMetadataTable(config.getBasePath()));
+ this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, true);
this.recordMap = recordMap;
this.useWriterSchema = true;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index a9b22d083326..4f5f240c4fd0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -47,7 +47,6 @@
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -145,8 +144,7 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchemaForCompaction = true;
- // preserveMetadata is disabled by default for MDT but enabled otherwise
- this.preserveMetadata = !HoodieTableMetadata.isMetadataTable(config.getBasePath());
+ this.preserveMetadata = true;
init(fileId, this.partitionPath, dataFileToBeMerged);
validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
}
@@ -484,6 +482,15 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {
}
}
+ public Iterator<List<WriteStatus>> getWriteStatusesAsIterator() {
+ List<WriteStatus> statuses = getWriteStatuses();
+ // TODO(vc): This needs to be revisited
+ if (getPartitionPath() == null) {
+ LOG.info("Upsert Handle has partition path as null {}, {}", getOldFilePath(), statuses);
+ }
+ return Collections.singletonList(statuses).iterator();
+ }
+
public Path getOldFilePath() {
return oldFilePath;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 9d1bb6d511e8..ab80629c941b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -190,7 +190,7 @@ protected void markClosed() {
public abstract List<WriteStatus> close();
- public List<WriteStatus> writeStatuses() {
+ public List<WriteStatus> getWriteStatuses() {
return Collections.singletonList(writeStatus);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 7b88a0ab979b..4d7c83a7794d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -146,21 +146,24 @@ public static String[] extractRecordKeysByFields(String recordKey, List<String> fields) {
public static String getRecordKey(GenericRecord record, List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled) {
boolean keyIsNullEmpty = true;
StringBuilder recordKey = new StringBuilder();
- for (String recordKeyField : recordKeyFields) {
+ for (int i = 0; i < recordKeyFields.size(); i++) {
+ String recordKeyField = recordKeyFields.get(i);
String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled);
if (recordKeyValue == null) {
- recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + NULL_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
+ recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(NULL_RECORDKEY_PLACEHOLDER);
} else if (recordKeyValue.isEmpty()) {
- recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + EMPTY_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
+ recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(EMPTY_RECORDKEY_PLACEHOLDER);
} else {
- recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + recordKeyValue + DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
+ recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(recordKeyValue);
keyIsNullEmpty = false;
}
+ if (i != recordKeyFields.size() - 1) {
+ recordKey.append(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
+ }
}
- recordKey.deleteCharAt(recordKey.length() - 1);
if (keyIsNullEmpty) {
throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: "
- + recordKeyFields.toString() + " cannot be entirely null or empty.");
+ + recordKeyFields + " cannot be entirely null or empty.");
}
return recordKey.toString();
}
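
A behavior sketch of the rewritten loop, assuming the usual KeyGenUtils constants (":" between field and value, "," between parts, "__null__"/"__empty__" placeholders). Given a GenericRecord record with id = null, name = "" and ts = 123:

    String key = KeyGenUtils.getRecordKey(record, Arrays.asList("id", "name", "ts"), false);
    // key -> "id:__null__,name:__empty__,ts:123"
    // No trailing separator is appended, so the old deleteCharAt() trim is gone;
    // keyIsNullEmpty stays false because ts carries a value, so no exception is thrown.
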
@@ -172,20 +175,27 @@ public static String getRecordPartitionPath(GenericRecord record, List<String> partitionPathFields,
}
StringBuilder partitionPath = new StringBuilder();
- for (String partitionPathField : partitionPathFields) {
+ for (int i = 0; i < partitionPathFields.size(); i++) {
+ String partitionPathField = partitionPathFields.get(i);
String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true, consistentLogicalTimestampEnabled);
if (fieldVal == null || fieldVal.isEmpty()) {
- partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + HUDI_DEFAULT_PARTITION_PATH
- : HUDI_DEFAULT_PARTITION_PATH);
+ if (hiveStylePartitioning) {
+ partitionPath.append(partitionPathField).append("=");
+ }
+ partitionPath.append(HUDI_DEFAULT_PARTITION_PATH);
} else {
if (encodePartitionPath) {
fieldVal = PartitionPathEncodeUtils.escapePathName(fieldVal);
}
- partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + fieldVal : fieldVal);
+ if (hiveStylePartitioning) {
+ partitionPath.append(partitionPathField).append("=");
+ }
+ partitionPath.append(fieldVal);
+ }
+ if (i != partitionPathFields.size() - 1) {
+ partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
}
- partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
}
- partitionPath.deleteCharAt(partitionPath.length() - 1);
return partitionPath.toString();
}
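
A matching sketch for partition paths (argument order assumed). Given a record with region = null and day = "2024-01-01", hive-style partitioning on, and "default" as the default partition value:

    String path = KeyGenUtils.getRecordPartitionPath(record, Arrays.asList("region", "day"),
        true /* hiveStylePartitioning */, false /* encodePartitionPath */,
        false /* consistentLogicalTimestampEnabled */);
    // path -> "region=default/day=2024-01-01", again with no trailing separator to trim.
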
@@ -257,7 +267,7 @@ public static List<String> getRecordKeyFields(TypedProperties props) {
* @param props props of interest.
* @return true if record keys need to be auto generated. false otherwise.
*/
- public static boolean enableAutoGenerateRecordKeys(TypedProperties props) {
+ public static boolean isAutoGeneratedRecordKeysEnabled(TypedProperties props) {
return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
index f375095122da..f68e3232753a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
@@ -98,7 +98,7 @@ public static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) t
throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
}
- if (KeyGenUtils.enableAutoGenerateRecordKeys(props)) {
+ if (KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props)) {
return new AutoRecordGenWrapperAvroKeyGenerator(props, keyGenerator);
} else {
return keyGenerator;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 8d40fc240952..99739947077c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -101,7 +101,7 @@
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
@@ -392,7 +392,6 @@ private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
.anyMatch(metadataPartition -> dataMetaClient.getTableConfig().isMetadataPartitionAvailable((metadataPartition)));
-
// Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
List partitionInfoList;
if (filesPartitionAvailable) {
@@ -462,7 +461,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
metadataMetaClient.reloadActiveTimeline();
- dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
+ String partitionPath = (partitionType == FUNCTIONAL_INDEX) ? dataWriteConfig.getFunctionalIndexConfig().getIndexName() : partitionType.getPartitionPath();
+
+ dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, true);
// initialize the metadata reader again so the MDT partition can be read after initialization
initMetadataReader();
long totalInitTime = partitionInitTimer.endTimer();
@@ -795,7 +796,7 @@ public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
for (MetadataPartitionType partitionType : metadataPartitions) {
String partitionPath = partitionType.getPartitionPath();
// first update table config
- dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, false);
+ dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false);
LOG.warn("Deleting Metadata Table partition: " + partitionPath);
dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true);
// delete corresponding pending indexing instant file in the timeline
@@ -829,7 +830,7 @@ private static void deletePendingIndexingInstant(HoodieTableMetaClient metaClien
protected static void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int maxNumDeltaCommitsWhenPending) {
final HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
Option<HoodieInstant> lastCompaction = activeTimeline.filterCompletedInstants()
- .filter(s -> s.getAction().equals(COMPACTION_ACTION)).lastInstant();
+ .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
int numDeltaCommits = lastCompaction.isPresent()
? activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().getTimestamp()).countInstants()
: activeTimeline.getDeltaCommitTimeline().countInstants();
@@ -900,6 +901,7 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) throws IOException {
+ List<String> partitionPaths = new ArrayList<>();
List<MetadataPartitionType> partitionTypes = new ArrayList<>();
indexPartitionInfos.forEach(indexPartitionInfo -> {
String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
@@ -913,10 +915,11 @@
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, dataMetaClient, rollbackMetadata, instantTime));
-
+ // The deltacommit that will be rolled back
+ HoodieInstant deltaCommitInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
LOG.info("Rolling back MDT deltacommit " + commitToRollbackInstantTime);
+ String rollbackInstantTime = createRollbackTimestamp(instantTime);
if (!getWriteClient().rollback(commitToRollbackInstantTime, rollbackInstantTime)) {
throw new HoodieMetadataException("Failed to rollback deltacommit at " + commitToRollbackInstantTime);
}
@@ -1131,8 +1126,9 @@ protected void validateRollback(
String commitToRollbackInstantTime,
HoodieInstant compactionInstant,
HoodieTimeline deltacommitsSinceCompaction) {
- // The commit being rolled back should not be earlier than the latest compaction on the MDT. Compaction on MDT only occurs when all actions
- // are completed on the dataset. Hence, this case implies a rollback of completed commit which should actually be handled using restore.
- // The commit being rolled back should not be earlier than the latest compaction on the MDT, since such a rollback would not change the latest (already compacted) file slice.
+ // Compaction on MDT only occurs when all actions are completed on the dataset.
+ // Hence, this case implies a rollback of completed commit which should actually be handled using restore.
if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
final String compactionInstantTime = compactionInstant.getTimestamp();
if (commitToRollbackInstantTime.length() == compactionInstantTime.length() && HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, compactionInstantTime)) {
@@ -1316,9 +1312,8 @@ public void performTableServices(Option<String> inFlightInstantTimestamp) {
.getTimestamp();
LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + ", running clean operations.");
cleanIfNecessary(writeClient, latestDeltacommitTime);
-
// Do timeline validation before scheduling compaction/logCompaction operations.
- if (validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) {
+ if (validateCompactionScheduling()) {
compactIfNecessary(writeClient, latestDeltacommitTime);
}
writeClient.archive();
@@ -1355,10 +1350,12 @@ private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient
* deltacommit.
*/
protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String latestDeltacommitTime) {
- // Trigger compaction with suffixes based on the same instant time. This ensures that any future
- // delta commits synced over will not have an instant time lesser than the last completed instant on the
- // metadata table.
- final String compactionInstantTime = writeClient.createNewInstantTime(false);
+ // IMPORTANT: Trigger compaction with the max instant time that is smaller than (or equal to) the earliest pending instant from the DT.
+ // The compaction planner will then filter out the log files that were finished with a greater completion time.
+ // see BaseHoodieCompactionPlanGenerator.generateCompactionPlan for more details.
+ final String compactionInstantTime = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+ .findInstantsBeforeOrEquals(latestDeltacommitTime).firstInstant().map(HoodieInstant::getTimestamp)
+ .orElse(writeClient.createNewInstantTime(false));
// we need to avoid checking compaction w/ same instant again.
// let's say we trigger compaction after C5 in MDT and so compaction completes with C4001. but C5 crashed before completing in MDT.
@@ -1407,35 +1404,19 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan
/**
* Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled.
*/
- protected boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
- // we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table.
- // Whenever you want to change this logic, please ensure all below scenarios are considered.
- // a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
- // b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, the latest compaction instant time in MDT represents
- // any instants before that is already synced with metadata table.
- // c. Do consider out of order commits. For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every
- // instant before c4 is synced with metadata table.
- List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
- .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
-
- if (!pendingInstants.isEmpty()) {
- checkNumDeltaCommits(metadataMetaClient, dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
- LOG.info(String.format(
- "Cannot compact metadata table as there are %d inflight instants in data table before latest deltacommit in metadata table: %s. Inflight instants in data table: %s",
- pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, Arrays.toString(pendingInstants.toArray())));
- return false;
- }
-
- // Check if there are any pending compaction or log compaction instants in the timeline.
- // If pending compact/logCompaction operations are found abort scheduling new compaction/logCompaction operations.
- Option<HoodieInstant> pendingLogCompactionInstant =
- metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
- Option<HoodieInstant> pendingCompactionInstant =
- metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
- if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
- LOG.warn(String.format("Not scheduling compaction or logCompaction, since a pending compaction instant %s or logCompaction %s instant is present",
- pendingCompactionInstant, pendingLogCompactionInstant));
- return false;
+ protected boolean validateCompactionScheduling() {
+ // When log compaction is enabled, the ordering of log-compaction and compaction must be preserved, because metadata payloads such as RLI
+ // only have processing-time ordering semantics: log-compaction/compaction must complete in the same order in which they were started.
+ if (metadataWriteConfig.isLogCompactionEnabled()) {
+ Option<HoodieInstant> pendingLogCompactionInstant =
+ metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+ Option<HoodieInstant> pendingCompactionInstant =
+ metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+ if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
+ LOG.warn(String.format("Not scheduling compaction or logCompaction, since a pending compaction instant %s or logCompaction %s instant is present",
+ pendingCompactionInstant, pendingLogCompactionInstant));
+ return false;
+ }
}
return true;
}
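
A worked timeline example of the new base-instant choice in compactIfNecessary (instants illustrative):

    // DT timeline:            c1 (completed), c2 (inflight), c3 (completed)
    // MDT latest deltacommit:  c3
    // filterInflightsAndRequested().findInstantsBeforeOrEquals(c3).firstInstant() -> c2,
    // so MDT compaction is scheduled at c2's timestamp; log files finished with a greater
    // completion time are filtered out by the compaction planner instead of tripping a
    // validation failure. With no pending instants, it falls back to a fresh instant time.
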
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 7c42ccf50161..48cfb46b49f2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -37,6 +37,7 @@
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsM3Config;
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
import org.apache.hudi.exception.HoodieMetadataException;
@@ -81,11 +82,31 @@ public static HoodieWriteConfig createMetadataWriteConfig(
String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
final long maxLogFileSizeBytes = writeConfig.getMetadataConfig().getMaxLogFileSize();
+ // Borrow the cleaner policy from the data table and scale its retention settings for the metadata table accordingly
+ HoodieCleaningPolicy dataTableCleaningPolicy = writeConfig.getCleanerPolicy();
+ HoodieCleanConfig.Builder cleanConfigBuilder = HoodieCleanConfig.newBuilder()
+ .withAsyncClean(DEFAULT_METADATA_ASYNC_CLEAN)
+ .withAutoClean(false)
+ .withCleanerParallelism(MDT_DEFAULT_PARALLELISM)
+ .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
+ .withCleanerPolicy(dataTableCleaningPolicy);
+
+ if (HoodieCleaningPolicy.KEEP_LATEST_COMMITS.equals(dataTableCleaningPolicy)) {
+ int retainCommits = (int) Math.max(DEFAULT_METADATA_CLEANER_COMMITS_RETAINED, writeConfig.getCleanerCommitsRetained() * 1.2);
+ cleanConfigBuilder.retainCommits(retainCommits);
+ } else if (HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.equals(dataTableCleaningPolicy)) {
+ int retainFileVersions = (int) Math.ceil(writeConfig.getCleanerFileVersionsRetained() * 1.2);
+ cleanConfigBuilder.retainFileVersions(retainFileVersions);
+ } else if (HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.equals(dataTableCleaningPolicy)) {
+ int numHoursRetained = (int) Math.ceil(writeConfig.getCleanerHoursRetained() * 1.2);
+ cleanConfigBuilder.cleanerNumHoursRetained(numHoursRetained);
+ }
// Create the write config for the metadata table by borrowing options from the main write config.
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
.withEngineType(writeConfig.getEngineType())
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+ .withMergeAllowDuplicateOnInserts(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
.withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
@@ -103,14 +124,7 @@ public static HoodieWriteConfig createMetadataWriteConfig(
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
.forTable(tableName)
// we will trigger cleaning manually, to control the instant times
- .withCleanConfig(HoodieCleanConfig.newBuilder()
- .withAsyncClean(DEFAULT_METADATA_ASYNC_CLEAN)
- .withAutoClean(false)
- .withCleanerParallelism(MDT_DEFAULT_PARALLELISM)
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
- .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
- .retainCommits(DEFAULT_METADATA_CLEANER_COMMITS_RETAINED)
- .build())
+ .withCleanConfig(cleanConfigBuilder.build())
// we will trigger archive manually, to ensure only regular writer invokes it
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(
@@ -182,6 +196,15 @@ public static HoodieWriteConfig createMetadataWriteConfig(
.withPushgatewayPortNum(writeConfig.getPushGatewayPort()).build();
builder.withProperties(prometheusConfig.getProps());
break;
+ case M3:
+ HoodieMetricsM3Config m3Config = HoodieMetricsM3Config.newBuilder()
+ .onM3Port(writeConfig.getM3ServerPort())
+ .toM3Host(writeConfig.getM3ServerHost())
+ .useM3Tags(writeConfig.getM3Tags())
+ .useM3Service(writeConfig.getM3Service())
+ .useM3Env(writeConfig.getM3Env()).build();
+ builder.withProperties(m3Config.getProps());
+ break;
case DATADOG:
HoodieMetricsDatadogConfig.Builder datadogConfig = HoodieMetricsDatadogConfig.newBuilder()
.withDatadogApiKey(writeConfig.getDatadogApiKey())
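
A quick arithmetic sketch of the borrowed cleaner settings from the block above (assuming DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 20; data-table values illustrative):

    // KEEP_LATEST_COMMITS,       30 commits retained  -> max(20, (int) (30 * 1.2)) = 36
    // KEEP_LATEST_FILE_VERSIONS,  3 versions retained -> ceil(3 * 1.2)             = 4
    // KEEP_LATEST_BY_HOURS,      10 hours retained    -> ceil(10 * 1.2)            = 12
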
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index 27034735a040..0d20337fa5c5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -27,6 +27,7 @@
import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter;
import org.apache.hudi.metrics.custom.CustomizableMetricsReporter;
import org.apache.hudi.metrics.datadog.DatadogMetricsReporter;
+import org.apache.hudi.metrics.m3.M3MetricsReporter;
import org.apache.hudi.metrics.prometheus.PrometheusReporter;
import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter;
@@ -89,6 +90,9 @@ public static Option<MetricsReporter> createReporter(HoodieWriteConfig config, MetricRegistry registry) {
case CLOUDWATCH:
reporter = new CloudWatchMetricsReporter(config, registry);
break;
+ case M3:
+ reporter = new M3MetricsReporter(config, registry);
+ break;
default:
LOG.error("Reporter type[" + type + "] is not supported.");
break;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
index 3c8600159287..6d05e443e6b9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
@@ -22,5 +22,5 @@
* Types of the reporter supported, hudi also supports user defined reporter.
*/
public enum MetricsReporterType {
- GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH
+ GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH, M3
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java
new file mode 100644
index 000000000000..a658476ef754
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.m3;
+
+import com.codahale.metrics.MetricRegistry;
+import com.uber.m3.tally.m3.M3Reporter;
+import com.uber.m3.util.Duration;
+import com.uber.m3.util.ImmutableMap;
+import com.uber.m3.tally.RootScopeBuilder;
+import com.uber.m3.tally.Scope;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.MetricsReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of an M3 metrics reporter, which reports metrics to an M3 (https://m3db.io/) service.
+ */
+public class M3MetricsReporter extends MetricsReporter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(M3MetricsReporter.class);
+ private final HoodieWriteConfig config;
+ private final MetricRegistry registry;
+ private final ImmutableMap<String, String> tags;
+
+ public M3MetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
+ this.config = config;
+ this.registry = registry;
+
+ ImmutableMap.Builder<String, String> tagBuilder = new ImmutableMap.Builder<>();
+ tagBuilder.putAll(parseOptionalTags(config.getM3Tags()));
+ tagBuilder.put("service", config.getM3Service());
+ tagBuilder.put("env", config.getM3Env());
+ this.tags = tagBuilder.build();
+ LOG.info(String.format("Building M3 Reporter with M3 tags mapping: %s", tags));
+ }
+
+ private static Map<String, String> parseOptionalTags(String tagValueString) {
+ Map<String, String> parsedTags = new HashMap<>();
+ if (!tagValueString.isEmpty()) {
+ Arrays.stream(tagValueString.split(",")).forEach((tagValuePair) -> {
+ String[] parsedTagValuePair = Arrays.stream(tagValuePair.split("="))
+ .map((tagOrValue) -> tagOrValue.trim()).filter((tagOrValue) -> !tagOrValue.isEmpty())
+ .toArray(String[]::new);
+ if (parsedTagValuePair.length != 2) {
+ throw new RuntimeException(String.format(
+ "M3 Reporter tags cannot be initialized with tags [%s] due to not being in format `tag=value, . . .`.",
+ tagValuePair));
+ }
+ parsedTags.put(parsedTagValuePair[0], parsedTagValuePair[1]);
+ });
+ }
+ return parsedTags;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void report() {
+ /*
+ Although com.uber.m3.tally.Scope supports automatically submitting metrics in an interval
+ via a background task, it does not seem to support
+ - an API for explicitly flushing/emitting all metrics
+ - Taking in an external com.codahale.metrics.MetricRegistry metrics registry and automatically
+ adding any new counters/gauges whenever they are added to the registry
+ Due to this, this implementation emits metrics by creating a Scope, adding all metrics from
the Hudi metrics registry as counters/gauges to the scope, and then closing the Scope. Since
+ closing this Scope will implicitly flush all M3 metrics, the reporting intervals
+ are configured to be Integer.MAX_VALUE.
+ */
+ synchronized (this) {
+ try (Scope scope = new RootScopeBuilder()
+ .reporter(new M3Reporter.Builder(
+ new InetSocketAddress(config.getM3ServerHost(), config.getM3ServerPort()))
+ .includeHost(true).commonTags(tags)
+ .build())
+ .reportEvery(Duration.ofSeconds(Integer.MAX_VALUE))
+ .tagged(tags)) {
+
+ M3ScopeReporterAdaptor scopeReporter = new M3ScopeReporterAdaptor(registry, scope);
+ scopeReporter.start(Integer.MAX_VALUE, TimeUnit.SECONDS);
+ scopeReporter.report();
+ scopeReporter.stop();
+ } catch (Exception e) {
+ LOG.error(String.format("Error reporting metrics to M3: %s", e));
+ }
+ }
+ }
+
+ @Override
+ public void stop() {}
+}
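
A behavior sketch for parseOptionalTags (inputs illustrative):

    // "team=data, dc = phx" -> {team=data, dc=phx}   (whitespace around tags/values is trimmed)
    // ""                    -> {}                    (tags are optional)
    // "malformed"           -> RuntimeException, not in `tag=value, . . .` format
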
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3ScopeReporterAdaptor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3ScopeReporterAdaptor.java
new file mode 100644
index 000000000000..ae66914400b9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3ScopeReporterAdaptor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.m3;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metered;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import com.uber.m3.tally.Scope;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * Implementation of com.codahale.metrics.ScheduledReporter, to emit metrics from
+ * com.codahale.metrics.MetricRegistry to M3
+ */
+public class M3ScopeReporterAdaptor extends ScheduledReporter {
+ private final Scope scope;
+
+ protected M3ScopeReporterAdaptor(MetricRegistry registry, Scope scope) {
+ super(registry, "hudi-m3-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS);
+ this.scope = scope;
+ }
+
+ @Override
+ public void start(long period, TimeUnit unit) {
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters,
+ SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters,
+ SortedMap<String, Timer> timers) {
+ /*
+ When reporting, process each com.codahale.metrics metric and add counters & gauges to
+ the passed-in com.uber.m3.tally.Scope with the same name and value. This is needed
+ for the Scope to register these metrics
+ */
+ report(scope,
+ gauges,
+ counters,
+ histograms,
+ meters,
+ timers);
+ }
+
+ private void report(Scope scope,
+ Map<String, Gauge> gauges,
+ Map<String, Counter> counters,
+ Map<String, Histogram> histograms,
+ Map<String, Meter> meters,
+ Map<String, Timer> timers) {
+
+ for (Entry<String, Gauge> entry : gauges.entrySet()) {
+ scope.gauge(entry.getKey()).update(
+ ((Number) entry.getValue().getValue()).doubleValue());
+ }
+
+ for (Entry<String, Counter> entry : counters.entrySet()) {
+ scope.counter(entry.getKey()).inc(
+ ((Number) entry.getValue().getCount()).longValue());
+ }
+
+ for (Entry<String, Histogram> entry : histograms.entrySet()) {
+ scope.gauge(MetricRegistry.name(entry.getKey(), "count")).update(
+ entry.getValue().getCount());
+ reportSnapshot(entry.getKey(), entry.getValue().getSnapshot());
+ }
+
+ for (Entry<String, Meter> entry : meters.entrySet()) {
+ reportMetered(entry.getKey(), entry.getValue());
+ }
+
+ for (Entry<String, Timer> entry : timers.entrySet()) {
+ reportTimer(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void reportMetered(String name, Metered meter) {
+ scope.counter(MetricRegistry.name(name, "count")).inc(meter.getCount());
+ List<Pair<String, Double>> meterGauges = Arrays.asList(
+ Pair.of("m1_rate", meter.getOneMinuteRate()),
+ Pair.of("m5_rate", meter.getFiveMinuteRate()),
+ Pair.of("m15_rate", meter.getFifteenMinuteRate()),
+ Pair.of("mean_rate", meter.getMeanRate())
+ );
+ for (Pair<String, Double> pair : meterGauges) {
+ scope.gauge(MetricRegistry.name(name, pair.getLeft())).update(pair.getRight());
+ }
+ }
+
+ private void reportSnapshot(String name, Snapshot snapshot) {
+ List<Pair<String, Number>> snapshotGauges = Arrays.asList(
+ Pair.of("max", snapshot.getMax()),
+ Pair.of("mean", snapshot.getMean()),
+ Pair.of("min", snapshot.getMin()),
+ Pair.of("stddev", snapshot.getStdDev()),
+ Pair.of("p50", snapshot.getMedian()),
+ Pair.of("p75", snapshot.get75thPercentile()),
+ Pair.of("p95", snapshot.get95thPercentile()),
+ Pair.of("p98", snapshot.get98thPercentile()),
+ Pair.of("p99", snapshot.get99thPercentile()),
+ Pair.of("p999", snapshot.get999thPercentile())
+ );
+ for (Pair<String, Number> pair : snapshotGauges) {
+ scope.gauge(MetricRegistry.name(name, pair.getLeft())).update(pair.getRight().doubleValue());
+ }
+ }
+
+ private void reportTimer(String name, Timer timer) {
+ reportMetered(name, timer);
+ reportSnapshot(name, timer.getSnapshot());
+ }
+
+}
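
A fan-out sketch of what the adaptor emits for a single codahale Timer registered as "commit.duration" (the name is illustrative), mirroring reportMetered() and reportSnapshot() above:

    // counter: commit.duration.count
    // gauges:  commit.duration.m1_rate, .m5_rate, .m15_rate, .mean_rate
    // gauges:  commit.duration.max, .mean, .min, .stddev,
    //          .p50, .p75, .p95, .p98, .p99, .p999
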
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 080fe5f357d6..aadb0d486857 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -67,14 +67,17 @@
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.hadoop.fs.ConsistencyGuard;
import org.apache.hudi.hadoop.fs.ConsistencyGuard.FileVisibility;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
@@ -121,13 +124,12 @@
* @param <O> Type of outputs
*/
public abstract class HoodieTable<T, I, K, O> implements Serializable {
-
private static final Logger LOG = LoggerFactory.getLogger(HoodieTable.class);
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
protected final HoodieIndex<?, ?> index;
- private SerializableConfiguration hadoopConfiguration;
+ private final SerializableConfiguration hadoopConfiguration;
protected final TaskContextSupplier taskContextSupplier;
private final HoodieTableMetadata metadata;
private final HoodieStorageLayout storageLayout;
@@ -146,7 +148,7 @@ protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, Hoo
.build();
this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath());
- this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), unused -> metadata);
+ this.viewManager = getViewManager();
this.metaClient = metaClient;
this.index = getIndex(config, context);
this.storageLayout = getStorageLayout(config);
@@ -165,7 +167,7 @@ protected HoodieStorageLayout getStorageLayout(HoodieWriteConfig config) {
private synchronized FileSystemViewManager getViewManager() {
if (null == viewManager) {
- viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), unused -> metadata);
+ viewManager = FileSystemViewManager.createViewManager(getContext(), config.getViewStorageConfig(), config.getCommonConfig(), unused -> metadata);
}
return viewManager;
}
@@ -177,8 +179,7 @@ private synchronized FileSystemViewManager getViewManager() {
* @param records hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
- public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
- I records);
+ public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime, I records);
/**
* Insert a batch of new records into Hoodie table at the supplied instantTime.
@@ -187,8 +188,7 @@ public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, Strin
* @param records hoodieRecords to upsert
* @return HoodieWriteMetadata
*/
- public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
- I records);
+ public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime, I records);
/**
* Bulk Insert a batch of new records into Hoodie table at the supplied instantTime.
@@ -267,7 +267,7 @@ public abstract HoodieWriteMetadata<O> insertPrepped(HoodieEngineContext context
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
- I preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner);
+ I preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner);
/**
* Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
@@ -291,6 +291,14 @@ public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext con
*/
public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);
+ /**
+ * Delete expired partitions based on the partition TTL configuration.
+ * @param context HoodieEngineContext
+ * @param instantTime Instant Time for the action
+ * @return HoodieWriteMetadata
+ */
+ public abstract HoodieWriteMetadata<O> managePartitionTTL(HoodieEngineContext context, String instantTime);
+
public HoodieWriteConfig getConfig() {
return config;
}
@@ -564,7 +572,9 @@ public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context,
* @param partitionsToIndex List of {@link MetadataPartitionType} that should be indexed.
* @return HoodieIndexPlan containing metadata partitions and instant upto which they should be indexed.
*/
- public abstract Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex);
+ public abstract Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime,
+ List<MetadataPartitionType> partitionsToIndex,
+ List<String> partitionPaths);
/**
* Execute requested index action.
@@ -859,8 +869,10 @@ private void validateSchema() throws HoodieUpsertException, HoodieInsertExceptio
Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());
+ } catch (SchemaCompatibilityException e) {
+ throw e;
} catch (Exception e) {
- throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
+ throw new SchemaCompatibilityException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
}
}
@@ -984,8 +996,8 @@ public void deleteMetadataIndexIfNecessary() {
if (shouldDeleteMetadataPartition(partitionType)) {
try {
LOG.info("Deleting metadata partition because it is disabled in writer: " + partitionType.name());
- if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType)) {
- deleteMetadataPartition(metaClient.getBasePath(), context, partitionType);
+ if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType.getPartitionPath())) {
+ deleteMetadataPartition(metaClient.getBasePath(), context, partitionType.getPartitionPath());
}
clearMetadataTablePartitionsConfig(Option.of(partitionType), false);
} catch (HoodieMetadataException e) {
@@ -1083,4 +1095,12 @@ private Set<String> getDropPartitionColNames() {
}
return new HashSet<>(Arrays.asList(partitionFields.get()));
}
+
+ public void runMerge(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String instantTime, String fileId) throws IOException {
+ if (upsertHandle.getOldFilePath() == null) {
+ throw new HoodieUpsertException("Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
+ } else {
+ HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
+ }
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 61c0eeeffb0f..c7e294410e3d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -234,7 +234,7 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
throw new HoodieIOException("Failed to clean up after commit", e);
} finally {
if (!skipLocking) {
- this.txnManager.endTransaction(Option.of(inflightInstant));
+ this.txnManager.endTransaction(Option.ofNullable(inflightInstant));
}
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 723a95bb2181..77c96b47f057 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -48,6 +48,7 @@
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
@@ -122,10 +123,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
List<String> partitionsToDelete = new ArrayList<>();
+ boolean shouldUseBatchLookup = shouldUseBatchLookup(table.getMetaClient().getTableConfig(), config);
for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
// Handles at most 'cleanerParallelism' partitions at a time to avoid excessive memory pressure on the timeline server
// (remote or local embedded), reducing the risk of an OOM exception.
List<String> subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size()));
+ if (shouldUseBatchLookup) {
+ LOG.info("Load partitions and files into file system view in advance. Paths: {}", subPartitionsToClean);
+ table.getHoodieView().loadPartitions(subPartitionsToClean);
+ }
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(subPartitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
.stream()
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 19cbe0f91a73..753f8c8253d5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -64,8 +64,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
-
/**
* Cleaner is responsible for garbage collecting older files in a given partition path. Such that
*
@@ -108,14 +106,9 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
.map(entry -> Pair.of(new HoodieFileGroupId(entry.getValue().getPartitionPath(), entry.getValue().getFileId()), entry.getValue()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
- // load all partitions in advance if necessary.
- if (shouldUseBatchLookup(hoodieTable.getMetaClient().getTableConfig(), config)) {
- LOG.info("Load all partitions and files into file system view in advance.");
- fileSystemView.loadAllPartitions();
- }
- // collect savepointed timestamps to be assist with incremental cleaning. For non-partitioned and metadata table, we may not need this.
- this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ? hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList())
- : Collections.EMPTY_LIST);
+ // collect savepointed timestamps to assist with incremental cleaning. For non-partitioned and metadata table, we may not need this.
+ this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.emptyList() : (hoodieTable.isPartitioned() ? new ArrayList<>(hoodieTable.getSavepointTimestamps())
+ : Collections.emptyList());
}
/**
@@ -234,8 +227,8 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
}
private List<String> getPartitionsFromDeletedSavepoint(HoodieCleanMetadata cleanMetadata) {
- List<String> savepointedTimestampsFromLastClean = Arrays.stream(cleanMetadata.getExtraMetadata()
- .getOrDefault(SAVEPOINTED_TIMESTAMPS, StringUtils.EMPTY_STRING).split(","))
+ List<String> savepointedTimestampsFromLastClean = cleanMetadata.getExtraMetadata() == null ? Collections.emptyList()
+ : Arrays.stream(cleanMetadata.getExtraMetadata().getOrDefault(SAVEPOINTED_TIMESTAMPS, StringUtils.EMPTY_STRING).split(","))
.filter(partition -> !StringUtils.isNullOrEmpty(partition)).collect(Collectors.toList());
if (savepointedTimestampsFromLastClean.isEmpty()) {
return Collections.emptyList();
@@ -252,6 +245,7 @@ private List<String> getPartitionsFromDeletedSavepoint(HoodieCleanMetadata clean
Option<HoodieInstant> instantOption = hoodieTable.getCompletedCommitsTimeline().filter(instant -> instant.getTimestamp().equals(savepointCommit)).firstInstant();
if (!instantOption.isPresent()) {
LOG.warn("Skipping to process a commit for which savepoint was removed as the instant moved to archived timeline already");
+ return Stream.empty();
}
HoodieInstant instant = instantOption.get();
return getPartitionsForInstants(instant);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 18c98d377f6c..6cb8f023ba83 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -57,7 +57,8 @@ public ClusteringPlanActionExecutor(HoodieEngineContext context,
protected Option<HoodieClusteringPlan> createClusteringPlan() {
LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
- Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getLastClusterCommit();
+ Option<HoodieInstant> lastClusteringInstant =
+ table.getActiveTimeline().getLastClusteringInstant();
int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 0d07bed531a4..a6894388f6d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -54,7 +54,7 @@ public abstract class ClusteringPlanStrategy<T, I, K, O> implements Serializable {
public static final int CLUSTERING_PLAN_VERSION_1 = 1;
- private final HoodieTable<T, I, K, O> hoodieTable;
+ protected final HoodieTable<T, I, K, O> hoodieTable;
private final transient HoodieEngineContext engineContext;
private final HoodieWriteConfig writeConfig;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index c5f5273fbad9..36f75b6a5b07 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -263,6 +263,7 @@ protected abstract Iterator<List<WriteStatus>> handleUpdate(String partitionPath
Iterator<HoodieRecord<T>> recordItr) throws IOException;
protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
+ context.setJobStatus(this.getClass().getSimpleName(), "Clustering records for " + config.getTableName());
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
@@ -285,6 +286,7 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
+ LOG.info("Found empty commit metadata for clustering with instant time " + instantTime);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeMetadata.setCommitMetadata(Option.of(commitMetadata));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 2aac6f0db8ee..941d93fd3506 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -45,7 +45,6 @@
import javax.annotation.Nullable;
import java.io.IOException;
-import java.text.ParseException;
import java.util.Map;
import static org.apache.hudi.common.util.CollectionUtils.nonEmpty;
@@ -211,12 +210,7 @@ private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy)
}
private Long parsedToSeconds(String time) {
- long timestamp;
- try {
- timestamp = HoodieActiveTimeline.parseDateFromInstantTime(time).getTime() / 1000;
- } catch (ParseException e) {
- throw new HoodieCompactionException(e.getMessage(), e);
- }
- return timestamp;
+ return HoodieActiveTimeline.parseDateFromInstantTimeSafely(time).orElseThrow(() -> new HoodieCompactionException("Failed to parse timestamp " + time))
+ .getTime() / 1000;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index fc0c320b4406..94c4296e470e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -51,7 +51,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -72,7 +71,6 @@
import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
@@ -220,9 +218,8 @@ private void abort(HoodieInstant indexInstant, Set<String> requestedPartitions)
// delete metadata partition
requestedPartitions.forEach(partition -> {
- MetadataPartitionType partitionType = MetadataPartitionType.valueOf(partition.toUpperCase(Locale.ROOT));
- if (metadataPartitionExists(table.getMetaClient().getBasePathV2().toString(), context, partitionType)) {
- deleteMetadataPartition(table.getMetaClient().getBasePathV2().toString(), context, partitionType);
+ if (metadataPartitionExists(table.getMetaClient().getBasePathV2().toString(), context, partition)) {
+ deleteMetadataPartition(table.getMetaClient().getBasePathV2().toString(), context, partition);
}
});
@@ -320,9 +317,7 @@ private static List<HoodieInstant> getCompletedArchivedAndActiveInstantsAfter(St
private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set<String> metadataPartitions) {
metadataPartitions.forEach(metadataPartition -> {
- MetadataPartitionType partitionType = metadataPartition.startsWith(PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX) ? MetadataPartitionType.FUNCTIONAL_INDEX :
- MetadataPartitionType.valueOf(metadataPartition.toUpperCase(Locale.ROOT));
- metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType, true);
+ metaClient.getTableConfig().setMetadataPartitionState(metaClient, metadataPartition, true);
});
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
index 7b27d7ef6e1c..da85fc4d6340 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
@@ -67,15 +67,20 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexPlan>> {
private static final Integer LATEST_INDEX_PLAN_VERSION = INDEX_PLAN_VERSION_1;
private final List<MetadataPartitionType> partitionIndexTypes;
+
+ private final List<String> partitionPaths;
+
private final TransactionManager txnManager;
public ScheduleIndexActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
- List<MetadataPartitionType> partitionIndexTypes) {
+ List<MetadataPartitionType> partitionIndexTypes,
+ List<String> partitionPaths) {
super(context, config, table, instantTime);
this.partitionIndexTypes = partitionIndexTypes;
+ this.partitionPaths = partitionPaths;
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
}
@@ -84,8 +89,11 @@ public Option<HoodieIndexPlan> execute() {
validateBeforeScheduling();
// make sure that it is idempotent, check with previously pending index operations.
Set<String> indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
+
Set<String> requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+ requestedPartitions.addAll(partitionPaths);
requestedPartitions.removeAll(indexesInflightOrCompleted);
+
if (!requestedPartitions.isEmpty()) {
LOG.warn(String.format("Following partitions already exist or inflight: %s. Going to schedule indexing of only these partitions: %s",
indexesInflightOrCompleted, requestedPartitions));
@@ -142,8 +150,8 @@ private void validateBeforeScheduling() {
private void abort(HoodieInstant indexInstant) {
// delete metadata partition
partitionIndexTypes.forEach(partitionType -> {
- if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType)) {
- deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType);
+ if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType.getPartitionPath())) {
+ deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType.getPartitionPath());
}
});
// delete requested instant
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 077f956eb7be..51159d3d5c3a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -188,7 +188,7 @@ private void validateRollbackCommitSequence() {
if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
return true;
}
- return !ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant);
+ return !ClusteringUtils.isClusteringInstant(table.getActiveTimeline(), instant);
}).map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if ((instantTimeToRollback != null) && !inflights.isEmpty()
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
index b3ee11b9836e..2f9e96859ff6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
@@ -71,7 +71,7 @@ public Option<HoodieRestorePlan> execute() {
// rollback pending clustering instants first before other instants (See HUDI-3362)
List<HoodieInstant> pendingClusteringInstantsToRollback = table.getActiveTimeline().filterPendingReplaceTimeline()
// filter only clustering related replacecommits (Not insert_overwrite related commits)
- .filter(instant -> ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant))
+ .filter(instant -> ClusteringUtils.isClusteringInstant(table.getActiveTimeline(), instant))
.getReverseOrderedInstants()
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), savepointToRestoreTimestamp))
.collect(Collectors.toList());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java
new file mode 100644
index 000000000000..26bcaa9fe51e
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieTTLConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Locale;
+
+import static org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME;
+import static org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE;
+
+/**
+ * Factory to create a {@link PartitionTTLStrategy}.
+ *
+ * This factory tries {@link HoodieTTLConfig#PARTITION_TTL_STRATEGY_CLASS_NAME} first,
+ * which ensures the class prop will not be overwritten by {@link PartitionTTLStrategyType}.
+ */
+public class HoodiePartitionTTLStrategyFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HoodiePartitionTTLStrategyFactory.class);
+
+ public static PartitionTTLStrategy createStrategy(HoodieTable hoodieTable, TypedProperties props, String instantTime) throws IOException {
+ String strategyClassName = getPartitionTTLStrategyClassName(props);
+ try {
+ return (PartitionTTLStrategy) ReflectionUtils.loadClass(strategyClassName,
+ new Class<?>[] {HoodieTable.class, String.class}, hoodieTable, instantTime);
+ } catch (Throwable e) {
+ throw new IOException("Could not load partition ttl management strategy class " + strategyClassName, e);
+ }
+ }
+
+ private static String getPartitionTTLStrategyClassName(TypedProperties props) {
+ String strategyClassName =
+ props.getString(PARTITION_TTL_STRATEGY_CLASS_NAME.key(), null);
+ if (StringUtils.isNullOrEmpty(strategyClassName)) {
+ String strategyType = props.getString(PARTITION_TTL_STRATEGY_TYPE.key(),
+ PARTITION_TTL_STRATEGY_TYPE.defaultValue());
+ PartitionTTLStrategyType strategyTypeEnum;
+ try {
+ strategyTypeEnum = PartitionTTLStrategyType.valueOf(strategyType.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw new HoodieException("Unsupported PartitionTTLStrategy Type " + strategyType);
+ }
+ strategyClassName = getPartitionTTLStrategyFromType(strategyTypeEnum);
+ }
+ return strategyClassName;
+ }
+
+ /**
+ * @param type {@link PartitionTTLStrategyType} enum.
+ * @return The partition ttl management strategy class name based on the {@link PartitionTTLStrategyType}.
+ */
+ public static String getPartitionTTLStrategyFromType(PartitionTTLStrategyType type) {
+ switch (type) {
+ case KEEP_BY_TIME:
+ return KeepByTimeStrategy.class.getName();
+ case KEEP_BY_CREATION_TIME:
+ return KeepByCreationTimeStrategy.class.getName();
+ default:
+ throw new HoodieException("Unsupported PartitionTTLStrategy Type " + type);
+ }
+ }
+
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
new file mode 100644
index 000000000000..a350086f2dcb
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * KeepByCreationTimeStrategy will return expired partitions by their creation time.
+ */
+public class KeepByCreationTimeStrategy extends KeepByTimeStrategy {
+
+ public KeepByCreationTimeStrategy(HoodieTable hoodieTable, String instantTime) {
+ super(hoodieTable, instantTime);
+ }
+
+ @Override
+ protected List<String> getExpiredPartitionsForTimeStrategy(List<String> partitionPathsForTTL) {
+ HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+ return partitionPathsForTTL.stream().parallel().filter(part -> {
+ HoodiePartitionMetadata hoodiePartitionMetadata =
+ new HoodiePartitionMetadata(metaClient.getFs(), FSUtils.getPartitionPath(metaClient.getBasePath(), part));
+ Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+ if (instantOption.isPresent()) {
+ String instantTime = instantOption.get();
+ return isPartitionExpired(instantTime);
+ }
+ return false;
+ }).collect(Collectors.toList());
+ }
+
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
new file mode 100644
index 000000000000..b6d67bb9e8a3
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.fixInstantTimeCompatibility;
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.instantTimePlusMillis;
+
+/**
+ * KeepByTimeStrategy will return expired partitions by their lastCommitTime.
+ */
+public class KeepByTimeStrategy extends PartitionTTLStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KeepByTimeStrategy.class);
+
+ protected final long ttlInMilis;
+
+ public KeepByTimeStrategy(HoodieTable hoodieTable, String instantTime) {
+ super(hoodieTable, instantTime);
+ this.ttlInMilis = writeConfig.getPartitionTTLStrategyDaysRetain() * 1000L * 3600 * 24; // long arithmetic avoids int overflow for retention > 24 days
+ }
+
+ @Override
+ public List<String> getExpiredPartitionPaths() {
+ Option<HoodieInstant> lastCompletedInstant = hoodieTable.getActiveTimeline().filterCompletedInstants().lastInstant();
+ if (!lastCompletedInstant.isPresent() || ttlInMilis <= 0
+ || !hoodieTable.getMetaClient().getTableConfig().getPartitionFields().isPresent()) {
+ return Collections.emptyList();
+ }
+ List<String> expiredPartitions = getExpiredPartitionsForTimeStrategy(getPartitionPathsForTTL());
+ int limit = writeConfig.getPartitionTTLMaxPartitionsToDelete();
+ LOG.info("Total expired partitions count {}, limit {}", expiredPartitions.size(), limit);
+ return expiredPartitions.stream()
+ .limit(limit) // avoid an overly large single replace commit
+ .collect(Collectors.toList());
+ }
+
+ protected List<String> getExpiredPartitionsForTimeStrategy(List<String> partitionsForTTLManagement) {
+ HoodieTimer timer = HoodieTimer.start();
+ Map<String, Option<String>> lastCommitTimeForPartitions = getLastCommitTimeForPartitions(partitionsForTTLManagement);
+ LOG.info("Collect last commit time for partitions cost {} ms", timer.endTimer());
+ return lastCommitTimeForPartitions.entrySet()
+ .stream()
+ .filter(entry -> entry.getValue().isPresent())
+ .filter(entry -> isPartitionExpired(entry.getValue().get()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * @param partitionPaths Partitions to collect stats.
+ */
+ private Map<String, Option<String>> getLastCommitTimeForPartitions(List<String> partitionPaths) {
+ int statsParallelism = Math.min(partitionPaths.size(), 200);
+ return hoodieTable.getContext().map(partitionPaths, partitionPath -> {
+ Option<String> partitionLastModifiedTime = hoodieTable.getHoodieView()
+ .getLatestFileSlicesBeforeOrOn(partitionPath, instantTime, true)
+ .map(FileSlice::getBaseInstantTime)
+ .max(Comparator.naturalOrder())
+ .map(Option::ofNullable)
+ .orElse(Option.empty());
+ return Pair.of(partitionPath, partitionLastModifiedTime);
+ }, statsParallelism).stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ }
+
+ /**
+ * Determines if a partition's reference time has exceeded its time-to-live (TTL).
+ *
+ * This method checks if the current time has passed the TTL threshold based on a
+ * reference time, which could be the creation time or the last commit time of the partition.
+ *
+ * @param referenceTime last commit time or creation time for partition
+ */
+ protected boolean isPartitionExpired(String referenceTime) {
+ String expiredTime = instantTimePlusMillis(fixInstantTimeCompatibility(referenceTime), ttlInMilis);
+ return fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0;
+ }
+
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategy.java
new file mode 100644
index 000000000000..477688709303
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategy.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Strategy for partition-level ttl management.
+ */
+public abstract class PartitionTTLStrategy implements TTLStrategy, Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionTTLStrategy.class);
+
+ protected final HoodieTable hoodieTable;
+ protected final HoodieWriteConfig writeConfig;
+ protected final String instantTime;
+
+ public PartitionTTLStrategy(HoodieTable hoodieTable, String instantTime) {
+ this.writeConfig = hoodieTable.getConfig();
+ this.hoodieTable = hoodieTable;
+ this.instantTime = instantTime;
+ }
+
+ /**
+ * Get expired partition paths for a specific partition ttl strategy.
+ *
+ * @return Expired partition paths.
+ */
+ public abstract List<String> getExpiredPartitionPaths();
+
+ /**
+ * Scan and list all partitions for partition ttl management.
+ *
+ * @return all partitions paths for the dataset.
+ */
+ protected List<String> getPartitionPathsForTTL() {
+ String partitionSelected = writeConfig.getPartitionTTLPartitionSelected();
+ HoodieTimer timer = HoodieTimer.start();
+ List<String> partitionsForTTL;
+ if (StringUtils.isNullOrEmpty(partitionSelected)) {
+ // Return All partition paths
+ partitionsForTTL = FSUtils.getAllPartitionPaths(hoodieTable.getContext(), writeConfig.getMetadataConfig(), writeConfig.getBasePath());
+ } else {
+ partitionsForTTL = Arrays.asList(partitionSelected.split(","));
+ }
+ LOG.info("Get partitions for ttl cost {} ms", timer.endTimer());
+ return partitionsForTTL;
+ }
+
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java
new file mode 100644
index 000000000000..6dfbcd6d0e59
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.ttl.strategy;
+
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME;
+import static org.apache.hudi.config.HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE;
+
+/**
+ * Types of {@link PartitionTTLStrategy}.
+ */
+public enum PartitionTTLStrategyType {
+ KEEP_BY_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy"),
+ KEEP_BY_CREATION_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByCreationTimeStrategy");
+
+ private final String className;
+
+ PartitionTTLStrategyType(String className) {
+ this.className = className;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public static PartitionTTLStrategyType fromClassName(String className) {
+ for (PartitionTTLStrategyType type : PartitionTTLStrategyType.values()) {
+ if (type.getClassName().equals(className)) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException("No PartitionTTLStrategyType found for class name: " + className);
+ }
+
+ public static List<String> getPartitionTTLStrategyNames() {
+ List<String> names = new ArrayList<>(PartitionTTLStrategyType.values().length);
+ Arrays.stream(PartitionTTLStrategyType.values())
+ .forEach(x -> names.add(x.name()));
+ return names;
+ }
+
+ @Nullable
+ public static String getPartitionTTLStrategyClassName(HoodieConfig config) {
+ if (config.contains(PARTITION_TTL_STRATEGY_CLASS_NAME)) {
+ return config.getString(PARTITION_TTL_STRATEGY_CLASS_NAME);
+ } else if (config.contains(PARTITION_TTL_STRATEGY_TYPE)) {
+ return PartitionTTLStrategyType.valueOf(config.getString(PARTITION_TTL_STRATEGY_TYPE)).getClassName();
+ }
+ return null;
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TTLStrategy.java
similarity index 71%
rename from hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TTLStrategy.java
index a739af67909b..ad41f95fba27 100644
--- a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/TTLStrategy.java
@@ -16,18 +16,10 @@
* limitations under the License.
*/
-package org.apache.hudi.exception;
+package org.apache.hudi.table.action.ttl.strategy;
/**
- * Exception for incompatible schema.
+ * Strategy for ttl management.
*/
-public class HoodieIncompatibleSchemaException extends RuntimeException {
-
- public HoodieIncompatibleSchemaException(String msg, Throwable e) {
- super(msg, e);
- }
-
- public HoodieIncompatibleSchemaException(String msg) {
- super(msg);
- }
+public interface TTLStrategy {
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java
index c7cb544aec94..edc2d19cf4bc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java
@@ -49,7 +49,7 @@ public Map upgrade(HoodieWriteConfig config, HoodieEngin
tablePropsToAdd.put(TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
// if metadata is enabled and files partition exist then update TABLE_METADATA_INDEX_COMPLETED
// schema for the files partition is same between the two versions
- if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES)) {
+ if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES.getPartitionPath())) {
tablePropsToAdd.put(TABLE_METADATA_PARTITIONS, MetadataPartitionType.FILES.getPartitionPath());
}
return tablePropsToAdd;
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
index edb3617ea9ef..296cb162a424 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
@@ -111,7 +112,7 @@ public static void setupTimelineInFS(
}
public static String getBaseFilename(String instantTime, String fileId) {
- return FSUtils.makeBaseFileName(instantTime, BASE_FILE_WRITE_TOKEN, fileId);
+ return FSUtils.makeBaseFileName(instantTime, BASE_FILE_WRITE_TOKEN, fileId, HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension());
}
public static String getLogFilename(String instantTime, String fileId) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java
new file mode 100644
index 000000000000..664385f9ae06
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.DummyActiveAction;
+import org.apache.hudi.client.timeline.LSMTimelineWriter;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link HoodieArchivedTimeline}.
+ */
+public class TestHoodieArchivedTimeline extends HoodieCommonTestHarness {
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initMetaClient();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanMetaClient();
+ }
+
+ @Test
+ public void testLoadingInstantsIncrementally() throws Exception {
+ writeArchivedTimeline(10, 10000000);
+ // now we have 50 instants spread across 5 parquet files.
+ HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline("10000043");
+ assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""), is("10000043"));
+ assertThat(archivedTimeline.lastInstant().map(HoodieInstant::getTimestamp).orElse(""), is("10000050"));
+ // load incrementally
+ archivedTimeline.reload("10000034");
+ assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""), is("10000034"));
+ archivedTimeline.reload("10000011");
+ assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""), is("10000011"));
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void writeArchivedTimeline(int batchSize, long startTs) throws Exception {
+ HoodieTestTable testTable = HoodieTestTable.of(this.metaClient);
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(this.metaClient.getBasePathV2().toString())
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+ .withMarkersType("DIRECT")
+ .build();
+ HoodieEngineContext engineContext = new HoodieLocalEngineContext(new Configuration());
+ LSMTimelineWriter writer = LSMTimelineWriter.getInstance(writeConfig, new LocalTaskContextSupplier(), metaClient);
+ List<ActiveAction> instantBuffer = new ArrayList<>();
+ for (int i = 1; i <= 50; i++) {
+ long instantTimeTs = startTs + i;
+ String instantTime = String.valueOf(instantTimeTs);
+ String completionTime = String.valueOf(instantTimeTs + 10);
+ HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime);
+ HoodieCommitMetadata metadata = testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT, Arrays.asList("par1", "par2"), 10, false);
+ byte[] serializedMetadata = TimelineMetadataUtils.serializeCommitMetadata(metadata).get();
+ instantBuffer.add(new DummyActiveAction(instant, serializedMetadata));
+ if (i % batchSize == 0) {
+ // archive 10 instants each time
+ writer.write(instantBuffer, org.apache.hudi.common.util.Option.empty(), org.apache.hudi.common.util.Option.empty());
+ writer.compactAndClean(engineContext);
+ instantBuffer.clear();
+ }
+ }
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index 3bcba72eb684..5024a9f59d2c 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.testutils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -134,6 +135,16 @@ public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer> parti
return cleanMetadata;
}
+ @Override
+ public void repeatClean(String cleanCommitTime,
+ HoodieCleanerPlan cleanerPlan,
+ HoodieCleanMetadata cleanMetadata) throws IOException {
+ super.repeatClean(cleanCommitTime, cleanerPlan, cleanMetadata);
+ if (writer != null) {
+ writer.update(cleanMetadata, cleanCommitTime);
+ }
+ }
+
public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception {
super.addCompaction(instantTime, commitMetadata);
if (writer != null) {
@@ -148,7 +159,6 @@ public HoodieTestTable addRollback(String instantTime, HoodieRollbackMetadata ro
if (writer != null) {
writer.update(rollbackMetadata, instantTime);
}
- super.addRollbackCompleted(instantTime, rollbackMetadata, false);
return this;
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 5c93f924ecef..90fcfd4fd7ae 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -89,6 +89,7 @@ public void testPropertyLoading(boolean withAlternative) throws IOException {
assertEquals(5, config.getMaxCommitsToKeep());
assertEquals(2, config.getMinCommitsToKeep());
assertTrue(config.shouldUseExternalSchemaTransformation());
+ assertTrue(config.allowDuplicateInserts());
}
@Test
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
new file mode 100644
index 000000000000..529d2ddfc7ff
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class TestHoodieMetadataWriteUtils {
+
+ @Test
+ public void testCreateMetadataWriteConfigForCleaner() {
+ HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .retainCommits(5).build())
+ .build();
+
+ HoodieWriteConfig metadataWriteConfig1 = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig1, HoodieFailedWritesCleaningPolicy.EAGER);
+ assertEquals(HoodieFailedWritesCleaningPolicy.EAGER, metadataWriteConfig1.getFailedWritesCleanPolicy());
+ assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, metadataWriteConfig1.getCleanerPolicy());
+ // default value already greater than data cleaner commits retained * 1.2
+ assertEquals(HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED, metadataWriteConfig1.getCleanerCommitsRetained());
+
+ assertNotEquals(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, metadataWriteConfig1.getCleanerPolicy());
+ assertNotEquals(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS, metadataWriteConfig1.getCleanerPolicy());
+
+ HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .retainCommits(20).build())
+ .build();
+ HoodieWriteConfig metadataWriteConfig2 = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig2, HoodieFailedWritesCleaningPolicy.EAGER);
+ assertEquals(HoodieFailedWritesCleaningPolicy.EAGER, metadataWriteConfig2.getFailedWritesCleanPolicy());
+ assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, metadataWriteConfig2.getCleanerPolicy());
+ // data cleaner commits retained * 1.2 is greater than default
+ assertEquals(24, metadataWriteConfig2.getCleanerCommitsRetained());
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java
new file mode 100644
index 000000000000..e7299d706b89
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.m3;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import org.apache.hudi.common.testutils.NetworkTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.metrics.MetricsReporterType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class TestM3Metrics {
+
+ @Mock
+ HoodieWriteConfig config;
+ HoodieMetrics hoodieMetrics;
+ Metrics metrics;
+
+ @BeforeEach
+ public void start() {
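+ // Shared stubs for every test: metrics enabled, M3 as the reporter type,
+ // and a unique base path so metric registries do not collide across tests.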
+ when(config.isMetricsOn()).thenReturn(true);
+ when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.M3);
+ when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID());
+ }
+
+ @Test
+ public void testRegisterGauge() {
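+ // Stubs a complete M3 configuration; an unused local port suffices because
+ // constructing the reporter does not need a live M3 collector.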
+ when(config.getM3ServerHost()).thenReturn("localhost");
+ when(config.getM3ServerPort()).thenReturn(NetworkTestUtils.nextFreePort());
+ when(config.getTableName()).thenReturn("raw_table");
+ when(config.getM3Env()).thenReturn("dev");
+ when(config.getM3Service()).thenReturn("hoodie");
+ when(config.getM3Tags()).thenReturn("tag1=value1,tag2=value2");
+ when(config.getMetricReporterMetricsNamePrefix()).thenReturn("");
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
+ metrics.registerGauge("metric1", 123L);
+ assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString());
+ metrics.shutdown();
+ }
+
+ @Test
+ public void testEmptyM3Tags() {
+ when(config.getM3ServerHost()).thenReturn("localhost");
+ when(config.getM3ServerPort()).thenReturn(NetworkTestUtils.nextFreePort());
+ when(config.getTableName()).thenReturn("raw_table");
+ when(config.getM3Env()).thenReturn("dev");
+ when(config.getM3Service()).thenReturn("hoodie");
+ when(config.getM3Tags()).thenReturn("");
+ when(config.getMetricReporterMetricsNamePrefix()).thenReturn("");
+ hoodieMetrics = new HoodieMetrics(config);
+ metrics = hoodieMetrics.getMetrics();
+ metrics.registerGauge("metric1", 123L);
+ assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString());
+ metrics.shutdown();
+ }
+
+ @Test
+ public void testInvalidM3Tags() {
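+ // Host, port, env, service and tags are deliberately left unstubbed, so
+ // constructing HoodieMetrics should fail while wiring up the M3 reporter.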
+ when(config.getTableName()).thenReturn("raw_table");
+ when(config.getMetricReporterMetricsNamePrefix()).thenReturn("");
+ assertThrows(RuntimeException.class, () -> {
+ hoodieMetrics = new HoodieMetrics(config);
+ });
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
index 4268cc36d474..9989273b723f 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
@@ -138,14 +138,14 @@ void testGetDeletePaths(HoodieWriteConfig config, String earliestInstant, List<String> partitionsInLastClean,
Map<String, List<String>> savepointsTrackedInLastClean, Map<String, List<String>> activeInstantsPartitions,
- Map<String, List<String>> savepoints, List<String> expectedPartitions) throws IOException {
+ Map<String, List<String>> savepoints, List<String> expectedPartitions, boolean areCommitsForSavepointsRemoved) throws IOException {
HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
// setup savepoint mocks
Set<String> savepointTimestamps = savepoints.keySet().stream().collect(Collectors.toSet());
when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps);
if (!savepoints.isEmpty()) {
- for (Map.Entry<String, List<String>> entry: savepoints.entrySet()) {
+ for (Map.Entry<String, List<String>> entry : savepoints.entrySet()) {
Pair<HoodieSavepointMetadata, Option<byte[]>> savepointMetadataOptionPair = getSavepointMetadata(entry.getValue());
HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, entry.getKey());
when(activeTimeline.getInstantDetails(instant)).thenReturn(savepointMetadataOptionPair.getRight());
@@ -156,7 +156,7 @@ void testPartitionsForIncrCleaning(HoodieWriteConfig config, String earliestInst
Pair<HoodieCleanMetadata, Option<byte[]>> cleanMetadataOptionPair =
getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant, earliestInstantsInLastClean, lastCompletedTimeInLastClean, savepointsTrackedInLastClean.keySet());
mockLastCleanCommit(mockHoodieTable, lastCleanInstant, earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair);
- mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, savepointsTrackedInLastClean);
+ mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, savepointsTrackedInLastClean, areCommitsForSavepointsRemoved);
// Trigger clean and validate partitions to clean.
CleanPlanner<?, ?, ?, ?> cleanPlanner = new CleanPlanner<>(context, mockHoodieTable, config);
@@ -332,7 +332,7 @@ static Stream