Commit ae84342

Merge branch 'main' into fix/parallel-tag-addition

# Conflicts:
#	CHANGELOG.md

mihran113 committed Dec 5, 2024 · 2 parents 28385bb + 8c51301
Showing 14 changed files with 263 additions and 22 deletions.
14 changes: 13 additions & 1 deletion CHANGELOG.md
@@ -1,9 +1,21 @@
# Changelog

## 3.25.2
## Unreleased:

### Fixes:
- Fix aggregated metrics' computations (mihran113)
- Fix bug in RunStatusReporter raising non-deterministic RuntimeError exception (VassilisVassiliadis)
- Fix tag addition issue from parallel runs (mihran113)

## 3.26.1 Dec 3, 2024
- Re-upload after PyPI size limitation fix

## 3.26.0 Dec 3, 2024

### Enhancements:
- Improved performance of metric queries by sequence metadata separation (alberttorosyan)
- Add statistics dump script for basic troubleshooting (alberttorosyan)

## 3.25.1 Nov 6, 2024
- Fix corruption marking on empty index db (mihran113)

2 changes: 1 addition & 1 deletion aim/VERSION
@@ -1 +1 @@
3.25.1
3.26.1
42 changes: 42 additions & 0 deletions aim/cli/runs/commands.py
@@ -7,6 +7,8 @@

from aim.cli.runs.utils import make_zip_archive, match_runs, upload_repo_runs
from aim.sdk.repo import Repo
from aim.sdk.index_manager import RepoIndexManager
from aim.sdk.sequences.sequence_type_map import SEQUENCE_TYPE_MAP
from psutil import cpu_count


@@ -169,3 +171,43 @@ def close_runs(ctx, hashes, yes):

for _ in tqdm.tqdm(pool.imap_unordered(repo._close_run, hashes), desc='Closing runs', total=len(hashes)):
pass


@runs.command(name='update-metrics')
@click.pass_context
@click.option('-y', '--yes', is_flag=True, help='Automatically confirm prompt')
def update_metrics(ctx, yes):
"""Separate Sequence metadata for optimal read."""
repo_path = ctx.obj['repo']
repo = Repo.from_path(repo_path)

click.secho(
f"This command will update Runs from Aim Repo '{repo_path}' to the latest data format to ensure better "
f'performance. Please make sure no Runs are active and Aim UI is not running.'
)
if yes:
confirmed = True
else:
confirmed = click.confirm('Do you want to proceed?')
if not confirmed:
return

index_manager = RepoIndexManager.get_index_manager(repo)
hashes = repo.list_all_runs()
for run_hash in tqdm.tqdm(hashes, desc='Updating runs', total=len(hashes)):
meta_tree = repo.request_tree('meta', run_hash, read_only=False, from_union=False)
meta_run_tree = meta_tree.subtree(('meta', 'chunks', run_hash))
try:
# check if the Run has already been updated.
meta_run_tree.first_key('typed_traces')
            click.secho(f'Run {run_hash} is up-to-date. Skipping.')
continue
except KeyError:
for ctx_idx, run_ctx_dict in meta_run_tree.subtree('traces').items():
assert isinstance(ctx_idx, int)
for seq_name in run_ctx_dict.keys():
assert isinstance(seq_name, str)
dtype = run_ctx_dict[seq_name].get('dtype', 'float')
seq_type = SEQUENCE_TYPE_MAP.get(dtype, 'sequence')
meta_run_tree['typed_traces', seq_type, ctx_idx, seq_name] = 1
index_manager.index(run_hash)
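For reference, a minimal usage sketch of the new migration command. This assumes the standard `aim runs` CLI entry point with its `--repo` option; exact flags may vary across Aim versions.

```bash
# Sketch: migrate every run in a repo to the typed-traces layout, skipping the confirmation prompt.
# <path_to_repo> is a placeholder for an existing Aim repository path.
aim runs --repo <path_to_repo> update-metrics -y
```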
2 changes: 1 addition & 1 deletion aim/sdk/reporter/__init__.py
@@ -655,7 +655,7 @@ def flush(
logger.debug(f'notifying {self}')

with self.reporter_lock:
flag_names = [flag_name] if flag_name is not None else self.timed_tasks
flag_names = [flag_name] if flag_name is not None else list(self.timed_tasks)
with self.flush_condition:
for flag_name in flag_names:
logger.debug(f'flushing {flag_name}')
6 changes: 4 additions & 2 deletions aim/sdk/reporter/file_manager.py
@@ -10,10 +10,12 @@

class FileManager(object):
@abstractmethod
def poll(self, pattern: str) -> Optional[str]: ...
def poll(self, pattern: str) -> Optional[str]:
...

@abstractmethod
def touch(self, filename: str, cleanup_file_pattern: Optional[str] = None): ...
def touch(self, filename: str, cleanup_file_pattern: Optional[str] = None):
...


class LocalFileManager(FileManager):
42 changes: 31 additions & 11 deletions aim/sdk/run.py
@@ -27,6 +27,7 @@
from aim.sdk.reporter import RunStatusReporter, ScheduledStatusReporter
from aim.sdk.reporter.file_manager import LocalFileManager
from aim.sdk.sequence import Sequence
from aim.sdk.sequences.sequence_type_map import SEQUENCE_TYPE_MAP
from aim.sdk.sequence_collection import SingleRunSequenceCollection
from aim.sdk.tracker import RunTracker
from aim.sdk.types import AimObject
@@ -496,17 +497,36 @@ def iter_sequence_info_by_type(self, dtypes: Union[str, Tuple[str, ...]]) -> Ite
"""
if isinstance(dtypes, str):
dtypes = (dtypes,)
for ctx_idx, run_ctx_dict in self.meta_run_tree.subtree('traces').items():
assert isinstance(ctx_idx, int)
ctx = self.idx_to_ctx(ctx_idx)
# run_ctx_view = run_meta_traces.view(ctx_idx)
for seq_name in run_ctx_dict.keys():
assert isinstance(seq_name, str)
# skip sequences not matching dtypes.
# sequences with no dtype are considered to be float sequences.
# '*' stands for all data types
if '*' in dtypes or run_ctx_dict[seq_name].get('dtype', 'float') in dtypes:
yield seq_name, ctx, self
try:
self.meta_run_tree.first_key('typed_traces')
has_trace_type_info = True
except KeyError:
has_trace_type_info = False

if has_trace_type_info:
# use set to remove duplicates for overlapping types (such as int and float for metric)
trace_types = set()
for dtype in dtypes:
trace_types.add(SEQUENCE_TYPE_MAP.get(dtype))
for trace_type in trace_types:
for ctx_idx, run_ctx_dict in self.meta_run_tree.subtree('typed_traces').get(trace_type, {}).items():
assert isinstance(ctx_idx, int)
ctx = self.idx_to_ctx(ctx_idx)
for seq_name in run_ctx_dict.keys():
assert isinstance(seq_name, str)
yield seq_name, ctx, self
else:
for ctx_idx, run_ctx_dict in self.meta_run_tree.subtree('traces').items():
assert isinstance(ctx_idx, int)
ctx = self.idx_to_ctx(ctx_idx)
# run_ctx_view = run_meta_traces.view(ctx_idx)
for seq_name in run_ctx_dict.keys():
assert isinstance(seq_name, str)
# skip sequences not matching dtypes.
# sequences with no dtype are considered to be float sequences.
# '*' stands for all data types
if '*' in dtypes or run_ctx_dict[seq_name].get('dtype', 'float') in dtypes:
yield seq_name, ctx, self

def metrics(self) -> 'SequenceCollection':
"""Get iterable object for all run tracked metrics.
9 changes: 6 additions & 3 deletions aim/sdk/run_status_watcher.py
@@ -83,13 +83,16 @@ def __init__(self, *, obj_idx: Optional[str] = None, rank: Optional[int] = None,
self.message = message

@abstractmethod
def is_sent(self): ...
def is_sent(self):
...

@abstractmethod
def update_last_sent(self): ...
def update_last_sent(self):
...

@abstractmethod
def get_msg_details(self): ...
def get_msg_details(self):
...


class StatusNotification(Notification):
14 changes: 14 additions & 0 deletions aim/sdk/sequences/sequence_type_map.py
@@ -0,0 +1,14 @@
SEQUENCE_TYPE_MAP = {
'int': 'metric',
'float': 'metric',
'float64': 'metric',
'number': 'metric',
'aim.image': 'images',
'list(aim.image)': 'images',
'aim.audio': 'audios',
'list(aim.audio)': 'audios',
'aim.text': 'texts',
'list(aim.text)': 'texts',
'aim.distribution': 'distributions',
'aim.figure': 'figures',
}
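
For context, a small sketch of how this map is consulted elsewhere in this change set (`tracker.py` and the `update-metrics` command): a value's dtype resolves to a sequence type, with `'sequence'` used as the fallback for unknown dtypes.

```python
# Sketch: resolve value dtypes to sequence types, falling back to the generic 'sequence' bucket.
from aim.sdk.sequences.sequence_type_map import SEQUENCE_TYPE_MAP

for dtype in ('float', 'aim.image', 'my.custom.type'):  # 'my.custom.type' is a made-up example dtype
    seq_type = SEQUENCE_TYPE_MAP.get(dtype, 'sequence')
    print(f'{dtype} -> {seq_type}')  # metric, images, sequence
```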
9 changes: 8 additions & 1 deletion aim/sdk/tracker.py
@@ -15,6 +15,7 @@
from aim.storage.hashing import hash_auto
from aim.storage.object import CustomObject
from aim.storage.types import AimObject
from aim.sdk.sequences.sequence_type_map import SEQUENCE_TYPE_MAP


if TYPE_CHECKING:
@@ -152,7 +153,7 @@ def _load_sequence_info(self, ctx_id: int, name: str):
try:
seq_info.version = self.meta_run_tree['traces', ctx_id, name, 'version']
except KeyError:
self.meta_run_tree['traces', ctx_id, name, 'version'] = seq_info.dtype = 1
self.meta_run_tree['traces', ctx_id, name, 'version'] = seq_info.version = 1
try:
seq_info.dtype = self.meta_run_tree['traces', ctx_id, name, 'dtype']
except KeyError:
@@ -213,7 +214,11 @@ def _update_sequence_info(self, ctx_id: int, name: str, val, step: int):

def update_trace_dtype(old_dtype: str, new_dtype: str):
logger.warning(f"Updating sequence '{name}' data type from {old_dtype} to {new_dtype}.")
new_trace_type = SEQUENCE_TYPE_MAP.get(
dtype, 'sequence'
) # use mapping from value type to sequence type
self.meta_tree['traces_types', new_dtype, ctx_id, name] = 1
self.meta_run_tree['typed_traces', new_trace_type, ctx_id, name] = 1
self.meta_run_tree['traces', ctx_id, name, 'dtype'] = new_dtype
seq_info.dtype = new_dtype

@@ -222,11 +227,13 @@ def update_trace_dtype(old_dtype: str, new_dtype: str):
raise ValueError(f"Cannot log value '{val}' on sequence '{name}'. Incompatible data types.")

if seq_info.count == 0:
trace_type = SEQUENCE_TYPE_MAP.get(dtype, 'sequence') # use mapping from value type to sequence type
self.meta_tree['traces_types', dtype, ctx_id, name] = 1
self.meta_run_tree['traces', ctx_id, name, 'dtype'] = dtype
self.meta_run_tree['traces', ctx_id, name, 'version'] = seq_info.version
self.meta_run_tree['traces', ctx_id, name, 'first'] = val
self.meta_run_tree['traces', ctx_id, name, 'first_step'] = step
self.meta_run_tree['typed_traces', trace_type, ctx_id, name] = 1
seq_info.dtype = dtype

if step >= seq_info.count:
2 changes: 1 addition & 1 deletion aim/web/ui/package.json
@@ -1,6 +1,6 @@
{
"name": "ui_v2",
"version": "3.25.1",
"version": "3.26.1",
"private": true,
"dependencies": {
"@aksel/structjs": "^1.0.0",
2 changes: 1 addition & 1 deletion tests/requirements.txt
@@ -1,7 +1,7 @@
-r ../requirements.txt
torch
tensorflow
deeplake
deeplake<4.0.0 # update when proper documentation is available
# hub
fastapi>=0.87.0
httpx
53 changes: 53 additions & 0 deletions troubleshooting/TROUBLESHOOTING.md
@@ -0,0 +1,53 @@
# Troubleshooting for Aim

This is a collection of scripts and utilities to run over an Aim repo to collect anonymized information for debugging and reproducibility purposes.

## Base Project Stats

The `troubleshooting.base_project_statistics` script is a utility to collect and analyze statistics from an **Aim repo**.
It produces a report with basic stats: the number of runs, the number of metrics, and query times.
Query time samples are collected by running a few basic sample queries.

The output of the script is a JSON file containing the statistics.

## How to Use the Command

**Step 1:** Download the `base_project_statistics.py` script into your working directory.
**Step 2:** Execute the script on the repo of interest:

```bash
wget https://raw.githubusercontent.com/aimhubio/aim/main/troubleshooting/base_project_statistics.py
python -m base_project_statistics --repo <path_to_repo>

```
The command generates a JSON file in the current working directory, containing statistics about the repository.

## Example JSON Output

A typical output file contains the following information:

```json
{
"runs_count": 19,
"unindexed_runs_count": 3,
"metrics_count": 323,
"avg_metrics_per_run": 17.0,
"max_metrics_per_run": 17,
"avg_params_per_run": 224.58,
"max_params_per_run": 225,
"runs_query_time": 0.036,
"metrics_query_time": 15.7
}
```

### Key Fields

- `runs_count`: Total number of runs in the repository.
- `unindexed_runs_count`: Number of runs that are not indexed.
- `metrics_count`: Total count of metrics across all runs.
- `avg_metrics_per_run`: Average number of metrics per run.
- `max_metrics_per_run`: Maximum number of metrics in a single run.
- `avg_params_per_run`: Average number of parameters per run.
- `max_params_per_run`: Maximum number of parameters in a single run.
- `runs_query_time`: Time taken to query runs, in seconds.
- `metrics_query_time`: Time taken to query metrics, in seconds.
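
To work with the generated report programmatically, here is a minimal sketch that loads the JSON file written by the script (default name `aim_repo_stats.json` in the current working directory) and prints a short summary.

```python
# Sketch: load the stats report produced by base_project_statistics.py and summarize it.
import json

with open('aim_repo_stats.json') as fp:
    stats = json.load(fp)

print(f"{stats['runs_count']} runs ({stats['unindexed_runs_count']} unindexed), "
      f"{stats['metrics_count']} metrics in total")
print(f"runs query: {stats['runs_query_time']}s, metrics query: {stats['metrics_query_time']}s")
```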
Empty file added troubleshooting/__init__.py
Empty file.
88 changes: 88 additions & 0 deletions troubleshooting/base_project_statistics.py
@@ -0,0 +1,88 @@
import argparse
import json
import sys
import time

import tqdm

import aim


def count_metrics(run):
run_metric_count = 0
for context in run.meta_run_tree.subtree('traces').keys():
run_metric_count += len(run.meta_run_tree.subtree(('traces', context)).keys_eager())
return run_metric_count


def count_params(run):
return count_dict_keys(run.meta_run_tree['attrs'])


def count_dict_keys(params):
"""
Count the number of leaf nodes in a nested dictionary.
A leaf node is a value that is not a dictionary.
"""
return sum(
count_dict_keys(value) if isinstance(value, dict) else 1
for value in params.values()
)


parser = argparse.ArgumentParser(description='Process command line arguments.')
parser.add_argument('--repo', type=str, required=True)
args = parser.parse_args(sys.argv[1:])

repo = aim.Repo.from_path(args.repo)

print('This script will collect basic statistics for the Aim repository.')

stats = {}

all_runs = set(repo.list_all_runs())
unindexed_runs = set(repo.list_active_runs())

stats['runs_count'] = len(all_runs)
stats['unindexed_runs_count'] = len(unindexed_runs)

print('Collecting Runs info')
metrics_count_list = []
params_count_list = []
for run in tqdm.tqdm(repo.iter_runs()):
metrics_count_list.append(count_metrics(run))
params_count_list.append(count_params(run))
print('Done')

stats['metrics_count'] = sum(metrics_count_list)
stats['avg_metrics_per_run'] = round(sum(metrics_count_list) / len(all_runs), 2)
stats['max_metrics_per_run'] = max(metrics_count_list)
stats['avg_params_per_run'] = round(sum(params_count_list) / len(all_runs), 2)
stats['max_params_per_run'] = max(params_count_list)


print('Collecting query performance info')

# Execute Run query with a single parameter access.
# This is required to make sure there is a point lookup for each of the runs in the repo.
run_query = 'run.hparams.lr < 0.001'

start = time.time()
for run in repo.query_runs(run_query, report_mode=0).iter_runs():
pass
end = time.time()
stats['runs_query_time'] = round(end - start, 4)

# Execute Metric query with a single run parameter access and metric name check.
# This is required to make sure there is a point lookup for each of the metrics in the repo.
metric_query = 'run.hparams.lr < 0.001 and metric.name == "loss"'
start = time.time()
for metric in repo.query_metrics(metric_query, report_mode=0).iter():
pass
end = time.time()
stats['metrics_query_time'] = round(end - start, 4)
print('Done')

with open('aim_repo_stats.json', 'w') as fp:
json.dump(stats, fp)
print(f'Stats for Aim repo "{args.repo}" are available at `aim_repo_stats.json`.')
