diff --git a/src/main/java/com/m6d/filecrush/crush/Crush.java b/src/main/java/com/m6d/filecrush/crush/Crush.java index 96f5072..e1c12fa 100644 --- a/src/main/java/com/m6d/filecrush/crush/Crush.java +++ b/src/main/java/com/m6d/filecrush/crush/Crush.java @@ -153,6 +153,11 @@ public class Crush extends Configured implements Tool { * specification. */ private List matchers; + + /** + * Regex from the --ignore-regex option used for filtering out files for crushing. + */ + private Matcher ignoredFiles; /** * The counters from the completed job. @@ -206,6 +211,15 @@ Options buildOptions() { options.addOption(option); + option = OptionBuilder + .hasArg() + .withArgName("ignore file regex") + .withDescription("Regular expression to apply for filtering out crush candidate files. Any files in the input crush directory matching this will be ignored") + .withLongOpt("ignore-regex") + .create(); + + options.addOption(option); + option = OptionBuilder .hasArg() .withArgName("replacement string") @@ -344,6 +358,10 @@ boolean createJobConfAndParseArgs(String... args) throws ParseException, IOExcep } else { console = Verbosity.NONE; } + + if (cli.hasOption("ignore-regex")) { + ignoredFiles = Pattern.compile(cli.getOptionValue("ignore-regex")).matcher(""); + } excludeSingleFileDirs = !cli.hasOption("include-single-file-dirs"); @@ -478,6 +496,10 @@ boolean createJobConfAndParseArgs(String... args) throws ParseException, IOExcep * Add the crush specs and compression options to the configuration. */ job.set("crush.timestamp", crushTimestamp); + + if (ignoredFiles != null) { + job.set("crush.ignore-regex", ignoredFiles.pattern().pattern()); + } if (regexes.size() != replacements.size() || replacements.size() != inFormats.size() || inFormats.size() != outFormats.size()) { throw new IllegalArgumentException("Must be an equal number of regex, replacement, in-format, and out-format options"); @@ -654,6 +676,14 @@ private void standAlone() throws IOException { for (FileStatus content : contents) { if (!content.isDir()) { + if (ignoredFiles != null) { + // Check for files to skip + ignoredFiles.reset(content.getPath().toUri().getPath()); + if (ignoredFiles.matches()) { + LOG.trace("Ignoring " + content.getPath().toString()); + continue; + } + } files.add(new Text(content.getPath().toUri().getPath())); } } @@ -681,17 +711,19 @@ private void standAlone() throws IOException { CrushReducer reducer = new CrushReducer(); reducer.configure(job); - reducer.reduce(bucket, files.iterator(), new NullOutputCollector(), Reporter.NULL); + reducer.reduce(bucket, files.iterator(), new NullOutputCollector(), Reporter.NULL); + reducer.close(); /* * Use a glob here because the temporary and task attempt work dirs have funny names. + * Include a * at the end to cover wildcards for compressed files. */ - Path crushOutput = new Path(absOutDir + "/*/*/crush" + absSrcDir + "/" + dest.getName()); + Path crushOutput = new Path(absOutDir + "/*/*/crush" + absSrcDir + "/" + dest.getName() + "*"); FileStatus[] statuses = fs.globStatus(crushOutput); if (statuses == null || 1 != statuses.length) { - throw new AssertionError(); + throw new AssertionError("Did not find the expected output in " + crushOutput.toString()); } rename(statuses[0].getPath(), dest.getParent(), dest.getName()); @@ -992,7 +1024,15 @@ void writeDirs() throws IOException { print(Verbosity.INFO, "\n\n" + dir.toUri().getPath()); - FileStatus[] contents = fs.listStatus(dir); + FileStatus[] contents = fs.listStatus(dir, new PathFilter() { + @Override + public boolean accept(Path testPath) { + if (ignoredFiles == null) return true; + ignoredFiles.reset(testPath.toUri().getPath()); + return !ignoredFiles.matches(); + } + + }); if (contents == null || contents.length == 0) { print(Verbosity.INFO, " is empty"); diff --git a/src/test/java/com/m6d/filecrush/crush/CrushStandAloneTextTest.java b/src/test/java/com/m6d/filecrush/crush/CrushStandAloneTextTest.java index 846d606..74e93ce 100644 --- a/src/test/java/com/m6d/filecrush/crush/CrushStandAloneTextTest.java +++ b/src/test/java/com/m6d/filecrush/crush/CrushStandAloneTextTest.java @@ -140,6 +140,50 @@ public void noFiles() throws Exception { assertThat(out.exists(), is(false)); } + @Test + public void ignoreRegexTest() throws Exception { + + File in = tmp.newFolder("skip_test"); + + createFile(in, "lil-0", 0, 1); + createFile(in, "lil-1", 1, 2); + createFile(in, "big-2", 2, 5); + createFile(in, "big-3", 3, 5); + // Files to be ignored + createFile(in, "lil-0.index", 0, 10); + createFile(in, "lil-1.index", 1, 20); + createFile(in, "big-2.index", 2, 50); + createFile(in, "big-3.index", 3, 50); + + File out = new File(tmp.getRoot(), "skip_test_out"); + + ToolRunner.run(job, new Crush(), new String[] { + "--input-format=text", + "--output-format=text", + "--ignore-regex=.*\\.index", + "--compress=none", + + in.getAbsolutePath(), out.getAbsolutePath() + }); + + /* + * Make sure the original files are still there. + */ + verifyFile(in, "lil-0", 0, 1); + verifyFile(in, "lil-1", 1, 2); + verifyFile(in, "big-2", 2, 5); + verifyFile(in, "big-3", 3, 5); + verifyFile(in, "lil-0.index", 0, 10); + verifyFile(in, "lil-1.index", 1, 20); + verifyFile(in, "big-2.index", 2, 50); + verifyFile(in, "big-3.index", 3, 50); + + /* + * Verify the crush output. + */ + verifyCrushOutput(out, new int[] { 0, 1 }, new int[] { 1, 2}, new int[] { 2, 5 }, new int[] { 3, 5 }); + } + private void verifyCrushOutput(File crushOutput, int[]... keyCounts) throws IOException { List actual = new ArrayList(); diff --git a/src/test/java/com/m6d/filecrush/crush/integration/CrushMapReduceTest.java b/src/test/java/com/m6d/filecrush/crush/integration/CrushMapReduceTest.java index 5f6f86b..4ef4206 100644 --- a/src/test/java/com/m6d/filecrush/crush/integration/CrushMapReduceTest.java +++ b/src/test/java/com/m6d/filecrush/crush/integration/CrushMapReduceTest.java @@ -809,6 +809,37 @@ public void executeBackwardsCompatibleSequence() throws Exception { assertThat(jobCounters.getCounter(ReducerCounter.FILES_CRUSHED), equalTo( 23L)); assertThat(jobCounters.getCounter(ReducerCounter.RECORDS_CRUSHED), equalTo(964L)); } + + + @Test + public void executeIgnoreFile() throws Exception { + + // Ones to include + writeFile("in_skip_test/file10", Format.TEXT); + writeFile("in_skip_test/file11", Format.TEXT); + writeFile("in_skip_test/file12", Format.TEXT); + // Ones to skip + writeFile("in_skip_test/file90", Format.TEXT); + writeFile("in_skip_test/file91", Format.TEXT); + writeFile("in_skip_test/file92", Format.TEXT); + + Crush crush = new Crush(); + + ToolRunner.run(job, crush, new String [] { + "--threshold=0.015", + "--max-file-blocks=1", + "--verbose", + "--input-format=text", + "--output-format=text", + "--compress=none", + "--ignore-regex=.*9[0-9]", + + "in_skip_test", "out_skip_test", "20101116153015" + }); + + verifyOutput(homeDir + "/out_skip_test", "crushed_file-*-*-*", Format.TEXT, Format.TEXT, null, "file10", "file11", "file12"); + + } /** * Copies data from the given input stream to an HDFS file at the given path. This method will close the input stream. @@ -921,7 +952,7 @@ private void verifyOutput(String dir, String crushOutMask, Format inFmt, Format if (Format.TEXT == outFmt) { /* * TextInputFormat will produce keys that are byte offsets and values that are the line. This is not actually what we want. - * We want to preserve the actualy keys and values in the files, just like SequenceFileInputFormat. So, either way, the + * We want to preserve the actual keys and values in the files, just like SequenceFileInputFormat. So, either way, the * keys and values should be the text representations of what went in. */ BufferedReader reader; @@ -931,9 +962,9 @@ private void verifyOutput(String dir, String crushOutMask, Format inFmt, Format Path path = new Path(dir + "/" + crushOutMask); FileStatus[] globStatus = getFileSystem().globStatus(path); - + if (globStatus == null || 1 != globStatus.length || globStatus[0].isDir()) { - fail(crushOutMask); + fail(crushOutMask + " was not found in " + path); } crushOut = globStatus[0].getPath();