From 9a2afc24ea8552ec6869f9d5819a00c78067a4e6 Mon Sep 17 00:00:00 2001 From: danny0405 Date: Tue, 1 Aug 2023 16:06:58 +0800 Subject: [PATCH] write the manifest as JSON, move the timeline write path to separate class for convenient review --- .../hudi/client/HoodieTimelineArchiver.java | 333 +-------------- .../client/utils/ArchivedTimelineWriter.java | 387 ++++++++++++++++++ .../hudi/io/TestHoodieTimelineArchiver.java | 14 +- .../common/model/HoodieArchivedManifest.java | 132 ++++++ .../timeline/HoodieArchivedTimeline.java | 28 +- .../ArchivedTimelineReadBenchmark.scala | 13 +- 6 files changed, 550 insertions(+), 357 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivedTimelineWriter.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/HoodieArchivedManifest.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 485f8d2cba6e6..1fc2002f7bcac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -19,21 +19,15 @@ package org.apache.hudi.client; -import org.apache.hudi.avro.model.HoodieArchivedInstant; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.ActiveInstant; -import org.apache.hudi.client.utils.MetadataConversionUtils; +import org.apache.hudi.client.utils.ArchivedTimelineWriter; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieCleaningPolicy; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; @@ -41,41 +35,23 @@ import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.CompactionUtils; -import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.VisibleForTesting; -import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieAvroParquetReader; -import org.apache.hudi.io.storage.HoodieFileReaderFactory; -import org.apache.hudi.io.storage.HoodieFileWriter; -import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; -import 
org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -103,18 +79,20 @@ public class HoodieTimelineArchiver { private static final Logger LOG = LoggerFactory.getLogger(HoodieTimelineArchiver.class); private final HoodieWriteConfig config; - private HoodieWriteConfig writerConfig; private final int maxInstantsToKeep; private final int minInstantsToKeep; private final HoodieTable table; private final HoodieTableMetaClient metaClient; private final TransactionManager txnManager; + private final ArchivedTimelineWriter timelineWriter; + public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable table) { this.config = config; this.table = table; this.metaClient = table.getMetaClient(); this.txnManager = new TransactionManager(config, table.getMetaClient().getFs()); + this.timelineWriter = ArchivedTimelineWriter.getInstance(config, table); HoodieTimeline completedCommitsTimeline = table.getCompletedCommitsTimeline(); Option latestCommit = completedCommitsTimeline.lastInstant(); HoodieCleaningPolicy cleanerPolicy = config.getCleanerPolicy(); @@ -183,27 +161,6 @@ private Option getEarliestCommitToRetain(Option la return earliestCommitToRetain; } - /** - * Get or create a writer config for parquet writer. - */ - private HoodieWriteConfig getOrCreateWriterConfig() { - if (this.writerConfig == null) { - this.writerConfig = HoodieWriteConfig.newBuilder() - .withProperties(this.config.getProps()) - .withPopulateMetaFields(false).build(); - } - return this.writerConfig; - } - - private HoodieFileWriter openWriter(Path filePath) { - try { - return HoodieFileWriterFactory.getFileWriter("", filePath, metaClient.getHadoopConf(), getOrCreateWriterConfig(), - HoodieArchivedInstant.getClassSchema(), table.getTaskContextSupplier(), HoodieRecordType.AVRO); - } catch (IOException e) { - throw new HoodieException("Unable to initialize archiving writer", e); - } - } - public boolean archiveIfRequired(HoodieEngineContext context) throws IOException { return archiveIfRequired(context, false); } @@ -222,11 +179,11 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc boolean success = true; if (!instantsToArchive.isEmpty()) { LOG.info("Archiving instants " + instantsToArchive); - archive(context, instantsToArchive); + this.timelineWriter.write(context, instantsToArchive); LOG.info("Deleting archived instants " + instantsToArchive); success = deleteArchivedInstants(instantsToArchive, context); // triggers compaction and cleaning only after archiving action - compactAndClean(context); + this.timelineWriter.compactAndClean(context); } else { LOG.info("No Instants to archive"); } @@ -238,252 +195,6 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc } } - /** - * Compacts the small archive files. - * - *

-   * <p>The parquet naming convention is:
-   *
-   * <pre>${min_instant}_${max_instant}_${level}.parquet</pre>
-   *
-   * <p>The 'min_instant' and 'max_instant' represent the instant time range of the parquet file.
-   * The 'level' represents the number of the level where the file is located, currently we
-   * have no limit for the number of layers.
-   *
-   * <p>These archive parquet files composite as an LSM tree layout, one parquet file contains
-   * a consecutive timestamp instant metadata entries. Different parquet files may have
-   * overlapping with the instant time ranges.
-   *
-   * <pre>
-   *   t1_t2_0.parquet, t3_t4_0.parquet, ... t5_t6_0.parquet       L0 layer
-   *                         \            /
-   *                             \     /
-   *                                |
-   *                                V
-   *                          t3_t6_1.parquet                      L1 layer
-   * </pre>
-   *
Compaction and cleaning: once the files number exceed a threshold(now constant 10) N, - * the oldest N files are then replaced with a compacted file in the next layer. - * A cleaning action is triggered right after the compaction. - * - * @param context HoodieEngineContext - */ - @VisibleForTesting - public void compactAndClean(HoodieEngineContext context) throws IOException { - // 1. List all the latest snapshot files - List latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient); - int layer = 0; - // 2. triggers the compaction for L0 - Option compactedFileName = doCompact(latestSnapshotFiles, layer); - while (compactedFileName.isPresent()) { - // 3. once a compaction had been executed for the current layer, - // continues to trigger compaction for the next layer. - latestSnapshotFiles.add(compactedFileName.get()); - compactedFileName = doCompact(latestSnapshotFiles, ++layer); - } - - // cleaning - clean(context, layer + 1); - } - - private Option doCompact(List latestSnapshotFiles, int layer) throws IOException { - // 1. list all the files that belong to current layer - List files = latestSnapshotFiles - .stream().filter(file -> HoodieArchivedTimeline.isFileFromLayer(file, layer)).collect(Collectors.toList()); - - int archiveMergeFilesBatchSize = config.getArchiveMergeFilesBatchSize(); - - if (files.size() >= archiveMergeFilesBatchSize) { - // 2. sort files by min instant time (implies ascending chronological order) - files.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator()); - List candidateFiles = getCandidateFiles(files, archiveMergeFilesBatchSize); - if (candidateFiles.size() < 2) { - // the file is too large to compact, returns early. - return Option.empty(); - } - String compactedFileName = compactedFileName(candidateFiles); - - // 3. compaction - compactArchiveFiles(candidateFiles, compactedFileName); - // 4. update the manifest file - updateManifest(candidateFiles, compactedFileName); - LOG.info("Finishes compaction of archive files: " + candidateFiles); - return Option.of(compactedFileName); - } - return Option.empty(); - } - - /** - * Returns at most {@code filesBatch} number of source files - * restricted by the gross file size by 1GB. - */ - private List getCandidateFiles(List files, int filesBatch) throws IOException { - List candidates = new ArrayList<>(); - long totalFileLen = 0L; - long maxFileSizeInBytes = 1024 * 1024 * 1000; - for (int i = 0; i < filesBatch; i++) { - String file = files.get(i); - if (totalFileLen > maxFileSizeInBytes) { - return candidates; - } - long fileLen = metaClient.getFs().getFileStatus(new Path(metaClient.getArchivePath(), file)).getLen(); - // we may also need to consider a single file that is very close to the threshold in size, - // to avoid the write amplification, - // for e.g, two 800MB files compact into a 1.6GB file. - totalFileLen += fileLen; - candidates.add(file); - } - return candidates; - } - - /** - * Returns a new file name. - */ - private static String newFileName(String minInstant, String maxInstant, int layer) { - return minInstant + "_" + maxInstant + "_" + layer + HoodieFileFormat.PARQUET.getFileExtension(); - } - - /** - * Returns a new file name. 
- */ - @VisibleForTesting - public static String compactedFileName(List files) { - String minInstant = files.stream().map(HoodieArchivedTimeline::getMinInstantTime) - .min(Comparator.naturalOrder()).get(); - String maxInstant = files.stream().map(HoodieArchivedTimeline::getMaxInstantTime) - .max(Comparator.naturalOrder()).get(); - int currentLayer = HoodieArchivedTimeline.getFileLayer(files.get(0)); - return newFileName(minInstant, maxInstant, currentLayer + 1); - } - - /** - * Checks whether there is any unfinished compaction operation. - * - * @param context HoodieEngineContext used for parallelize to delete small archive files if necessary. - */ - private void clean(HoodieEngineContext context, int compactedVersions) throws IOException { - // if there are more than 3 version of snapshots, clean the oldest files. - List allSnapshotVersions = HoodieArchivedTimeline.allSnapshotVersions(metaClient); - int numVersionsToKeep = 3 + compactedVersions; // should make the threshold configurable. - if (allSnapshotVersions.size() > numVersionsToKeep) { - allSnapshotVersions.sort((v1, v2) -> v2 - v1); - List versionsToKeep = allSnapshotVersions.subList(0, numVersionsToKeep); - Set filesToKeep = versionsToKeep.stream() - .flatMap(version -> HoodieArchivedTimeline.latestSnapshotFiles(metaClient, version).stream()) - .collect(Collectors.toSet()); - // delete the manifest file first - List manifestFilesToClean = new ArrayList<>(); - Arrays.stream(HoodieArchivedTimeline.listAllManifestFiles(metaClient)).forEach(fileStatus -> { - if (!versionsToKeep.contains(HoodieArchivedTimeline.getManifestVersion(fileStatus.getPath().getName()))) { - manifestFilesToClean.add(fileStatus.getPath().toString()); - } - }); - deleteFilesParallelize(metaClient, manifestFilesToClean, context, false); - // delete the archive data files - List dataFilesToClean = Arrays.stream(HoodieArchivedTimeline.listAllMetaFiles(metaClient)) - .filter(fileStatus -> !filesToKeep.contains(fileStatus.getPath().getName())) - .map(fileStatus -> fileStatus.getPath().toString()) - .collect(Collectors.toList()); - deleteFilesParallelize(metaClient, dataFilesToClean, context, false); - } - } - - public void updateManifest(String fileToAdd) throws IOException { - // 1. read the latest manifest version file; - // 2. read the latest manifest file for valid files; - // 3. add this new file to the existing file list from step2. - int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient); - List latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient, latestVersion); - List newFileList = new ArrayList<>(latestSnapshotFiles); - newFileList.add(fileToAdd); - createManifestFile(newFileList, latestVersion); - } - - public void updateManifest(List filesToRemove, String fileToAdd) throws IOException { - // 1. read the latest manifest version file; - // 2. read the latest manifest file for valid files; - // 3. remove files to the existing file list from step2; - // 4. add this new file to the existing file list from step2. 
- int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient); - List latestSnapshotFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient, latestVersion); - List newFileList = new ArrayList<>(latestSnapshotFiles); - newFileList.removeAll(filesToRemove); - newFileList.add(fileToAdd); - createManifestFile(newFileList, latestVersion); - } - - private void createManifestFile(List newFileList, int currentVersion) throws IOException { - byte[] content = String.join(",", newFileList).getBytes(StandardCharsets.UTF_8); - // version starts from 1 and increases monotonically - int newVersion = currentVersion < 0 ? 1 : currentVersion + 1; - // create manifest file - final Path tempManifestFilePath = HoodieArchivedTimeline.getTempManifestFilePath(metaClient, newVersion); - final Path manifestFilePath = HoodieArchivedTimeline.getManifestFilePath(metaClient, newVersion); - FileIOUtils.createFileInPath(metaClient.getFs(), tempManifestFilePath, Option.of(content)); - metaClient.getFs().rename(tempManifestFilePath, manifestFilePath); - // update version file - updateVersionFile(newVersion); - } - - private void updateVersionFile(int newVersion) throws IOException { - byte[] content = (String.valueOf(newVersion)).getBytes(StandardCharsets.UTF_8); - final Path tempVersionFilePath = HoodieArchivedTimeline.getTempVersionFilePath(metaClient); - final Path versionFilePath = HoodieArchivedTimeline.getVersionFilePath(metaClient); - FileIOUtils.createFileInPath(metaClient.getFs(), tempVersionFilePath, Option.of(content)); - metaClient.getFs().delete(versionFilePath, false); - metaClient.getFs().rename(tempVersionFilePath, versionFilePath); - } - - public void compactArchiveFiles(List candidateFiles, String compactedFileName) { - LOG.info("Starting to merge small archive files."); - try (HoodieFileWriter writer = openWriter(new Path(metaClient.getArchivePath(), compactedFileName))) { - for (String fileName : candidateFiles) { - // Read the archived file - try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) - .getFileReader(metaClient.getHadoopConf(), new Path(metaClient.getArchivePath(), fileName))) { - // Read the meta entry - try (ClosableIterator iterator = reader.getIndexedRecordIterator(HoodieArchivedInstant.getClassSchema(), HoodieArchivedInstant.getClassSchema())) { - while (iterator.hasNext()) { - IndexedRecord record = iterator.next(); - writer.write(record.get(0).toString(), new HoodieAvroIndexedRecord(record), HoodieArchivedInstant.getClassSchema()); - } - } - } - } - } catch (Exception e) { - throw new HoodieCommitException("Failed to merge small archive files", e); - } - LOG.info("Success to merge small archive files."); - } - - private Map deleteFilesParallelize( - HoodieTableMetaClient metaClient, - List paths, - HoodieEngineContext context, - boolean ignoreFailed) { - return FSUtils.parallelizeFilesProcess(context, - metaClient.getFs(), - config.getArchiveDeleteParallelism(), - pairOfSubPathAndConf -> { - Path file = new Path(pairOfSubPathAndConf.getKey()); - try { - FileSystem fs = metaClient.getFs(); - if (fs.exists(file)) { - return fs.delete(file, false); - } - return true; - } catch (IOException e) { - if (!ignoreFailed) { - throw new HoodieIOException("Failed to delete : " + file, e); - } else { - LOG.warn("Ignore failed deleting : " + file); - return true; - } - } - }, - paths); - } - private Stream getCleanInstantsToArchive() { HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline() 
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants(); @@ -690,36 +401,4 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo return true; } - - public void archive(HoodieEngineContext context, List instants) throws HoodieCommitException { - Path filePath = new Path(metaClient.getArchivePath(), - newFileName(instants.get(0).getInstantTime(), instants.get(instants.size() - 1).getInstantTime(), HoodieArchivedTimeline.FILE_LAYER_ZERO)); - try (HoodieFileWriter writer = openWriter(filePath)) { - Schema wrapperSchema = HoodieArchivedInstant.getClassSchema(); - LOG.info("Archiving schema " + wrapperSchema.toString()); - for (ActiveInstant triple : instants) { - try { - deleteAnyLeftOverMarkers(context, triple); - // in local FS and HDFS, there could be empty completed instants due to crash. - final HoodieArchivedInstant metaEntry = MetadataConversionUtils.createArchivedInstant(triple, metaClient); - writer.write(metaEntry.getInstantTime(), new HoodieAvroIndexedRecord(metaEntry), wrapperSchema); - } catch (Exception e) { - LOG.error("Failed to archive instant: " + triple.getInstantTime(), e); - if (this.config.isFailOnTimelineArchivingEnabled()) { - throw e; - } - } - } - updateManifest(filePath.getName()); - } catch (Exception e) { - throw new HoodieCommitException("Failed to archive commits", e); - } - } - - private void deleteAnyLeftOverMarkers(HoodieEngineContext context, ActiveInstant activeInstant) { - WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, activeInstant.getInstantTime()); - if (writeMarkers.deleteMarkerDir(context, config.getMarkersDeleteParallelism())) { - LOG.info("Cleaned up left over marker directory for instant :" + activeInstant.getCompleted()); - } - } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivedTimelineWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivedTimelineWriter.java new file mode 100644 index 0000000000000..d2c57acdda55e --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivedTimelineWriter.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.client.utils; + +import org.apache.hudi.avro.model.HoodieArchivedInstant; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieArchivedManifest; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.io.storage.HoodieFileWriter; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.WriteMarkers; +import org.apache.hudi.table.marker.WriteMarkersFactory; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * An archived timeline writer which organizes the files as an LSM tree. + */ +public class ArchivedTimelineWriter { + private static final Logger LOG = LoggerFactory.getLogger(ArchivedTimelineWriter.class); + + private final HoodieWriteConfig config; + private final HoodieTable table; + private final HoodieTableMetaClient metaClient; + + private HoodieWriteConfig writeConfig; + + private ArchivedTimelineWriter(HoodieWriteConfig config, HoodieTable table) { + this.config = config; + this.table = table; + this.metaClient = table.getMetaClient(); + } + + public static ArchivedTimelineWriter getInstance(HoodieWriteConfig config, HoodieTable table) { + return new ArchivedTimelineWriter(config, table); + } + + public void write(HoodieEngineContext context, List instants) throws HoodieCommitException { + Path filePath = new Path(metaClient.getArchivePath(), + newFileName(instants.get(0).getInstantTime(), instants.get(instants.size() - 1).getInstantTime(), HoodieArchivedTimeline.FILE_LAYER_ZERO)); + try (HoodieFileWriter writer = openWriter(filePath)) { + Schema wrapperSchema = HoodieArchivedInstant.getClassSchema(); + LOG.info("Archiving schema " + wrapperSchema.toString()); + for (ActiveInstant triple : instants) { + try { + deleteAnyLeftOverMarkers(context, triple); + // in local FS and HDFS, there could be empty completed instants due to crash. 
+ final HoodieArchivedInstant metaEntry = MetadataConversionUtils.createArchivedInstant(triple, metaClient); + writer.write(metaEntry.getInstantTime(), new HoodieAvroIndexedRecord(metaEntry), wrapperSchema); + } catch (Exception e) { + LOG.error("Failed to archive instant: " + triple.getInstantTime(), e); + if (this.config.isFailOnTimelineArchivingEnabled()) { + throw e; + } + } + } + updateManifest(filePath.getName()); + } catch (Exception e) { + throw new HoodieCommitException("Failed to archive commits", e); + } + } + + public void updateManifest(String fileToAdd) throws IOException { + // 1. read the latest manifest version file; + // 2. read the latest manifest file for valid files; + // 3. add this new file to the existing file list from step2. + int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient); + HoodieArchivedManifest latestManifest = HoodieArchivedTimeline.latestSnapshotManifest(metaClient, latestVersion); + HoodieArchivedManifest newManifest = latestManifest.copy(); + newManifest.addFile(getFileEntry(fileToAdd)); + createManifestFile(newManifest, latestVersion); + } + + public void updateManifest(List filesToRemove, String fileToAdd) throws IOException { + // 1. read the latest manifest version file; + // 2. read the latest manifest file for valid files; + // 3. remove files to the existing file list from step2; + // 4. add this new file to the existing file list from step2. + int latestVersion = HoodieArchivedTimeline.latestSnapshotVersion(metaClient); + HoodieArchivedManifest latestManifest = HoodieArchivedTimeline.latestSnapshotManifest(metaClient, latestVersion); + HoodieArchivedManifest newManifest = latestManifest.copy(filesToRemove); + newManifest.addFile(getFileEntry(fileToAdd)); + createManifestFile(newManifest, latestVersion); + } + + private void createManifestFile(HoodieArchivedManifest manifest, int currentVersion) throws IOException { + byte[] content = manifest.toJsonString().getBytes(StandardCharsets.UTF_8); + // version starts from 1 and increases monotonically + int newVersion = currentVersion < 0 ? 1 : currentVersion + 1; + // create manifest file + final Path tempManifestFilePath = HoodieArchivedTimeline.getTempManifestFilePath(metaClient, newVersion); + final Path manifestFilePath = HoodieArchivedTimeline.getManifestFilePath(metaClient, newVersion); + FileIOUtils.createFileInPath(metaClient.getFs(), tempManifestFilePath, Option.of(content)); + metaClient.getFs().rename(tempManifestFilePath, manifestFilePath); + // update version file + updateVersionFile(newVersion); + } + + private void updateVersionFile(int newVersion) throws IOException { + byte[] content = (String.valueOf(newVersion)).getBytes(StandardCharsets.UTF_8); + final Path tempVersionFilePath = HoodieArchivedTimeline.getTempVersionFilePath(metaClient); + final Path versionFilePath = HoodieArchivedTimeline.getVersionFilePath(metaClient); + FileIOUtils.createFileInPath(metaClient.getFs(), tempVersionFilePath, Option.of(content)); + metaClient.getFs().delete(versionFilePath, false); + metaClient.getFs().rename(tempVersionFilePath, versionFilePath); + } + + /** + * Compacts the small archive files. + * + *

+   * <p>The parquet naming convention is:
+   *
+   * <pre>${min_instant}_${max_instant}_${level}.parquet</pre>
+   *
+   * <p>The 'min_instant' and 'max_instant' represent the instant time range of the parquet file.
+   * The 'level' represents the layer at which the file is located; currently there is no limit
+   * on the number of layers.
+   *
+   * <p>These archive parquet files compose an LSM-tree layout: each parquet file contains
+   * a consecutive range of instant metadata entries, and different parquet files may have
+   * overlapping instant time ranges.
+   *
+   * <pre>
+   *   t1_t2_0.parquet, t3_t4_0.parquet, ... t5_t6_0.parquet       L0 layer
+   *                          \            /
+   *                             \     /
+   *                                |
+   *                                V
+   *                          t3_t6_1.parquet                      L1 layer
+   * </pre>
+   *
Compaction and cleaning: once the files number exceed a threshold(now constant 10) N, + * the oldest N files are then replaced with a compacted file in the next layer. + * A cleaning action is triggered right after the compaction. + * + * @param context HoodieEngineContext + */ + @VisibleForTesting + public void compactAndClean(HoodieEngineContext context) throws IOException { + // 1. List all the latest snapshot files + HoodieArchivedManifest latestManifest = HoodieArchivedTimeline.latestSnapshotManifest(metaClient); + int layer = 0; + // 2. triggers the compaction for L0 + Option compactedFileName = doCompact(latestManifest, layer); + while (compactedFileName.isPresent()) { + // 3. once a compaction had been executed for the current layer, + // continues to trigger compaction for the next layer. + latestManifest.addFile(getFileEntry(compactedFileName.get())); + compactedFileName = doCompact(latestManifest, ++layer); + } + + // cleaning + clean(context, layer); + } + + private Option doCompact(HoodieArchivedManifest manifest, int layer) throws IOException { + // 1. list all the files that belong to current layer + List files = manifest.getFiles() + .stream().filter(file -> HoodieArchivedTimeline.isFileFromLayer(file.getFileName(), layer)).collect(Collectors.toList()); + + int compactionBatchSize = config.getArchiveMergeFilesBatchSize(); + + if (files.size() >= compactionBatchSize) { + // 2. sort files by min instant time (implies ascending chronological order) + files.sort(HoodieArchivedManifest.FileEntry::compareTo); + List candidateFiles = getCandidateFiles(files, compactionBatchSize); + if (candidateFiles.size() < 2) { + // the file is too large to compact, returns early. + return Option.empty(); + } + String compactedFileName = compactedFileName(candidateFiles); + + // 3. compaction + compactArchiveFiles(candidateFiles, compactedFileName); + // 4. update the manifest file + updateManifest(candidateFiles, compactedFileName); + LOG.info("Finishes compaction of archive files: " + candidateFiles); + return Option.of(compactedFileName); + } + return Option.empty(); + } + + public void compactArchiveFiles(List candidateFiles, String compactedFileName) { + LOG.info("Starting to merge small archive files."); + try (HoodieFileWriter writer = openWriter(new Path(metaClient.getArchivePath(), compactedFileName))) { + for (String fileName : candidateFiles) { + // Read the archived file + try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + .getFileReader(metaClient.getHadoopConf(), new Path(metaClient.getArchivePath(), fileName))) { + // Read the meta entry + try (ClosableIterator iterator = reader.getIndexedRecordIterator(HoodieArchivedInstant.getClassSchema(), HoodieArchivedInstant.getClassSchema())) { + while (iterator.hasNext()) { + IndexedRecord record = iterator.next(); + writer.write(record.get(0).toString(), new HoodieAvroIndexedRecord(record), HoodieArchivedInstant.getClassSchema()); + } + } + } + } + } catch (Exception e) { + throw new HoodieCommitException("Failed to merge small archive files", e); + } + LOG.info("Success to merge small archive files."); + } + + /** + * Checks whether there is any unfinished compaction operation. + * + * @param context HoodieEngineContext used for parallelize to delete small archive files if necessary. 
+ */ + public void clean(HoodieEngineContext context, int compactedVersions) throws IOException { + // if there are more than 3 version of snapshots, clean the oldest files. + List allSnapshotVersions = HoodieArchivedTimeline.allSnapshotVersions(metaClient); + int numVersionsToKeep = 3 + compactedVersions; // should make the threshold configurable. + if (allSnapshotVersions.size() > numVersionsToKeep) { + allSnapshotVersions.sort((v1, v2) -> v2 - v1); + List versionsToKeep = allSnapshotVersions.subList(0, numVersionsToKeep); + Set filesToKeep = versionsToKeep.stream() + .flatMap(version -> HoodieArchivedTimeline.latestSnapshotManifest(metaClient, version).getFileNames().stream()) + .collect(Collectors.toSet()); + // delete the manifest file first + List manifestFilesToClean = new ArrayList<>(); + Arrays.stream(HoodieArchivedTimeline.listAllManifestFiles(metaClient)).forEach(fileStatus -> { + if (!versionsToKeep.contains(HoodieArchivedTimeline.getManifestVersion(fileStatus.getPath().getName()))) { + manifestFilesToClean.add(fileStatus.getPath().toString()); + } + }); + deleteFilesParallelize(metaClient, manifestFilesToClean, context, false); + // delete the archive data files + List dataFilesToClean = Arrays.stream(HoodieArchivedTimeline.listAllMetaFiles(metaClient)) + .filter(fileStatus -> !filesToKeep.contains(fileStatus.getPath().getName())) + .map(fileStatus -> fileStatus.getPath().toString()) + .collect(Collectors.toList()); + deleteFilesParallelize(metaClient, dataFilesToClean, context, false); + } + } + + private HoodieArchivedManifest.FileEntry getFileEntry(String fileName) throws IOException { + long fileLen = metaClient.getFs().getFileStatus(new Path(metaClient.getArchivePath(), fileName)).getLen(); + return HoodieArchivedManifest.FileEntry.getInstance(fileName, fileLen); + } + + /** + * Returns at most {@code filesBatch} number of source files + * restricted by the gross file size by 1GB. + */ + private List getCandidateFiles(List files, int filesBatch) throws IOException { + List candidates = new ArrayList<>(); + long totalFileLen = 0L; + long maxFileSizeInBytes = 1024 * 1024 * 1000; + for (int i = 0; i < filesBatch; i++) { + HoodieArchivedManifest.FileEntry fileEntry = files.get(i); + if (totalFileLen > maxFileSizeInBytes) { + return candidates; + } + // we may also need to consider a single file that is very close to the threshold in size, + // to avoid the write amplification, + // for e.g, two 800MB files compact into a 1.6GB file. + totalFileLen += fileEntry.getFileLen(); + candidates.add(fileEntry.getFileName()); + } + return candidates; + } + + /** + * Returns a new file name. + */ + private static String newFileName(String minInstant, String maxInstant, int layer) { + return minInstant + "_" + maxInstant + "_" + layer + HoodieFileFormat.PARQUET.getFileExtension(); + } + + /** + * Returns a new file name. + */ + @VisibleForTesting + public static String compactedFileName(List files) { + String minInstant = files.stream().map(HoodieArchivedTimeline::getMinInstantTime) + .min(Comparator.naturalOrder()).get(); + String maxInstant = files.stream().map(HoodieArchivedTimeline::getMaxInstantTime) + .max(Comparator.naturalOrder()).get(); + int currentLayer = HoodieArchivedTimeline.getFileLayer(files.get(0)); + return newFileName(minInstant, maxInstant, currentLayer + 1); + } + + /** + * Get or create a writer config for parquet writer. 
+ */ + private HoodieWriteConfig getOrCreateWriterConfig() { + if (this.writeConfig == null) { + this.writeConfig = HoodieWriteConfig.newBuilder() + .withProperties(this.config.getProps()) + .withPopulateMetaFields(false).build(); + } + return this.writeConfig; + } + + private HoodieFileWriter openWriter(Path filePath) { + try { + return HoodieFileWriterFactory.getFileWriter("", filePath, metaClient.getHadoopConf(), getOrCreateWriterConfig(), + HoodieArchivedInstant.getClassSchema(), table.getTaskContextSupplier(), HoodieRecord.HoodieRecordType.AVRO); + } catch (IOException e) { + throw new HoodieException("Unable to initialize archiving writer", e); + } + } + + private void deleteAnyLeftOverMarkers(HoodieEngineContext context, ActiveInstant activeInstant) { + WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, activeInstant.getInstantTime()); + if (writeMarkers.deleteMarkerDir(context, config.getMarkersDeleteParallelism())) { + LOG.info("Cleaned up left over marker directory for instant :" + activeInstant.getCompleted()); + } + } + + private Map deleteFilesParallelize( + HoodieTableMetaClient metaClient, + List paths, + HoodieEngineContext context, + boolean ignoreFailed) { + return FSUtils.parallelizeFilesProcess(context, + metaClient.getFs(), + config.getArchiveDeleteParallelism(), + pairOfSubPathAndConf -> { + Path file = new Path(pairOfSubPathAndConf.getKey()); + try { + FileSystem fs = metaClient.getFs(); + if (fs.exists(file)) { + return fs.delete(file, false); + } + return true; + } catch (IOException e) { + if (!ignoreFailed) { + throw new HoodieIOException("Failed to delete : " + file, e); + } else { + LOG.warn("Ignore failed deleting : " + file); + return true; + } + } + }, + paths); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 91d5edb701924..8948a0b14c00a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -22,9 +22,11 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; +import org.apache.hudi.client.utils.ArchivedTimelineWriter; import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.HoodieArchivedManifest; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -542,12 +544,12 @@ public void testCompactionRecoverWithoutManifestFile() throws Exception { // do a single merge small archive files HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); - List candidateFiles = HoodieArchivedTimeline.latestSnapshotFiles(metaClient); - candidateFiles.sort(new HoodieArchivedTimeline.ArchiveParquetVersionComparator()); + ArchivedTimelineWriter timelineWriter = ArchivedTimelineWriter.getInstance(writeConfig, table); + List candidateFiles = 
HoodieArchivedTimeline.latestSnapshotManifest(metaClient).getFiles().stream() + .sorted().map(HoodieArchivedManifest.FileEntry::getFileName).collect(Collectors.toList()); - String compactedFileName = HoodieTimelineArchiver.compactedFileName(candidateFiles); - archiver.compactArchiveFiles(candidateFiles, compactedFileName); + String compactedFileName = ArchivedTimelineWriter.compactedFileName(candidateFiles); + timelineWriter.compactArchiveFiles(candidateFiles, compactedFileName); // check loading archived and active timeline success HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); @@ -572,7 +574,7 @@ public void testCompactionCleaning() throws Exception { assertEquals(4 * 3 + 14, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants()); assertEquals(9, HoodieArchivedTimeline.latestSnapshotVersion(metaClient)); - assertEquals(Arrays.asList(6, 7, 8, 9), HoodieArchivedTimeline.allSnapshotVersions(metaClient).stream().sorted().collect(Collectors.toList())); + assertEquals(Arrays.asList(7, 8, 9), HoodieArchivedTimeline.allSnapshotVersions(metaClient).stream().sorted().collect(Collectors.toList())); } @Test diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieArchivedManifest.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieArchivedManifest.java new file mode 100644 index 0000000000000..8b7b8417679f9 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieArchivedManifest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.util.JsonUtils; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Manifest entry for a version snapshot of the archived timeline. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class HoodieArchivedManifest implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(HoodieArchivedManifest.class); + + public static final HoodieArchivedManifest EMPTY = new HoodieArchivedManifest(); + + private final List files; + + // for ser/deser + public HoodieArchivedManifest() { + this.files = new ArrayList<>(); + } + + public HoodieArchivedManifest(List files) { + this.files = files; + } + + public void addFile(String fileName, long fileLen) { + this.files.add(FileEntry.getInstance(fileName, fileLen)); + } + + public void addFile(FileEntry fileEntry) { + this.files.add(fileEntry); + } + + public List getFiles() { + return files; + } + + public List getFileNames() { + return files.stream().map(FileEntry::getFileName).collect(Collectors.toList()); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + public String toJsonString() throws IOException { + return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); + } + + public static T fromJsonString(String jsonStr, Class clazz) throws Exception { + if (jsonStr == null || jsonStr.isEmpty()) { + // For empty commit file (no data or something bad happen). + return clazz.newInstance(); + } + return JsonUtils.getObjectMapper().readValue(jsonStr, clazz); + } + + public HoodieArchivedManifest copy(List filesToRemove) { + List newFiles = this.files.stream().filter(fileEntry -> !filesToRemove.contains(fileEntry.getFileName())).collect(Collectors.toList()); + return new HoodieArchivedManifest(newFiles); + } + + public HoodieArchivedManifest copy() { + return new HoodieArchivedManifest(new ArrayList<>(this.files)); + } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * A file entry. + */ + public static class FileEntry implements Serializable, Comparable { + private String fileName; + private long fileLen; + + // for ser/deser + public FileEntry() { + } + + private FileEntry(String fileName, long fileLen) { + this.fileName = fileName; + this.fileLen = fileLen; + } + + public static FileEntry getInstance(String fileName, long fileLen) { + return new FileEntry(fileName, fileLen); + } + + public String getFileName() { + return fileName; + } + + public long getFileLen() { + return fileLen; + } + + @Override + public int compareTo(HoodieArchivedManifest.FileEntry other) { + // sorts the files by order of min instant time in file name. 
+ return this.fileName.compareTo(other.fileName); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index c52907aff324a..ffdd4372fdc86 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.timeline; import org.apache.hudi.avro.model.HoodieArchivedInstant; +import org.apache.hudi.common.model.HoodieArchivedManifest; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ArchivedInstantReadSchemas; @@ -49,7 +50,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -275,7 +275,7 @@ private List loadInstants( Function commitsFilter) { try { // List all files - List fileNames = latestSnapshotFiles(metaClient); + List fileNames = latestSnapshotManifest(metaClient).getFileNames(); Map instantsInRange = new ConcurrentHashMap<>(); Schema readSchema = getReadSchema(loadMode); @@ -377,22 +377,26 @@ public static List allSnapshotVersions(HoodieTableMetaClient metaClient /** * Returns the latest snapshot metadata files. */ - public static List latestSnapshotFiles(HoodieTableMetaClient metaClient) throws IOException { + public static HoodieArchivedManifest latestSnapshotManifest(HoodieTableMetaClient metaClient) throws IOException { int latestVersion = latestSnapshotVersion(metaClient); - return latestSnapshotFiles(metaClient, latestVersion); + return latestSnapshotManifest(metaClient, latestVersion); } /** * Reads the file list from the manifest file for the latest snapshot. */ - public static List latestSnapshotFiles(HoodieTableMetaClient metaClient, int latestVersion) { + public static HoodieArchivedManifest latestSnapshotManifest(HoodieTableMetaClient metaClient, int latestVersion) { if (latestVersion < 0) { // there is no valid snapshot of the timeline. - return Collections.emptyList(); + return HoodieArchivedManifest.EMPTY; } // read and deserialize the valid files. byte[] content = FileIOUtils.readDataFromPath(metaClient.getFs(), getManifestFilePath(metaClient, latestVersion)).get(); - return Arrays.stream(new String(content).split(",")).collect(Collectors.toList()); + try { + return HoodieArchivedManifest.fromJsonString(new String(content, StandardCharsets.UTF_8), HoodieArchivedManifest.class); + } catch (Exception e) { + throw new HoodieException("Error deserializing manifest entries", e); + } } /** @@ -557,14 +561,4 @@ public boolean isInRange(String instantTime) { return HoodieTimeline.compareTimestamps(instantTime, GREATER_THAN_OR_EQUALS, startTs); } } - - /** - * Sort files by order of min instant time in file name. 
- */ - public static class ArchiveParquetVersionComparator implements Comparator, Serializable { - @Override - public int compare(String f1, String f2) { - return f1.compareTo(f2); - } - } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala index 04989880472da..0c418281f2e4a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ArchivedTimelineReadBenchmark.scala @@ -21,9 +21,8 @@ package org.apache.spark.sql.execution.benchmark import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.DummyActiveInstant -import org.apache.hudi.client.HoodieTimelineArchiver import org.apache.hudi.client.common.HoodieJavaEngineContext -import org.apache.hudi.client.utils.ActiveInstant +import org.apache.hudi.client.utils.{ActiveInstant, ArchivedTimelineWriter} import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.timeline.{HoodieArchivedTimeline, HoodieInstant} import org.apache.hudi.common.testutils.{HoodieTestTable, HoodieTestUtils} @@ -56,7 +55,7 @@ object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase { .withMarkersType("DIRECT") .build() val engineContext = new HoodieJavaEngineContext(new Configuration()) - val archiver = new HoodieTimelineArchiver(writeConfig, HoodieJavaTable.create(writeConfig, engineContext).asInstanceOf[HoodieJavaTable[HoodieAvroPayload]]) + val writer = ArchivedTimelineWriter.getInstance(writeConfig, HoodieJavaTable.create(writeConfig, engineContext).asInstanceOf[HoodieJavaTable[HoodieAvroPayload]]) val startTs = System.currentTimeMillis() val startInstant = startTs + 1 + "" @@ -71,8 +70,8 @@ object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase { instantBuffer.add(new DummyActiveInstant(instant, metadata)) if (i % batchSize == 0) { // archive 10 instants each time - archiver.archive(engineContext, instantBuffer) - archiver.compactAndClean(engineContext) + writer.write(engineContext, instantBuffer) + writer.compactAndClean(engineContext) instantBuffer.clear() } } @@ -85,8 +84,8 @@ object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase { new HoodieArchivedTimeline(metaClient, startInstant) } benchmark.run() - val totalSize = HoodieArchivedTimeline.latestSnapshotFiles(metaClient).asScala - .map(name => metaClient.getFs.getFileStatus(new Path(metaClient.getArchivePath, name)).getLen) + val totalSize = HoodieArchivedTimeline.latestSnapshotManifest(metaClient).getFiles.asScala + .map(f => f.getFileLen) .sum println("Total file size in bytes: " + totalSize) })
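
Illustrative note (not part of the patch): since the manifest is now persisted as JSON through the new HoodieArchivedManifest, a minimal sketch of the write/read round trip looks roughly like the following, using only the API added above; the file names and the standalone main method are made up for illustration.

import org.apache.hudi.common.model.HoodieArchivedManifest;

public class ManifestJsonSketch {
  public static void main(String[] args) throws Exception {
    HoodieArchivedManifest manifest = new HoodieArchivedManifest();
    // each entry records the archive parquet file name (${min_instant}_${max_instant}_${level}.parquet) and its length
    manifest.addFile("001_010_0.parquet", 1024L);
    manifest.addFile("011_020_0.parquet", 2048L);

    // ArchivedTimelineWriter#createManifestFile persists this JSON into the next versioned manifest file
    String json = manifest.toJsonString();

    // HoodieArchivedTimeline#latestSnapshotManifest deserializes it back on the read path
    HoodieArchivedManifest reloaded =
        HoodieArchivedManifest.fromJsonString(json, HoodieArchivedManifest.class);
    System.out.println(reloaded.getFileNames()); // [001_010_0.parquet, 011_020_0.parquet]
  }
}

Carrying the file length inside each manifest entry is what lets getCandidateFiles size a compaction batch without an extra getFileStatus call per file, which the old comma-separated file list could not provide.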