diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py
index 3ffb0e2..543ec2e 100644
--- a/src/dandi_s3_log_parser/_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -20,6 +20,7 @@
 from ._s3_log_line_parser import ReducedLogLine, _append_reduced_log_line
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
 from ._buffered_text_reader import BufferedTextReader
+from ._order_parsed_logs import order_parsed_logs
 
 
 def parse_all_dandi_raw_s3_logs(
@@ -112,9 +113,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     task_id = uuid.uuid4()[:5]
     temporary_folder_path = temporary_base_folder_path / task_id
     temporary_folder_path.mkdir(exist_ok=True)
+    per_job_temporary_folder_paths = list()
     for job_index in range(number_of_jobs):
         per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
         per_job_temporary_folder_path.mkdir(exist_ok=True)
+        per_job_temporary_folder_paths.append(per_job_temporary_folder_path)
 
     maximum_ram_usage_in_bytes_per_job = maximum_ram_usage_in_bytes // number_of_jobs
 
@@ -145,8 +148,36 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     ):
         pass
 
-    # TODO: merge results back to central (also temporary) folder
-    # TODO: order results chronologically
+    merged_temporary_folder_path = temporary_folder_path / "merged"
+    merged_temporary_folder_path.mkdir(exist_ok=True)
+
+    print("\n\nParallel parsing complete!\n\n")
+
+    for per_job_temporary_folder_path in tqdm.tqdm(
+        iterable=per_job_temporary_folder_paths,
+        desc="Merging results across jobs...",
+        total=len(per_job_temporary_folder_paths),
+        position=0,
+        leave=True,
+    ):
+        parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir())
+        for parsed_s3_log_file_path in tqdm.tqdm(
+            iterable=parsed_s3_log_file_paths,
+            desc="Merging results per job...",
+            total=len(parsed_s3_log_file_paths),
+            position=1,
+            leave=False,
+            mininterval=1.0,
+        ):
+            merged_temporary_file_path = merged_temporary_folder_path / parsed_s3_log_file_path.name
+
+            parsed_s3_log = pandas.read_table(filepath_or_buffer=parsed_s3_log_file_path, index_col=0)
+            parsed_s3_log.to_csv(path_or_buf=merged_temporary_file_path, mode="a", sep="\t")
+
+    order_parsed_logs(
+        unordered_parsed_s3_log_folder_path=merged_temporary_folder_path,
+        ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path,
+    )
 
     return None
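
Two observations on the hunks above. First, the context line `task_id = uuid.uuid4()[:5]` will raise `TypeError: 'UUID' object is not subscriptable` at runtime, since `uuid.uuid4()` returns a `uuid.UUID` object rather than a string; `str(uuid.uuid4())[:5]` would yield the intended five-character task ID. Second, `DataFrame.to_csv(mode="a")` writes the column header on every call, so any merged file that receives rows from more than one job will end up with duplicated header lines in its interior. A minimal sketch of a guard, assuming the same TSV layout as the merge loop above (the helper name is hypothetical):

```python
import pathlib

import pandas


def _append_parsed_log(
    parsed_s3_log_file_path: pathlib.Path,
    merged_temporary_file_path: pathlib.Path,
) -> None:
    """Hypothetical helper: append one per-job table to the merged TSV."""
    parsed_s3_log = pandas.read_table(filepath_or_buffer=parsed_s3_log_file_path, index_col=0)
    # Write the header only on the first append, while the merged file does not exist yet.
    parsed_s3_log.to_csv(
        path_or_buf=merged_temporary_file_path,
        mode="a",
        sep="\t",
        header=not merged_temporary_file_path.exists(),
    )
```

Note also that the row index written by each job restarts at 0, so the merged files carry duplicate index values; presumably `order_parsed_logs` resolves this when it sorts.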
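
`order_parsed_logs` itself is imported from `._order_parsed_logs`, which is not shown in this diff. For context, a minimal sketch of what the chronological ordering step could look like, assuming each merged TSV carries a `timestamp` column (the column name and the reindexing behavior are assumptions, not the module's confirmed implementation):

```python
import pathlib

import pandas


def order_parsed_logs_sketch(
    unordered_parsed_s3_log_folder_path: pathlib.Path,
    ordered_parsed_s3_log_folder_path: pathlib.Path,
) -> None:
    """Sketch only: sort each merged TSV chronologically and rewrite it with a fresh index."""
    ordered_parsed_s3_log_folder_path.mkdir(exist_ok=True)
    for unordered_file_path in unordered_parsed_s3_log_folder_path.iterdir():
        unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_file_path, index_col=0)
        # "timestamp" is an assumed column name; reset_index also discards the
        # duplicate index values produced by the append-mode merge.
        ordered_parsed_s3_log = unordered_parsed_s3_log.sort_values(by="timestamp").reset_index(drop=True)
        ordered_parsed_s3_log.to_csv(
            path_or_buf=ordered_parsed_s3_log_folder_path / unordered_file_path.name,
            sep="\t",
        )
```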