diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000..2ba6cc1
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,3 @@
+[run]
+concurrency = thread,multiprocessing
+parallel = true
diff --git a/.github/workflows/continuous_integration.yml b/.github/workflows/continuous_integration.yml
index 6586618..5d8af87 100644
--- a/.github/workflows/continuous_integration.yml
+++ b/.github/workflows/continuous_integration.yml
@@ -57,6 +57,8 @@ jobs:
       # Upload coverage report
       # https://github.com/codecov/codecov-action
       - name: Upload coverage report
-        uses: codecov/codecov-action@v1
+        uses: codecov/codecov-action@v4
         with:
+          token: ${{ secrets.CODECOV_TOKEN }}
+          slug: eecs485staff/madoop
           fail_ci_if_error: true
diff --git a/.gitignore b/.gitignore
index 8c61743..f75b093 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,6 +24,8 @@ build/
 /.tox/
 /.coverage*
 *,cover
+# Make an exception for .coveragerc, which should be checked into version control.
+!.coveragerc

 # Text editors and IDEs
 *~
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 2c2d58d..26b3033 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -6,7 +6,7 @@ Set up a development virtual environment.
 ```console
 $ python3 -m venv .venv
 $ source .venv/bin/activate
-$ pip install --editable .[dev,test]
+$ pip install --editable .[dev]
 ```

 A `madoop` entry point script is installed in your virtual environment.
diff --git a/MANIFEST.in b/MANIFEST.in
index 9200ff1..af35254 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -9,6 +9,7 @@ graft madoop/example

 # Avoid dev and and binary files
 exclude tox.ini
+exclude .coveragerc
 exclude .editorconfig
 global-exclude *.pyc
 global-exclude __pycache__
diff --git a/README_Hadoop_Streaming.md b/README_Hadoop_Streaming.md
index 0fadafa..b18837a 100644
--- a/README_Hadoop_Streaming.md
+++ b/README_Hadoop_Streaming.md
@@ -1,7 +1,7 @@
 Hadoop Streaming in Python
 ===========================

-This tutorial shows how to write MapReduce programs in Python that are compatible with [Hadoop Streaming](https://hadoop.apache.org/docs/r1.2.1/streaming.html). We'll use Python's `itertools.groupby()` function to simplify our code.
+This tutorial shows how to write MapReduce programs in Python that are compatible with [Hadoop Streaming](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html). We'll use Python's `itertools.groupby()` function to simplify our code.

 Install Madoop, a light weight MapReduce framework for education. Madoop implements the Hadoop Streaming interface.

 ```console
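For context on the tutorial's `itertools.groupby()` suggestion in the README hunk above: a Hadoop Streaming reducer reads tab-delimited `key<TAB>value` lines, already sorted by key, on standard input, so grouping reduces to a single `groupby()` call. The sketch below is illustrative only (word-count semantics assumed) and is not a file touched by this patch.

```python
#!/usr/bin/env python3
"""Illustrative Hadoop Streaming reducer: sum counts per word."""
import itertools
import sys


def keyfunc(line):
    """Return the key (text before the first tab) of a key-value line."""
    return line.partition("\t")[0]


def main():
    """Group sorted stdin lines by key and emit one total per key."""
    for word, group in itertools.groupby(sys.stdin, keyfunc):
        total = sum(int(line.partition("\t")[2]) for line in group)
        print(f"{word}\t{total}")


if __name__ == "__main__":
    main()
```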
diff --git a/madoop/mapreduce.py b/madoop/mapreduce.py
index fc05853..3d2c660 100644
--- a/madoop/mapreduce.py
+++ b/madoop/mapreduce.py
@@ -7,25 +7,24 @@
 import collections
 import hashlib
 import logging
-import math
 import pathlib
 import shutil
 import subprocess
 import tempfile
+import multiprocessing
+import concurrent.futures
 from .exceptions import MadoopError


 # Large input files are automatically split
-MAX_INPUT_SPLIT_SIZE = 2**21  # 2 MB
-
-# The number of reducers is dynamically determined by the number of unique keys
-# but will not be more than num_reducers
+MAX_INPUT_SPLIT_SIZE = 10 * 1024 * 1024  # 10 MB

 # Madoop logger
 LOGGER = logging.getLogger("madoop")


 def mapreduce(
+        *,
         input_path,
         output_dir,
         map_exe,
@@ -51,18 +50,15 @@
         LOGGER.debug("tmpdir=%s", tmpdir)

         # Create stage input and output directory
-        map_input_dir = tmpdir/'input'
         map_output_dir = tmpdir/'mapper-output'
         reduce_input_dir = tmpdir/'reducer-input'
         reduce_output_dir = tmpdir/'output'
-        map_input_dir.mkdir()
         map_output_dir.mkdir()
         reduce_input_dir.mkdir()
         reduce_output_dir.mkdir()

         # Copy and rename input files: part-00000, part-00001, etc.
         input_path = pathlib.Path(input_path)
-        prepare_input_files(input_path, map_input_dir)

         # Executables must be absolute paths
         map_exe = pathlib.Path(map_exe).resolve()
@@ -72,7 +68,7 @@
         LOGGER.info("Starting map stage")
         map_stage(
             exe=map_exe,
-            input_dir=map_input_dir,
+            input_dir=input_path,
             output_dir=map_output_dir,
         )

@@ -98,7 +94,7 @@
         for filename in sorted(reduce_output_dir.glob("*")):
             st_size = filename.stat().st_size
             total_size += st_size
-            shutil.copy(filename, output_dir)
+            shutil.move(filename, output_dir)
             output_path = output_dir.parent/last_two(filename)
             LOGGER.debug("%s size=%sB", output_path, st_size)

@@ -107,52 +103,36 @@
     LOGGER.info("Output directory: %s", output_dir)


-def prepare_input_files(input_path, output_dir):
-    """Copy and split input files. Rename to part-00000, part-00001, etc.
+def split_file(input_filename, max_chunksize):
+    """Iterate over the data in a file one chunk at a time."""
+    with open(input_filename, "rb") as input_file:
+        buffer = b""

-    The input_path can be a file or a directory of files. If a file is smaller
-    than MAX_INPUT_SPLIT_SIZE, then copy it to output_dir. For larger files,
-    split into blocks of MAX_INPUT_SPLIT_SIZE bytes and write block to
-    output_dir. Input files will never be combined.
+        while True:
+            chunk = input_file.read(max_chunksize)
+            # Break if no more data remains.
+            if not chunk:
+                break

-    The number of files created will be the number of mappers since we will
-    assume that the number of tasks per mapper is 1. Apache Hadoop has a
-    configurable number of tasks per mapper, however for both simplicity and
-    because our use case has smaller inputs we use 1.
+            # Add the chunk to the buffer.
+            buffer += chunk

-    """
-    part_num = 0
-    total_size = 0
-    for inpath in normalize_input_paths(input_path):
-        assert inpath.is_file()
-
-        # Compute output filenames
-        st_size = inpath.stat().st_size
-        total_size += st_size
-        n_splits = math.ceil(st_size / MAX_INPUT_SPLIT_SIZE)
-        n_splits = 1 if not n_splits else n_splits  # Handle empty input file
-        LOGGER.debug(
-            "input %s size=%sB partitions=%s", inpath, st_size, n_splits
-        )
-        outpaths = [
-            output_dir/part_filename(part_num + i) for i in range(n_splits)
-        ]
-        part_num += n_splits
-
-        # Copy to new output files
-        with contextlib.ExitStack() as stack:
-            outfiles = [stack.enter_context(i.open('w')) for i in outpaths]
-            infile = stack.enter_context(inpath.open(encoding="utf-8"))
-            outparent = outpaths[0].parent
-            assert all(i.parent == outparent for i in outpaths)
-            outnames = [i.name for i in outpaths]
-            logging.debug(
-                "partition %s >> %s/{%s}",
-                last_two(inpath), outparent.name, ",".join(outnames),
-            )
-            for i, line in enumerate(infile):
-                outfiles[i % n_splits].write(line)
-    LOGGER.debug("total input size=%sB", total_size)
+            # Find the last newline character in the buffer. We don't want to
+            # yield a chunk that ends in the middle of a line; we have to
+            # respect line boundaries or we'll corrupt the input.
+            last_newline = buffer.rfind(b"\n")
+            if last_newline != -1:
+                # Yield the content up to the last newline, saving the rest
+                # for the next chunk.
+                yield buffer[:last_newline + 1]
+
+                # Remove processed data from the buffer. The next chunk will
+                # start with whatever data came after the last newline.
+                buffer = buffer[last_newline + 1:]
+
+        # Yield any remaining data.
+        if buffer:
+            yield buffer


 def normalize_input_paths(input_path):
@@ -208,30 +188,51 @@
     return f"part-{num:05d}"


+def map_single_chunk(exe, input_path, output_path, chunk):
+    """Execute mapper on a single chunk."""
+    with output_path.open("w") as outfile:
+        try:
+            subprocess.run(
+                str(exe),
+                shell=False,
+                check=True,
+                input=chunk,
+                stdout=outfile,
+            )
+        except subprocess.CalledProcessError as err:
+            raise MadoopError(
+                f"Command returned non-zero: "
+                f"{exe} < {input_path} > {output_path}"
+            ) from err
+
+
 def map_stage(exe, input_dir, output_dir):
     """Execute mappers."""
-    i = 0
-    for i, input_path in enumerate(sorted(input_dir.iterdir()), 1):
-        output_path = output_dir/part_filename(i)
-        LOGGER.debug(
-            "%s < %s > %s",
-            exe.name, last_two(input_path), last_two(output_path),
-        )
-        with input_path.open() as infile, output_path.open('w') as outfile:
-            try:
-                subprocess.run(
-                    str(exe),
-                    shell=False,
-                    check=True,
-                    stdin=infile,
-                    stdout=outfile,
-                )
-            except subprocess.CalledProcessError as err:
-                raise MadoopError(
-                    f"Command returned non-zero: "
-                    f"{exe} < {input_path} > {output_path}"
-                ) from err
-    LOGGER.info("Finished map executions: %s", i)
+    part_num = 0
+    futures = []
+    with concurrent.futures.ThreadPoolExecutor(
+        max_workers=multiprocessing.cpu_count()
+    ) as pool:
+        for input_path in normalize_input_paths(input_dir):
+            for chunk in split_file(input_path, MAX_INPUT_SPLIT_SIZE):
+                output_path = output_dir/part_filename(part_num)
+                LOGGER.debug(
+                    "%s < %s > %s",
+                    exe.name, last_two(input_path), last_two(output_path),
+                )
+                futures.append(pool.submit(
+                    map_single_chunk,
+                    exe,
+                    input_path,
+                    output_path,
+                    chunk,
+                ))
+                part_num += 1
+    for future in concurrent.futures.as_completed(futures):
+        exception = future.exception()
+        if exception:
+            raise exception
+    LOGGER.info("Finished map executions: %s", part_num)


 def sort_file(path):
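A side note on the new `split_file()` generator above: every chunk it yields except possibly the last ends on a newline, and the chunks concatenate back to exactly the original file contents. The snippet below is an illustrative check of that contract, not part of the patch; the input path is hypothetical.

```python
# Illustrative only: exercises split_file()'s line-boundary contract.
from madoop.mapreduce import MAX_INPUT_SPLIT_SIZE, split_file

path = "some_big_input.txt"  # hypothetical input file
chunks = list(split_file(path, MAX_INPUT_SPLIT_SIZE))
with open(path, "rb") as original:
    # Chunks rejoin to the original bytes.
    assert b"".join(chunks) == original.read()
# Only the final chunk may end mid-line.
assert all(chunk.endswith(b"\n") for chunk in chunks[:-1])
```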
@@ -287,7 +288,9 @@ def partition_keys_custom(
     Update the data structures provided by the caller input_keys_stats and
     output_keys_stats. Both map a filename to a set of of keys.
     """
-    # pylint: disable=too-many-arguments,too-many-locals
+    # pylint: disable=too-many-arguments
+    # pylint: disable=too-many-positional-arguments
+    # pylint: disable=too-many-locals
     assert len(outpaths) == num_reducers
     outparent = outpaths[0].parent
     assert all(i.parent == outparent for i in outpaths)
@@ -392,35 +395,61 @@ def group_stage(input_dir, output_dir, num_reducers, partitioner):
             path.unlink()

     # Sort output files
-    for path in sorted(output_dir.iterdir()):
-        sort_file(path)
+    try:
+        # Don't use a with statement here, because Coverage won't be able to
+        # detect code running in a subprocess if we do.
+        # https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html
+        # pylint: disable=consider-using-with
+        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
+        pool.map(sort_file, sorted(output_dir.iterdir()))
+    finally:
+        pool.close()
+        pool.join()

     log_output_key_stats(output_keys_stats, output_dir)


+def reduce_single_file(exe, input_path, output_path):
+    """Execute reducer on a single file."""
+    with input_path.open() as infile, output_path.open("w") as outfile:
+        try:
+            subprocess.run(
+                str(exe),
+                shell=False,
+                check=True,
+                stdin=infile,
+                stdout=outfile,
+            )
+        except subprocess.CalledProcessError as err:
+            raise MadoopError(
+                f"Command returned non-zero: "
+                f"{exe} < {input_path} > {output_path}"
+            ) from err
+
+
 def reduce_stage(exe, input_dir, output_dir):
     """Execute reducers."""
     i = 0
-    for i, input_path in enumerate(sorted(input_dir.iterdir())):
-        output_path = output_dir/part_filename(i)
-        LOGGER.debug(
-            "%s < %s > %s",
-            exe.name, last_two(input_path), last_two(output_path),
-        )
-        with input_path.open() as infile, output_path.open('w') as outfile:
-            try:
-                subprocess.run(
-                    str(exe),
-                    shell=False,
-                    check=True,
-                    stdin=infile,
-                    stdout=outfile,
-                )
-            except subprocess.CalledProcessError as err:
-                raise MadoopError(
-                    f"Command returned non-zero: "
-                    f"{exe} < {input_path} > {output_path}"
-                ) from err
+    futures = []
+    with concurrent.futures.ThreadPoolExecutor(
+        max_workers=multiprocessing.cpu_count()
+    ) as pool:
+        for i, input_path in enumerate(sorted(input_dir.iterdir())):
+            output_path = output_dir/part_filename(i)
+            LOGGER.debug(
+                "%s < %s > %s",
+                exe.name, last_two(input_path), last_two(output_path),
+            )
+            futures.append(pool.submit(
+                reduce_single_file,
+                exe,
+                input_path,
+                output_path,
+            ))
+    for future in concurrent.futures.as_completed(futures):
+        exception = future.exception()
+        if exception:
+            raise exception
     LOGGER.info("Finished reduce executions: %s", i+1)


diff --git a/pyproject.toml b/pyproject.toml
index 6f0ae08..e5ac193 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "madoop"
-version = "1.2.2"
+version = "1.3.0"
 description="A light weight MapReduce framework for education."
 license = {file = "LICENSE"}
 authors = [
@@ -26,15 +26,14 @@ madoop = "madoop.__main__:main"
 [project.optional-dependencies]
 dev = [
     "build",
-    "twine",
-    "tox",
     "check-manifest",
-    "freezegun",
     "pycodestyle",
     "pydocstyle",
     "pylint",
     "pytest",
     "pytest-cov",
+    "tox",
+    "twine",
 ]

 [tool.setuptools.packages.find]
diff --git a/tests/test_api.py b/tests/test_api.py
index cb3de9f..137fed4 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -1,6 +1,8 @@
 """System tests for the API interface."""
+from pathlib import Path
 import pytest
 import madoop
+from madoop.mapreduce import map_stage, reduce_stage
 from . import utils
 from .utils import TESTDATA_DIR

@@ -56,6 +58,18 @@ def test_bash_executable(tmpdir):
     )


+def test_output_already_exists(tmpdir):
+    """Output already existing should raise an error."""
+    with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
+        madoop.mapreduce(
+            input_path=TESTDATA_DIR/"word_count/input",
+            output_dir=tmpdir,
+            map_exe=TESTDATA_DIR/"word_count/map.py",
+            reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
+            num_reducers=2,
+        )
+
+
 def test_bad_map_exe(tmpdir):
     """Map exe returns non-zero should produce an error message."""
     with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
@@ -94,6 +108,23 @@ def test_noninteger_partition_exe(tmpdir):
             partitioner=TESTDATA_DIR/"word_count/partition_noninteger.py",
         )

+    with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
+        map_stage(
+            exe=TESTDATA_DIR/"word_count/map_invalid.py",
+            input_dir=TESTDATA_DIR/"word_count/input",
+            output_dir=Path(tmpdir),
+        )
+
+
+def test_bad_reduce_exe(tmpdir):
+    """Reduce exe returns non-zero should produce an error message."""
+    with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
+        reduce_stage(
+            exe=TESTDATA_DIR/"word_count/reduce_exit_1.py",
+            input_dir=TESTDATA_DIR/"word_count/input",
+            output_dir=Path(tmpdir),
+        )
+

 def test_missing_shebang(tmpdir):
     """Reduce exe with a bad shebag should produce an error message."""
diff --git a/tests/test_cli.py b/tests/test_cli.py
index dab719e..9156c37 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -1,6 +1,6 @@
 """System tests for the command line interface."""
 import subprocess
-import pkg_resources
+import importlib.metadata
 import pytest
 from . import utils
 from .utils import TESTDATA_DIR
@@ -15,7 +15,7 @@ def test_version():
     )
     output = result.stdout.decode("utf-8")
     assert "Madoop" in output
-    assert pkg_resources.get_distribution("madoop").version in output
+    assert importlib.metadata.version("madoop") in output


 def test_help():
diff --git a/tests/test_stages.py b/tests/test_stages.py
index 2bc58b0..1de5818 100644
--- a/tests/test_stages.py
+++ b/tests/test_stages.py
@@ -1,6 +1,13 @@
 """System tests for the map stage of Michigan Hadoop."""
+import shutil
 from pathlib import Path
-from madoop.mapreduce import map_stage, group_stage, reduce_stage
+from madoop.mapreduce import (
+    map_stage,
+    group_stage,
+    reduce_stage,
+    split_file,
+    MAX_INPUT_SPLIT_SIZE,
+)
 from . import utils
 from .utils import TESTDATA_DIR

@@ -84,3 +91,44 @@ def test_reduce_stage_2_reducers(tmpdir):
         TESTDATA_DIR/"word_count/correct/reducer-output-2-reducers",
         tmpdir,
     )
+
+
+def test_input_splitting(tmp_path):
+    """Test that the Map Stage correctly splits input."""
+    input_data = "o" * (MAX_INPUT_SPLIT_SIZE - 10) + "\n" + \
+        "a" * int(MAX_INPUT_SPLIT_SIZE / 2)
+    input_dir = tmp_path/"input"
+    output_dir = tmp_path/"output"
+    input_dir.mkdir()
+    output_dir.mkdir()
+
+    with open(input_dir/"input.txt", "w", encoding="utf-8") as input_file:
+        input_file.write(input_data)
+
+    map_stage(
+        exe=Path(shutil.which("cat")),
+        input_dir=input_dir,
+        output_dir=output_dir,
+    )
+
+    output_files = sorted(output_dir.glob("*"))
+    assert len(output_files) == 2
+    assert output_files == [output_dir/"part-00000", output_dir/"part-00001"]
+
+    with open(output_dir/"part-00000", "r", encoding="utf-8") as outfile1:
+        data = outfile1.read()
+        assert data == "o" * (MAX_INPUT_SPLIT_SIZE - 10) + "\n"
+    with open(output_dir/"part-00001", "r", encoding="utf-8") as outfile2:
+        data = outfile2.read()
+        assert data == "a" * int(MAX_INPUT_SPLIT_SIZE / 2)
+
+
+def test_split_file_mid_chunk(tmp_path):
+    """Test that file splitting still works when data remains in the buffer."""
+    input_data = "noah says\nhello world"
+    input_file = tmp_path/"input.txt"
+    with open(input_file, "w", encoding="utf-8") as infile:
+        infile.write(input_data)
+
+    splits = list(split_file(input_file, 50))
+    assert splits == [b"noah says\n", b"hello world"]
diff --git a/tests/testdata/word_count/reduce_exit_1.py b/tests/testdata/word_count/reduce_exit_1.py
new file mode 100755
index 0000000..4561d53
--- /dev/null
+++ b/tests/testdata/word_count/reduce_exit_1.py
@@ -0,0 +1,6 @@
+#!/usr/bin/env python3
+"""Invalid reduce executable exits 1."""
+
+import sys
+
+sys.exit(1)
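For orientation, the keyword-only `madoop.mapreduce()` API exercised by these tests is called roughly as follows. This is a usage sketch mirroring the test calls above, not part of the patch; the paths and the `num_reducers` value are illustrative.

```python
import pathlib

import madoop

# Hypothetical paths; they mirror the word_count test fixtures in structure.
madoop.mapreduce(
    input_path=pathlib.Path("example/input"),    # file or directory of inputs
    output_dir=pathlib.Path("example/output"),   # must not already exist
    map_exe=pathlib.Path("example/map.py"),
    reduce_exe=pathlib.Path("example/reduce.py"),
    num_reducers=2,
)
```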