diff --git a/hed/models/hed_string.py b/hed/models/hed_string.py
index 123c8d6f..3db22675 100644
--- a/hed/models/hed_string.py
+++ b/hed/models/hed_string.py
@@ -207,8 +207,6 @@ def split_into_groups(hed_string, hed_schema, def_dict=None):
                 current_tag_group.append(HedGroup(hed_string, startpos + delimiter_index))
 
             if delimiter_char is HedString.CLOSING_GROUP_CHARACTER:
-                # if prev_delimiter == ",":
-                #     raise ValueError(f"Closing parentheses in HED string {hed_string}")
                 # Terminate existing group, and save it off.
                 paren_end = startpos + delimiter_index + 1
@@ -296,14 +294,12 @@ def split_hed_string(hed_string):
         if char in tag_delimiters:
             if found_symbol:
-                # view_string = hed_string[last_end_pos: i]
                 if last_end_pos != i:
                     result_positions.append((False, (last_end_pos, i)))
                 last_end_pos = i
             elif not found_symbol:
                 found_symbol = True
                 last_end_pos = i - current_spacing
-                # view_string = hed_string[tag_start_pos: last_end_pos]
                 result_positions.append((True, (tag_start_pos, last_end_pos)))
                 current_spacing = 0
                 tag_start_pos = None
@@ -311,7 +307,6 @@
         # If we have a current delimiter, end it here.
         if found_symbol and last_end_pos is not None:
-            # view_string = hed_string[last_end_pos: i]
             if last_end_pos != i:
                 result_positions.append((False, (last_end_pos, i)))
             last_end_pos = None
@@ -322,10 +317,8 @@
             tag_start_pos = i
 
     if last_end_pos is not None and len(hed_string) != last_end_pos:
-        # view_string = hed_string[last_end_pos: len(hed_string)]
         result_positions.append((False, (last_end_pos, len(hed_string))))
     if tag_start_pos is not None:
-        # view_string = hed_string[tag_start_pos: len(hed_string)]
         result_positions.append((True, (tag_start_pos, len(hed_string) - current_spacing)))
         if current_spacing:
             result_positions.append((False, (len(hed_string) - current_spacing, len(hed_string))))
diff --git a/hed/models/query_expressions.py b/hed/models/query_expressions.py
index 203d40be..6b0daeb7 100644
--- a/hed/models/query_expressions.py
+++ b/hed/models/query_expressions.py
@@ -20,7 +20,8 @@ def __init__(self, token, left=None, right=None):
             self._match_mode = 2
             token.text = token.text.replace("*", "")
 
-    def _get_parent_groups(self, search_results):
+    @staticmethod
+    def _get_parent_groups(search_results):
         found_parent_groups = []
         if search_results:
             for group in search_results:
@@ -41,6 +42,14 @@ def __str__(self):
         return output_str
 
     def handle_expr(self, hed_group, exact=False):
+        """Evaluates this expression against the given group, recursing down the expression tree as needed.
+
+        The base class implementation handles a single search term.
+
+        Parameters:
+            hed_group(HedGroup): The object to search.
+            exact(bool): If True, only look for groups containing this term directly, not in descendants.
+        """
         if self._match_mode == 2:
             groups_found = hed_group.find_wildcard_tags([self.token.text], recursive=True, include_groups=2)
         elif self._match_mode:
@@ -76,10 +85,19 @@ def handle_expr(self, hed_group, exact=False):
             return groups1
 
         groups2 = self.right.handle_expr(hed_group, exact=exact)
-        return self.merge_groups(groups1, groups2)
+        return self.merge_and_groups(groups1, groups2)
 
     @staticmethod
-    def merge_groups(groups1, groups2):
+    def merge_and_groups(groups1, groups2):
+        """Finds the results that appear in both lists.
+
+        Parameters:
+            groups1(list): A list of search results.
+            groups2(list): A list of search results.
+
+        Returns:
+            combined_groups(list): The results found in both lists, excluding combinations whose tags overlap.
+        """
         return_list = []
         for group in groups1:
             for other_group in groups2:
@@ -87,7 +105,8 @@ def merge_groups(groups1, groups2):
                 # At this point any shared tags between the two groups invalidates it.
                 if any(tag is tag2 and tag is not None for tag in group.tags for tag2 in other_group.tags):
                     continue
-                merged_result = group.merge_result(other_group)
+                # Merge the two groups' tags into one new result, now that we've verified they don't overlap.
+                merged_result = group.merge_and_result(other_group)
                 dont_add = False
                 # This is trash and slow
@@ -195,7 +214,8 @@ def __init__(self, token, left=None, right=None):
         super().__init__(token, left, right)
         self.optional = "any"
 
-    def _filter_exact_matches(self, search_results):
+    @staticmethod
+    def _filter_exact_matches(search_results):
         filtered_list = []
         for group in search_results:
             if len(group.group.children) == len(group.tags):
@@ -215,7 +235,7 @@ def handle_expr(self, hed_group, exact=False):
         # Basically if we don't have an exact match above, do the more complex matching including optional
         if self.left:
             optional_groups = self.left.handle_expr(hed_group, exact=True)
-            found_groups = ExpressionAnd.merge_groups(found_groups, optional_groups)
+            found_groups = ExpressionAnd.merge_and_groups(found_groups, optional_groups)
 
         filtered_list = self._filter_exact_matches(found_groups)
         if filtered_list:
diff --git a/hed/models/query_handler.py b/hed/models/query_handler.py
index 71b741e7..8aaf04a3 100644
--- a/hed/models/query_handler.py
+++ b/hed/models/query_handler.py
@@ -47,26 +47,63 @@ def __init__(self, expression_string):
         self.tree = self._parse(expression_string.lower())
         self._org_string = expression_string
 
+    def search(self, hed_string_obj):
+        """Searches the given string and returns the matches found.
+
+        Parameters:
+            hed_string_obj (HedString): The string to search.
+
+        Returns:
+            list(SearchResult): The matches found; generally this can just be treated as a bool,
+                since it is non-empty (truthy) only if a match was found.
+ """ + current_node = self.tree + + result = current_node.handle_expr(hed_string_obj) + return result + def __str__(self): return str(self.tree) def _get_next_token(self): + """Returns the current token and advances the counter""" self.at_token += 1 if self.at_token >= len(self.tokens): raise ValueError("Parse error in get next token") return self.tokens[self.at_token] def _next_token_is(self, kinds): + """Returns the current token if it matches kinds, and advances the counter""" if self.at_token + 1 >= len(self.tokens): return None if self.tokens[self.at_token + 1].kind in kinds: return self._get_next_token() return None - def current_token(self): - if self.at_token + 1 >= len(self.tokens): - return None - return self.tokens[self.at_token].text + def _parse(self, expression_string): + """Parse the string and build an expression tree""" + self.tokens = self._tokenize(expression_string) + + expr = self._handle_or_op() + + if self.at_token + 1 != len(self.tokens): + raise ValueError("Parse error in search string") + + return expr + + @staticmethod + def _tokenize(expression_string): + """Tokenize the expression string into a list""" + grouping_re = r"\[\[|\[|\]\]|\]|}|{|:" + paren_re = r"\)|\(|~" + word_re = r"\?+|\band\b|\bor\b|,|[\"_\-a-zA-Z0-9/.^#\*@]+" + re_string = fr"({grouping_re}|{paren_re}|{word_re})" + token_re = re.compile(re_string) + + tokens = token_re.findall(expression_string) + tokens = [Token(token) for token in tokens] + + return tokens def _handle_and_op(self): expr = self._handle_negation() @@ -79,10 +116,10 @@ def _handle_and_op(self): return expr def _handle_or_op(self): - expr = self._handle_and_op() # Note: calling _handle_and_op here + expr = self._handle_and_op() next_token = self._next_token_is([Token.Or]) while next_token: - right = self._handle_and_op() # Note: calling _handle_and_op here + right = self._handle_and_op() if next_token.kind == Token.Or: expr = ExpressionOr(next_token, expr, right) next_token = self._next_token_is([Token.Or]) @@ -143,31 +180,3 @@ def _handle_grouping_op(self): expr = None return expr - - def _parse(self, expression_string): - self.tokens = self._tokenize(expression_string) - - expr = self._handle_or_op() - - if self.at_token + 1 != len(self.tokens): - raise ValueError("Parse error in search string") - - return expr - - def _tokenize(self, expression_string): - grouping_re = r"\[\[|\[|\]\]|\]|}|{|:" - paren_re = r"\)|\(|~" - word_re = r"\?+|\band\b|\bor\b|,|[\"_\-a-zA-Z0-9/.^#\*@]+" - re_string = fr"({grouping_re}|{paren_re}|{word_re})" - token_re = re.compile(re_string) - - tokens = token_re.findall(expression_string) - tokens = [Token(token) for token in tokens] - - return tokens - - def search(self, hed_string_obj): - current_node = self.tree - - result = current_node.handle_expr(hed_string_obj) - return result diff --git a/hed/models/query_service.py b/hed/models/query_service.py index da77daf9..e45da5b9 100644 --- a/hed/models/query_service.py +++ b/hed/models/query_service.py @@ -21,7 +21,7 @@ def get_query_handlers(queries, query_names=None): return None, None, [f"EmptyQueries: The queries list must not be empty"] elif isinstance(queries, str): queries = [queries] - expression_parsers = [None for i in range(len(queries))] + expression_parsers = [None] * len(queries) issues = [] if not query_names: query_names = [f"query_{index}" for index in range(len(queries))] diff --git a/hed/models/query_util.py b/hed/models/query_util.py index 172c70ca..4534a62e 100644 --- a/hed/models/query_util.py +++ b/hed/models/query_util.py @@ 
-12,12 +12,8 @@ def __init__(self, group, tag): new_tags = tag.copy() self.tags = new_tags - def __eq__(self, other): - if isinstance(other, SearchResult): - return self.group == other.group - return other == self.group - - def merge_result(self, other): + def merge_and_result(self, other): + """Returns a new result, with the combined tags/groups from this and other.""" # Returns a new new_tags = self.tags.copy() for tag in other.tags: @@ -31,6 +27,7 @@ def merge_result(self, other): return SearchResult(self.group, new_tags) def has_same_tags(self, other): + """Checks if these two results have the same tags/groups by identity(not equality)""" if self.group != other.group: return False @@ -42,16 +39,9 @@ def has_same_tags(self, other): def __str__(self): return str(self.group) + " Tags: " + "---".join([str(tag) for tag in self.tags]) - def get_tags_only(self): - from hed import HedTag - return [tag for tag in self.tags if isinstance(tag, HedTag)] - - def get_groups_only(self): - from hed import HedTag - return [tag for tag in self.tags if not isinstance(tag, HedTag)] - class Token: + """Represents a single term/character""" And = 0 Tag = 1 DescendantGroup = 4 diff --git a/hed/schema/hed_cache.py b/hed/schema/hed_cache.py index a07888bc..d51eeafa 100644 --- a/hed/schema/hed_cache.py +++ b/hed/schema/hed_cache.py @@ -6,7 +6,7 @@ import json from hashlib import sha1 from shutil import copyfile -from hed.errors.exceptions import HedFileError, HedExceptions + import re from semantic_version import Version import portalocker @@ -14,6 +14,7 @@ from hed.schema.schema_io.schema_util import url_to_file, make_url_request from pathlib import Path import urllib +from urllib.error import URLError # From https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string HED_VERSION_P1 = r"(?P0|[1-9]\d*)\.(?P0|[1-9]\d*)\.(?P0|[1-9]\d*)" @@ -103,43 +104,6 @@ def get_hed_versions(local_hed_directory=None, library_name=None): return all_hed_versions -def cache_specific_url(hed_xml_url, xml_version=None, library_name=None, cache_folder=None): - """ Cache a file from a URL. - - Parameters: - hed_xml_url (str): Path to an exact file at a URL, or a GitHub API url to a directory. - xml_version (str): If not None and hed_xml_url is a directory, return this version or None. - library_name (str or None): Optional schema library name. - cache_folder (str): The path of the hed cache. Defaults to HED_CACHE_DIRECTORY. - - Returns: - str: Path to local hed XML file to use. - - """ - if not cache_folder: - cache_folder = HED_CACHE_DIRECTORY - - if not _check_if_url(hed_xml_url): - return None - - try: - if _check_if_api_url(hed_xml_url): - return _download_latest_hed_xml_version_from_url(hed_xml_url, - xml_version=xml_version, - library_name=library_name, - cache_folder=cache_folder) - - if not _check_if_specific_xml(hed_xml_url): - return None - - filename = hed_xml_url.split('/')[-1] - cache_filename = os.path.join(cache_folder, filename) - - return _cache_specific_url(hed_xml_url, cache_filename) - except urllib.error.URLError as e: - raise HedFileError(HedExceptions.URL_ERROR, str(e), hed_xml_url) from e - - def get_hed_version_path(xml_version, library_name=None, local_hed_directory=None): """ Get HED XML file path in a directory. Only returns filenames that exist. @@ -199,7 +163,7 @@ def cache_xml_versions(hed_base_urls=DEFAULT_URL_LIST, skip_folders=DEFAULT_SKIP Notes: - The Default skip_folders is 'deprecated'. - The HED cache folder defaults to HED_CACHE_DIRECTORY. 
-        - The directories on Github are of the form:
+        - The directories on GitHub are of the form:
            https://api.github.com/repos/hed-standard/hed-schemas/contents/standard_schema/hedxml
     """
@@ -226,13 +190,14 @@ def cache_xml_versions(hed_base_urls=DEFAULT_URL_LIST, skip_folders=DEFAULT_SKIP
                     _cache_hed_version(version, library_name, version_info, cache_folder=cache_folder)
 
             _write_last_cached_time(current_timestamp, cache_folder)
-    except portalocker.exceptions.LockException or ValueError or urllib.errors.URLError:
+    except (portalocker.exceptions.LockException, ValueError, URLError):
         return -1
 
     return 0
 
 
 def _cache_specific_url(hed_xml_url, cache_filename):
+    """Copies a specific URL to the cache under the given filename."""
     cache_folder = cache_filename.rpartition("/")[0]
     os.makedirs(cache_folder, exist_ok=True)
     temp_hed_xml_file = url_to_file(hed_xml_url)
@@ -244,6 +209,7 @@ def _cache_specific_url(hed_xml_url, cache_filename):
 
 
 def _copy_installed_schemas_to_cache(cache_folder):
+    """Copies the schemas from the install folder to the cache."""
     installed_files = os.listdir(INSTALLED_CACHE_LOCATION)
     for install_name in installed_files:
         _, basename = os.path.split(install_name)
@@ -291,21 +257,15 @@ def _write_last_cached_time(new_time, cache_folder):
         raise ValueError("Error writing timestamp to hed cache")
 
 
-def _check_if_specific_xml(hed_xml_or_url):
-    return hed_xml_or_url.endswith(HED_XML_EXTENSION)
-
-
-def _check_if_api_url(api_url):
-    return api_url.startswith("http://api.github.com") or api_url.startswith("https://api.github.com")
-
-
 def _check_if_url(hed_xml_or_url):
+    """Returns True if the given string is a URL."""
     if hed_xml_or_url.startswith("http://") or hed_xml_or_url.startswith("https://"):
         return True
     return False
 
 
 def _create_xml_filename(hed_xml_version, library_name=None, hed_directory=None):
+    """Returns the default file name for the given schema version."""
     if library_name:
         hed_xml_basename = f"{HED_XML_PREFIX}_{library_name}_{hed_xml_version}{HED_XML_EXTENSION}"
     else:
@@ -337,7 +297,7 @@ def _get_hed_xml_versions_from_url(hed_base_url, library_name=None,
     Notes:
         - The Default skip_folders is 'deprecated'.
         - The HED cache folder defaults to HED_CACHE_DIRECTORY.
-        - The directories on Github are of the form:
+        - The directories on GitHub are of the form:
            https://api.github.com/repos/hed-standard/hed-schemas/contents/standard_schema/hedxml
     """
     url_request = make_url_request(hed_base_url)
@@ -355,7 +315,7 @@ def _get_hed_xml_versions_from_url(hed_base_url, library_name=None,
                 sub_folder_versions = \
                     _get_hed_xml_versions_from_url(hed_base_url + "/" + file_entry['name'] + hedxml_suffix,
                                                    skip_folders=skip_folders, get_libraries=True)
-            except urllib.error.URLError as e:
+            except urllib.error.URLError:
                 # Silently ignore ones without a hedxml section for now.
continue _merge_in_versions(all_hed_versions, sub_folder_versions) @@ -383,33 +343,18 @@ def _get_hed_xml_versions_from_url(hed_base_url, library_name=None, def _merge_in_versions(all_hed_versions, sub_folder_versions): + """Build up the version dictionary, divided by library""" for lib_name, hed_versions in sub_folder_versions.items(): if lib_name not in all_hed_versions: all_hed_versions[lib_name] = {} all_hed_versions[lib_name].update(sub_folder_versions[lib_name]) -def _download_latest_hed_xml_version_from_url(hed_base_url, xml_version, library_name, cache_folder): - latest_version, version_info = _get_latest_hed_xml_version_from_url(hed_base_url, xml_version, library_name) - if latest_version: - cached_xml_file = _cache_hed_version(latest_version, library_name, version_info, cache_folder=cache_folder) - return cached_xml_file - - -def _get_latest_hed_xml_version_from_url(hed_base_url, library_name=None, xml_version=None): - hed_versions = _get_hed_xml_versions_from_url(hed_base_url, library_name=library_name) - - if not hed_versions: - return None - - if xml_version and xml_version in hed_versions: - return xml_version, hed_versions[xml_version] - - for version in hed_versions: - return version, hed_versions[version] - - def _calculate_sha1(filename): + """ Calculate sha1 hash for filename + + Can be compared to GitHub hash values + """ try: with open(filename, 'rb') as f: data = f.read() @@ -447,6 +392,7 @@ def _safe_move_tmp_to_folder(temp_hed_xml_file, dest_filename): def _cache_hed_version(version, library_name, version_info, cache_folder): + """Cache the given hed version""" sha_hash, download_url = version_info possible_cache_filename = _create_xml_filename(version, library_name, cache_folder) diff --git a/hed/schema/hed_schema_io.py b/hed/schema/hed_schema_io.py index 8b53e6a4..34a9791f 100644 --- a/hed/schema/hed_schema_io.py +++ b/hed/schema/hed_schema_io.py @@ -2,7 +2,6 @@ import os import json import functools -import urllib.error from hed.schema.schema_io.xml2schema import SchemaLoaderXML from hed.schema.schema_io.wiki2schema import SchemaLoaderWiki @@ -15,6 +14,7 @@ from hed.schema.schema_validation_util import validate_version_string from collections import defaultdict from hed.schema.schema_io.owl_constants import ext_to_format +from urllib.error import URLError MAX_MEMORY_CACHE = 40 @@ -97,7 +97,7 @@ def load_schema(hed_path, schema_namespace=None, schema=None, file_format=None, if is_url: try: file_as_string = schema_util.url_to_string(hed_path) - except urllib.error.URLError as e: + except URLError as e: raise HedFileError(HedExceptions.URL_ERROR, str(e), hed_path) from e hed_schema = from_string(file_as_string, schema_format=os.path.splitext(hed_path.lower())[1], name=name) elif ext in ext_to_format: @@ -309,10 +309,11 @@ def parse_version_list(xml_version_list): f"Must specify a schema version by number, found no version on {xml_version_list} schema.", filename=None) if version in out_versions[schema_namespace]: - raise HedFileError(HedExceptions.SCHEMA_DUPLICATE_LIBRARY, f"Attempting to load the same library '{version}' twice: {out_versions[schema_namespace]}", + raise HedFileError(HedExceptions.SCHEMA_DUPLICATE_LIBRARY, + f"Attempting to load the same library '{version}' twice: {out_versions[schema_namespace]}", filename=None) out_versions[schema_namespace].append(version) out_versions = {key: ",".join(value) if not key else f"{key}:" + ",".join(value) for key, value in out_versions.items()} - return out_versions \ No newline at end of file + return out_versions 
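For reference, a minimal usage sketch of the URL loading path touched in hed_schema_io.py above, showing how a failed fetch surfaces to the caller as a HedFileError. The schema URL below is illustrative and not part of this change.

    from hed.errors.exceptions import HedFileError
    from hed.schema import load_schema

    schema_url = "https://raw.githubusercontent.com/hed-standard/hed-schemas/main/standard_schema/hedxml/HED8.2.0.xml"  # illustrative URL
    try:
        # A URLError raised while fetching the file is re-raised as HedFileError (HedExceptions.URL_ERROR).
        schema = load_schema(schema_url)
    except HedFileError as err:
        print(f"Schema could not be retrieved: {err}")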
diff --git a/hed/schema/schema_io/xml2schema.py b/hed/schema/schema_io/xml2schema.py
index 8dbd4590..9206a632 100644
--- a/hed/schema/schema_io/xml2schema.py
+++ b/hed/schema/schema_io/xml2schema.py
@@ -4,6 +4,7 @@
 from defusedxml import ElementTree
 import xml
+from xml.etree import ElementTree
 import hed.schema.hed_schema_constants
 from hed.errors.exceptions import HedFileError, HedExceptions
diff --git a/hed/tools/analysis/temporal_event.py b/hed/tools/analysis/temporal_event.py
index 09cf13de..a8bc898e 100644
--- a/hed/tools/analysis/temporal_event.py
+++ b/hed/tools/analysis/temporal_event.py
@@ -40,7 +40,7 @@ def _split_group(self, contents):
                 to_remove.append(item)
             elif item.short_base_tag.lower() == "duration":
                 to_remove.append(item)
-                self.end_time = self.short_time + float(item.extension.lower())  # Will need to be fixed for units
+                self.end_time = self.start_time + float(item.extension.lower())  # Will need to be fixed for units
             elif item.short_base_tag.lower() == "def":
                 self.anchor = item.short_tag
         contents.remove(to_remove)
diff --git a/hed/tools/visualization/word_cloud_util.py b/hed/tools/visualization/word_cloud_util.py
index 46bc6c3c..1b022577 100644
--- a/hed/tools/visualization/word_cloud_util.py
+++ b/hed/tools/visualization/word_cloud_util.py
@@ -54,7 +54,7 @@ def _get_contour_mask(wc, width, height):
     return contour
 
 
-def _draw_contour(wc, img):
+def _draw_contour(wc, img: Image):
     """Slightly tweaked copy of internal WorldCloud function to allow transparency.
 
     Parameters:
@@ -92,10 +92,10 @@ def _draw_contour(wc, img):
 
 
 def _numpy_to_svg(contour):
-    """ Convert an image array to SVG.
+    """ Convert a numpy array to SVG.
 
     Parameters:
-        contour (Image): Image to be converted.
+        contour (np.ndarray): Image to be converted.
 
     Returns:
         str: The SVG representation.
diff --git a/hed/validator/tag_util/class_util.py b/hed/validator/tag_util/class_util.py
index 6ce88627..31606c61 100644
--- a/hed/validator/tag_util/class_util.py
+++ b/hed/validator/tag_util/class_util.py
@@ -122,7 +122,7 @@ def _get_problem_indexes(self, original_tag, stripped_value):
     Parameters:
         original_tag (HedTag): The original tag that is used to report the error.
-        stripped_value (str): Value stripped of white space?
+        stripped_value (str): The tag's value with units stripped off.
 
     Returns:
         list: List of int locations in which error occurred.
@@ -146,7 +146,7 @@ def _check_value_class(self, original_tag, stripped_value, report_as, error_code
     Parameters:
         original_tag (HedTag): The original tag that is used to report the error.
-        stripped_value (str): Value stripped of white space?
+        stripped_value (str): The tag's value with units stripped off.
         report_as (HedTag): Report as this tag.
         error_code(str): The code to override the error as. Again mostly for def/def-expand tags.
         index_offset(int): Offset into the extension validate_text starts at.
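The temporal_event.py change above computes the end time of a Duration event from start_time instead of short_time. A small arithmetic sketch with hypothetical values (the attribute and tag names follow the diff; the numbers are made up):

    start_time = 3.5             # onset of the temporal event, in seconds
    duration_extension = "2.0"   # extension of a hypothetical Duration/2.0 tag
    end_time = start_time + float(duration_extension.lower())
    print(end_time)              # 5.5 -- with the old code this was (incorrectly) based on short_time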
diff --git a/tests/schema/test_hed_cache.py b/tests/schema/test_hed_cache.py index c5595974..0639009f 100644 --- a/tests/schema/test_hed_cache.py +++ b/tests/schema/test_hed_cache.py @@ -42,9 +42,6 @@ def test_cache_again(self): time_since_update = hed_cache.cache_xml_versions(cache_folder=self.hed_cache_dir) self.assertGreater(time_since_update, 0) - def test_cache_specific_urls(self): - filename = hed_cache.cache_specific_url(self.specific_base_url) - self.assertTrue(os.path.exists(filename)) def test_get_cache_directory(self): from hed.schema import get_cache_directory @@ -62,13 +59,6 @@ def test_set_cache_directory(self): self.assertTrue(hed_cache.HED_CACHE_DIRECTORY == saved_cache_dir) os.rmdir(hed_cache_dir) - def test_cache_specific_url(self): - local_filename = hed_cache.cache_specific_url(self.specific_hed_url, None, cache_folder=self.hed_cache_dir) - self.assertTrue(local_filename) - - with self.assertRaises(HedFileError): - hed_cache.cache_specific_url("https://github.com/hed-standard/hed-python/notrealurl.xml") - def test_get_hed_versions_all(self): cached_versions = hed_cache.get_hed_versions(self.hed_cache_dir, library_name="all") self.assertIsInstance(cached_versions, dict)
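To tie the query changes together (QueryHandler.search and the merge_groups -> merge_and_groups rename above), here is a minimal end-to-end sketch. The import paths, the QueryHandler class name, and load_schema_version are assumed from the repository layout rather than shown in this diff, and the query text and tags are illustrative.

    from hed import HedString, load_schema_version
    from hed.models.query_handler import QueryHandler

    schema = load_schema_version("8.2.0")            # assumed helper; any loaded HedSchema works
    query = QueryHandler("sensory-event and red")    # parsed into an expression tree (lowercased internally)
    hed_string = HedString("Sensory-event, Red", schema)

    matches = query.search(hed_string)               # list of SearchResult; non-empty means the query matched
    print(bool(matches))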