Parallelize over CPUs (#20)
* add test placeholder and fixture

* style

* scope seed

* setup ordering helper and tests

* add basic parallel structure

* force include new testing log location

* try restoring previous default for CI?

* fix size issue

* finalize logical structure

* finalize logical structure; add cleanup; add test

* clarify role

* fix tests

---------

Co-authored-by: CodyCBakerPhD <[email protected]>
CodyCBakerPhD authored Aug 9, 2024
1 parent b86d789 commit 5e2ef3b
Showing 15 changed files with 474 additions and 204 deletions.
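The commit title promises parallelization over CPUs, but only four of the fifteen changed files are expanded below, so the parallel driver itself is not visible on this page. For orientation only, here is a minimal sketch of the general pattern (one process-pool task per raw log file via concurrent.futures); the names parse_one_raw_log and parse_all_raw_logs_in_parallel are hypothetical and are not this repository's API:

    import concurrent.futures
    import pathlib


    def parse_one_raw_log(file_path: pathlib.Path) -> pathlib.Path:
        # Hypothetical stand-in for the per-file parsing step
        # (e.g., something along the lines of parse_dandi_raw_s3_log).
        return file_path


    def parse_all_raw_logs_in_parallel(raw_log_folder: pathlib.Path, max_workers: int) -> None:
        # One task per raw log file; process-based workers (rather than threads)
        # sidestep the GIL for CPU-bound parsing work.
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(parse_one_raw_log, file_path)
                for file_path in raw_log_folder.rglob("*.log")
            ]
            for future in concurrent.futures.as_completed(futures):
                future.result()  # re-raise any exception from a worker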
1 change: 1 addition & 0 deletions pyproject.toml
@@ -26,6 +26,7 @@ dependencies = [
     "PyYAML",
     "click",
     "natsort",
+    "dandi",
 ]
 classifiers = [
     "Programming Language :: Python",
2 changes: 2 additions & 0 deletions src/dandi_s3_log_parser/__init__.py
@@ -3,6 +3,7 @@
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt
 from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log, parse_all_dandi_raw_s3_logs
 from ._buffered_text_reader import BufferedTextReader
+from ._order_parsed_logs import order_parsed_logs
 
 __all__ = [
     "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
@@ -12,4 +13,5 @@
     "parse_raw_s3_log",
     "parse_dandi_raw_s3_log",
     "parse_all_dandi_raw_s3_logs",
+    "order_parsed_logs",
 ]
10 changes: 5 additions & 5 deletions src/dandi_s3_log_parser/_buffered_text_reader.py
@@ -2,23 +2,23 @@
 
 
 class BufferedTextReader:
-    def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage: int = 10**9):
+    def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage_in_bytes: int = 10**9):
         """
         Lazily read a text file into RAM using buffers of a specified size.
 
         Parameters
         ----------
         file_path : string or pathlib.Path
             The path to the text file to be read.
-        maximum_ram_usage : int, default: 1 GB
-            The theoretical maximum amount of RAM to be used by the BufferedTextReader object.
+        maximum_ram_usage_in_bytes : int, default: 1 GB
+            The theoretical maximum amount of RAM (in bytes) to be used by the BufferedTextReader object.
         """
         self.file_path = file_path
-        self.maximum_ram_usage = maximum_ram_usage
+        self.maximum_ram_usage_in_bytes = maximum_ram_usage_in_bytes
 
         # The actual amount of bytes to read per iteration is 3x less than theoretical maximum usage
         # due to decoding and handling
-        self.buffer_size_in_bytes = int(maximum_ram_usage / 3)
+        self.buffer_size_in_bytes = int(maximum_ram_usage_in_bytes / 3)
 
         self.total_file_size = pathlib.Path(file_path).stat().st_size
         self.offset = 0
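For reference, a construction-only sketch of how a caller would pass the renamed keyword; the file path is hypothetical, and the reader's iteration behavior is defined elsewhere in the module, not in this hunk:

    import pathlib

    from dandi_s3_log_parser import BufferedTextReader

    # Cap the theoretical RAM usage at ~100 MB; per the comment in the diff
    # above, the actual buffer read per iteration is one third of this value.
    reader = BufferedTextReader(
        file_path=pathlib.Path("example_raw_s3.log"),  # hypothetical path
        maximum_ram_usage_in_bytes=10**8,
    )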
19 changes: 19 additions & 0 deletions src/dandi_s3_log_parser/_order_parsed_logs.py
@@ -0,0 +1,19 @@
+import pathlib
+import pandas
+
+
+def order_parsed_logs(
+    unordered_parsed_s3_log_folder_path: pathlib.Path, ordered_parsed_s3_log_folder_path: pathlib.Path
+) -> None:
+    """Order the contents of all parsed log files chronologically."""
+    ordered_parsed_s3_log_folder_path.mkdir(exist_ok=True)
+
+    for unordered_parsed_s3_log_file_path in unordered_parsed_s3_log_folder_path.iterdir():
+        unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_parsed_s3_log_file_path, index_col=0)
+        ordered_parsed_s3_log = unordered_parsed_s3_log.sort_values(by="timestamp")
+
+        # correct index of first column
+        ordered_parsed_s3_log.index = range(len(ordered_parsed_s3_log))
+
+        ordered_parsed_s3_log_file_path = ordered_parsed_s3_log_folder_path / unordered_parsed_s3_log_file_path.name
+        ordered_parsed_s3_log.to_csv(path_or_buf=ordered_parsed_s3_log_file_path, sep="\t")
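Because order_parsed_logs is now exported at the package level (see the __init__.py hunk above), downstream code can re-order a folder of parsed logs in one call. A minimal usage sketch with hypothetical folder names:

    import pathlib

    from dandi_s3_log_parser import order_parsed_logs

    # Both folder names are hypothetical; the input folder should contain
    # tab-separated parsed logs with a "timestamp" column, and the output
    # folder is created if it does not already exist.
    order_parsed_logs(
        unordered_parsed_s3_log_folder_path=pathlib.Path("parsed_logs_unordered"),
        ordered_parsed_s3_log_folder_path=pathlib.Path("parsed_logs_ordered"),
    )

Sorting on the "timestamp" column and then resetting the index keeps the ordered output aligned with the tab-separated format the parser writes.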
