Skip to content

Commit

Permalink
finalize logical structure
Browse files Browse the repository at this point in the history
  • Loading branch information
CodyCBakerPhD committed Aug 6, 2024
1 parent 05abb2c commit 977bd7d
Showing 1 changed file with 33 additions and 2 deletions.
35 changes: 33 additions & 2 deletions src/dandi_s3_log_parser/_s3_log_file_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ._s3_log_line_parser import ReducedLogLine, _append_reduced_log_line
from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._buffered_text_reader import BufferedTextReader
from ._order_parsed_logs import order_parsed_logs


def parse_all_dandi_raw_s3_logs(
Expand Down Expand Up @@ -112,9 +113,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
task_id = uuid.uuid4()[:5]
temporary_folder_path = temporary_base_folder_path / task_id
temporary_folder_path.mkdir(exist_ok=True)
per_job_temporary_folder_paths = list()
for job_index in range(number_of_jobs):
per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
per_job_temporary_folder_path.mkdir(exist_ok=True)
per_job_temporary_folder_paths.append(per_job_temporary_folder_path)

maximum_ram_usage_in_bytes_per_job = maximum_ram_usage_in_bytes // number_of_jobs

Expand Down Expand Up @@ -145,8 +148,36 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
):
pass

# TODO: merge results back to central (also temporary) folder
# TODO: order results chronologically
merged_temporary_folder_path = temporary_folder_path / "merged"
merged_temporary_folder_path.mkdir(exist_ok=True)

print("\n\nParallel parsing complete!\n\n")

for per_job_temporary_folder_path in tqdm.tqdm(
iterable=per_job_temporary_folder_paths,
desc="Merging results across jobs...",
total=len(per_job_temporary_folder_paths),
position=0,
leave=True,
):
parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir())
for parsed_s3_log_file_path in tqdm.tqdm(
iterable=parsed_s3_log_file_paths,
desc="Merging results per job...",
total=len(parsed_s3_log_file_paths),
position=1,
leave=False,
mininterval=1.0,
):
merged_temporary_file_path = merged_temporary_folder_path / parsed_s3_log_file_path.name

parsed_s3_log = pandas.read_table(filepath_or_buffer=parsed_s3_log_file_path, index_col=0)
parsed_s3_log.to_csv(path_or_buf=merged_temporary_file_path, mode="a", sep="\t")

order_parsed_logs(
unordered_parsed_s3_log_folder_path=merged_temporary_folder_path,
ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path,
)

return None

Expand Down

0 comments on commit 977bd7d

Please sign in to comment.