From 1874d4fadea7db5be55f0fd938276eb594258357 Mon Sep 17 00:00:00 2001
From: danny0405
Date: Mon, 31 Jul 2023 16:26:36 +0800
Subject: [PATCH] Maintain only one version pointer file, add file size
 limitation to compaction strategy

---
 .../hudi/client/HoodieTimelineArchiver.java        | 62 +++++++++++----
 .../hudi/io/TestHoodieTimelineArchiver.java        | 15 +---
 .../timeline/HoodieArchivedTimeline.java           | 77 +++++++++++--------
 .../ArchivedTimelineReadBenchmark.scala            |  2 +-
 4 files changed, 95 insertions(+), 61 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 71717e2ae37c5..485f8d2cba6e6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -296,9 +296,11 @@ private Option<String> doCompact(List<String> latestSnapshotFiles, int layer) th
     if (files.size() >= archiveMergeFilesBatchSize) {
       // 2. sort files by min instant time (implies ascending chronological order)
       files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
-      List<String> candidateFiles = files.stream()
-          .limit(archiveMergeFilesBatchSize)
-          .collect(Collectors.toList());
+      List<String> candidateFiles = getCandidateFiles(files, archiveMergeFilesBatchSize);
+      if (candidateFiles.size() < 2) {
+        // the file is too large to compact, return early.
+        return Option.empty();
+      }
       String compactedFileName = compactedFileName(candidateFiles);

       // 3. compaction
@@ -311,6 +313,29 @@ private Option<String> doCompact(List<String> latestSnapshotFiles, int layer) th
     return Option.empty();
   }

+  /**
+   * Returns at most {@code filesBatch} source files,
+   * with the total file size of the batch capped at roughly 1GB.
+   */
+  private List<String> getCandidateFiles(List<String> files, int filesBatch) throws IOException {
+    List<String> candidates = new ArrayList<>();
+    long totalFileLen = 0L;
+    long maxFileSizeInBytes = 1024 * 1024 * 1000;
+    for (int i = 0; i < filesBatch; i++) {
+      String file = files.get(i);
+      if (totalFileLen > maxFileSizeInBytes) {
+        return candidates;
+      }
+      long fileLen = metaClient.getFs().getFileStatus(new Path(metaClient.getArchivePath(), file)).getLen();
+      // we may also need to consider a single file that is very close to the threshold in size,
+      // to avoid write amplification,
+      // e.g., two 800MB files compacting into a 1.6GB file.
+      totalFileLen += fileLen;
+      candidates.add(file);
+    }
+    return candidates;
+  }
+
   /**
    * Returns a new file name.
    */
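Editor's note: the selection rule above is easier to verify in isolation. Below is a minimal, self-contained sketch of the same loop; array indices stand in for file names, lengths are passed in directly instead of coming from FileSystem#getFileStatus, and the class and method names are illustrative, not part of the patch:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of getCandidateFiles(): indices stand in for file names,
// lengths are supplied directly instead of being fetched from the FileSystem.
public class CandidateSelectionSketch {
  static final long MAX_FILE_SIZE_IN_BYTES = 1024L * 1024 * 1000;

  static List<Integer> getCandidateFiles(long[] fileLens, int filesBatch) {
    List<Integer> candidates = new ArrayList<>();
    long totalFileLen = 0L;
    for (int i = 0; i < Math.min(filesBatch, fileLens.length); i++) {
      // the cap is checked before taking the next file, so the batch that
      // finally crosses the threshold is kept as-is
      if (totalFileLen > MAX_FILE_SIZE_IN_BYTES) {
        return candidates;
      }
      totalFileLen += fileLens[i];
      candidates.add(i);
    }
    return candidates;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // three 800MB files: the first two are taken (1.6GB total), the third is cut off
    System.out.println(getCandidateFiles(new long[] {800 * mb, 800 * mb, 800 * mb}, 10)); // [0, 1]
    // one oversized file: a single-element batch, which doCompact() then rejects
    System.out.println(getCandidateFiles(new long[] {1200 * mb, 10 * mb}, 10)); // [0]
  }
}
```

Because the cap is only tested before taking the next file, the batch that finally crosses 1GB is kept whole (the write-amplification caveat called out in the inline comment), while a single oversized file yields a one-element batch that doCompact rejects via the candidateFiles.size() < 2 guard.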
@@ -347,18 +372,13 @@ private void clean(HoodieEngineContext context, int compactedVersions) throws IO
         .flatMap(version -> HoodieArchivedTimeline.latestSnapshotFiles(metaClient, version).stream())
         .collect(Collectors.toSet());
     // delete the manifest file first
-    List<String> metaFilesToClean = new ArrayList<>();
-    Arrays.stream(HoodieArchivedTimeline.listAllVersionFiles(metaClient)).forEach(fileStatus -> {
-      if (!versionsToKeep.contains(HoodieArchivedTimeline.getSnapshotVersion(fileStatus.getPath().getName()))) {
-        metaFilesToClean.add(fileStatus.getPath().toString());
-      }
-    });
+    List<String> manifestFilesToClean = new ArrayList<>();
     Arrays.stream(HoodieArchivedTimeline.listAllManifestFiles(metaClient)).forEach(fileStatus -> {
       if (!versionsToKeep.contains(HoodieArchivedTimeline.getManifestVersion(fileStatus.getPath().getName()))) {
-        metaFilesToClean.add(fileStatus.getPath().toString());
+        manifestFilesToClean.add(fileStatus.getPath().toString());
       }
     });
-    deleteFilesParallelize(metaClient, metaFilesToClean, context, false);
+    deleteFilesParallelize(metaClient, manifestFilesToClean, context, false);
     // delete the archive data files
     List<String> dataFilesToClean = Arrays.stream(HoodieArchivedTimeline.listAllMetaFiles(metaClient))
         .filter(fileStatus -> !filesToKeep.contains(fileStatus.getPath().getName()))
@@ -392,14 +412,26 @@ public void updateManifest(List<String> filesToRemove, String fileToAdd) throws
     createManifestFile(newFileList, latestVersion);
   }

-  private void createManifestFile(List<String> newFileList, int currentVersion) {
+  private void createManifestFile(List<String> newFileList, int currentVersion) throws IOException {
     byte[] content = String.join(",", newFileList).getBytes(StandardCharsets.UTF_8);
     // version starts from 1 and increases monotonically
     int newVersion = currentVersion < 0 ? 1 : currentVersion + 1;
     // create manifest file
-    FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getManifestFilePath(metaClient, newVersion), Option.of(content));
-    // create version file
-    FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getVersionFilePath(metaClient, newVersion), Option.empty());
+    final Path tempManifestFilePath = HoodieArchivedTimeline.getTempManifestFilePath(metaClient, newVersion);
+    final Path manifestFilePath = HoodieArchivedTimeline.getManifestFilePath(metaClient, newVersion);
+    FileIOUtils.createFileInPath(metaClient.getFs(), tempManifestFilePath, Option.of(content));
+    metaClient.getFs().rename(tempManifestFilePath, manifestFilePath);
+    // update version file
+    updateVersionFile(newVersion);
+  }
+
+  private void updateVersionFile(int newVersion) throws IOException {
+    byte[] content = String.valueOf(newVersion).getBytes(StandardCharsets.UTF_8);
+    final Path tempVersionFilePath = HoodieArchivedTimeline.getTempVersionFilePath(metaClient);
+    final Path versionFilePath = HoodieArchivedTimeline.getVersionFilePath(metaClient);
+    FileIOUtils.createFileInPath(metaClient.getFs(), tempVersionFilePath, Option.of(content));
+    metaClient.getFs().delete(versionFilePath, false);
+    metaClient.getFs().rename(tempVersionFilePath, versionFilePath);
   }

   public void compactArchiveFiles(List<String> candidateFiles, String compactedFileName) {
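Editor's note: a hedged java.nio.file model of the clean() flow above, with the Hadoop FileSystem and parallelized-delete plumbing stripped away. The name patterns (manifest_<N>, the .tmp suffix, comma-separated manifest contents) mirror the patch; everything else is illustrative:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Keep only the given snapshot versions: drop every other manifest, then every
// parquet file that no surviving manifest references. The patch does this in two
// passes (manifests first, then data files); the single pass here is equivalent
// because the survivor set is computed up front. The "_version_" pointer is
// never touched, since there is only one of it after this patch.
public class ArchiveCleanerSketch {
  static void clean(Path archiveDir, Set<Integer> versionsToKeep) throws IOException {
    Set<String> filesToKeep = new HashSet<>();
    for (int version : versionsToKeep) {
      String manifest = new String(
          Files.readAllBytes(archiveDir.resolve("manifest_" + version)), StandardCharsets.UTF_8);
      filesToKeep.addAll(Arrays.asList(manifest.split(",")));
    }
    List<Path> entries;
    try (Stream<Path> stream = Files.list(archiveDir)) {
      entries = stream.collect(Collectors.toList());
    }
    for (Path p : entries) {
      String name = p.getFileName().toString();
      boolean staleManifest = name.startsWith("manifest_") && !name.endsWith(".tmp")
          && !versionsToKeep.contains(Integer.parseInt(name.substring("manifest_".length())));
      boolean staleData = name.endsWith(".parquet") && !filesToKeep.contains(name);
      if (staleManifest || staleData) {
        Files.delete(p);
      }
    }
  }
}
```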
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 5f957c09912a3..91d5edb701924 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -492,7 +492,7 @@ public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exc
   }

   @Test
-  public void testCompactionWithCorruptManifestFile() throws Exception {
+  public void testCompactionWithCorruptVersionFile() throws Exception {
     HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

     // do ingestion and trigger archive actions here.
@@ -501,16 +501,9 @@ public void testCompactionWithCorruptManifestFile() throws Exception {
       archiveAndGetCommitsList(writeConfig);
     }

-    // build a compaction archive plan with dummy content
-    // this plan can not be deserialized.
-    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
-    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
-    List<String> files = HoodieArchivedTimeline.latestSnapshotFiles(metaClient);
-    files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator());
-
-    archiver.updateManifest("dummy.parquet");
-    // remove the version file created by the last manifest update
-    metaClient.getFs().delete(HoodieArchivedTimeline.getVersionFilePath(metaClient, HoodieArchivedTimeline.latestSnapshotVersion(metaClient)));
+    // create a version pointer file with an invalid version number.
+    metaClient.getFs().delete(HoodieArchivedTimeline.getVersionFilePath(metaClient));
+    FileIOUtils.createFileInPath(metaClient.getFs(), HoodieArchivedTimeline.getVersionFilePath(metaClient), Option.of("invalid_version".getBytes(StandardCharsets.UTF_8)));

     // check that an invalid version file will not block archived timeline loading.
     HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
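Editor's note: this test leans on the reader-side fallback added to HoodieArchivedTimeline#latestSnapshotVersion further down in this patch: a missing or corrupt _version_ pointer degrades to a manifest listing instead of failing the load. An illustrative java.nio.file rendering of that logic, with hypothetical names:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Try the "_version_" pointer first; on a missing or unparseable pointer,
// recover the latest version by listing "manifest_<N>" files (skipping ".tmp"
// leftovers), mirroring what latestSnapshotVersion() below does with Hadoop APIs.
public class LatestVersionSketch {
  static int latestSnapshotVersion(Path archiveDir) throws IOException {
    Path versionFile = archiveDir.resolve("_version_");
    if (Files.exists(versionFile)) {
      try {
        String content = new String(Files.readAllBytes(versionFile), StandardCharsets.UTF_8);
        return Integer.parseInt(content.trim());
      } catch (Exception e) {
        // e.g. the "invalid_version" content written by the test: fall through
      }
    }
    try (Stream<Path> files = Files.list(archiveDir)) {
      return files.map(p -> p.getFileName().toString())
          .filter(name -> name.startsWith("manifest_") && !name.endsWith(".tmp"))
          .map(name -> Integer.parseInt(name.substring("manifest_".length())))
          .max(Comparator.naturalOrder())
          .orElse(-1); // no snapshot yet
    }
  }
}
```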
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 3668c92668ba5..c52907aff324a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -45,6 +45,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -69,10 +70,10 @@
  *
  * <pre>
  *   t111, t112 ... t120 ... ->
- *   \              /
- *      \        /
- *          |
- *          V
+ *     \              /
+ *        \        /
+ *            |
+ *            V
  *   t111_t120_0.parquet, t101_t110_0.parquet,...  t11_t20_0.parquet    L0
  *                                  \                    /
  *                                     \              /
@@ -81,14 +82,14 @@
  *                                    t11_t100_1.parquet                L1
  *
  *      manifest_1, manifest_2, ... manifest_12
- *          |           |               |
- *          V           V               V
- *      _version_1, _version_2, ... _version_12
+ *                                      |
+ *                                      V
+ *                                  _version_
  * </pre>
  *
  * <p><h2>The LSM Tree Compaction</h2>
- * Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flush as a large file in the next layer.
- * We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1,
+ * Use the universal compaction strategy, that is: when N (10 by default) parquet files exist in the current layer, they are merged and flushed as one compacted file in the next layer.
+ * There is no limit on the number of layers; assuming there are 10 instants per file in L0, there could be 100 instants per file in L1,
  * so 3000 instants could be represented as 3 parquets in L2, it is pretty fast if we use concurrent read.
  *
  * <p>The benchmark shows 1000 instants read cost about 10 ms.
@@ -125,8 +126,9 @@
  *
  * <p>This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
  */
 public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
-  private static final String VERSION_FILE_PREFIX = "_version_";   // _version_[N]
+  private static final String VERSION_FILE_NAME = "_version_";     // _version_
   private static final String MANIFEST_FILE_PREFIX = "manifest_";  // manifest_[N]
+  private static final String TEMP_FILE_SUFFIX = ".tmp";

   public static final int FILE_LAYER_ZERO = 0;

   private static final Pattern ARCHIVE_FILE_PATTERN = Pattern.compile("^(\\d+)_(\\d+)_(\\d)\\.parquet");
@@ -347,6 +349,18 @@ private static boolean isFileInRange(TimeRangeFilter filter, String fileName) {
    * Returns the latest snapshot version.
    */
   public static int latestSnapshotVersion(HoodieTableMetaClient metaClient) throws IOException {
+    Path versionFilePath = getVersionFilePath(metaClient);
+    if (metaClient.getFs().exists(versionFilePath)) {
+      try {
+        Option<byte[]> content = FileIOUtils.readDataFromPath(metaClient.getFs(), versionFilePath);
+        if (content.isPresent()) {
+          return Integer.parseInt(new String(content.get(), StandardCharsets.UTF_8));
+        }
+      } catch (Exception e) {
+        // fall back to manifest file listing.
+        LOG.warn("Error reading version file {}", versionFilePath, e);
+      }
+    }
     return allSnapshotVersions(metaClient).stream().max(Integer::compareTo).orElse(-1);
   }

@@ -354,9 +368,9 @@ public static int latestSnapshotVersion(HoodieTableMetaClient metaClient) throws
    * Returns all the valid snapshot versions.
    */
   public static List<Integer> allSnapshotVersions(HoodieTableMetaClient metaClient) throws IOException {
-    return Arrays.stream(metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter()))
+    return Arrays.stream(metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getManifestFilePathFilter()))
         .map(fileStatus -> fileStatus.getPath().getName())
-        .map(HoodieArchivedTimeline::getSnapshotVersion)
+        .map(HoodieArchivedTimeline::getManifestVersion)
         .collect(Collectors.toList());
   }

@@ -388,18 +402,27 @@ public static Path getManifestFilePath(HoodieTableMetaClient metaClient, int sna
     return new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + snapshotVersion);
   }

+  public static Path getTempManifestFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) throws IOException {
+    Path path = new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + snapshotVersion + TEMP_FILE_SUFFIX);
+    if (metaClient.getFs().exists(path)) {
+      metaClient.getFs().delete(path, false);
+    }
+    return path;
+  }
+
   /**
-   * Returns the full version file path with given version number.
+   * Returns the version pointer file path.
    */
-  public static Path getVersionFilePath(HoodieTableMetaClient metaClient, int snapshotVersion) {
-    return new Path(metaClient.getArchivePath(), VERSION_FILE_PREFIX + snapshotVersion);
+  public static Path getVersionFilePath(HoodieTableMetaClient metaClient) {
+    return new Path(metaClient.getArchivePath(), VERSION_FILE_NAME);
   }

-  /**
-   * List all the version files.
-   */
-  public static FileStatus[] listAllVersionFiles(HoodieTableMetaClient metaClient) throws IOException {
-    return metaClient.getFs().listStatus(new Path(metaClient.getArchivePath()), getVersionFilePathFilter());
+  public static Path getTempVersionFilePath(HoodieTableMetaClient metaClient) throws IOException {
+    Path path = new Path(metaClient.getArchivePath(), VERSION_FILE_NAME + TEMP_FILE_SUFFIX);
+    if (metaClient.getFs().exists(path)) {
+      metaClient.getFs().delete(path, false);
+    }
+    return path;
   }

   /**
@@ -417,13 +440,6 @@ public static FileStatus[] listAllMetaFiles(HoodieTableMetaClient metaClient) th
         new Path(metaClient.getArchivePath() + "/*.parquet"));
   }

-  /**
-   * Parse the snapshot version from the version file name.
-   */
-  public static int getSnapshotVersion(String fileName) {
-    return Integer.parseInt(fileName.split("_")[2]);
-  }
-
   /**
    * Parse the snapshot version from the manifest file name.
    */
@@ -480,18 +496,11 @@ public static boolean isFileFromLayer(String fileName, int layer) {
     return getFileLayer(fileName) == layer;
   }

-  /**
-   * Returns a path filter for the version pointer files.
-   */
-  public static PathFilter getVersionFilePathFilter() {
-    return path -> path.getName().startsWith(VERSION_FILE_PREFIX);
-  }
-
   /**
    * Returns a path filter for the manifest files.
    */
   public static PathFilter getManifestFilePathFilter() {
-    return path -> path.getName().startsWith(MANIFEST_FILE_PREFIX);
+    return path -> path.getName().startsWith(MANIFEST_FILE_PREFIX) && !path.getName().endsWith(TEMP_FILE_SUFFIX);
   }

   // -------------------------------------------------------------------------
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala
index 6ac7cc057627d..04989880472da 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala
@@ -78,7 +78,7 @@
     }

     val benchmark = new HoodieBenchmark("pref load archived instants", commitsNum, 3)
-    benchmark.addCase("read shim instants") { _ =>
+    benchmark.addCase("read slim instants") { _ =>
       new HoodieArchivedTimeline(metaClient)
     }
     benchmark.addCase("read instants with commit metadata") { _ =>
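Editor's note: taken together, the write path this patch converges on is: publish the new manifest via temp file plus rename, then flip the single _version_ pointer the same way. Below is a self-contained demo of that ordering; java.nio.file stands in for Hadoop's FileSystem, and the rename is assumed atomic (HDFS guarantees this by contract, and a same-directory POSIX rename typically does too):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

// Demo of the snapshot-publishing order: manifest first, pointer second.
// A reader that loads the pointer before the flip simply keeps seeing the
// previous snapshot; it never observes a half-written manifest or pointer.
public class SnapshotPublishDemo {
  public static void main(String[] args) throws IOException {
    Path archiveDir = Files.createTempDirectory("archive");
    publish(archiveDir, 1, Arrays.asList("t11_t20_0.parquet"));
    publish(archiveDir, 2, Arrays.asList("t11_t20_0.parquet", "t21_t30_0.parquet"));
    System.out.println(new String(
        Files.readAllBytes(archiveDir.resolve("_version_")), StandardCharsets.UTF_8)); // prints: 2
  }

  static void publish(Path archiveDir, int version, List<String> files) throws IOException {
    // 1. write the manifest under a temp name, then move it into place
    Path tmpManifest = archiveDir.resolve("manifest_" + version + ".tmp");
    Files.write(tmpManifest, String.join(",", files).getBytes(StandardCharsets.UTF_8));
    Files.move(tmpManifest, archiveDir.resolve("manifest_" + version), StandardCopyOption.REPLACE_EXISTING);
    // 2. only after the manifest is in place, flip the version pointer the same way
    Path tmpVersion = archiveDir.resolve("_version_.tmp");
    Files.write(tmpVersion, String.valueOf(version).getBytes(StandardCharsets.UTF_8));
    Files.move(tmpVersion, archiveDir.resolve("_version_"), StandardCopyOption.REPLACE_EXISTING);
  }
}
```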