client: use logger and custom adapter instead of print (#938)
tschaume authored Feb 10, 2022
1 parent e44405a commit ff5713d
Showing 1 changed file with 56 additions and 33 deletions.
mpcontribs-client/mpcontribs/client/__init__.py: 89 changes (56 additions, 33 deletions)
@@ -11,6 +11,7 @@
import itertools
import functools
import requests
+import logging

from requests.exceptions import RequestException
from bravado_core.param import Param
@@ -101,6 +102,24 @@
ureg.define("electron_mass = 9.1093837015e-31 kg = mₑ = m_e")


+class CustomLoggerAdapter(logging.LoggerAdapter):
+    def process(self, msg, kwargs):
+        prefix = self.extra.get('prefix')
+        return f"[{prefix}] {msg}" if prefix else msg, kwargs
+
+
+def get_logger(name):
+    logger = logging.getLogger(name)
+    process = os.environ.get("SUPERVISOR_PROCESS_NAME")
+    group = os.environ.get("SUPERVISOR_GROUP_NAME")
+    cfg = {"prefix": f"{group}/{process}"} if process and group else {}
+    logger.setLevel("DEBUG" if os.environ.get("NODE_ENV") == "development" else "INFO")
+    return CustomLoggerAdapter(logger, cfg)
+
+
+logger = get_logger(__name__)
+
+
def get_md5(d):
s = ujson.dumps(d, sort_keys=True).encode("utf-8")
return md5(s).hexdigest()
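For context, the new get_logger helper returns a CustomLoggerAdapter that prepends a [group/process] prefix whenever the supervisord environment variables are set, and passes messages through unchanged otherwise. A minimal sketch of the resulting behavior, assuming mpcontribs-client is installed; the environment values and the "demo" logger name are made up for illustration:

import logging
import os

from mpcontribs.client import get_logger  # helper added in this commit

# Pretend we run under supervisord (illustrative values; if unset, no prefix).
os.environ["SUPERVISOR_PROCESS_NAME"] = "api"
os.environ["SUPERVISOR_GROUP_NAME"] = "mpcontribs"

logging.basicConfig()  # default root handler, "LEVEL:name:message" format
logger = get_logger("demo")  # level is INFO since NODE_ENV != "development"
logger.warning("healthcheck failed")
# -> WARNING:demo:[mpcontribs/api] healthcheck failed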
@@ -175,15 +194,15 @@ def _response_hook(resp, *args, **kwargs):
resp.count = result["count"]

if "warning" in result:
-print("WARNING", result["warning"])
+logger.warning(result["warning"])
elif "error" in result and isinstance(result["error"], str):
-print("ERROR", result["error"][:10000] + "...")
+logger.error(result["error"][:10000] + "...")

elif content_type == "application/gzip":
resp.result = resp.content
resp.count = 1
else:
-print("ERROR", resp.status_code)
+logger.error(resp.status_code)
resp.count = 0


@@ -235,7 +254,7 @@ def display(self):
try:
return self.plot(**self.attrs)
except Exception as e:
-print(f"Can't display table: {e}")
+logger.error(f"Can't display table: {e}")

return self

@@ -381,7 +400,7 @@ def _load(protocol, host, headers_json, project):

if apispec.exists():
spec_dict = ujson.loads(apispec.read_bytes())
-print(f"Specs for {origin_url} re-loaded from {apispec}.")
+logger.info(f"Specs for {origin_url} re-loaded from {apispec}.")
else:
try:
if requests.options(f"{url}/healthcheck").status_code == 200:
@@ -391,13 +410,13 @@
with apispec.open("w") as f:
ujson.dump(spec_dict, f)

-print(f"Specs for {origin_url} saved as {apispec}.")
+logger.info(f"Specs for {origin_url} saved as {apispec}.")
else:
spec_dict = EMPTY_SPEC_DICT
-print(f"Specs not loaded: Healthcheck for {url} failed!")
+logger.error(f"Specs not loaded: Healthcheck for {url} failed!")
except RequestException:
spec_dict = EMPTY_SPEC_DICT
-print(f"Specs not loaded: Could not connect to {url}!")
+logger.error(f"Specs not loaded: Could not connect to {url}!")

spec_dict["host"] = host
spec_dict["schemes"] = [protocol]
@@ -508,7 +527,7 @@ def __init__(

if apikey and headers is not None:
apikey = None
-print("headers set => ignoring apikey!")
+logger.info("headers set => ignoring apikey!")

self.apikey = apikey
self.headers = headers or {}
@@ -898,7 +917,7 @@ def delete_contributions(self, query: dict = None, timeout: int = -1):
timeout (int): cancel remaining requests if timeout exceeded (in seconds)
"""
if not self.project and (not query or "project" not in query):
-print("initialize client with project, or include project in query!")
+logger.error("initialize client with project, or include project in query!")
return

tic = time.perf_counter()
@@ -910,7 +929,7 @@ def delete_contributions(self, query: dict = None, timeout: int = -1):
cids = list(self.get_all_ids(query).get(query["project"], {}).get("ids", set()))

if not cids:
-print(f"There aren't any contributions to delete for {query['project']}")
+logger.info(f"There aren't any contributions to delete for {query['project']}")
return

total = len(cids)
@@ -925,10 +944,10 @@ def delete_contributions(self, query: dict = None, timeout: int = -1):
self._reinit()
toc = time.perf_counter()
dt = (toc - tic) / 60
-print(f"It took {dt:.1f}min to delete {deleted} contributions.")
+logger.info(f"It took {dt:.1f}min to delete {deleted} contributions.")

if left:
-print(f"There were errors and {left} contributions are left to delete!")
+logger.error(f"There were errors and {left} contributions are left to delete!")

def get_totals(
self,
@@ -950,7 +969,7 @@
"""
ops = {"get", "create", "update", "delete", "download"}
if op not in ops:
-print(f"`op` has to be one of {ops}")
+logger.error(f"`op` has to be one of {ops}")
return

query = query or {}
@@ -1055,17 +1074,17 @@ def get_all_ids(
include = include or []
components = set(x for x in include if x in COMPONENTS)
if include and not components:
-print(f"`include` must be subset of {COMPONENTS}!")
+logger.error(f"`include` must be subset of {COMPONENTS}!")
return

fmts = {"sets", "map"}
if fmt not in fmts:
-print(f"`fmt` must be subset of {fmts}!")
+logger.error(f"`fmt` must be subset of {fmts}!")
return

ops = {"get", "create", "update", "delete", "download"}
if op not in ops:
-print(f"`op` has to be one of {ops}")
+logger.error(f"`op` has to be one of {ops}")
return

unique_identifiers = self.get_unique_identifiers_flags()
@@ -1249,7 +1268,7 @@ def update_contributions(
cids = list(self.get_all_ids(query).get(self.project, {}).get("ids", set()))

if not cids:
-print(f"There aren't any contributions to update for {self.project}")
+logger.info(f"There aren't any contributions to update for {self.project}")
return

# get current list of data columns to decide if swagger reload is needed
@@ -1398,7 +1417,7 @@ def submit_contributions(
skip_dupe_check (bool): skip duplicate check for contribution identifiers
"""
if not contributions or not isinstance(contributions, list):
-print("Please provide list of contributions to submit.")
+logger.error("Please provide list of contributions to submit.")
return

# get existing contributions
@@ -1620,15 +1639,15 @@ def put_future(pk, payload):
if "id" in c:
pk = c.pop("id")
if not c:
-print(
+logger.error(
f"SKIPPED update of {project_name}/{pk}: empty."
)

payload = ujson.dumps(c).encode("utf-8")
if len(payload) < MAX_PAYLOAD:
futures.append(put_future(pk, payload))
else:
-print(
+logger.error(
f"SKIPPED update of {project_name}/{pk}: too large."
)
else:
@@ -1639,7 +1658,7 @@
if len(payload) < MAX_PAYLOAD:
futures.append(post_future(idx, payload))
else:
-print(
+logger.error(
f"SKIPPED {project_name}/{idx}: too large, reduce per_request"
)

@@ -1670,9 +1689,9 @@ def put_future(pk, payload):
contribs[project_name] = [] # abort retrying
if processed != ncontribs and retry:
if retries >= RETRIES:
-print(f"{project_name}: Tried {RETRIES} times - abort.")
+logger.error(f"{project_name}: Tried {RETRIES} times - abort.")
elif not unique_identifiers.get(project_name):
-print(
+logger.info(
f"{project_name}: resubmit failed contributions manually"
)

@@ -1682,9 +1701,9 @@ def put_future(pk, payload):
self._reinit()
self.session.close()
self.session = get_session()
-print(f"It took {dt:.1f}min to submit {total_processed}/{total} contributions.")
+logger.info(f"It took {dt:.1f}min to submit {total_processed}/{total} contributions.")
else:
-print("Nothing to submit.")
+logger.info("Nothing to submit.")

def download_contributions(
self,
@@ -1713,7 +1732,7 @@ def download_contributions(
outdir.mkdir(parents=True, exist_ok=True)
components = set(x for x in include if x in COMPONENTS)
if include and not components:
-print(f"`include` must be subset of {COMPONENTS}!")
+logger.error(f"`include` must be subset of {COMPONENTS}!")
return

all_ids = self.get_all_ids(query, include=components, timeout=timeout)
@@ -1736,9 +1755,11 @@
if paths:
npaths = len(paths)
ndownloads += npaths
-print(f"Downloaded {len(cids)} contributions for '{name}' in {npaths} file(s).")
+logger.info(
+    f"Downloaded {len(cids)} contributions for '{name}' in {npaths} file(s)."
+)
else:
-print(f"No new contributions to download for '{name}'.")
+logger.info(f"No new contributions to download for '{name}'.")

for component in components:
if timeout > 0:
@@ -1756,9 +1777,11 @@
if paths:
npaths = len(paths)
ndownloads += npaths
-print(f"Downloaded {len(ids)} {component} for '{name}' in {npaths} file(s).")
+logger.info(
+    f"Downloaded {len(ids)} {component} for '{name}' in {npaths} file(s)."
+)
else:
-print(f"No new {component} to download for '{name}'.")
+logger.info(f"No new {component} to download for '{name}'.")

return ndownloads

@@ -1861,12 +1884,12 @@ def _download_resource(
"""
resources = ["contributions"] + COMPONENTS
if resource not in resources:
-print(f"`resource` must be one of {resources}!")
+logger.error(f"`resource` must be one of {resources}!")
return

formats = {"json", "csv"}
if fmt not in formats:
-print(f"`fmt` must be one of {formats}!")
+logger.error(f"`fmt` must be one of {formats}!")
return

oids = sorted(i for i in ids if ObjectId.is_valid(i))
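One practical consequence of the switch from print to logging, visible across all the hunks above: downstream code can now tune or redirect the client's messages with the standard logging machinery instead of capturing stdout. A minimal sketch; the handler and level choices below are illustrative and not part of this commit:

import logging

# The client logger is named after its module, mpcontribs.client.
client_logger = logging.getLogger("mpcontribs.client")

# Silence routine INFO chatter; keep warnings and errors.
client_logger.setLevel(logging.WARNING)

# Or route client messages to a file instead of the console.
client_logger.addHandler(logging.FileHandler("mpcontribs.log"))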
