From f8cf5dfaae83e27117753b8751236a0e7ecc4ce7 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 26 Jul 2024 09:50:00 -0400 Subject: [PATCH 01/12] add test placeholder and fixture --- tests/test_output_sorting.py | 61 ++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 tests/test_output_sorting.py diff --git a/tests/test_output_sorting.py b/tests/test_output_sorting.py new file mode 100644 index 0000000..02dd92e --- /dev/null +++ b/tests/test_output_sorting.py @@ -0,0 +1,61 @@ +import pytest +import pathlib +import random +import calendar +import datetime + +SEED = 0 +random.seed(seed=SEED) + + +def _generate_random_datetime() -> datetime.datetime: + """Generate a random datetime for testing.""" + year = random.randint(2000, 2020) + month = random.randint(1, 12) + + max_days = calendar.monthrange(year=year, month=month)[1] + day = random.randint(1, max_days) + hour = random.randint(0, 23) + minute = random.randint(0, 59) + second = random.randint(0, 59) + + result = datetime(year=year, month=month, day=day, hour=hour, minute=minute, second=second) + return result + + +def _generate_random_datetimes(number_of_elements: int) -> list[datetime.datetime]: + """Generate random datetimes for testing.""" + random_datetimes = [_generate_random_datetime() for _ in range(number_of_elements)] + return random_datetimes + + +@pytest.fixture(scope="session") +def unordered_output_file_content(tmp_path_factory: pytest.TempPathFactory) -> list[str]: + """Generate file content equivalent to an unordered output file.""" + example_output_file_path = ( + pathlib.Path(__file__).parent + / "examples" + / "example_0" + / "expected_output" + / "blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv" + ) + + with open(file=example_output_file_path, mode="r") as io: + example_output_lines = io.readlines() + base_line = example_output_lines[1].split("\t") + + random_datetimes = _generate_random_datetimes(number_of_elements=100) + + scrambled_example_output_lines = [ + [base_line[0], random_datetime, base_line[2], base_line[3]] for random_datetime in random_datetimes + ] + return scrambled_example_output_lines + + +def test_output_file_reordering() -> None: + """ + Performing parallelized parsing can result in both race conditions and a break to chronological ordering. + + This is a test for the utility function for reordering the output of a parsed log file according to time. 
+ """ + pass From fa372ed4a1e76c7d34813c464e8c763dae46d542 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 26 Jul 2024 09:51:48 -0400 Subject: [PATCH 02/12] style --- tests/test_output_sorting.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_output_sorting.py b/tests/test_output_sorting.py index 02dd92e..901b200 100644 --- a/tests/test_output_sorting.py +++ b/tests/test_output_sorting.py @@ -5,7 +5,7 @@ import datetime SEED = 0 -random.seed(seed=SEED) +random.seed(SEED) def _generate_random_datetime() -> datetime.datetime: @@ -19,7 +19,7 @@ def _generate_random_datetime() -> datetime.datetime: minute = random.randint(0, 59) second = random.randint(0, 59) - result = datetime(year=year, month=month, day=day, hour=hour, minute=minute, second=second) + result = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=minute, second=second) return result @@ -30,7 +30,7 @@ def _generate_random_datetimes(number_of_elements: int) -> list[datetime.datetim @pytest.fixture(scope="session") -def unordered_output_file_content(tmp_path_factory: pytest.TempPathFactory) -> list[str]: +def unordered_output_file_content(tmp_path_factory: pytest.TempPathFactory) -> list[list[str]]: """Generate file content equivalent to an unordered output file.""" example_output_file_path = ( pathlib.Path(__file__).parent From f1391d10f370d09928c9029c98bea910fc680807 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 26 Jul 2024 13:19:31 -0400 Subject: [PATCH 03/12] scope seed --- tests/test_output_sorting.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_output_sorting.py b/tests/test_output_sorting.py index 901b200..925713c 100644 --- a/tests/test_output_sorting.py +++ b/tests/test_output_sorting.py @@ -5,7 +5,6 @@ import datetime SEED = 0 -random.seed(SEED) def _generate_random_datetime() -> datetime.datetime: @@ -25,6 +24,8 @@ def _generate_random_datetime() -> datetime.datetime: def _generate_random_datetimes(number_of_elements: int) -> list[datetime.datetime]: """Generate random datetimes for testing.""" + random.seed(SEED) + random_datetimes = [_generate_random_datetime() for _ in range(number_of_elements)] return random_datetimes From c8e13e8efacf31b96bf51262aa00ea8e38e60ac3 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 5 Aug 2024 12:16:24 -0400 Subject: [PATCH 04/12] setup ordering helper and tests --- src/dandi_s3_log_parser/__init__.py | 2 + src/dandi_s3_log_parser/_order_parsed_logs.py | 19 ++++++ .../example_0/example_dandi_s3_log.log | 2 - ...s_11ec8933-1456-4942-922b-94e5878bb991.tsv | 0 ...s_a7b032b8-1e31-429f-975f-52a28cec6629.tsv | 0 ...s_11ec8933-1456-4942-922b-94e5878bb991.tsv | 5 ++ ...s_a7b032b8-1e31-429f-975f-52a28cec6629.tsv | 5 ++ ...s_11ec8933-1456-4942-922b-94e5878bb991.tsv | 5 ++ ...s_a7b032b8-1e31-429f-975f-52a28cec6629.tsv | 5 ++ tests/test_dandi_s3_log_parser.py | 6 +- tests/test_order_parsed_logs.py | 33 ++++++++++ tests/test_output_sorting.py | 62 ------------------- 12 files changed, 77 insertions(+), 67 deletions(-) create mode 100644 src/dandi_s3_log_parser/_order_parsed_logs.py delete mode 100644 tests/examples/example_0/example_dandi_s3_log.log rename tests/examples/{example_0 => ordered_example_0}/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv (100%) rename tests/examples/{example_0 => ordered_example_0}/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv (100%) create mode 100644 
tests/examples/unordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv create mode 100644 tests/examples/unordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv create mode 100644 tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv create mode 100644 tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv create mode 100644 tests/test_order_parsed_logs.py delete mode 100644 tests/test_output_sorting.py diff --git a/src/dandi_s3_log_parser/__init__.py b/src/dandi_s3_log_parser/__init__.py index 279cb4c..ec3f54f 100644 --- a/src/dandi_s3_log_parser/__init__.py +++ b/src/dandi_s3_log_parser/__init__.py @@ -3,6 +3,7 @@ from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log, parse_all_dandi_raw_s3_logs from ._buffered_text_reader import BufferedTextReader +from ._order_parsed_logs import order_parsed_logs __all__ = [ "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH", @@ -12,4 +13,5 @@ "parse_raw_s3_log", "parse_dandi_raw_s3_log", "parse_all_dandi_raw_s3_logs", + "order_parsed_logs", ] diff --git a/src/dandi_s3_log_parser/_order_parsed_logs.py b/src/dandi_s3_log_parser/_order_parsed_logs.py new file mode 100644 index 0000000..75fe932 --- /dev/null +++ b/src/dandi_s3_log_parser/_order_parsed_logs.py @@ -0,0 +1,19 @@ +import pathlib +import pandas + + +def order_parsed_logs( + unordered_parsed_s3_log_folder_path: pathlib.Path, ordered_parsed_s3_log_folder_path: pathlib.Path +) -> None: + """Order the contents of all parsed log files chronologically.""" + ordered_parsed_s3_log_folder_path.mkdir(exist_ok=True) + + for unordered_parsed_s3_log_file_path in unordered_parsed_s3_log_folder_path.iterdir(): + unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_parsed_s3_log_file_path, index_col=0) + ordered_parsed_s3_log = unordered_parsed_s3_log.sort_values(by="timestamp") + + # correct index of first column + ordered_parsed_s3_log.index = range(len(ordered_parsed_s3_log)) + + ordered_parsed_s3_log_file_path = ordered_parsed_s3_log_folder_path / unordered_parsed_s3_log_file_path.name + ordered_parsed_s3_log.to_csv(path_or_buf=ordered_parsed_s3_log_file_path, sep="\t") diff --git a/tests/examples/example_0/example_dandi_s3_log.log b/tests/examples/example_0/example_dandi_s3_log.log deleted file mode 100644 index 43bacc6..0000000 --- a/tests/examples/example_0/example_dandi_s3_log.log +++ /dev/null @@ -1,2 +0,0 @@ -8787a3c41bf7ce0d54359d9348ad5b08e16bd5bb8ae5aa4e1508b435773a066e dandiarchive [31/Dec/2021:23:06:42 +0000] 192.0.2.0 - NWC7V1KE70QZYJ5Q REST.GET.OBJECT blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629 "GET /blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629?versionId=yn5YAJiwT36Rv78jGYLM71GZumWL.QWn HTTP/1.1" 200 - 1443 1443 35 35 "-" "git-annex/8.20211028-g1c76278" yn5YAJiwT36Rv78jGYLM71GZumWL.QWn ojBg2QLVTSTWsCAe1HoC6IBNLUSPmWH276FdsedhZ/4CQ67DWuZQHcXXB9XUJxYKpnPHpJyBjMM= - ECDHE-RSA-AES128-GCM-SHA256 - dandiarchive.s3.amazonaws.com TLSv1.2 - -8787a3c41bf7ce0d54359d9348ad5b08e16bd5bb8ae5aa4e1508b435773a066e dandiarchive [04/May/2022:05:06:35 +0000] 192.0.2.0 - J42N2W7ET0EC03CV REST.GET.OBJECT blobs/11e/c89/11ec8933-1456-4942-922b-94e5878bb991 "GET /blobs/11e/c89/11ec8933-1456-4942-922b-94e5878bb991 HTTP/1.1" 206 - 512 171408 53 52 "-" "-" - DX8oFoKQx0o5V3lwEuWBxF5p2fSXrwINj0rnxmas0YgjWuPqYLK/vnW60Txh23K93aahe0IFw2c= - 
ECDHE-RSA-AES128-GCM-SHA256 - dandiarchive.s3.amazonaws.com TLSv1.2 - diff --git a/tests/examples/example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv b/tests/examples/ordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv similarity index 100% rename from tests/examples/example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv rename to tests/examples/ordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv diff --git a/tests/examples/example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/ordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv similarity index 100% rename from tests/examples/example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv rename to tests/examples/ordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv diff --git a/tests/examples/unordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv b/tests/examples/unordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv new file mode 100644 index 0000000..eed1bdf --- /dev/null +++ b/tests/examples/unordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv @@ -0,0 +1,5 @@ + timestamp bytes_sent region +0 2020-02-24 05:06:35 124 CA/Ontario +1 2021-05-21 05:06:35 1234 US/California +2 2022-07-01 05:06:35 512 unknown +3 2022-11-04 05:06:35 141424 US/Virginia diff --git a/tests/examples/unordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/unordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv new file mode 100644 index 0000000..f7d6a57 --- /dev/null +++ b/tests/examples/unordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv @@ -0,0 +1,5 @@ + timestamp bytes_sent region +0 2019-11-13 05:06:35 4332423 US/Colorado +1 2020-04-16 05:06:35 12313153 unknown +2 2022-03-21 05:06:35 24323 MX/Zacatecas +3 2022-08-05 05:06:35 2141 CH/BeiJing diff --git a/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv new file mode 100644 index 0000000..931a755 --- /dev/null +++ b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv @@ -0,0 +1,5 @@ + timestamp bytes_sent region +0 2022-07-01 05:06:35 512 unknown +1 2021-05-21 05:06:35 1234 US/California +2 2022-11-04 05:06:35 141424 US/Virginia +3 2020-02-24 05:06:35 124 CA/Ontario diff --git a/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv new file mode 100644 index 0000000..8e0ac58 --- /dev/null +++ b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv @@ -0,0 +1,5 @@ + timestamp bytes_sent region +0 2022-08-05 05:06:35 2141 CH/BeiJing +1 2022-03-21 05:06:35 24323 MX/Zacatecas +2 2019-11-13 05:06:35 4332423 US/Colorado +3 2020-04-16 05:06:35 12313153 unknown diff --git a/tests/test_dandi_s3_log_parser.py b/tests/test_dandi_s3_log_parser.py index 4ba1983..0d7e4d0 100644 --- a/tests/test_dandi_s3_log_parser.py +++ b/tests/test_dandi_s3_log_parser.py @@ -16,7 +16,7 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local): 
tmpdir = pathlib.Path(tmpdir) file_parent = pathlib.Path(__file__).parent - examples_folder_path = file_parent / "examples" / "example_0" + examples_folder_path = file_parent / "examples" / "ordered_example_0" example_raw_s3_log_file_path = examples_folder_path / "example_dandi_s3_log.log" expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" @@ -41,11 +41,11 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local): test_parsed_s3_log_file_path.stem in expected_asset_ids ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!" - test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path) + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0) expected_parsed_s3_log_file_path = ( expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv" ) - expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0) pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) diff --git a/tests/test_order_parsed_logs.py b/tests/test_order_parsed_logs.py new file mode 100644 index 0000000..b32ee88 --- /dev/null +++ b/tests/test_order_parsed_logs.py @@ -0,0 +1,33 @@ +import pathlib +import py +import pandas + +import dandi_s3_log_parser + + +def test_output_file_reordering(tmpdir: py.path.local) -> None: + """ + Performing parallelized parsing can result in both race conditions and a break to chronological ordering. + + This is a test for the utility function for reordering the output of a parsed log file according to time. + """ + tmpdir = pathlib.Path(tmpdir) + + unordered_example_base_folder_path = pathlib.Path(__file__).parent / "examples" / "unordered_example_0" + unordered_parsed_s3_log_folder_path = unordered_example_base_folder_path / "unordered_parsed_logs" + ordered_parsed_s3_log_folder_path = tmpdir + + dandi_s3_log_parser.order_parsed_logs( + unordered_parsed_s3_log_folder_path=unordered_parsed_s3_log_folder_path, + ordered_parsed_s3_log_folder_path=ordered_parsed_s3_log_folder_path, + ) + + parsed_log_file_stems = [path.name for path in unordered_parsed_s3_log_folder_path.iterdir()] + expected_output_folder_path = unordered_example_base_folder_path / "expected_output" + for parsed_log_file_name in parsed_log_file_stems: + test_parsed_s3_log_file_path = ordered_parsed_s3_log_folder_path / parsed_log_file_name + expected_parsed_s3_log_file_path = expected_output_folder_path / parsed_log_file_name + + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0) + pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) diff --git a/tests/test_output_sorting.py b/tests/test_output_sorting.py deleted file mode 100644 index 925713c..0000000 --- a/tests/test_output_sorting.py +++ /dev/null @@ -1,62 +0,0 @@ -import pytest -import pathlib -import random -import calendar -import datetime - -SEED = 0 - - -def _generate_random_datetime() -> datetime.datetime: - """Generate a random datetime for testing.""" - year = random.randint(2000, 2020) - month = random.randint(1, 12) - - max_days = calendar.monthrange(year=year, month=month)[1] - day = random.randint(1, max_days) - hour = random.randint(0, 23) - 
minute = random.randint(0, 59) - second = random.randint(0, 59) - - result = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=minute, second=second) - return result - - -def _generate_random_datetimes(number_of_elements: int) -> list[datetime.datetime]: - """Generate random datetimes for testing.""" - random.seed(SEED) - - random_datetimes = [_generate_random_datetime() for _ in range(number_of_elements)] - return random_datetimes - - -@pytest.fixture(scope="session") -def unordered_output_file_content(tmp_path_factory: pytest.TempPathFactory) -> list[list[str]]: - """Generate file content equivalent to an unordered output file.""" - example_output_file_path = ( - pathlib.Path(__file__).parent - / "examples" - / "example_0" - / "expected_output" - / "blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv" - ) - - with open(file=example_output_file_path, mode="r") as io: - example_output_lines = io.readlines() - base_line = example_output_lines[1].split("\t") - - random_datetimes = _generate_random_datetimes(number_of_elements=100) - - scrambled_example_output_lines = [ - [base_line[0], random_datetime, base_line[2], base_line[3]] for random_datetime in random_datetimes - ] - return scrambled_example_output_lines - - -def test_output_file_reordering() -> None: - """ - Performing parallelized parsing can result in both race conditions and a break to chronological ordering. - - This is a test for the utility function for reordering the output of a parsed log file according to time. - """ - pass From ff38e85c8feeeca8ce6d2dcc9ca8254326664130 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 5 Aug 2024 14:23:46 -0400 Subject: [PATCH 05/12] add basic parallel structure --- pyproject.toml | 1 + .../_buffered_text_reader.py | 10 +- .../_s3_log_file_parser.py | 465 ++++++++++-------- tests/test_buffered_text_reader.py | 8 +- 4 files changed, 281 insertions(+), 203 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3de495d..439312a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "PyYAML", "click", "natsort", + "dandi", ] classifiers = [ "Programming Language :: Python", diff --git a/src/dandi_s3_log_parser/_buffered_text_reader.py b/src/dandi_s3_log_parser/_buffered_text_reader.py index 6ebe0bf..df175cc 100644 --- a/src/dandi_s3_log_parser/_buffered_text_reader.py +++ b/src/dandi_s3_log_parser/_buffered_text_reader.py @@ -2,7 +2,7 @@ class BufferedTextReader: - def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage: int = 10**9): + def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage_in_bytes: int = 10**9): """ Lazily read a text file into RAM using buffers of a specified size. @@ -10,15 +10,15 @@ def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage: int = 10 ---------- file_path : string or pathlib.Path The path to the text file to be read. - maximum_ram_usage : int, default: 1 GB - The theoretical maximum amount of RAM to be used by the BufferedTextReader object. + maximum_ram_usage_in_bytes : int, default: 1 GB + The theoretical maximum amount of RAM (in bytes) to be used by the BufferedTextReader object. 
""" self.file_path = file_path - self.maximum_ram_usage = maximum_ram_usage + self.maximum_ram_usage_in_bytes = maximum_ram_usage_in_bytes # The actual amount of bytes to read per iteration is 3x less than theoretical maximum usage # due to decoding and handling - self.buffer_size_in_bytes = int(maximum_ram_usage / 3) + self.buffer_size_in_bytes = int(maximum_ram_usage_in_bytes / 3) self.total_file_size = pathlib.Path(file_path).stat().st_size self.offset = 0 diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index 9f75438..8904c04 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -3,6 +3,9 @@ import collections import datetime import pathlib +import os +import uuid +from concurrent.futures import ProcessPoolExecutor, as_completed from typing import Callable, Literal import pandas @@ -19,187 +22,160 @@ from ._buffered_text_reader import BufferedTextReader -def _get_reduced_log_lines( - *, - raw_s3_log_file_path: pathlib.Path, - bucket: str | None, - request_type: Literal["GET", "PUT"], - excluded_ips: collections.defaultdict[str, bool], - tqdm_kwargs: dict | None = None, -) -> list[ReducedLogLine]: - """ - Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects. - - Parameters - ---------- - raw_s3_log_file_path : str or pathlib.Path - Path to the raw S3 log file. - bucket : str - Only parse and return lines that match this bucket. - request_type : str - The type of request to filter for. - excluded_ips : collections.defaultdict of strings to booleans - A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - """ - assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!" 
- - # Collapse bucket to empty string instead of asking if it is None on each iteration - bucket = "" if bucket is None else bucket - tqdm_kwargs = tqdm_kwargs or dict() - - # One-time initialization/read of IP address to region cache for performance - # This dictionary is intended to be mutated throughout the process - ip_address_to_region = _load_ip_address_to_region_cache() - - # Perform I/O read in batches to improve performance - resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0) - resolved_tqdm_kwargs.update(tqdm_kwargs) - - reduced_log_lines = list() - per_buffer_index = 0 - buffered_text_reader = BufferedTextReader(file_path=raw_s3_log_file_path) - for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs): - for index, raw_line in enumerate(iterable=buffered_raw_lines, start=per_buffer_index): - _append_reduced_log_line( - raw_line=raw_line, - reduced_log_lines=reduced_log_lines, - bucket=bucket, - request_type=request_type, - excluded_ips=excluded_ips, - log_file_path=raw_s3_log_file_path, - index=index, - ip_hash_to_region=ip_address_to_region, - ) - per_buffer_index += index - - _save_ip_address_to_region_cache(ip_hash_to_region=ip_address_to_region) - - return reduced_log_lines - - -def parse_raw_s3_log( +def parse_all_dandi_raw_s3_logs( *, - raw_s3_log_file_path: str | pathlib.Path, + base_raw_s3_log_folder_path: str | pathlib.Path, parsed_s3_log_folder_path: str | pathlib.Path, mode: Literal["w", "a"] = "a", - bucket: str | None = None, - request_type: Literal["GET", "PUT"] = "GET", excluded_ips: collections.defaultdict[str, bool] | None = None, + exclude_github_ips: bool = True, number_of_jobs: int = 1, - total_memory_in_bytes: int = 1e9, - asset_id_handler: Callable | None = None, - tqdm_kwargs: dict | None = None, + maximum_ram_usage_in_bytes: int = 40**9, ) -> None: """ - Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. + Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files. - 'Parsing' here means: - - limiting only to requests of the specified type (i.e., GET, PUT, etc.) - - reducing the information to the asset ID, request time, request size, and geographic IP of the requester + Assumes the following folder structure... + + |- + |-- 2019 (year) + |--- 01 (month) + |---- 01.log (day) + | ... Parameters ---------- - raw_s3_log_file_path : str or pathlib.Path - Path to the raw S3 log file. - parsed_s3_log_folder_path : str or pathlib.Path + base_raw_s3_log_folder_path : string or pathlib.Path + Path to the folder containing the raw S3 log files. + parsed_s3_log_folder_path : string or pathlib.Path Path to write each parsed S3 log file to. There will be one file per handled asset ID. mode : "w" or "a", default: "a" How to resolve the case when files already exist in the folder containing parsed logs. "w" will overwrite existing content, "a" will append or create if the file does not yet exist. - - The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate - over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. - HINT: If this iteration is done in chronological order, the resulting parsed logs will also maintain that order. - bucket : str - Only parse and return lines that match this bucket. - request_type : str, default: "GET" - The type of request to filter for. 
excluded_ips : collections.defaultdict of strings to booleans, optional A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + exclude_github_ips : bool, default: True + Include all GitHub action IP addresses in the `excluded_ips`. number_of_jobs : int, default: 1 The number of jobs to use for parallel processing. Allows negative range to mean 'all but this many (minus one) jobs'. E.g., -1 means use all workers, -2 means all but one worker. WARNING: planned but not yet supported. - total_memory_in_bytes : int, default: 2e9 - The number of bytes to load as a buffer into RAM per job. - Will automatically distribute this amount over the number of jobs. - WARNING: planned but not yet supported. - asset_id_handler : callable, optional - If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to - translate into nested directory paths) then define a function of the following form: - - # For example - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] + maximum_ram_usage_in_bytes : int, default: 1 GB + The theoretical maximum amount of RAM (in bytes) to be used throughout the process. """ - raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) + base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path) parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) parsed_s3_log_folder_path.mkdir(exist_ok=True) - excluded_ips = excluded_ips or collections.defaultdict(bool) - tqdm_kwargs = tqdm_kwargs or dict() - # TODO: buffering control - # total_file_size_in_bytes = raw_s3_log_file_path.lstat().st_size - # buffer_per_job_in_bytes = int(total_memory_in_bytes / number_of_jobs) - # Approximate using ~600 bytes per line - # number_of_lines_to_read_per_job = int(buffer_per_job_in_bytes / 600) - # number_of_iterations_per_job = int(total_file_size_in_bytes / number_of_lines_to_read_per_job) - - # TODO: finish polishing parallelization - just a draft for now - if number_of_jobs > 1: - raise NotImplementedError("Parallelization has not yet been implemented!") - # for _ in range(5) - # reduced_logs = _get_reduced_logs( - # raw_s3_log_file_path=raw_s3_log_file_path, - # lines_errors_file_path=lines_errors_file_path, - # bucket=bucket, - # request_type=request_type - # ) - else: - reduced_logs = _get_reduced_log_lines( - raw_s3_log_file_path=raw_s3_log_file_path, - bucket=bucket, - request_type=request_type, - excluded_ips=excluded_ips, - tqdm_kwargs=tqdm_kwargs, - ) + # Re-define some top-level pass-through items here to avoid repeated constructions + excluded_ips = excluded_ips or collections.defaultdict(bool) + if exclude_github_ips: + for github_ip in _get_latest_github_ip_ranges(): + excluded_ips[github_ip] = True - reduced_logs_binned_by_unparsed_asset = dict() - for reduced_log in reduced_logs: - raw_asset_id = reduced_log.asset_id - reduced_logs_binned_by_unparsed_asset[raw_asset_id] = reduced_logs_binned_by_unparsed_asset.get( - raw_asset_id, collections.defaultdict(list) - ) + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] - reduced_logs_binned_by_unparsed_asset[raw_asset_id]["timestamp"].append(reduced_log.timestamp) - reduced_logs_binned_by_unparsed_asset[raw_asset_id]["bytes_sent"].append(reduced_log.bytes_sent) - 
reduced_logs_binned_by_unparsed_asset[raw_asset_id]["region"].append(reduced_log.region) + daily_raw_s3_log_file_paths = list() + base_folder_paths = [path for path in base_raw_s3_log_folder_path.iterdir() if path.stem.startswith("20")] + yearly_folder_paths = natsort.natsorted(seq=list(base_folder_paths)) + for yearly_folder_path in yearly_folder_paths: + monthly_folder_paths = natsort.natsorted(seq=list(yearly_folder_path.iterdir())) - if asset_id_handler is not None: - reduced_logs_binned_by_asset = dict() - for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_unparsed_asset.items(): - parsed_asset_id = asset_id_handler(raw_asset_id=raw_asset_id) + for monthly_folder_path in monthly_folder_paths: + daily_raw_s3_log_file_paths.extend(natsort.natsorted(seq=list(monthly_folder_path.glob("*.log")))) - reduced_logs_binned_by_asset[parsed_asset_id] = reduced_logs_per_asset + if number_of_jobs == 1: + for raw_s3_log_file_path in tqdm.tqdm( + iterable=daily_raw_s3_log_file_paths, + desc="Parsing log files...", + position=0, + leave=True, + ): + parse_dandi_raw_s3_log( + raw_s3_log_file_path=raw_s3_log_file_path, + parsed_s3_log_folder_path=parsed_s3_log_folder_path, + mode=mode, + excluded_ips=excluded_ips, + exclude_github_ips=False, # Already included in list so avoid repeated construction + asset_id_handler=asset_id_handler, + tqdm_kwargs=dict(position=1, leave=False), + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + ) else: - reduced_logs_binned_by_asset = reduced_logs_binned_by_unparsed_asset - - for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_asset.items(): - parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{raw_asset_id}.tsv" - - data_frame = pandas.DataFrame(data=reduced_logs_per_asset) - data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t") + # Create a fresh temporary directory in the home folder and then fresh subfolders for each job + temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp" + temporary_base_folder_path.mkdir(exist_ok=True) + task_id = uuid.uuid4()[:5] + temporary_folder_path = temporary_base_folder_path / task_id + temporary_folder_path.mkdir(exist_ok=True) + for job_index in range(number_of_jobs): + per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" + per_job_temporary_folder_path.mkdir(exist_ok=True) + + futures = [] + with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: + for raw_s3_log_file_path in daily_raw_s3_log_file_paths: + futures.append( + executor.submit( + _multi_job_parse_dandi_raw_s3_log, + number_of_jobs=number_of_jobs, + raw_s3_log_file_path=raw_s3_log_file_path, + temporary_folder_path=temporary_folder_path, + mode=mode, + excluded_ips=excluded_ips, + exclude_github_ips=False, # Already included in list so avoid repeated construction + asset_id_handler=asset_id_handler, + maximum_ram_usage_in_bytes=int(maximum_ram_usage_in_bytes / number_of_jobs), + ) + ) + + # Perform the iteration to trigger processing + for _ in tqdm.tqdm( + iterable=as_completed(daily_raw_s3_log_file_paths), + desc=f"Parsing log files using {number_of_jobs} jobs...", + total=len(daily_raw_s3_log_file_paths), + position=0, + leave=True, + ): + pass + + # TODO: merge results back to central (also temporary) folder + # TODO: order results chronologically + + return None + + +def _multi_job_parse_dandi_raw_s3_log( + *, + number_of_jobs: int, + raw_s3_log_file_path: pathlib.Path, + temporary_folder_path: pathlib.Path, + mode: Literal["w", "a"], + 
excluded_ips: collections.defaultdict[str, bool] | None, + exclude_github_ips: bool, + asset_id_handler: Callable | None, + maximum_ram_usage_in_bytes: int, +) -> None: + """A mostly pass-through function to calculate the job index on the worker and target the correct subfolder.""" + job_index = os.getpid() % number_of_jobs + per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" - progress_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "progress" - progress_folder_path.mkdir(exist_ok=True) + parse_dandi_raw_s3_log( + raw_s3_log_file_path=raw_s3_log_file_path, + parsed_s3_log_folder_path=per_job_temporary_folder_path, + mode=mode, + excluded_ips=excluded_ips, + exclude_github_ips=exclude_github_ips, + asset_id_handler=asset_id_handler, + tqdm_kwargs=dict(position=job_index + 1, leave=False), + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + ) - date = datetime.datetime.now().strftime("%y%m%d") - progress_file_path = progress_folder_path / f"{date}.txt" - with open(file=progress_file_path, mode="a") as io: - io.write(f"Parsed {raw_s3_log_file_path} successfully!\n") + return None def parse_dandi_raw_s3_log( @@ -211,6 +187,7 @@ def parse_dandi_raw_s3_log( exclude_github_ips: bool = True, asset_id_handler: Callable | None = None, tqdm_kwargs: dict | None = None, + maximum_ram_usage_in_bytes: int = 40**9, ) -> None: """ Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. @@ -245,6 +222,10 @@ def parse_dandi_raw_s3_log( def asset_id_handler(*, raw_asset_id: str) -> str: split_by_slash = raw_asset_id.split("/") return split_by_slash[0] + "_" + split_by_slash[-1] + tqdm_kwargs : dict, optional + Keyword arguments to pass to the tqdm progress bar. + maximum_ram_usage_in_bytes : int, default: 1 GB + The theoretical maximum amount of RAM (in bytes) to be used throughout the process. """ tqdm_kwargs = tqdm_kwargs or dict() @@ -273,78 +254,174 @@ def asset_id_handler(*, raw_asset_id: str) -> str: excluded_ips=excluded_ips, asset_id_handler=asset_id_handler, tqdm_kwargs=tqdm_kwargs, + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, ) -def parse_all_dandi_raw_s3_logs( +def parse_raw_s3_log( *, - base_raw_s3_log_folder_path: str | pathlib.Path, + raw_s3_log_file_path: str | pathlib.Path, parsed_s3_log_folder_path: str | pathlib.Path, mode: Literal["w", "a"] = "a", + bucket: str | None = None, + request_type: Literal["GET", "PUT"] = "GET", excluded_ips: collections.defaultdict[str, bool] | None = None, - exclude_github_ips: bool = True, + asset_id_handler: Callable | None = None, + tqdm_kwargs: dict | None = None, + maximum_ram_usage_in_bytes: int = 40**9, ) -> None: """ - Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files. - - Assumes the following folder structure... + Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. - |- - |-- 2019 (year) - |--- 01 (month) - |---- 01.log (day) - | ... + 'Parsing' here means: + - limiting only to requests of the specified type (i.e., GET, PUT, etc.) + - reducing the information to the asset ID, request time, request size, and geographic IP of the requester Parameters ---------- - base_raw_s3_log_folder_path : string or pathlib.Path - Path to the folder containing the raw S3 log files. - parsed_s3_log_folder_path : string or pathlib.Path + raw_s3_log_file_path : str or pathlib.Path + Path to the raw S3 log file. 
+ parsed_s3_log_folder_path : str or pathlib.Path Path to write each parsed S3 log file to. There will be one file per handled asset ID. mode : "w" or "a", default: "a" How to resolve the case when files already exist in the folder containing parsed logs. "w" will overwrite existing content, "a" will append or create if the file does not yet exist. + + The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate + over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. + HINT: If this iteration is done in chronological order, the resulting parsed logs will also maintain that order. + bucket : str + Only parse and return lines that match this bucket. + request_type : str, default: "GET" + The type of request to filter for. excluded_ips : collections.defaultdict of strings to booleans, optional A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - exclude_github_ips : bool, default: True - Include all GitHub action IP addresses in the `excluded_ips`. + asset_id_handler : callable, optional + If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to + translate into nested directory paths) then define a function of the following form: + + # For example + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + tqdm_kwargs : dict, optional + Keyword arguments to pass to the tqdm progress bar. + maximum_ram_usage_in_bytes : int, default: 1 GB + The theoretical maximum amount of RAM (in bytes) to be used throughout the process. """ - base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path) + raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) parsed_s3_log_folder_path.mkdir(exist_ok=True) - - # Re-define some top-level pass-through items here to avoid repeated constructions excluded_ips = excluded_ips or collections.defaultdict(bool) - if exclude_github_ips: - for github_ip in _get_latest_github_ip_ranges(): - excluded_ips[github_ip] = True + tqdm_kwargs = tqdm_kwargs or dict() - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] + reduced_logs = _get_reduced_log_lines( + raw_s3_log_file_path=raw_s3_log_file_path, + bucket=bucket, + request_type=request_type, + excluded_ips=excluded_ips, + tqdm_kwargs=tqdm_kwargs, + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + ) - daily_raw_s3_log_file_paths = list() - base_folder_paths = [path for path in base_raw_s3_log_folder_path.iterdir() if path.stem.startswith("20")] - yearly_folder_paths = natsort.natsorted(seq=list(base_folder_paths)) - for yearly_folder_path in yearly_folder_paths: - monthly_folder_paths = natsort.natsorted(seq=list(yearly_folder_path.iterdir())) + reduced_logs_binned_by_unparsed_asset = dict() + for reduced_log in reduced_logs: + raw_asset_id = reduced_log.asset_id + reduced_logs_binned_by_unparsed_asset[raw_asset_id] = reduced_logs_binned_by_unparsed_asset.get( + raw_asset_id, collections.defaultdict(list) + ) - for monthly_folder_path in monthly_folder_paths: - daily_raw_s3_log_file_paths.extend(natsort.natsorted(seq=list(monthly_folder_path.glob("*.log")))) + reduced_logs_binned_by_unparsed_asset[raw_asset_id]["timestamp"].append(reduced_log.timestamp) 
+ reduced_logs_binned_by_unparsed_asset[raw_asset_id]["bytes_sent"].append(reduced_log.bytes_sent) + reduced_logs_binned_by_unparsed_asset[raw_asset_id]["region"].append(reduced_log.region) - for raw_s3_log_file_path in tqdm.tqdm( - iterable=daily_raw_s3_log_file_paths, - desc="Parsing log files...", - position=0, - leave=True, - ): - parse_dandi_raw_s3_log( - raw_s3_log_file_path=raw_s3_log_file_path, - parsed_s3_log_folder_path=parsed_s3_log_folder_path, - mode=mode, - excluded_ips=excluded_ips, - exclude_github_ips=False, # Already included in list so avoid repeated construction - asset_id_handler=asset_id_handler, - tqdm_kwargs=dict(position=1, leave=False), - ) + if asset_id_handler is not None: + reduced_logs_binned_by_asset = dict() + for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_unparsed_asset.items(): + parsed_asset_id = asset_id_handler(raw_asset_id=raw_asset_id) + + reduced_logs_binned_by_asset[parsed_asset_id] = reduced_logs_per_asset + else: + reduced_logs_binned_by_asset = reduced_logs_binned_by_unparsed_asset + + for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_asset.items(): + parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{raw_asset_id}.tsv" + + data_frame = pandas.DataFrame(data=reduced_logs_per_asset) + data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t") + + progress_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "progress" + progress_folder_path.mkdir(exist_ok=True) + + date = datetime.datetime.now().strftime("%y%m%d") + progress_file_path = progress_folder_path / f"{date}.txt" + with open(file=progress_file_path, mode="a") as io: + io.write(f"Parsed {raw_s3_log_file_path} successfully!\n") + + return None + + +def _get_reduced_log_lines( + *, + raw_s3_log_file_path: pathlib.Path, + bucket: str | None, + request_type: Literal["GET", "PUT"], + excluded_ips: collections.defaultdict[str, bool], + tqdm_kwargs: dict | None = None, + maximum_ram_usage_in_bytes: int = 10**9, +) -> list[ReducedLogLine]: + """ + Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects. + + Parameters + ---------- + raw_s3_log_file_path : str or pathlib.Path + Path to the raw S3 log file. + bucket : str + Only parse and return lines that match this bucket. + request_type : str + The type of request to filter for. + excluded_ips : collections.defaultdict of strings to booleans + A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + tqdm_kwargs : dict, optional + Keyword arguments to pass to the tqdm progress bar. + maximum_ram_usage_in_bytes : int, default: 1 GB + The theoretical maximum amount of RAM (in bytes) to be used throughout the process. + """ + assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!" 
+ + # Collapse bucket to empty string instead of asking if it is None on each iteration + bucket = "" if bucket is None else bucket + tqdm_kwargs = tqdm_kwargs or dict() + + # One-time initialization/read of IP address to region cache for performance + # This dictionary is intended to be mutated throughout the process + ip_address_to_region = _load_ip_address_to_region_cache() + + # Perform I/O read in batches to improve performance + resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0) + resolved_tqdm_kwargs.update(tqdm_kwargs) + + reduced_log_lines = list() + per_buffer_index = 0 + buffered_text_reader = BufferedTextReader( + file_path=raw_s3_log_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes + ) + for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs): + for index, raw_line in enumerate(iterable=buffered_raw_lines, start=per_buffer_index): + _append_reduced_log_line( + raw_line=raw_line, + reduced_log_lines=reduced_log_lines, + bucket=bucket, + request_type=request_type, + excluded_ips=excluded_ips, + log_file_path=raw_s3_log_file_path, + index=index, + ip_hash_to_region=ip_address_to_region, + ) + per_buffer_index += index + + _save_ip_address_to_region_cache(ip_hash_to_region=ip_address_to_region) + + return reduced_log_lines diff --git a/tests/test_buffered_text_reader.py b/tests/test_buffered_text_reader.py index 6ffaa95..6fae0f8 100644 --- a/tests/test_buffered_text_reader.py +++ b/tests/test_buffered_text_reader.py @@ -35,9 +35,9 @@ def single_line_text_file_path(tmp_path_factory: pytest.TempPathFactory): def test_buffered_text_reader(large_text_file_path: pathlib.Path): - maximum_ram_usage = 10**6 # 1 MB + maximum_ram_usage_in_bytes = 10**6 # 1 MB buffered_text_reader = dandi_s3_log_parser.BufferedTextReader( - file_path=large_text_file_path, maximum_ram_usage=maximum_ram_usage + file_path=large_text_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes ) assert iter(buffered_text_reader) is buffered_text_reader, "BufferedTextReader object is not iterable!" 
@@ -55,10 +55,10 @@ def test_buffered_text_reader(large_text_file_path: pathlib.Path): def test_value_error(single_line_text_file_path: pathlib.Path): - maximum_ram_usage = 10**6 # 1 MB + maximum_ram_usage_in_bytes = 10**6 # 1 MB with pytest.raises(ValueError) as error_info: buffered_text_reader = dandi_s3_log_parser.BufferedTextReader( - file_path=single_line_text_file_path, maximum_ram_usage=maximum_ram_usage + file_path=single_line_text_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes ) next(buffered_text_reader) From 5ad025ed5e91a988e9130636476bee0be3681e92 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Mon, 5 Aug 2024 14:25:19 -0400 Subject: [PATCH 06/12] force include new testing log location --- tests/examples/ordered_example_0/example_dandi_s3_log.log | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 tests/examples/ordered_example_0/example_dandi_s3_log.log diff --git a/tests/examples/ordered_example_0/example_dandi_s3_log.log b/tests/examples/ordered_example_0/example_dandi_s3_log.log new file mode 100644 index 0000000..43bacc6 --- /dev/null +++ b/tests/examples/ordered_example_0/example_dandi_s3_log.log @@ -0,0 +1,2 @@ +8787a3c41bf7ce0d54359d9348ad5b08e16bd5bb8ae5aa4e1508b435773a066e dandiarchive [31/Dec/2021:23:06:42 +0000] 192.0.2.0 - NWC7V1KE70QZYJ5Q REST.GET.OBJECT blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629 "GET /blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629?versionId=yn5YAJiwT36Rv78jGYLM71GZumWL.QWn HTTP/1.1" 200 - 1443 1443 35 35 "-" "git-annex/8.20211028-g1c76278" yn5YAJiwT36Rv78jGYLM71GZumWL.QWn ojBg2QLVTSTWsCAe1HoC6IBNLUSPmWH276FdsedhZ/4CQ67DWuZQHcXXB9XUJxYKpnPHpJyBjMM= - ECDHE-RSA-AES128-GCM-SHA256 - dandiarchive.s3.amazonaws.com TLSv1.2 - +8787a3c41bf7ce0d54359d9348ad5b08e16bd5bb8ae5aa4e1508b435773a066e dandiarchive [04/May/2022:05:06:35 +0000] 192.0.2.0 - J42N2W7ET0EC03CV REST.GET.OBJECT blobs/11e/c89/11ec8933-1456-4942-922b-94e5878bb991 "GET /blobs/11e/c89/11ec8933-1456-4942-922b-94e5878bb991 HTTP/1.1" 206 - 512 171408 53 52 "-" "-" - DX8oFoKQx0o5V3lwEuWBxF5p2fSXrwINj0rnxmas0YgjWuPqYLK/vnW60Txh23K93aahe0IFw2c= - ECDHE-RSA-AES128-GCM-SHA256 - dandiarchive.s3.amazonaws.com TLSv1.2 - From 35a12b35f777ac23df7f80fe61881fca17c06107 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Tue, 6 Aug 2024 12:32:33 -0400 Subject: [PATCH 07/12] try restoring previous default for CI? --- src/dandi_s3_log_parser/_s3_log_file_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index 8904c04..9ee25dd 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -30,7 +30,7 @@ def parse_all_dandi_raw_s3_logs( excluded_ips: collections.defaultdict[str, bool] | None = None, exclude_github_ips: bool = True, number_of_jobs: int = 1, - maximum_ram_usage_in_bytes: int = 40**9, + maximum_ram_usage_in_bytes: int = 10**9, ) -> None: """ Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files. 
From 05abb2ced315ced525227b0effaad0ff8414d0da Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Tue, 6 Aug 2024 12:47:12 -0400 Subject: [PATCH 08/12] fix size issue --- .../_s3_log_file_parser.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index 9ee25dd..3ffb0e2 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -30,7 +30,7 @@ def parse_all_dandi_raw_s3_logs( excluded_ips: collections.defaultdict[str, bool] | None = None, exclude_github_ips: bool = True, number_of_jobs: int = 1, - maximum_ram_usage_in_bytes: int = 10**9, + maximum_ram_usage_in_bytes: int = 4 * 10**9, ) -> None: """ Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files. @@ -62,8 +62,8 @@ def parse_all_dandi_raw_s3_logs( Allows negative range to mean 'all but this many (minus one) jobs'. E.g., -1 means use all workers, -2 means all but one worker. WARNING: planned but not yet supported. - maximum_ram_usage_in_bytes : int, default: 1 GB - The theoretical maximum amount of RAM (in bytes) to be used throughout the process. + maximum_ram_usage_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to be used across all the processes. """ base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path) parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) @@ -116,6 +116,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str: per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" per_job_temporary_folder_path.mkdir(exist_ok=True) + maximum_ram_usage_in_bytes_per_job = maximum_ram_usage_in_bytes // number_of_jobs + futures = [] with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: for raw_s3_log_file_path in daily_raw_s3_log_file_paths: @@ -129,7 +131,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: excluded_ips=excluded_ips, exclude_github_ips=False, # Already included in list so avoid repeated construction asset_id_handler=asset_id_handler, - maximum_ram_usage_in_bytes=int(maximum_ram_usage_in_bytes / number_of_jobs), + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes_per_job, ) ) @@ -187,7 +189,7 @@ def parse_dandi_raw_s3_log( exclude_github_ips: bool = True, asset_id_handler: Callable | None = None, tqdm_kwargs: dict | None = None, - maximum_ram_usage_in_bytes: int = 40**9, + maximum_ram_usage_in_bytes: int = 4 * 10**9, ) -> None: """ Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. @@ -224,7 +226,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: return split_by_slash[0] + "_" + split_by_slash[-1] tqdm_kwargs : dict, optional Keyword arguments to pass to the tqdm progress bar. - maximum_ram_usage_in_bytes : int, default: 1 GB + maximum_ram_usage_in_bytes : int, default: 4 GB The theoretical maximum amount of RAM (in bytes) to be used throughout the process. """ tqdm_kwargs = tqdm_kwargs or dict() @@ -268,7 +270,7 @@ def parse_raw_s3_log( excluded_ips: collections.defaultdict[str, bool] | None = None, asset_id_handler: Callable | None = None, tqdm_kwargs: dict | None = None, - maximum_ram_usage_in_bytes: int = 40**9, + maximum_ram_usage_in_bytes: int = 4 * 10**9, ) -> None: """ Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. 
@@ -307,7 +309,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: return split_by_slash[0] + "_" + split_by_slash[-1] tqdm_kwargs : dict, optional Keyword arguments to pass to the tqdm progress bar. - maximum_ram_usage_in_bytes : int, default: 1 GB + maximum_ram_usage_in_bytes : int, default: 4 GB The theoretical maximum amount of RAM (in bytes) to be used throughout the process. """ raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) @@ -369,7 +371,7 @@ def _get_reduced_log_lines( request_type: Literal["GET", "PUT"], excluded_ips: collections.defaultdict[str, bool], tqdm_kwargs: dict | None = None, - maximum_ram_usage_in_bytes: int = 10**9, + maximum_ram_usage_in_bytes: int = 4 * 10**9, ) -> list[ReducedLogLine]: """ Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects. @@ -386,7 +388,7 @@ def _get_reduced_log_lines( A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. tqdm_kwargs : dict, optional Keyword arguments to pass to the tqdm progress bar. - maximum_ram_usage_in_bytes : int, default: 1 GB + maximum_ram_usage_in_bytes : int, default: 4 GB The theoretical maximum amount of RAM (in bytes) to be used throughout the process. """ assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!" From 977bd7ddc79b12fc62867ba3d7962f8fdcd7e902 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Tue, 6 Aug 2024 13:27:03 -0400 Subject: [PATCH 09/12] finalize logical structure --- .../_s3_log_file_parser.py | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index 3ffb0e2..543ec2e 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -20,6 +20,7 @@ from ._s3_log_line_parser import ReducedLogLine, _append_reduced_log_line from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH from ._buffered_text_reader import BufferedTextReader +from ._order_parsed_logs import order_parsed_logs def parse_all_dandi_raw_s3_logs( @@ -112,9 +113,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str: task_id = uuid.uuid4()[:5] temporary_folder_path = temporary_base_folder_path / task_id temporary_folder_path.mkdir(exist_ok=True) + per_job_temporary_folder_paths = list() for job_index in range(number_of_jobs): per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" per_job_temporary_folder_path.mkdir(exist_ok=True) + per_job_temporary_folder_paths.append(per_job_temporary_folder_path) maximum_ram_usage_in_bytes_per_job = maximum_ram_usage_in_bytes // number_of_jobs @@ -145,8 +148,36 @@ def asset_id_handler(*, raw_asset_id: str) -> str: ): pass - # TODO: merge results back to central (also temporary) folder - # TODO: order results chronologically + merged_temporary_folder_path = temporary_folder_path / "merged" + merged_temporary_folder_path.mkdir(exist_ok=True) + + print("\n\nParallel parsing complete!\n\n") + + for per_job_temporary_folder_path in tqdm.tqdm( + iterable=per_job_temporary_folder_paths, + desc="Merging results across jobs...", + total=len(per_job_temporary_folder_paths), + position=0, + leave=True, + ): + parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir()) + for parsed_s3_log_file_path in tqdm.tqdm( + iterable=parsed_s3_log_file_paths, + desc="Merging results per job...", + total=len(parsed_s3_log_file_paths), + position=1, + 
leave=False, + mininterval=1.0, + ): + merged_temporary_file_path = merged_temporary_folder_path / parsed_s3_log_file_path.name + + parsed_s3_log = pandas.read_table(filepath_or_buffer=parsed_s3_log_file_path, index_col=0) + parsed_s3_log.to_csv(path_or_buf=merged_temporary_file_path, mode="a", sep="\t") + + order_parsed_logs( + unordered_parsed_s3_log_folder_path=merged_temporary_folder_path, + ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path, + ) return None From 039c6502d91aee1b177f9b5711d2043cafca080f Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 9 Aug 2024 12:23:47 -0400 Subject: [PATCH 10/12] finalize logical structure; add cleanup; add test --- .../_s3_log_file_parser.py | 12 ++++- tests/test_dandi_s3_log_parser.py | 45 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index 543ec2e..80423a5 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -4,6 +4,7 @@ import datetime import pathlib import os +import shutil import uuid from concurrent.futures import ProcessPoolExecutor, as_completed from typing import Callable, Literal @@ -110,6 +111,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str: # Create a fresh temporary directory in the home folder and then fresh subfolders for each job temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp" temporary_base_folder_path.mkdir(exist_ok=True) + + # Clean up any previous tasks that failed to clean themselves up + for previous_task_folder_path in temporary_base_folder_path.iterdir(): + shutil.rmtree(path=previous_task_folder_path, ignore_errors=True) + task_id = uuid.uuid4()[:5] temporary_folder_path = temporary_base_folder_path / task_id temporary_folder_path.mkdir(exist_ok=True) @@ -179,6 +185,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str: ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path, ) + shutil.rmtree(path=temporary_folder_path, ignore_errors=True) + return None @@ -442,7 +450,8 @@ def _get_reduced_log_lines( file_path=raw_s3_log_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes ) for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs): - for index, raw_line in enumerate(iterable=buffered_raw_lines, start=per_buffer_index): + index = 0 + for raw_line in buffered_raw_lines: _append_reduced_log_line( raw_line=raw_line, reduced_log_lines=reduced_log_lines, @@ -453,6 +462,7 @@ def _get_reduced_log_lines( index=index, ip_hash_to_region=ip_address_to_region, ) + index += 1 per_buffer_index += index _save_ip_address_to_region_cache(ip_hash_to_region=ip_address_to_region) diff --git a/tests/test_dandi_s3_log_parser.py b/tests/test_dandi_s3_log_parser.py index 0d7e4d0..6baabdf 100644 --- a/tests/test_dandi_s3_log_parser.py +++ b/tests/test_dandi_s3_log_parser.py @@ -49,4 +49,49 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local): pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) +def test_parse_dandi_raw_s3_log_example_0_parallel(tmpdir: py.path.local): + """ + Most basic test of functionality. + + If there are failures in the parsing of any lines found in application, + please raise an issue and contribute them to the example log collection. 
+ """ + tmpdir = pathlib.Path(tmpdir) + + file_parent = pathlib.Path(__file__).parent + examples_folder_path = file_parent / "examples" / "ordered_example_0" + example_raw_s3_log_file_path = examples_folder_path / "example_dandi_s3_log.log" + expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" + + test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0" + dandi_s3_log_parser.parse_dandi_raw_s3_log( + raw_s3_log_file_path=example_raw_s3_log_file_path, + parsed_s3_log_folder_path=test_parsed_s3_log_folder_path, + number_of_jobs=2, + ) + test_output_file_paths = list(test_parsed_s3_log_folder_path.iterdir()) + + number_of_output_files = len(test_output_file_paths) + assert number_of_output_files != 0, f"Test expected_output folder ({test_parsed_s3_log_folder_path}) is empty!" + + # Increment this over time as more examples are added + expected_number_of_output_files = 2 + assert ( + number_of_output_files == expected_number_of_output_files + ), f"The number of asset files ({number_of_output_files}) does not match expectation!" + + expected_asset_ids = [file_path.stem for file_path in expected_parsed_s3_log_folder_path.iterdir()] + for test_parsed_s3_log_file_path in test_output_file_paths: + assert ( + test_parsed_s3_log_file_path.stem in expected_asset_ids + ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!" + + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0) + expected_parsed_s3_log_file_path = ( + expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv" + ) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0) + pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) + + # TODO: add tests for API and CLI usage of finding random example line from testing submodule From a771a5d1c9feb667c71ef7912dc58c3d0ef9ffec Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 9 Aug 2024 12:25:34 -0400 Subject: [PATCH 11/12] clarify role --- src/dandi_s3_log_parser/_s3_log_file_parser.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index 80423a5..c31e8cf 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -166,18 +166,18 @@ def asset_id_handler(*, raw_asset_id: str) -> str: position=0, leave=True, ): - parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir()) - for parsed_s3_log_file_path in tqdm.tqdm( - iterable=parsed_s3_log_file_paths, + per_job_parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir()) + for per_job_parsed_s3_log_file_path in tqdm.tqdm( + iterable=per_job_parsed_s3_log_file_paths, desc="Merging results per job...", - total=len(parsed_s3_log_file_paths), + total=len(per_job_parsed_s3_log_file_paths), position=1, leave=False, mininterval=1.0, ): - merged_temporary_file_path = merged_temporary_folder_path / parsed_s3_log_file_path.name + merged_temporary_file_path = merged_temporary_folder_path / per_job_parsed_s3_log_file_path.name - parsed_s3_log = pandas.read_table(filepath_or_buffer=parsed_s3_log_file_path, index_col=0) + parsed_s3_log = pandas.read_table(filepath_or_buffer=per_job_parsed_s3_log_file_path, index_col=0) parsed_s3_log.to_csv(path_or_buf=merged_temporary_file_path, mode="a", sep="\t") order_parsed_logs( From 
65c5d1579c656cac34a368bba79ba2c3fba59a6a Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Fri, 9 Aug 2024 19:01:57 -0400 Subject: [PATCH 12/12] fix tests --- tests/test_dandi_s3_log_parser.py | 48 +++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/tests/test_dandi_s3_log_parser.py b/tests/test_dandi_s3_log_parser.py index 6baabdf..6bb8e32 100644 --- a/tests/test_dandi_s3_log_parser.py +++ b/tests/test_dandi_s3_log_parser.py @@ -49,23 +49,53 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local): pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) -def test_parse_dandi_raw_s3_log_example_0_parallel(tmpdir: py.path.local): - """ - Most basic test of functionality. +def parse_all_dandi_raw_s3_logs_example_0(tmpdir: py.path.local): + tmpdir = pathlib.Path(tmpdir) - If there are failures in the parsing of any lines found in application, - please raise an issue and contribute them to the example log collection. - """ + file_parent = pathlib.Path(__file__).parent + examples_folder_path = file_parent / "examples" / "ordered_example_0" + expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" + + test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0" + dandi_s3_log_parser.parse_all_dandi_raw_s3_logs( + base_raw_s3_log_folder_path=examples_folder_path, + parsed_s3_log_folder_path=test_parsed_s3_log_folder_path, + ) + test_output_file_paths = list(test_parsed_s3_log_folder_path.iterdir()) + + number_of_output_files = len(test_output_file_paths) + assert number_of_output_files != 0, f"Test expected_output folder ({test_parsed_s3_log_folder_path}) is empty!" + + # Increment this over time as more examples are added + expected_number_of_output_files = 2 + assert ( + number_of_output_files == expected_number_of_output_files + ), f"The number of asset files ({number_of_output_files}) does not match expectation!" + + expected_asset_ids = [file_path.stem for file_path in expected_parsed_s3_log_folder_path.iterdir()] + for test_parsed_s3_log_file_path in test_output_file_paths: + assert ( + test_parsed_s3_log_file_path.stem in expected_asset_ids + ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!" + + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0) + expected_parsed_s3_log_file_path = ( + expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv" + ) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0) + pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) + + +def parse_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local): tmpdir = pathlib.Path(tmpdir) file_parent = pathlib.Path(__file__).parent examples_folder_path = file_parent / "examples" / "ordered_example_0" - example_raw_s3_log_file_path = examples_folder_path / "example_dandi_s3_log.log" expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0" - dandi_s3_log_parser.parse_dandi_raw_s3_log( - raw_s3_log_file_path=example_raw_s3_log_file_path, + dandi_s3_log_parser.parse_all_dandi_raw_s3_logs( + base_raw_s3_log_folder_path=examples_folder_path, parsed_s3_log_folder_path=test_parsed_s3_log_folder_path, number_of_jobs=2, )
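
Note on the merge-then-order flow these patches build up: the loop added in patch 09 and renamed for clarity in patch 11 appends each per-job parsed TSV into a merged temporary folder, hands that folder to `order_parsed_logs` to restore chronological order into `parsed_s3_log_folder_path`, and (as of patch 10) removes the temporary task folder with `shutil.rmtree`. The sketch below illustrates the same merge-then-order idea in a single in-memory pass with pandas. It is only a sketch under stated assumptions, not the package's implementation: the `"timestamp"` column name and the `merge_and_order_parsed_logs_sketch` helper are assumptions, since neither the parsed TSV header nor the body of `_order_parsed_logs.py` is shown in these patches.

    import pathlib

    import pandas


    def merge_and_order_parsed_logs_sketch(
        per_job_folder_paths: list[pathlib.Path],
        ordered_output_folder_path: pathlib.Path,
    ) -> None:
        """Concatenate per-job parsed TSVs per asset, then sort each asset's rows by time."""
        ordered_output_folder_path.mkdir(exist_ok=True)

        # Group the per-job outputs by asset file name (e.g. "blobs_<uuid>.tsv").
        per_asset_file_paths: dict[str, list[pathlib.Path]] = dict()
        for per_job_folder_path in per_job_folder_paths:
            for parsed_file_path in per_job_folder_path.iterdir():
                per_asset_file_paths.setdefault(parsed_file_path.name, []).append(parsed_file_path)

        for asset_file_name, parsed_file_paths in per_asset_file_paths.items():
            # Merge the per-job tables for this asset into a single frame.
            merged = pandas.concat(
                objs=[
                    pandas.read_table(filepath_or_buffer=file_path, index_col=0)
                    for file_path in parsed_file_paths
                ],
                ignore_index=True,
            )

            # Restore chronological order; "timestamp" is an assumed column name.
            merged["timestamp"] = pandas.to_datetime(merged["timestamp"])
            ordered = merged.sort_values(by="timestamp").reset_index(drop=True)

            # Write one ordered TSV per asset, mirroring the per-asset layout of the parsed output folder.
            ordered.to_csv(
                path_or_buf=ordered_output_folder_path / asset_file_name,
                sep="\t",
                index=True,
            )

The patches instead keep merging and ordering as two separate steps (an appended merged temporary folder, then `order_parsed_logs` writing the final output), which keeps the ordering utility small enough to exercise on its own and lets the parallel tests in patch 12 compare the end result directly against the `ordered_example_0` expected output.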