Post-meeting fixes 1 (#50)
* adjust workflows; adjust test cases; add logic; refactor asset handler

* remove from main testing

* debug

* fix CI

* fix CI

* fix CI

---------

Co-authored-by: CodyCBakerPhD <[email protected]>
CodyCBakerPhD authored Aug 16, 2024
1 parent a2b19d2 commit 683c281
Showing 28 changed files with 143 additions and 53 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/deploy_daily_tests.yml
@@ -12,7 +12,12 @@ concurrency:
 jobs:

   DailyTests:
-    uses: ./.github/workflows/testing_dev.yml
+    uses: ./.github/workflows/testing.yml
+    secrets:
+      CODECOV_CREDENTIALS: ${{ secrets.CODECOV_CREDENTIALS }}
+
+  LiveServices:
+    uses: ./.github/workflows/testing_live_services.yml
     secrets:
       IP_HASH_SALT: ${{ secrets.IP_HASH_SALT }}
       IPINFO_CREDENTIALS: ${{ secrets.IPINFO_CREDENTIALS }}
9 changes: 7 additions & 2 deletions .github/workflows/deploy_tests_on_pull_request.yml
@@ -9,8 +9,13 @@ concurrency:

 jobs:

-  DevTests:
-    uses: ./.github/workflows/testing_dev.yml
+  Tests:
+    uses: ./.github/workflows/testing.yml
+    secrets:
+      CODECOV_CREDENTIALS: ${{ secrets.CODECOV_CREDENTIALS }}
+
+  LiveServices:
+    uses: ./.github/workflows/testing_live_services.yml
     secrets:
       IP_HASH_SALT: ${{ secrets.IP_HASH_SALT }}
       IPINFO_CREDENTIALS: ${{ secrets.IPINFO_CREDENTIALS }}
51 changes: 51 additions & 0 deletions .github/workflows/testing.yml
@@ -0,0 +1,51 @@
name: Dev tests
on:
  workflow_call:
    secrets:
      CODECOV_CREDENTIALS:
        required: true

jobs:

  run:
    # Will read on PR dashboard as 'Deploy / DevTests / ubuntu'
    # Action dashboard identified by 'Dev tests'
    # Requirement settings identified as 'DevTests / ubuntu'
    name: ubuntu
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false

    steps:
      - uses: actions/checkout@v4
      - run: git fetch --prune --unshallow --tags
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Global Setup
        run: |
          python -m pip install -U pip
          pip install pytest-cov
      - name: Install local checkout
        run: pip install --no-cache-dir .

      - name: Display installed packages and their sources for debugging
        run: pip list

      - name: Run pytest with coverage and printout coverage for debugging
        run: |
          pytest tests -vv -rsx --cov=dandi_s3_log_parser --cov-report xml:./coverage.xml
          cat ./coverage.xml
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4
        with:
          token: ${{ secrets.CODECOV_CREDENTIALS }}
          file: ./coverage.xml
          flags: unittests
          name: codecov-umbrella
          fail_ci_if_error: true
          verbose: true
@@ -62,7 +62,7 @@ jobs:
       - name: Run pytest with coverage and printout coverage for debugging
         run: |
-          pytest -vv -rsx --cov=dandi_s3_log_parser --cov-report xml:./coverage.xml
+          pytest test_live_services -vv -rsx --cov=dandi_s3_log_parser --cov-report xml:./coverage.xml
           cat ./coverage.xml
       - name: Upload coverage to Codecov
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -10,7 +10,7 @@ packages = ["src/dandi_s3_log_parser"]

 [project]
 name = "dandi_s3_log_parser"
-version="0.2.0"
+version="0.3.0"
 authors = [
     { name="Cody Baker", email="[email protected]" },
 ]
40 changes: 27 additions & 13 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
@@ -139,15 +139,17 @@ def reduce_all_dandi_raw_s3_logs(

     print("\n\nParallel parsing complete!\n\n")

-    for per_worker_temporary_folder_path in tqdm.tqdm(
-        iterable=per_worker_temporary_folder_paths,
-        desc="Merging results across workers...",
-        total=len(per_worker_temporary_folder_paths),
-        position=0,
-        leave=True,
-        mininterval=2.0,
+    for worker_index, per_worker_temporary_folder_path in enumerate(
+        tqdm.tqdm(
+            iterable=per_worker_temporary_folder_paths,
+            desc="Merging results across workers...",
+            total=len(per_worker_temporary_folder_paths),
+            position=0,
+            leave=True,
+            mininterval=2.0,
+        )
     ):
-        per_worker_reduced_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
+        per_worker_reduced_s3_log_file_paths = list(per_worker_temporary_folder_path.rglob("*.tsv"))
         assert (
             len(per_worker_reduced_s3_log_file_paths) != 0
         ), f"No files found in {per_worker_temporary_folder_path}!"
@@ -160,20 +162,26 @@ def reduce_all_dandi_raw_s3_logs(
             leave=False,
             mininterval=2.0,
         ):
-            merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_reduced_s3_log_file_path.name
+            merge_target_file_path = reduced_s3_logs_folder_path / per_worker_reduced_s3_log_file_path.relative_to(
+                per_worker_temporary_folder_path
+            )

             parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_reduced_s3_log_file_path, header=0)

-            header = False if merged_temporary_file_path.exists() else True
+            merge_target_file_path_exists = merge_target_file_path.exists()
+            if not merge_target_file_path_exists and not merge_target_file_path.parent.exists():
+                merge_target_file_path.parent.mkdir(exist_ok=True, parents=True)
+
+            header = False if merge_target_file_path_exists else True
             parsed_s3_log.to_csv(
-                path_or_buf=merged_temporary_file_path,
+                path_or_buf=merge_target_file_path,
                 mode="a",
                 sep="\t",
                 header=header,
                 index=False,
             )

     print("\n\n")
     shutil.rmtree(path=temporary_base_folder_path)


# Function cannot be covered because the line calls occur on subprocesses
@@ -302,6 +310,12 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
 def _get_default_dandi_asset_id_handler() -> Callable:
     def asset_id_handler(*, raw_asset_id: str) -> str:
         split_by_slash = raw_asset_id.split("/")
-        return split_by_slash[0] + "_" + split_by_slash[-1]
+
+        asset_type = split_by_slash[0]
+        if asset_type == "zarr":
+            zarr_blob_form = "/".join(split_by_slash[:2])
+            return zarr_blob_form
+
+        return raw_asset_id

     return asset_id_handler
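
For reference, a minimal sketch of how the refactored default asset ID handler behaves after this change (the example IDs are hypothetical, and the blob key layout is assumed from the mapper changes below):

# Sketch of the refactored default DANDI asset ID handler; example IDs are hypothetical.
def asset_id_handler(*, raw_asset_id: str) -> str:
    split_by_slash = raw_asset_id.split("/")
    if split_by_slash[0] == "zarr":
        # All sub-keys of a zarr store collapse to 'zarr/<zarr_id>', so they bin into one reduced log.
        return "/".join(split_by_slash[:2])
    # Non-zarr (blob) keys pass through unchanged.
    return raw_asset_id

print(asset_id_handler(raw_asset_id="zarr/0123abcd/0/1/2"))         # -> 'zarr/0123abcd'
print(asset_id_handler(raw_asset_id="blobs/abc/def/abcdef012345"))  # -> 'blobs/abc/def/abcdef012345'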
32 changes: 18 additions & 14 deletions src/dandi_s3_log_parser/_dandiset_mapper.py
@@ -72,33 +72,37 @@ def _map_reduced_logs_to_dandiset(

         dandiset_version = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id)

-        all_reduced_logs = []
+        all_reduced_s3_logs = []
         for asset in dandiset_version.get_assets():
             asset_suffixes = pathlib.Path(asset.path).suffixes
             is_asset_zarr = ".zarr" in asset_suffixes

-            blob_id = asset.blob if not is_asset_zarr else asset.zarr
-            blobs_or_zarr = "blobs" if not is_asset_zarr else "zarr"
+            if is_asset_zarr:
+                blob_id = asset.zarr
+                reduced_s3_log_file_path = reduced_s3_logs_folder_path / "zarr" / f"{blob_id}.tsv"
+            else:
+                blob_id = asset.blob
+                reduced_s3_log_file_path = (
+                    reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
+                )

-            reduced_log_file_path = reduced_s3_logs_folder_path / f"{blobs_or_zarr}_{blob_id}.tsv"
-
-            if not reduced_log_file_path.exists():
+            if not reduced_s3_log_file_path.exists():
                 continue  # No reduced logs found (possible asset was never accessed); skip to next asset

-            reduced_log = pandas.read_table(filepath_or_buffer=reduced_log_file_path, header=0)
-            reduced_log["filename"] = [asset.path] * len(reduced_log)
-            reduced_log["region"] = [
+            reduced_s3_log = pandas.read_table(filepath_or_buffer=reduced_s3_log_file_path, header=0)
+            reduced_s3_log["filename"] = [asset.path] * len(reduced_s3_log)
+            reduced_s3_log["region"] = [
                 get_region_from_ip_address(ip_address=ip_address, ip_hash_to_region=ip_hash_to_region)
-                for ip_address in reduced_log["ip_address"]
+                for ip_address in reduced_s3_log["ip_address"]
             ]

-            reordered_reduced_log = reduced_log.reindex(columns=("filename", "timestamp", "bytes_sent", "region"))
-            all_reduced_logs.append(reordered_reduced_log)
+            reordered_reduced_s3_log = reduced_s3_log.reindex(columns=("filename", "timestamp", "bytes_sent", "region"))
+            all_reduced_s3_logs.append(reordered_reduced_s3_log)

-        if len(all_reduced_logs) == 0:
+        if len(all_reduced_s3_logs) == 0:
             continue  # No reduced logs found (possible dandiset version was never accessed); skip to next version

-        mapped_log = pandas.concat(objs=all_reduced_logs, ignore_index=True)
+        mapped_log = pandas.concat(objs=all_reduced_s3_logs, ignore_index=True)
         mapped_log.sort_values(by="timestamp")
         mapped_log.index = range(len(mapped_log))
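
The mapper now reads reduced logs from a nested layout instead of flat "<blobs_or_zarr>_<blob_id>.tsv" files. A small sketch of the per-asset path lookup, using a stand-in folder and IDs rather than anything taken from the repository:

import pathlib

# Hypothetical reduced-logs root; in the package this is the reduced_s3_logs_folder_path argument.
reduced_s3_logs_folder_path = pathlib.Path("reduced_s3_logs")

def reduced_s3_log_path(*, is_asset_zarr: bool, blob_id: str) -> pathlib.Path:
    # Zarr stores sit directly under 'zarr/'; blobs are sharded by the first six characters of the blob ID.
    if is_asset_zarr:
        return reduced_s3_logs_folder_path / "zarr" / f"{blob_id}.tsv"
    return reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"

print(reduced_s3_log_path(is_asset_zarr=False, blob_id="abcdef0123456789"))
# reduced_s3_logs/blobs/abc/def/abcdef0123456789.tsv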
14 changes: 10 additions & 4 deletions src/dandi_s3_log_parser/_s3_log_file_reducer.py
@@ -44,7 +44,7 @@ def reduce_raw_s3_log(
     ----------
     raw_s3_log_file_path : str or pathlib.Path
         The path to the raw S3 log file.
-    reduced_s3_log_folder_path : str or pathlib.Path
+    reduced_s3_logs_folder_path : str or pathlib.Path
         The path to write each reduced S3 log file to.
         There will be one file per handled asset ID.
     mode : "w" or "a", default: "a"
@@ -94,12 +94,18 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     )

     for handled_asset_id, reduced_logs_per_handled_asset_id in reduced_and_binned_logs.items():
-        parsed_s3_log_file_path = reduced_s3_logs_folder_path / f"{handled_asset_id}.tsv"
+        handled_asset_id_path = pathlib.Path(handled_asset_id)
+        blob_id = handled_asset_id_path.stem
+        reduced_s3_log_file_path = reduced_s3_logs_folder_path / handled_asset_id_path.parent / f"{blob_id}.tsv"
+
+        reduced_log_file_exists = reduced_s3_log_file_path.exists()
+        if not reduced_log_file_exists and not reduced_s3_log_file_path.parent.exists():
+            reduced_s3_log_file_path.parent.mkdir(exist_ok=True, parents=True)

         data_frame = pandas.DataFrame(data=reduced_logs_per_handled_asset_id)

-        header = False if parsed_s3_log_file_path.exists() is True and mode == "a" else True
-        data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t", header=header, index=False)
+        header = False if reduced_log_file_exists is True and mode == "a" else True
+        data_frame.to_csv(path_or_buf=reduced_s3_log_file_path, mode=mode, sep="\t", header=header, index=False)


def _get_reduced_and_binned_log_lines(
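
Since the reduced files are now nested, the reducer creates parent folders on demand and writes the TSV header only when a file is first created. A condensed, self-contained sketch of that write path; the handled asset ID, output root, and row data below are illustrative stand-ins:

import pathlib

import pandas

reduced_s3_logs_folder_path = pathlib.Path("reduced_s3_logs")  # hypothetical output root
handled_asset_id = "blobs/abc/def/abcdef0123456789"            # illustrative handled asset ID
rows = {"timestamp": ["2024-08-16 12:00:00"], "bytes_sent": [1024], "ip_address": ["hashed"]}  # toy rows

handled_asset_id_path = pathlib.Path(handled_asset_id)
blob_id = handled_asset_id_path.stem
reduced_s3_log_file_path = reduced_s3_logs_folder_path / handled_asset_id_path.parent / f"{blob_id}.tsv"

reduced_log_file_exists = reduced_s3_log_file_path.exists()
if not reduced_log_file_exists:
    reduced_s3_log_file_path.parent.mkdir(exist_ok=True, parents=True)

# Append mode; the header is only written on the first write to this file.
header = not reduced_log_file_exists
pandas.DataFrame(data=rows).to_csv(
    path_or_buf=reduced_s3_log_file_path, mode="a", sep="\t", header=header, index=False
)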

This file was deleted.

8 changes: 5 additions & 3 deletions tests/test_reduce_all_dandi_raw_s3_logs.py
@@ -21,7 +21,7 @@ def test_reduce_all_dandi_raw_s3_logs_example_1(tmpdir: py.path.local) -> None:
         base_raw_s3_logs_folder_path=examples_folder_path,
         reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path,
     )
-    test_output_file_paths = list(test_reduced_s3_logs_folder_path.iterdir())
+    test_output_file_paths = list(test_reduced_s3_logs_folder_path.rglob("*.tsv"))

     number_of_output_files = len(test_output_file_paths)
     assert number_of_output_files != 0, f"Test expected_output folder ({test_reduced_s3_logs_folder_path}) is empty!"
@@ -32,15 +32,17 @@ def test_reduce_all_dandi_raw_s3_logs_example_1(tmpdir: py.path.local) -> None:
         number_of_output_files == expected_number_of_output_files
     ), f"The number of asset files ({number_of_output_files}) does not match expectation!"

-    expected_asset_ids = [file_path.stem for file_path in expected_reduced_s3_logs_folder_path.iterdir()]
+    expected_asset_ids = [file_path.stem for file_path in expected_reduced_s3_logs_folder_path.rglob("*.tsv")]
     for test_parsed_s3_log_file_path in test_output_file_paths:
         assert (
             test_parsed_s3_log_file_path.stem in expected_asset_ids
         ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"

         test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)

+        blob_id = test_parsed_s3_log_file_path.stem
         expected_parsed_s3_log_file_path = (
-            expected_reduced_s3_logs_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
+            expected_reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
         )
         expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)

10 changes: 5 additions & 5 deletions tests/test_reduce_all_dandi_raw_s3_logs_parallel.py
@@ -22,9 +22,7 @@ def test_reduce_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local)
         reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path,
         maximum_number_of_workers=2,
     )
-    test_output_file_paths = [
-        path for path in test_reduced_s3_logs_folder_path.iterdir() if path.is_file()
-    ]  # Skip .temp
+    test_output_file_paths = [path for path in test_reduced_s3_logs_folder_path.rglob("*.tsv")]

     number_of_output_files = len(test_output_file_paths)
     assert number_of_output_files != 0, f"Test expected_output folder ({test_reduced_s3_logs_folder_path}) is empty!"
@@ -35,15 +33,17 @@ def test_reduce_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local)
         number_of_output_files == expected_number_of_output_files
     ), f"The number of asset files ({number_of_output_files}) does not match expectation!"

-    expected_asset_ids = [file_path.stem for file_path in expected_reduced_s3_logs_folder_path.iterdir()]
+    expected_asset_ids = [file_path.stem for file_path in expected_reduced_s3_logs_folder_path.rglob("*.tsv")]
     for test_parsed_s3_log_file_path in test_output_file_paths:
         assert (
             test_parsed_s3_log_file_path.stem in expected_asset_ids
         ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"

         test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)

+        blob_id = test_parsed_s3_log_file_path.stem
         expected_parsed_s3_log_file_path = (
-            expected_reduced_s3_logs_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
+            expected_reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
         )
         expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)

9 changes: 6 additions & 3 deletions tests/test_reduce_dandi_raw_s3_log.py
@@ -27,7 +27,7 @@ def test_reduce_dandi_raw_s3_log_example_0(tmpdir: py.path.local) -> None:
         raw_s3_log_file_path=example_raw_s3_log_file_path,
         reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path,
     )
-    test_output_file_paths = list(test_reduced_s3_logs_folder_path.iterdir())
+    test_output_file_paths = list(test_reduced_s3_logs_folder_path.rglob("*.tsv"))

     number_of_output_files = len(test_output_file_paths)
     assert number_of_output_files != 0, f"Test expected_output folder ({test_reduced_s3_logs_folder_path}) is empty!"
@@ -38,15 +38,18 @@ def test_reduce_dandi_raw_s3_log_example_0(tmpdir: py.path.local) -> None:
         number_of_output_files == expected_number_of_output_files
     ), f"The number of asset files ({number_of_output_files}) does not match expectation!"

-    expected_asset_ids = [path.stem for path in expected_reduced_s3_logs_folder_path.iterdir() if path.is_file()]
+    expected_asset_ids = [path.stem for path in expected_reduced_s3_logs_folder_path.rglob("*.tsv")]
     for test_parsed_s3_log_file_path in test_output_file_paths:
         assert (
             test_parsed_s3_log_file_path.stem in expected_asset_ids
         ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"

         test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)

+        blob_id = test_parsed_s3_log_file_path.stem
         expected_parsed_s3_log_file_path = (
-            expected_reduced_s3_logs_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
+            expected_reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
         )
         expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)

         pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)
9 changes: 6 additions & 3 deletions tests/test_reduce_dandi_raw_s3_log_bad_lines.py
@@ -29,23 +29,26 @@ def test_reduce_dandi_raw_s3_log_bad_lines(tmpdir: py.path.local) -> None:
         raw_s3_log_file_path=example_raw_s3_log_file_path,
         reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path,
     )
-    test_output_file_paths = list(test_reduced_s3_logs_folder_path.iterdir())
+    test_output_file_paths = list(test_reduced_s3_logs_folder_path.rglob("*.tsv"))

     number_of_output_files = len(test_output_file_paths)
     expected_number_of_output_files = 3
     assert number_of_output_files == expected_number_of_output_files

-    expected_asset_ids = [path.stem for path in expected_reduced_s3_logs_folder_path.iterdir() if path.is_file()]
+    expected_asset_ids = [path.stem for path in expected_reduced_s3_logs_folder_path.rglob("*.tsv")]
     for test_parsed_s3_log_file_path in test_output_file_paths:
         assert (
             test_parsed_s3_log_file_path.stem in expected_asset_ids
         ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"

         test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)

+        blob_id = test_parsed_s3_log_file_path.stem
         expected_parsed_s3_log_file_path = (
-            expected_reduced_s3_logs_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
+            expected_reduced_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
         )
         expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)

         pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)

     post_test_error_folder_contents = list(error_folder.iterdir()) if error_folder.exists() else list()
