diff --git a/mpcontribs-client/mpcontribs/client/__init__.py b/mpcontribs-client/mpcontribs/client/__init__.py
index 4c26f0f94..43b488b54 100644
--- a/mpcontribs-client/mpcontribs/client/__init__.py
+++ b/mpcontribs-client/mpcontribs/client/__init__.py
@@ -11,6 +11,7 @@
 import itertools
 import functools
 import requests
+import logging
 
 from requests.exceptions import RequestException
 from bravado_core.param import Param
@@ -101,6 +102,24 @@
 ureg.define("electron_mass = 9.1093837015e-31 kg = mₑ = m_e")
 
 
+class CustomLoggerAdapter(logging.LoggerAdapter):
+    def process(self, msg, kwargs):
+        prefix = self.extra.get('prefix')
+        return f"[{prefix}] {msg}" if prefix else msg, kwargs
+
+
+def get_logger(name):
+    logger = logging.getLogger(name)
+    process = os.environ.get("SUPERVISOR_PROCESS_NAME")
+    group = os.environ.get("SUPERVISOR_GROUP_NAME")
+    cfg = {"prefix": f"{group}/{process}"} if process and group else {}
+    logger.setLevel("DEBUG" if os.environ.get("NODE_ENV") == "development" else "INFO")
+    return CustomLoggerAdapter(logger, cfg)
+
+
+logger = get_logger(__name__)
+
+
 def get_md5(d):
     s = ujson.dumps(d, sort_keys=True).encode("utf-8")
     return md5(s).hexdigest()
@@ -175,15 +194,15 @@ def _response_hook(resp, *args, **kwargs):
 
                 resp.count = result["count"]
 
                 if "warning" in result:
-                    print("WARNING", result["warning"])
+                    logger.warning(result["warning"])
                 elif "error" in result and isinstance(result["error"], str):
-                    print("ERROR", result["error"][:10000] + "...")
+                    logger.error(result["error"][:10000] + "...")
     elif content_type == "application/gzip":
         resp.result = resp.content
         resp.count = 1
     else:
-        print("ERROR", resp.status_code)
+        logger.error(resp.status_code)
         resp.count = 0
 
 
@@ -235,7 +254,7 @@ def display(self):
         try:
             return self.plot(**self.attrs)
         except Exception as e:
-            print(f"Can't display table: {e}")
+            logger.error(f"Can't display table: {e}")
 
         return self
 
@@ -381,7 +400,7 @@ def _load(protocol, host, headers_json, project):
 
     if apispec.exists():
         spec_dict = ujson.loads(apispec.read_bytes())
-        print(f"Specs for {origin_url} re-loaded from {apispec}.")
+        logger.info(f"Specs for {origin_url} re-loaded from {apispec}.")
     else:
         try:
             if requests.options(f"{url}/healthcheck").status_code == 200:
@@ -391,13 +410,13 @@
                 with apispec.open("w") as f:
                     ujson.dump(spec_dict, f)
 
-                print(f"Specs for {origin_url} saved as {apispec}.")
+                logger.info(f"Specs for {origin_url} saved as {apispec}.")
             else:
                 spec_dict = EMPTY_SPEC_DICT
-                print(f"Specs not loaded: Healthcheck for {url} failed!")
+                logger.error(f"Specs not loaded: Healthcheck for {url} failed!")
         except RequestException:
             spec_dict = EMPTY_SPEC_DICT
-            print(f"Specs not loaded: Could not connect to {url}!")
+            logger.error(f"Specs not loaded: Could not connect to {url}!")
 
     spec_dict["host"] = host
     spec_dict["schemes"] = [protocol]
@@ -508,7 +527,7 @@ def __init__(
 
         if apikey and headers is not None:
             apikey = None
-            print("headers set => ignoring apikey!")
+            logger.info("headers set => ignoring apikey!")
 
         self.apikey = apikey
         self.headers = headers or {}
@@ -898,7 +917,7 @@ def delete_contributions(self, query: dict = None, timeout: int = -1):
             timeout (int): cancel remaining requests if timeout exceeded (in seconds)
         """
         if not self.project and (not query or "project" not in query):
-            print("initialize client with project, or include project in query!")
+            logger.error("initialize client with project, or include project in query!")
             return
 
         tic = time.perf_counter()
@@ -910,7 +929,7 @@ def delete_contributions(self, query: dict = None, timeout: int = -1):
         cids = list(self.get_all_ids(query).get(query["project"], {}).get("ids", set()))
 
         if not cids:
-            print(f"There aren't any contributions to delete for {query['project']}")
+            logger.info(f"There aren't any contributions to delete for {query['project']}")
             return
 
         total = len(cids)
@@ -925,10 +944,10 @@ def delete_contributions(self, query: dict = None, timeout: int = -1):
         self._reinit()
         toc = time.perf_counter()
         dt = (toc - tic) / 60
-        print(f"It took {dt:.1f}min to delete {deleted} contributions.")
+        logger.info(f"It took {dt:.1f}min to delete {deleted} contributions.")
 
         if left:
-            print(f"There were errors and {left} contributions are left to delete!")
+            logger.error(f"There were errors and {left} contributions are left to delete!")
 
     def get_totals(
         self,
@@ -950,7 +969,7 @@ def get_totals(
         """
         ops = {"get", "create", "update", "delete", "download"}
         if op not in ops:
-            print(f"`op` has to be one of {ops}")
+            logger.error(f"`op` has to be one of {ops}")
             return
 
         query = query or {}
@@ -1055,17 +1074,17 @@ def get_all_ids(
         include = include or []
         components = set(x for x in include if x in COMPONENTS)
         if include and not components:
-            print(f"`include` must be subset of {COMPONENTS}!")
+            logger.error(f"`include` must be subset of {COMPONENTS}!")
             return
 
         fmts = {"sets", "map"}
         if fmt not in fmts:
-            print(f"`fmt` must be subset of {fmts}!")
+            logger.error(f"`fmt` must be subset of {fmts}!")
             return
 
         ops = {"get", "create", "update", "delete", "download"}
         if op not in ops:
-            print(f"`op` has to be one of {ops}")
+            logger.error(f"`op` has to be one of {ops}")
             return
 
         unique_identifiers = self.get_unique_identifiers_flags()
@@ -1249,7 +1268,7 @@ def update_contributions(
         cids = list(self.get_all_ids(query).get(self.project, {}).get("ids", set()))
 
         if not cids:
-            print(f"There aren't any contributions to update for {self.project}")
+            logger.info(f"There aren't any contributions to update for {self.project}")
             return
 
         # get current list of data columns to decide if swagger reload is needed
@@ -1398,7 +1417,7 @@ def submit_contributions(
             skip_dupe_check (bool): skip duplicate check for contribution identifiers
         """
         if not contributions or not isinstance(contributions, list):
-            print("Please provide list of contributions to submit.")
+            logger.error("Please provide list of contributions to submit.")
             return
 
         # get existing contributions
@@ -1620,7 +1639,7 @@ def put_future(pk, payload):
                     if "id" in c:
                         pk = c.pop("id")
                         if not c:
-                            print(
+                            logger.error(
                                 f"SKIPPED update of {project_name}/{pk}: empty."
                             )
 
@@ -1628,7 +1647,7 @@ def put_future(pk, payload):
                     if len(payload) < MAX_PAYLOAD:
                         futures.append(put_future(pk, payload))
                     else:
-                        print(
+                        logger.error(
                             f"SKIPPED update of {project_name}/{pk}: too large."
                         )
                 else:
@@ -1639,7 +1658,7 @@ def put_future(pk, payload):
                     if len(payload) < MAX_PAYLOAD:
                         futures.append(post_future(idx, payload))
                     else:
-                        print(
+                        logger.error(
                             f"SKIPPED {project_name}/{idx}: too large, reduce per_request"
                         )
 
@@ -1670,9 +1689,9 @@ def put_future(pk, payload):
                 contribs[project_name] = []  # abort retrying
 
                 if processed != ncontribs and retry:
                     if retries >= RETRIES:
-                        print(f"{project_name}: Tried {RETRIES} times - abort.")
+                        logger.error(f"{project_name}: Tried {RETRIES} times - abort.")
                     elif not unique_identifiers.get(project_name):
-                        print(
+                        logger.info(
                             f"{project_name}: resubmit failed contributions manually"
                         )
@@ -1682,9 +1701,9 @@ def put_future(pk, payload):
             self._reinit()
             self.session.close()
             self.session = get_session()
-            print(f"It took {dt:.1f}min to submit {total_processed}/{total} contributions.")
+            logger.info(f"It took {dt:.1f}min to submit {total_processed}/{total} contributions.")
         else:
-            print("Nothing to submit.")
+            logger.info("Nothing to submit.")
 
     def download_contributions(
         self,
@@ -1713,7 +1732,7 @@ def download_contributions(
         outdir.mkdir(parents=True, exist_ok=True)
         components = set(x for x in include if x in COMPONENTS)
         if include and not components:
-            print(f"`include` must be subset of {COMPONENTS}!")
+            logger.error(f"`include` must be subset of {COMPONENTS}!")
             return
 
         all_ids = self.get_all_ids(query, include=components, timeout=timeout)
@@ -1736,9 +1755,11 @@ def download_contributions(
             if paths:
                 npaths = len(paths)
                 ndownloads += npaths
-                print(f"Downloaded {len(cids)} contributions for '{name}' in {npaths} file(s).")
+                logger.info(
+                    f"Downloaded {len(cids)} contributions for '{name}' in {npaths} file(s)."
+                )
             else:
-                print(f"No new contributions to download for '{name}'.")
+                logger.info(f"No new contributions to download for '{name}'.")
 
             for component in components:
                 if timeout > 0:
@@ -1756,9 +1777,11 @@ def download_contributions(
                 if paths:
                     npaths = len(paths)
                     ndownloads += npaths
-                    print(f"Downloaded {len(ids)} {component} for '{name}' in {npaths} file(s).")
+                    logger.info(
+                        f"Downloaded {len(ids)} {component} for '{name}' in {npaths} file(s)."
+                    )
                 else:
-                    print(f"No new {component} to download for '{name}'.")
+                    logger.info(f"No new {component} to download for '{name}'.")
 
         return ndownloads
 
@@ -1861,12 +1884,12 @@ def _download_resource(
         """
         resources = ["contributions"] + COMPONENTS
        if resource not in resources:
-            print(f"`resource` must be one of {resources}!")
+            logger.error(f"`resource` must be one of {resources}!")
             return
 
         formats = {"json", "csv"}
         if fmt not in formats:
-            print(f"`fmt` must be one of {formats}!")
+            logger.error(f"`fmt` must be one of {formats}!")
             return
 
         oids = sorted(i for i in ids if ObjectId.is_valid(i))
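
Note (reviewer sketch, not part of the patch): `get_logger` reads the `SUPERVISOR_GROUP_NAME` and `SUPERVISOR_PROCESS_NAME` environment variables that supervisord sets for its managed processes, and `CustomLoggerAdapter.process` returns a `(msg, kwargs)` tuple in which the "[group/process]" prefix is prepended only when both variables are present (the conditional expression binds tighter than the trailing comma, so the return value is a tuple as required by `LoggerAdapter`). A minimal usage sketch, assuming the patched module is importable; the `basicConfig` call is illustrative only, since the patch leaves handler configuration to the host application:

    import logging
    import os

    # simulate running under supervisord, which sets these variables
    os.environ["SUPERVISOR_GROUP_NAME"] = "mpcontribs"
    os.environ["SUPERVISOR_PROCESS_NAME"] = "api"

    from mpcontribs.client import get_logger

    logging.basicConfig(format="%(levelname)s:%(name)s:%(message)s")  # illustrative handler
    logger = get_logger("example")
    logger.info("specs loaded")  # -> INFO:example:[mpcontribs/api] specs loaded
    logger.debug("hidden")       # suppressed: level is INFO unless NODE_ENV=development

Without the supervisor variables the prefix is omitted and messages pass through unchanged.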