diff --git a/.github/CODE_OF_CONDUCT.rst b/.github/CODE_OF_CONDUCT.rst new file mode 100644 index 0000000..745f9c2 --- /dev/null +++ b/.github/CODE_OF_CONDUCT.rst @@ -0,0 +1,53 @@ +Contributor Covenant Code of Conduct +==================================== + +Our Pledge +---------- + +In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, nationality, personal appearance, race, religion, or sexual identity and orientation. + +Our Standards +------------- + +Examples of behavior that contributes to creating a positive environment include: + +* Using welcoming and inclusive language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others’ private information, such as a physical or electronic address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a professional setting + +Our Responsibilities +-------------------- + +Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful. + +Scope +----- + +This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers. + +Enforcement +----------- + +Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team. All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project’s leadership. 
+ +Attribution +----------- + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version] + +[homepage]: http://contributor-covenant.org +[version]: http://contributor-covenant.org/version/1/4/ diff --git a/.github/workflows/deploy_tests_on_pull_request.yml b/.github/workflows/deploy_tests_on_pull_request.yml new file mode 100644 index 0000000..72584f2 --- /dev/null +++ b/.github/workflows/deploy_tests_on_pull_request.yml @@ -0,0 +1,16 @@ +name: Deploy + +on: + pull_request: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + + DevTests: + uses: ./.github/workflows/testing_dev.yml + secrets: + IPINFO_CREDENTIALS: ${{ secrets.IPINFO_CREDENTIALS }} + CODECOV_CREDENTIALS: ${{ secrets.CODECOV_CREDENTIALS }} diff --git a/.github/workflows/testing_dev.yml b/.github/workflows/testing_dev.yml new file mode 100644 index 0000000..99c91fb --- /dev/null +++ b/.github/workflows/testing_dev.yml @@ -0,0 +1,63 @@ +name: Dev tests +on: + workflow_call: + secrets: + IPINFO_CREDENTIALS: + required: true + CODECOV_CREDENTIALS: + required: true + +env: + IPINFO_CREDENTIALS: ${{ secrets.IPINFO_CREDENTIALS }} + +jobs: + + run: + # Will read on PR dashboard as 'Deploy / DevTests / ubuntu' + # Action dashboard identified by 'Dev tests' + # Requirement settings identified as 'DevTests / ubuntu' + name: ubuntu + runs-on: ubuntu-latest + strategy: + fail-fast: false + + steps: + - uses: actions/checkout@v4 + - run: git fetch --prune --unshallow --tags + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Global Setup + run: | + python -m pip install -U pip + pip install pytest-cov + + - name: Install local checkout + run: pip install --no-cache-dir . 
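+
+      # Note (clarifying comment): the workflow-level `env` block above exposes IPINFO_CREDENTIALS to every step
+      # because `dandi_s3_log_parser` raises a ValueError at import time when that environment variable is unset
+      # (see src/dandi_s3_log_parser/_config.py), so the pytest step below cannot run without this secret.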
+ + - name: Display installed packages and their sources for debugging + run: pip list + + - name: Cache IP to region mapping to prevent overuse of ipinfo + uses: actions/cache@v4 + id: dandi_home_cache + with: + path: ~/.dandi_s3_log_parser + key: ubuntu_dandi_home_cache + + - name: Run pytest with coverage and printout coverage for debugging + run: | + pytest -vv -rsx --cov=dandi_s3_log_parser --cov-report xml:./coverage.xml + cat ./coverage.xml + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_CREDENTIALS }} + file: ./coverage.xml + flags: unittests + name: codecov-umbrella + fail_ci_if_error: true + verbose: true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..65de2af --- /dev/null +++ b/.gitignore @@ -0,0 +1,162 @@ +# ASV and customization +.asv/intermediate_results +.asv/.raw_environment_info.txt + +# Dataset file / log file types: +/**/*.log + +# Byte-compiled / optimized / DLL files +/**/__pycache__/ +*.py[cod] + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Sphinx documentation +docs/build +docs/_build +docs/make.bat +docs/Makefile + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/ +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. 
+# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +#Mac +.DS_Store + +# Vscode +.vscode/ +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +!.vscode/*.code-snippets + +# Local History for Visual Studio Code +.history/ + +# Built Visual Studio Code Extensions +*.vsix + +# Spyder +.spyproject/* diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..2fa87f1 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,26 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.6.0 + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace + +- repo: https://github.com/psf/black + rev: 24.4.2 + hooks: + - id: black + exclude: ^docs/ + +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.3.5 + hooks: + - id: ruff + args: [ --fix ] + +- repo: https://github.com/codespell-project/codespell + rev: v2.3.0 + hooks: + - id: codespell + additional_dependencies: + - tomli diff --git a/README.md b/README.md index b3bab96..66630fc 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,15 @@ -# dandi_s3_log_parser -S3 log parsing for the DANDI Archive. +# Raw S3 log parser + +Reduces consolidated S3 logs (the consolidation step is not included in this repository) to the minimal information needed for sharing and plotting. + +Developed for the DANDI Archive. + + + +# Developer notes + +Files with the `.log` suffix are ignored by Git in this repository (see `.gitignore`), so when committing changes to the example log collection you will have to forcibly include each example file, for example: + +```bash +git add -f tests/examples/example_0/example_dandi_s3_log.log +``` diff --git a/license.txt b/license.txt new file mode 100644 index 0000000..6d5ffc6 --- /dev/null +++ b/license.txt @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2024, CatalystNeuro +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..98c590a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,71 @@ +[build-system] +requires = ["hatchling", "hatch-vcs"] +build-backend = "hatchling.build" + +[tool.hatch.version] +source = "vcs" + +[tool.hatch.build.targets.wheel] +packages = ["src/dandi_s3_log_parser"] + +[project] +name = "dandi_s3_log_parser" +version="0.0.1" +authors = [ + { name="Cody Baker", email="cody.baker@catalystneuro.com" }, +] +description = "Parse S3 logs to more easily calculate usage metrics per asset." +readme = "README.md" +keywords = ["aws", "s3", "log", "download tracking"] +license = {file = "license.txt"} +requires-python = ">=3.12" +dependencies = [ + "pandas", + "tqdm", + "ipinfo", + "PyYAML", +] +classifiers = [ + "Programming Language :: Python", + "Programming Language :: Python :: 3.12", + "License :: OSI Approved :: BSD License", + "Intended Audience :: Developers", + "Operating System :: Unix", +] + +[project.scripts] +parse_s3_logs = "dandi_s3_log_parser._command_line_interface:main" + + + +[tool.black] +line-length = 120 +target-version = ['py312'] +include = '\.pyi?$' +extend-exclude = ''' +/( + \.toml + |\.yml + |\.txt + |\.sh + |\.git + |\.ini + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | build + | dist +)/ +''' + + + +[tool.ruff] +exclude = [ + "*/__init__.py" +] + +[tool.ruff.lint.isort] +relative-imports-order = "closest-to-furthest" +known-first-party = ["dandi_s3_log_parser"] diff --git a/src/dandi_s3_log_parser/__init__.py b/src/dandi_s3_log_parser/__init__.py new file mode 100644 index 0000000..b0c6bed --- /dev/null +++ b/src/dandi_s3_log_parser/__init__.py @@ -0,0 +1,6 @@ +"""Outermost exposed imports; including global environment variables.""" + +from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS +from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log + +__all__ = ["DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH", "IPINFO_CREDENTIALS", "parse_raw_s3_log", "parse_dandi_raw_s3_log"] diff --git a/src/dandi_s3_log_parser/_command_line_interface.py b/src/dandi_s3_log_parser/_command_line_interface.py new file mode 100644 index 0000000..2ae1cf4 --- /dev/null +++ b/src/dandi_s3_log_parser/_command_line_interface.py @@ -0,0 +1,12 @@ +"""Call the raw S3 log parser from the command line.""" + +from ._s3_log_file_parser import parse_raw_s3_log + + +# TODO +def main() -> None: + parse_raw_s3_log() + + +if __name__ == "__main__": + main() diff --git a/src/dandi_s3_log_parser/_config.py b/src/dandi_s3_log_parser/_config.py new file mode 100644 index 0000000..20098d3 --- /dev/null +++ b/src/dandi_s3_log_parser/_config.py @@ -0,0 +1,11 @@ +import os +import pathlib + +DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH = pathlib.Path.home() / ".dandi_s3_log_parser" +DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH.mkdir(exist_ok=True) + +_IP_ADDRESS_TO_REGION_FILE_PATH = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "ip_address_to_region.yaml" + +if "IPINFO_CREDENTIALS" not in os.environ: + raise 
ValueError("The environment variable 'IPINFO_CREDENTIALS' must be set to import `dandi_s3_log_parser`!") +IPINFO_CREDENTIALS = os.environ["IPINFO_CREDENTIALS"] diff --git a/src/dandi_s3_log_parser/_ip_utils.py b/src/dandi_s3_log_parser/_ip_utils.py new file mode 100644 index 0000000..5b306e4 --- /dev/null +++ b/src/dandi_s3_log_parser/_ip_utils.py @@ -0,0 +1,88 @@ +"""Various private utility functions for handling IP address related tasks.""" + +import ipaddress +from typing import List + +import ipinfo +import requests +import yaml + +from ._config import _IP_ADDRESS_TO_REGION_FILE_PATH, IPINFO_CREDENTIALS + + +def _cidr_address_to_ip_range(cidr_address: str) -> List[str]: + """Convert a CIDR address to a list of IP addresses.""" + cidr_address_class = type(ipaddress.ip_address(cidr_address.split("/")[0])) + ip_address_range = list() + if cidr_address_class is ipaddress.IPv4Address: + ip_address_range = ipaddress.IPv4Network(address=cidr_address) + elif cidr_address_class is ipaddress.IPv6Address: + ip_address_range = ipaddress.IPv6Network(address=cidr_address) + + return [str(ip_address) for ip_address in ip_address_range] + + +def _get_latest_github_ip_ranges() -> list[str]: + """Retrieve the latest GitHub CIDR ranges from their API and expand them into a list of IP addresses.""" + github_ip_request = requests.get("https://api.github.com/meta").json() + + skip_keys = ["domains", "ssh_key_fingerprints", "verifiable_password_authentication", "ssh_keys"] + keys = set(github_ip_request.keys()) - set(skip_keys) + github_cidr_addresses = [ + cidr_address for key in keys for cidr_address in github_ip_request[key] if "::" not in cidr_address # Skip IPv6 + ] + + all_github_ips = [ + str(ip_address) + for cidr_address in github_cidr_addresses + for ip_address in _cidr_address_to_ip_range(cidr_address=cidr_address) + ] + + return all_github_ips + + +def _load_ip_address_to_region_cache() -> dict[str, str]: + """Load the IP address to region cache from disk.""" + if not _IP_ADDRESS_TO_REGION_FILE_PATH.exists(): + return dict() + + with open(file=_IP_ADDRESS_TO_REGION_FILE_PATH, mode="r") as stream: + return yaml.load(stream=stream, Loader=yaml.SafeLoader) + + +def _save_ip_address_to_region_cache(ip_address_to_region: dict[str, str]) -> None: + """Save the IP address to region cache to disk.""" + with open(file=_IP_ADDRESS_TO_REGION_FILE_PATH, mode="w") as stream: + yaml.dump(data=ip_address_to_region, stream=stream) + + +def _get_region_from_ip_address(ip_address_to_region: dict[str, str], ip_address: str) -> str: + """ + If the parsed S3 logs are meant to be shared openly, the remote IP could be used to directly identify individuals. + + Instead, identify the generic region of the world the request came from and report that instead. 
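+
+    For example, a request whose ipinfo lookup resolves to country "US" and region "California" is recorded as
+    "US/California", and addresses that cannot be resolved are recorded as "unknown".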
+ """ + # Early return for speed + lookup_result = ip_address_to_region.get(ip_address, None) + if lookup_result is not None: + return lookup_result + + handler = ipinfo.getHandler(access_token=IPINFO_CREDENTIALS) + details = handler.getDetails(ip_address=ip_address) + + country = details.details.get("country", None) + region = details.details.get("region", None) + + region_string = "" # Not technically necessary, but quiets the linter + match (country is None, region is None): + case (True, True): + region_string = "unknown" + case (True, False): + region_string = region + case (False, True): + region_string = country + case (False, False): + region_string = f"{country}/{region}" + ip_address_to_region[ip_address] = region_string + + return region_string diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py new file mode 100644 index 0000000..260a9e4 --- /dev/null +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -0,0 +1,233 @@ +"""Primary functions for parsing raw S3 log file for DANDI.""" + +import collections +import pathlib +from typing import Callable, Literal + +import pandas +import tqdm + +from ._ip_utils import ( + _get_latest_github_ip_ranges, + _load_ip_address_to_region_cache, + _save_ip_address_to_region_cache, +) +from ._s3_log_line_parser import ReducedLogLine, _append_reduced_log_line + + +def _get_reduced_log_lines( + *, + raw_s3_log_file_path: pathlib.Path, + bucket: str | None, + request_type: Literal["GET", "PUT"], + excluded_ips: collections.defaultdict[str, bool], +) -> list[ReducedLogLine]: + """ + Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects. + + Parameters + ---------- + raw_s3_log_file_path : str or pathlib.Path + Path to the raw S3 log file. + bucket : str + Only parse and return lines that match this bucket. + request_type : str + The type of request to filter for. + excluded_ips : collections.defaultdict of strings to booleans + A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + """ + assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!" 
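+
+    # Each raw line is expected to follow the AWS S3 server access log format;
+    # see tests/examples/example_0/example_dandi_s3_log.log in this repository for an example line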
+ + # Collapse bucket to empty string instead of asking if it is None on each iteration + bucket = "" if bucket is None else bucket + + # One-time initialization/read of IP address to region cache for performance + # This dictionary is intended to be mutated throughout the process + ip_address_to_region = _load_ip_address_to_region_cache() + + reduced_log_lines = list() + with open(file=raw_s3_log_file_path, mode="r") as io: + # Perform I/O read in one batch to improve performance + # TODO: for larger files, this loads entirely into RAM - need buffering + raw_lines = tqdm.tqdm(iterable=io.readlines()) # TODO: limit update speed of tqdm to improve performance + for index, raw_line in enumerate(raw_lines): + _append_reduced_log_line( + raw_line=raw_line, + reduced_log_lines=reduced_log_lines, + bucket=bucket, + request_type=request_type, + excluded_ips=excluded_ips, + log_file_path=raw_s3_log_file_path, + index=index, + ip_address_to_region=ip_address_to_region, + ) + + _save_ip_address_to_region_cache(ip_address_to_region=ip_address_to_region) + + return reduced_log_lines + + +def parse_raw_s3_log( + *, + raw_s3_log_file_path: str | pathlib.Path, + parsed_s3_log_folder_path: str | pathlib.Path, + mode: Literal["w", "a"] = "a", + bucket: str | None = None, + request_type: Literal["GET", "PUT"] = "GET", + excluded_ips: collections.defaultdict[str, bool] | None = None, + number_of_jobs: int = 1, + total_memory_in_bytes: int = 1e9, + asset_id_handler: Callable | None = None, +) -> None: + """ + Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. + + 'Parsing' here means: + - limiting only to requests of the specified type (i.e., GET, PUT, etc.) + - reducing the information to the asset ID, request time, request size, and geographic IP of the requester + + Parameters + ---------- + raw_s3_log_file_path : str or pathlib.Path + Path to the raw S3 log file. + parsed_s3_log_folder_path : str or pathlib.Path + Path to write each parsed S3 log file to. + There will be one file per handled asset ID. + mode : "w" or "a", default: "a" + How to resolve the case when files already exist in the folder containing parsed logs. + "w" will overwrite existing content, "a" will append or create if the file does not yet exist. + + The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate + over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. + HINT: If this iteration is done in chronological order, the resulting parsed logs will also maintain that order. + bucket : str + Only parse and return lines that match this bucket. + request_type : str, default: "GET" + The type of request to filter for. + excluded_ips : collections.defaultdict of strings to booleans, optional + A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + number_of_jobs : int, default: 1 + The number of jobs to use for parallel processing. + Allows negative range to mean 'all but this many (minus one) jobs'. + E.g., -1 means use all workers, -2 means all but one worker. + WARNING: planned but not yet supported. + total_memory_in_bytes : int, default: 2e9 + The number of bytes to load as a buffer into RAM per job. + Will automatically distribute this amount over the number of jobs. + WARNING: planned but not yet supported. 
+ asset_id_handler : callable, optional + If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to + translate into nested directory paths) then define a function of the following form: + + # For example + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + """ + raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) + parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) + parsed_s3_log_folder_path.mkdir(exist_ok=True) + excluded_ips = excluded_ips or collections.defaultdict(bool) + + # TODO: buffering control + # total_file_size_in_bytes = raw_s3_log_file_path.lstat().st_size + # buffer_per_job_in_bytes = int(total_memory_in_bytes / number_of_jobs) + # Approximate using ~600 bytes per line + # number_of_lines_to_read_per_job = int(buffer_per_job_in_bytes / 600) + # number_of_iterations_per_job = int(total_file_size_in_bytes / number_of_lines_to_read_per_job) + + # TODO: finish polishing parallelization - just a draft for now + if number_of_jobs > 1: + raise NotImplementedError("Parallelization has not yet been implemented!") + # for _ in range(5) + # reduced_logs = _get_reduced_logs( + # raw_s3_log_file_path=raw_s3_log_file_path, + # lines_errors_file_path=lines_errors_file_path, + # bucket=bucket, + # request_type=request_type + # ) + else: + reduced_logs = _get_reduced_log_lines( + raw_s3_log_file_path=raw_s3_log_file_path, + bucket=bucket, + request_type=request_type, + excluded_ips=excluded_ips, + ) + + reduced_logs_binned_by_unparsed_asset = dict() + for reduced_log in reduced_logs: + raw_asset_id = reduced_log.asset_id + reduced_logs_binned_by_unparsed_asset[raw_asset_id] = reduced_logs_binned_by_unparsed_asset.get( + raw_asset_id, collections.defaultdict(list) + ) + + reduced_logs_binned_by_unparsed_asset[raw_asset_id]["timestamp"].append(reduced_log.timestamp) + reduced_logs_binned_by_unparsed_asset[raw_asset_id]["bytes_sent"].append(reduced_log.bytes_sent) + reduced_logs_binned_by_unparsed_asset[raw_asset_id]["region"].append(reduced_log.region) + + if asset_id_handler is not None: + reduced_logs_binned_by_asset = dict() + for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_unparsed_asset.items(): + parsed_asset_id = asset_id_handler(raw_asset_id=raw_asset_id) + + reduced_logs_binned_by_asset[parsed_asset_id] = reduced_logs_per_asset + else: + reduced_logs_binned_by_asset = reduced_logs_binned_by_unparsed_asset + + for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_asset.items(): + parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{raw_asset_id}.tsv" + + data_frame = pandas.DataFrame(data=reduced_logs_per_asset) + data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t") + + +def parse_dandi_raw_s3_log( + *, + raw_s3_log_file_path: str | pathlib.Path, + parsed_s3_log_folder_path: str | pathlib.Path, + mode: Literal["w", "a"] = "a", +) -> None: + """ + Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. + + 'Parsing' here means: + - limiting only to requests of the specified type (i.e., GET, PUT, etc.) + - reducing the information to the asset ID, request time, request size, and geographic IP of the requester + + Parameters + ---------- + raw_s3_log_file_path : str or pathlib.Path + Path to the raw S3 log file. + parsed_s3_log_folder_path : str or pathlib.Path + Path to write each parsed S3 log file to. 
+ There will be one file per handled asset ID. + mode : "w" or "a", default: "a" + How to resolve the case when files already exist in the folder containing parsed logs. + "w" will overwrite existing content, "a" will append or create if the file does not yet exist. + + The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate + over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. + HINT: If this iteration is done in chronological order, the resulting parsed logs will also maintain that order. + """ + bucket = "dandiarchive" + request_type = "GET" + + # Form a lookup for IP addresses to exclude; much faster than asking 'if in' a list on each iteration + # Exclude GitHub Actions, which are responsible for running health checks on the archive and which bloat the logs + excluded_ips = collections.defaultdict(bool) + for github_ip in _get_latest_github_ip_ranges(): + excluded_ips[github_ip] = True + + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + + return parse_raw_s3_log( + raw_s3_log_file_path=raw_s3_log_file_path, + parsed_s3_log_folder_path=parsed_s3_log_folder_path, + mode=mode, + bucket=bucket, + request_type=request_type, + excluded_ips=excluded_ips, + asset_id_handler=asset_id_handler, + ) diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py new file mode 100644 index 0000000..a17775d --- /dev/null +++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py @@ -0,0 +1,188 @@ +""" +Primary functions for parsing a single line of a raw S3 log. + +The strategy is to... + +1) Parse the raw line into a list of strings using a regex pattern. +2) Construct a FullLogLine object from the parsed line. A collections.namedtuple object is used for performance. +3) Reduce and map the information from the FullLogLine into a ReducedLogLine object. + This uses a lot less memory than the full version. + Some of the mapping operations at this step include... + - Identifying the DANDI asset ID from the full blob. + - Parsing the timestamp in memory as a datetime.datetime object. + - Filtering out log lines from excluded IPs (such as Drogon or GitHub actions). + - Converting the full remote IP to a country and region, so it can be saved without violating privacy. +""" + +import collections +import datetime +import importlib.metadata +import pathlib +import re + +from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH +from ._ip_utils import _get_region_from_ip_address + +FULL_PATTERN_TO_FIELD_MAPPING = [ + "bucket_owner", + "bucket", + "timestamp", + "remote_ip", + "requester", + "request_id", + "operation", + "asset_id", + "request_uri", + # "http_version", # Regex not splitting this from the request_uri... + "status_code", + "error_code", + "bytes_sent", + "object_size", + "total_time", + "turn_around_time", + "referrer", + "user_agent", + "version", + "host_id", + "sigv", + "cipher_suite", + "auth_type", + "endpoint", + "tls_version", + "access_point_arn", + "extra", # TODO: Never figured out what this field is...
+] +REDUCED_PATTERN_TO_FIELD_MAPPING = ["asset_id", "timestamp", "bytes_sent", "region"] + +FullLogLine = collections.namedtuple("FullLogLine", FULL_PATTERN_TO_FIELD_MAPPING) +ReducedLogLine = collections.namedtuple("ReducedLogLine", REDUCED_PATTERN_TO_FIELD_MAPPING) + + +# Original +# S3_LOG_REGEX = re.compile(r'(?:"([^"]+)")|(?:\[([^\]]+)\])|([^ ]+)') + + +# AI corrected... +S3_LOG_REGEX = re.compile(r'"([^"]+)"|\[([^]]+)]|([^ ]+)') + + +def _parse_s3_log_line(*, raw_line: str) -> list[str]: + """The current method of parsing lines of an S3 log file.""" + parsed_log_line = [a or b or c for a, b, c in S3_LOG_REGEX.findall(raw_line)] + + return parsed_log_line + + +def _get_full_log_line( + *, + parsed_log_line: list[str], + log_file_path: pathlib.Path, + index: int, + raw_line: str, +) -> FullLogLine | None: + """Construct a FullLogLine from a single parsed log line, or dump to error collection file and return None.""" + full_log_line = None + + number_of_parsed_items = len(parsed_log_line) + match number_of_parsed_items: + # ARN not detected + case 24: + parsed_log_line.append("-") + parsed_log_line.append("-") + full_log_line = FullLogLine(*parsed_log_line) + # Expected form most of the time + case 25: + parsed_log_line.append("-") + full_log_line = FullLogLine(*parsed_log_line) + # Happens for certain types of HEAD requests + case 26: + full_log_line = FullLogLine(*parsed_log_line) + + # Deviant log entry; usually some very ill-formed content in the URI + # Dump information to a log file in the base folder for easy sharing + if full_log_line is None: + errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors" + errors_folder_path.mkdir(exist_ok=True) + + dandi_s3_log_parser_version = importlib.metadata.version("dandi_s3_log_parser") + date = datetime.datetime.now().strftime("%y%m%d") + lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_lines_errors.txt" + + with open(file=lines_errors_file_path, mode="a") as io: + io.write(f"Line {index} of {log_file_path} (parsed {number_of_parsed_items} items): {raw_line}") + + return full_log_line + + +def _append_reduced_log_line( + *, + raw_line: str, + reduced_log_lines: list[ReducedLogLine], + bucket: str, + request_type: str, + excluded_ips: collections.defaultdict[str, bool], + log_file_path: pathlib.Path, + index: int, + ip_address_to_region: dict[str, str], +) -> None: + """ + Append the `reduced_log_lines` list with a ReducedLogLine constructed from a single raw log line, if it is valid. + + Parameters + ---------- + raw_line : str + A single line from the raw S3 log file. + reduced_log_lines : list of ReducedLogLine + The list of ReducedLogLine objects to mutate in place. + This is done to reduce overhead of copying/returning items in-memory via a return-based approach. + bucket : string + Only parse and return lines that match this bucket string. + request_type : string + The type of request to filter for. + excluded_ips : collections.defaultdict of strings to booleans + A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. 
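+    log_file_path : pathlib.Path
+        The path of the log file being parsed; only used to annotate any lines that fail to parse in the error report.
+    index : int
+        The index of `raw_line` within the log file; only used to annotate any lines that fail to parse in the error report.
+    ip_address_to_region : dict of strings to strings
+        An in-memory cache mapping remote IP addresses to geographic regions; mutated in place as new addresses are resolved.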
+ """ + bucket = "" if bucket is None else bucket + excluded_ips = excluded_ips or collections.defaultdict(bool) + + parsed_log_line = _parse_s3_log_line(raw_line=raw_line) + + full_log_line = _get_full_log_line( + parsed_log_line=parsed_log_line, + log_file_path=log_file_path, + index=index, + raw_line=raw_line, + ) + + # A deviant log line could not be parsed; it has already been recorded in the errors folder, so skip it + if full_log_line is None: + return None + + # Various early skip conditions + if full_log_line.bucket != bucket: + return None + + # Skip all non-perfect status + if full_log_line.status_code != "200": + return None + + # Derived from command string, e.g., "HEAD /blobs/b38/..." + # Subset the first 4 characters for performance + parsed_request_type = full_log_line.request_uri[:4].removesuffix(" ") + if parsed_request_type != request_type: + return None + + if excluded_ips[full_log_line.remote_ip]: + return None + + assert ( + full_log_line.timestamp[-5:] == "+0000" + ), f"Unexpected time shift attached to log! Have always seen '+0000', found '{full_log_line.timestamp[-5:]}'." + + parsed_timestamp = datetime.datetime.strptime(full_log_line.timestamp[:-6], "%d/%b/%Y:%H:%M:%S") + parsed_bytes_sent = int(full_log_line.bytes_sent) if full_log_line.bytes_sent != "-" else 0 + region = _get_region_from_ip_address(ip_address_to_region=ip_address_to_region, ip_address=full_log_line.remote_ip) + reduced_log_line = ReducedLogLine( + asset_id=full_log_line.asset_id, + timestamp=parsed_timestamp, + bytes_sent=parsed_bytes_sent, + region=region, + ) + + reduced_log_lines.append(reduced_log_line) diff --git a/tests/examples/example_0/example_dandi_s3_log.log b/tests/examples/example_0/example_dandi_s3_log.log new file mode 100644 index 0000000..e77c15a --- /dev/null +++ b/tests/examples/example_0/example_dandi_s3_log.log @@ -0,0 +1 @@ +8787a3c41bf7ce0d54359d9348ad5b08e16bd5bb8ae5aa4e1508b435773a066e dandiarchive [31/Dec/2021:23:06:42 +0000] 192.0.2.0 - NWC7V1KE70QZYJ5Q REST.GET.OBJECT blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629 "GET /blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629?versionId=yn5YAJiwT36Rv78jGYLM71GZumWL.QWn HTTP/1.1" 200 - 1443 1443 35 35 "-" "git-annex/8.20211028-g1c76278" yn5YAJiwT36Rv78jGYLM71GZumWL.QWn ojBg2QLVTSTWsCAe1HoC6IBNLUSPmWH276FdsedhZ/4CQ67DWuZQHcXXB9XUJxYKpnPHpJyBjMM= - ECDHE-RSA-AES128-GCM-SHA256 - dandiarchive.s3.amazonaws.com TLSv1.2 - diff --git a/tests/examples/example_0/output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/example_0/output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv new file mode 100644 index 0000000..6980324 --- /dev/null +++ b/tests/examples/example_0/output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv @@ -0,0 +1,2 @@ + timestamp bytes_sent region +0 2021-12-31 23:06:42 1443 unknown diff --git a/tests/test_dandi_s3_log_parser.py b/tests/test_dandi_s3_log_parser.py new file mode 100644 index 0000000..506812a --- /dev/null +++ b/tests/test_dandi_s3_log_parser.py @@ -0,0 +1,49 @@ +import pathlib + +import pandas +import py + +import dandi_s3_log_parser + + +def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local): + """ + Most basic test of functionality. + + If any lines fail to parse when this package is used in real applications, + please raise an issue and contribute them to the example log collection.
+ """ + tmpdir = pathlib.Path(tmpdir) + + file_parent = pathlib.Path(__file__).parent + examples_folder_path = file_parent / "examples" / "example_0" + example_raw_s3_log_file_path = examples_folder_path / "example_dandi_s3_log.log" + expected_parsed_s3_log_folder_path = examples_folder_path / "output" + + test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0" + dandi_s3_log_parser.parse_dandi_raw_s3_log( + raw_s3_log_file_path=example_raw_s3_log_file_path, parsed_s3_log_folder_path=test_parsed_s3_log_folder_path + ) + test_output_file_paths = list(test_parsed_s3_log_folder_path.iterdir()) + + number_of_output_files = len(test_output_file_paths) + assert number_of_output_files != 0, f"Test output folder ({test_parsed_s3_log_folder_path}) is empty!" + + # Increment this over time as more examples are added + expected_number_of_output_files = 1 + assert ( + number_of_output_files == expected_number_of_output_files + ), f"The number of asset files ({number_of_output_files}) does not match expectation!" + + expected_asset_ids = [file_path.stem for file_path in expected_parsed_s3_log_folder_path.iterdir()] + for test_parsed_s3_log_file_path in test_output_file_paths: + assert ( + test_parsed_s3_log_file_path.stem in expected_asset_ids + ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!" + + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path) + expected_parsed_s3_log_file_path = ( + expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv" + ) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path) + pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)
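
For reference, a minimal usage sketch of the public API added in this changeset; the ipinfo token value is a placeholder, and the log path is the example file committed under `tests/examples/example_0`:

```python
import os

# The package raises at import time if this environment variable is unset (see src/dandi_s3_log_parser/_config.py).
os.environ.setdefault("IPINFO_CREDENTIALS", "<your-ipinfo-access-token>")  # placeholder token

import dandi_s3_log_parser

# Reduce the example DANDI raw S3 log into one TSV per asset ID, overwriting any previous output.
dandi_s3_log_parser.parse_dandi_raw_s3_log(
    raw_s3_log_file_path="tests/examples/example_0/example_dandi_s3_log.log",
    parsed_s3_log_folder_path="parsed_example_0",
    mode="w",
)
```

Each output TSV contains `timestamp`, `bytes_sent`, and `region` columns, matching the expected output committed under `tests/examples/example_0/output/`.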