Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
CodyCBakerPhD committed Aug 16, 2024
1 parent d71fec0 commit 59ba412
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 26 deletions.
32 changes: 20 additions & 12 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,17 @@ def reduce_all_dandi_raw_s3_logs(

print("\n\nParallel parsing complete!\n\n")

for per_worker_temporary_folder_path in tqdm.tqdm(
iterable=per_worker_temporary_folder_paths,
desc="Merging results across workers...",
total=len(per_worker_temporary_folder_paths),
position=0,
leave=True,
mininterval=2.0,
for worker_index, per_worker_temporary_folder_path in enumerate(
tqdm.tqdm(
iterable=per_worker_temporary_folder_paths,
desc="Merging results across workers...",
total=len(per_worker_temporary_folder_paths),
position=0,
leave=True,
mininterval=2.0,
)
):
per_worker_reduced_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
per_worker_reduced_s3_log_file_paths = list(per_worker_temporary_folder_path.rglob("*.tsv"))
assert (
len(per_worker_reduced_s3_log_file_paths) != 0
), f"No files found in {per_worker_temporary_folder_path}!"
Expand All @@ -160,20 +162,26 @@ def reduce_all_dandi_raw_s3_logs(
leave=False,
mininterval=2.0,
):
merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_reduced_s3_log_file_path.name
merge_target_file_path = reduced_s3_logs_folder_path / per_worker_reduced_s3_log_file_path.relative_to(
per_worker_temporary_folder_path
)

parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_reduced_s3_log_file_path, header=0)

header = False if merged_temporary_file_path.exists() else True
merge_target_file_path_exists = merge_target_file_path.exists()
if not merge_target_file_path_exists and not merge_target_file_path.parent.exists():
merge_target_file_path.parent.mkdir(exist_ok=True, parents=True)

header = False if merge_target_file_path_exists else True
parsed_s3_log.to_csv(
path_or_buf=merged_temporary_file_path,
path_or_buf=merge_target_file_path,
mode="a",
sep="\t",
header=header,
index=False,
)

print("\n\n")
shutil.rmtree(path=temporary_base_folder_path)


# Function cannot be covered because its lines are executed in subprocesses
Expand Down
8 changes: 5 additions & 3 deletions tests/test_reduce_all_dandi_raw_s3_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_reduce_all_dandi_raw_s3_logs_example_1(tmpdir: py.path.local) -> None:
base_raw_s3_logs_folder_path=examples_folder_path,
reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path,
)
test_output_file_paths = list(test_reduced_s3_logs_folder_path.iterdir())
test_output_file_paths = list(test_reduced_s3_logs_folder_path.rglob("*.tsv"))

number_of_output_files = len(test_output_file_paths)
assert number_of_output_files != 0, f"Test expected_output folder ({test_reduced_s3_logs_folder_path}) is empty!"
Expand All @@ -32,15 +32,17 @@ def test_reduce_all_dandi_raw_s3_logs_example_1(tmpdir: py.path.local) -> None:
number_of_output_files == expected_number_of_output_files
), f"The number of asset files ({number_of_output_files}) does not match expectation!"

expected_asset_ids = [file_path.stem for file_path in expected_reduced_s3_logs_folder_path.iterdir()]
expected_asset_ids = [file_path.stem for file_path in expected_reduced_s3_logs_folder_path.rglob("*.tsv")]
for test_parsed_s3_log_file_path in test_output_file_paths:
assert (
test_parsed_s3_log_file_path.stem in expected_asset_ids
), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"

test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)

blob_id = test_parsed_s3_log_file_path.stem
expected_parsed_s3_log_file_path = (
expected_reduced_s3_logs_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
expected_reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
)
expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)

Expand Down
10 changes: 5 additions & 5 deletions tests/test_reduce_all_dandi_raw_s3_logs_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ def test_reduce_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local)
reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path,
maximum_number_of_workers=2,
)
test_output_file_paths = [
path for path in test_reduced_s3_logs_folder_path.iterdir() if path.is_file()
] # Skip .temp
test_output_file_paths = [path for path in test_reduced_s3_logs_folder_path.rglob("*.tsv")]

number_of_output_files = len(test_output_file_paths)
assert number_of_output_files != 0, f"Test expected_output folder ({test_reduced_s3_logs_folder_path}) is empty!"
Expand All @@ -35,15 +33,17 @@ def test_reduce_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local)
number_of_output_files == expected_number_of_output_files
), f"The number of asset files ({number_of_output_files}) does not match expectation!"

expected_asset_ids = [file_path.stem for file_path in expected_reduced_s3_logs_folder_path.iterdir()]
expected_asset_ids = [file_path.stem for file_path in expected_reduced_s3_logs_folder_path.rglob("*.tsv")]
for test_parsed_s3_log_file_path in test_output_file_paths:
assert (
test_parsed_s3_log_file_path.stem in expected_asset_ids
), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"

test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)

blob_id = test_parsed_s3_log_file_path.stem
expected_parsed_s3_log_file_path = (
expected_reduced_s3_logs_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
expected_reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
)
expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)

Expand Down
9 changes: 6 additions & 3 deletions tests/test_reduce_dandi_raw_s3_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_reduce_dandi_raw_s3_log_example_0(tmpdir: py.path.local) -> None:
raw_s3_log_file_path=example_raw_s3_log_file_path,
reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path,
)
test_output_file_paths = list(test_reduced_s3_logs_folder_path.iterdir())
test_output_file_paths = list(test_reduced_s3_logs_folder_path.rglob("*.tsv"))

number_of_output_files = len(test_output_file_paths)
assert number_of_output_files != 0, f"Test expected_output folder ({test_reduced_s3_logs_folder_path}) is empty!"
Expand All @@ -38,15 +38,18 @@ def test_reduce_dandi_raw_s3_log_example_0(tmpdir: py.path.local) -> None:
number_of_output_files == expected_number_of_output_files
), f"The number of asset files ({number_of_output_files}) does not match expectation!"

expected_asset_ids = [path.stem for path in expected_reduced_s3_logs_folder_path.iterdir() if path.is_file()]
expected_asset_ids = [path.stem for path in expected_reduced_s3_logs_folder_path.rglob("*.tsv")]
for test_parsed_s3_log_file_path in test_output_file_paths:
assert (
test_parsed_s3_log_file_path.stem in expected_asset_ids
), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"

test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)

blob_id = test_parsed_s3_log_file_path.stem
expected_parsed_s3_log_file_path = (
expected_reduced_s3_logs_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
expected_reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
)
expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)

pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)
9 changes: 6 additions & 3 deletions tests/test_reduce_dandi_raw_s3_log_bad_lines.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,26 @@ def test_reduce_dandi_raw_s3_log_bad_lines(tmpdir: py.path.local) -> None:
raw_s3_log_file_path=example_raw_s3_log_file_path,
reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path,
)
test_output_file_paths = list(test_reduced_s3_logs_folder_path.iterdir())
test_output_file_paths = list(test_reduced_s3_logs_folder_path.rglob("*.tsv"))

number_of_output_files = len(test_output_file_paths)
expected_number_of_output_files = 3
assert number_of_output_files == expected_number_of_output_files

expected_asset_ids = [path.stem for path in expected_reduced_s3_logs_folder_path.iterdir() if path.is_file()]
expected_asset_ids = [path.stem for path in expected_reduced_s3_logs_folder_path.rglob("*.tsv")]
for test_parsed_s3_log_file_path in test_output_file_paths:
assert (
test_parsed_s3_log_file_path.stem in expected_asset_ids
), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"

test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)

blob_id = test_parsed_s3_log_file_path.stem
expected_parsed_s3_log_file_path = (
expected_reduced_s3_logs_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
expected_reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
)
expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)

pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)

post_test_error_folder_contents = list(error_folder.iterdir()) if error_folder.exists() else list()
Expand Down

0 comments on commit 59ba412

Please sign in to comment.