From 39e74a153edf01db8ab43be81f5585bc4210818d Mon Sep 17 00:00:00 2001 From: Isman Firmansyah Date: Wed, 11 Dec 2024 19:24:12 +0700 Subject: [PATCH] refactor(jans-pycloudlib): simplify classes and methods for locking (#10376) * refactor(jans-pycloudlib): simplify classes and methods for locking Signed-off-by: iromli * refactor(jans-pycloudlib): remove dead code * refactor(jans-pycloudlib): deprecate sync_sql_password in favor get_sql_password --------- Signed-off-by: iromli Co-authored-by: Mohammad Abudayyeh <47318409+moabu@users.noreply.github.com> --- .../jans/pycloudlib/lock/__init__.py | 204 +++++------------- .../jans/pycloudlib/lock/sql_lock.py | 85 ++------ jans-pycloudlib/jans/pycloudlib/manager.py | 43 +++- .../jans/pycloudlib/persistence/sql.py | 21 +- 4 files changed, 117 insertions(+), 236 deletions(-) diff --git a/jans-pycloudlib/jans/pycloudlib/lock/__init__.py b/jans-pycloudlib/jans/pycloudlib/lock/__init__.py index a7db198f64b..63f2e4f4bd8 100644 --- a/jans-pycloudlib/jans/pycloudlib/lock/__init__.py +++ b/jans-pycloudlib/jans/pycloudlib/lock/__init__.py @@ -9,12 +9,8 @@ import threading import time import typing as _t -from abc import ABC -from abc import abstractmethod -from abc import abstractproperty from datetime import datetime from datetime import timedelta -from functools import cached_property import backoff @@ -26,6 +22,7 @@ # imported objects for function type hint, completion, etc. # these won't be executed in runtime from backoff.types import Details + from jans.pycloudlib.manager import Manager logger = logging.getLogger(__name__) @@ -46,8 +43,8 @@ def _on_connection_backoff(details: Details) -> None: Args: details: backoff Details type. """ - mgr = details["args"][0].__class__.__name__ - lock = details["args"][1] + lock = details["args"][0].__class__.__name__ + adapter = details["args"][0].adapter.__class__.__name__ func = details["target"].__name__ exc_type, exc, _ = sys.exc_info() @@ -57,7 +54,7 @@ def _on_connection_backoff(details: Details) -> None: error = details.get("value", "Uncaught exception") # emit warning - logger.warning(f"Backing off {mgr}.{func}({lock}) for {details['wait']:.1f} seconds; {error=}") + logger.warning(f"Backing off {lock}(adapter={adapter}).{func}() for {details['wait']:.1f} seconds; {error=}") def _on_connection_giveup(details: Details) -> None: @@ -66,8 +63,8 @@ def _on_connection_giveup(details: Details) -> None: Args: details: backoff Details type. """ - mgr = details["args"][0].__class__.__name__ - lock = details["args"][1] + lock = details["args"][0].__class__.__name__ + adapter = details["args"][0].adapter.__class__.__name__ func = details["target"].__name__ exc_type, exc, _ = sys.exc_info() @@ -77,87 +74,39 @@ def _on_connection_giveup(details: Details) -> None: error = details.get("value", "Uncaught exception") # emit warning - logger.warning(f"Giving up {mgr}.{func}({lock}) after {details['tries']} tries within {details['elapsed']:.1f} seconds; {error=}") + logger.warning(f"Giving up {lock}(adapter={adapter}).{func}() after {details['tries']} tries within {details['elapsed']:.1f} seconds; {error=}") -class LockManager: - @property - def lock_enabled(self): - return as_boolean(os.environ.get("CN_OCI_LOCK_ENABLED", "true")) - - @backoff.on_exception( - backoff.constant, - Exception, - on_backoff=_on_connection_backoff, - on_giveup=_on_connection_giveup, - max_time=60, - jitter=None, - interval=10, - ) - def check_connection(self, lock) -> None: - """Check if connection is established. - - Returns: - A boolean to indicate connection is established. - """ - if lock.adapter is None: - return - - connected = lock.adapter.connected() - if not connected: - raise RuntimeError(f"Cannot establish connection using adapter {lock.adapter.__class__.__name__}.") - - def create_lock( - self, - name: str, - owner: str = "", - ttl: int = 10, - retry_delay: float = 5.0, - max_start_delay: float = 0.0, - ): - """Create lock object. - - Example: - - ```py - from jans.pycloudlib.lock import LockManager +class LockNotAcquired(RuntimeError): + """Error class to indicate failure on acquiring a lock.""" - # automally try to acquire the lock and then release it at the end of operation - with LockManager().create_lock("lock-1") as lock: - # do operation which requires coordination - ``` - Args: - name: Name of the lock. - owner: Owner of the lock. - ttl: Duration of lock (in seconds). - retry_delay: Delay before retrying to acquire lock (in seconds). - max_start_delay: Max. delay before starting to acquire lock. +class LockConnectionError(RuntimeError): + """Error class to indicate failure on acquiring connection.""" - Returns: - Instance of `jans.pycloudlib.lock.Lock`. - """ - # default to hostname as owner - owner = owner or socket.gethostname() - # lock implementation is set to _fake_ for backward-compatibility - lock_cls = FakeLockRecord +class LockRecord: + """This class manage the locking process. - if self.lock_enabled: - lock_cls = LockRecord + Common example: - lock = lock_cls(name, owner, ttl, retry_delay, max_start_delay) + ```py + from jans.pycloudlib import get_manager - # pre-flight connection checking - self.check_connection(lock) - return lock + manager = get_manager() + lock = LockRecord("lock-1", owner="container-1", ttl=30) + lock.init_adapter(manager) -class LockNotAcquired(RuntimeError): - """Error class to indicate failure on acquiring a lock.""" + # try to acquire the lock + if lock.acquire(): + # do operation which requires coordination + # release the lock + lock.release() + ``` + """ -class BaseLockRecord(ABC): def __init__( self, name: str, @@ -184,6 +133,9 @@ def __init__( # event object to stop renew lock self._renew_stop_event = None + # lock adapter + self.adapter = None + def __enter__(self): if not self.acquire(): raise LockNotAcquired(f"Lock {self.name} is not acquired within the specified time") @@ -192,69 +144,14 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.release() - @abstractmethod - def acquire(self) -> bool: - pass - - @abstractmethod - def release(self) -> None: - pass - - @abstractproperty - def adapter(self) -> _t.Optional[LockAdapter]: - pass - - def __repr__(self): - return f"<{self.__class__.__name__}(adapter={self.adapter.__class__.__name__})>" - - -class LockRecord(BaseLockRecord): - """This class manage the locking process. - - Common example: - - ```py - lock = Lock("lock-1", owner="container-1", ttl=30) - - # try to acquire the lock - if lock.acquire(): - # do operation which requires coordination - - # release the lock - lock.release() - ``` - """ - - @cached_property - def adapter(self) -> LockAdapter: # noqa: D412 - """Get an instance of lock adapter class. - - Returns: - An instance of lock adapter class. - - Raises: - ValueError: If the value of `CN_OCI_LOCK_ADAPTER` or `CN_PERSISTENCE_TYPE` environment variable is not supported. - - Examples: - - ```py - os.environ["CN_OCI_LOCK_ADAPTER"] = "sql" - LockStorage().adapter # returns an instance of adapter class - ``` - - The adapter name is pre-populated from `CN_OCI_LOCK_ADAPTER` environment variable. - - Supported lock adapter name: - - - `sql`: returns an instance of [SqlLock][jans.pycloudlib.lock.sql_lock.SqlLock] - """ + def init_adapter(self, manager: Manager): _adapter = os.environ.get("CN_OCI_LOCK_ADAPTER") or PersistenceMapper().mapping["default"] if _adapter == "sql": - return SqlLock() - - # unsupported adapter - raise ValueError(f"Unsupported lock adapter {_adapter!r}") + self.adapter = SqlLock(manager) + else: + # unsupported adapter + raise ValueError(f"Unsupported lock adapter {_adapter!r}") def _record_expired(self, record: dict[str, _t.Any]) -> bool: """Check if a record is expire. @@ -306,7 +203,7 @@ def _get_record(self) -> dict[str, _t.Any]: """Get exising lock record. Returns: - A `dict` contains of lock record (if any). + A `dict` contains lock record (if any). """ return self.adapter.get(self.name) @@ -415,22 +312,29 @@ def release(self) -> None: self._delete_record() logger.info(f"Lock {self.name} is released by candidate {self.candidate['owner']}") + def __repr__(self): + return f"<{self.__class__.__name__}(adapter={self.adapter.__class__.__name__})>" -class FakeLockRecord(BaseLockRecord): - @property - def adapter(self): - return None - - def acquire(self) -> bool: - logger.warning(f"Using {self.__class__.__name__}.acquire(...) to acquire lock {self.name}") - return True + @backoff.on_exception( + backoff.constant, + Exception, + on_backoff=_on_connection_backoff, + on_giveup=_on_connection_giveup, + max_time=60, + jitter=None, + interval=10, + ) + def check_adapter_connection(self) -> None: + """Check if connection to adapter is established. - def release(self) -> None: - logger.warning(f"Using {self.__class__.__name__}.release(...) to release lock {self.name}") + Returns: + A boolean to indicate connection is established. + """ + if not self.adapter.connected(): + raise LockConnectionError(f"Cannot establish connection using adapter {self.adapter.__class__.__name__}.") # avoid implicit reexport disabled error __all__ = [ - "LockManager", - "SqlLock", + "LockRecord", ] diff --git a/jans-pycloudlib/jans/pycloudlib/lock/sql_lock.py b/jans-pycloudlib/jans/pycloudlib/lock/sql_lock.py index 400c267a259..46e21f6f469 100644 --- a/jans-pycloudlib/jans/pycloudlib/lock/sql_lock.py +++ b/jans-pycloudlib/jans/pycloudlib/lock/sql_lock.py @@ -2,97 +2,38 @@ import json import logging -import os import typing as _t -import warnings -from functools import cached_property -from sqlalchemy import create_engine -from sqlalchemy import MetaData from sqlalchemy import Table from sqlalchemy import Column from sqlalchemy import String from sqlalchemy import Text -from sqlalchemy.exc import SAWarning from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import DatabaseError from sqlalchemy.sql import select from jans.pycloudlib.lock.base_lock import BaseLock -from jans.pycloudlib.utils import get_password_from_file - -if _t.TYPE_CHECKING: # pragma: no cover - # imported objects for function type hint, completion, etc. - # these won't be executed in runtime - from sqlalchemy.engine import Engine - +from jans.pycloudlib.persistence.sql import SqlClient logger = logging.getLogger(__name__) class SqlLock(BaseLock): - def __init__(self) -> None: - self._metadata: _t.Optional[MetaData] = None - self._dialect = os.environ.get("CN_SQL_DB_DIALECT", "mysql") - - @cached_property - def engine(self) -> Engine: - """Lazy init of engine instance object.""" - return create_engine(self.engine_url, pool_pre_ping=True, hide_parameters=True) - - @property - def engine_url(self) -> str: - """Engine connection URL.""" - host = os.environ.get("CN_SQL_DB_HOST", "localhost") - - port = os.environ.get("CN_SQL_DB_PORT", 3306) - - database = os.environ.get("CN_SQL_DB_NAME", "jans") - - user = os.environ.get("CN_SQL_DB_USER", "jans") - - password_file = os.environ.get("CN_SQL_PASSWORD_FILE", "/etc/jans/conf/sql_password") - - password = get_password_from_file(password_file) - - if self._dialect in ("pgsql", "postgresql"): - connector = "postgresql+psycopg2" - else: - connector = "mysql+pymysql" - return f"{connector}://{user}:{password}@{host}:{port}/{database}" - - @property - def metadata(self) -> MetaData: - """Lazy init of metadata.""" - if not self._metadata: - with warnings.catch_warnings(): - # postgresql driver will show warnings about unsupported reflection - # on expression-based index, i.e. `lower(uid::text)`; but we don't - # want to clutter the logs with these warnings, hence we suppress the - # warnings - warnings.filterwarnings( - "ignore", - message="Skipped unsupported reflection of expression-based index", - category=SAWarning, - ) - - # do reflection on database table - self._metadata = MetaData(bind=self.engine) - self._metadata.reflect() - return self._metadata + def __init__(self, manager) -> None: + self.client = SqlClient(manager) def _prepare_table(self, table_name) -> None: try: # prepare table Table( table_name, - self.metadata, + self.client.metadata, Column("doc_id", String(128), primary_key=True), # handle type compatibility with current Janssen by using TEXT instead of JSON/JSONB Column("jansData", Text(), default="{}"), extend_existing=True, ) - self.metadata.create_all(self.engine) + self.client.metadata.create_all(self.client.engine) except DatabaseError as exc: raise_on_error = False @@ -111,10 +52,10 @@ def table(self): """Get table object.""" table_name = "jansOciLock" - _table = self.metadata.tables.get(table_name) + _table = self.client.metadata.tables.get(table_name) if _table is None: self._prepare_table(table_name) - _table = self.metadata.tables.get(table_name) + _table = self.client.metadata.tables.get(table_name) # underlying table object return _table # noqa: R504 @@ -130,7 +71,7 @@ def get(self, key: str) -> dict[str, _t.Any]: """ stmt = select([self.table]).where(self.table.c.doc_id == key).limit(1) - with self.engine.connect() as conn: + with self.client.engine.connect() as conn: result = conn.execute(stmt) entry = result.fetchone() @@ -156,7 +97,7 @@ def post(self, key: str, owner: str, ttl: float, updated_at: str) -> bool: jansData=json.dumps({"owner": owner, "ttl": ttl, "updated_at": updated_at}), ) - with self.engine.connect() as conn: + with self.client.engine.connect() as conn: try: result = conn.execute(stmt) created = bool(result.inserted_primary_key) @@ -180,7 +121,7 @@ def put(self, key: str, owner: str, ttl: float, updated_at: str) -> bool: jansData=json.dumps({"owner": owner, "ttl": ttl, "updated_at": updated_at}), ) - with self.engine.connect() as conn: + with self.client.engine.connect() as conn: result = conn.execute(stmt) return bool(result.rowcount) @@ -195,7 +136,7 @@ def delete(self, key: str) -> bool: """ stmt = self.table.delete().where(self.table.c.doc_id == key) - with self.engine.connect() as conn: + with self.client.engine.connect() as conn: result = conn.execute(stmt) return bool(result.rowcount) @@ -205,6 +146,4 @@ def connected(self) -> bool: Returns: A boolean to indicate connection is established. """ - with self.engine.connect() as conn: - result = conn.execute("SELECT 1 AS is_alive") - return bool(result.fetchone()[0] > 0) + return self.client.connected() diff --git a/jans-pycloudlib/jans/pycloudlib/manager.py b/jans-pycloudlib/jans/pycloudlib/manager.py index c978eafd193..5c65990f3c8 100644 --- a/jans-pycloudlib/jans/pycloudlib/manager.py +++ b/jans-pycloudlib/jans/pycloudlib/manager.py @@ -2,10 +2,12 @@ import logging import os +import socket import typing as _t from abc import ABC from abc import abstractproperty from functools import cached_property +from types import SimpleNamespace from jans.pycloudlib.config import ConsulConfig from jans.pycloudlib.config import KubernetesConfig @@ -19,7 +21,7 @@ from jans.pycloudlib.secret import FileSecret from jans.pycloudlib.utils import decode_text from jans.pycloudlib.utils import encode_text -from jans.pycloudlib.lock import LockManager +from jans.pycloudlib.lock import LockRecord logger = logging.getLogger(__name__) @@ -374,7 +376,6 @@ class Manager: def __init__(self): self.config = ConfigManager() self.secret = SecretManager() - self.lock = LockManager() def _bootstrap_assets(self, adapter): assets = self._bootstrap_asset_mappings.get(adapter) or [] @@ -395,6 +396,44 @@ def bootstrap(self): for adapter_name in [self.config.remote_adapter_name, self.secret.remote_adapter_name]: self._bootstrap_assets(adapter_name) + def create_lock( + self, + name: str, + owner: str = "", + ttl: int = 10, + retry_delay: float = 5.0, + max_start_delay: float = 0.0, + ): + """Create lock object. + + Args: + name: Name of the lock. + owner: Owner of the lock. + ttl: Duration of lock (in seconds). + retry_delay: Delay before retrying to acquire lock (in seconds). + max_start_delay: Max. delay before starting to acquire lock. + + Returns: + Instance of `jans.pycloudlib.lock.LockRecord`. + """ + # default to hostname as owner + owner = owner or socket.gethostname() + + lock = LockRecord(name, owner, ttl, retry_delay, max_start_delay) + lock.init_adapter(self) + + # pre-flight connection checking + lock.check_adapter_connection() + return lock + + @property + def lock(self): # pragma: no cover + # backward-compat for .lock attribute + ns = SimpleNamespace() + ns.create_lock = self.create_lock + logger.warning(f"Accessing {self.__class__.__name__}.lock attribute is deprecated") + return ns + def get_manager() -> Manager: # noqa: D412 """Create an instance of [Manager][jans.pycloudlib.manager.Manager] class. diff --git a/jans-pycloudlib/jans/pycloudlib/persistence/sql.py b/jans-pycloudlib/jans/pycloudlib/persistence/sql.py index c05c03aceae..df6981a823d 100644 --- a/jans-pycloudlib/jans/pycloudlib/persistence/sql.py +++ b/jans-pycloudlib/jans/pycloudlib/persistence/sql.py @@ -647,21 +647,20 @@ def sync_sql_password(manager: Manager) -> None: Args: manager: An instance of manager class. """ - password_file = get_sql_password_file() - - # previous version may not have sql_password secret hence we're pre-populating - # the value from mounted password file (if any) - if os.path.isfile(password_file): - manager.secret.set("sql_password", get_password_from_file(password_file)) - - # make sure password file always exists - if not os.path.isfile(password_file): - manager.secret.to_file("sql_password", password_file) + logger.warning( + f"Accessing jans.pycloudlib.persistence.sql.sync_sql_password is deprecated; " + f"Use jans.pycloudlib.persistence.sql.get_sql_password instead" + ) def get_sql_password(manager: Manager | None = None): password_file = get_sql_password_file() - return get_password_from_file(password_file) + if os.path.isfile(password_file): + return get_password_from_file(password_file) + + # safer method to get credential + return manager.secret.get("sql_password") + def preconfigure_simple_json(dbapi_connection, connection_record):