From b4ebd73df5fdcccf008748a338ff604a333fddcd Mon Sep 17 00:00:00 2001
From: Ajai
Date: Thu, 2 Jan 2025 18:31:08 +0530
Subject: [PATCH] Update utils for statvar processor (#1132)

* update utils

* lint fix

* add chardet requirement

* fix review comments

* fix review comments

* fix review comments

* add process time, mem counters

* lint

* add psutil requirement
---
 requirements.txt               |   2 +
 requirements_all.txt           |   2 +
 util/config_map.py             | 309 +++++++++++++-------------
 util/counters.py               |  23 +-
 util/dc_api_wrapper.py         | 284 +++++++++++++++---------
 util/dc_api_wrapper_test.py    |  18 +-
 util/download_util.py          |  31 ++-
 util/file_util.py              | 176 ++++++++++++++-
 util/statvar_dcid_generator.py | 395 ++++++++++++++++++---------------
 9 files changed, 769 insertions(+), 471 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index f8ddfe17d7..edfbd2bc1f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,7 @@
 absl-py
 arcgis2geojson
+chardet
 dataclasses==0.6
 datacommons==1.4.3
 frozendict
@@ -17,6 +18,7 @@ lxml==4.9.1
 numpy==1.26.4
 openpyxl>=3.1.0
 pandas
+psutil
 pylint
 pytest
 requests==2.27.1

diff --git a/requirements_all.txt b/requirements_all.txt
index 975edc670b..39d2259b3a 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -3,6 +3,7 @@
 absl-py
 arcgis2geojson
 chembl-webresource-client>=0.10.2
+chardet
 dataclasses==0.6
 datacommons==1.4.3
 deepdiff==6.3.0
@@ -25,6 +26,7 @@ netCDF4==1.6.4
 numpy==1.26.4
 openpyxl>=3.1.0
 pandas
+psutil
 pylint
 pytest
 rasterio

diff --git a/util/config_map.py b/util/config_map.py
index 1ea640107e..dc75566d0d 100644
--- a/util/config_map.py
+++ b/util/config_map.py
@@ -55,28 +55,32 @@
 '''
 import ast
+from collections import OrderedDict
 import collections.abc
 import pprint
 import sys
+from typing import Union
 
 from absl import logging
-from collections import OrderedDict
-from typing import Union
+
+import file_util
 
 
 class ConfigMap:
-    '''Class to store config mapping of named parameters to values as a dictionary.'''
-
-    def __init__(self,
-                 config_dict: dict = None,
-                 filename: str = None,
-                 config_string: str = None):
-        '''Create a Config Map object.
-        Args:
-          config_dict: dictionary with key:values to be loaded into the config map.
-          filename: override the dictionary with key:values from the file.
-          config_string: string of dictionary parameters to override key:values.
-        '''
+    """Class to store config mapping of named parameters to values as a dictionary."""
+
+    def __init__(
+        self,
+        config_dict: dict = None,
+        filename: str = None,
+        config_string: str = None,
+    ):
+        """Create a Config Map object.
+
+        Args:
+          config_dict: dictionary with key:values to be loaded into the config map.
+          filename: override the dictionary with key:values from the file.
+          config_string: string of dictionary parameters to override key:values.
+        """
         self._config_dict = dict()
         # Add configs from input args.
         if config_dict:
         logging.debug(f'Loaded ConfigMap: {self.get_configs()}')
 
     def load_config_file(self, filename: str) -> dict:
-        '''Load configs from a file overwriting any existing parameter with a new value.
+        """Load configs from a file overwriting any existing parameter with a new value.
 
-        Args:
-          filename: a py or json file with a dictionary of parameter:value mappings.
+        Args:
+          filename: a py or json file with a dictionary of parameter:value
+            mappings.
 
-        Returns:
-          dictionary with all config parameters after updates from the file.
-        '''
+        Returns:
+          dictionary with all config parameters after updates from the file.
+        """
         if filename:
             self.add_configs(read_py_dict_from_file(filename))
         return self._config_dict
 
     def load_config_string(self, config_params_str: str) -> dict:
-        '''Loads a JSON config dictionary overriding existing configs.
+        """Loads a JSON config dictionary overriding existing configs.
 
-        Args:
-          config_params_str: JSON string with a dictionary of parameter:value mappings.
+        Args:
+          config_params_str: JSON string with a dictionary of parameter:value
+            mappings.
 
-        Returns:
-          dictionary with all config parameters after updates.
-        '''
+        Returns:
+          dictionary with all config parameters after updates.
+        """
         if config_params_str:
             param_dict = ast.literal_eval(config_params_str)
             self.add_configs(param_dict)
         return self._config_dict
 
     def add_configs(self, configs: dict) -> dict:
-        '''Add new or replace existing config parameters
+        """Add new or replace existing config parameters.
 
         Nested parameters with dict, or list values are replaced.
         Use update_config() for a deep-update of nested parameters.
+
+        For example, with a config dict set as follows:
+          self._config_dict = {
+            'int-param': 10,
+            'nested-dict1': {
+              'param1': 123,
+            }
+          }
+        add_config({'nested-dict1': {'param2': abc}})
+        will return {
+          'int-param': 10,
+          'nested-dict1': {
+            'param2': abc,  # older key:values from nested-dict removed.
+          }
+        }
 
         Args:
-          configs: dictionary with new parameter:value mappings
-            that are updated into existing dict.
-            Nested dict objects within the dict are replaced.
+          configs: dictionary with new parameter:value mappings that are updated
+            into the existing dict. Nested dict objects within the dict are
+            replaced.
 
         Returns:
           dictionary with all parameter:value mappings.
-        '''
+        """
         if configs:
             self._config_dict.update(configs)
         return self._config_dict
 
     def update_config(self, configs: dict) -> dict:
-        '''Does a deep update of the dict updating nested dicts as well.
-        For example, assume config-dict has a nested dict:
-           self._config_dict = {
-             'nested-dict1': {
-                 'param1': 123,
-                 'nested-dict2': {
-                     'param2': 345,
-                 }
-             }
-           }
-
-        update_config(configs={
-            'nested-dict1': {
-                'param1': 321,
-                'param1-2': 456,
-                'nested-dict2': {
-                    'param2-1': 789,
-                },
-            })
-
-        will result in an updated config_dict:
-        {
-            'nested-dict1': {
-                'param1': 321,  # updated
-                'param1-2': 456,  # added
-                'nested-dict2': {
-                    'param2': 345,  # original
-                    'param2-1': 789,  # added
-                },
-        }
-
-        Args:
-            configs: dictionary with additional parameter:value mappings.
-
-        Returns:
-            dictionary with all parameter:value mappings.
-        '''
+        """Does a deep update of the dict updating nested dicts as well.
+
+        For example, assume config-dict has a nested dict:
+
+        self._config_dict = {
+          'nested-dict1': {
+            'param1': 123,
+            'nested-dict2': {
+              'param2': 345,
+            }
+          }
+        }
+
+        update_config(configs={
+          'nested-dict1': {
+            'param1': 321,
+            'param1-2': 456,
+            'nested-dict2': {
+              'param2-1': 789,
+            },
+          })
+
+        will result in an updated config_dict:
+        {
+          'nested-dict1': {
+            'param1': 321,  # updated
+            'param1-2': 456,  # added
+            'nested-dict2': {
+              'param2': 345,  # original
+              'param2-1': 789,  # added
+            },
+        }
+
+        Args:
+          configs: dictionary with additional parameter:value mappings.
+
+        Returns:
+          dictionary with all parameter:value mappings.
+        """
         return _deep_update(self._config_dict, configs)
 
     def get(self,
             parameter: str,
             default_value=None) -> Union[str, int, float, list, dict]:
-        '''Return the value of a named config parameter.
+        """Return the value of a named config parameter.
 
-        Args:
-          parameter: name of the parameter to lookup
-          default_value: Default value to be returned if the parameter doesn't exist.
+        Args:
+          parameter: name of the parameter to lookup
+          default_value: Default value to be returned if the parameter doesn't
+            exist.
 
-        Returns:
-          value of the parameter in the config dict if it exists or the default_value.
-        '''
+        Returns:
+          value of the parameter in the config dict if it exists or the
+          default_value.
+        """
         return self._config_dict.get(parameter, default_value)
 
     def get_configs(self) -> dict:
-        '''Return a reference to the config dictionary.
+        """Return a reference to the config dictionary.
 
-        Any modifications to the dict is reflected within this object as well.
-        '''
+        Any modifications to the dict are reflected within this object as well.
+        """
         return self._config_dict
 
     def set_config(self, parameter: str, value):
-        '''Set the value for a parameter overwriting one if it already exists
-        Args:
-          parameter: Name of the parameter
-          value: Value to be set.
-        '''
+        """Set the value for a parameter, overwriting one if it already exists.
+
+        Args:
+          parameter: Name of the parameter
+          value: Value to be set.
+        """
         self._config_dict[parameter] = value
 
     def get_config_str(self) -> str:
-        '''Returns the config dictionary as a pretty string.'''
+        """Returns the config dictionary as a pretty string."""
         return pprint.pformat(self._config_dict, indent=4)
 
     def write_config(self, filename: str):
-        '''Write the config dictionary into a file.
+        """Write the config dictionary into a file.
 
-        Args:
-          filename: name of the file to write.
-        '''
+        Args:
+          filename: name of the file to write.
+        """
         with open(filename, 'w') as file:
             file.write(self.get_config_str())
 
 
 def get_config_map_from_file(filename: str) -> ConfigMap:
-    '''Returns a ConfigMap object with parameters loaded from a file.
+    """Returns a ConfigMap object with parameters loaded from a file.
 
-    Args:
-      filename: name of the file to load.
+    Args:
+      filename: name of the file to load.
 
-    Returns:
-      ConfigMap object with all the parameters loaded into the config_dict.
-    '''
+    Returns:
+      ConfigMap object with all the parameters loaded into the config_dict.
+    """
     return ConfigMap(filename=filename)
 
 
 def _deep_update(src: dict, add_dict: dict) -> dict:
-    '''Deep update of parameters in add_dict into src.
+    """Deep update of parameters in add_dict into src.
 
-    Args:
-      src: source dictionary into which new parameters are added.
-      add_dict: dictionary with new parameters to be added.
+    Args:
+      src: source dictionary into which new parameters are added.
+      add_dict: dictionary with new parameters to be added.
 
-    Returns:
-      src dictionary with updated parameters.
+    Returns:
+      src dictionary with updated parameters.
 
-    Note:
-      Assumes the new dictionary has same type(dict/list) for updated parameters.
-    '''
+    Note:
+      Assumes the new dictionary has the same type (dict/list) for updated
+      parameters.
+    """
     for k, v in add_dict.items():
         if isinstance(v, collections.abc.Mapping):
             src[k] = _deep_update(src.get(k, {}), v)
         elif isinstance(v, list):
             # TODO: deep update of list
+            if k not in src:
+                src[k] = list()
             src[k].extend(v)
         elif isinstance(v, set):
             # TODO: deep update of set
+            if k not in src:
+                src[k] = set()
             src[k].update(v)
         else:
             src[k] = v
@@ -271,38 +285,31 @@
     return src
 
 
 def read_py_dict_from_file(filename: str) -> dict:
-    '''Read a python dict from a file.
-
-    Args:
-      filename: JSON or a python file containing dict of parameter to value mappings.
-        The file can have comments and extra commas at the end.
-        Example: '{ 'abc': 123, 'def': 'lmn' }
-        Note: It assumes bools are in Python: True, False and None is used for 'null'.
-
-    Returns:
-      dictionary loaded from the file.
-
-    Raises:
-      exceptions on parsing errors string dict from literal_eval()
-    '''
-    logging.info(f'Reading python dict from {filename}...')
-    with open(filename) as file:
-        dict_str = file.read()
-
-    # Load the map assuming a python dictionary.
-    # Can also be used with JSON with trailing commas and comments.
-    param_dict = ast.literal_eval(dict_str)
+    """Read a python dict from a file.
+
+    Args:
+      filename: JSON or a python file containing a dict of parameter to value
+        mappings. The file can have comments and extra commas at the end.
+        Example: { 'abc': 123, 'def': 'lmn' }
+        Note: It assumes bools are in Python (True, False) and None is used
+        for 'null'.
+
+    Returns:
+      dictionary loaded from the file.
+
+    Raises:
+      exceptions from literal_eval() on errors parsing the dict string.
+    """
+    param_dict = file_util.file_load_py_dict(filename)
     logging.debug(f'Loaded {filename} into dict {param_dict}')
     return param_dict
 
 
 def write_py_dict_to_file(py_dict: dict, filename: str):
-    '''Write a python dict into a file.
+    """Write a python dict into a file.
 
-    Args:
-      py_dict: Dictionary to save into the file.
-      filename: file to write into.
-    '''
-    logging.info(f'Writing python dict into {filename}')
-    with open(filename, 'w') as file:
-        file.write(pprint.pformat(py_dict, indent=4))
+    Args:
+      py_dict: Dictionary to save into the file.
+      filename: file to write into.
+    """
+    file_util.file_write_py_dict(py_dict, filename)
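To illustrate the shallow-vs-deep merge semantics of `add_configs()` versus `update_config()` above, here is a minimal sketch; the parameter names and values are invented, and it assumes `util/` is on `PYTHONPATH`:

```python
from config_map import ConfigMap

configs = ConfigMap(config_dict={'delimiter': ',', 'dc_api': {'retries': 3}})

# add_configs() replaces nested dicts wholesale ...
configs.add_configs({'dc_api': {'use_cache': True}})
assert configs.get('dc_api') == {'use_cache': True}

# ... while update_config() deep-merges nested dicts via _deep_update().
configs.update_config({'dc_api': {'retries': 5}})
assert configs.get('dc_api') == {'use_cache': True, 'retries': 5}

print(configs.get('delimiter', default_value=','))
```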
diff --git a/util/counters.py b/util/counters.py
index 65e43c4801..6f7a0a3285 100644
--- a/util/counters.py
+++ b/util/counters.py
@@ -13,6 +13,8 @@
 # limitations under the License.
 '''Class for dictionary of named counters.'''
 
+import os
+import psutil
 import sys
 import time
 
@@ -89,7 +91,7 @@ def __init__(self,
 
     def __del__(self):
         '''Log the counters.'''
-        self._update_processing_rate()
+        self._update_periodic_counters()
         logging.info(self.get_counters_string())
 
     def add_counter(self,
@@ -212,7 +214,7 @@
         Args:
           file: file handle to emit counters string.
         '''
-        self._update_processing_rate()
+        self._update_periodic_counters()
         print(self.get_counters_string(), file=file)
 
     def print_counters_periodically(self):
@@ -234,7 +236,7 @@ def reset_start_time(self):
 
     def set_prefix(self, prefix: str):
         '''Set the prefix for the counter names.
        Also resets the start_time and processing rate counters.'''
-        self._update_processing_rate()
+        self._update_periodic_counters()
         self._prefix = prefix
         self.reset_start_time()
         logging.info(self.get_counters_string())
@@ -251,6 +253,11 @@ def _get_counter_name(self, name: str, debug_context: str = None):
             name = name + f'_{debug_context}'
         return name
 
+    def _update_periodic_counters(self):
+        '''Update periodic counters.'''
+        self._update_processing_rate()
+        self._update_process_counters()
+
     def _update_processing_rate(self):
         '''Update the processing rate and remaining time.
         Uses the option: 'processed' to get the counter for processing rate
@@ -271,3 +278,13 @@
         if totals:
             self.set_counter('process_remaining_time',
                              max(0, (totals - num_processed)) / rate)
+
+    def _update_process_counters(self):
+        '''Update process counters for memory and time.'''
+        process = psutil.Process(os.getpid())
+        mem = process.memory_info()
+        self.max_counter('process-mem-rss', mem.rss)
+        self.max_counter('process-mem', mem.vms)
+        cpu_times = process.cpu_times()
+        self.set_counter('process-time-user-secs', cpu_times.user)
+        self.set_counter('process-time-sys-secs', cpu_times.system)
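A quick sketch of how the new process counters surface; the class name `Counters` is an assumption, since the class statement itself sits outside this diff's context lines:

```python
import os
import psutil
from counters import Counters  # assumed class name

counters = Counters()
counters.set_counter('processed', 100)       # set_counter(name, value) as used in the diff
counters.max_counter('peak-batch-size', 50)  # max_counter() keeps the maximum seen

# print_counters() now also emits process-mem*, process-time-* via psutil,
# which internally boils down to:
proc = psutil.Process(os.getpid())
print(proc.memory_info().rss, proc.cpu_times().user)
counters.print_counters()
```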
diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py
index b0f12dfb8b..9024fa123f 100644
--- a/util/dc_api_wrapper.py
+++ b/util/dc_api_wrapper.py
@@ -11,17 +11,17 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-'''Wrapper utilities for data commons API.'''
+"""Wrapper utilities for the Data Commons API."""
 
-import sys
+from collections import OrderedDict
 import os
-import datacommons as dc
-import requests_cache
+import sys
 import time
 import urllib
 
 from absl import logging
-from collections import OrderedDict
+import datacommons as dc
+import requests_cache
 
 _SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
 sys.path.append(_SCRIPT_DIR)
 
 _DC_API_PATH_RESOLVE_COORD = '/v1/recon/resolve/coordinate'
 
-def dc_api_wrapper(function,
-                   args: dict,
-                   retries: int = 3,
-                   retry_secs: int = 1,
-                   use_cache: bool = False,
-                   api_root: str = None):
-    '''Wrapper for a DC APi call with retries and caching.
-    Returns the result from the DC APi call function.
-    In case of errors, retries the function with a delay a fixed number of times.
-
-    Args:
-      function: The DataCommons API function.
-      args: dictionary with any the keyword arguments for the DataCommons API function.
-      retries: Number of retries in case of HTTP errors.
-      retry_sec: Interval in seconds between retries for which caller is blocked.
-      use_cache: If True, uses request cache for faster response.
-      api_root: The API server to use. Default is 'http://api.datacommons.org'.
-        To use autopush with more recent data, set it to 'http://autopush.api.datacommons.org'
-
-    Returns:
-      The response from the DataCommons API call.
-    '''
+def dc_api_wrapper(
+    function,
+    args: dict,
+    retries: int = 3,
+    retry_secs: int = 1,
+    use_cache: bool = False,
+    api_root: str = None,
+):
+    """Wrapper for a DC API call with retries and caching.
+
+    Returns the result from the DC API call function. In case of errors, retries
+    the function with a delay a fixed number of times.
+
+    Args:
+      function: The DataCommons API function.
+      args: dictionary with any of the keyword arguments for the DataCommons API
+        function.
+      retries: Number of retries in case of HTTP errors.
+      retry_secs: Interval in seconds between retries for which caller is blocked.
+      use_cache: If True, uses request cache for faster response.
+      api_root: The API server to use. Default is 'http://api.datacommons.org'. To
+        use autopush with more recent data, set it to
+        'http://autopush.api.datacommons.org'
+
+    Returns:
+      The response from the DataCommons API call.
+    """
     if api_root:
         dc.utils._API_ROOT = api_root
         logging.debug(f'Setting DC API root to {api_root} for {function}')
@@ -80,42 +85,54 @@
     for attempt in range(retries):
         try:
             logging.debug(
-                f'Invoking DC API {function}, #{attempt} with {args}, retries={retries}'
-            )
+                f'Invoking DC API {function}, #{attempt} with {args},'
+                f' retries={retries}')
             response = function(**args)
             logging.debug(
                 f'Got API response {response} for {function}, {args}')
             return response
-        except KeyError:
-            # Exception in case of API error.
+        except KeyError as e:
+            # Exception in case of missing dcid. Don't retry.
+            logging.error(f'Got exception for api: {function}, {e}')
             return None
-        except urllib.error.URLError:
+        except (urllib.error.URLError, urllib.error.HTTPError,
+                ValueError) as e:
             # Exception when server is overloaded, retry after a delay
             if attempt >= retries:
+                logging.error(
+                    f'Got exception for api: {function}, {e}, no more retries'
+                )
                 raise urllib.error.URLError
             else:
                 logging.debug(
-                    f'Retrying API {function} after {retry_secs}...')
+                    f'Got exception {e}, retrying API {function} after'
+                    f' {retry_secs}...')
                 time.sleep(retry_secs)
     return None
 
 
-def dc_api_batched_wrapper(function,
-                           dcids: list,
-                           args: dict,
-                           config: dict = None) -> dict:
-    '''A wrapper for DC API on dcids with batching support.
+def dc_api_batched_wrapper(
+    function,
+    dcids: list,
+    args: dict,
+    dcid_arg_kw: str = 'dcid',
+    headers: dict = {},
+    config: dict = None,
+) -> dict:
+    """A wrapper for DC API on dcids with batching support.
+
     Returns the dictionary result for the function call across all arguments.
-    It batches the dcids to make multiple calls to the DC API and merges all results.
+    It batches the dcids to make multiple calls to the DC API and merges all
+    results.
 
     Args:
       function: DC API to be invoked. It should have dcids as one of the
         arguments and should return a dictionary with dcid as the key.
-      dcids: List of dcids to be invoked with the function.
-        The namespace is stripped from the dcid before the call to the DC API.
+      dcids: List of dcids to be invoked with the function. The namespace is
+        stripped from the dcid before the call to the DC API.
       args: Additional arguments for the function call.
-      config: dictionary of DC API configuration settings.
-        The supported settings are:
+      config: dictionary of DC API configuration settings. The supported settings
+        are:
          dc_api_batch_size: Number of dcids to invoke per API call.
          dc_api_retries: Number of times an API can be retried.
          dc_api_retry_sec: Interval in seconds between retries.
         dc_api_use_cache: Enable/disable request cache for the DC API call.
         dc_api_root: API root url for the DC API call.
@@ -124,7 +141,7 @@
 
     Returns:
       Merged function return values across all dcids.
-    '''
+    """
     if not config:
         config = {}
     api_result = {}
     index = 0
     num_dcids = len(dcids)
     api_batch_size = config.get('dc_api_batch_size', dc.utils._MAX_LIMIT)
     logging.debug(
-        f'Calling DC API {function} on {len(dcids)} dcids in batches of {api_batch_size} with args: {args}...'
-    )
+        f'Calling DC API {function} on {len(dcids)} dcids in batches of'
+        f' {api_batch_size} with args: {args}...')
     while index < num_dcids:
         # dcids in batches.
         dcids_batch = [
             _strip_namespace(x) for x in dcids[index:index + api_batch_size]
         ]
         index += api_batch_size
         args['dcids'] = dcids_batch
-        batch_result = dc_api_wrapper(function, args,
-                                      config.get('dc_api_retries', 3),
-                                      config.get('dc_api_retry_secs', 5),
-                                      config.get('dc_api_use_cache', False),
-                                      config.get('dc_api_root', None))
+        batch_result = dc_api_wrapper(
+            function,
+            args,
+            config.get('dc_api_retries', 3),
+            config.get('dc_api_retry_secs', 5),
+            config.get('dc_api_use_cache', False),
+            config.get('dc_api_root', None),
+        )
         if batch_result:
             api_result.update(batch_result)
             logging.debug(f'Got DC API result for {function}: {batch_result}')
     return api_result
 
@@ -155,17 +175,19 @@
 
 def dc_api_is_defined_dcid(dcids: list, wrapper_config: dict = None) -> dict:
-    '''Returns a dicttionary with dcids mapped to True/False based on whether
-    the dcid is defined in the API and has a 'typeOf' property.
-    Uses the property_value() DC API to lookup 'typeOf' for each dcid.
-    dcids not defined in KG get a value of False.
-    Args:
-      dcids: List of dcids. The namespace is stripped from the dcid.
-      wrapper_config: dictionary of configurationparameters for the wrapper.
-        See dc_api_batched_wrapper and dc_api_wrapper for details.
-    Returns:
-      dictionary with each input dcid mapped to a True/False value.
-    '''
+    """Returns a dictionary with dcids mapped to True/False based on whether
+
+    the dcid is defined in the API and has a 'typeOf' property.
+    Uses the property_value() DC API to lookup 'typeOf' for each dcid.
+    dcids not defined in KG get a value of False.
+    Args:
+      dcids: List of dcids. The namespace is stripped from the dcid.
+      wrapper_config: dictionary of configuration parameters for the wrapper. See
+        dc_api_batched_wrapper and dc_api_wrapper for details.
+
+    Returns:
+      dictionary with each input dcid mapped to a True/False value.
+    """
     api_function = dc.get_property_values
     args = {
         'prop': 'typeOf',
         'out': True,
     }
     api_result = dc_api_batched_wrapper(api_function, dcids, args,
                                         wrapper_config)
     return response
 
@@ -183,26 +205,68 @@
 
+def dc_api_get_node_property(dcids: list,
+                             prop: str,
+                             wrapper_config: dict = None) -> dict:
+    """Returns a dictionary keyed by dcid with { prop:value } for each dcid.
+
+    Uses the get_property_values() DC API to lookup the property for each dcid.
+
+    Args:
+      dcids: List of dcids. The namespace is stripped from the dcid.
+      prop: property to be looked up for each dcid.
+      wrapper_config: dictionary of configuration parameters for the wrapper. See
+        dc_api_batched_wrapper and dc_api_wrapper for details.
+
+    Returns:
+      dictionary with each input dcid mapped to its {prop: value}.
+    """
+    api_function = dc.get_property_values
+    args = {
+        'prop': prop,
+        'out': True,
+    }
+    api_result = dc_api_batched_wrapper(api_function, dcids, args,
+                                        wrapper_config)
+    response = {}
+    for dcid in dcids:
+        dcid_stripped = _strip_namespace(dcid)
+        value = api_result.get(dcid_stripped)
+        if value:
+            response[dcid] = {prop: value}
+    return response
+
+
 def dc_api_get_node_property_values(dcids: list,
                                     wrapper_config: dict = None) -> dict:
-    '''Returns all the property values for a set of dcids from the DC API.
-    Args:
-      dcids: list of dcids to lookup
-      wrapper_config: configuration parameters for the wrapper.
-        See dc_api_batched_wrapper() and dc_api_wrapper() for details.
-    Returns:
-      dictionary with each dcid with the namspace 'dcid:' as the key
-      mapped to a dictionary of property:value.
-    '''
+    """Returns all the property values for a set of dcids from the DC API.
+
+    Args:
+      dcids: list of dcids to lookup
+      wrapper_config: configuration parameters for the wrapper. See
+        dc_api_batched_wrapper() and dc_api_wrapper() for details.
+
+    Returns:
+      dictionary with each dcid with the namespace 'dcid:' as the key
+      mapped to a dictionary of property:value.
+    """
     predefined_nodes = OrderedDict()
     api_function = dc.get_triples
     api_triples = dc_api_batched_wrapper(api_function, dcids, {},
                                          wrapper_config)
     if api_triples:
         for dcid, triples in api_triples.items():
+            if (_strip_namespace(dcid) not in dcids and
+                    _add_namespace(dcid) not in dcids):
+                continue
             pvs = {}
             for d, prop, val in triples:
-                pvs[prop] = val
+                if d == dcid and val:
+                    # quote string values with spaces if needed
+                    if ' ' in val and val[0] != '"':
+                        val = '"' + val + '"'
+                    if prop in pvs:
+                        val = pvs[prop] + ',' + val
+                    pvs[prop] = val
             if len(pvs) > 0:
                 if 'Node' not in pvs:
                     pvs['Node'] = _add_namespace(dcid)
@@ -210,16 +274,16 @@
     return predefined_nodes
 
 
-def dc_api_resolve_placeid(dcids: list) -> dict:
-    '''Returns the resolved dcid for each of the placeid.
-
-    Args:
-      dcids: list of placeids to be resolved.
-
-    Returns:
-      dictionary keyed by input placeid with reoslved dcid as value.
-    '''
-    data = {'in_prop': 'placeId', 'out_prop': 'dcid'}
+def dc_api_resolve_placeid(dcids: list, in_prop: str = 'placeId') -> dict:
+    """Returns the resolved dcid for each of the placeids.
+
+    Args:
+      dcids: list of placeids to be resolved.
+
+    Returns:
+      dictionary keyed by input placeid with resolved dcid as value.
+    """
+    data = {'in_prop': in_prop, 'out_prop': 'dcid'}
     data['ids'] = dcids
     num_ids = len(dcids)
     api_url = dc.utils._API_ROOT + _DC_API_PATH_RESOLVE_ID
@@ -241,14 +305,14 @@
 
 def dc_api_resolve_latlng(dcids: list) -> dict:
-    '''Returns the resolved dcid for each of the placeid.
-
-    Args:
-      dcids: list of placeids to be resolved.
-
-    Returns:
-      dictionary keyed by input placeid with reoslved dcid as value.
-    '''
+    """Returns the resolved dcid for each lat/lng coordinate.
+
+    Args:
+      dcids: list of lat/lng coordinates to be resolved.
+
+    Returns:
+      dictionary keyed by input coordinate with resolved dcid as value.
+    """
     data = {}
     data['coordinates'] = dcids
     num_ids = len(dcids)
     results = {}
     if recon_resp:
         for entity in recon_resp.get('placeCoordinates', []):
             dcids = entity.get('placeDcids', '')
-            lat = entity.get("latitude", "")
-            lng = entity.get("longitude", "")
+            lat = entity.get('latitude', '')
+            lng = entity.get('longitude', '')
             place_id = f'{lat}{lng}'
             if place_id and dcids:
                 results[place_id] = entity
     return results
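A minimal sketch of the batched lookup path above, mirroring the call shape used in the unit tests later in this patch; the dcids and config values here are illustrative only:

```python
import dc_api_wrapper as dc_api

# Checks which dcids exist in the KG, two dcids per underlying API call.
response = dc_api.dc_api_is_defined_dcid(
    ['geoId/06', 'country/ZZZ'],
    {'dc_api_batch_size': 2, 'dc_api_retries': 3},
)
print(response)  # e.g. {'geoId/06': True, 'country/ZZZ': False}
```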
 
 
 def _add_namespace(value: str, namespace: str = 'dcid') -> str:
-    '''Returns the value with a namespace prefix for references.
-    Args:
-      value: string to which namespace is to be added.
-    Returns:
-      value with the namespace prefix if the value is not a quoted string
-      and doesn't have a namespace already.
-      O/w return the value as is.
-
-    Any sequence of letters followed by a ':' is treated as a namespace.
-    Quoted strings are assumed to start with '"' and won't get a namespace.
-    '''
+    """Returns the value with a namespace prefix for references.
+
+    Args:
+      value: string to which namespace is to be added.
+
+    Returns:
+      value with the namespace prefix if the value is not a quoted string
+      and doesn't have a namespace already.
+      O/w return the value as is.
+
+    Any sequence of letters followed by a ':' is treated as a namespace.
+    Quoted strings are assumed to start with '"' and won't get a namespace.
+    """
     if value and isinstance(value, str):
         if value[0].isalpha() and value.find(':') < 0:
             return f'{namespace}:{value}'
     return value
 
 
 def _strip_namespace(value: str) -> str:
-    '''Returns the value without the namespace prefix.
-    Args:
-      value: string from which the namespace prefix is to be removed.
-    Returns:
-      value without the namespace prefix if there was a namespace
-
-    Any sequence of letters followed by a ':' is treated as a namespace.
-    Quoted strings are assumed to start with '"' and won't be filtered.
-    '''
+    """Returns the value without the namespace prefix.
+
+    Args:
+      value: string from which the namespace prefix is to be removed.
+
+    Returns:
+      value without the namespace prefix if there was a namespace.
+
+    Any sequence of letters followed by a ':' is treated as a namespace.
+    Quoted strings are assumed to start with '"' and won't be filtered.
+    """
     if value and isinstance(value, str) and value[0].isalnum():
         return value[value.find(':') + 1:].strip()
     return value
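The two namespace helpers are easiest to read as a handful of expected input/output pairs, derived directly from the code above:

```python
# Behavior of the (module-private) namespace helpers above:
assert _add_namespace('Count_Person') == 'dcid:Count_Person'
assert _add_namespace('dcs:Count_Person') == 'dcs:Count_Person'  # already namespaced
assert _add_namespace('"some text"') == '"some text"'            # quoted strings untouched
assert _strip_namespace('dcid:Count_Person') == 'Count_Person'
```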
diff --git a/util/dc_api_wrapper_test.py b/util/dc_api_wrapper_test.py
index 099f1be159..9d95c9151a 100644
--- a/util/dc_api_wrapper_test.py
+++ b/util/dc_api_wrapper_test.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-'''Tests for dc_api_wrapper.'''
+"""Tests for dc_api_wrapper."""
 
 import os
 import sys
@@ -30,7 +30,7 @@ class TestDCAPIWrapper(unittest.TestCase):
 
     def test_dc_api_wrapper(self):
-        '''Test the wrapper for DC API.'''
+        """Test the wrapper for DC API."""
         api_function = dc.get_property_labels
         dcids = [
             'Count_Person',  # 'dcid:' namespace will be removed.
@@ -42,7 +42,7 @@
         self.assertTrue('typeOf' in response['Count_Person'])
 
     def test_dc_api_batched_wrapper(self):
-        '''Test DC API wrapper for batched calls.'''
+        """Test DC API wrapper for batched calls."""
         api_function = dc.get_property_values
         dcids = [
             'Count_Person',  # Statvar defined in DC
@@ -58,7 +58,7 @@
         self.assertFalse(response['NewStatVar_NotInDC'])
 
     def test_dc_api_is_defined_dcid(self):
-        '''Test API wrapper for defined DCIDs.'''
+        """Test API wrapper for defined DCIDs."""
         dcids = [
             'geoId/06',  # Geo Id defined.
             'country/ZZZ',  # Geo Id not defined.
@@ -66,10 +66,12 @@
             'schema:Year',  # Class
         ]
         response = dc_api.dc_api_is_defined_dcid(
-            dcids, {
+            dcids,
+            {
                 'dc_api_batch_size': 2,
-                'dc_api_root': 'http://autopush.api.datacommons.org'
-            })
+                'dc_api_root': 'http://autopush.api.datacommons.org',
+            },
+        )
         self.assertTrue(response is not None)
         self.assertEqual(len(response), len(dcids))
         self.assertTrue(response['geoId/06'])
@@ -79,7 +81,7 @@
         self.assertTrue(response['dcs:value'])
 
     def test_dc_get_node_property_values(self):
-        '''Test API wrapper to get all property:values for a node.'''
+        """Test API wrapper to get all property:values for a node."""
         node_pvs = dc_api.dc_api_get_node_property_values(['dcs:Count_Person'])
         self.assertTrue(node_pvs)
         # Verify the resposnse has dcid with the namespace prefix 'dcid:'

diff --git a/util/download_util.py b/util/download_util.py
index aac96a10df..091a251773 100644
--- a/util/download_util.py
+++ b/util/download_util.py
@@ -90,6 +90,7 @@ def test_my_function(self):
 def request_url(url: str,
                 params: dict = {},
                 method: str = 'GET',
+                headers: dict = {},
                 output: str = 'text',
                 timeout: int = 30,
                 retries: int = 3,
@@ -147,12 +148,18 @@
     for attempt in range(retries):
         try:
             logging.debug(
-                f'Downloading URL {url}, params:{params}, {method} #{attempt}, retries={retries}'
+                f'Downloading URL {url}, headers:{headers} params:{params}, {method} #{attempt}, retries={retries}'
             )
             if 'get' in method.lower():
-                response = requests.get(url, params=params, timeout=timeout)
+                response = requests.get(url,
+                                        headers=headers,
+                                        params=params,
+                                        timeout=timeout)
             else:
-                response = requests.post(url, json=params, timeout=timeout)
+                response = requests.post(url,
+                                         headers=headers,
+                                         json=params,
+                                         timeout=timeout)
             logging.debug(
                 f'Got API response {response} for {url}, {params}')
             if response.ok:
@@ -166,14 +173,16 @@
             # Exception in case of API error.
             return None
         except (requests.exceptions.ConnectTimeout,
-                urllib.error.URLError) as e:
-            # Exception when server is overloaded, retry after a delay
-            if attempt >= retries:
-                raise urllib.error.URLError
-            else:
-                logging.debug(
-                    f'Retrying URL {url} after {retry_secs} secs ...')
-                time.sleep(retry_secs)
+                requests.exceptions.ConnectionError, urllib.error.URLError,
+                urllib.error.HTTPError) as e:
+            logging.debug(f'Got exception {e} for {url}, {params}')
+
+            # Retry in case of errors; re-raise once retries are exhausted.
+            if attempt >= retries:
+                raise
+            else:
+                logging.debug(f'Retrying URL {url} after {retry_secs} secs ...')
+                time.sleep(retry_secs)
     return None
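A sketch of the updated `request_url()` with the new `headers` argument; the URL is a placeholder and only parameters visible in the signature above are passed:

```python
from download_util import request_url

# headers is forwarded to requests.get/requests.post.
text = request_url(
    'https://example.com/api',  # placeholder URL
    params={'q': 'test'},
    method='GET',
    headers={'Accept': 'application/json'},
    output='text',  # presumably returns the response body as text
    retries=3,
)
```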
diff --git a/util/file_util.py b/util/file_util.py
index d961d13e95..d858c36a06 100644
--- a/util/file_util.py
+++ b/util/file_util.py
@@ -18,22 +18,27 @@
 """
 
 import ast
+import chardet
 import csv
 import fnmatch
 import glob
+import gspread
+import io
 import json
 import os
 import pickle
 import pprint
 import sys
 import tempfile
-from typing import Union
+
+import numpy as np
 
 from absl import app
 from absl import logging
 from aggregation_util import aggregate_dict, aggregate_value
 from google.cloud import storage
-import gspread
+from retry.api import retry_call
+from typing import Union
 
 
 class FileIO:
@@ -428,9 +433,12 @@ def file_get_name(file_path: str,
     Returns:
       file name combined from path, suffix and extension.
     """
-    # Create the file directory if it doesn't exist.
+    if not file_path:
+        return None
     if file_is_google_spreadsheet(file_path):
+        # Don't modify spreadsheets
         return file_path
+    # Create the file directory if it doesn't exist.
     file_makedirs(file_path)
     file_prefix, ext = os.path.splitext(file_path)
     if file_prefix.endswith(suffix):
@@ -508,6 +516,7 @@
         value_column: str = None,
         delimiter: str = ',',
         config: dict = {},
+        key_index: bool = False,
 ) -> dict:
     """Returns a dictionary loaded from a CSV file.
 
@@ -540,18 +549,26 @@
       config: dictionary of aggregation settings in case there are multiple
         rows with the same key. refer to aggregation_util.aggregate_dict for
         config settings.
-
+      key_index: if True, each row is loaded with its row index as a unique
+        key. Overrides key_column and uses the index as the key.
     Returns:
       dictionary of {key:value} loaded from the CSV file.
     """
     csv_dict = {}
     input_files = file_get_matching(filename)
     logging.debug(f'Loading dict from csv files: {input_files}')
+    if key_column and key_index:
+        raise ValueError(
+            f'Both key_column: {key_column} and key_index set for {filename}')
+
     for filename in input_files:
         num_rows = 0
         # Load each CSV file
         with FileIO(filename) as csvfile:
-            reader = csv.DictReader(csvfile, delimiter=delimiter)
+            reader = csv.DictReader(
+                csvfile,
+                **file_get_csv_reader_options(csvfile,
+                                              {'delimiter': delimiter}))
             if reader.fieldnames:
                 # Get the key and value column names
                 if not key_column:
@@ -567,7 +584,9 @@
                 for row in reader:
                     # Get the key for the row.
                     key = None
-                    if key_column in row:
+                    if key_index:
+                        key = len(csv_dict)
+                    elif key_column in row:
                         key = row.pop(key_column)
                     # Get the value for the key
                     value = None
@@ -597,7 +616,8 @@
 
 def file_write_csv_dict(py_dict: dict,
                         filename: str,
-                        columns: list = None) -> list:
+                        columns: list = None,
+                        key_column_name: str = 'key') -> list:
     """Returns the filename after writing py_dict with a csv row per item.
 
     Each dictionary items is written as a row in the CSV file.
@@ -629,6 +649,9 @@
         is used as the key's column name. If no columns are specified for
         values, column names are picked from each entry's value if the value
         is a dict. Else the value is written as column name 'value'.
+      key_column_name: name of the column used as the key.
+        If '', the first column is used as the key.
+        If set to None, the key is ignored.
 
     Returns:
       list of columns written to the output csv
@@ -638,8 +661,10 @@
     # Get the list of columns
     value_column_name = ''
     if not columns:
+        columns = []
         # Add a columns for key.
-        columns = ['key']
+        if key_column_name:
+            columns.append(key_column_name)
     if len(columns) <= 1:
         # Get columns across all entries.
         for key, value in py_dict.items():
@@ -652,7 +677,8 @@
                 value_column_name = 'value'
                 columns.append(value_column_name)
     # Use the first column for the key.
-    key_column_name = columns[0]
+    if key_column_name == '':
+        key_column_name = columns[0]
 
     # Get the output filename
     output_files = file_get_matching(filename)
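A round-trip sketch for the CSV dict helpers above; the path and data are hypothetical:

```python
from file_util import file_load_csv_dict, file_write_csv_dict

# Writes one CSV row per dict entry, with a 'key' column by default.
stats = {'CA': {'population': 39}, 'TX': {'population': 30}}
columns = file_write_csv_dict(stats, '/tmp/stats.csv')  # hypothetical path

# Loads it back keyed by the 'key' column; key_index=True would instead
# key each row by its row index (and cannot be combined with key_column).
loaded = file_load_csv_dict('/tmp/stats.csv', key_column='key')
print(columns, loaded)
```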
@@ -799,7 +825,9 @@ def file_is_google_spreadsheet(filename: str) -> bool:
     return False
 
 
-def file_open_google_spreadsheet(url: str) -> gspread.spreadsheet.Spreadsheet:
+def file_open_google_spreadsheet(url: str,
+                                 retries: int = 3
+                                ) -> gspread.spreadsheet.Spreadsheet:
     """Returns the google spreasheet handle.
 
     Assumes caller has access to the spreadsheet.
@@ -811,7 +839,14 @@
       google spreadsheet object for the given url
     """
     # Get a handle for the whole spreadsheet
-    gs = _file_get_gspread_client().open_by_url(url)
+    gs = retry_call(
+        _file_get_gspread_client().open_by_url,
+        f_args=[url],
+        exceptions=gspread.exceptions.APIError,
+        tries=retries,
+    )
+    if gs is None:
+        logging.error(f'Failed to open {url}')
     return gs
 
@@ -943,7 +978,10 @@
     # Read the rows from the source file
     rows = []
     with FileIO(filename) as file:
-        csv_reader = csv.reader(file, skipinitialspace=True, escapechar='\\')
+        csv_reader = csv.reader(file,
+                                skipinitialspace=True,
+                                escapechar='\\',
+                                **file_get_csv_reader_options(file))
         for row in csv_reader:
             rows.append(row)
@@ -963,6 +1001,120 @@
     return ws.url
 
 
+def file_get_sample_bytes(file: str, byte_count: int = 4096) -> bytes:
+    """Returns sample bytes from the file.
+
+    Args:
+      file: a file name or an open file handle.
+      byte_count: number of bytes to be returned.
+
+    Returns:
+      bytes of the given byte_count.
+      The file handle is reset to the start.
+    """
+    if isinstance(file, io.TextIOWrapper):
+        # File is a handle. Get the filename
+        file = file.name
+    if isinstance(file, str):
+        logging.debug(f'Getting sample {byte_count} bytes from {file}')
+        with FileIO(file, 'rb') as fh:
+            return fh.read(byte_count)
+    else:
+        return b''
+
+
+def file_get_encoding(file: str,
+                      rawdata: bytes = None,
+                      default: str = 'utf-8-sig') -> str:
+    """Returns the encoding for the file.
+
+    Args:
+      file: filename whose encoding is required.
+      rawdata: content whose encoding is to be detected if available.
+      default: default encoding to be returned if it can't be determined.
+
+    Returns:
+      string with encoding such as 'utf8'
+    """
+    if rawdata is None:
+        rawdata = file_get_sample_bytes(file)
+    encoding_result = chardet.detect(rawdata)
+    if encoding_result:
+        encoding = encoding_result.get('encoding')
+        if encoding:
+            return encoding
+    return default
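The encoding helper defers to chardet on a byte sample; a self-contained sketch of that path, with an invented byte string:

```python
import chardet

# file_get_encoding() boils down to chardet.detect() on sampled raw bytes.
sample = 'Ölçüm değeri'.encode('utf-8')   # hypothetical non-ASCII content
result = chardet.detect(sample)            # e.g. {'encoding': 'utf-8', 'confidence': ...}
encoding = result.get('encoding') or 'utf-8-sig'  # fall back to the default
print(sample.decode(encoding))
```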
+def file_get_csv_reader_options(
+        file: str,
+        default_options: dict = {},
+        data: str = None,
+        encoding: str = None,
+        delim_chars: list = [',', ' ', ';', '|', ':']) -> dict:
+    """Returns a dictionary with options for the CSV file reader.
+
+    Args:
+      file: name of the csv file to get encoding
+      default_options: default options returned if not detected
+        such as 'delimiter'.
+      data: string for which delimiter is to be detected.
+        If data is not given, sample data is read from the file.
+      encoding: character encoding in the file.
+      delim_chars: list of possible delimiter characters.
+        If not set, non-alphanumeric characters from the first line
+        are used as candidate delimiter characters.
+
+    Returns:
+      dict with the following:
+        'delimiter': delimiter character for CSV files.
+        'dialect': File dialect, such as 'unix', 'excel'
+    """
+    result = dict(default_options)
+
+    if data is None:
+        # Get data from file decoded with the right encoding
+        rawdata = file_get_sample_bytes(file)
+        if encoding is None:
+            encoding = file_get_encoding(file, rawdata=rawdata)
+        data = rawdata.decode(encoding)
+
+    # Get the dialect for the data
+    try:
+        dialect = csv.Sniffer().sniff(data)
+    except csv.Error:
+        # Use 'excel' as the default as the dialect may not be detected well.
+        dialect = 'excel'
+    if dialect:
+        result['dialect'] = dialect
+
+    # Get the CSV delimiter by counting possible delimiter characters
+    # across rows and picking the most common delimiter.
+    rows = data.split('\n')
+    if not delim_chars:
+        # Get non-alphanumeric characters from the data.
+        delim_chars = {c for c in rows[0].strip() if not c.isalnum()}
+    logging.debug('Looking for delimiter in %s among %s', file, delim_chars)
+    char_counts = {c: [] for c in delim_chars}
+    for index in range(1, len(rows) - 1):
+        # Count possible delimiter characters per row
+        row = rows[index]
+        for char in delim_chars:
+            char_counts[char].append(row.count(char))
+    # Get the char with the same count across rows.
+    for c in char_counts.keys():
+        c_counts = char_counts[c]
+        if c_counts:
+            c_min = min(c_counts)
+            c_med = np.median(c_counts)
+            if c_min > 0 and c_min == c_med:
+                result['delimiter'] = c
+                break
+    logging.debug('Got options for file: %s: result = %s', file, result)
+    return result
+
+
 def file_is_csv(filename: str) -> bool:
     """Returns True is the file has a .csv extension or is a spreadsheet."""
     if filename.endswith('.csv') or file_is_google_spreadsheet(filename):
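A usage sketch mirroring how this patch itself wires `file_get_csv_reader_options()` into `csv.DictReader` in `file_load_csv_dict()`; the file name is hypothetical:

```python
import csv
from file_util import file_get_csv_reader_options

# Let the helper sniff the dialect and delimiter, then splat the options
# straight into the reader, as file_load_csv_dict() does above.
with open('data.psv') as fh:  # hypothetical '|'-delimited file
    options = file_get_csv_reader_options(fh, default_options={'delimiter': '|'})
    reader = csv.DictReader(fh, **options)
    for row in reader:
        print(row)
```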
diff --git a/util/statvar_dcid_generator.py b/util/statvar_dcid_generator.py
index 52a59d05fe..78c16a830d 100644
--- a/util/statvar_dcid_generator.py
+++ b/util/statvar_dcid_generator.py
@@ -14,20 +14,20 @@
 """A utility to generate dcid for statistical variables."""
 
 import copy
-import re
 import os
+import re
 import sys
 
-#pylint: disable=wrong-import-position
-#pylint: disable=import-error
+# pylint: disable=wrong-import-position
+# pylint: disable=import-error
 # Allows the following module imports to work when running as a script
 _SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
 sys.path.append(os.path.join(_SCRIPT_PATH, '.'))  # For soc_codes_names
 
 from soc_codes_names import SOC_MAP
 
-#pylint: enable=wrong-import-position
-#pylint: enable=import-error
+# pylint: enable=wrong-import-position
+# pylint: enable=import-error
 
 # Global constants
 # Regex to match the quantity notations - [value quantity], [quantity value]
@@ -48,10 +48,20 @@
                         r'(?P<upper_limit>-|-?\d+(\.\d+)?)\]')
 
 # These are the default properties ignored during dcid generation
-_DEFAULT_IGNORE_PROPS = ('unit', 'Node', 'memberOf', 'typeOf',
-                         'constraintProperties', 'name', 'description',
-                         'descriptionUrl', 'label', 'url', 'alternateName',
-                         'scalingFactor')
+_DEFAULT_IGNORE_PROPS = (
+    'unit',
+    'Node',
+    'memberOf',
+    'typeOf',
+    'constraintProperties',
+    'name',
+    'description',
+    'descriptionUrl',
+    'label',
+    'url',
+    'alternateName',
+    'scalingFactor',
+)
 
 # Regex to match prefixes to be removed from constraints. The regex checks for
 # specific prefixes followed by an upper case letter or underscore. This helps
@@ -109,7 +119,7 @@
     '1026': 'LeisureHospitality',
     '1027': 'OtherServices',
     '1028': 'PublicAdministration',
-    '1029': 'Unclassified'
+    '1029': 'Unclassified',
 }
 
 # Regex to match NAICS Codes. These codes could be a single code or a range
@@ -201,12 +211,12 @@
     'householderRelatedChildrenUnder18Years': {
         'prepend': 'Householder',
         'replace': 'Child',
-        'replacement': 'RelatedChildren'
+        'replacement': 'RelatedChildren',
     },
     'householderOwnChildrenUnder18Years': {
         'prepend': 'Householder',
         'replace': 'Child',
-        'replacement': 'OwnChildren'
+        'replacement': 'OwnChildren',
     },
     'occupation': {
         'append': 'Occupation'
@@ -220,7 +230,7 @@
     'dateOfEntry': {
         'prepend': 'DateOfEntry',
         'replace': 'Date',
-        'replacement': ''
+        'replacement': '',
     },
     'placeOfBirth': {
         'prepend': 'PlaceOfBirth'
     },
     'dateMovedIn': {
         'prepend': 'MovedInDate',
         'replace': 'Date',
-        'replacement': ''
+        'replacement': '',
     },
     'bachelorsDegreeMajor': {
         'prepend': 'BachelorOf'
@@ -265,17 +275,36 @@
     },
     'mothersEducation': {
         'prepend': 'Mother'
-    }
+    },
+    'importSource': {
+        'prepend': 'ImportFrom',
+    },
+    'exportDestination': {
+        'prepend': 'ExportTo',
+    },
+    'lendingEntity': {
+        'prepend': 'Lender',
+    },
 }
 
 # This is a list of boolean properties
 _BOOLEAN_PROPS = [
-    'hasComputer', 'hasFunctionalToilet', 'isAccessibleForFree',
-    'isEnergyStored', 'isFDAReferenceStandard', 'isFamilyFriendly',
-    'isGenomeRepresentationFull', 'isGift', 'isInternetUser',
-    'isLiquefiedNaturalGasStored', 'isLiveBroadcast', 'isNaturalGasStored',
-    'isPharmacodynamicRelationship', 'isPharmacokineticRelationship',
-    'isRefSeqGenBankAssembliesIdentical', 'isHateCrime'
+    'hasComputer',
+    'hasFunctionalToilet',
+    'isAccessibleForFree',
+    'isEnergyStored',
+    'isFDAReferenceStandard',
+    'isFamilyFriendly',
+    'isGenomeRepresentationFull',
+    'isGift',
+    'isInternetUser',
+    'isLiquefiedNaturalGasStored',
+    'isLiveBroadcast',
+    'isNaturalGasStored',
+    'isPharmacodynamicRelationship',
+    'isPharmacokineticRelationship',
+    'isRefSeqGenBankAssembliesIdentical',
+    'isHateCrime',
 ]
 
 # To map stat vars which do not follow the conventions of stat var dcid naming
 # the replacement dcid.
 _LEGACY_MAP = {
     'Count_Person_WithDisability_NoHealthInsurance':
-        'Count_Person_NoHealthInsurance_WithDisability',
+        ('Count_Person_NoHealthInsurance_WithDisability'),
     'Count_Person_NoDisability_NoHealthInsurance':
-        'Count_Person_NoHealthInsurance_NoDisability'
+        ('Count_Person_NoHealthInsurance_NoDisability'),
 }
 
 
 def _capitalize_process(word: str) -> str:
     """Capitalizes, removes namespaces, measurement constraint prefixes and
-    underscores from a word.
-    Manual upper casing is preferred compared to the builtin function
-    str.capitalize() because we want to change only the case of the first
-    character and ignore the case of other characters. Firstly, all namespaces
-    are removed from the string. Then, constraint prefixes and underscores
-    are removed. Lastly, the first character is upper cased.
-
-    Args:
-      word: A string literal to capitalize and process.
-
-    Returns:
-      Returns a string that can be used in dcid generation.
-      Returns None if the string is empty.
-    """
+    underscores from a word.
+
+    Manual upper casing is preferred compared to the builtin function
+    str.capitalize() because we want to change only the case of the first
+    character and ignore the case of other characters. Firstly, all namespaces
+    are removed from the string. Then, constraint prefixes and underscores
+    are removed. Lastly, the first character is upper cased.
+
+    Args:
+      word: A string literal to capitalize and process.
+
+    Returns:
+      Returns a string that can be used in dcid generation.
+      Returns None if the string is empty.
+    """
     if word:
         # Removing namespaces
         word = word[word.find(':') + 1:]
@@ -319,6 +349,15 @@
         # Removing all underscores
         word = word.replace('_', '')
 
+        # Remove '/' or replace it with '-' when used as a number separator
+        words = []
+        for tok in word.split('/'):
+            if tok:
+                if tok[0].isdigit() and len(
+                        words) > 0 and words[-1][-1].isdigit():
+                    words.append('-')
+                words.append(tok[0].upper() + tok[1:])
+        word = ''.join(words)
         # Upper casing the first character
         word = word[0].upper() + word[1:]
 
 
 def _generate_quantity_range_name(match_dict: dict) -> str:
     """Generate a name for a quantity range.
 
-    Args:
-      match_dict: A dictionary containing quantity range regex groups.
-        Expected syntax of match_dict is
-        {
-          'lower_limit': <value>,
-          'upper_limit': <value>,
-          'quantity': <quantity>
-        }
-
-    Returns:
-      A string representing the quantity range name to be used in the dcid.
-      Returns None if any of the expected keys are not in the dictionary.
-    """
+    Args:
+      match_dict: A dictionary containing quantity range regex groups. Expected
+        syntax of match_dict is {'lower_limit': <value>,
+        'upper_limit': <value>, 'quantity': <quantity>}
+
+    Returns:
+      A string representing the quantity range name to be used in the dcid.
+      Returns None if any of the expected keys are not in the dictionary.
+    """
     try:
         lower_limit = match_dict['lower_limit']
         upper_limit = match_dict['upper_limit']
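To make the quantity-range notation concrete, a simplified standalone regex in the spirit of the module's `_QUANTITY_RANGE_REGEX_*` constants (not the exact pattern), showing the named groups that `_generate_quantity_range_name()` consumes:

```python
import re

# Per get_statvar_dcid()'s docstring, '[10 20 Years]' becomes '10To20Years'.
_RANGE = re.compile(r'\[(?P<lower_limit>-?\d+) (?P<upper_limit>-?\d+) '
                    r'(?P<quantity>[A-Za-z_/\d]+)\]')
m = _RANGE.match('[10 20 Years]')
print(m.groupdict())
# {'lower_limit': '10', 'upper_limit': '20', 'quantity': 'Years'}
```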
@@ -369,19 +404,20 @@
 
 def _naics_code_to_name(naics_val: str) -> str:
     """Converts NAICS codes to their industry using the _NAICS_MAP.
-    Args:
-      naics_val: A NAICS string literal to process.
-        Expected syntax of naics_val - NAICS/{codes}
-        '-' can be used to denote range of codes that may or may not belong
-        to the same industry. For eg, 44-45 will be mapped to 'RetailTrade'.
-        '_' can be used to represent multiple industries. For eg, 51_52 will
-        be mapped to 'InformationFinanceInsurance'. A combination of '-' and
-        '_' is acceptable.
-    Returns:
-      A string with all NAICS codes changed to their respective industry.
-      This string can be used in dcid generation. Returns None if the string
-      is empty or if the string does not follow the expected syntax.
-    """
+
+    Args:
+      naics_val: A NAICS string literal to process. Expected syntax of naics_val
+        - NAICS/{codes}. '-' can be used to denote a range of codes that may or
+        may not belong to the same industry. For eg, 44-45 will be mapped to
+        'RetailTrade'. '_' can be used to represent multiple industries. For
+        eg, 51_52 will be mapped to 'InformationFinanceInsurance'. A
+        combination of '-' and '_' is acceptable.
+
+    Returns:
+      A string with all NAICS codes changed to their respective industry.
+      This string can be used in dcid generation. Returns None if the string
+      is empty or if the string does not follow the expected syntax.
+    """
 
     # Helper function to process NAICS ranges
     def _process_naics_range(range_str: str) -> str:
@@ -419,7 +455,9 @@
         if match_str.find('-') != -1:  # Range
             industry_str = _process_naics_range(match_str)
         else:
-            industry_str = _NAICS_MAP[match_str]
+            industry_str = _NAICS_MAP.get(match_str)
+            if not industry_str:
+                return None
         processed_str = processed_str + industry_str
         return processed_str
     return None
 
 
 def _soc_code_to_name(soc_val: str) -> str:
     """Converts SOCv2018 codes to their industry using the SOC_MAP from
-    soc_codes_names.py
-
-    Args:
-      soc_val: A SOCv2018 string literal to process.
-        Expected syntax of soc_val - SOCv2018/{code}
-    Returns:
-      A string with SOC code changed to it's occupation.
-      This string can be used in dcid generation. Returns the original string
-      if the code is not in the SOC_MAP. Returns None if the string is empty.
-    """
+
+    soc_codes_names.py
+
+    Args:
+      soc_val: A SOCv2018 string literal to process. Expected syntax of soc_val
+        - SOCv2018/{code}
+
+    Returns:
+      A string with the SOC code changed to its occupation.
+      This string can be used in dcid generation. Returns the original string
+      if the code is not in the SOC_MAP. Returns None if the string is empty.
+    """
     if soc_val:
         processed_str = soc_val
 
@@ -458,20 +498,22 @@ def _prepend_append_replace(word,
                             replace='',
                             replacement=''):
     """Prepends, appends and replaces text in a word.
-    Args:
-      word: A string literal to prepend, append or replace on.
-      prepend: A string literal to prepend to word.
-      append: A string literal to append to word.
-      replace: A string literal that repersents a substring in word to be
-        replaced.
-      replacement: A string literal. In word, all occurances of replace will
-        be changed to replacement.
-    Returns:
-      A string after appending, prepending and replacing to word.
-    """
+
+    Args:
+      word: A string literal to prepend, append or replace on.
+      prepend: A string literal to prepend to word.
+      append: A string literal to append to word.
+      replace: A string literal that represents a substring in word to be
+        replaced.
+      replacement: A string literal. In word, all occurrences of replace will be
+        changed to replacement.
+
+    Returns:
+      A string after appending, prepending and replacing to word.
+    """
     if replace:
         word = word.replace(replace, replacement)
-    if prepend:
+    if prepend and not word.lower().startswith(prepend.lower()):
         word = prepend + word
     if append:
         word = word + append
 
 
 def _generate_quantity_name(match_dict: dict) -> str:
     """Generate a name for a quantity.
 
-    Args:
-      match_dict: A dictionary containing quantity regex groups.
-        Expected syntax of match_dict
-        {
-          'value': <value>,
-          'quantity': <quantity>
-        }
-
-    Returns:
-      A string representing the quantity name to be used in the dcid.
-      Returns None if any of the expected keys are not in the dictionary.
-    """
+    Args:
+      match_dict: A dictionary containing quantity regex groups. Expected syntax
+        of match_dict {'value': <value>, 'quantity': <quantity>}
+
+    Returns:
+      A string representing the quantity name to be used in the dcid.
+      Returns None if any of the expected keys are not in the dictionary.
+    """
     try:
         value = match_dict['value']
         quantity = match_dict['quantity']
     except KeyError:
         return None
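A short trace of the code-to-name helpers, with expected outputs taken from the docstrings above (the SOC code is a hypothetical key that may or may not be in SOC_MAP):

```python
from statvar_dcid_generator import _naics_code_to_name, _soc_code_to_name

print(_naics_code_to_name('NAICS/44-45'))    # 'RetailTrade' per the docstring
print(_naics_code_to_name('NAICS/51_52'))    # 'InformationFinanceInsurance'
print(_soc_code_to_name('SOCv2018/111011'))  # occupation name if mapped,
                                             # else the original string
```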
@@ -505,37 +543,41 @@
 
 def _generate_boolean_value_name(prop: str, value: str) -> str:
     """Generates a name given a boolean property and value.
 
-    Args:
-      prop: A string literal representing the boolean property name.
-      value: A string literal representing the boolean property value.
-    Returns:
-      A string that can be used in dcid generation
-    """
+    Args:
+      prop: A string literal representing the boolean property name.
+      value: A string literal representing the boolean property value.
+
+    Returns:
+      A string that can be used in dcid generation
+    """
     if value in ('True', 'False'):
-        constraint_value = value == "True"
+        constraint_value = value == 'True'
         pop = None
         prefix = None
-        if prop.startswith("has"):
+        if prop.startswith('has'):
             pop = prop[3:]
-            prefix = "Has" if constraint_value else "No"
-        elif prop.startswith("is"):
+            prefix = 'Has' if constraint_value else 'No'
+        elif prop.startswith('is'):
             pop = prop[2:]
-            prefix = "Is" if constraint_value else "Not"
+            prefix = 'Is' if constraint_value else 'Not'
         else:
-            assert False, f"Unhandled prefix {prop}"
+            assert False, f'Unhandled prefix {prop}'
         return prefix + pop
     return None
 
 
 def _process_constraint_property(prop: str, value: str) -> str:
     """Processes constraint property, value and returns a name that can be used
-    in dcid generation.
-    Args:
-      prop: A string literal representing the constraint property name.
-      value: A string literal representing the constraint property value.
-    Returns:
-      A string that can be used in dcid generation.
-    """
+
+    in dcid generation.
+    Args:
+      prop: A string literal representing the constraint property name.
+      value: A string literal representing the constraint property value.
+
+    Returns:
+      A string that can be used in dcid generation.
+    """
     if 'NAICS' in value:
         name = _naics_code_to_name(value)
     elif 'SOCv2018/' in value:
@@ -568,68 +610,66 @@
 
 def get_statvar_dcid(stat_var_dict: dict, ignore_props: list = None) -> str:
     """Generates the dcid given a statistical variable.
 
-    The generated dcid will follow the pattern
-    <statType>_<measuredProp>_<populationType>_<constraint1>_<constraint2>
-
-    1. measurementQualifier is added as a prefix to the dcid.
-    2. statType is included when it is not measuredValue.
-    3. measurementDenominator is added as a suffix to the dcid.
-    4. Constraints are sorted alphabetically based on the prop and values are
-        added to the dcid.
-    5. Existing dcids may not follow the above conventions. The _LEGACY_MAP maps
-        generated dcids to their existing dcid.
-    6. NAICS and SOC codes are replaced with their industry and occupation names
-        respectively. See _NAICS_MAP and util/soc_codes_names.py for the
-        mapping.
-    7. Boolean constraints are replaced by their populations. For example,
-        p=isInternetUser and v=True/False becomes v=isInternetUser/
-        notInternetUser. See _BOOLEAN_PROPS for the properties that are
-        considered for this renaming.
-    8. Quantities and Quantity Ranges are changed into a name to be used in the
-        dcid. For example p=age and v=[10 20 Years] becomes v=10To20Years.
-    9. Certain variables have text prepended or appended to their constraints to
-        improve readability. See _PREPEND_APPEND_REPLACE_MAP for more details.
-
-    Args:
-        stat_var_dict: A dictionary with property: value of the statistical
-            variable as key-value pairs.
-        ignore_props: A list of properties to ignore from stat_var_dict when
-            generating the dcid. This list of ignore_props will be added to the
-            default set of properties that are ignored. The ignore_props can be
-            used to account for dependent properties to ignore when generating
-            the dcid. For example in the following statVar,
-            {
-                populationType: Person
-                measuredProperty: count
-                statType: measuredValue
-                healthInsurance: NoHealthInsurance
-                armedForceStatus: Civilian
-                institutionalization: USC_NonInstitutionalized
-            }
-            since the healthInsurance property indicates they are Civilian and
-            USC_NonInstitutionalized, ignore_props can be the list
-            ['armedForceStatus', 'institutionalization']. During the dcid
-            generation process, these properties will not be considered.
-
-    Returns:
-        A string representing the dcid of the statistical variable.
-
-    Caveats:
-        1. Currently, there is no support for renaming ICD10 cause of death
-            values and DEA drug names.
-        2. MeasuredProp=InsuredUnemploymentRate is not changed to
-            Rate_InsuredUnemployment.
-        3. The generated dcids can get too long due to the large number of
-            constraint props. In such cases, manual generation or the
-            ignore_props arg can be used to exclude a few props from the
-            generation process. It is recommended to limit the length of
-            statvar dcids to 80 characters or less.
-        4. This function does not differentiate between property names and only
-            uses the values to generate the dcid. Two props having the same
-            value, say p1=fuel, v1=Coal and p2=energy, v2=Coal will result in
-            the same dcid. The _PREPEND_APPEND_REPLACE_MAP can be modified to
-            disambiguate in this case.
-    """
+    The generated dcid will follow the pattern
+    <statType>_<measuredProp>_<populationType>_<constraint1>_<constraint2>
+
+    1. measurementQualifier is added as a prefix to the dcid.
+    2. statType is included when it is not measuredValue.
+    3. measurementDenominator is added as a suffix to the dcid.
+    4. Constraints are sorted alphabetically based on the prop and values are
+       added to the dcid.
+    5. Existing dcids may not follow the above conventions. The _LEGACY_MAP maps
+       generated dcids to their existing dcid.
+    6. NAICS and SOC codes are replaced with their industry and occupation names
+       respectively. See _NAICS_MAP and util/soc_codes_names.py for the
+       mapping.
+    7. Boolean constraints are replaced by their populations. For example,
+       p=isInternetUser and v=True/False becomes v=isInternetUser/
+       notInternetUser. See _BOOLEAN_PROPS for the properties that are
+       considered for this renaming.
+    8. Quantities and Quantity Ranges are changed into a name to be used in the
+       dcid. For example p=age and v=[10 20 Years] becomes v=10To20Years.
+    9. Certain variables have text prepended or appended to their constraints to
+       improve readability. See _PREPEND_APPEND_REPLACE_MAP for more details.
+
+    Args:
+      stat_var_dict: A dictionary with property: value of the statistical
+        variable as key-value pairs.
+      ignore_props: A list of properties to ignore from stat_var_dict when
+        generating the dcid. This list of ignore_props will be added to the
+        default set of properties that are ignored. The ignore_props can be used
+        to account for dependent properties to ignore when generating the dcid.
+        For example in the following statVar,
+        {
+          populationType: Person
+          measuredProperty: count
+          statType: measuredValue
+          healthInsurance: NoHealthInsurance
+          armedForceStatus: Civilian
+          institutionalization: USC_NonInstitutionalized
+        }
+        since the healthInsurance property indicates they are Civilian and
+        USC_NonInstitutionalized, ignore_props can be the list
+        ['armedForceStatus', 'institutionalization']. During the dcid
+        generation process, these properties will not be considered.
+
+    Returns:
+      A string representing the dcid of the statistical variable.
+
+    Caveats:
+      1. Currently, there is no support for renaming ICD10 cause of death
+         values and DEA drug names.
+      2. MeasuredProp=InsuredUnemploymentRate is not changed to
+         Rate_InsuredUnemployment.
+      3. The generated dcids can get too long due to the large number of
+         constraint props. In such cases, manual generation or the
+         ignore_props arg can be used to exclude a few props from the
+         generation process. It is recommended to limit the length of
+         statvar dcids to 80 characters or less.
+      4. This function does not differentiate between property names and only
+         uses the values to generate the dcid. Two props having the same
+         value, say p1=fuel, v1=Coal and p2=energy, v2=Coal will result in
+         the same dcid. The _PREPEND_APPEND_REPLACE_MAP can be modified to
+         disambiguate in this case.
+    """
     # TODO: Renaming cause of death properties
     # TODO: Renaming DEA drug names
 
@@ -693,7 +733,6 @@ def add_prop_to_list(prop: str, svd: dict, dcid_list: list):
 
     if denominator_suffix:
         dcid_list.append(denominator_suffix)
-
     dcid = '_'.join(dcid_list)
     dcid = _LEGACY_MAP.get(dcid, dcid)
     return dcid
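Finally, a usage sketch for `get_statvar_dcid()` built from the statVar example in its own docstring; the expected output is an inference from the naming rules above, not a verified result:

```python
from statvar_dcid_generator import get_statvar_dcid

statvar = {
    'populationType': 'Person',
    'measuredProperty': 'count',
    'statType': 'measuredValue',
    'healthInsurance': 'NoHealthInsurance',
    'armedForceStatus': 'Civilian',
    'institutionalization': 'USC_NonInstitutionalized',
}
dcid = get_statvar_dcid(
    statvar, ignore_props=['armedForceStatus', 'institutionalization'])
print(dcid)  # expected: 'Count_Person_NoHealthInsurance'
```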