Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Datapublisher mark workunits committed when actually published #11

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
391 changes: 245 additions & 146 deletions gobblin-core/src/main/java/gobblin/publisher/BaseDataPublisher.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,12 @@

package gobblin.publisher;

import com.google.common.base.Optional;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.util.FileListUtils;
import gobblin.util.ParallelRunner;
import gobblin.util.WriterUtils;


/**
Expand All @@ -43,30 +36,4 @@ public class TimePartitionedDataPublisher extends BaseDataPublisher {
/**
 * Creates a publisher configured from the given job state.
 *
 * @param state the job {@link State} passed through to the {@link BaseDataPublisher} superclass
 * @throws IOException if initialization in {@link BaseDataPublisher} fails
 */
public TimePartitionedDataPublisher(State state) throws IOException {
  super(state);
}

/**
 * Overridden for {@code TimePartitionedDataPublisher}: because the output folder structure
 * embeds a timestamp, files must be discovered and moved recursively rather than moving the
 * top-level directory.
 *
 * For example, {writerOutput}/2015/04/08/15/output.avro is moved to
 * {publisherOutput}/2015/04/08/15/output.avro.
 */
@Override
protected void addWriterOutputToExistingDir(Path writerOutput, Path publisherOutput, WorkUnitState workUnitState,
    int branchId, ParallelRunner parallelRunner) throws IOException {

  String writerOutputStr = writerOutput.toString();
  for (FileStatus fileStatus : FileListUtils.listFilesRecursively(this.writerFileSystemByBranches.get(branchId),
      writerOutput)) {
    String fullPathStr = fileStatus.getPath().toString();
    // Keep only the portion of the path below the writer output dir (e.g. 2015/04/08/15/output.avro).
    int suffixStart = fullPathStr.indexOf(writerOutputStr) + writerOutputStr.length() + 1;
    Path destination = new Path(publisherOutput, fullPathStr.substring(suffixStart));

    // Ensure the timestamped parent directories exist on the publisher file system before moving.
    WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId),
        destination.getParent(), this.permissions.get(branchId));

    LOG.info(String.format("Moving %s to %s", fileStatus.getPath(), destination));
    parallelRunner.movePath(fileStatus.getPath(), this.publisherFileSystemByBranches.get(branchId),
        destination, Optional.<String> absent());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;

import gobblin.util.FileListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -79,19 +80,8 @@ public List<String> ls(String path) throws FileBasedHelperException {
}

public void lsr(Path p, List<String> results) throws IOException {
if (!this.fs.getFileStatus(p).isDir()) {
results.add(p.toString());
}
Path qualifiedPath = this.fs.makeQualified(p);
for (FileStatus status : this.fs.listStatus(p)) {
if (status.isDir()) {
// Fix for hadoop issue: https://issues.apache.org/jira/browse/HADOOP-12169
if (!qualifiedPath.equals(status.getPath())) {
lsr(status.getPath(), results);
}
} else {
results.add(status.getPath().toString());
}
for (FileStatus fileStatus : FileListUtils.listFilesRecursively(this.fs, p)) {
results.add(fileStatus.getPath().toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ public boolean accept(Path path) {
}

Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(stateSerDeRunnerThreads, this.fs)) {
try (ParallelRunner stateSerDeRunner = new ParallelRunner(stateSerDeRunnerThreads)) {
for (FileStatus status : fileStatuses) {
LOGGER.info("Found output task state file " + status.getPath());
// Deserialize the TaskState and delete the file
stateSerDeRunner.deserializeFromSequenceFile(Text.class, TaskState.class, status.getPath(),
stateSerDeRunner.deserializeFromSequenceFile(Text.class, TaskState.class, this.fs, status.getPath(),
taskStateQueue, true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ private Path prepareJobInput(List<WorkUnit> workUnits) throws IOException {

Closer closer = Closer.create();
try {
ParallelRunner parallelRunner = closer.register(new ParallelRunner(this.parallelRunnerThreads, this.fs));
ParallelRunner parallelRunner = closer.register(new ParallelRunner(this.parallelRunnerThreads));

// Open the job input file
OutputStream os = closer.register(this.fs.create(jobInputFile));
Expand All @@ -419,7 +419,7 @@ private Path prepareJobInput(List<WorkUnit> workUnits) throws IOException {
}
Path workUnitFile = new Path(this.jobInputPath, workUnitFileName);

parallelRunner.serializeToFile(workUnit, workUnitFile);
parallelRunner.serializeToFile(workUnit, this.fs, workUnitFile);

// Append the work unit file path to the job input file
bw.write(workUnitFile.toUri().getPath() + "\n");
Expand Down
17 changes: 17 additions & 0 deletions gobblin-utility/src/main/java/gobblin/util/Action.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (C) 2014-2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied.
*/

package gobblin.util;

/**
 * A simple command abstraction: a single, parameterless operation that may throw a
 * checked exception when applied.
 *
 * <p>Annotated {@link FunctionalInterface} so implementations can be supplied as lambdas
 * or method references; the annotation also makes the single-abstract-method contract
 * compiler-enforced.
 */
@FunctionalInterface
public interface Action {

  /**
   * Executes this action.
   *
   * @throws Exception if the action fails
   */
  void apply() throws Exception;
}
16 changes: 16 additions & 0 deletions gobblin-utility/src/main/java/gobblin/util/CompositeAction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package gobblin.util;

/**
 * An {@link Action} composed of a sequence of delegate {@link Action}s that are
 * applied one after another, in iteration order.
 */
public class CompositeAction implements Action {

  // Delegate actions, applied in the order the Iterable yields them.
  private final Iterable<Action> actions;

  /**
   * @param actions the delegate actions to apply, in iteration order
   */
  public CompositeAction(Iterable<Action> actions) {
    this.actions = actions;
  }

  /**
   * Applies each delegate action in order. The first delegate that throws
   * aborts the remaining ones, and its exception propagates to the caller.
   */
  @Override
  public void apply() throws Exception {
    for (Action delegate : this.actions) {
      delegate.apply();
    }
  }
}
18 changes: 15 additions & 3 deletions gobblin-utility/src/main/java/gobblin/util/FileListUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@ public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, Pa
@SuppressWarnings("deprecation")
private static List<FileStatus> listFilesRecursivelyHelper(FileSystem fs, List<FileStatus> files,
FileStatus fileStatus, PathFilter fileFilter) throws FileNotFoundException, IOException {
Path qualifiedPath = fs.makeQualified(fileStatus.getPath());
if (fileStatus.isDir()) {
for (FileStatus status : fs.listStatus(fileStatus.getPath())) {
listFilesRecursivelyHelper(fs, files, status, fileFilter);
// Fix for hadoop issue: https://issues.apache.org/jira/browse/HADOOP-12169
if (!qualifiedPath.equals(status.getPath())) {
listFilesRecursivelyHelper(fs, files, status, fileFilter);
}
}
} else if (fileFilter.accept(fileStatus.getPath())) {
files.add(fileStatus);
Expand Down Expand Up @@ -87,12 +91,16 @@ public static List<FileStatus> listMostNestedPathRecursively(FileSystem fs, Path
private static List<FileStatus> listMostNestedPathRecursivelyHelper(FileSystem fs, List<FileStatus> files,
FileStatus fileStatus, PathFilter fileFilter) throws FileNotFoundException, IOException {
if (fileStatus.isDir()) {
Path qualifiedPath = fs.makeQualified(fileStatus.getPath());
FileStatus[] curFileStatus = fs.listStatus(fileStatus.getPath());
if (ArrayUtils.isEmpty(curFileStatus)) {
files.add(fileStatus);
} else {
for (FileStatus status : curFileStatus) {
listMostNestedPathRecursivelyHelper(fs, files, status, fileFilter);
// Fix for hadoop issue: https://issues.apache.org/jira/browse/HADOOP-12169
if (!qualifiedPath.equals(status.getPath())) {
listMostNestedPathRecursivelyHelper(fs, files, status, fileFilter);
}
}
}
} else if (fileFilter.accept(fileStatus.getPath())) {
Expand All @@ -118,8 +126,12 @@ private static List<FileStatus> listPathsRecursivelyHelper(FileSystem fs, List<F
}
if (fileStatus.isDir()) {
try {
Path qualifiedPath = fs.makeQualified(fileStatus.getPath());
for (FileStatus status : fs.listStatus(fileStatus.getPath())) {
listPathsRecursivelyHelper(fs, files, status, fileFilter);
// Fix for hadoop issue: https://issues.apache.org/jira/browse/HADOOP-12169
if (!qualifiedPath.equals(status.getPath())) {
listPathsRecursivelyHelper(fs, files, status, fileFilter);
}
}
} catch (IOException ioe) {
LOG.error("Could not list contents of path " + fileStatus.getPath());
Expand Down
42 changes: 40 additions & 2 deletions gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -33,12 +34,14 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;
import com.google.common.io.Closer;
Expand All @@ -52,6 +55,12 @@ public class HadoopUtils {

public static final String HDFS_ILLEGAL_TOKEN_REGEX = "[\\s:\\\\]";

private static final Set<String> SCHEMES_SUPPORTING_ATOMIC_FOLDER_RENAME =
ImmutableSortedSet
.orderedBy(String.CASE_INSENSITIVE_ORDER)
.add("hdfs")
.build();

public static Configuration newConfiguration() {
Configuration conf = new Configuration();

Expand Down Expand Up @@ -149,8 +158,14 @@ public static void copyPath(FileSystem fs, Path src, Path dst) throws IOExceptio
* An {@link IOException} if {@link FileUtil#copy(FileSystem, Path, FileSystem, Path, boolean, Configuration)} returns false.
*/
public static void movePath(FileSystem srcFs, Path src, FileSystem dstFs, Path dst) throws IOException {
if (srcFs.getUri().equals(dstFs.getUri())) {
renamePath(srcFs, src, dst);
// Determining if the source filesystem in local will allow us to call moveFromLocalFile and give the
// destination filesystem the ability optimize the transfer. For example, when the source file is
// local the S3AFileSystem will skip buffering it into the fs.s3a.buffer.dir and immediately transfer the
// file to S3.
boolean isSourceFileSystemLocal = srcFs instanceof LocalFileSystem;
FileStatus srcStatus = srcFs.getFileStatus(src);
if (isSourceFileSystemLocal) {
dstFs.moveFromLocalFile(srcStatus.getPath(), dst);
} else {
if (!FileUtil.copy(srcFs, src, dstFs, dst, true, false, dstFs.getConf())) {
throw new IOException(String.format("Failed to move %s to %s", src, dst));
Expand Down Expand Up @@ -465,4 +480,27 @@ public static String sanitizePath(String path, String substitute) {
/**
 * Sanitizes the given {@link Path} by applying {@code sanitizePath(String, String)}
 * to its string form and wrapping the result back into a {@link Path}.
 *
 * @param path the path to sanitize
 * @param substitute the replacement used for illegal tokens
 * @return a new sanitized {@link Path}
 */
public static Path sanitizePath(Path path, String substitute) {
  String sanitized = sanitizePath(path.toString(), substitute);
  return new Path(sanitized);
}

/**
 * Returns {@code true} if the specified {@link FileSystem}s are equivalent; otherwise, {@code false}.
 *
 * <p>Equivalence is determined solely by comparing the two file systems' URIs.
 *
 * @param left The left {@link FileSystem}.
 * @param right The right {@link FileSystem}.
 * @return A {@code boolean} indicating whether the specified {@link FileSystem}s are equivalent.
 */
public static boolean areFileSystemsEquivalent(FileSystem left, FileSystem right) {
  return left.getUri().equals(right.getUri());
}

/**
 * Returns {@code true} if the specified {@link FileSystem} supports atomically renaming a
 * folder; otherwise, {@code false}.
 *
 * <p>The check is based on the file system's URI scheme (case-insensitive) against a
 * whitelist of schemes known to rename folders atomically.
 *
 * @param fileSystem The {@link FileSystem} to check.
 * @return A {@code boolean} indicating whether the specified {@link FileSystem} supports atomic folder renames.
 */
public static boolean isFolderRenameAtomic(FileSystem fileSystem) {
  String scheme = fileSystem.getUri().getScheme();
  return SCHEMES_SUPPORTING_ATOMIC_FOLDER_RENAME.contains(scheme);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ public static void cleanTaskStagingData(State state, Logger logger, Closer close
Path stagingPath = WriterUtils.getWriterStagingDir(state, numBranches, branchId);
if (fs.exists(stagingPath)) {
logger.info("Cleaning up staging directory " + stagingPath.toUri().getPath());
parallelRunner.deletePath(stagingPath, true);
parallelRunner.deletePath(fs, stagingPath, true);
}

Path outputPath = WriterUtils.getWriterOutputDir(state, numBranches, branchId);
if (fs.exists(outputPath)) {
logger.info("Cleaning up output directory " + outputPath.toUri().getPath());
parallelRunner.deletePath(outputPath, true);
parallelRunner.deletePath(fs, outputPath, true);
}
}
}
Expand Down Expand Up @@ -252,7 +252,7 @@ private static ParallelRunner getParallelRunner(FileSystem fs, Closer closer, in
Map<String, ParallelRunner> parallelRunners) {
String uriAndHomeDir = new Path(new Path(fs.getUri()), fs.getHomeDirectory()).toString();
if (!parallelRunners.containsKey(uriAndHomeDir)) {
parallelRunners.put(uriAndHomeDir, closer.register(new ParallelRunner(parallelRunnerThreads, fs)));
parallelRunners.put(uriAndHomeDir, closer.register(new ParallelRunner(parallelRunnerThreads)));
}
return parallelRunners.get(uriAndHomeDir);
}
Expand Down
Loading