From 021431b0af5d47f73886df59d024ea6f404d248e Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Wed, 23 Dec 2015 10:48:08 -0800 Subject: [PATCH 01/15] Adding file level uploading and state tracking to BaseDataPublisher. --- .../gobblin/configuration/SourceState.java | 3 +- .../gobblin/publisher/BaseDataPublisher.java | 258 ++++++++++-------- .../TimePartitionedDataPublisher.java | 33 --- .../extractor/hadoop/HadoopFsHelper.java | 16 +- .../src/main/java/gobblin/util/Action.java | 17 ++ .../main/java/gobblin/util/FileListUtils.java | 18 +- .../main/java/gobblin/util/HadoopUtils.java | 24 +- .../java/gobblin/util/ParallelRunner.java | 7 +- .../java/gobblin/util/ParallelRunnerTest.java | 2 +- 9 files changed, 211 insertions(+), 167 deletions(-) create mode 100644 gobblin-utility/src/main/java/gobblin/util/Action.java diff --git a/gobblin-api/src/main/java/gobblin/configuration/SourceState.java b/gobblin-api/src/main/java/gobblin/configuration/SourceState.java index 03104f04cd9..a7c9bd946d1 100644 --- a/gobblin-api/src/main/java/gobblin/configuration/SourceState.java +++ b/gobblin-api/src/main/java/gobblin/configuration/SourceState.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -51,7 +52,7 @@ */ public class SourceState extends State { - private static final Set EXTRACT_SET = Sets.newConcurrentHashSet(); + private static final Set EXTRACT_SET = Sets.newSetFromMap(new ConcurrentHashMap()); private static final DateTimeFormatter DTF = DateTimeFormat.forPattern("yyyyMMddHHmmss").withLocale(Locale.US).withZone(DateTimeZone.UTC); diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index 75753b84ce9..afe0f641c2e 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ 
b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -15,13 +15,14 @@ import java.io.IOException; import java.net.URI; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Objects; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import gobblin.util.Action; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -31,7 +32,6 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.io.Closer; import gobblin.configuration.State; @@ -125,105 +125,98 @@ public void close() throws IOException { @Override public void publishData(WorkUnitState state) throws IOException { - for (int branchId = 0; branchId < this.numBranches; branchId++) { - publishSingleTaskData(state, branchId); - } - } + WorkUnitPublishGroup group = getWorkUnitPublishGroup(state); + if (group != null) { + // Get a ParallelRunner instance for moving files in parallel + ParallelRunner parallelRunner = this.getParallelRunner( + this.writerFileSystemByBranches.get(group.getBranchId())); - /** - * This method publishes output data for a single task based on the given {@link WorkUnitState}. - * Output data from other tasks won't be published even if they are in the same folder. 
- */ - private void publishSingleTaskData(WorkUnitState state, int branchId) throws IOException { - publishData(state, branchId, true, new HashSet()); + // Create final output directory + WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get( + group.getBranchId()), group.getPublisherOutputDir(), + this.permissions.get(group.getBranchId())); + addSingleTaskWriterOutputToExistingDir(group.getWriterOutputDir(), + group.getPublisherOutputDir(), state, group.getBranchId(), + parallelRunner); + } } @Override public void publishData(Collection states) throws IOException { - - // We need a Set to collect unique writer output paths as multiple tasks may belong to the same extract. Tasks that - // belong to the same Extract will by default have the same output directory - Set writerOutputPathsMoved = Sets.newHashSet(); - - for (WorkUnitState workUnitState : states) { - for (int branchId = 0; branchId < this.numBranches; branchId++) { - publishMultiTaskData(workUnitState, branchId, writerOutputPathsMoved); - } - - // Upon successfully committing the data to the final output directory, set states - // of successful tasks to COMMITTED. leaving states of unsuccessful ones unchanged. - // This makes sense to the COMMIT_ON_PARTIAL_SUCCESS policy. - workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED); - } - } - - /** - * This method publishes task output data for the given {@link WorkUnitState}, but if there are output data of - * other tasks in the same folder, it may also publish those data. 
- */ - private void publishMultiTaskData(WorkUnitState state, int branchId, Set writerOutputPathsMoved) - throws IOException { - publishData(state, branchId, false, writerOutputPathsMoved); - } - - protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData, - Set writerOutputPathsMoved) throws IOException { - // Get a ParallelRunner instance for moving files in parallel - ParallelRunner parallelRunner = this.getParallelRunner(this.writerFileSystemByBranches.get(branchId)); - - // The directory where the workUnitState wrote its output data. - Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId); - - if (!this.writerFileSystemByBranches.get(branchId).exists(writerOutputDir)) { - LOG.warn(String.format("Branch %d of WorkUnit %s produced no data", branchId, state.getId())); - return; + Multimap roots = ArrayListMultimap.create(); + for (WorkUnitState state : states) { + WorkUnitPublishGroup group = getWorkUnitPublishGroup(state); + if (group != null) { + roots.put(group, state); + } } - // The directory where the final output directory for this job will be placed. - // It is a combination of DATA_PUBLISHER_FINAL_DIR and WRITER_FILE_PATH. 
- Path publisherOutputDir = getPublisherOutputDir(state, branchId); + for (WorkUnitPublishGroup group : roots.keySet()) { + ParallelRunner parallelRunner = this.getParallelRunner(this.writerFileSystemByBranches.get(group.getBranchId())); - if (publishSingleTaskData) { - - // Create final output directory - WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId), publisherOutputDir, - this.permissions.get(branchId)); - addSingleTaskWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner); - } else { - if (writerOutputPathsMoved.contains(writerOutputDir)) { - // This writer output path has already been moved for another task of the same extract - // If publishSingleTaskData=true, writerOutputPathMoved is ignored. - return; - } - - if (this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) { + if (this.publisherFileSystemByBranches.get(group.getBranchId()).exists(group.getPublisherOutputDir())) { // The final output directory already exists, check if the job is configured to replace it. // If publishSingleTaskData=true, final output directory is never replaced. - boolean replaceFinalOutputDir = this.getState().getPropAsBoolean(ForkOperatorUtils - .getPropertyNameForBranch(ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, branchId)); + boolean replaceFinalOutputDir = this.getState().getPropAsBoolean( + ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, group.getBranchId())); - // If the final output directory is not configured to be replaced, put new data to the existing directory. 
+ // If the final output directory is configured to be replaced, delete the existing publisher output directory if (!replaceFinalOutputDir) { - addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner); - writerOutputPathsMoved.add(writerOutputDir); - return; + LOG.info("Deleting publisher output dir " + group.getPublisherOutputDir()); + this.publisherFileSystemByBranches.get(group.getBranchId()).delete(group.getPublisherOutputDir(), true); } - // Delete the final output directory if it is configured to be replaced - LOG.info("Deleting publisher output dir " + publisherOutputDir); - this.publisherFileSystemByBranches.get(branchId).delete(publisherOutputDir, true); } else { - // Create the parent directory of the final output directory if it does not exist - WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId), - publisherOutputDir.getParent(), this.permissions.get(branchId)); + // Create the parent directory of the final output directory if it does not exist + WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(group.getBranchId()), + group.getPublisherOutputDir().getParent(), this.permissions.get(group.getBranchId())); } - LOG.info(String.format("Moving %s to %s", writerOutputDir, publisherOutputDir)); - parallelRunner.movePath(writerOutputDir, this.publisherFileSystemByBranches.get(branchId), - publisherOutputDir, this.publisherFinalDirOwnerGroupsByBranches.get(branchId)); - writerOutputPathsMoved.add(writerOutputDir); + for (WorkUnitState state : roots.get(group)) { + boolean preserveFileName = state.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, this.numBranches, group.getBranchId()), false); + + String outputFilePropName = ForkOperatorUtils + .getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, group.getBranchId()); + + Iterable taskOutputFiles 
= state.getPropAsList(outputFilePropName); + for (String taskOutputFile : taskOutputFiles) { + Path taskOutputPath = new Path(taskOutputFile); + if (!this.writerFileSystemByBranches.get(group.getBranchId()).exists(taskOutputPath)) { + LOG.warn("Task output file " + taskOutputFile + " doesn't exist."); + continue; + } + String pathSuffix; + if (preserveFileName) { + pathSuffix = state.getProp(ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, group.getBranchId())); + } else { + pathSuffix = taskOutputFile + .substring(taskOutputFile.indexOf(group.getWriterOutputDir().toString()) + + group.getWriterOutputDir().toString().length() + 1); + } + Path publisherOutputPath = new Path(group.getPublisherOutputDir(), pathSuffix); + WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(group.getBranchId()), + publisherOutputPath.getParent(), this.permissions.get(group.getBranchId())); + + LOG.info(String.format("Moving %s to %s", taskOutputFile, publisherOutputPath)); + parallelRunner.movePath(taskOutputPath, this.publisherFileSystemByBranches.get(group.getBranchId()), + publisherOutputPath, Optional.absent(), new CommitAction(state)); + } + } } } + @Override + public void publishMetadata(Collection states) throws IOException { + // Nothing to do + } + + @Override + public void publishMetadata(WorkUnitState state) throws IOException { + // Nothing to do + } + /** * Get the output directory path this {@link BaseDataPublisher} will write to. * @@ -265,29 +258,7 @@ protected void addSingleTaskWriterOutputToExistingDir(Path writerOutputDir, Path LOG.info(String.format("Moving %s to %s", taskOutputFile, publisherOutputPath)); parallelRunner.movePath(taskOutputPath, this.publisherFileSystemByBranches.get(branchId), - publisherOutputPath, Optional. 
absent()); - } - } - - protected void addWriterOutputToExistingDir(Path writerOutputDir, Path publisherOutputDir, - WorkUnitState workUnitState, int branchId, ParallelRunner parallelRunner) throws IOException { - boolean preserveFileName = workUnitState.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, this.numBranches, branchId), false); - - // Go through each file in writerOutputDir and move it into publisherOutputDir - for (FileStatus status : this.writerFileSystemByBranches.get(branchId).listStatus(writerOutputDir)) { - - // Preserve the file name if configured, use specified name otherwise - Path finalOutputPath = - preserveFileName - ? new Path(publisherOutputDir, - workUnitState.getProp(ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, branchId))) - : new Path(publisherOutputDir, status.getPath().getName()); - - LOG.info(String.format("Moving %s to %s", status.getPath(), finalOutputPath)); - parallelRunner.movePath(status.getPath(), this.publisherFileSystemByBranches.get(branchId), - finalOutputPath, Optional. absent()); + publisherOutputPath, Optional. absent(), new CommitAction(workUnitState)); } } @@ -299,14 +270,75 @@ private ParallelRunner getParallelRunner(FileSystem fs) { return this.parallelRunners.get(uri); } - @Override - public void publishMetadata(Collection states) throws IOException { - // Nothing to do + private WorkUnitPublishGroup getWorkUnitPublishGroup(WorkUnitState state) throws IOException { + for (int branchId = 0; branchId < this.numBranches; branchId++) { + // The directory where the workUnitState wrote its output data. 
+ Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId); + + if (!this.writerFileSystemByBranches.get(branchId).exists(writerOutputDir)) { + LOG.warn(String.format("Branch %d of WorkUnit %s produced no data", branchId, state.getId())); + continue; + } + + // The directory where the final output directory for this job will be placed. + // It is a combination of DATA_PUBLISHER_FINAL_DIR and WRITER_FILE_PATH. + Path publisherOutputDir = getPublisherOutputDir(state, branchId); + + return new WorkUnitPublishGroup(branchId, writerOutputDir, publisherOutputDir); + } + return null; } - @Override - public void publishMetadata(WorkUnitState state) throws IOException { - // Nothing to do + private static class CommitAction implements Action { + private final WorkUnitState state; + + public CommitAction(WorkUnitState state) { + this.state = state; + } + + @Override + public void apply() { + state.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + } } + private class WorkUnitPublishGroup { + private final int branchId; + private final Path writerOutputDir; + private final Path publisherOutputDir; + + public WorkUnitPublishGroup(int branchId, Path writerOutputDir, Path publisherOutputDir) { + this.branchId = branchId; + this.writerOutputDir = writerOutputDir; + this.publisherOutputDir = publisherOutputDir; + } + + public int getBranchId() { + return branchId; + } + + public Path getWriterOutputDir() { + return writerOutputDir; + } + + public Path getPublisherOutputDir() { + return publisherOutputDir; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + WorkUnitPublishGroup that = (WorkUnitPublishGroup) o; + return Objects.equals(branchId, that.branchId) && + Objects.equals(writerOutputDir, that.writerOutputDir) && + Objects.equals(publisherOutputDir, that.publisherOutputDir); + } + + @Override + public int hashCode() { + return 
Objects.hash(branchId, writerOutputDir, publisherOutputDir); + } + } } diff --git a/gobblin-core/src/main/java/gobblin/publisher/TimePartitionedDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/TimePartitionedDataPublisher.java index aacb966ec3e..2d79de92cd8 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/TimePartitionedDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/TimePartitionedDataPublisher.java @@ -12,19 +12,12 @@ package gobblin.publisher; -import com.google.common.base.Optional; import java.io.IOException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import gobblin.configuration.State; -import gobblin.configuration.WorkUnitState; -import gobblin.util.FileListUtils; -import gobblin.util.ParallelRunner; -import gobblin.util.WriterUtils; /** @@ -43,30 +36,4 @@ public class TimePartitionedDataPublisher extends BaseDataPublisher { public TimePartitionedDataPublisher(State state) throws IOException { super(state); } - - /** - * This method needs to be overridden for TimePartitionedDataPublisher, since the output folder structure - * contains timestamp, we have to move the files recursively. 
- * - * For example, move {writerOutput}/2015/04/08/15/output.avro to {publisherOutput}/2015/04/08/15/output.avro - */ - @Override - protected void addWriterOutputToExistingDir(Path writerOutput, Path publisherOutput, WorkUnitState workUnitState, - int branchId, ParallelRunner parallelRunner) throws IOException { - - for (FileStatus status : FileListUtils.listFilesRecursively(this.writerFileSystemByBranches.get(branchId), - writerOutput)) { - String filePathStr = status.getPath().toString(); - String pathSuffix = - filePathStr.substring(filePathStr.indexOf(writerOutput.toString()) + writerOutput.toString().length() + 1); - Path outputPath = new Path(publisherOutput, pathSuffix); - - WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId), outputPath.getParent(), - this.permissions.get(branchId)); - - LOG.info(String.format("Moving %s to %s", status.getPath(), outputPath)); - parallelRunner.movePath(status.getPath(), this.publisherFileSystemByBranches.get(branchId), - outputPath, Optional. 
absent()); - } - } } diff --git a/gobblin-core/src/main/java/gobblin/source/extractor/hadoop/HadoopFsHelper.java b/gobblin-core/src/main/java/gobblin/source/extractor/hadoop/HadoopFsHelper.java index 773e778260e..2b62bdf17ea 100644 --- a/gobblin-core/src/main/java/gobblin/source/extractor/hadoop/HadoopFsHelper.java +++ b/gobblin-core/src/main/java/gobblin/source/extractor/hadoop/HadoopFsHelper.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import gobblin.util.FileListUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -79,19 +80,8 @@ public List ls(String path) throws FileBasedHelperException { } public void lsr(Path p, List results) throws IOException { - if (!this.fs.getFileStatus(p).isDir()) { - results.add(p.toString()); - } - Path qualifiedPath = this.fs.makeQualified(p); - for (FileStatus status : this.fs.listStatus(p)) { - if (status.isDir()) { - // Fix for hadoop issue: https://issues.apache.org/jira/browse/HADOOP-12169 - if (!qualifiedPath.equals(status.getPath())) { - lsr(status.getPath(), results); - } - } else { - results.add(status.getPath().toString()); - } + for (FileStatus fileStatus : FileListUtils.listFilesRecursively(this.fs, p)) { + results.add(fileStatus.getPath().toString()); } } diff --git a/gobblin-utility/src/main/java/gobblin/util/Action.java b/gobblin-utility/src/main/java/gobblin/util/Action.java new file mode 100644 index 00000000000..5ebb6395968 --- /dev/null +++ b/gobblin-utility/src/main/java/gobblin/util/Action.java @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2014-2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. + */ + +package gobblin.util; + +public interface Action { + public void apply(); +} diff --git a/gobblin-utility/src/main/java/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/gobblin/util/FileListUtils.java index 867f504b07a..adbb54c5900 100644 --- a/gobblin-utility/src/main/java/gobblin/util/FileListUtils.java +++ b/gobblin-utility/src/main/java/gobblin/util/FileListUtils.java @@ -55,9 +55,13 @@ public static List listFilesRecursively(FileSystem fs, Path path, Pa @SuppressWarnings("deprecation") private static List listFilesRecursivelyHelper(FileSystem fs, List files, FileStatus fileStatus, PathFilter fileFilter) throws FileNotFoundException, IOException { + Path qualifiedPath = fs.makeQualified(fileStatus.getPath()); if (fileStatus.isDir()) { for (FileStatus status : fs.listStatus(fileStatus.getPath())) { - listFilesRecursivelyHelper(fs, files, status, fileFilter); + // Fix for hadoop issue: https://issues.apache.org/jira/browse/HADOOP-12169 + if (!qualifiedPath.equals(status.getPath())) { + listFilesRecursivelyHelper(fs, files, status, fileFilter); + } } } else if (fileFilter.accept(fileStatus.getPath())) { files.add(fileStatus); @@ -87,12 +91,16 @@ public static List listMostNestedPathRecursively(FileSystem fs, Path private static List listMostNestedPathRecursivelyHelper(FileSystem fs, List files, FileStatus fileStatus, PathFilter fileFilter) throws FileNotFoundException, IOException { if (fileStatus.isDir()) { + Path qualifiedPath = fs.makeQualified(fileStatus.getPath()); FileStatus[] curFileStatus = fs.listStatus(fileStatus.getPath()); if (ArrayUtils.isEmpty(curFileStatus)) { files.add(fileStatus); } else { for (FileStatus status : 
curFileStatus) { - listMostNestedPathRecursivelyHelper(fs, files, status, fileFilter); + // Fix for hadoop issue: https://issues.apache.org/jira/browse/HADOOP-12169 + if (!qualifiedPath.equals(status.getPath())) { + listMostNestedPathRecursivelyHelper(fs, files, status, fileFilter); + } } } } else if (fileFilter.accept(fileStatus.getPath())) { @@ -118,8 +126,12 @@ private static List listPathsRecursivelyHelper(FileSystem fs, List group) { + public void movePath(final Path src, final FileSystem dstFs, final Path dst, final Optional group, + final Action commitAction) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { @@ -266,6 +268,9 @@ public Void call() throws Exception { if (group.isPresent()) { HadoopUtils.setGroup(dstFs, dst, group.get()); } + if (commitAction != null) { + commitAction.apply(); + } } return null; } catch (FileAlreadyExistsException e) { diff --git a/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java b/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java index 88be08bf5c2..03674fcaaa0 100644 --- a/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java +++ b/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java @@ -172,7 +172,7 @@ public void testMovePath() throws IOException, URISyntaxException { Mockito.when(fs2.create(dst, false)).thenReturn(new FSDataOutputStream(actual, null)); try (ParallelRunner parallelRunner = new ParallelRunner(1, fs1)) { - parallelRunner.movePath(src, fs2, dst, Optional.absent()); + parallelRunner.movePath(src, fs2, dst, Optional.absent(), null); } Assert.assertEquals(actual.toString(), expected); From a7427b1dd77bab0d9c24e507bee0df85e807cf97 Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Wed, 23 Dec 2015 15:11:30 -0800 Subject: [PATCH 02/15] Handle branched workunits correctly. Only run the commitaction when all the files for the workunit branch are moved successfully. 
--- .../gobblin/publisher/BaseDataPublisher.java | 30 +++++---- .../runtime/JobLauncherTestHelper.java | 1 - .../java/gobblin/util/ParallelRunner.java | 62 ++++++++++++++----- .../java/gobblin/util/ParallelRunnerTest.java | 3 +- 4 files changed, 65 insertions(+), 31 deletions(-) diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index afe0f641c2e..63b29b615b1 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -20,6 +20,8 @@ import java.util.Objects; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import gobblin.util.Action; import org.apache.hadoop.conf.Configuration; @@ -125,8 +127,8 @@ public void close() throws IOException { @Override public void publishData(WorkUnitState state) throws IOException { - WorkUnitPublishGroup group = getWorkUnitPublishGroup(state); - if (group != null) { + List groups = getWorkUnitPublishGroup(state); + for (WorkUnitPublishGroup group : groups) { // Get a ParallelRunner instance for moving files in parallel ParallelRunner parallelRunner = this.getParallelRunner( this.writerFileSystemByBranches.get(group.getBranchId())); @@ -145,10 +147,10 @@ public void publishData(WorkUnitState state) throws IOException { public void publishData(Collection states) throws IOException { Multimap roots = ArrayListMultimap.create(); for (WorkUnitState state : states) { - WorkUnitPublishGroup group = getWorkUnitPublishGroup(state); - if (group != null) { - roots.put(group, state); - } + List groups = getWorkUnitPublishGroup(state); + for (WorkUnitPublishGroup group : groups) { + roots.put(group, state); + } } for (WorkUnitPublishGroup group : roots.keySet()) { @@ -179,7 +181,8 @@ public void 
publishData(Collection states) throws IOExc String outputFilePropName = ForkOperatorUtils .getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, group.getBranchId()); - Iterable taskOutputFiles = state.getPropAsList(outputFilePropName); + Iterable taskOutputFiles = state.getPropAsList(outputFilePropName, ""); + ImmutableMap.Builder movesBuilder = new ImmutableMap.Builder<>(); for (String taskOutputFile : taskOutputFiles) { Path taskOutputPath = new Path(taskOutputFile); if (!this.writerFileSystemByBranches.get(group.getBranchId()).exists(taskOutputPath)) { @@ -199,10 +202,10 @@ public void publishData(Collection states) throws IOExc WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(group.getBranchId()), publisherOutputPath.getParent(), this.permissions.get(group.getBranchId())); - LOG.info(String.format("Moving %s to %s", taskOutputFile, publisherOutputPath)); - parallelRunner.movePath(taskOutputPath, this.publisherFileSystemByBranches.get(group.getBranchId()), - publisherOutputPath, Optional.absent(), new CommitAction(state)); + movesBuilder.put(taskOutputPath, publisherOutputPath); } + parallelRunner.movePaths(this.publisherFileSystemByBranches.get(group.getBranchId()), + movesBuilder.build(), Optional.absent(), new CommitAction(state)); } } } @@ -270,7 +273,8 @@ private ParallelRunner getParallelRunner(FileSystem fs) { return this.parallelRunners.get(uri); } - private WorkUnitPublishGroup getWorkUnitPublishGroup(WorkUnitState state) throws IOException { + private List getWorkUnitPublishGroup(WorkUnitState state) throws IOException { + ImmutableList.Builder builder = new ImmutableList.Builder<>(); for (int branchId = 0; branchId < this.numBranches; branchId++) { // The directory where the workUnitState wrote its output data. 
Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId); @@ -284,9 +288,9 @@ private WorkUnitPublishGroup getWorkUnitPublishGroup(WorkUnitState state) throws // It is a combination of DATA_PUBLISHER_FINAL_DIR and WRITER_FILE_PATH. Path publisherOutputDir = getPublisherOutputDir(state, branchId); - return new WorkUnitPublishGroup(branchId, writerOutputDir, publisherOutputDir); + builder.add(new WorkUnitPublishGroup(branchId, writerOutputDir, publisherOutputDir)); } - return null; + return builder.build(); } private static class CommitAction implements Action { diff --git a/gobblin-runtime/src/test/java/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/gobblin/runtime/JobLauncherTestHelper.java index 69b0aa7afaa..da097927ca7 100644 --- a/gobblin-runtime/src/test/java/gobblin/runtime/JobLauncherTestHelper.java +++ b/gobblin-runtime/src/test/java/gobblin/runtime/JobLauncherTestHelper.java @@ -294,7 +294,6 @@ public void runTestWithMultipleDatasetsAndFaultyExtractor(Properties jobProps, b Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED); Assert.assertEquals(datasetState.getTaskCount(), 1); TaskState taskState = datasetState.getTaskStates().get(0); - // BaseDataPublisher will change the state to COMMITTED Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED); } else { // Task 0 should have failed diff --git a/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java b/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java index 158acbd465d..dc9cd4a8ac7 100644 --- a/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java +++ b/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import 
java.util.concurrent.ExecutorService; @@ -23,6 +24,8 @@ import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; +import com.google.common.collect.ImmutableMap; +import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -68,6 +71,7 @@ * * @author ynli */ +@Slf4j public class ParallelRunner implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(ParallelRunner.class); @@ -216,7 +220,10 @@ public Void call() throws Exception { * @param src path to be renamed * @param dst new path after rename * @param group an optional group name for the destination path + * + * @Deprecated Use {@link gobblin.util.ParallelRunner#movePath(Path, FileSystem, Path, Optional, Action)} */ + @Deprecated public void renamePath(final Path src, final Path dst, final Optional group) { this.futures.add(this.executor.submit(new Callable() { @Override @@ -257,28 +264,51 @@ public Void call() throws Exception { */ public void movePath(final Path src, final FileSystem dstFs, final Path dst, final Optional group, final Action commitAction) { + movePaths(dstFs, ImmutableMap.of(src, dst), group, commitAction); + } + + /** + * Move a set of {@link Path}. + * + *

+ * This method submits a task to move a set of {@link Path} and returns immediately + * after the task is submitted. + *

+ * + * @param dstFs the destination {@link FileSystem} + * @param moves the set of src path to dst path pairs + * @param group an optional group name for the destination path + * @param commitAction an action to perform when the move completes successfully + */ + public void movePaths(final FileSystem dstFs, final Map moves, final Optional group, + final Action commitAction) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { - Lock lock = locks.get(src.toString()); - lock.lock(); - try { - if (fs.exists(src)) { - HadoopUtils.movePath(fs, src, dstFs, dst); - if (group.isPresent()) { - HadoopUtils.setGroup(dstFs, dst, group.get()); - } - if (commitAction != null) { - commitAction.apply(); + for (Map.Entry move : moves.entrySet()) { + Path src = move.getKey(); + Path dst = move.getValue(); + log.info(String.format("Moving %s to %s", src, dst)); + Lock lock = locks.get(src.toString()); + lock.lock(); + try { + if (fs.exists(src)) { + HadoopUtils.movePath(fs, src, dstFs, dst); + if (group.isPresent()) { + HadoopUtils.setGroup(dstFs, dst, group.get()); + } } + } catch (FileAlreadyExistsException e) { + LOGGER.warn(String.format("Failed to move %s to %s: dst already exists", src, dst), e); + return null; + } finally { + lock.unlock(); } - return null; - } catch (FileAlreadyExistsException e) { - LOGGER.warn(String.format("Failed to move %s to %s: dst already exists", src, dst), e); - return null; - } finally { - lock.unlock(); } + if (commitAction != null) { + commitAction.apply(); + } + return null; } })); } diff --git a/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java b/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java index 03674fcaaa0..f4242b61002 100644 --- a/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java +++ b/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java @@ -19,6 +19,7 @@ import java.net.URISyntaxException; import java.util.Queue; 
+import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -172,7 +173,7 @@ public void testMovePath() throws IOException, URISyntaxException { Mockito.when(fs2.create(dst, false)).thenReturn(new FSDataOutputStream(actual, null)); try (ParallelRunner parallelRunner = new ParallelRunner(1, fs1)) { - parallelRunner.movePath(src, fs2, dst, Optional.absent(), null); + parallelRunner.movePaths(fs2, ImmutableMap.of(src, dst), Optional.absent(), null); } Assert.assertEquals(actual.toString(), expected); From 68071bc6eaf33c86c1f079b9fc39b4beebcf7233 Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Wed, 23 Dec 2015 20:13:34 -0800 Subject: [PATCH 03/15] Remove FileSystem from ParallelRunner and changed BaseDataPublisher to not set the state on a WorkUnit to COMMITTED until all branches are published. --- .../gobblin/publisher/BaseDataPublisher.java | 151 ++++++------------ .../runtime/TaskStateCollectorService.java | 4 +- .../runtime/mapreduce/MRJobLauncher.java | 4 +- .../java/gobblin/util/JobLauncherUtils.java | 6 +- .../java/gobblin/util/ParallelRunner.java | 85 ++++++---- .../java/gobblin/util/ParallelRunnerTest.java | 23 ++- .../gobblin/yarn/GobblinHelixJobLauncher.java | 15 +- 7 files changed, 127 insertions(+), 161 deletions(-) diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index 63b29b615b1..341d6b30ffd 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -16,13 +16,11 @@ import java.net.URI; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Objects; +import java.util.Set; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; -import 
com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import gobblin.util.Action; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -33,7 +31,6 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.io.Closer; import gobblin.configuration.State; @@ -67,8 +64,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { protected final List> publisherFinalDirOwnerGroupsByBranches; protected final List permissions; protected final Closer closer; - protected final int parallelRunnerThreads; - protected final Map parallelRunners = Maps.newHashMap(); + protected final ParallelRunner parallelRunner; public BaseDataPublisher(State state) throws IOException { super(state); @@ -111,8 +107,9 @@ public BaseDataPublisher(State state) throws IOException { FsPermission.getDefault().toShort(), ConfigurationKeys.PERMISSION_PARSING_RADIX))); } - this.parallelRunnerThreads = + int parallelRunnerThreads = state.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS); + this.parallelRunner = this.closer.register(new ParallelRunner(parallelRunnerThreads * this.numBranches)); } @Override @@ -127,86 +124,77 @@ public void close() throws IOException { @Override public void publishData(WorkUnitState state) throws IOException { - List groups = getWorkUnitPublishGroup(state); - for (WorkUnitPublishGroup group : groups) { - // Get a ParallelRunner instance for moving files in parallel - ParallelRunner parallelRunner = this.getParallelRunner( - this.writerFileSystemByBranches.get(group.getBranchId())); - - // Create final output directory - WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get( - group.getBranchId()), group.getPublisherOutputDir(), - this.permissions.get(group.getBranchId())); - 
addSingleTaskWriterOutputToExistingDir(group.getWriterOutputDir(), - group.getPublisherOutputDir(), state, group.getBranchId(), - parallelRunner); - } + publishData(ImmutableList.of(state)); } @Override public void publishData(Collection states) throws IOException { - Multimap roots = ArrayListMultimap.create(); + Set preparedGroups = Sets.newHashSet(); for (WorkUnitState state : states) { + ImmutableList.Builder movesBuilder = new ImmutableList.Builder<>(); List groups = getWorkUnitPublishGroup(state); for (WorkUnitPublishGroup group : groups) { - roots.put(group, state); - } - } - - for (WorkUnitPublishGroup group : roots.keySet()) { - ParallelRunner parallelRunner = this.getParallelRunner(this.writerFileSystemByBranches.get(group.getBranchId())); - - if (this.publisherFileSystemByBranches.get(group.getBranchId()).exists(group.getPublisherOutputDir())) { - // The final output directory already exists, check if the job is configured to replace it. - // If publishSingleTaskData=true, final output directory is never replaced. 
- boolean replaceFinalOutputDir = this.getState().getPropAsBoolean( - ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, group.getBranchId())); - - // If the final output directory is configured to be replaced, delete the existing publisher output directory - if (!replaceFinalOutputDir) { - LOG.info("Deleting publisher output dir " + group.getPublisherOutputDir()); - this.publisherFileSystemByBranches.get(group.getBranchId()).delete(group.getPublisherOutputDir(), true); + if (preparedGroups.add(group)) { + prepareWorkUnitPublishGroup(group); } - } else { - // Create the parent directory of the final output directory if it does not exist - WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(group.getBranchId()), - group.getPublisherOutputDir().getParent(), this.permissions.get(group.getBranchId())); - } - - for (WorkUnitState state : roots.get(group)) { boolean preserveFileName = state.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch( ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, this.numBranches, group.getBranchId()), false); - String outputFilePropName = ForkOperatorUtils - .getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, group.getBranchId()); + String outputFilePropName = ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, group.getBranchId()); + + if (!state.contains(outputFilePropName)) { + LOG.warn("Missing property " + outputFilePropName + ". 
This task may have pulled no data."); + continue; + } - Iterable taskOutputFiles = state.getPropAsList(outputFilePropName, ""); - ImmutableMap.Builder movesBuilder = new ImmutableMap.Builder<>(); + Iterable taskOutputFiles = state.getPropAsList(outputFilePropName); for (String taskOutputFile : taskOutputFiles) { Path taskOutputPath = new Path(taskOutputFile); - if (!this.writerFileSystemByBranches.get(group.getBranchId()).exists(taskOutputPath)) { + FileSystem writerFileSystem = this.writerFileSystemByBranches.get(group.getBranchId()); + if (!writerFileSystem.exists(taskOutputPath)) { LOG.warn("Task output file " + taskOutputFile + " doesn't exist."); continue; } String pathSuffix; if (preserveFileName) { - pathSuffix = state.getProp(ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, group.getBranchId())); + pathSuffix = state.getProp(ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, group.getBranchId())); } else { - pathSuffix = taskOutputFile - .substring(taskOutputFile.indexOf(group.getWriterOutputDir().toString()) + - group.getWriterOutputDir().toString().length() + 1); + pathSuffix = taskOutputFile.substring( + taskOutputFile.indexOf(group.getWriterOutputDir().toString()) + + group.getWriterOutputDir().toString().length() + 1); } Path publisherOutputPath = new Path(group.getPublisherOutputDir(), pathSuffix); - WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(group.getBranchId()), - publisherOutputPath.getParent(), this.permissions.get(group.getBranchId())); + FileSystem publisherFileSystem = this.publisherFileSystemByBranches.get(group.getBranchId()); + WriterUtils.mkdirsWithRecursivePermission(publisherFileSystem, publisherOutputPath.getParent(), + this.permissions.get(group.getBranchId())); - movesBuilder.put(taskOutputPath, publisherOutputPath); + movesBuilder.add(new 
ParallelRunner.MoveCommand(writerFileSystem, taskOutputPath, + publisherFileSystem, publisherOutputPath)); } - parallelRunner.movePaths(this.publisherFileSystemByBranches.get(group.getBranchId()), - movesBuilder.build(), Optional.absent(), new CommitAction(state)); } + this.parallelRunner.movePaths(movesBuilder.build(), Optional.absent(), new CommitAction(state)); + } + } + + private void prepareWorkUnitPublishGroup(WorkUnitPublishGroup group) throws IOException { + if (this.publisherFileSystemByBranches.get(group.getBranchId()).exists(group.getPublisherOutputDir())) { + // The final output directory already exists, check if the job is configured to replace it. + // If publishSingleTaskData=true, final output directory is never replaced. + boolean replaceFinalOutputDir = this.getState().getPropAsBoolean( + ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, group.getBranchId())); + + // If the final output directory is configured to be replaced, delete the existing publisher output directory + if (!replaceFinalOutputDir) { + LOG.info("Deleting publisher output dir " + group.getPublisherOutputDir()); + this.publisherFileSystemByBranches.get(group.getBranchId()).delete(group.getPublisherOutputDir(), true); + } + } else { + // Create the parent directory of the final output directory if it does not exist + WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(group.getBranchId()), + group.getPublisherOutputDir().getParent(), this.permissions.get(group.getBranchId())); } } @@ -236,43 +224,6 @@ protected Path getPublisherOutputDir(WorkUnitState workUnitState, int branchId) return WriterUtils.getDataPublisherFinalDir(workUnitState, this.numBranches, branchId); } - protected void addSingleTaskWriterOutputToExistingDir(Path writerOutputDir, Path publisherOutputDir, - WorkUnitState workUnitState, int branchId, ParallelRunner parallelRunner) throws IOException { - String 
outputFilePropName = ForkOperatorUtils - .getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, branchId); - - if (!workUnitState.contains(outputFilePropName)) { - LOG.warn("Missing property " + outputFilePropName + ". This task may have pulled no data."); - return; - } - - Iterable taskOutputFiles = workUnitState.getPropAsList(outputFilePropName); - for (String taskOutputFile : taskOutputFiles) { - Path taskOutputPath = new Path(taskOutputFile); - if (!this.writerFileSystemByBranches.get(branchId).exists(taskOutputPath)) { - LOG.warn("Task output file " + taskOutputFile + " doesn't exist."); - continue; - } - String pathSuffix = taskOutputFile - .substring(taskOutputFile.indexOf(writerOutputDir.toString()) + writerOutputDir.toString().length() + 1); - Path publisherOutputPath = new Path(publisherOutputDir, pathSuffix); - WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId), - publisherOutputPath.getParent(), this.permissions.get(branchId)); - - LOG.info(String.format("Moving %s to %s", taskOutputFile, publisherOutputPath)); - parallelRunner.movePath(taskOutputPath, this.publisherFileSystemByBranches.get(branchId), - publisherOutputPath, Optional. 
absent(), new CommitAction(workUnitState)); - } - } - - private ParallelRunner getParallelRunner(FileSystem fs) { - String uri = fs.getUri().toString(); - if (!this.parallelRunners.containsKey(uri)) { - this.parallelRunners.put(uri, this.closer.register(new ParallelRunner(this.parallelRunnerThreads, fs))); - } - return this.parallelRunners.get(uri); - } - private List getWorkUnitPublishGroup(WorkUnitState state) throws IOException { ImmutableList.Builder builder = new ImmutableList.Builder<>(); for (int branchId = 0; branchId < this.numBranches; branchId++) { diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/gobblin/runtime/TaskStateCollectorService.java index b2166fe8413..a13c3cfa1ee 100644 --- a/gobblin-runtime/src/main/java/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/gobblin/runtime/TaskStateCollectorService.java @@ -135,11 +135,11 @@ public boolean accept(Path path) { } Queue taskStateQueue = Queues.newConcurrentLinkedQueue(); - try (ParallelRunner stateSerDeRunner = new ParallelRunner(stateSerDeRunnerThreads, this.fs)) { + try (ParallelRunner stateSerDeRunner = new ParallelRunner(stateSerDeRunnerThreads)) { for (FileStatus status : fileStatuses) { LOGGER.info("Found output task state file " + status.getPath()); // Deserialize the TaskState and delete the file - stateSerDeRunner.deserializeFromSequenceFile(Text.class, TaskState.class, status.getPath(), + stateSerDeRunner.deserializeFromSequenceFile(Text.class, TaskState.class, this.fs, status.getPath(), taskStateQueue, true); } } diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java index b03610fdfce..a861e21f798 100644 --- a/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -397,7 
+397,7 @@ private Path prepareJobInput(List workUnits) throws IOException { Closer closer = Closer.create(); try { - ParallelRunner parallelRunner = closer.register(new ParallelRunner(this.parallelRunnerThreads, this.fs)); + ParallelRunner parallelRunner = closer.register(new ParallelRunner(this.parallelRunnerThreads)); // Open the job input file OutputStream os = closer.register(this.fs.create(jobInputFile)); @@ -417,7 +417,7 @@ private Path prepareJobInput(List workUnits) throws IOException { } Path workUnitFile = new Path(this.jobInputPath, workUnitFileName); - parallelRunner.serializeToFile(workUnit, workUnitFile); + parallelRunner.serializeToFile(workUnit, this.fs, workUnitFile); // Append the work unit file path to the job input file bw.write(workUnitFile.toUri().getPath() + "\n"); diff --git a/gobblin-utility/src/main/java/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/gobblin/util/JobLauncherUtils.java index 04f59d629e1..9e88b042a74 100644 --- a/gobblin-utility/src/main/java/gobblin/util/JobLauncherUtils.java +++ b/gobblin-utility/src/main/java/gobblin/util/JobLauncherUtils.java @@ -211,13 +211,13 @@ public static void cleanTaskStagingData(State state, Logger logger, Closer close Path stagingPath = WriterUtils.getWriterStagingDir(state, numBranches, branchId); if (fs.exists(stagingPath)) { logger.info("Cleaning up staging directory " + stagingPath.toUri().getPath()); - parallelRunner.deletePath(stagingPath, true); + parallelRunner.deletePath(fs, stagingPath, true); } Path outputPath = WriterUtils.getWriterOutputDir(state, numBranches, branchId); if (fs.exists(outputPath)) { logger.info("Cleaning up output directory " + outputPath.toUri().getPath()); - parallelRunner.deletePath(outputPath, true); + parallelRunner.deletePath(fs, outputPath, true); } } } @@ -252,7 +252,7 @@ private static ParallelRunner getParallelRunner(FileSystem fs, Closer closer, in Map parallelRunners) { String uriAndHomeDir = new Path(new Path(fs.getUri()), 
fs.getHomeDirectory()).toString(); if (!parallelRunners.containsKey(uriAndHomeDir)) { - parallelRunners.put(uriAndHomeDir, closer.register(new ParallelRunner(parallelRunnerThreads, fs))); + parallelRunners.put(uriAndHomeDir, closer.register(new ParallelRunner(parallelRunnerThreads))); } return parallelRunners.get(uriAndHomeDir); } diff --git a/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java b/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java index dc9cd4a8ac7..45276ba058f 100644 --- a/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java +++ b/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java @@ -16,7 +16,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -24,7 +23,7 @@ import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; @@ -80,16 +79,14 @@ public class ParallelRunner implements Closeable { public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10; private final ExecutorService executor; - private final FileSystem fs; private final List> futures = Lists.newArrayList(); private final Striped locks = Striped.lazyWeakLock(Integer.MAX_VALUE); - public ParallelRunner(int threads, FileSystem fs) { + public ParallelRunner(int threads) { this.executor = Executors.newFixedThreadPool(threads, ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("ParallelRunner"))); - this.fs = fs; } /** @@ -101,16 +98,17 @@ public ParallelRunner(int threads, FileSystem fs) { *

* * @param state the {@link State} object to be serialized + * @param fileSystem the {@link FileSystem} to write the file to * @param outputFilePath the file to write the serialized {@link State} object to * @param the {@link State} object type */ - public void serializeToFile(final T state, final Path outputFilePath) { + public void serializeToFile(final T state, final FileSystem fileSystem, final Path outputFilePath) { // Use a Callable with a Void return type to allow exceptions to be thrown this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { - SerializationUtils.serializeState(fs, outputFilePath, state); + SerializationUtils.serializeState(fileSystem, outputFilePath, state); return null; } })); @@ -125,15 +123,16 @@ public Void call() throws Exception { *

* * @param state an empty {@link State} object to which the deserialized content will be populated + * @param fileSystem the {@link FileSystem} to read the file from * @param inputFilePath the input file to read from * @param the {@link State} object type */ - public void deserializeFromFile(final T state, final Path inputFilePath) { + public void deserializeFromFile(final T state, final FileSystem fileSystem, final Path inputFilePath) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { - SerializationUtils.deserializeState(fs, inputFilePath, state); + SerializationUtils.deserializeState(fileSystem, inputFilePath, state); return null; } })); @@ -148,20 +147,23 @@ public Void call() throws Exception { *

* * @param stateClass the {@link Class} object of the {@link State} class + * @param fileSystem the {@link FileSystem} to read the file from * @param inputFilePath the input {@link SequenceFile} to read from * @param states a {@link Collection} object to store the deserialized {@link State} objects * @param deleteAfter a flag telling whether to delete the {@link SequenceFile} afterwards * @param the {@link State} object type */ public void deserializeFromSequenceFile(final Class keyClass, - final Class stateClass, final Path inputFilePath, final Collection states, final boolean deleteAfter) { + final Class stateClass, final FileSystem fileSystem, final Path inputFilePath, final Collection states, + final boolean deleteAfter) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { Closer closer = Closer.create(); try { @SuppressWarnings("deprecation") - SequenceFile.Reader reader = closer.register(new SequenceFile.Reader(fs, inputFilePath, fs.getConf())); + SequenceFile.Reader reader = closer.register( + new SequenceFile.Reader(fileSystem, inputFilePath, fileSystem.getConf())); Writable key = keyClass.newInstance(); T state = stateClass.newInstance(); while (reader.next(key, state)) { @@ -170,7 +172,7 @@ public Void call() throws Exception { } if (deleteAfter) { - HadoopUtils.deletePath(fs, inputFilePath, false); + HadoopUtils.deletePath(fileSystem, inputFilePath, false); } } catch (Throwable t) { throw closer.rethrow(t); @@ -190,17 +192,18 @@ public Void call() throws Exception { * This method submits a task to delete a {@link Path} and returns immediately * after the task is submitted. *

- * + * @param fileSystem the {@link FileSystem} to delete the path from * @param path path to be deleted. + * @param recursive true if the delete is recursive; otherwise, false */ - public void deletePath(final Path path, final boolean recursive) { + public void deletePath(final FileSystem fileSystem, final Path path, final boolean recursive) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { Lock lock = locks.get(path.toString()); lock.lock(); try { - HadoopUtils.deletePath(fs, path, recursive); + HadoopUtils.deletePath(fileSystem, path, recursive); return null; } finally { lock.unlock(); @@ -217,24 +220,25 @@ public Void call() throws Exception { * after the task is submitted. *

* + * @param fileSystem the {@link FileSystem} where the rename will be done * @param src path to be renamed * @param dst new path after rename * @param group an optional group name for the destination path * - * @Deprecated Use {@link gobblin.util.ParallelRunner#movePath(Path, FileSystem, Path, Optional, Action)} + * @Deprecated Use {@link gobblin.util.ParallelRunner#movePath(FileSystem, Path, FileSystem, Path, Optional, Action)} */ @Deprecated - public void renamePath(final Path src, final Path dst, final Optional group) { + public void renamePath(final FileSystem fileSystem, final Path src, final Path dst, final Optional group) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { Lock lock = locks.get(src.toString()); lock.lock(); try { - if (fs.exists(src)) { - HadoopUtils.renamePath(fs, src, dst); + if (fileSystem.exists(src)) { + HadoopUtils.renamePath(fileSystem, src, dst); if (group.isPresent()) { - HadoopUtils.setGroup(fs, dst, group.get()); + HadoopUtils.setGroup(fileSystem, dst, group.get()); } } return null; @@ -256,15 +260,16 @@ public Void call() throws Exception { * after the task is submitted. *

* + * @param srcFs the source {@link FileSystem} * @param src path to be moved * @param dstFs the destination {@link FileSystem} * @param dst the destination path * @param group an optional group name for the destination path * @param commitAction an action to perform when the move completes successfully */ - public void movePath(final Path src, final FileSystem dstFs, final Path dst, final Optional group, + public void movePath(final FileSystem srcFs, final Path src, final FileSystem dstFs, final Path dst, final Optional group, final Action commitAction) { - movePaths(dstFs, ImmutableMap.of(src, dst), group, commitAction); + movePaths(ImmutableList.of(new MoveCommand(srcFs, src, dstFs, dst)), group, commitAction); } /** @@ -275,31 +280,27 @@ public void movePath(final Path src, final FileSystem dstFs, final Path dst, fin * after the task is submitted. *

* - * @param dstFs the destination {@link FileSystem} - * @param moves the set of src path to dst path pairs + * @param commands the set of move commands * @param group an optional group name for the destination path * @param commitAction an action to perform when the move completes successfully */ - public void movePaths(final FileSystem dstFs, final Map moves, final Optional group, - final Action commitAction) { + public void movePaths(final List commands, final Optional group, final Action commitAction) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { - for (Map.Entry move : moves.entrySet()) { - Path src = move.getKey(); - Path dst = move.getValue(); - log.info(String.format("Moving %s to %s", src, dst)); - Lock lock = locks.get(src.toString()); + for (MoveCommand command : commands) { + log.info(String.format("Moving %s to %s", command.src, command.dst)); + Lock lock = locks.get(command.src.toString()); lock.lock(); try { - if (fs.exists(src)) { - HadoopUtils.movePath(fs, src, dstFs, dst); + if (command.srcFs.exists(command.src)) { + HadoopUtils.movePath(command.srcFs, command.src, command.dstFs, command.dst); if (group.isPresent()) { - HadoopUtils.setGroup(dstFs, dst, group.get()); + HadoopUtils.setGroup(command.dstFs, command.dst, group.get()); } } } catch (FileAlreadyExistsException e) { - LOGGER.warn(String.format("Failed to move %s to %s: dst already exists", src, dst), e); + LOGGER.warn(String.format("Failed to move %s to %s: dst already exists", command.src, command.dst), e); return null; } finally { lock.unlock(); @@ -328,4 +329,18 @@ public void close() throws IOException { ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(LOGGER)); } } + + public static class MoveCommand { + private FileSystem srcFs; + private Path src; + private FileSystem dstFs; + private Path dst; + + public MoveCommand(FileSystem srcFs, Path src, FileSystem dstFs, Path dst) { + this.srcFs = srcFs; + this.src 
= src; + this.dstFs = dstFs; + this.dst = dst; + } + } } diff --git a/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java b/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java index f4242b61002..5dd949e95e0 100644 --- a/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java +++ b/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java @@ -19,7 +19,6 @@ import java.net.URISyntaxException; import java.util.Queue; -import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -66,16 +65,16 @@ public void setUp() throws IOException { @Test public void testSerializeToFile() throws IOException { - try (ParallelRunner parallelRunner = new ParallelRunner(2, this.fs)) { + try (ParallelRunner parallelRunner = new ParallelRunner(2)) { WorkUnit workUnit1 = WorkUnit.createEmpty(); workUnit1.setProp("foo", "bar"); workUnit1.setProp("a", 10); - parallelRunner.serializeToFile(workUnit1, new Path(this.outputPath, "wu1")); + parallelRunner.serializeToFile(workUnit1, this.fs, new Path(this.outputPath, "wu1")); WorkUnit workUnit2 = WorkUnit.createEmpty(); workUnit2.setProp("foo", "baz"); workUnit2.setProp("b", 20); - parallelRunner.serializeToFile(workUnit2, new Path(this.outputPath, "wu2")); + parallelRunner.serializeToFile(workUnit2, this.fs, new Path(this.outputPath, "wu2")); } } @@ -84,9 +83,9 @@ public void testDeserializeFromFile() throws IOException { WorkUnit workUnit1 = WorkUnit.createEmpty(); WorkUnit workUnit2 = WorkUnit.createEmpty(); - try (ParallelRunner parallelRunner = new ParallelRunner(2, this.fs)) { - parallelRunner.deserializeFromFile(workUnit1, new Path(this.outputPath, "wu1")); - parallelRunner.deserializeFromFile(workUnit2, new Path(this.outputPath, "wu2")); + try (ParallelRunner parallelRunner = new ParallelRunner(2)) { + parallelRunner.deserializeFromFile(workUnit1, this.fs, new 
Path(this.outputPath, "wu1")); + parallelRunner.deserializeFromFile(workUnit2, this.fs, new Path(this.outputPath, "wu2")); } Assert.assertEquals(workUnit1.getPropertyNames().size(), 2); @@ -134,9 +133,9 @@ public void testDeserializeFromSequenceFile() throws IOException { Path seqPath1 = new Path(this.outputPath, "seq1"); Path seqPath2 = new Path(this.outputPath, "seq2"); - try (ParallelRunner parallelRunner = new ParallelRunner(2, this.fs)) { - parallelRunner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, seqPath1, workUnitStates, true); - parallelRunner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, seqPath2, workUnitStates, true); + try (ParallelRunner parallelRunner = new ParallelRunner(2)) { + parallelRunner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, this.fs, seqPath1, workUnitStates, true); + parallelRunner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, this.fs, seqPath2, workUnitStates, true); } Assert.assertFalse(this.fs.exists(seqPath1)); @@ -172,8 +171,8 @@ public void testMovePath() throws IOException, URISyntaxException { Mockito.when(fs2.getConf()).thenReturn(new Configuration()); Mockito.when(fs2.create(dst, false)).thenReturn(new FSDataOutputStream(actual, null)); - try (ParallelRunner parallelRunner = new ParallelRunner(1, fs1)) { - parallelRunner.movePaths(fs2, ImmutableMap.of(src, dst), Optional.absent(), null); + try (ParallelRunner parallelRunner = new ParallelRunner(1)) { + parallelRunner.movePath(fs1, src, fs2, dst, Optional.absent(), null); } Assert.assertEquals(actual.toString(), expected); diff --git a/gobblin-yarn/src/main/java/gobblin/yarn/GobblinHelixJobLauncher.java b/gobblin-yarn/src/main/java/gobblin/yarn/GobblinHelixJobLauncher.java index 9f490a09d9b..8d860029047 100644 --- a/gobblin-yarn/src/main/java/gobblin/yarn/GobblinHelixJobLauncher.java +++ b/gobblin-yarn/src/main/java/gobblin/yarn/GobblinHelixJobLauncher.java @@ -177,13 +177,13 @@ protected void executeCancellation() 
{ private JobConfig.Builder createJob(List workUnits) throws IOException { Map taskConfigMap = Maps.newHashMap(); - try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) { + try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads)) { int multiTaskIdSequence = 0; for (WorkUnit workUnit : workUnits) { if (workUnit instanceof MultiWorkUnit) { workUnit.setId(JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), multiTaskIdSequence++)); } - addWorkUnit(workUnit, stateSerDeRunner, taskConfigMap); + addWorkUnit(this.fs, workUnit, stateSerDeRunner, taskConfigMap); } Path jobStateFilePath = new Path(this.appWorkDir, this.jobContext.getJobId() + "." + JOB_STATE_FILE_NAME); @@ -218,10 +218,10 @@ private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Excepti /** * Add a single {@link WorkUnit} (flattened). */ - private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, - Map taskConfigMap) throws IOException { + private void addWorkUnit(FileSystem fileSystem, WorkUnit workUnit, ParallelRunner stateSerDeRunner, + Map taskConfigMap) throws IOException { String workUnitFilePath = persistWorkUnit( - new Path(this.inputWorkUnitDir, this.jobContext.getJobId()), workUnit, stateSerDeRunner); + fileSystem, new Path(this.inputWorkUnitDir, this.jobContext.getJobId()), workUnit, stateSerDeRunner); Map rawConfigMap = Maps.newHashMap(); rawConfigMap.put(GobblinYarnConfigurationKeys.WORK_UNIT_FILE_PATH, workUnitFilePath); @@ -236,12 +236,13 @@ private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, /** * Persist a single {@link WorkUnit} (flattened) to a file. 
*/ - private String persistWorkUnit(Path workUnitFileDir, WorkUnit workUnit, ParallelRunner stateSerDeRunner) + private String persistWorkUnit(FileSystem fileSystem, Path workUnitFileDir, WorkUnit workUnit, + ParallelRunner stateSerDeRunner) throws IOException { String workUnitFileName = workUnit.getId() + (workUnit instanceof MultiWorkUnit ? MULTI_WORK_UNIT_FILE_EXTENSION : WORK_UNIT_FILE_EXTENSION); Path workUnitFile = new Path(workUnitFileDir, workUnitFileName); - stateSerDeRunner.serializeToFile(workUnit, workUnitFile); + stateSerDeRunner.serializeToFile(workUnit, fileSystem, workUnitFile); return workUnitFile.toString(); } From 160e5f6ba1753583911d517f2651a00b5729b98f Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Wed, 23 Dec 2015 22:48:45 -0800 Subject: [PATCH 04/15] Reverted SourceState change --- .../src/main/java/gobblin/configuration/SourceState.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gobblin-api/src/main/java/gobblin/configuration/SourceState.java b/gobblin-api/src/main/java/gobblin/configuration/SourceState.java index a7c9bd946d1..03104f04cd9 100644 --- a/gobblin-api/src/main/java/gobblin/configuration/SourceState.java +++ b/gobblin-api/src/main/java/gobblin/configuration/SourceState.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -52,7 +51,7 @@ */ public class SourceState extends State { - private static final Set EXTRACT_SET = Sets.newSetFromMap(new ConcurrentHashMap()); + private static final Set EXTRACT_SET = Sets.newConcurrentHashSet(); private static final DateTimeFormatter DTF = DateTimeFormat.forPattern("yyyyMMddHHmmss").withLocale(Locale.US).withZone(DateTimeZone.UTC); From b74418f28ae73a722c6a162bf1af315942fb7eba Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Wed, 23 Dec 2015 22:50:00 -0800 Subject: [PATCH 05/15] Reverted 
JobLauncherTestHelper change --- .../src/test/java/gobblin/runtime/JobLauncherTestHelper.java | 1 + 1 file changed, 1 insertion(+) diff --git a/gobblin-runtime/src/test/java/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/gobblin/runtime/JobLauncherTestHelper.java index da097927ca7..69b0aa7afaa 100644 --- a/gobblin-runtime/src/test/java/gobblin/runtime/JobLauncherTestHelper.java +++ b/gobblin-runtime/src/test/java/gobblin/runtime/JobLauncherTestHelper.java @@ -294,6 +294,7 @@ public void runTestWithMultipleDatasetsAndFaultyExtractor(Properties jobProps, b Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED); Assert.assertEquals(datasetState.getTaskCount(), 1); TaskState taskState = datasetState.getTaskStates().get(0); + // BaseDataPublisher will change the state to COMMITTED Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED); } else { // Task 0 should have failed From ce630361e5e4f428339fdfc2d54d1d2ac9114c95 Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Thu, 7 Jan 2016 16:33:02 -0800 Subject: [PATCH 06/15] Fixed import ordering --- .../src/main/java/gobblin/publisher/BaseDataPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index 341d6b30ffd..fc610af9d90 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; -import gobblin.util.Action; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,6 +35,7 @@ import gobblin.configuration.State; import gobblin.configuration.WorkUnitState; import gobblin.configuration.ConfigurationKeys; +import 
gobblin.util.Action; import gobblin.util.ForkOperatorUtils; import gobblin.util.ParallelRunner; import gobblin.util.WriterUtils; From 2a926dd997a9fa5709bcc06f97762ab051e43f90 Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Thu, 7 Jan 2016 16:33:58 -0800 Subject: [PATCH 07/15] Fixed import ordering --- .../src/main/java/gobblin/publisher/BaseDataPublisher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index fc610af9d90..aef5673d825 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -19,8 +19,6 @@ import java.util.Objects; import java.util.Set; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -29,7 +27,9 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.io.Closer; import gobblin.configuration.State; From 49384c8394f1a040997a648067fd9db630300e6e Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Thu, 7 Jan 2016 17:04:45 -0800 Subject: [PATCH 08/15] Address some PR comments --- .../main/java/gobblin/publisher/BaseDataPublisher.java | 7 ++++--- .../src/main/java/gobblin/util/ParallelRunner.java | 10 +++++----- .../src/test/java/gobblin/util/ParallelRunnerTest.java | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index aef5673d825..27e2a7a28c4 100644 --- 
a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -174,7 +174,8 @@ public void publishData(Collection states) throws IOExc publisherFileSystem, publisherOutputPath)); } } - this.parallelRunner.movePaths(movesBuilder.build(), Optional.absent(), new CommitAction(state)); + this.parallelRunner.movePaths(movesBuilder.build(), Optional.absent(), + Optional.of(new CommitAction(state))); } } @@ -257,7 +258,7 @@ public void apply() { } } - private class WorkUnitPublishGroup { + private static class WorkUnitPublishGroup { private final int branchId; private final Path writerOutputDir; private final Path publisherOutputDir; @@ -283,7 +284,7 @@ public Path getPublisherOutputDir() { @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (!(o instanceof WorkUnitPublishGroup)) return false; WorkUnitPublishGroup that = (WorkUnitPublishGroup) o; return Objects.equals(branchId, that.branchId) && diff --git a/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java b/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java index 45276ba058f..b2af50e3548 100644 --- a/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java +++ b/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java @@ -23,7 +23,6 @@ import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; -import com.google.common.collect.ImmutableList; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; @@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.io.Closer; import com.google.common.util.concurrent.Striped; @@ -268,7 +268,7 @@ public Void call() throws Exception { * @param 
commitAction an action to perform when the move completes successfully */ public void movePath(final FileSystem srcFs, final Path src, final FileSystem dstFs, final Path dst, final Optional group, - final Action commitAction) { + final Optional commitAction) { movePaths(ImmutableList.of(new MoveCommand(srcFs, src, dstFs, dst)), group, commitAction); } @@ -284,7 +284,7 @@ public void movePath(final FileSystem srcFs, final Path src, final FileSystem ds * @param group an optional group name for the destination path * @param commitAction an action to perform when the move completes successfully */ - public void movePaths(final List commands, final Optional group, final Action commitAction) { + public void movePaths(final List commands, final Optional group, final Optional commitAction) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { @@ -306,8 +306,8 @@ public Void call() throws Exception { lock.unlock(); } } - if (commitAction != null) { - commitAction.apply(); + if (commitAction.isPresent()) { + commitAction.get().apply(); } return null; } diff --git a/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java b/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java index 5dd949e95e0..00224d7813c 100644 --- a/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java +++ b/gobblin-utility/src/test/java/gobblin/util/ParallelRunnerTest.java @@ -172,7 +172,7 @@ public void testMovePath() throws IOException, URISyntaxException { Mockito.when(fs2.create(dst, false)).thenReturn(new FSDataOutputStream(actual, null)); try (ParallelRunner parallelRunner = new ParallelRunner(1)) { - parallelRunner.movePath(fs1, src, fs2, dst, Optional.absent(), null); + parallelRunner.movePath(fs1, src, fs2, dst, Optional.absent(), Optional.absent()); } Assert.assertEquals(actual.toString(), expected); From b7cbad499e188ac385f92dec255a9c3112e58fbe Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Thu, 7 Jan 
2016 17:06:03 -0800 Subject: [PATCH 09/15] Addressed comments --- gobblin-utility/src/main/java/gobblin/util/Action.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-utility/src/main/java/gobblin/util/Action.java b/gobblin-utility/src/main/java/gobblin/util/Action.java index 5ebb6395968..9cd3d203ab8 100644 --- a/gobblin-utility/src/main/java/gobblin/util/Action.java +++ b/gobblin-utility/src/main/java/gobblin/util/Action.java @@ -13,5 +13,5 @@ package gobblin.util; public interface Action { - public void apply(); + void apply() throws Exception; } From 4dcb9d1386e9bed60db34584319961ddb9b9187f Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Thu, 7 Jan 2016 17:09:47 -0800 Subject: [PATCH 10/15] Added comment --- gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java index 5cd4ff37577..c7ff4b21432 100644 --- a/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java +++ b/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java @@ -153,6 +153,10 @@ public static void movePath(FileSystem srcFs, Path src, FileSystem dstFs, Path d if (srcFs.getUri().equals(dstFs.getUri())) { renamePath(srcFs, src, dst); } else { + // Determining if the source filesystem in local will allow us to call moveFromLocalFile and give the + // destination filesystem the ability optimize the transfer. For example, when the source file is + // local the S3AFileSystem will skip buffering it into the fs.s3a.buffer.dir and immediately transfer the + // file to S3. 
boolean isSourceFileSystemLocal = srcFs instanceof LocalFileSystem; FileStatus srcStatus = srcFs.getFileStatus(src); if (srcStatus.isDir()) { From df25221030015994970157f288dafe4e427e40c3 Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Fri, 15 Jan 2016 09:32:00 -0800 Subject: [PATCH 11/15] Publish changes to improve HDFS performance. Publishing now happens in two stages. The first is to group all publish commands (source and destination path) by the source filesystem, source path root, destination filesystem, and destination path root. Then we walk over the groups. If the source and destination filesystems are equivalent and the filesystem supports atomically renaming a folder, we rename the source path root to the destination path root. Otherwise, we run a move for each publish command. NOTE: each publish command has an associated CommitAction which indicates the number of publishes which must succeed before the WorkUnitState is set to COMPLETE. --- .../gobblin/publisher/BaseDataPublisher.java | 278 ++++++++++++------ .../main/java/gobblin/util/HadoopUtils.java | 70 +++-- .../java/gobblin/util/ParallelRunner.java | 62 ++-- 3 files changed, 256 insertions(+), 154 deletions(-) diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index 27e2a7a28c4..79fc92925ed 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -16,8 +16,10 @@ import java.net.URI; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -26,9 +28,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; import com.google.common.base.Optional; +import 
com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.io.Closer; @@ -37,10 +43,10 @@ import gobblin.configuration.ConfigurationKeys; import gobblin.util.Action; import gobblin.util.ForkOperatorUtils; +import gobblin.util.HadoopUtils; import gobblin.util.ParallelRunner; import gobblin.util.WriterUtils; - /** * A basic implementation of {@link SingleTaskDataPublisher} that publishes the data from the writer output directory * to the final output directory. @@ -129,73 +135,20 @@ public void publishData(WorkUnitState state) throws IOException { @Override public void publishData(Collection states) throws IOException { - Set preparedGroups = Sets.newHashSet(); - for (WorkUnitState state : states) { - ImmutableList.Builder movesBuilder = new ImmutableList.Builder<>(); - List groups = getWorkUnitPublishGroup(state); - for (WorkUnitPublishGroup group : groups) { - if (preparedGroups.add(group)) { - prepareWorkUnitPublishGroup(group); - } - boolean preserveFileName = state.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, this.numBranches, group.getBranchId()), false); - - String outputFilePropName = ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, group.getBranchId()); - - if (!state.contains(outputFilePropName)) { - LOG.warn("Missing property " + outputFilePropName + ". 
This task may have pulled no data."); - continue; - } - - Iterable taskOutputFiles = state.getPropAsList(outputFilePropName); - for (String taskOutputFile : taskOutputFiles) { - Path taskOutputPath = new Path(taskOutputFile); - FileSystem writerFileSystem = this.writerFileSystemByBranches.get(group.getBranchId()); - if (!writerFileSystem.exists(taskOutputPath)) { - LOG.warn("Task output file " + taskOutputFile + " doesn't exist."); - continue; - } - String pathSuffix; - if (preserveFileName) { - pathSuffix = state.getProp(ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, group.getBranchId())); - } else { - pathSuffix = taskOutputFile.substring( - taskOutputFile.indexOf(group.getWriterOutputDir().toString()) + - group.getWriterOutputDir().toString().length() + 1); - } - Path publisherOutputPath = new Path(group.getPublisherOutputDir(), pathSuffix); - FileSystem publisherFileSystem = this.publisherFileSystemByBranches.get(group.getBranchId()); - WriterUtils.mkdirsWithRecursivePermission(publisherFileSystem, publisherOutputPath.getParent(), - this.permissions.get(group.getBranchId())); - - movesBuilder.add(new ParallelRunner.MoveCommand(writerFileSystem, taskOutputPath, - publisherFileSystem, publisherOutputPath)); + Multimap publishCommands = getGroupedPublishCommands(states); + for (Map.Entry> entry : publishCommands.asMap().entrySet()) { + PublishGroup group = entry.getKey(); + if (canPublishByRenamingFolder(group.getSourceFileSystem(), group.getDestinationFileSystem())) { + this.parallelRunner.renamePath(group.getSourceFileSystem(), group.getSourcePath(), + group.getDestinationPath(), Optional.absent(), + Optional.of(getCompositeCommitAction(entry.getValue()))); + } else { + for (PublishCommand command : entry.getValue()) { + this.parallelRunner.movePath(group.getSourceFileSystem(), command.getSrc(), + group.getDestinationFileSystem(), command.getDst(), Optional.absent(), + 
Optional.of(command.getCommitAction())); } } - this.parallelRunner.movePaths(movesBuilder.build(), Optional.absent(), - Optional.of(new CommitAction(state))); - } - } - - private void prepareWorkUnitPublishGroup(WorkUnitPublishGroup group) throws IOException { - if (this.publisherFileSystemByBranches.get(group.getBranchId()).exists(group.getPublisherOutputDir())) { - // The final output directory already exists, check if the job is configured to replace it. - // If publishSingleTaskData=true, final output directory is never replaced. - boolean replaceFinalOutputDir = this.getState().getPropAsBoolean( - ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, group.getBranchId())); - - // If the final output directory is configured to be replaced, delete the existing publisher output directory - if (!replaceFinalOutputDir) { - LOG.info("Deleting publisher output dir " + group.getPublisherOutputDir()); - this.publisherFileSystemByBranches.get(group.getBranchId()).delete(group.getPublisherOutputDir(), true); - } - } else { - // Create the parent directory of the final output directory if it does not exist - WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(group.getBranchId()), - group.getPublisherOutputDir().getParent(), this.permissions.get(group.getBranchId())); } } @@ -225,76 +178,227 @@ protected Path getPublisherOutputDir(WorkUnitState workUnitState, int branchId) return WriterUtils.getDataPublisherFinalDir(workUnitState, this.numBranches, branchId); } - private List getWorkUnitPublishGroup(WorkUnitState state) throws IOException { - ImmutableList.Builder builder = new ImmutableList.Builder<>(); + private boolean canPublishByRenamingFolder(FileSystem srcFs, FileSystem dstFs) { + return HadoopUtils.areFileSystemsEquivalent(srcFs, dstFs) && HadoopUtils.isFolderRenameAtomic(srcFs); + } + + private Multimap getGroupedPublishCommands(Collection states) throws IOException { + Set 
preparedGroups = Sets.newHashSet(); + Multimap publishCommands = ArrayListMultimap.create(); + for (WorkUnitState state : states) { + CommitAction commitAction = null; + List groups = getPublishGroups(state); + for (PublishGroup group : groups) { + if (preparedGroups.add(group)) { + preparePublishGroup(group); + } + boolean preserveFileName = state.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, this.numBranches, group.getBranchId()), false); + + String outputFilePropName = ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, group.getBranchId()); + + if (!state.contains(outputFilePropName)) { + LOG.warn("Missing property " + outputFilePropName + ". This task may have pulled no data."); + continue; + } + + Iterable taskOutputFiles = state.getPropAsList(outputFilePropName); + for (String taskOutputFile : taskOutputFiles) { + Path taskOutputPath = new Path(taskOutputFile); + if (!group.getSourceFileSystem().exists(taskOutputPath)) { + LOG.warn("Task output file " + taskOutputFile + " doesn't exist."); + continue; + } + String pathSuffix; + if (preserveFileName) { + pathSuffix = state.getProp(ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, group.getBranchId())); + } else { + pathSuffix = taskOutputFile.substring( + taskOutputFile.indexOf(group.getSourcePath().toString()) + + group.getSourcePath().toString().length() + 1); + } + Path publisherOutputPath = new Path(group.getDestinationPath(), pathSuffix); + WriterUtils.mkdirsWithRecursivePermission(group.getDestinationFileSystem(), + publisherOutputPath.getParent(), this.permissions.get(group.getBranchId())); + + if (commitAction == null) { + commitAction = new CommitAction(state); + } else { + commitAction.increment(); + } + + PublishCommand publishCommand = new PublishCommand(taskOutputPath, publisherOutputPath, commitAction); + 
publishCommands.put(group, publishCommand); + } + } + } + return publishCommands; + } + + private void preparePublishGroup(PublishGroup group) throws IOException { + if (group.getDestinationFileSystem().exists(group.getDestinationPath())) { + // The final output directory already exists, check if the job is configured to replace it. + // If publishSingleTaskData=true, final output directory is never replaced. + boolean replaceFinalOutputDir = this.getState().getPropAsBoolean( + ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, group.getBranchId())); + + // If the final output directory is configured to be replaced, delete the existing publisher output directory + if (!replaceFinalOutputDir) { + LOG.info("Deleting publisher output dir " + group.getDestinationPath()); + group.getDestinationFileSystem().delete(group.getDestinationPath(), true); + } + } else { + // Create the parent directory of the final output directory if it does not exist + WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(group.getBranchId()), + group.getDestinationPath().getParent(), this.permissions.get(group.getBranchId())); + } + } + + private List getPublishGroups(WorkUnitState state) throws IOException { + ImmutableList.Builder builder = new ImmutableList.Builder<>(); for (int branchId = 0; branchId < this.numBranches; branchId++) { // The directory where the workUnitState wrote its output data. 
Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId); - if (!this.writerFileSystemByBranches.get(branchId).exists(writerOutputDir)) { + FileSystem writerFileSystem = this.writerFileSystemByBranches.get(branchId); + if (!writerFileSystem.exists(writerOutputDir)) { LOG.warn(String.format("Branch %d of WorkUnit %s produced no data", branchId, state.getId())); continue; } + FileSystem publishFileSystem = this.publisherFileSystemByBranches.get(branchId); // The directory where the final output directory for this job will be placed. // It is a combination of DATA_PUBLISHER_FINAL_DIR and WRITER_FILE_PATH. Path publisherOutputDir = getPublisherOutputDir(state, branchId); - builder.add(new WorkUnitPublishGroup(branchId, writerOutputDir, publisherOutputDir)); + builder.add(new PublishGroup(branchId, writerFileSystem, writerOutputDir, publishFileSystem, publisherOutputDir)); } return builder.build(); } + private CompositeCommitAction getCompositeCommitAction(Iterable publishCommands) { + return new CompositeCommitAction(Iterables.transform(publishCommands, + new Function() { + @Override + public CommitAction apply(PublishCommand publishCommand) { + return publishCommand.getCommitAction(); + } + })); + } + + private static class CompositeCommitAction implements Action { + private final Iterable commitActions; + + public CompositeCommitAction(Iterable commitActions) { + this.commitActions = commitActions; + } + + @Override + public void apply() throws Exception { + for (CommitAction commitAction : commitActions) { + commitAction.apply(); + } + } + } + private static class CommitAction implements Action { private final WorkUnitState state; + private AtomicInteger requiredSuccesses = new AtomicInteger(); public CommitAction(WorkUnitState state) { this.state = state; + this.requiredSuccesses.incrementAndGet(); } @Override public void apply() { - state.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + if 
(requiredSuccesses.decrementAndGet() == 0) { + state.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + } + } + + public void increment() { + this.requiredSuccesses.incrementAndGet(); } } - private static class WorkUnitPublishGroup { + private static class PublishGroup { private final int branchId; - private final Path writerOutputDir; - private final Path publisherOutputDir; + private final FileSystem srcFs; + private final Path srcPath; + private final FileSystem dstFs; + private final Path dstPath; - public WorkUnitPublishGroup(int branchId, Path writerOutputDir, Path publisherOutputDir) { + public PublishGroup(int branchId, FileSystem srcFs, Path srcPath, FileSystem dstFs, Path dstPath) { this.branchId = branchId; - this.writerOutputDir = writerOutputDir; - this.publisherOutputDir = publisherOutputDir; + this.srcFs = srcFs; + this.srcPath = srcPath; + this.dstFs = dstFs; + this.dstPath = dstPath; } public int getBranchId() { return branchId; } - public Path getWriterOutputDir() { - return writerOutputDir; + public FileSystem getSourceFileSystem() { + return srcFs; + } + + public Path getSourcePath() { + return srcPath; + } + + public FileSystem getDestinationFileSystem() { + return dstFs; } - public Path getPublisherOutputDir() { - return publisherOutputDir; + public Path getDestinationPath() { + return dstPath; } @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof WorkUnitPublishGroup)) return false; + if (!(o instanceof PublishGroup)) return false; - WorkUnitPublishGroup that = (WorkUnitPublishGroup) o; - return Objects.equals(branchId, that.branchId) && - Objects.equals(writerOutputDir, that.writerOutputDir) && - Objects.equals(publisherOutputDir, that.publisherOutputDir); + PublishGroup that = (PublishGroup) o; + return Objects.equals(srcFs.getUri(), that.srcFs.getUri()) && + Objects.equals(srcPath, that.srcPath) && + Objects.equals(srcFs.getUri(), that.srcFs.getUri()) && + Objects.equals(dstPath, 
that.dstPath); } @Override public int hashCode() { - return Objects.hash(branchId, writerOutputDir, publisherOutputDir); + return Objects.hash(srcFs.getUri(), srcPath, dstFs.getUri(), dstPath); + } + } + + public static class PublishCommand { + private final Path src; + private final Path dst; + private final CommitAction commitAction; + + public PublishCommand(Path src, Path dst, CommitAction commitAction) { + this.src = src; + this.dst = dst; + this.commitAction = commitAction; + } + + public Path getSrc() { + return src; + } + + public Path getDst() { + return dst; + } + + public CommitAction getCommitAction() { + return commitAction; } } } diff --git a/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java index c7ff4b21432..7bb16aa1c0d 100644 --- a/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java +++ b/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.List; import java.util.Map.Entry; +import java.util.Set; import lombok.extern.slf4j.Slf4j; @@ -40,6 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Lists; import com.google.common.io.BaseEncoding; import com.google.common.io.Closer; @@ -53,6 +55,12 @@ public class HadoopUtils { public static final String HDFS_ILLEGAL_TOKEN_REGEX = "[\\s:\\\\]"; + private static final Set SCHEMES_SUPPORTING_ATOMIC_FOLDER_RENAME = + ImmutableSortedSet + .orderedBy(String.CASE_INSENSITIVE_ORDER) + .add("hdfs") + .build(); + public static Configuration newConfiguration() { Configuration conf = new Configuration(); @@ -150,34 +158,17 @@ public static void copyPath(FileSystem fs, Path src, Path dst) throws IOExceptio * An {@link IOException} if {@link FileUtil#copy(FileSystem, Path, FileSystem, Path, boolean, Configuration)} returns false. 
*/ public static void movePath(FileSystem srcFs, Path src, FileSystem dstFs, Path dst) throws IOException { - if (srcFs.getUri().equals(dstFs.getUri())) { - renamePath(srcFs, src, dst); + // Determining if the source filesystem in local will allow us to call moveFromLocalFile and give the + // destination filesystem the ability optimize the transfer. For example, when the source file is + // local the S3AFileSystem will skip buffering it into the fs.s3a.buffer.dir and immediately transfer the + // file to S3. + boolean isSourceFileSystemLocal = srcFs instanceof LocalFileSystem; + FileStatus srcStatus = srcFs.getFileStatus(src); + if (isSourceFileSystemLocal) { + dstFs.moveFromLocalFile(srcStatus.getPath(), dst); } else { - // Determining if the source filesystem in local will allow us to call moveFromLocalFile and give the - // destination filesystem the ability optimize the transfer. For example, when the source file is - // local the S3AFileSystem will skip buffering it into the fs.s3a.buffer.dir and immediately transfer the - // file to S3. 
- boolean isSourceFileSystemLocal = srcFs instanceof LocalFileSystem; - FileStatus srcStatus = srcFs.getFileStatus(src); - if (srcStatus.isDir()) { - for (FileStatus srcFile : FileListUtils.listFilesRecursively(srcFs, src)) { - if (isSourceFileSystemLocal) { - Path dstFile = new Path(dst, srcFile.getPath().getName()); - dstFs.moveFromLocalFile(srcFile.getPath(), dstFile); - } else { - if (!FileUtil.copy(srcFs, src, dstFs, dst, true, false, dstFs.getConf())) { - throw new IOException(String.format("Failed to move %s to %s", src, dst)); - } - } - } - } else { - if (isSourceFileSystemLocal) { - dstFs.moveFromLocalFile(srcStatus.getPath(), dst); - } else { - if (!FileUtil.copy(srcFs, src, dstFs, dst, true, false, dstFs.getConf())) { - throw new IOException(String.format("Failed to move %s to %s", src, dst)); - } - } + if (!FileUtil.copy(srcFs, src, dstFs, dst, true, false, dstFs.getConf())) { + throw new IOException(String.format("Failed to move %s to %s", src, dst)); } } } @@ -222,7 +213,7 @@ public static void renameRecursively(FileSystem fileSystem, Path from, Path to) Path toFilePath = new Path(to, relativeFilePath); if (!safeRenameIfNotExists(fileSystem, fromFile.getPath(), toFilePath)) { - if(fromFile.isDir()) { + if(fromFile.isDirectory()) { renameRecursively(fileSystem, fromFile.getPath(), toFilePath); } else { log.info(String.format("File already exists %s. Will not rewrite", toFilePath)); @@ -489,4 +480,27 @@ public static String sanitizePath(String path, String substitute) { public static Path sanitizePath(Path path, String substitute) { return new Path(sanitizePath(path.toString(), substitute)); } + + /** + * Returns {@code true} if the specified {@link FileSystem}s are equivalent; otherwise, {@code false}. + * + * @param left The left {@link FileSystem}. + * @param right the right {@link FileSystem}. + * @return A {@link boolean} indicating whether the specified {@link FileSystem}s are equivalent. 
+ */ + public static boolean areFileSystemsEquivalent(FileSystem left, FileSystem right) { + return left.getUri().equals(right.getUri()); + } + + /** + * Returns {@code true} if the specified {@link FileSystem} supports atomically renaming a + * folder; otherwise, {@code false}. + * + * @param fileSystem The {@link FileSystem} to check. + * @return A {@code boolean} indicating whether the specified {@link FileSystem} supports atomic folder renames. + */ + public static boolean isFolderRenameAtomic(FileSystem fileSystem) { + String scheme = fileSystem.getUri().getScheme(); + return SCHEMES_SUPPORTING_ATOMIC_FOLDER_RENAME.contains(scheme); + } } diff --git a/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java b/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java index 4b721a5a6dc..0eb51fd797f 100644 --- a/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java +++ b/gobblin-utility/src/main/java/gobblin/util/ParallelRunner.java @@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.io.Closer; import com.google.common.util.concurrent.Striped; @@ -76,7 +75,7 @@ public class ParallelRunner implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(ParallelRunner.class); public static final String PARALLEL_RUNNER_THREADS_KEY = "parallel.runner.threads"; - public static final int DEFAULT_PARALLEL_RUNNER_THREADS = 10; + public static final int DEFAULT_PARALLEL_RUNNER_THREADS = Runtime.getRuntime().availableProcessors(); private final ExecutorService executor; @@ -224,11 +223,11 @@ public Void call() throws Exception { * @param src path to be renamed * @param dst new path after rename * @param group an optional group name for the destination path + * @param commitAction an action to perform when the rename completes successfully * - * @Deprecated Use {@link
gobblin.util.ParallelRunner#movePath(FileSystem, Path, FileSystem, Path, Optional, Action)} */ - @Deprecated - public void renamePath(final FileSystem fileSystem, final Path src, final Path dst, final Optional group) { + public void renamePath(final FileSystem fileSystem, final Path src, final Path dst, final Optional group, + final Optional commitAction) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { @@ -241,13 +240,16 @@ public Void call() throws Exception { HadoopUtils.setGroup(fileSystem, dst, group.get()); } } - return null; } catch (FileAlreadyExistsException e) { LOGGER.warn(String.format("Failed to rename %s to %s: dst already exists", src, dst), e); return null; } finally { lock.unlock(); } + if (commitAction.isPresent()) { + commitAction.get().apply(); + } + return null; } })); } @@ -267,44 +269,26 @@ public Void call() throws Exception { * @param group an optional group name for the destination path * @param commitAction an action to perform when the move completes successfully */ - public void movePath(final FileSystem srcFs, final Path src, final FileSystem dstFs, final Path dst, final Optional group, - final Optional commitAction) { - movePaths(ImmutableList.of(new MoveCommand(srcFs, src, dstFs, dst)), group, commitAction); - } - - /** - * Move a set of {@link Path}. - * - *

- * This method submits a task to move a set of {@link Path} and returns immediately - * after the task is submitted. - *

- * - * @param commands the set of move commands - * @param group an optional group name for the destination path - * @param commitAction an action to perform when the move completes successfully - */ - public void movePaths(final List commands, final Optional group, final Optional commitAction) { + public void movePath(final FileSystem srcFs, final Path src, final FileSystem dstFs, final Path dst, + final Optional group, final Optional commitAction) { this.futures.add(this.executor.submit(new Callable() { @Override public Void call() throws Exception { - for (MoveCommand command : commands) { - log.info(String.format("Moving %s to %s", command.src, command.dst)); - Lock lock = locks.get(command.src.toString()); - lock.lock(); - try { - if (command.srcFs.exists(command.src)) { - HadoopUtils.movePath(command.srcFs, command.src, command.dstFs, command.dst); - if (group.isPresent()) { - HadoopUtils.setGroup(command.dstFs, command.dst, group.get()); - } + log.info(String.format("Moving %s to %s", src, dst)); + Lock lock = locks.get(src.toString()); + lock.lock(); + try { + if (srcFs.exists(src)) { + HadoopUtils.movePath(srcFs, src, dstFs, dst); + if (group.isPresent()) { + HadoopUtils.setGroup(dstFs, dst, group.get()); } - } catch (FileAlreadyExistsException e) { - LOGGER.warn(String.format("Failed to move %s to %s: dst already exists", command.src, command.dst), e); - return null; - } finally { - lock.unlock(); } + } catch (FileAlreadyExistsException e) { + LOGGER.warn(String.format("Failed to move %s to %s: dst already exists", src, dst), e); + return null; + } finally { + lock.unlock(); } if (commitAction.isPresent()) { commitAction.get().apply(); From ae62e35021d759e2d69ba8b61f584b483f7ad6fe Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Fri, 15 Jan 2016 12:21:35 -0800 Subject: [PATCH 12/15] Reverted change from isDir to isDirectory as isDirectory is causing build failures --- gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java index 342f3e8a021..b94daa268ca 100644 --- a/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java +++ b/gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java @@ -213,7 +213,7 @@ public static void renameRecursively(FileSystem fileSystem, Path from, Path to) Path toFilePath = new Path(to, relativeFilePath); if (!safeRenameIfNotExists(fileSystem, fromFile.getPath(), toFilePath)) { - if(fromFile.isDirectory()) { + if(fromFile.isDir()) { renameRecursively(fileSystem, fromFile.getPath(), toFilePath); } else { log.info(String.format("File already exists %s. Will not rewrite", toFilePath)); From c7ab44311dcb51319d7e63e773a98ece82e609e7 Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Fri, 15 Jan 2016 13:49:09 -0800 Subject: [PATCH 13/15] Ensure WorkUnits without results are marked as committed. --- .../main/java/gobblin/publisher/BaseDataPublisher.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index b6a3ce1cba7..9c262981e88 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -233,6 +233,9 @@ private Multimap getGroupedPublishCommands(Collect publishCommands.put(group, publishCommand); } } + if (commitAction == null) { + markWorkUnitCommitted(state); + } } return publishCommands; } @@ -289,6 +292,10 @@ public CommitAction apply(PublishCommand publishCommand) { })); } + private static void markWorkUnitCommitted(WorkUnitState state) { + state.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + } + private static class CompositeCommitAction implements Action { private final Iterable commitActions; @@ -316,7 +323,7 @@ public 
CommitAction(WorkUnitState state) { @Override public void apply() { if (requiredSuccesses.decrementAndGet() == 0) { - state.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + markWorkUnitCommitted(state); } } From 9c7df9bba0f13bfada07d9b215aa6444acc6d4f4 Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Tue, 19 Jan 2016 21:08:30 -0800 Subject: [PATCH 14/15] Address some comments --- .../gobblin/publisher/BaseDataPublisher.java | 72 +++++++++---------- .../java/gobblin/util/CompositeAction.java | 16 +++++ 2 files changed, 49 insertions(+), 39 deletions(-) create mode 100644 gobblin-utility/src/main/java/gobblin/util/CompositeAction.java diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index 9c262981e88..7c2e544f203 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -42,6 +42,7 @@ import gobblin.configuration.WorkUnitState; import gobblin.configuration.ConfigurationKeys; import gobblin.util.Action; +import gobblin.util.CompositeAction; import gobblin.util.ForkOperatorUtils; import gobblin.util.HadoopUtils; import gobblin.util.ParallelRunner; @@ -141,12 +142,12 @@ public void publishData(Collection states) throws IOExc if (canPublishByRenamingFolder(group.getSourceFileSystem(), group.getDestinationFileSystem())) { this.parallelRunner.renamePath(group.getSourceFileSystem(), group.getSourcePath(), group.getDestinationPath(), Optional.absent(), - Optional.of(getCompositeCommitAction(entry.getValue()))); + Optional.of(getCompositeAction(entry.getValue()))); } else { for (PublishCommand command : entry.getValue()) { this.parallelRunner.movePath(group.getSourceFileSystem(), command.getSrc(), group.getDestinationFileSystem(), command.getDst(), Optional.absent(), - Optional.of(command.getCommitAction())); + Optional.of(command.getAction())); } } } @@ -186,7 +187,7 
@@ private Multimap getGroupedPublishCommands(Collect Set preparedGroups = Sets.newHashSet(); Multimap publishCommands = ArrayListMultimap.create(); for (WorkUnitState state : states) { - CommitAction commitAction = null; + MarkWorkUnitCommittedAction markWorkUnitCommittedAction = null; List groups = getPublishGroups(state); for (PublishGroup group : groups) { if (preparedGroups.add(group)) { @@ -223,17 +224,17 @@ private Multimap getGroupedPublishCommands(Collect WriterUtils.mkdirsWithRecursivePermission(group.getDestinationFileSystem(), publisherOutputPath.getParent(), this.permissions.get(group.getBranchId())); - if (commitAction == null) { - commitAction = new CommitAction(state); - } else { - commitAction.increment(); + if (markWorkUnitCommittedAction == null) { + markWorkUnitCommittedAction = new MarkWorkUnitCommittedAction(state); } + markWorkUnitCommittedAction.requireSuccessfulPublish(); - PublishCommand publishCommand = new PublishCommand(taskOutputPath, publisherOutputPath, commitAction); + PublishCommand publishCommand = new PublishCommand(taskOutputPath, publisherOutputPath, + markWorkUnitCommittedAction); publishCommands.put(group, publishCommand); } } - if (commitAction == null) { + if (markWorkUnitCommittedAction == null) { markWorkUnitCommitted(state); } } @@ -282,12 +283,12 @@ private List getPublishGroups(WorkUnitState state) throws IOExcept return builder.build(); } - private CompositeCommitAction getCompositeCommitAction(Iterable publishCommands) { - return new CompositeCommitAction(Iterables.transform(publishCommands, - new Function() { + private CompositeAction getCompositeAction(Iterable publishCommands) { + return new CompositeAction(Iterables.transform(publishCommands, + new Function() { @Override - public CommitAction apply(PublishCommand publishCommand) { - return publishCommand.getCommitAction(); + public Action apply(PublishCommand publishCommand) { + return publishCommand.getAction(); } })); } @@ -296,38 +297,31 @@ private static void 
markWorkUnitCommitted(WorkUnitState state) { state.setWorkingState(WorkUnitState.WorkingState.COMMITTED); } - private static class CompositeCommitAction implements Action { - private final Iterable commitActions; - - public CompositeCommitAction(Iterable commitActions) { - this.commitActions = commitActions; - } - - @Override - public void apply() throws Exception { - for (CommitAction commitAction : commitActions) { - commitAction.apply(); - } - } - } - - private static class CommitAction implements Action { + /** + * An implementation of {@link Action} which sets the specified {@link WorkUnitState} to + * COMMITTED when the required files have been published. + */ + private static class MarkWorkUnitCommittedAction implements Action { private final WorkUnitState state; private AtomicInteger requiredSuccesses = new AtomicInteger(); - public CommitAction(WorkUnitState state) { + public MarkWorkUnitCommittedAction(WorkUnitState state) { this.state = state; - this.requiredSuccesses.incrementAndGet(); } @Override public void apply() { - if (requiredSuccesses.decrementAndGet() == 0) { + if (requiredSuccesses.decrementAndGet() <= 0) { markWorkUnitCommitted(state); } } - public void increment() { + /** + * Requires that a file be successfully published before this {@link WorkUnitState} will be set + * to COMMITTED. This method can be called multiple times to indicate that multiple files must be + * successfully published. 
+ */ + public void requireSuccessfulPublish() { this.requiredSuccesses.incrementAndGet(); } } @@ -388,12 +382,12 @@ public int hashCode() { public static class PublishCommand { private final Path src; private final Path dst; - private final CommitAction commitAction; + private final Action action; - public PublishCommand(Path src, Path dst, CommitAction commitAction) { + public PublishCommand(Path src, Path dst, Action action) { this.src = src; this.dst = dst; - this.commitAction = commitAction; + this.action = action; } public Path getSrc() { @@ -404,8 +398,8 @@ public Path getDst() { return dst; } - public CommitAction getCommitAction() { - return commitAction; + public Action getAction() { + return action; } } } diff --git a/gobblin-utility/src/main/java/gobblin/util/CompositeAction.java b/gobblin-utility/src/main/java/gobblin/util/CompositeAction.java new file mode 100644 index 00000000000..2e21adf2e08 --- /dev/null +++ b/gobblin-utility/src/main/java/gobblin/util/CompositeAction.java @@ -0,0 +1,16 @@ +package gobblin.util; + +public class CompositeAction implements Action { + private final Iterable actions; + + public CompositeAction(Iterable actions) { + this.actions = actions; + } + + @Override + public void apply() throws Exception { + for (Action action : actions) { + action.apply(); + } + } +} From 38c1d8c82962332f24670c1275522ab5612ad35b Mon Sep 17 00:00:00 2001 From: Joel Baranick Date: Mon, 25 Jan 2016 12:01:21 -0500 Subject: [PATCH 15/15] Temp changes --- .../gobblin/publisher/BaseDataPublisher.java | 114 +++++++++--------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java index 7c2e544f203..676bd3cbdd2 100644 --- a/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java @@ -131,18 +131,22 @@ public void close() throws 
IOException { @Override public void publishData(WorkUnitState state) throws IOException { - publishData(ImmutableList.of(state)); + publishData(ImmutableList.of(state), false); } @Override public void publishData(Collection states) throws IOException { - Multimap publishCommands = getGroupedPublishCommands(states); + publishData(states, true); + } + + private void publishData(Collection states, boolean supportsReplaceFinalDir) throws IOException { + Multimap publishCommands = getGroupedPublishCommands(states, supportsReplaceFinalDir); for (Map.Entry> entry : publishCommands.asMap().entrySet()) { PublishGroup group = entry.getKey(); if (canPublishByRenamingFolder(group.getSourceFileSystem(), group.getDestinationFileSystem())) { this.parallelRunner.renamePath(group.getSourceFileSystem(), group.getSourcePath(), - group.getDestinationPath(), Optional.absent(), - Optional.of(getCompositeAction(entry.getValue()))); + group.getDestinationPath(), Optional.absent(), + Optional.of(getCompositeAction(entry.getValue()))); } else { for (PublishCommand command : entry.getValue()) { this.parallelRunner.movePath(group.getSourceFileSystem(), command.getSrc(), @@ -183,65 +187,67 @@ private boolean canPublishByRenamingFolder(FileSystem srcFs, FileSystem dstFs) { return HadoopUtils.areFileSystemsEquivalent(srcFs, dstFs) && HadoopUtils.isFolderRenameAtomic(srcFs); } - private Multimap getGroupedPublishCommands(Collection states) throws IOException { - Set preparedGroups = Sets.newHashSet(); - Multimap publishCommands = ArrayListMultimap.create(); - for (WorkUnitState state : states) { - MarkWorkUnitCommittedAction markWorkUnitCommittedAction = null; - List groups = getPublishGroups(state); - for (PublishGroup group : groups) { - if (preparedGroups.add(group)) { - preparePublishGroup(group); - } - boolean preserveFileName = state.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, this.numBranches, group.getBranchId()), 
false); + private Multimap getGroupedPublishCommands(Collection states, + boolean supportsReplaceFinalDir) + throws IOException { + Set preparedGroups = Sets.newHashSet(); + Multimap publishCommands = ArrayListMultimap.create(); + for (WorkUnitState state : states) { + MarkWorkUnitCommittedAction markWorkUnitCommittedAction = null; + List groups = getPublishGroups(state); + for (PublishGroup group : groups) { + if (preparedGroups.add(group)) { + preparePublishGroup(group, supportsReplaceFinalDir); + } + boolean preserveFileName = state.getPropAsBoolean(ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, this.numBranches, group.getBranchId()), false); + + String outputFilePropName = ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, group.getBranchId()); - String outputFilePropName = ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, group.getBranchId()); + if (!state.contains(outputFilePropName)) { + LOG.warn("Missing property " + outputFilePropName + ". This task may have pulled no data."); + continue; + } - if (!state.contains(outputFilePropName)) { - LOG.warn("Missing property " + outputFilePropName + ". 
This task may have pulled no data."); + Iterable taskOutputFiles = state.getPropAsList(outputFilePropName); + for (String taskOutputFile : taskOutputFiles) { + Path taskOutputPath = new Path(taskOutputFile); + if (!group.getSourceFileSystem().exists(taskOutputPath)) { + LOG.warn("Task output file " + taskOutputFile + " doesn't exist."); continue; } + String pathSuffix; + if (preserveFileName) { + pathSuffix = state.getProp(ForkOperatorUtils.getPropertyNameForBranch( + ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, group.getBranchId())); + } else { + pathSuffix = taskOutputFile.substring( + taskOutputFile.indexOf(group.getSourcePath().toString()) + + group.getSourcePath().toString().length() + 1); + } + Path publisherOutputPath = new Path(group.getDestinationPath(), pathSuffix); + WriterUtils.mkdirsWithRecursivePermission(group.getDestinationFileSystem(), + publisherOutputPath.getParent(), this.permissions.get(group.getBranchId())); - Iterable taskOutputFiles = state.getPropAsList(outputFilePropName); - for (String taskOutputFile : taskOutputFiles) { - Path taskOutputPath = new Path(taskOutputFile); - if (!group.getSourceFileSystem().exists(taskOutputPath)) { - LOG.warn("Task output file " + taskOutputFile + " doesn't exist."); - continue; - } - String pathSuffix; - if (preserveFileName) { - pathSuffix = state.getProp(ForkOperatorUtils.getPropertyNameForBranch( - ConfigurationKeys.DATA_PUBLISHER_FINAL_NAME, this.numBranches, group.getBranchId())); - } else { - pathSuffix = taskOutputFile.substring( - taskOutputFile.indexOf(group.getSourcePath().toString()) + - group.getSourcePath().toString().length() + 1); - } - Path publisherOutputPath = new Path(group.getDestinationPath(), pathSuffix); - WriterUtils.mkdirsWithRecursivePermission(group.getDestinationFileSystem(), - publisherOutputPath.getParent(), this.permissions.get(group.getBranchId())); - - if (markWorkUnitCommittedAction == null) { - markWorkUnitCommittedAction = new 
MarkWorkUnitCommittedAction(state); - } - markWorkUnitCommittedAction.requireSuccessfulPublish(); - - PublishCommand publishCommand = new PublishCommand(taskOutputPath, publisherOutputPath, - markWorkUnitCommittedAction); - publishCommands.put(group, publishCommand); + if (markWorkUnitCommittedAction == null) { + markWorkUnitCommittedAction = new MarkWorkUnitCommittedAction(state); } - } - if (markWorkUnitCommittedAction == null) { - markWorkUnitCommitted(state); + markWorkUnitCommittedAction.requireSuccessfulPublish(); + + PublishCommand publishCommand = new PublishCommand(taskOutputPath, publisherOutputPath, + markWorkUnitCommittedAction); + publishCommands.put(group, publishCommand); } } - return publishCommands; + if (markWorkUnitCommittedAction == null) { + markWorkUnitCommitted(state); + } + } + return publishCommands; } - private void preparePublishGroup(PublishGroup group) throws IOException { + private void preparePublishGroup(PublishGroup group, boolean supportsReplaceFinalDir) throws IOException { if (group.getDestinationFileSystem().exists(group.getDestinationPath())) { // The final output directory already exists, check if the job is configured to replace it. // If publishSingleTaskData=true, final output directory is never replaced. @@ -250,7 +256,7 @@ private void preparePublishGroup(PublishGroup group) throws IOException { ConfigurationKeys.DATA_PUBLISHER_REPLACE_FINAL_DIR, this.numBranches, group.getBranchId())); // If the final output directory is configured to be replaced, delete the existing publisher output directory - if (!replaceFinalOutputDir) { + if (supportsReplaceFinalDir && replaceFinalOutputDir) { LOG.info("Deleting publisher output dir " + group.getDestinationPath()); group.getDestinationFileSystem().delete(group.getDestinationPath(), true); }