Skip to content

Commit

Permalink
Merge pull request #1 from ulbmuenster/logging
Browse files Browse the repository at this point in the history
Logging
  • Loading branch information
karkraeg authored Jun 24, 2024
2 parents 5f293ca + 3deefc4 commit 1a11523
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 83 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ progress = "*"
elabftw-usersync = {editable = true, path = "."}
python-dotenv = "*"
click = "*"
loguru = "*"

[dev-packages]
pytest = "*"
Expand Down
59 changes: 31 additions & 28 deletions elabftw_usersync/elabftw.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import requests
from progress.bar import Bar

from elabftw_usersync.helper import UserSyncException, error_print
from elabftw_usersync.helper import UserSyncException
from elabftw_usersync.logger_config import logger


class ElabFTW:
Expand Down Expand Up @@ -46,10 +47,10 @@ def check_connection(self):

def get_all_users(self):
"""Get all users from ElabFTW as JSON."""
print("Getting all users from ElabFTW...")
logger.info("Getting all users from ElabFTW...")
resp = self.session.get(self.host_url + "/api/v2/users?includeArchived=1")
if resp.status_code != 200:
error_print("Error getting users: " + resp.text)
logger.error("Error getting users: " + resp.text)

return resp.json()

Expand All @@ -67,7 +68,7 @@ def create_users_dict(self):
for user_id in user_ids:
user_resp = self.session.get(self.host_url + f"/api/v2/users/{user_id}")
if user_resp.status_code != 200:
error_print("Error get user object: " + user_resp.text)
logger.error("Error get user object: " + user_resp.text)
else:
user_data_list.append(user_resp.json())
bar.next()
Expand Down Expand Up @@ -141,15 +142,15 @@ def create_user(
post_user_resp = self.session.post(self.host_url + "/api/v2/users", json=data)

if post_user_resp.status_code != 201:
error_print("Error creating user: " + post_user_resp.text)
logger.error("Error creating user: " + post_user_resp.text)
# break somehow?
# return None

try:
user_id = int(post_user_resp.headers["location"].rsplit("/", 1)[-1])
except KeyError:
user_id = post_user_resp.headers.get("location", None)
print(f"User {email} created with id {user_id}")
logger.info(f"User {email} created with id {user_id}")
# add the user to self.all_users
newly_created_user = {
"userid": user_id,
Expand Down Expand Up @@ -190,7 +191,7 @@ def toggle_user_archived(self, user_id: int):
)

if modify_user_resp.status_code != 200:
error_print("Error unarchiving user: " + modify_user_resp.text)
logger.error("Error unarchiving user: " + modify_user_resp.text)
return None
else:
return modify_user_resp.json()
Expand Down Expand Up @@ -232,13 +233,13 @@ def get_user_id_or_create(
uid, is_archived = self.get_user_id(uni_id)
if uid is not None:
if is_archived:
print(f"User {email} is archived. Unarchiving it.")
logger.info(f"User {email} is archived. Unarchiving it.")
if self.toggle_user_archived(uid) is not None:
unarchived = True
else:
unarchived = False
else:
print(f"User not found: {email}. Creating it.")
logger.info(f"User not found: {email}. Creating it.")
uid = self.create_user(email, firstname, lastname, uni_id, team_id)
unarchived = False

Expand All @@ -256,7 +257,7 @@ def add_user_to_team(self, user_id: int, team_id: int) -> dict:
resp = self.session.patch(self.host_url + f"/api/v2/users/{user_id}", json=data)

if resp.status_code != 200:
error_print("Error updating user: " + resp.text)
logger.error("Error updating user: " + resp.text)

return resp.json()

Expand All @@ -269,7 +270,7 @@ def get_all_teams(self) -> list:
resp = self.session.get(self.host_url + "/api/v2/teams")

if resp.status_code != 200:
error_print("Error getting teams: " + resp.text)
logger.error("Error getting teams: " + resp.text)

return resp.json()

Expand Down Expand Up @@ -301,7 +302,7 @@ def get_team_owners(self, team_id: int) -> int:
self.host_url + f"/api/v2/users/{user['user_id']}"
)
if user_resp.status_code != 200:
error_print("Error get user object: " + user_resp.text)
logger.error("Error get user object: " + user_resp.text)
user = user_resp.json()
for i, team in enumerate(user["teams"]):
if team["id"] == team_id:
Expand Down Expand Up @@ -331,7 +332,7 @@ def remove_user_as_teamowner(self, user_id: int, team_id: int) -> bool:
if team["id"] == team_id:
# looking at the team we are working with
if team["is_owner"] == 1:
print(
logger.info(
f"User {user_id} will not be owner of team {team_id} anymore."
)
# the user is owner: we need to remove that!
Expand All @@ -348,7 +349,7 @@ def remove_user_as_teamowner(self, user_id: int, team_id: int) -> bool:
)

if resp.status_code != 200:
error_print("Error setting owner of a team: " + resp.text)
logger.error("Error setting owner of a team: " + resp.text)

patchuser_make_user_instead_admin_payload = {
"action": "patchuser2team",
Expand All @@ -363,7 +364,7 @@ def remove_user_as_teamowner(self, user_id: int, team_id: int) -> bool:
)

if resp2.status_code != 200:
error_print("Error setting owner of a team: " + resp2.text)
logger.error("Error setting owner of a team: " + resp2.text)

def ensure_single_teamowner(self, new_owner_id: int, team_id: int):
"""Ensure that only one person at a time is the teamowner.
Expand All @@ -375,25 +376,27 @@ def ensure_single_teamowner(self, new_owner_id: int, team_id: int):
# get the teams current owner
team_owners = self.get_team_owners(team_id)
if len(team_owners) == 0:
print("Team had no owner, setting new one")
logger.info("Team had no owner, setting new one")
self.set_teamowner(new_owner_id, team_id)
elif len(team_owners) > 1:
print("Team has more than one owner, unsetting all")
logger.info("Team has more than one owner, unsetting all")
for user in team_owners:
self.remove_user_as_teamowner(user, team_id)
print("Setting new team owner")
logger.info("Setting new team owner")
self.set_teamowner(new_owner_id, team_id)
else:
# team_owner == 1
# check if the current owner differs from the new owner
if team_owners[0] != new_owner_id:
print("Change in ownership detected")
logger.info("Change in ownership detected")
# if yes, remove the current owner
self.remove_user_as_teamowner(team_owners[0], team_id)
# Set the new owner as owner
self.set_teamowner(new_owner_id, team_id)
else:
print("The new owner is the same as current owner. Doing nothing.")
logger.info(
"The new owner is the same as current owner. Doing nothing."
)

def set_teamowner(self, user_id: int, team_id: int) -> bool:
"""
Expand Down Expand Up @@ -438,7 +441,7 @@ def set_teamowner(self, user_id: int, team_id: int) -> bool:
)

if resp.status_code != 200:
error_print("Error setting owner of a team: " + resp.text)
logger.error("Error setting owner of a team: " + resp.text)
else:
for t in resp.json()["teams"]:
if t["id"] == team_id:
Expand All @@ -456,16 +459,16 @@ def set_teamowner(self, user_id: int, team_id: int) -> bool:
json=patchuser_make_admin_payload,
)
if resp.status_code != 200:
error_print(
logger.error(
"Error setting admin of a team: " + resp.text
)
else:
print(
logger.success(
f"Sucessfully set user {user_id} as admin of team {team_id}."
)
return True
else:
error_print(
logger.error(
f"Error while setting user with the id {user_id} as owner of the team {team_id}"
)

Expand Down Expand Up @@ -510,7 +513,7 @@ def remove_user_from_team(self, user_id: int, team_name: str) -> dict:
:team_id: The team id
:return: dict of the archived user
"""
print(f"Removing user {user_id} from team {team_name}")
logger.info(f"Removing user {user_id} from team {team_name}")
team_id = self.get_team_id(team_name)
# first check if the user is only assigned to one team. If yes, add the user to the team `userarchiv` and remove the user from the team
user_teams = self.get_teams_for_user(user_id)
Expand All @@ -529,9 +532,9 @@ def remove_user_from_team(self, user_id: int, team_name: str) -> dict:
)

if resp.status_code != 200:
error_print("Error removing user from team: " + resp.text)
logger.error("Error removing user from team: " + resp.text)
if self.toggle_user_archived(user_id) is not None:
print(f"User {user_id} was archived.")
logger.info(f"User {user_id} was archived.")

else:
# User is in more than one team. We can remove the user from the team without adding it to the team `userarchiv`.
Expand All @@ -546,6 +549,6 @@ def remove_user_from_team(self, user_id: int, team_name: str) -> dict:
)

if resp.status_code != 200:
error_print("Error removing user from team: " + resp.text)
logger.error("Error removing user from team: " + resp.text)

return resp.json()
60 changes: 34 additions & 26 deletions elabftw_usersync/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from progress.bar import Bar

from elabftw_usersync.logger_config import logger


class UserSyncException(Exception):
"""This exception is raised when an error occurs during user synchronization."""
Expand All @@ -16,11 +18,6 @@ def __init__(self, msg):
self.msg = msg


def error_print(*args, **kwargs):
"""Provide a print function that prints to stderr."""
print(*args, file=sys.stderr, **kwargs)


def init_ldap():
"""Class to initialize the LDAP connection."""
ldap_host = os.getenv("LDAP_HOST")
Expand All @@ -31,22 +28,26 @@ def init_ldap():
ldap_search_user_attrs = os.getenv("LDAP_SEARCH_USER_ATTRS")

if ldap_host is None:
error_print("Environment variable LDAP_HOST is not set.")
sys.exit(1)
logger.critical("Environment variable LDAP_HOST is not set.")
if ldap_dn is None:
error_print("Environment variable LDAP_DN is not set.")
sys.exit(1)
logger.critical("Environment variable LDAP_DN is not set.")
if ldap_base_dn is None:
error_print("Environment variable LDAP_BASE_DN is not set.")
sys.exit(1)
logger.critical("Environment variable LDAP_BASE_DN is not set.")
if ldap_password is None:
error_print("Environment variable LDAP_PASSWORD is not set.")
sys.exit(1)
logger.critical("Environment variable LDAP_PASSWORD is not set.")
if ldap_search_group is None:
error_print("Environment variable LDAP_SEARCH_GROUP is not set.")
sys.exit(1)
logger.critical("Environment variable LDAP_SEARCH_GROUP is not set.")
if ldap_search_user_attrs is None:
error_print("Environment variable LDAP_SEARCH_USER_ATTRS is not set.")
logger.critical("Environment variable LDAP_SEARCH_USER_ATTRS is not set.")

if (
ldap_host is None
or ldap_dn is None
or ldap_base_dn is None
or ldap_password is None
or ldap_search_group is None
or ldap_search_user_attrs is None
):
sys.exit(1)

return (
Expand Down Expand Up @@ -80,10 +81,10 @@ def init_elabftw():
elabftw_apikey = os.getenv("ELABFTW_APIKEY")

if elabftw_host is None:
error_print("Environment variable ELABFTW_HOST is not set.")
logger.critical("Environment variable ELABFTW_HOST is not set.")
sys.exit(1)
if elabftw_apikey is None:
error_print("Environment variable ELABFTW_APIKEY is not set.")
logger.critical("Environment variable ELABFTW_APIKEY is not set.")
sys.exit(1)

return elabftw_host, elabftw_apikey
Expand All @@ -95,24 +96,31 @@ def read_whitelist() -> list:
:return: list of dicts of the groups and leaders
"""
whitelist_path = f"{get_whitelist_filename()}"
from pathlib import Path

whitelist_path = Path(Path.cwd(), get_whitelist_filename())
data = []
try:
with open(whitelist_path, "r") as file:
reader = csv.DictReader(file)
for row in reader:
data.append(dict(row))
except FileNotFoundError as e:
error_print(
f"File not found: Processing file on path {whitelist_path} raised exception\n{e}."
logger.critical(
f"File not found: Processing raised exception: {e}. Check your whitelist file."
)
sys.exit(1)
except csv.Error as e:
error_print(
f"CSV Error: Processing file on path {whitelist_path} raised exception\n{e}."
logger.critical(
f"CSV Error: Processing file on path {whitelist_path} raised exception: {e}."
)
except Exception as e:
error_print(
f"Error: Processing file on path {whitelist_path} raised exception\n{e}."
logger.critical(
f"Error: Processing file on path {whitelist_path} raised exception: {e}."
)
else:
logger.success(
f"Whitelist file {whitelist_path} with {len(data)} entries successfully read."
)
return data

Expand Down Expand Up @@ -161,7 +169,7 @@ def parse_leader_mail_from_ldap(parsed_users: list, leader_acc: str) -> str:
break

if not leader_mail:
error_print(
logger.error(
f"No leader mail address for ID {leader_acc} could be obtained from the LDAP server."
)

Expand Down
36 changes: 36 additions & 0 deletions elabftw_usersync/logger_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import sys

from loguru import logger

logger.remove()

logger.add(
sys.stdout,
level="INFO",
format="<yellow>{time} | {level}</yellow>: {message}",
filter=lambda record: record["level"].name == "INFO",
)

logger.add(
sys.stdout,
level="SUCCESS",
format="<green>{time} | {level}</green>: {message}",
filter=lambda record: record["level"].name == "SUCCESS",
)

logger.add(
sys.stdout,
level="ERROR",
format="<fg #FF8234>{time} | {level}</fg #FF8234>: <b>{message}</b>",
filter=lambda record: record["level"].name == "ERROR",
)

# Add handler for CRITICAL level logs to stderr
logger.add(
sys.stderr,
level="CRITICAL",
format="<red>{time} | {level}</red>: <b>{message}</b>",
filter=lambda record: record["level"].name == "CRITICAL",
)

__all__ = ["logger"]
Loading

0 comments on commit 1a11523

Please sign in to comment.