Skip to content

Commit

Permalink
Merge pull request #10 from civitaspo/v0.1.4
Browse files Browse the repository at this point in the history
support glob path: FileNotFoundException
  • Loading branch information
civitaspo committed Oct 22, 2015
2 parents 1552171 + 7fa14ce commit 7fcc88e
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions src/main/java/org/embulk/input/hdfs/HdfsFileInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
String pathString = strftime(task.getPath(), task.getRewindSeconds());
try {
List<String> originalFileList = buildFileList(getFs(task), pathString);

if (originalFileList.isEmpty()) {
throw new PathNotFoundException(pathString);
}

task.setFiles(allocateHdfsFilesToTasks(task, getFs(task), originalFileList));
logger.info("Loading target files: {}", originalFileList);
}
Expand Down Expand Up @@ -192,19 +197,15 @@ private List<String> buildFileList(final FileSystem fs, final String pathString)
List<String> fileList = new ArrayList<>();
Path rootPath = new Path(pathString);

if (fs.exists(rootPath)) {
for (FileStatus entry : fs.globStatus(rootPath)) {
if (entry.isDirectory()) {
fileList.addAll(lsr(fs, entry));
}
else {
fileList.add(entry.getPath().toString());
}
for (FileStatus entry : fs.globStatus(rootPath)) {
if (entry.isDirectory()) {
fileList.addAll(lsr(fs, entry));
}
else {
fileList.add(entry.getPath().toString());
}
}
else {
throw new PathNotFoundException(rootPath.toString());
}

return fileList;
}

Expand Down

0 comments on commit 7fcc88e

Please sign in to comment.