
Commit

add more summaries
CodyCBakerPhD committed Sep 12, 2024
1 parent 1d95f3e commit 2d1a0d8
Showing 31 changed files with 157 additions and 42 deletions.
@@ -57,11 +57,12 @@ def bin_all_reduced_s3_logs_by_object_key(
for reduced_s3_log_file in tqdm.tqdm(
iterable=reduced_s3_log_files,
total=len(reduced_s3_log_files),
desc="Binning reduced logs...",
desc="Binning reduced logs",
position=0,
leave=True,
mininterval=3.0,
smoothing=0,
unit="file",
):
if reduced_s3_log_file.stat().st_size == 0:
with open(file=started_tracking_file_path, mode="a") as io:
@@ -93,11 +94,12 @@ def bin_all_reduced_s3_logs_by_object_key(
for object_key, data in tqdm.tqdm(
iterable=object_keys_to_data.items(),
total=len(object_keys_to_data),
desc=f"Binning {reduced_s3_log_file}...",
desc=f"Binning {reduced_s3_log_file}",
position=1,
leave=False,
mininterval=3.0,
smoothing=0,
unit="asset",
):
object_key_as_path = pathlib.Path(object_key)
binned_s3_log_file_path = (
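The edits above drop the trailing ellipses from the tqdm desc strings and attach unit labels to both progress bars. A minimal, self-contained sketch of the same nested-bar pattern follows; the file names and object keys are hypothetical stand-ins, not values from the repository.

import tqdm

# Hypothetical inputs standing in for reduced log files and the object keys binned from each.
reduced_s3_log_files = ["2020-01-01.tsv", "2020-01-02.tsv"]
object_keys_to_data = {"blobs/abc123": [1024], "zarr/def456": [2048]}

for reduced_s3_log_file in tqdm.tqdm(
    iterable=reduced_s3_log_files,
    total=len(reduced_s3_log_files),
    desc="Binning reduced logs",
    position=0,  # outer bar stays on the first terminal row
    leave=True,  # keep the outer bar after completion
    mininterval=3.0,  # refresh at most every three seconds
    smoothing=0,  # report the true historical rate rather than a moving average
    unit="file",  # rate displays as "file/s" instead of the default "it/s"
):
    for object_key, data in tqdm.tqdm(
        iterable=object_keys_to_data.items(),
        total=len(object_keys_to_data),
        desc=f"Binning {reduced_s3_log_file}",
        position=1,  # inner bar renders on the row below the outer bar
        leave=False,  # clear the inner bar once each file finishes
        unit="asset",
    ):
        pass  # binning work elided
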
10 changes: 7 additions & 3 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
@@ -85,15 +85,15 @@ def reduce_all_dandi_raw_s3_logs(
fields_to_reduce = ["object_key", "timestamp", "bytes_sent", "ip_address"]
object_key_parents_to_reduce = ["blobs", "zarr"]
line_buffer_tqdm_kwargs = dict(position=1, leave=False)
# TODO: add better reporting units to all TQDMs (lines / s, files / s, etc.)
if maximum_number_of_workers == 1:
for relative_s3_log_file_path in tqdm.tqdm(
iterable=relative_s3_log_file_paths_to_reduce,
total=len(relative_s3_log_file_paths_to_reduce),
desc="Parsing log files...",
desc="Parsing log files",
position=0,
leave=True,
smoothing=0, # Use true historical average, not moving average since shuffling makes it more uniform
unit="file",
):
raw_s3_log_file_path = raw_s3_logs_folder_path / relative_s3_log_file_path
reduced_s3_log_file_path = (
@@ -144,6 +144,7 @@ def reduce_all_dandi_raw_s3_logs(
leave=True,
mininterval=3.0,
smoothing=0, # Use true historical average, not moving average since shuffling makes it more uniform
unit="file",
)
for future in progress_bar_iterable:
future.result() # This is the call that finally triggers the deployment to the workers
@@ -177,7 +178,10 @@ def _multi_worker_reduce_dandi_raw_s3_log(
object_key_parents_to_reduce = ["blobs", "zarr"]
object_key_handler = _get_default_dandi_object_key_handler()
line_buffer_tqdm_kwargs = dict(
position=worker_index + 1, leave=False, desc=f"Parsing line buffers on worker {worker_index + 1}..."
position=worker_index + 1,
leave=False,
desc=f"Parsing line buffers on worker {worker_index + 1}...",
unit="buffer",
)

reduce_raw_s3_log(
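The parallel branch above drives its progress bar from worker futures and now also reports a file/s rate. A minimal sketch of wrapping completed futures in a progress bar is below; the worker function, inputs, and submission details are placeholders rather than the repository's actual implementation.

import concurrent.futures

import tqdm


def _reduce_one_file(path: str) -> str:
    return path  # placeholder for the per-file reduction done by each worker


if __name__ == "__main__":
    file_paths = ["a.log", "b.log", "c.log"]

    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(_reduce_one_file, path) for path in file_paths]

        # The bar ticks once per finished job rather than once per submitted job.
        progress_bar_iterable = tqdm.tqdm(
            iterable=concurrent.futures.as_completed(futures),
            total=len(futures),
            desc="Parsing log files",
            position=0,
            leave=True,
            mininterval=3.0,
            smoothing=0,  # true historical average, not a moving average
            unit="file",
        )
        for future in progress_bar_iterable:
            future.result()  # re-raises any exception that occurred in the worker
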
97 changes: 79 additions & 18 deletions src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
@@ -1,5 +1,7 @@
import collections
import os
import pathlib
from typing import Iterable

import dandi.dandiapi
import natsort
@@ -81,6 +83,7 @@ def map_binned_s3_logs_to_dandisets(
leave=True,
mininterval=5.0,
smoothing=0,
unit="dandiset",
):
_map_binned_logs_to_dandiset(
dandiset=dandiset,
@@ -108,31 +111,37 @@ def _map_binned_logs_to_dandiset(
dandiset_id = dandiset.identifier
dandiset_log_folder_path = dandiset_logs_folder_path / dandiset_id

all_activity_across_versions_per_blob_id_by_day = dict()
blob_id_to_asset_path = dict()
all_activity_across_versions_by_blob_id = dict()
dandiset_versions = list(dandiset.get_versions())
for version in tqdm.tqdm(
iterable=dandiset_versions,
total=len(dandiset_versions),
desc=f"Mapping Dandiset {dandiset_id} versions...",
desc=f"Mapping Dandiset {dandiset_id} versions",
position=1,
leave=False,
mininterval=5.0,
smoothing=0,
unit="version",
):
version_id = version.identifier
dandiset_version_log_folder_path = dandiset_log_folder_path / version_id

dandiset_version = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id)

all_activity_for_version = []
all_activity_for_version_by_day = []
all_activity_for_version_by_asset_path = dict()
dandiset_version_assets = list(dandiset_version.get_assets())
for asset in tqdm.tqdm(
iterable=dandiset_version_assets,
total=len(dandiset_version_assets),
desc="Mapping assets...",
desc=f"Mapping {dandiset_id}/{version}",
position=2,
leave=False,
mininterval=5.0,
smoothing=0,
unit="asset",
):
asset_as_path = pathlib.Path(asset.path)
asset_suffixes = asset_as_path.suffixes
@@ -182,26 +191,78 @@
)

reordered_reduced_s3_log["date"] = [entry[:10] for entry in reordered_reduced_s3_log["timestamp"]]
reordered_reduced_s3_log_binned_per_day = reordered_reduced_s3_log.reindex(columns=("date", "bytes_sent"))
all_activity_for_version_by_day.append(reordered_reduced_s3_log_binned_per_day)
all_activity_across_versions_per_blob_id_by_day[blob_id] = reordered_reduced_s3_log_binned_per_day

reordered_reduced_s3_log_aggregated = reordered_reduced_s3_log.groupby("date", as_index=False)[
"bytes_sent"
].agg([list, "sum"])
reordered_reduced_s3_log_aggregated.rename(columns={"sum": "bytes_sent"}, inplace=True)
total_bytes = sum(reduced_s3_log_binned_by_blob_id["bytes_sent"])
all_activity_for_version_by_asset_path[asset.path] = total_bytes

reordered_reduced_s3_log_binned_per_day = reordered_reduced_s3_log_aggregated.reindex(
columns=("date", "bytes_sent")
)
reordered_reduced_s3_log_binned_per_day.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
blob_id_to_asset_path[blob_id] = asset.path
all_activity_across_versions_by_blob_id[blob_id] = total_bytes

all_activity_for_version.append(reordered_reduced_s3_log_binned_per_day)
if len(all_activity_for_version_by_day) == 0:
continue  # No activity found (possibly this dandiset version was never accessed); skip to next version

if len(all_activity_for_version) == 0:
continue # No reduced logs found (possible dandiset version was never accessed); skip to next version
aggregated_activity_for_version_by_day = _aggregate_activity_by_day(
reduced_s3_logs_per_day=all_activity_for_version_by_day
)
aggregated_activity_for_version_by_asset = _aggregate_activity_by_asset(
total_bytes_per_asset_path=all_activity_for_version_by_asset_path
)

summary_logs = pandas.concat(objs=all_activity_for_version, ignore_index=True)
summary_logs.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
version_summary_by_day_file_path = dandiset_version_log_folder_path / "version_summary_by_day.tsv"
version_summary_by_asset_file_path = dandiset_version_log_folder_path / "version_summary_by_asset.tsv"
aggregated_activity_for_version_by_day.to_csv(
path_or_buf=version_summary_by_day_file_path, mode="w", sep="\t", header=True, index=False
)
aggregated_activity_for_version_by_asset.to_csv(
path_or_buf=version_summary_by_asset_file_path, mode="w", sep="\t", header=True, index=False
)

summary_file_path = dandiset_version_log_folder_path / "summary.tsv"
summary_logs.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True, index=False)
if len(all_activity_across_versions_per_blob_id_by_day) == 0:
return None  # No activity found (possibly this dandiset was never accessed); skip to the next dandiset

all_activity_across_versions_by_asset = collections.defaultdict(int)
for blob_id, asset_path in blob_id_to_asset_path.items():
all_activity_across_versions_by_asset[asset_path] += all_activity_across_versions_by_blob_id[blob_id]

aggregated_activity_for_dandiset_by_day = _aggregate_activity_by_day(
reduced_s3_logs_per_day=all_activity_across_versions_per_blob_id_by_day.values()
)
aggregated_activity_for_dandiset_by_asset = _aggregate_activity_by_asset(
total_bytes_per_asset_path=all_activity_across_versions_by_asset
)

dandiset_summary_by_day_file_path = dandiset_log_folder_path / "dandiset_summary_by_day.tsv"
dandiset_summary_by_asset_file_path = dandiset_log_folder_path / "dandiset_summary_by_asset.tsv"
aggregated_activity_for_dandiset_by_day.to_csv(
path_or_buf=dandiset_summary_by_day_file_path, mode="w", sep="\t", header=True, index=False
)
aggregated_activity_for_dandiset_by_asset.to_csv(
path_or_buf=dandiset_summary_by_asset_file_path, mode="w", sep="\t", header=True, index=False
)

return None


def _aggregate_activity_by_day(reduced_s3_logs_per_day: Iterable[pandas.DataFrame]) -> pandas.DataFrame:
all_reduced_s3_logs = pandas.concat(objs=reduced_s3_logs_per_day, ignore_index=True)

pre_aggregated = all_reduced_s3_logs.groupby(by="date", as_index=False)["bytes_sent"].agg([list, "sum"])
pre_aggregated.rename(columns={"sum": "bytes_sent"}, inplace=True)
pre_aggregated.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)

aggregated_activity_by_day = pre_aggregated.reindex(columns=("date", "bytes_sent"))

return aggregated_activity_by_day


def _aggregate_activity_by_asset(total_bytes_per_asset_path: dict[str, int]) -> pandas.DataFrame:
aggregated_activity_by_asset = pandas.DataFrame(
data=[list(total_bytes_per_asset_path.keys()), list(total_bytes_per_asset_path.values())]
).T
aggregated_activity_by_asset.rename(columns={0: "asset", 1: "bytes_sent"}, inplace=True)
aggregated_activity_by_asset.sort_values(by="bytes_sent", inplace=True)

return aggregated_activity_by_asset
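The two new helpers at the bottom of this file do the heavy lifting for the summary tables: one sums bytes_sent per calendar day, the other turns a mapping of per-asset totals into a sortable table. A small worked example of both on toy values is sketched below; the asset paths are illustrative, and the by-asset table is built from a column dict rather than the transpose-and-rename used above.

import natsort
import pandas

# Toy per-request activity in the same shape as the reduced, reordered logs above.
activity = pandas.DataFrame(
    data={
        "date": ["2022-03-16", "2022-05-04", "2022-03-16"],
        "bytes_sent": [1234, 512, 512],
    }
)

# Sum bytes per calendar day, keeping "date" as a regular column rather than the index.
by_day = activity.groupby(by="date", as_index=False)["bytes_sent"].sum()
by_day.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
# -> 2022-03-16: 1746 bytes, 2022-05-04: 512 bytes

# Per-asset totals: a plain mapping of asset path to total bytes becomes a two-column table.
total_bytes_per_asset_path = {
    "sub-001/sub-001_ses-01.nwb": 1234,  # illustrative paths only
    "sub-002/sub-002_ses-01.nwb": 1024,
}
by_asset = pandas.DataFrame(
    data={
        "asset": list(total_bytes_per_asset_path.keys()),
        "bytes_sent": list(total_bytes_per_asset_path.values()),
    }
)
by_asset.sort_values(by="bytes_sent", inplace=True)

by_day.to_csv(path_or_buf="dandiset_summary_by_day.tsv", sep="\t", index=False)
by_asset.to_csv(path_or_buf="dandiset_summary_by_asset.tsv", sep="\t", index=False)
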
@@ -0,0 +1,2 @@
timestamp bytes_sent ip_address
2022-03-16T02:21:12 1234 18.220.4.80
@@ -1,3 +1,3 @@
timestamp bytes_sent ip_address line_index
2022-03-16T02:21:12 512 192.0.2.0 1
2022-05-04T05:06:35 512 192.0.2.0 1
timestamp bytes_sent ip_address
2022-03-16T02:21:12 512 192.0.2.0
2022-05-04T05:06:35 512 192.0.2.0
@@ -1,3 +1,3 @@
timestamp bytes_sent ip_address line_index
2021-04-24T12:03:05 1443 192.0.2.0 0
2021-12-31T23:06:42 1443 192.0.2.0 0
timestamp bytes_sent ip_address
2021-04-24T12:03:05 1443 192.0.2.0
2021-12-31T23:06:42 1443 192.0.2.0
@@ -1,2 +1,2 @@
timestamp bytes_sent ip_address line_index
2023-01-01T22:42:58 1526223 192.0.2.0 2
timestamp bytes_sent ip_address
2023-01-01T22:42:58 1526223 192.0.2.0
@@ -0,0 +1,2 @@
timestamp bytes_sent region
0 2022-03-16T02:21:12 1234 AWS
@@ -0,0 +1,3 @@
asset bytes_sent
sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 1024
sub-YutaMouse37/sub-YutaMouse37_ses-YutaMouse37-150610_behavior+ecephys.nwb 1234
@@ -0,0 +1,3 @@
date bytes_sent
2022-03-16 1746
2022-05-04 512
@@ -0,0 +1,2 @@
timestamp bytes_sent region
0 2022-03-16T02:21:12 1234 AWS
@@ -0,0 +1,3 @@
asset bytes_sent
sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 1024
sub-YutaMouse37/sub-YutaMouse37_ses-YutaMouse37-150610_behavior+ecephys.nwb 1234
@@ -0,0 +1,3 @@
asset bytes_sent
sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 1024
sub-YutaMouse37/sub-YutaMouse37_ses-YutaMouse37-150610_behavior+ecephys.nwb 1234
@@ -0,0 +1,2 @@
timestamp bytes_sent region
0 2022-03-16T02:21:12 1234 AWS
@@ -0,0 +1,3 @@
asset bytes_sent
sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 1024
sub-YutaMouse37/sub-YutaMouse37_ses-YutaMouse37-150610_behavior+ecephys.nwb 1234

This file was deleted.

@@ -0,0 +1,3 @@
asset bytes_sent
sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 1024
sub-YutaMouse37/sub-YutaMouse37_ses-YutaMouse37-150610_behavior+ecephys.nwb 1234
@@ -0,0 +1,3 @@
date bytes_sent
2022-03-16 512
2022-05-04 512
@@ -0,0 +1,3 @@
asset bytes_sent
sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 1024
sub-YutaMouse37/sub-YutaMouse37_ses-YutaMouse37-150610_behavior+ecephys.nwb 1234
@@ -0,0 +1,3 @@
date bytes_sent
2022-03-16 512
2022-05-04 512

This file was deleted.

@@ -0,0 +1,3 @@
asset bytes_sent
sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 1024
sub-YutaMouse37/sub-YutaMouse37_ses-YutaMouse37-150610_behavior+ecephys.nwb 1234
@@ -0,0 +1,3 @@
date bytes_sent
2022-03-16 512
2022-05-04 512
@@ -0,0 +1,3 @@
asset bytes_sent
sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 1024
sub-YutaMouse37/sub-YutaMouse37_ses-YutaMouse37-150610_behavior+ecephys.nwb 1234
@@ -0,0 +1,3 @@
date bytes_sent
2022-03-16 512
2022-05-04 512
@@ -0,0 +1,3 @@
asset bytes_sent
sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 1024
sub-YutaMouse37/sub-YutaMouse37_ses-YutaMouse37-150610_behavior+ecephys.nwb 1234
@@ -35,11 +35,15 @@ def test_map_all_reduced_s3_logs_to_dandisets(tmpdir: py.path.local):
relative_file_path = expected_file_path.relative_to(expected_output_folder_path)
test_file_path = test_mapped_s3_logs_folder_path / relative_file_path

# Pandas assertion makes no reference to the file being tested when it fails
print(f"{test_file_path=}")
print(f"{expected_file_path=}")

test_mapped_log = pandas.read_table(filepath_or_buffer=test_file_path, index_col=0)
expected_mapped_log = pandas.read_table(filepath_or_buffer=expected_file_path, index_col=0)

pandas.testing.assert_frame_equal(left=test_mapped_log, right=expected_mapped_log)
# Pandas assertion makes no reference to the case being tested when it fails
try:
pandas.testing.assert_frame_equal(left=test_mapped_log, right=expected_mapped_log)
except AssertionError as exception:
message = (
f"\n\nTest file path: {test_file_path}\nExpected file path: {expected_file_path}\n\n"
f"{str(exception)}\n\n"
)
raise AssertionError(message)

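The reworked test attaches the offending file paths to the pandas assertion failure. A minimal sketch of that wrapping with two small in-memory frames is below; the paths are placeholders, and chaining with `from exception` is a minor variation on the diff that keeps the original traceback attached.

import pandas

test_file_path = "test_output/dandiset_summary_by_day.tsv"          # placeholder paths
expected_file_path = "expected_output/dandiset_summary_by_day.tsv"

test_mapped_log = pandas.DataFrame(data={"bytes_sent": [1746, 512]}, index=["2022-03-16", "2022-05-04"])
expected_mapped_log = pandas.DataFrame(data={"bytes_sent": [1746, 999]}, index=["2022-03-16", "2022-05-04"])

try:
    pandas.testing.assert_frame_equal(left=test_mapped_log, right=expected_mapped_log)
except AssertionError as exception:
    message = (
        f"\n\nTest file path: {test_file_path}\nExpected file path: {expected_file_path}\n\n"
        f"{str(exception)}\n\n"
    )
    raise AssertionError(message) from exception  # chaining keeps the original pandas diff in the traceback
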