Skip to content

Commit

Permalink
refactor(jans-pycloudlib): simplify classes and methods for locking (#…
Browse files Browse the repository at this point in the history
…10376)

* refactor(jans-pycloudlib): simplify classes and methods for locking

Signed-off-by: iromli <[email protected]>

* refactor(jans-pycloudlib): remove dead code

* refactor(jans-pycloudlib): deprecate sync_sql_password in favor get_sql_password

---------

Signed-off-by: iromli <[email protected]>
Co-authored-by: Mohammad Abudayyeh <[email protected]>
  • Loading branch information
iromli and moabu authored Dec 11, 2024
1 parent a1517a0 commit 39e74a1
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 236 deletions.
204 changes: 54 additions & 150 deletions jans-pycloudlib/jans/pycloudlib/lock/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@
import threading
import time
import typing as _t
from abc import ABC
from abc import abstractmethod
from abc import abstractproperty
from datetime import datetime
from datetime import timedelta
from functools import cached_property

import backoff

Expand All @@ -26,6 +22,7 @@
# imported objects for function type hint, completion, etc.
# these won't be executed in runtime
from backoff.types import Details
from jans.pycloudlib.manager import Manager

logger = logging.getLogger(__name__)

Expand All @@ -46,8 +43,8 @@ def _on_connection_backoff(details: Details) -> None:
Args:
details: backoff Details type.
"""
mgr = details["args"][0].__class__.__name__
lock = details["args"][1]
lock = details["args"][0].__class__.__name__
adapter = details["args"][0].adapter.__class__.__name__
func = details["target"].__name__

exc_type, exc, _ = sys.exc_info()
Expand All @@ -57,7 +54,7 @@ def _on_connection_backoff(details: Details) -> None:
error = details.get("value", "Uncaught exception")

# emit warning
logger.warning(f"Backing off {mgr}.{func}({lock}) for {details['wait']:.1f} seconds; {error=}")
logger.warning(f"Backing off {lock}(adapter={adapter}).{func}() for {details['wait']:.1f} seconds; {error=}")


def _on_connection_giveup(details: Details) -> None:
Expand All @@ -66,8 +63,8 @@ def _on_connection_giveup(details: Details) -> None:
Args:
details: backoff Details type.
"""
mgr = details["args"][0].__class__.__name__
lock = details["args"][1]
lock = details["args"][0].__class__.__name__
adapter = details["args"][0].adapter.__class__.__name__
func = details["target"].__name__

exc_type, exc, _ = sys.exc_info()
Expand All @@ -77,87 +74,39 @@ def _on_connection_giveup(details: Details) -> None:
error = details.get("value", "Uncaught exception")

# emit warning
logger.warning(f"Giving up {mgr}.{func}({lock}) after {details['tries']} tries within {details['elapsed']:.1f} seconds; {error=}")
logger.warning(f"Giving up {lock}(adapter={adapter}).{func}() after {details['tries']} tries within {details['elapsed']:.1f} seconds; {error=}")


class LockManager:
@property
def lock_enabled(self):
return as_boolean(os.environ.get("CN_OCI_LOCK_ENABLED", "true"))

@backoff.on_exception(
backoff.constant,
Exception,
on_backoff=_on_connection_backoff,
on_giveup=_on_connection_giveup,
max_time=60,
jitter=None,
interval=10,
)
def check_connection(self, lock) -> None:
"""Check if connection is established.
Returns:
A boolean to indicate connection is established.
"""
if lock.adapter is None:
return

connected = lock.adapter.connected()
if not connected:
raise RuntimeError(f"Cannot establish connection using adapter {lock.adapter.__class__.__name__}.")

def create_lock(
self,
name: str,
owner: str = "",
ttl: int = 10,
retry_delay: float = 5.0,
max_start_delay: float = 0.0,
):
"""Create lock object.
Example:
```py
from jans.pycloudlib.lock import LockManager
class LockNotAcquired(RuntimeError):
"""Error class to indicate failure on acquiring a lock."""

# automally try to acquire the lock and then release it at the end of operation
with LockManager().create_lock("lock-1") as lock:
# do operation which requires coordination
```

Args:
name: Name of the lock.
owner: Owner of the lock.
ttl: Duration of lock (in seconds).
retry_delay: Delay before retrying to acquire lock (in seconds).
max_start_delay: Max. delay before starting to acquire lock.
class LockConnectionError(RuntimeError):
"""Error class to indicate failure on acquiring connection."""

Returns:
Instance of `jans.pycloudlib.lock.Lock`.
"""
# default to hostname as owner
owner = owner or socket.gethostname()

# lock implementation is set to _fake_ for backward-compatibility
lock_cls = FakeLockRecord
class LockRecord:
"""This class manage the locking process.
if self.lock_enabled:
lock_cls = LockRecord
Common example:
lock = lock_cls(name, owner, ttl, retry_delay, max_start_delay)
```py
from jans.pycloudlib import get_manager
# pre-flight connection checking
self.check_connection(lock)
return lock
manager = get_manager()
lock = LockRecord("lock-1", owner="container-1", ttl=30)
lock.init_adapter(manager)
class LockNotAcquired(RuntimeError):
"""Error class to indicate failure on acquiring a lock."""
# try to acquire the lock
if lock.acquire():
# do operation which requires coordination
# release the lock
lock.release()
```
"""

class BaseLockRecord(ABC):
def __init__(
self,
name: str,
Expand All @@ -184,6 +133,9 @@ def __init__(
# event object to stop renew lock
self._renew_stop_event = None

# lock adapter
self.adapter = None

def __enter__(self):
if not self.acquire():
raise LockNotAcquired(f"Lock {self.name} is not acquired within the specified time")
Expand All @@ -192,69 +144,14 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()

@abstractmethod
def acquire(self) -> bool:
pass

@abstractmethod
def release(self) -> None:
pass

@abstractproperty
def adapter(self) -> _t.Optional[LockAdapter]:
pass

def __repr__(self):
return f"<{self.__class__.__name__}(adapter={self.adapter.__class__.__name__})>"


class LockRecord(BaseLockRecord):
"""This class manage the locking process.
Common example:
```py
lock = Lock("lock-1", owner="container-1", ttl=30)
# try to acquire the lock
if lock.acquire():
# do operation which requires coordination
# release the lock
lock.release()
```
"""

@cached_property
def adapter(self) -> LockAdapter: # noqa: D412
"""Get an instance of lock adapter class.
Returns:
An instance of lock adapter class.
Raises:
ValueError: If the value of `CN_OCI_LOCK_ADAPTER` or `CN_PERSISTENCE_TYPE` environment variable is not supported.
Examples:
```py
os.environ["CN_OCI_LOCK_ADAPTER"] = "sql"
LockStorage().adapter # returns an instance of adapter class
```
The adapter name is pre-populated from `CN_OCI_LOCK_ADAPTER` environment variable.
Supported lock adapter name:
- `sql`: returns an instance of [SqlLock][jans.pycloudlib.lock.sql_lock.SqlLock]
"""
def init_adapter(self, manager: Manager):
_adapter = os.environ.get("CN_OCI_LOCK_ADAPTER") or PersistenceMapper().mapping["default"]

if _adapter == "sql":
return SqlLock()

# unsupported adapter
raise ValueError(f"Unsupported lock adapter {_adapter!r}")
self.adapter = SqlLock(manager)
else:
# unsupported adapter
raise ValueError(f"Unsupported lock adapter {_adapter!r}")

def _record_expired(self, record: dict[str, _t.Any]) -> bool:
"""Check if a record is expire.
Expand Down Expand Up @@ -306,7 +203,7 @@ def _get_record(self) -> dict[str, _t.Any]:
"""Get exising lock record.
Returns:
A `dict` contains of lock record (if any).
A `dict` contains lock record (if any).
"""
return self.adapter.get(self.name)

Expand Down Expand Up @@ -415,22 +312,29 @@ def release(self) -> None:
self._delete_record()
logger.info(f"Lock {self.name} is released by candidate {self.candidate['owner']}")

def __repr__(self):
return f"<{self.__class__.__name__}(adapter={self.adapter.__class__.__name__})>"

class FakeLockRecord(BaseLockRecord):
@property
def adapter(self):
return None

def acquire(self) -> bool:
logger.warning(f"Using {self.__class__.__name__}.acquire(...) to acquire lock {self.name}")
return True
@backoff.on_exception(
backoff.constant,
Exception,
on_backoff=_on_connection_backoff,
on_giveup=_on_connection_giveup,
max_time=60,
jitter=None,
interval=10,
)
def check_adapter_connection(self) -> None:
"""Check if connection to adapter is established.
def release(self) -> None:
logger.warning(f"Using {self.__class__.__name__}.release(...) to release lock {self.name}")
Returns:
A boolean to indicate connection is established.
"""
if not self.adapter.connected():
raise LockConnectionError(f"Cannot establish connection using adapter {self.adapter.__class__.__name__}.")


# avoid implicit reexport disabled error
__all__ = [
"LockManager",
"SqlLock",
"LockRecord",
]
Loading

0 comments on commit 39e74a1

Please sign in to comment.