diff --git a/madoop/__main__.py b/madoop/__main__.py index 5c545a4..4559d07 100644 --- a/madoop/__main__.py +++ b/madoop/__main__.py @@ -87,8 +87,7 @@ def __call__(self, parser, *args, **kwargs): src = madoop_dir/"example" dst = pathlib.Path("example") if dst.exists(): - print(f"Error: directory already exists: {dst}") - parser.exit(1) + parser.error(f"directory already exists: {dst}") shutil.copytree(src, dst) print(textwrap.dedent(f"""\ Created {dst}, try: diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py index f1cbd42..b05b564 100644 --- a/madoop/mapreduce.py +++ b/madoop/mapreduce.py @@ -4,6 +4,7 @@ """ import contextlib +import collections import hashlib import logging import math @@ -84,15 +85,15 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe): ) # Move files from temporary output dir to user-specified output dir - for filename in reduce_output_dir.glob("*"): + total_size = 0 + for filename in sorted(reduce_output_dir.glob("*")): + st_size = filename.stat().st_size + total_size += st_size shutil.copy(filename, output_dir) + output_path = output_dir.parent/last_two(filename) + LOGGER.debug("%s size=%sB", output_path, st_size) # Remind user where to find output - total_size = 0 - for outpath in sorted(output_dir.iterdir()): - st_size = outpath.stat().st_size - total_size += st_size - LOGGER.debug("%s size=%sB", outpath, st_size) LOGGER.debug("total output size=%sB", total_size) LOGGER.info("Output directory: %s", output_dir) @@ -220,37 +221,25 @@ def keyhash(key): return int(hexdigest, base=16) -def partition_keys(inpath, outpaths): - """Allocate lines of inpath among outpaths using hash of key.""" +def partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats): + """Allocate lines of inpath among outpaths using hash of key. + + Update the data structures provided by the caller input_keys_stats and + output_keys_stats. Both map a filename to a set of keys. 
+ + """ assert len(outpaths) == MAX_NUM_REDUCE outparent = outpaths[0].parent assert all(i.parent == outparent for i in outpaths) - outnames = [i.name for i in outpaths] - LOGGER.debug( - "partition %s >> %s/{%s}", - last_two(inpath), outparent.name, ",".join(outnames), - ) with contextlib.ExitStack() as stack: outfiles = [stack.enter_context(p.open("a")) for p in outpaths] for line in stack.enter_context(inpath.open()): key = line.partition('\t')[0] + input_keys_stats[inpath].add(key) reducer_idx = keyhash(key) % MAX_NUM_REDUCE outfiles[reducer_idx].write(line) - - -def keyspace(path): - """Return the number of unique keys in {path}. - - WARNING: This is a terribly slow implementation. It would be faster to - record this information while grouping.x - - """ - keys = set() - with path.open() as infile: - for line in infile: - key = line.partition('\t')[0] - keys.add(key) - return keys + outpath = outpaths[reducer_idx] + output_keys_stats[outpath].add(key) def group_stage(input_dir, output_dir): @@ -260,22 +249,34 @@ def group_stage(input_dir, output_dir): using the hash and modulo of the key. 
""" - # Detailed keyspace debug output THIS IS SLOW - all_keys = set() - for inpath in sorted(input_dir.iterdir()): - keys = keyspace(inpath) - all_keys.update(keys) - LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys)) - LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_keys)) - # Compute output filenames outpaths = [] for i in range(MAX_NUM_REDUCE): outpaths.append(output_dir/part_filename(i)) - # Parition input, appending to output files + # Track keyspace stats, map filename -> set of keys + input_keys_stats = collections.defaultdict(set) + output_keys_stats = collections.defaultdict(set) + + # Partition input, appending to output files for inpath in sorted(input_dir.iterdir()): - partition_keys(inpath, outpaths) + partition_keys(inpath, outpaths, input_keys_stats, output_keys_stats) + + # Log input keyspace stats + all_input_keys = set() + for inpath, keys in sorted(input_keys_stats.items()): + all_input_keys.update(keys) + LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys)) + LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_input_keys)) + + # Log partition input and output filenames + outnames = [i.name for i in outpaths] + outparent = outpaths[0].parent + for inpath in sorted(input_keys_stats.keys()): + LOGGER.debug( + "partition %s >> %s/{%s}", + last_two(inpath), outparent.name, ",".join(outnames), + ) # Remove empty output files. We won't always use the maximum number of # reducers because some MapReduce programs have fewer intermediate keys. 
@@ -288,13 +289,13 @@ def group_stage(input_dir, output_dir): for path in sorted(output_dir.iterdir()): sort_file(path) - # Detailed keyspace debug output THIS IS SLOW - all_keys = set() - for outpath in sorted(output_dir.iterdir()): - keys = keyspace(outpath) - all_keys.update(keys) + # Log output keyspace stats + all_output_keys = set() + for outpath, keys in sorted(output_keys_stats.items()): + all_output_keys.update(keys) LOGGER.debug("%s unique_keys=%s", last_two(outpath), len(keys)) - LOGGER.debug("%s all_unique_keys=%s", output_dir.name, len(all_keys)) + LOGGER.debug("%s all_unique_keys=%s", output_dir.name, + len(all_output_keys)) def reduce_stage(exe, input_dir, output_dir): diff --git a/setup.py b/setup.py index 45083be..f2babc7 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ description="A light weight MapReduce framework for education.", long_description=LONG_DESCRIPTION, long_description_content_type="text/markdown", - version="0.2.0", + version="0.3.0", author="Andrew DeOrio", author_email="awdeorio@umich.edu", url="https://github.com/eecs485staff/madoop/", diff --git a/tests/test_cli.py b/tests/test_cli.py index 8365530..0d6c7f1 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -91,7 +91,12 @@ def test_hadoop_arguments(tmpdir): def test_example(tmpdir): """Example option should copy files.""" with tmpdir.as_cwd(): - subprocess.run(["madoop", "--example"], check=True) + subprocess.run( + ["madoop", "--example"], + check=True, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + ) assert (tmpdir/"example/input/input01.txt").exists() assert (tmpdir/"example/input/input02.txt").exists() assert (tmpdir/"example/map.py").exists() @@ -99,4 +104,9 @@ def test_example(tmpdir): # Call it again and it should refuse to clobber with tmpdir.as_cwd(), pytest.raises(subprocess.CalledProcessError): - subprocess.run(["madoop", "--example"], check=True) + subprocess.run( + ["madoop", "--example"], + check=True, + stderr=subprocess.PIPE, + 
stdout=subprocess.PIPE, + ) diff --git a/tests/test_stages.py b/tests/test_stages.py index 1ea61eb..dad39f7 100644 --- a/tests/test_stages.py +++ b/tests/test_stages.py @@ -9,7 +9,7 @@ def test_map_stage(tmpdir): """Test the map stage using word count example.""" map_stage( exe=TESTDATA_DIR/"word_count/map.py", - input_dir=TESTDATA_DIR/"word_count/correct/input", + input_dir=TESTDATA_DIR/"word_count/input", output_dir=Path(tmpdir), ) utils.assert_dirs_eq( diff --git a/tests/testdata/word_count/correct/grouper-output/part-00000 b/tests/testdata/word_count/correct/grouper-output/part-00000 index 9701581..440a92d 100644 --- a/tests/testdata/word_count/correct/grouper-output/part-00000 +++ b/tests/testdata/word_count/correct/grouper-output/part-00000 @@ -1,6 +1 @@ -cool 1 -file 1 -file 1 -file 1 -file 1 -streaming 1 +Goodbye 1 diff --git a/tests/testdata/word_count/correct/grouper-output/part-00001 b/tests/testdata/word_count/correct/grouper-output/part-00001 deleted file mode 100644 index 67f0749..0000000 --- a/tests/testdata/word_count/correct/grouper-output/part-00001 +++ /dev/null @@ -1,6 +0,0 @@ -hadoop 1 -hadoop 1 -is 1 -reduce 1 -reduce 1 -reduce 1 diff --git a/tests/testdata/word_count/correct/grouper-output/part-00002 b/tests/testdata/word_count/correct/grouper-output/part-00002 index 8aa6fe6..9f59c7c 100644 --- a/tests/testdata/word_count/correct/grouper-output/part-00002 +++ b/tests/testdata/word_count/correct/grouper-output/part-00002 @@ -1,7 +1,5 @@ -google 1 -map 1 -map 1 -map 1 -map 1 -system 1 -system 1 +Bye 1 +Hadoop 1 +Hadoop 1 +World 1 +World 1 diff --git a/tests/testdata/word_count/correct/grouper-output/part-00003 b/tests/testdata/word_count/correct/grouper-output/part-00003 new file mode 100644 index 0000000..f7fef0c --- /dev/null +++ b/tests/testdata/word_count/correct/grouper-output/part-00003 @@ -0,0 +1,2 @@ +Hello 1 +Hello 1 diff --git a/tests/testdata/word_count/correct/input/part-00000 b/tests/testdata/word_count/correct/input/part-00000 
deleted file mode 100644 index 4431d42..0000000 --- a/tests/testdata/word_count/correct/input/part-00000 +++ /dev/null @@ -1,2 +0,0 @@ -hadoop file system -google file system diff --git a/tests/testdata/word_count/correct/input/part-00001 b/tests/testdata/word_count/correct/input/part-00001 deleted file mode 100644 index f6a00be..0000000 --- a/tests/testdata/word_count/correct/input/part-00001 +++ /dev/null @@ -1,3 +0,0 @@ -hadoop map reduce file map -map streaming file reduce -map reduce is cool diff --git a/tests/testdata/word_count/correct/mapper-output/part-00000 b/tests/testdata/word_count/correct/mapper-output/part-00000 deleted file mode 100644 index 2546129..0000000 --- a/tests/testdata/word_count/correct/mapper-output/part-00000 +++ /dev/null @@ -1,6 +0,0 @@ -hadoop 1 -file 1 -system 1 -google 1 -file 1 -system 1 diff --git a/tests/testdata/word_count/correct/mapper-output/part-00001 b/tests/testdata/word_count/correct/mapper-output/part-00001 index 3203c61..ef7cc97 100644 --- a/tests/testdata/word_count/correct/mapper-output/part-00001 +++ b/tests/testdata/word_count/correct/mapper-output/part-00001 @@ -1,13 +1,4 @@ -hadoop 1 -map 1 -reduce 1 -file 1 -map 1 -map 1 -streaming 1 -file 1 -reduce 1 -map 1 -reduce 1 -is 1 -cool 1 +Hello 1 +World 1 +Bye 1 +World 1 diff --git a/tests/testdata/word_count/correct/mapper-output/part-00002 b/tests/testdata/word_count/correct/mapper-output/part-00002 new file mode 100644 index 0000000..589170c --- /dev/null +++ b/tests/testdata/word_count/correct/mapper-output/part-00002 @@ -0,0 +1,4 @@ +Hello 1 +Hadoop 1 +Goodbye 1 +Hadoop 1 diff --git a/tests/testdata/word_count/correct/output/part-00000 b/tests/testdata/word_count/correct/output/part-00000 index a5bd725..70db879 100644 --- a/tests/testdata/word_count/correct/output/part-00000 +++ b/tests/testdata/word_count/correct/output/part-00000 @@ -1,3 +1 @@ -cool 1 -file 4 -streaming 1 +Goodbye 1 diff --git a/tests/testdata/word_count/correct/output/part-00001 
b/tests/testdata/word_count/correct/output/part-00001 index 5166d03..ecc21b4 100644 --- a/tests/testdata/word_count/correct/output/part-00001 +++ b/tests/testdata/word_count/correct/output/part-00001 @@ -1,3 +1,3 @@ -hadoop 2 -is 1 -reduce 3 +Bye 1 +Hadoop 2 +World 2 diff --git a/tests/testdata/word_count/correct/output/part-00002 b/tests/testdata/word_count/correct/output/part-00002 index 2ad7b99..30f4be7 100644 --- a/tests/testdata/word_count/correct/output/part-00002 +++ b/tests/testdata/word_count/correct/output/part-00002 @@ -1,3 +1 @@ -google 1 -map 4 -system 2 +Hello 2 diff --git a/tests/testdata/word_count/correct/reducer-output/part-00000 b/tests/testdata/word_count/correct/reducer-output/part-00000 index a5bd725..70db879 100644 --- a/tests/testdata/word_count/correct/reducer-output/part-00000 +++ b/tests/testdata/word_count/correct/reducer-output/part-00000 @@ -1,3 +1 @@ -cool 1 -file 4 -streaming 1 +Goodbye 1 diff --git a/tests/testdata/word_count/correct/reducer-output/part-00001 b/tests/testdata/word_count/correct/reducer-output/part-00001 index 5166d03..ecc21b4 100644 --- a/tests/testdata/word_count/correct/reducer-output/part-00001 +++ b/tests/testdata/word_count/correct/reducer-output/part-00001 @@ -1,3 +1,3 @@ -hadoop 2 -is 1 -reduce 3 +Bye 1 +Hadoop 2 +World 2 diff --git a/tests/testdata/word_count/correct/reducer-output/part-00002 b/tests/testdata/word_count/correct/reducer-output/part-00002 index 2ad7b99..30f4be7 100644 --- a/tests/testdata/word_count/correct/reducer-output/part-00002 +++ b/tests/testdata/word_count/correct/reducer-output/part-00002 @@ -1,3 +1 @@ -google 1 -map 4 -system 2 +Hello 2 diff --git a/tests/testdata/word_count/input/file01 b/tests/testdata/word_count/input/file01 deleted file mode 100644 index 9912eb6..0000000 --- a/tests/testdata/word_count/input/file01 +++ /dev/null @@ -1,3 +0,0 @@ -hadoop map reduce file map -map streaming file reduce -map reduce is cool diff --git a/tests/testdata/word_count/input/file02 
b/tests/testdata/word_count/input/file02 deleted file mode 100644 index 4431d42..0000000 --- a/tests/testdata/word_count/input/file02 +++ /dev/null @@ -1,2 +0,0 @@ -hadoop file system -google file system diff --git a/tests/testdata/word_count/input/input01.txt b/tests/testdata/word_count/input/input01.txt new file mode 100644 index 0000000..c614f1f --- /dev/null +++ b/tests/testdata/word_count/input/input01.txt @@ -0,0 +1,2 @@ +Hello World +Bye World diff --git a/tests/testdata/word_count/input/input02.txt b/tests/testdata/word_count/input/input02.txt new file mode 100644 index 0000000..acd80a3 --- /dev/null +++ b/tests/testdata/word_count/input/input02.txt @@ -0,0 +1,2 @@ +Hello Hadoop +Goodbye Hadoop diff --git a/tests/testdata/word_count/map.sh b/tests/testdata/word_count/map.sh index d1b4729..7d8c500 100755 --- a/tests/testdata/word_count/map.sh +++ b/tests/testdata/word_count/map.sh @@ -9,4 +9,4 @@ set -Eeuo pipefail # Map -cat | tr '[ \t]' '\n' | tr '[:upper:]' '[:lower:]' | awk '{print $1"\t1"}' +cat | tr '[ \t]' '\n' | awk '{print $1"\t1"}' diff --git a/tox.ini b/tox.ini index 8e82040..7bfade9 100644 --- a/tox.ini +++ b/tox.ini @@ -17,9 +17,14 @@ python = [testenv] setenv = PYTHONPATH = {toxinidir} -allowlist_externals = sh +allowlist_externals = + sh + diff extras = dev commands = + diff -r madoop/example/input tests/testdata/word_count/input/ + diff -r madoop/example/map.py tests/testdata/word_count/map.py + diff -r madoop/example/reduce.py tests/testdata/word_count/reduce.py pycodestyle madoop tests setup.py sh -c "pydocstyle madoop tests/* setup.py" pylint madoop tests setup.py