Parallelize over CPUs #20

Merged (17 commits, Aug 9, 2024)
1 change: 1 addition & 0 deletions pyproject.toml
@@ -26,6 +26,7 @@ dependencies = [
     "PyYAML",
     "click",
     "natsort",
+    "dandi",
 ]
 classifiers = [
     "Programming Language :: Python",
2 changes: 2 additions & 0 deletions src/dandi_s3_log_parser/__init__.py
@@ -3,6 +3,7 @@
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt
 from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log, parse_all_dandi_raw_s3_logs
 from ._buffered_text_reader import BufferedTextReader
+from ._order_parsed_logs import order_parsed_logs

 __all__ = [
     "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
@@ -12,4 +13,5 @@
     "parse_raw_s3_log",
     "parse_dandi_raw_s3_log",
     "parse_all_dandi_raw_s3_logs",
+    "order_parsed_logs",
 ]
10 changes: 5 additions & 5 deletions src/dandi_s3_log_parser/_buffered_text_reader.py
@@ -2,23 +2,23 @@


 class BufferedTextReader:
-    def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage: int = 10**9):
+    def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage_in_bytes: int = 10**9):
         """
         Lazily read a text file into RAM using buffers of a specified size.

         Parameters
         ----------
         file_path : string or pathlib.Path
             The path to the text file to be read.
-        maximum_ram_usage : int, default: 1 GB
-            The theoretical maximum amount of RAM to be used by the BufferedTextReader object.
+        maximum_ram_usage_in_bytes : int, default: 1 GB
+            The theoretical maximum amount of RAM (in bytes) to be used by the BufferedTextReader object.
         """
         self.file_path = file_path
-        self.maximum_ram_usage = maximum_ram_usage
+        self.maximum_ram_usage_in_bytes = maximum_ram_usage_in_bytes

         # The actual amount of bytes to read per iteration is 3x less than theoretical maximum usage
         # due to decoding and handling
-        self.buffer_size_in_bytes = int(maximum_ram_usage / 3)
+        self.buffer_size_in_bytes = int(maximum_ram_usage_in_bytes / 3)

         self.total_file_size = pathlib.Path(file_path).stat().st_size
         self.offset = 0
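The renamed maximum_ram_usage_in_bytes keyword makes the unit explicit, and the reader still divides that budget by three to leave headroom for decoding and line handling (a 10**9 cap reads roughly 333 MB per buffer). A minimal usage sketch, assuming BufferedTextReader is iterable and yields a list of decoded lines per buffer (that part of the class is outside this hunk) and using a hypothetical log file name:

# Sketch of the renamed parameter in use; not part of this diff.
# Assumes BufferedTextReader is iterable and yields a list of decoded lines per
# buffer (defined elsewhere in the class); the log file name is hypothetical.
from dandi_s3_log_parser import BufferedTextReader

reader = BufferedTextReader(
    file_path="2024-08-09-raw-s3.log",  # hypothetical raw S3 log file
    maximum_ram_usage_in_bytes=3 * 10**8,  # ~300 MB cap -> ~100 MB read per buffer
)
for buffered_lines in reader:
    for raw_line in buffered_lines:
        pass  # each raw log line would be handed to the parser here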
19 changes: 19 additions & 0 deletions src/dandi_s3_log_parser/_order_parsed_logs.py
@@ -0,0 +1,19 @@
+import pathlib
+import pandas
+
+
+def order_parsed_logs(
+    unordered_parsed_s3_log_folder_path: pathlib.Path, ordered_parsed_s3_log_folder_path: pathlib.Path
+) -> None:
+    """Order the contents of all parsed log files chronologically."""
+    ordered_parsed_s3_log_folder_path.mkdir(exist_ok=True)
+
+    for unordered_parsed_s3_log_file_path in unordered_parsed_s3_log_folder_path.iterdir():
+        unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_parsed_s3_log_file_path, index_col=0)
+        ordered_parsed_s3_log = unordered_parsed_s3_log.sort_values(by="timestamp")
+
+        # correct index of first column
+        ordered_parsed_s3_log.index = range(len(ordered_parsed_s3_log))
+
+        ordered_parsed_s3_log_file_path = ordered_parsed_s3_log_folder_path / unordered_parsed_s3_log_file_path.name
+        ordered_parsed_s3_log.to_csv(path_or_buf=ordered_parsed_s3_log_file_path, sep="\t")
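As written, order_parsed_logs reads each tab-separated parsed log fully into memory, sorts its rows by the "timestamp" column, resets the integer index, and writes the result under the same file name into the ordered folder, so each individual parsed file must fit in RAM. A short usage sketch with hypothetical folder names:

# Sketch of calling the new helper; not part of this diff.
# Folder names are hypothetical; the input folder is expected to hold
# tab-separated parsed logs that each contain a "timestamp" column.
import pathlib

from dandi_s3_log_parser import order_parsed_logs

order_parsed_logs(
    unordered_parsed_s3_log_folder_path=pathlib.Path("parsed_s3_logs_unordered"),
    ordered_parsed_s3_log_folder_path=pathlib.Path("parsed_s3_logs_ordered"),
)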