From 34929763135a0b3e35227da539863c63347df7e5 Mon Sep 17 00:00:00 2001 From: gsteelman Date: Mon, 11 Aug 2014 09:56:52 -0700 Subject: [PATCH 01/10] Added option to skip indexing small files. Added option to not recurse for files to index from input paths. --- .../lzo/DistributedLzoIndexer.java | 116 ++++++++++++++---- 1 file changed, 89 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index edbde8f4..96e75c1f 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -9,8 +9,10 @@ import com.hadoop.mapreduce.LzoIndexOutputFormat; import com.hadoop.mapreduce.LzoSplitInputFormat; import com.hadoop.mapreduce.LzoSplitRecordReader; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -25,47 +27,95 @@ public class DistributedLzoIndexer extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class); - private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); + private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); + + private final String SKIP_INDEXING_SMALL_FILES_KEY = "skip_indexing_small_files"; + private final String RECURSIVE_INDEXING_KEY = "recursive_indexing"; + private final boolean SKIP_INDEXING_SMALL_FILES_DEFAULT = false; + private final boolean RECURSIVE_INDEXING_DEFAULT = true; + private boolean skipIndexingSmallFiles = this.SKIP_INDEXING_SMALL_FILES_DEFAULT; + private boolean recursiveIndexing = this.RECURSIVE_INDEXING_DEFAULT; + + private Configuration conf = getConf(); + + /** + * Accepts paths not ending in /_temporary. + */ private final PathFilter nonTemporaryFilter = new PathFilter() { + @Override public boolean accept(Path path) { return !path.toString().endsWith("/_temporary"); } }; - private void walkPath(Path path, PathFilter pathFilter, List accumulator) { + /** + * Accepts paths pointing to files with length greater than a block size. + */ + private final PathFilter bigFileFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + FileStatus status; + try { + FileSystem fs = path.getFileSystem(getConf()); + status = fs.getFileStatus(path); + } catch (IOException e) { + LOG.info("Unable to get status of path " + path); + return false; + } + return status.getLen() >= status.getBlockSize() ? true : false; + } + }; + + private void visitPath(Path path, PathFilter pathFilter, List accumulator, boolean recursive) { try { - FileSystem fs = path.getFileSystem(getConf()); + FileSystem fs = path.getFileSystem(this.conf); FileStatus fileStatus = fs.getFileStatus(path); - if (fileStatus.isDir()) { - FileStatus[] children = fs.listStatus(path, pathFilter); - for (FileStatus childStatus : children) { - walkPath(childStatus.getPath(), pathFilter, accumulator); - } - } else if (path.toString().endsWith(LZO_EXTENSION)) { - Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX); - if (fs.exists(lzoIndexPath)) { - // If the index exists and is of nonzero size, we're already done. - // We re-index a file with a zero-length index, because every file has at least one block. - if (fs.getFileStatus(lzoIndexPath).getLen() > 0) { - LOG.info("[SKIP] LZO index file already exists for " + path); - return; - } else { - LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)"); - accumulator.add(path); + if (fileStatus.isDirectory()) { + if (recursive) { + FileStatus[] children = fs.listStatus(path, pathFilter); + for (FileStatus childStatus : children) { + visitPath(childStatus.getPath(), pathFilter, accumulator, recursive); } } else { - // If no index exists, we need to index the file. - LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)"); - accumulator.add(path); + LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled."); } + } else if (shouldIndexPath(path, fs)) { + accumulator.add(path); } } catch (IOException ioe) { LOG.warn("Error walking path: " + path, ioe); } } + private boolean shouldIndexPath(Path path, FileSystem fs) throws IOException { + if (path.toString().endsWith(LZO_EXTENSION)) { + if (this.skipIndexingSmallFiles && !this.bigFileFilter.accept(path)) { + LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small"); + return false; + } + + Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX); + if (fs.exists(lzoIndexPath)) { + // If the index exists and is of nonzero size, we're already done. + // We re-index a file with a zero-length index, because every file has at least one block. + if (fs.getFileStatus(path).getLen() > 0) { + LOG.info("[SKIP] LZO index file already exists for " + path); + return false; + } else { + LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)"); + return true; + } + } else { + // If no index exists, we need to index the file. + LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)"); + return true; + } + } + return false; + } + public int run(String[] args) throws Exception { if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) { printUsage(); @@ -73,9 +123,16 @@ public int run(String[] args) throws Exception { return -1; } + this.conf = getConf(); + + this.skipIndexingSmallFiles = + this.conf.getBoolean(SKIP_INDEXING_SMALL_FILES_KEY, false); + + // Find paths to index based on recursive/not + this.recursiveIndexing = this.conf.getBoolean(RECURSIVE_INDEXING_KEY, true); List inputPaths = new ArrayList(); - for (String strPath: args) { - walkPath(new Path(strPath), nonTemporaryFilter, inputPaths); + for (String strPath : args) { + visitPath(new Path(strPath), nonTemporaryFilter, inputPaths, this.recursiveIndexing); } if (inputPaths.isEmpty()) { @@ -84,7 +141,7 @@ public int run(String[] args) throws Exception { return 0; } - Job job = new Job(getConf()); + Job job = new Job(this.conf); job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args)); job.setOutputKeyClass(Path.class); @@ -134,7 +191,12 @@ public static void main(String[] args) throws Exception { System.exit(exitCode); } - public static void printUsage() { - System.err.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer [file2.lzo directory3 ...]"); + public void printUsage() { + String usage = + "Command: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer [file2.lzo directory3 ...]" + + "\nConfiguration options: [values] description" + + "\n" + this.SKIP_INDEXING_SMALL_FILES_KEY + " [true,false] <" + this.SKIP_INDEXING_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than a block in size." + + "\n" + this.RECURSIVE_INDEXING_KEY + " [true,false] <" + this.RECURSIVE_INDEXING_DEFAULT + "> Look for files to index recursively from paths on command line."; + System.err.println(usage); } } From 1a12f895c4fb3756b08ba39461d1b238457401d5 Mon Sep 17 00:00:00 2001 From: gsteelman Date: Mon, 11 Aug 2014 15:09:52 -0700 Subject: [PATCH 02/10] Removed hard-coded default values in run(). --- .../com/hadoop/compression/lzo/DistributedLzoIndexer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index 96e75c1f..5b5c5ce7 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -126,10 +126,10 @@ public int run(String[] args) throws Exception { this.conf = getConf(); this.skipIndexingSmallFiles = - this.conf.getBoolean(SKIP_INDEXING_SMALL_FILES_KEY, false); + this.conf.getBoolean(SKIP_INDEXING_SMALL_FILES_KEY, this.SKIP_INDEXING_SMALL_FILES_DEFAULT); // Find paths to index based on recursive/not - this.recursiveIndexing = this.conf.getBoolean(RECURSIVE_INDEXING_KEY, true); + this.recursiveIndexing = this.conf.getBoolean(RECURSIVE_INDEXING_KEY, this.RECURSIVE_INDEXING_DEFAULT); List inputPaths = new ArrayList(); for (String strPath : args) { visitPath(new Path(strPath), nonTemporaryFilter, inputPaths, this.recursiveIndexing); From 8759f784a6c5a955c144131db139c4beb94cbeee Mon Sep 17 00:00:00 2001 From: gsteelman Date: Thu, 14 Aug 2014 14:07:44 -0700 Subject: [PATCH 03/10] Added config option for file size to be considered 'small'. --- .../lzo/DistributedLzoIndexer.java | 58 +++++++++++-------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index 5b5c5ce7..6d223acf 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -29,41 +29,46 @@ public class DistributedLzoIndexer extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class); private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); - - private final String SKIP_INDEXING_SMALL_FILES_KEY = "skip_indexing_small_files"; - private final String RECURSIVE_INDEXING_KEY = "recursive_indexing"; - private final boolean SKIP_INDEXING_SMALL_FILES_DEFAULT = false; - private final boolean RECURSIVE_INDEXING_DEFAULT = true; - private boolean skipIndexingSmallFiles = this.SKIP_INDEXING_SMALL_FILES_DEFAULT; - private boolean recursiveIndexing = this.RECURSIVE_INDEXING_DEFAULT; + + private final String LZO_SKIP_INDEXING_SMALL_FILES_KEY = "lzo_skip_indexing_small_files"; + private final String LZO_SMALL_FILE_SIZE_KEY = "lzo_small file_size"; + private final String LZO_RECURSIVE_INDEXING_KEY = "lzo_recursive_indexing"; + private final boolean LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT = false; + private final boolean LZO_RECURSIVE_INDEXING_DEFAULT = true; + private final long LZO_SMALL_FILE_SIZE_DEFAULT = Long.MIN_VALUE; + private boolean lzoSkipIndexingSmallFiles = this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT; + private boolean lzoRecursiveIndexing = this.LZO_RECURSIVE_INDEXING_DEFAULT; + private long lzoSmallFileSize = this.LZO_SMALL_FILE_SIZE_DEFAULT; + + private final String TEMP_FILE_EXTENSION = "/_temporary"; private Configuration conf = getConf(); /** - * Accepts paths not ending in /_temporary. + * Accepts paths which don't end in TEMP_FILE_EXTENSION */ private final PathFilter nonTemporaryFilter = new PathFilter() { @Override public boolean accept(Path path) { - return !path.toString().endsWith("/_temporary"); + return !path.toString().endsWith(TEMP_FILE_EXTENSION); } }; /** - * Accepts paths pointing to files with length greater than a block size. + * Accepts paths pointing to files with length >= lzoSmallFileSize. */ private final PathFilter bigFileFilter = new PathFilter() { @Override public boolean accept(Path path) { FileStatus status; try { - FileSystem fs = path.getFileSystem(getConf()); + FileSystem fs = path.getFileSystem(conf); status = fs.getFileStatus(path); } catch (IOException e) { LOG.info("Unable to get status of path " + path); return false; } - return status.getLen() >= status.getBlockSize() ? true : false; + return status.getLen() >= lzoSmallFileSize ? true : false; } }; @@ -81,7 +86,7 @@ private void visitPath(Path path, PathFilter pathFilter, List accumulator, } else { LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled."); } - } else if (shouldIndexPath(path, fs)) { + } else if (shouldIndexPath(fileStatus, fs)) { accumulator.add(path); } } catch (IOException ioe) { @@ -89,18 +94,19 @@ private void visitPath(Path path, PathFilter pathFilter, List accumulator, } } - private boolean shouldIndexPath(Path path, FileSystem fs) throws IOException { + private boolean shouldIndexPath(FileStatus fileStatus, FileSystem fs) throws IOException { + Path path = fileStatus.getPath(); if (path.toString().endsWith(LZO_EXTENSION)) { - if (this.skipIndexingSmallFiles && !this.bigFileFilter.accept(path)) { + if (this.lzoSkipIndexingSmallFiles && !this.bigFileFilter.accept(path)) { LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small"); return false; } - Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX); + Path lzoIndexPath = new Path(path, LzoIndex.LZO_INDEX_SUFFIX); if (fs.exists(lzoIndexPath)) { // If the index exists and is of nonzero size, we're already done. // We re-index a file with a zero-length index, because every file has at least one block. - if (fs.getFileStatus(path).getLen() > 0) { + if (fileStatus.getLen() > 0) { LOG.info("[SKIP] LZO index file already exists for " + path); return false; } else { @@ -125,14 +131,17 @@ public int run(String[] args) throws Exception { this.conf = getConf(); - this.skipIndexingSmallFiles = - this.conf.getBoolean(SKIP_INDEXING_SMALL_FILES_KEY, this.SKIP_INDEXING_SMALL_FILES_DEFAULT); + this.lzoSkipIndexingSmallFiles = + this.conf.getBoolean(this.LZO_SKIP_INDEXING_SMALL_FILES_KEY, this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT); + + this.lzoSmallFileSize = + this.conf.getLong(this.LZO_SMALL_FILE_SIZE_KEY, this.LZO_SMALL_FILE_SIZE_DEFAULT); // Find paths to index based on recursive/not - this.recursiveIndexing = this.conf.getBoolean(RECURSIVE_INDEXING_KEY, this.RECURSIVE_INDEXING_DEFAULT); + this.lzoRecursiveIndexing = this.conf.getBoolean(LZO_RECURSIVE_INDEXING_KEY, this.LZO_RECURSIVE_INDEXING_DEFAULT); List inputPaths = new ArrayList(); for (String strPath : args) { - visitPath(new Path(strPath), nonTemporaryFilter, inputPaths, this.recursiveIndexing); + visitPath(new Path(strPath), nonTemporaryFilter, inputPaths, this.lzoRecursiveIndexing); } if (inputPaths.isEmpty()) { @@ -194,9 +203,10 @@ public static void main(String[] args) throws Exception { public void printUsage() { String usage = "Command: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer [file2.lzo directory3 ...]" + - "\nConfiguration options: [values] description" + - "\n" + this.SKIP_INDEXING_SMALL_FILES_KEY + " [true,false] <" + this.SKIP_INDEXING_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than a block in size." + - "\n" + this.RECURSIVE_INDEXING_KEY + " [true,false] <" + this.RECURSIVE_INDEXING_DEFAULT + "> Look for files to index recursively from paths on command line."; + "\nConfiguration options: \"key\" [values] description" + + "\n" + this.LZO_SKIP_INDEXING_SMALL_FILES_KEY + " [true,false] <" + this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + this.LZO_SMALL_FILE_SIZE_KEY + " bytes." + + "\n" + this.LZO_SMALL_FILE_SIZE_KEY + " [long] <" + this.LZO_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + this.LZO_SKIP_INDEXING_SMALL_FILES_KEY + " is true." + + "\n" + this.LZO_RECURSIVE_INDEXING_KEY + " [true,false] <" + this.LZO_RECURSIVE_INDEXING_DEFAULT + "> Look for files to index recursively from paths on command line."; System.err.println(usage); } } From 37381dac5e34d64b4592f1126ecbe2783c680d86 Mon Sep 17 00:00:00 2001 From: gsteelman Date: Mon, 1 Sep 2014 15:39:12 -0700 Subject: [PATCH 04/10] Added static modifiers to constants, replaced unnecessary PathFilter with helper method. --- .../lzo/DistributedLzoIndexer.java | 49 +++++++------------ 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index 6d223acf..2080e049 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -28,19 +28,19 @@ public class DistributedLzoIndexer extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class); - private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); + private static final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); - private final String LZO_SKIP_INDEXING_SMALL_FILES_KEY = "lzo_skip_indexing_small_files"; - private final String LZO_SMALL_FILE_SIZE_KEY = "lzo_small file_size"; - private final String LZO_RECURSIVE_INDEXING_KEY = "lzo_recursive_indexing"; - private final boolean LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT = false; - private final boolean LZO_RECURSIVE_INDEXING_DEFAULT = true; - private final long LZO_SMALL_FILE_SIZE_DEFAULT = Long.MIN_VALUE; - private boolean lzoSkipIndexingSmallFiles = this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT; - private boolean lzoRecursiveIndexing = this.LZO_RECURSIVE_INDEXING_DEFAULT; - private long lzoSmallFileSize = this.LZO_SMALL_FILE_SIZE_DEFAULT; + private static final String LZO_SKIP_INDEXING_SMALL_FILES_KEY = "lzo_skip_indexing_small_files"; + private static final String LZO_SMALL_FILE_SIZE_KEY = "lzo_small_file_size"; + private static final String LZO_RECURSIVE_INDEXING_KEY = "lzo_recursive_indexing"; + private static final boolean LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT = false; + private static final boolean LZO_RECURSIVE_INDEXING_DEFAULT = true; + private static final long LZO_SMALL_FILE_SIZE_DEFAULT = 0; + private boolean lzoSkipIndexingSmallFiles = LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT; + private boolean lzoRecursiveIndexing = LZO_RECURSIVE_INDEXING_DEFAULT; + private long lzoSmallFileSize = LZO_SMALL_FILE_SIZE_DEFAULT; - private final String TEMP_FILE_EXTENSION = "/_temporary"; + private static final String TEMP_FILE_EXTENSION = "/_temporary"; private Configuration conf = getConf(); @@ -55,23 +55,12 @@ public boolean accept(Path path) { }; /** - * Accepts paths pointing to files with length >= lzoSmallFileSize. + * Returns whether a file should be considered small enough to skip indexing. */ - private final PathFilter bigFileFilter = new PathFilter() { - @Override - public boolean accept(Path path) { - FileStatus status; - try { - FileSystem fs = path.getFileSystem(conf); - status = fs.getFileStatus(path); - } catch (IOException e) { - LOG.info("Unable to get status of path " + path); - return false; - } - return status.getLen() >= lzoSmallFileSize ? true : false; - } - }; - + private boolean isSmallFile(FileStatus status) { + return status.getLen() < lzoSmallFileSize; + } + private void visitPath(Path path, PathFilter pathFilter, List accumulator, boolean recursive) { try { FileSystem fs = path.getFileSystem(this.conf); @@ -97,7 +86,7 @@ private void visitPath(Path path, PathFilter pathFilter, List accumulator, private boolean shouldIndexPath(FileStatus fileStatus, FileSystem fs) throws IOException { Path path = fileStatus.getPath(); if (path.toString().endsWith(LZO_EXTENSION)) { - if (this.lzoSkipIndexingSmallFiles && !this.bigFileFilter.accept(path)) { + if (this.lzoSkipIndexingSmallFiles && isSmallFile(fileStatus)) { LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small"); return false; } @@ -128,9 +117,7 @@ public int run(String[] args) throws Exception { ToolRunner.printGenericCommandUsage(System.err); return -1; } - - this.conf = getConf(); - + this.lzoSkipIndexingSmallFiles = this.conf.getBoolean(this.LZO_SKIP_INDEXING_SMALL_FILES_KEY, this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT); From a42e041c0a4949c91756a452960d854f72c30e43 Mon Sep 17 00:00:00 2001 From: gsteelman Date: Tue, 2 Sep 2014 10:41:18 -0700 Subject: [PATCH 05/10] Removed 'this' modifier from static variables. --- .../compression/lzo/DistributedLzoIndexer.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index 2080e049..f1d1d530 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -119,13 +119,13 @@ public int run(String[] args) throws Exception { } this.lzoSkipIndexingSmallFiles = - this.conf.getBoolean(this.LZO_SKIP_INDEXING_SMALL_FILES_KEY, this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT); + this.conf.getBoolean(LZO_SKIP_INDEXING_SMALL_FILES_KEY, LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT); this.lzoSmallFileSize = - this.conf.getLong(this.LZO_SMALL_FILE_SIZE_KEY, this.LZO_SMALL_FILE_SIZE_DEFAULT); + this.conf.getLong(LZO_SMALL_FILE_SIZE_KEY, LZO_SMALL_FILE_SIZE_DEFAULT); // Find paths to index based on recursive/not - this.lzoRecursiveIndexing = this.conf.getBoolean(LZO_RECURSIVE_INDEXING_KEY, this.LZO_RECURSIVE_INDEXING_DEFAULT); + this.lzoRecursiveIndexing = this.conf.getBoolean(LZO_RECURSIVE_INDEXING_KEY, LZO_RECURSIVE_INDEXING_DEFAULT); List inputPaths = new ArrayList(); for (String strPath : args) { visitPath(new Path(strPath), nonTemporaryFilter, inputPaths, this.lzoRecursiveIndexing); @@ -191,9 +191,9 @@ public void printUsage() { String usage = "Command: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer [file2.lzo directory3 ...]" + "\nConfiguration options: \"key\" [values] description" + - "\n" + this.LZO_SKIP_INDEXING_SMALL_FILES_KEY + " [true,false] <" + this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + this.LZO_SMALL_FILE_SIZE_KEY + " bytes." + - "\n" + this.LZO_SMALL_FILE_SIZE_KEY + " [long] <" + this.LZO_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + this.LZO_SKIP_INDEXING_SMALL_FILES_KEY + " is true." + - "\n" + this.LZO_RECURSIVE_INDEXING_KEY + " [true,false] <" + this.LZO_RECURSIVE_INDEXING_DEFAULT + "> Look for files to index recursively from paths on command line."; + "\n" + LZO_SKIP_INDEXING_SMALL_FILES_KEY + " [true,false] <" + LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + LZO_SMALL_FILE_SIZE_KEY + " bytes." + + "\n" + LZO_SMALL_FILE_SIZE_KEY + " [long] <" +LZO_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + LZO_SKIP_INDEXING_SMALL_FILES_KEY + " is true." + + "\n" + LZO_RECURSIVE_INDEXING_KEY + " [true,false] <" + LZO_RECURSIVE_INDEXING_DEFAULT + "> Look for files to index recursively from paths on command line."; System.err.println(usage); } } From 6d6dda1d89645a54bef9be76092670fb55d0c1c3 Mon Sep 17 00:00:00 2001 From: gsteelman Date: Mon, 14 Dec 2015 16:19:43 -0800 Subject: [PATCH 06/10] Remove unnecessary this keyword scoping, fix style for static/member variable grouping, rename constants for grouping, rename constants values for hadoop naming style. --- .../lzo/DistributedLzoIndexer.java | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index f1d1d530..c501d80e 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -30,18 +30,18 @@ public class DistributedLzoIndexer extends Configured implements Tool { private static final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); - private static final String LZO_SKIP_INDEXING_SMALL_FILES_KEY = "lzo_skip_indexing_small_files"; - private static final String LZO_SMALL_FILE_SIZE_KEY = "lzo_small_file_size"; - private static final String LZO_RECURSIVE_INDEXING_KEY = "lzo_recursive_indexing"; - private static final boolean LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT = false; - private static final boolean LZO_RECURSIVE_INDEXING_DEFAULT = true; - private static final long LZO_SMALL_FILE_SIZE_DEFAULT = 0; - private boolean lzoSkipIndexingSmallFiles = LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT; - private boolean lzoRecursiveIndexing = LZO_RECURSIVE_INDEXING_DEFAULT; - private long lzoSmallFileSize = LZO_SMALL_FILE_SIZE_DEFAULT; - + private static final String LZO_INDEXING_SKIP_SMALL_FILES_KEY = "lzo.indexing.skip-small-files.enabled"; + private static final boolean LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT = false; + private static final String LZO_INDEXING_SMALL_FILE_SIZE_KEY = "lzo.indexing.skip-small-files.size"; + private static final long LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT = 0; + private static final String LZO_INDEXING_RECURSIVE_KEY = "lzo.indexing.recursive.enabled"; + private static final boolean LZO_INDEXING_RECURSIVE_DEFAULT = true; private static final String TEMP_FILE_EXTENSION = "/_temporary"; + private boolean lzoSkipIndexingSmallFiles = LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT; + private boolean lzoRecursiveIndexing = LZO_INDEXING_RECURSIVE_DEFAULT; + private long lzoSmallFileSize = LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT; + private Configuration conf = getConf(); /** @@ -61,16 +61,16 @@ private boolean isSmallFile(FileStatus status) { return status.getLen() < lzoSmallFileSize; } - private void visitPath(Path path, PathFilter pathFilter, List accumulator, boolean recursive) { + private void visitPath(Path path, PathFilter pathFilter, List accumulator) { try { - FileSystem fs = path.getFileSystem(this.conf); + FileSystem fs = path.getFileSystem(conf); FileStatus fileStatus = fs.getFileStatus(path); if (fileStatus.isDirectory()) { - if (recursive) { + if (lzoRecursiveIndexing) { FileStatus[] children = fs.listStatus(path, pathFilter); for (FileStatus childStatus : children) { - visitPath(childStatus.getPath(), pathFilter, accumulator, recursive); + visitPath(childStatus.getPath(), pathFilter, accumulator); } } else { LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled."); @@ -86,7 +86,7 @@ private void visitPath(Path path, PathFilter pathFilter, List accumulator, private boolean shouldIndexPath(FileStatus fileStatus, FileSystem fs) throws IOException { Path path = fileStatus.getPath(); if (path.toString().endsWith(LZO_EXTENSION)) { - if (this.lzoSkipIndexingSmallFiles && isSmallFile(fileStatus)) { + if (lzoSkipIndexingSmallFiles && isSmallFile(fileStatus)) { LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small"); return false; } @@ -118,17 +118,18 @@ public int run(String[] args) throws Exception { return -1; } - this.lzoSkipIndexingSmallFiles = - this.conf.getBoolean(LZO_SKIP_INDEXING_SMALL_FILES_KEY, LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT); + lzoSkipIndexingSmallFiles = + conf.getBoolean(LZO_INDEXING_SKIP_SMALL_FILES_KEY, LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT); - this.lzoSmallFileSize = - this.conf.getLong(LZO_SMALL_FILE_SIZE_KEY, LZO_SMALL_FILE_SIZE_DEFAULT); + lzoSmallFileSize = + conf.getLong(LZO_INDEXING_SMALL_FILE_SIZE_KEY, LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT); // Find paths to index based on recursive/not - this.lzoRecursiveIndexing = this.conf.getBoolean(LZO_RECURSIVE_INDEXING_KEY, LZO_RECURSIVE_INDEXING_DEFAULT); + lzoRecursiveIndexing = + conf.getBoolean(LZO_INDEXING_RECURSIVE_KEY, LZO_INDEXING_RECURSIVE_DEFAULT); List inputPaths = new ArrayList(); for (String strPath : args) { - visitPath(new Path(strPath), nonTemporaryFilter, inputPaths, this.lzoRecursiveIndexing); + visitPath(new Path(strPath), nonTemporaryFilter, inputPaths); } if (inputPaths.isEmpty()) { @@ -137,7 +138,7 @@ public int run(String[] args) throws Exception { return 0; } - Job job = new Job(this.conf); + Job job = new Job(conf); job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args)); job.setOutputKeyClass(Path.class); @@ -191,9 +192,9 @@ public void printUsage() { String usage = "Command: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer [file2.lzo directory3 ...]" + "\nConfiguration options: \"key\" [values] description" + - "\n" + LZO_SKIP_INDEXING_SMALL_FILES_KEY + " [true,false] <" + LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + LZO_SMALL_FILE_SIZE_KEY + " bytes." + - "\n" + LZO_SMALL_FILE_SIZE_KEY + " [long] <" +LZO_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + LZO_SKIP_INDEXING_SMALL_FILES_KEY + " is true." + - "\n" + LZO_RECURSIVE_INDEXING_KEY + " [true,false] <" + LZO_RECURSIVE_INDEXING_DEFAULT + "> Look for files to index recursively from paths on command line."; + "\n" + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " [true,false] <" + LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " bytes." + + "\n" + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " [long] <" + LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " is true." + + "\n" + LZO_INDEXING_RECURSIVE_KEY + " [true,false] <" + LZO_INDEXING_RECURSIVE_DEFAULT + "> Look for files to index recursively from paths on command line."; System.err.println(usage); } } From b0fef1d83453a4af462840bd8155f49541a3cba0 Mon Sep 17 00:00:00 2001 From: gsteelman Date: Tue, 15 Dec 2015 11:45:20 -0800 Subject: [PATCH 07/10] Make recursive behavior mimic Hadoop recursive behavior. Use path.getName instead of path.getString for extension filtering. Add comments. Reduce number of Path.getFileSystem and getFileStatus. --- .../lzo/DistributedLzoIndexer.java | 39 +++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index c501d80e..490caa42 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -50,7 +50,7 @@ public class DistributedLzoIndexer extends Configured implements Tool { private final PathFilter nonTemporaryFilter = new PathFilter() { @Override public boolean accept(Path path) { - return !path.toString().endsWith(TEMP_FILE_EXTENSION); + return !path.getName().endsWith(TEMP_FILE_EXTENSION); } }; @@ -60,17 +60,31 @@ public boolean accept(Path path) { private boolean isSmallFile(FileStatus status) { return status.getLen() < lzoSmallFileSize; } - + private void visitPath(Path path, PathFilter pathFilter, List accumulator) { try { FileSystem fs = path.getFileSystem(conf); FileStatus fileStatus = fs.getFileStatus(path); + visitPathHelper(fileStatus, fs, pathFilter, accumulator, true); + } catch (IOException ioe) { + LOG.error("Error visiting root path: " + path, ioe); + } + } + // isInitialCall exists for this method to be consistent with Hadoop's defintion + // of "recursive" where if the root path to a job is a directory and + // and recursive = false, it still uses the files in that directory but does not + // recurse into children directories. The initial call is from visitPath + // with isInitialCall = true to mimic this behavior. Afterwards the recursive + // case sets isInitialCall = false. + private void visitPathHelper(FileStatus fileStatus, FileSystem fs, PathFilter pathFilter, List accumulator, boolean isInitialCall) { + try { + Path path = fileStatus.getPath(); if (fileStatus.isDirectory()) { - if (lzoRecursiveIndexing) { + if (lzoRecursiveIndexing || isInitialCall) { FileStatus[] children = fs.listStatus(path, pathFilter); for (FileStatus childStatus : children) { - visitPath(childStatus.getPath(), pathFilter, accumulator); + visitPathHelper(childStatus, fs, pathFilter, accumulator, false); } } else { LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled."); @@ -79,32 +93,33 @@ private void visitPath(Path path, PathFilter pathFilter, List accumulator) accumulator.add(path); } } catch (IOException ioe) { - LOG.warn("Error walking path: " + path, ioe); + LOG.warn("Error visiting path: " + fileStatus.getPath(), ioe); } } private boolean shouldIndexPath(FileStatus fileStatus, FileSystem fs) throws IOException { Path path = fileStatus.getPath(); - if (path.toString().endsWith(LZO_EXTENSION)) { + if (path.getName().endsWith(LZO_EXTENSION)) { if (lzoSkipIndexingSmallFiles && isSmallFile(fileStatus)) { LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small"); return false; } - Path lzoIndexPath = new Path(path, LzoIndex.LZO_INDEX_SUFFIX); + Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX); if (fs.exists(lzoIndexPath)) { // If the index exists and is of nonzero size, we're already done. // We re-index a file with a zero-length index, because every file has at least one block. - if (fileStatus.getLen() > 0) { + FileStatus indexFileStatus = fs.getFileStatus(lzoIndexPath); + if (indexFileStatus.getLen() > 0) { LOG.info("[SKIP] LZO index file already exists for " + path); return false; } else { - LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)"); + LOG.info("[ADD] LZO file " + path + " to indexing list (index file exists but is zero length)"); return true; } } else { // If no index exists, we need to index the file. - LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)"); + LOG.info("[ADD] LZO file " + path + " to indexing list (no index currently exists)"); return true; } } @@ -115,7 +130,7 @@ public int run(String[] args) throws Exception { if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) { printUsage(); ToolRunner.printGenericCommandUsage(System.err); - return -1; + return -1; // error } lzoSkipIndexingSmallFiles = @@ -194,7 +209,7 @@ public void printUsage() { "\nConfiguration options: \"key\" [values] description" + "\n" + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " [true,false] <" + LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " bytes." + "\n" + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " [long] <" + LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " is true." + - "\n" + LZO_INDEXING_RECURSIVE_KEY + " [true,false] <" + LZO_INDEXING_RECURSIVE_DEFAULT + "> Look for files to index recursively from paths on command line."; + "\n" + LZO_INDEXING_RECURSIVE_KEY + " [true,false] <" + LZO_INDEXING_RECURSIVE_DEFAULT + "> When indexing, recurse into child directories of input paths."; System.err.println(usage); } } From f76ef078f6d310040ae3d627440fe2a2a8ff85b8 Mon Sep 17 00:00:00 2001 From: gsteelman Date: Wed, 16 Dec 2015 10:29:11 -0800 Subject: [PATCH 08/10] Change many methods and constants to public for testing and for external access. Refactor configuration/job setup. Add unit tests. Remove unused variable in TestLzoRandData. --- .../lzo/DistributedLzoIndexer.java | 94 +++++---- .../lzo/TestDistributedLzoIndexer.java | 180 ++++++++++++++++++ .../compression/lzo/TestLzoRandData.java | 3 - 3 files changed, 239 insertions(+), 38 deletions(-) create mode 100644 src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index 490caa42..1d0d61fe 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -30,18 +30,22 @@ public class DistributedLzoIndexer extends Configured implements Tool { private static final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); - private static final String LZO_INDEXING_SKIP_SMALL_FILES_KEY = "lzo.indexing.skip-small-files.enabled"; - private static final boolean LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT = false; - private static final String LZO_INDEXING_SMALL_FILE_SIZE_KEY = "lzo.indexing.skip-small-files.size"; - private static final long LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT = 0; - private static final String LZO_INDEXING_RECURSIVE_KEY = "lzo.indexing.recursive.enabled"; - private static final boolean LZO_INDEXING_RECURSIVE_DEFAULT = true; + public static final String LZO_INDEXING_SKIP_SMALL_FILES_KEY = "lzo.indexing.skip-small-files.enabled"; + public static final boolean LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT = false; + public static final String LZO_INDEXING_SMALL_FILE_SIZE_KEY = "lzo.indexing.skip-small-files.size"; + public static final long LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT = 0; + public static final String LZO_INDEXING_RECURSIVE_KEY = "lzo.indexing.recursive.enabled"; + public static final boolean LZO_INDEXING_RECURSIVE_DEFAULT = true; private static final String TEMP_FILE_EXTENSION = "/_temporary"; private boolean lzoSkipIndexingSmallFiles = LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT; private boolean lzoRecursiveIndexing = LZO_INDEXING_RECURSIVE_DEFAULT; private long lzoSmallFileSize = LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT; + public boolean getLzoSkipIndexingSmallFiles() { return lzoSkipIndexingSmallFiles; } + public boolean getLzoRecursiveIndexing() { return lzoRecursiveIndexing; } + public long getLzoSmallFileSize() { return lzoSmallFileSize; } + private Configuration conf = getConf(); /** @@ -57,10 +61,18 @@ public boolean accept(Path path) { /** * Returns whether a file should be considered small enough to skip indexing. */ - private boolean isSmallFile(FileStatus status) { - return status.getLen() < lzoSmallFileSize; + public boolean isSmallFile(FileStatus status) { + return status.getLen() <= lzoSmallFileSize; } + /** + * Adds into accumulator paths under path which pass pathFilter and which + * shouldIndexFile returns true. + * @param path The root path to check under. + * @param pathFilter The filter to apply for all paths. + * @param accumulator The list to accumulate paths to process in. The state + * of this list is changed in this call. + */ private void visitPath(Path path, PathFilter pathFilter, List accumulator) { try { FileSystem fs = path.getFileSystem(conf); @@ -71,7 +83,7 @@ private void visitPath(Path path, PathFilter pathFilter, List accumulator) } } - // isInitialCall exists for this method to be consistent with Hadoop's defintion + // isInitialCall exists for this method to be consistent with Hadoop's definition // of "recursive" where if the root path to a job is a directory and // and recursive = false, it still uses the files in that directory but does not // recurse into children directories. The initial call is from visitPath @@ -89,7 +101,7 @@ private void visitPathHelper(FileStatus fileStatus, FileSystem fs, PathFilter pa } else { LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled."); } - } else if (shouldIndexPath(fileStatus, fs)) { + } else if (shouldIndexFile(fileStatus, fs)) { accumulator.add(path); } } catch (IOException ioe) { @@ -97,7 +109,7 @@ private void visitPathHelper(FileStatus fileStatus, FileSystem fs, PathFilter pa } } - private boolean shouldIndexPath(FileStatus fileStatus, FileSystem fs) throws IOException { + public boolean shouldIndexFile(FileStatus fileStatus, FileSystem fs) throws IOException { Path path = fileStatus.getPath(); if (path.getName().endsWith(LZO_EXTENSION)) { if (lzoSkipIndexingSmallFiles && isSmallFile(fileStatus)) { @@ -117,42 +129,30 @@ private boolean shouldIndexPath(FileStatus fileStatus, FileSystem fs) throws IOE LOG.info("[ADD] LZO file " + path + " to indexing list (index file exists but is zero length)"); return true; } - } else { - // If no index exists, we need to index the file. - LOG.info("[ADD] LZO file " + path + " to indexing list (no index currently exists)"); - return true; } + + // If no index exists, we need to index the file. + LOG.info("[ADD] LZO file " + path + " to indexing list (no index currently exists)"); + return true; } + + // If not an LZO file, skip the file. + LOG.info("[SKIP] Not an LZO file: " + path); return false; } - public int run(String[] args) throws Exception { - if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) { - printUsage(); - ToolRunner.printGenericCommandUsage(System.err); - return -1; // error - } - + public void configure(Configuration conf) { lzoSkipIndexingSmallFiles = conf.getBoolean(LZO_INDEXING_SKIP_SMALL_FILES_KEY, LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT); lzoSmallFileSize = conf.getLong(LZO_INDEXING_SMALL_FILE_SIZE_KEY, LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT); - // Find paths to index based on recursive/not lzoRecursiveIndexing = conf.getBoolean(LZO_INDEXING_RECURSIVE_KEY, LZO_INDEXING_RECURSIVE_DEFAULT); - List inputPaths = new ArrayList(); - for (String strPath : args) { - visitPath(new Path(strPath), nonTemporaryFilter, inputPaths); - } - - if (inputPaths.isEmpty()) { - LOG.info("No input paths found - perhaps all " + - ".lzo files have already been indexed."); - return 0; - } + } + private Job createJob(Configuration conf, String[] args) throws IOException { Job job = new Job(conf); job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args)); @@ -162,13 +162,37 @@ public int run(String[] args) throws Exception { // The LzoIndexOutputFormat doesn't currently work with speculative execution. // Patches welcome. job.getConfiguration().setBoolean( - "mapred.map.tasks.speculative.execution", false); + "mapred.map.tasks.speculative.execution", false); job.setJarByClass(DistributedLzoIndexer.class); job.setInputFormatClass(LzoSplitInputFormat.class); job.setOutputFormatClass(LzoIndexOutputFormat.class); job.setNumReduceTasks(0); job.setMapperClass(Mapper.class); + return job; + } + + public int run(String[] args) throws Exception { + if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) { + printUsage(); + ToolRunner.printGenericCommandUsage(System.err); + return -1; // error + } + + configure(conf); + + List inputPaths = new ArrayList(); + for (String strPath : args) { + visitPath(new Path(strPath), nonTemporaryFilter, inputPaths); + } + + if (inputPaths.isEmpty()) { + LOG.info("No input paths found - perhaps all " + + ".lzo files have already been indexed."); + return 0; + } + + Job job = createJob(conf, args); for (Path p : inputPaths) { FileInputFormat.addInputPath(job, p); @@ -210,6 +234,6 @@ public void printUsage() { "\n" + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " [true,false] <" + LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " bytes." + "\n" + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " [long] <" + LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " is true." + "\n" + LZO_INDEXING_RECURSIVE_KEY + " [true,false] <" + LZO_INDEXING_RECURSIVE_DEFAULT + "> When indexing, recurse into child directories of input paths."; - System.err.println(usage); + LOG.error(usage); } } diff --git a/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java b/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java new file mode 100644 index 00000000..37da379f --- /dev/null +++ b/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java @@ -0,0 +1,180 @@ +package com.hadoop.compression.lzo; + +import junit.framework.TestCase; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class TestDistributedLzoIndexer extends TestCase { + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + public void testConfigureDefaults() { + Configuration conf = new Configuration(); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + assertEquals(DistributedLzoIndexer.LZO_INDEXING_RECURSIVE_DEFAULT, indexer.getLzoRecursiveIndexing()); + assertEquals(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT, indexer.getLzoSkipIndexingSmallFiles()); + assertEquals(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT, indexer.getLzoSmallFileSize()); + } + + public void testConfigureSettings() { + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_RECURSIVE_KEY, false); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 5 * 1024L); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + assertEquals(false, indexer.getLzoRecursiveIndexing()); + assertEquals(true, indexer.getLzoSkipIndexingSmallFiles()); + assertEquals(5 * 1024L, indexer.getLzoSmallFileSize()); + } + + protected void doTestIsSmallFile(long fileSize, long smallThreshold, boolean expectedResult) { + Configuration conf = new Configuration(); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, smallThreshold); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + FileStatus status = new FileStatus(fileSize, false, 3, 512L, 100L, new Path("/tmp/my/file")); + + assertEquals(expectedResult, indexer.isSmallFile(status)); + } + + public void testIsSmallFileSmaller() throws Exception { + doTestIsSmallFile(500L, 1000L, true); + } + + public void testIsSmallFileEquals() throws Exception { + doTestIsSmallFile(500L, 500L, true); + } + + public void testIsSmallFileGreater() throws Exception { + doTestIsSmallFile(500L, 200L, false); + } + + public void testShouldIndexFileNotLzoFile() throws Exception { +// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); +// System.out.println("testDataDir = " + testDataDir); +// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".foo"); + System.out.println("tempFile = " + tempFile); + FileStatus status = new FileStatus(5L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + System.out.println("status = " + status); + + assertEquals(false, indexer.shouldIndexFile(status, fs)); + } + + public void testShouldIndexFileSkipSmallFiles() throws Exception { +// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); +// System.out.println("testDataDir = " + testDataDir); +// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); + System.out.println("tempFile = " + tempFile); + FileStatus status = new FileStatus(50L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + System.out.println("status = " + status); + + assertEquals(false, indexer.shouldIndexFile(status, fs)); + } + + public void testShouldIndexFileIndexNonexistent() throws Exception { +// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); +// System.out.println("testDataDir = " + testDataDir); +// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); + System.out.println("tempFile = " + tempFile); + FileStatus status = new FileStatus(200L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + System.out.println("status = " + status); + + assertEquals(true, indexer.shouldIndexFile(status, fs)); + } + + public void testShouldIndexFileEmptyIndexExists() throws Exception { +// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); +// System.out.println("testDataDir = " + testDataDir); +// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); + System.out.println("tempFile = " + tempFile); + FileStatus status = new FileStatus(200L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + System.out.println("status = " + status); + + String tempFileIndexPath = tempFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX; + File tempFileIndex = new File(tempFileIndexPath); + System.out.println("tempFileIndex = " + tempFileIndex); + if (!tempFileIndex.createNewFile()) { + throw new IOException("Could not create temp file for testing " + tempFileIndex); + } + + assertEquals(true, indexer.shouldIndexFile(status, fs)); + } + + public void testShouldIndexFileIndexExists() throws Exception { +// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); +// System.out.println("testDataDir = " + testDataDir); +// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); + System.out.println("tempFile = " + tempFile); + FileStatus status = new FileStatus(200L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + System.out.println("status = " + status); + + String tempFileIndexPath = tempFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX; + File tempFileIndex = new File(tempFileIndexPath); + System.out.println("tempFileIndex = " + tempFileIndex); +// if (!tempFileIndex.createNewFile()) { +// throw new IOException("Could not create temp file for testing " + tempFileIndex); +// } + OutputStream fos = new FileOutputStream(tempFileIndex); + fos.write(1); + fos.close(); + + assertEquals(false, indexer.shouldIndexFile(status, fs)); + } +} diff --git a/src/test/java/com/hadoop/compression/lzo/TestLzoRandData.java b/src/test/java/com/hadoop/compression/lzo/TestLzoRandData.java index 025d2cd2..f8576328 100644 --- a/src/test/java/com/hadoop/compression/lzo/TestLzoRandData.java +++ b/src/test/java/com/hadoop/compression/lzo/TestLzoRandData.java @@ -14,15 +14,12 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.util.ReflectionUtils; -import com.hadoop.compression.lzo.LzopCodec; - /** * Unit Test for LZO with random data. */ public class TestLzoRandData extends TestCase { Configuration conf; - CompressionCodec codec; @Override protected void setUp() throws Exception { From 00fc2bf8354bde22c95928187c2f27717c948ebc Mon Sep 17 00:00:00 2001 From: gsteelman Date: Thu, 17 Dec 2015 13:41:38 -0800 Subject: [PATCH 09/10] Comment cleanup in tests, remove useless setUp and tearDown methods. Add javadoc to DistributedLzoIndexer. --- .../lzo/DistributedLzoIndexer.java | 42 +++++++++++++----- .../lzo/TestDistributedLzoIndexer.java | 43 +------------------ 2 files changed, 32 insertions(+), 53 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index 1d0d61fe..887d071b 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -66,8 +66,7 @@ public boolean isSmallFile(FileStatus status) { } /** - * Adds into accumulator paths under path which pass pathFilter and which - * shouldIndexFile returns true. + * Adds into accumulator paths under path which this indexer should index. * @param path The root path to check under. * @param pathFilter The filter to apply for all paths. * @param accumulator The list to accumulate paths to process in. The state @@ -83,12 +82,12 @@ private void visitPath(Path path, PathFilter pathFilter, List accumulator) } } - // isInitialCall exists for this method to be consistent with Hadoop's definition - // of "recursive" where if the root path to a job is a directory and - // and recursive = false, it still uses the files in that directory but does not - // recurse into children directories. The initial call is from visitPath - // with isInitialCall = true to mimic this behavior. Afterwards the recursive - // case sets isInitialCall = false. + /* isInitialCall exists for this method to be consistent with Hadoop's definition + of "recursive": if the root path to a job is a directory and + and "recursive = false", it still uses the files in that directory but does not + recurse into children directories. The initial call is from visitPath + with isInitialCall = true to mimic this behavior. Afterwards the recursive + case sets isInitialCall = false. */ private void visitPathHelper(FileStatus fileStatus, FileSystem fs, PathFilter pathFilter, List accumulator, boolean isInitialCall) { try { Path path = fileStatus.getPath(); @@ -109,6 +108,15 @@ private void visitPathHelper(FileStatus fileStatus, FileSystem fs, PathFilter pa } } + /** + * Determine based on previous configuration of this indexer whether a file + * represented by fileStatus on the given FileSystem should be indexed or not. + * @param fileStatus The file to consider for indexing. + * @param fs The FileSystem on which the file resides. + * @return true if this indexer is configured to consider fileStatus indexable + * false if this indexer doesn't consider fileStatus indexable. + * @throws IOException + */ public boolean shouldIndexFile(FileStatus fileStatus, FileSystem fs) throws IOException { Path path = fileStatus.getPath(); if (path.getName().endsWith(LZO_EXTENSION)) { @@ -141,6 +149,10 @@ public boolean shouldIndexFile(FileStatus fileStatus, FileSystem fs) throws IOEx return false; } + /** + * Configures this indexer from the values set in conf. + * @param conf The Configuration to read values from. + */ public void configure(Configuration conf) { lzoSkipIndexingSmallFiles = conf.getBoolean(LZO_INDEXING_SKIP_SMALL_FILES_KEY, LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT); @@ -152,9 +164,16 @@ public void configure(Configuration conf) { conf.getBoolean(LZO_INDEXING_RECURSIVE_KEY, LZO_INDEXING_RECURSIVE_DEFAULT); } - private Job createJob(Configuration conf, String[] args) throws IOException { + /** + * Creates a Job from the given Configuration and commandline arguments. + * @param conf The base Configuration to use for the job. + * @param name The name to give the Job. Appened to "Distributed Lzo Indexer". + * @return The configured Job object. + * @throws IOException + */ + private Job createJob(Configuration conf, String name) throws IOException { Job job = new Job(conf); - job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args)); + job.setJobName("Distributed Lzo Indexer " + name); job.setOutputKeyClass(Path.class); job.setOutputValueClass(LongWritable.class); @@ -192,7 +211,8 @@ public int run(String[] args) throws Exception { return 0; } - Job job = createJob(conf, args); + String jobName = Arrays.toString(args); + Job job = createJob(conf, jobName); for (Path p : inputPaths) { FileInputFormat.addInputPath(job, p); diff --git a/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java b/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java index 37da379f..07a54d89 100644 --- a/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java +++ b/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java @@ -3,10 +3,8 @@ import junit.framework.TestCase; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; @@ -15,16 +13,6 @@ import org.apache.hadoop.fs.Path; public class TestDistributedLzoIndexer extends TestCase { - @Override - protected void setUp() throws Exception { - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - public void testConfigureDefaults() { Configuration conf = new Configuration(); DistributedLzoIndexer indexer = new DistributedLzoIndexer(); @@ -69,26 +57,18 @@ public void testIsSmallFileGreater() throws Exception { } public void testShouldIndexFileNotLzoFile() throws Exception { -// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); -// System.out.println("testDataDir = " + testDataDir); -// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); DistributedLzoIndexer indexer = new DistributedLzoIndexer(); indexer.configure(conf); File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".foo"); - System.out.println("tempFile = " + tempFile); FileStatus status = new FileStatus(5L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); - System.out.println("status = " + status); assertEquals(false, indexer.shouldIndexFile(status, fs)); } public void testShouldIndexFileSkipSmallFiles() throws Exception { -// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); -// System.out.println("testDataDir = " + testDataDir); -// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); Configuration conf = new Configuration(); conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); @@ -97,17 +77,12 @@ public void testShouldIndexFileSkipSmallFiles() throws Exception { indexer.configure(conf); File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); - System.out.println("tempFile = " + tempFile); FileStatus status = new FileStatus(50L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); - System.out.println("status = " + status); assertEquals(false, indexer.shouldIndexFile(status, fs)); } public void testShouldIndexFileIndexNonexistent() throws Exception { -// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); -// System.out.println("testDataDir = " + testDataDir); -// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); Configuration conf = new Configuration(); conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); @@ -116,17 +91,12 @@ public void testShouldIndexFileIndexNonexistent() throws Exception { indexer.configure(conf); File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); - System.out.println("tempFile = " + tempFile); FileStatus status = new FileStatus(200L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); - System.out.println("status = " + status); assertEquals(true, indexer.shouldIndexFile(status, fs)); } public void testShouldIndexFileEmptyIndexExists() throws Exception { -// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); -// System.out.println("testDataDir = " + testDataDir); -// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); Configuration conf = new Configuration(); conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); @@ -135,13 +105,10 @@ public void testShouldIndexFileEmptyIndexExists() throws Exception { indexer.configure(conf); File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); - System.out.println("tempFile = " + tempFile); FileStatus status = new FileStatus(200L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); - System.out.println("status = " + status); String tempFileIndexPath = tempFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX; File tempFileIndex = new File(tempFileIndexPath); - System.out.println("tempFileIndex = " + tempFileIndex); if (!tempFileIndex.createNewFile()) { throw new IOException("Could not create temp file for testing " + tempFileIndex); } @@ -150,9 +117,6 @@ public void testShouldIndexFileEmptyIndexExists() throws Exception { } public void testShouldIndexFileIndexExists() throws Exception { -// File testDataDir = new File(System.getProperty("test.build.data"), "TestDistributedLzoIndexer"); -// System.out.println("testDataDir = " + testDataDir); -// File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo", testDataDir); Configuration conf = new Configuration(); conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); @@ -161,16 +125,11 @@ public void testShouldIndexFileIndexExists() throws Exception { indexer.configure(conf); File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); - System.out.println("tempFile = " + tempFile); FileStatus status = new FileStatus(200L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); - System.out.println("status = " + status); String tempFileIndexPath = tempFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX; File tempFileIndex = new File(tempFileIndexPath); - System.out.println("tempFileIndex = " + tempFileIndex); -// if (!tempFileIndex.createNewFile()) { -// throw new IOException("Could not create temp file for testing " + tempFileIndex); -// } + OutputStream fos = new FileOutputStream(tempFileIndex); fos.write(1); fos.close(); From c1968fa90b84700177bf16b76d8527f46ad5efb7 Mon Sep 17 00:00:00 2001 From: gsteelman Date: Thu, 17 Dec 2015 14:40:20 -0800 Subject: [PATCH 10/10] Rename TEMP_FILE_EXTENSION to TEMP_FILE_PATH and use toString for checking Path names. --- .../com/hadoop/compression/lzo/DistributedLzoIndexer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index 887d071b..3b5f68cc 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -36,7 +36,7 @@ public class DistributedLzoIndexer extends Configured implements Tool { public static final long LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT = 0; public static final String LZO_INDEXING_RECURSIVE_KEY = "lzo.indexing.recursive.enabled"; public static final boolean LZO_INDEXING_RECURSIVE_DEFAULT = true; - private static final String TEMP_FILE_EXTENSION = "/_temporary"; + private static final String TEMP_FILE_NAME = "/_temporary"; private boolean lzoSkipIndexingSmallFiles = LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT; private boolean lzoRecursiveIndexing = LZO_INDEXING_RECURSIVE_DEFAULT; @@ -49,12 +49,12 @@ public class DistributedLzoIndexer extends Configured implements Tool { private Configuration conf = getConf(); /** - * Accepts paths which don't end in TEMP_FILE_EXTENSION + * Accepts paths which don't end in TEMP_FILE_NAME */ private final PathFilter nonTemporaryFilter = new PathFilter() { @Override public boolean accept(Path path) { - return !path.getName().endsWith(TEMP_FILE_EXTENSION); + return !path.toString().endsWith(TEMP_FILE_NAME); } };