Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added minor logging facility for exceptions #865

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions hed/tools/remodeling/cli/run_remodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import os
import json
import argparse
import logging
from hed.errors.exceptions import HedFileError
from hed.tools.util.io_util import get_file_list, get_task_dict

from hed.tools.bids.bids_dataset import BidsDataset
from hed.tools.remodeling.remodeler_validator import RemodelerValidator
from hed.tools.remodeling.dispatcher import Dispatcher
from hed.tools.remodeling.backup_manager import BackupManager
from hed.tools.util.io_util import get_alphanumeric_path, get_file_list, get_task_dict, get_timestamp


def get_parser():
Expand Down Expand Up @@ -36,6 +38,8 @@ def get_parser():
help="Controls individual file summaries ('none', 'separate', 'consolidated')")
parser.add_argument("-j", "--json-sidecar", dest="json_sidecar", nargs="?",
help="Optional path to JSON sidecar with HED information")
parser.add_argument("-ld", "--log_dir", dest="log_dir", default="",
help="Directory for storing log entries for errors.")
# parser.add_argument("-n", "--backup-name", default=BackupManager.DEFAULT_BACKUP_NAME, dest="backup_name",
# help="Name of the default backup for remodeling")
parser.add_argument("-nb", "--no-backup", action='store_true', dest="no_backup",
Expand Down Expand Up @@ -202,25 +206,36 @@ def main(arg_list=None):

"""
args, operations = parse_arguments(arg_list)
if not os.path.isdir(args.data_dir):
raise HedFileError("DataDirectoryDoesNotExist", f"The root data directory {args.data_dir} does not exist", "")
backup_name = handle_backup(args)
save_dir = None
if args.work_dir:
save_dir = os.path.realpath(os.path.join(args.work_dir, Dispatcher.REMODELING_SUMMARY_PATH))
files = get_file_list(args.data_dir, name_suffix=args.file_suffix, extensions=args.extensions,
exclude_dirs=args.exclude_dirs)
task_dict = parse_tasks(files, args.task_names)
for task, files in task_dict.items():
dispatch = Dispatcher(operations, data_root=args.data_dir, backup_name=backup_name,
hed_versions=args.hed_versions)
if args.use_bids:
run_bids_ops(dispatch, args, files)
else:
run_direct_ops(dispatch, args, files)
if not args.no_summaries:
dispatch.save_summaries(args.save_formats, individual_summaries=args.individual_summaries,
summary_dir=save_dir, task_name=task)

if args.log_dir:
os.makedirs(args.log_dir, exist_ok=True)
timestamp = get_timestamp()
try:
if not os.path.isdir(args.data_dir):
raise HedFileError("DataDirectoryDoesNotExist", f"The root data directory {args.data_dir} does not exist", "")
backup_name = handle_backup(args)
save_dir = None
if args.work_dir:
save_dir = os.path.realpath(os.path.join(args.work_dir, Dispatcher.REMODELING_SUMMARY_PATH))
files = get_file_list(args.data_dir, name_suffix=args.file_suffix, extensions=args.extensions,
exclude_dirs=args.exclude_dirs)
task_dict = parse_tasks(files, args.task_names)
for task, files in task_dict.items():
dispatch = Dispatcher(operations, data_root=args.data_dir, backup_name=backup_name,
hed_versions=args.hed_versions)
if args.use_bids:
run_bids_ops(dispatch, args, files)
else:
run_direct_ops(dispatch, args, files)
if not args.no_summaries:
dispatch.save_summaries(args.save_formats, individual_summaries=args.individual_summaries,
summary_dir=save_dir, task_name=task)
except Exception as ex:
if args.log_dir:
log_name = get_alphanumeric_path(os.path.realpath(args.data_dir)) + '_' + timestamp + '.txt'
logging.basicConfig(filename=os.path.join(args.log_dir, log_name), level=logging.ERROR)
logging.exception(f"{args.data_dir}: {args.model_path}")
raise


if __name__ == '__main__':
Expand Down
14 changes: 14 additions & 0 deletions hed/tools/util/io_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ def get_allowed(value, allowed_values=None, starts_with=True):
return result


def get_alphanumeric_path(pathname, replace_char='_'):
""" Replace sequences of non-alphanumeric characters in string (usually a path) with specified character.

Parameters:
pathname (str): A string usually representing a pathname, but could be any string.
replace_char (str): Replacement character(s).

Returns:
str: New string with characters replaced.

"""
return re.sub(r'[^a-zA-Z0-9]+', replace_char, pathname)


def extract_suffix_path(path, prefix_path):
""" Return the suffix of path after prefix path has been removed.

Expand Down
14 changes: 11 additions & 3 deletions tests/tools/util/test_io_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import unittest
from hed.errors.exceptions import HedFileError
from hed.tools.util.io_util import check_filename, extract_suffix_path, clean_filename, \
get_dir_dictionary, get_file_list, get_path_components, get_task_from_file, parse_bids_filename, \
_split_entity, get_allowed, get_filtered_by_element
get_alphanumeric_path, get_dir_dictionary, get_file_list, get_path_components, get_task_from_file, \
parse_bids_filename, _split_entity, get_allowed, get_filtered_by_element



class Test(unittest.TestCase):
Expand Down Expand Up @@ -92,7 +93,14 @@ def test_get_allowed(self):
self.assertEqual(value1, "events", "get_allowed is case insensitive")
value2 = get_allowed(test_value1, [])
self.assertEqual(value2, test_value1)


def test_get_alphanumeric_path(self):
mypath1 = 'g:\\String1%_-sTring2\n//string3\\\\\string4.pnG'
repath1 = get_alphanumeric_path(mypath1)
self.assertEqual('g_String1_sTring2_string3_string4_pnG', repath1)
repath2 = get_alphanumeric_path(mypath1, '$')
self.assertEqual('g$String1$sTring2$string3$string4$pnG', repath2)

def test_get_dir_dictionary(self):
dir_dict = get_dir_dictionary(self.bids_dir, name_suffix="_events")
self.assertTrue(isinstance(dir_dict, dict), "get_dir_dictionary returns a dictionary")
Expand Down