From 0f8f0b682a77573b3c3a9ee9c0765c63394a7e32 Mon Sep 17 00:00:00 2001
From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com>
Date: Sat, 14 Sep 2024 03:02:27 -0400
Subject: [PATCH] Hotfix RAM usage (#70)

* Reduce RAM usage by aggregating each asset's reduced log into daily and regional summaries as soon as it is read, instead of retaining every full reduced log in memory

* Bump version to 0.4.2

* fix
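
The core of the fix, sketched below: aggregate each asset's reduced log into
its small daily summary immediately inside the per-asset loop, so only the
summary frames stay resident. This is a minimal sketch rather than the
module's actual code; it assumes the reduced logs are pandas DataFrames with
"date" and "bytes_sent" columns, and iter_reduced_logs is a hypothetical
stand-in for the real asset loop (only the keyword reduced_s3_logs_per_day
mirrors the real _aggregate_activity_by_day helper).

    import pandas as pd

    def aggregate_activity_by_day(reduced_s3_logs_per_day: list[pd.DataFrame]) -> pd.DataFrame:
        """Collapse raw per-request rows into one summary row per date."""
        all_logs = pd.concat(objs=reduced_s3_logs_per_day, ignore_index=True)
        return all_logs.groupby(by="date", as_index=False)["bytes_sent"].sum()

    def iter_reduced_logs():
        """Hypothetical stand-in for the real per-asset loop."""
        yield "blob-1", pd.DataFrame(
            {"date": ["2024-09-01", "2024-09-01"], "bytes_sent": [100, 250]}
        )

    aggregated_by_day_per_blob_id = dict()
    for blob_id, reduced_log in iter_reduced_logs():
        # Aggregate right away so only the daily summary (a handful of rows)
        # stays in memory, not the full reduced log for every asset.
        aggregated_by_day_per_blob_id[blob_id] = aggregate_activity_by_day(
            reduced_s3_logs_per_day=[reduced_log]
        )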
---
 pyproject.toml                                |  2 +-
 .../_map_binned_s3_logs_to_dandisets.py       | 33 +++++++++++++------
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index d58f17b..6074eb8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -10,7 +10,7 @@ packages = ["src/dandi_s3_log_parser"]
 
 [project]
 name = "dandi_s3_log_parser"
-version="0.4.1"
+version="0.4.2"
 authors = [
   { name="Cody Baker", email="cody.c.baker.phd@gmail.com" },
 ]
diff --git a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
index 9f0d3ac..28ceb43 100644
--- a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
+++ b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
@@ -111,7 +111,8 @@ def _map_binned_logs_to_dandiset(
     dandiset_id = dandiset.identifier
     dandiset_log_folder_path = dandiset_logs_folder_path / dandiset_id
 
-    all_reduced_s3_logs_per_blob_id = dict()
+    all_reduced_s3_logs_per_blob_id_aggregated_by_day = dict()
+    all_reduced_s3_logs_per_blob_id_aggregated_by_region = dict()
     blob_id_to_asset_path = dict()
     total_bytes_across_versions_by_blob_id = dict()
     dandiset_versions = list(dandiset.get_versions())
@@ -130,7 +131,8 @@ def _map_binned_logs_to_dandiset(
 
         dandiset_version = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id)
 
-        reduced_s3_logs_per_day = []
+        all_reduced_s3_logs_aggregated_by_day_for_version = []
+        all_reduced_s3_logs_aggregated_by_region_for_version = []
         total_bytes_per_asset_path = dict()
         dandiset_version_assets = list(dandiset_version.get_assets())
         for asset in tqdm.tqdm(
@@ -191,8 +193,17 @@ def _map_binned_logs_to_dandiset(
             )
 
             reordered_reduced_s3_log["date"] = [entry[:10] for entry in reordered_reduced_s3_log["timestamp"]]
-            reduced_s3_logs_per_day.append(reordered_reduced_s3_log)
-            all_reduced_s3_logs_per_blob_id[blob_id] = reordered_reduced_s3_log
+
+            # Aggregate per asset to save memory (most impactful for 000108)
+            aggregated_activity_by_day = _aggregate_activity_by_day(reduced_s3_logs_per_day=[reordered_reduced_s3_log])
+            all_reduced_s3_logs_aggregated_by_day_for_version.append(aggregated_activity_by_day)
+            all_reduced_s3_logs_per_blob_id_aggregated_by_day[blob_id] = aggregated_activity_by_day
+
+            aggregated_activity_by_region = _aggregate_activity_by_region(
+                reduced_s3_logs_per_day=[reordered_reduced_s3_log]
+            )
+            all_reduced_s3_logs_aggregated_by_region_for_version.append(aggregated_activity_by_region)
+            all_reduced_s3_logs_per_blob_id_aggregated_by_region[blob_id] = aggregated_activity_by_region
 
             total_bytes = sum(reduced_s3_log_binned_by_blob_id["bytes_sent"])
             total_bytes_per_asset_path[asset.path] = total_bytes
@@ -200,17 +211,19 @@ def _map_binned_logs_to_dandiset(
             blob_id_to_asset_path[blob_id] = asset.path
             total_bytes_across_versions_by_blob_id[blob_id] = total_bytes
 
-        if len(reduced_s3_logs_per_day) == 0:
+        if len(all_reduced_s3_logs_aggregated_by_day_for_version) == 0:
             continue  # No activity found (possibly the dandiset version was never accessed); skip to the next version
 
         version_summary_by_day_file_path = dandiset_version_log_folder_path / "version_summary_by_day.tsv"
         _write_aggregated_activity_by_day(
-            reduced_s3_logs_per_day=reduced_s3_logs_per_day, file_path=version_summary_by_day_file_path
+            reduced_s3_logs_per_day=all_reduced_s3_logs_aggregated_by_day_for_version,
+            file_path=version_summary_by_day_file_path,
         )
 
         version_summary_by_region_file_path = dandiset_version_log_folder_path / "version_summary_by_region.tsv"
         _write_aggregated_activity_by_region(
-            reduced_s3_logs_per_day=reduced_s3_logs_per_day, file_path=version_summary_by_region_file_path
+            reduced_s3_logs_per_day=all_reduced_s3_logs_aggregated_by_region_for_version,
+            file_path=version_summary_by_region_file_path,
         )
 
         version_summary_by_asset_file_path = dandiset_version_log_folder_path / "version_summary_by_asset.tsv"
@@ -218,7 +231,7 @@ def _map_binned_logs_to_dandiset(
             total_bytes_per_asset_path=total_bytes_per_asset_path, file_path=version_summary_by_asset_file_path
         )
 
-    if len(all_reduced_s3_logs_per_blob_id) == 0:
+    if len(all_reduced_s3_logs_per_blob_id_aggregated_by_day) == 0:
         return None  # No activity found (possibly the dandiset was never accessed); skip to the next dandiset
 
     # Single path across versions could have been replaced at various points by a new blob
@@ -228,13 +241,13 @@ def _map_binned_logs_to_dandiset(
 
     dandiset_summary_by_day_file_path = dandiset_log_folder_path / "dandiset_summary_by_day.tsv"
     _write_aggregated_activity_by_day(
-        reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id.values(),
+        reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_day.values(),
         file_path=dandiset_summary_by_day_file_path,
     )
 
     dandiset_summary_by_region_file_path = dandiset_log_folder_path / "dandiset_summary_by_region.tsv"
     _write_aggregated_activity_by_region(
-        reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id.values(),
+        reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_region.values(),
         file_path=dandiset_summary_by_region_file_path,
     )