Skip to content

Commit

Permalink
Retry one time on Etcd3 auth error (patroni#3026)
Browse files Browse the repository at this point in the history
But do it only in case if we didn't authenticate right before executing a request. Previously retries only happened when the caller was executed with `Retry.__call__()`, which is not the case for methods like `set_failover_value()` or `set_config_value()`. Also, it seems that existing watchers aren't affected, therefore we will not restart them after reauthentication.

In addition to that fix issues with `Retry.ensure_deadline(0)`:
1. the return value was ignored
2. we don't have to set `Retry.deadline` attr, it is not used anywhere

Close patroni#3023
  • Loading branch information
CyberDem0n authored Mar 7, 2024
1 parent fd3e3ca commit a8cfd46
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 43 deletions.
9 changes: 6 additions & 3 deletions patroni/dcs/consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,14 +577,17 @@ def _do_attempt_to_acquire_leader(self, retry: Retry) -> bool:
try:
return retry(self._client.kv.put, self.leader_path, self._name, acquire=self._session)
except InvalidSession:
logger.error('Our session disappeared from Consul. Will try to get a new one and retry attempt')
self._session = None
retry.ensure_deadline(0)

if not retry.ensure_deadline(0):
logger.error('Our session disappeared from Consul. Deadline exceeded, giving up')
return False

logger.error('Our session disappeared from Consul. Will try to get a new one and retry attempt')

retry(self._do_refresh_session)

retry.ensure_deadline(1, ConsulError('_do_attempt_to_acquire_leader timeout'))

return retry(self._client.kv.put, self.leader_path, self._name, acquire=self._session)

@catch_return_false_exception
Expand Down
45 changes: 17 additions & 28 deletions patroni/dcs/etcd3.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,6 @@ def build_range_request(key: str, range_end: Union[bytes, str, None] = None) ->
return fields


class ReAuthenticateMode(IntEnum):
NOT_REQUIRED = 0
REQUIRED = 1
WITHOUT_WATCHER_RESTART = 2


def _handle_auth_errors(func: Callable[..., Any]) -> Any:
def wrapper(self: 'Etcd3Client', *args: Any, **kwargs: Any) -> Any:
return self.handle_auth_errors(func, *args, **kwargs)
Expand All @@ -215,7 +209,7 @@ class Etcd3Client(AbstractEtcdClientWithFailover):
ERROR_CLS = Etcd3Error

def __init__(self, config: Dict[str, Any], dns_resolver: DnsCachingResolver, cache_ttl: int = 300) -> None:
self._reauthenticate_reason = ReAuthenticateMode.NOT_REQUIRED
self._reauthenticate = False
self._token = None
self._cluster_version: Tuple[int, ...] = tuple()
super(Etcd3Client, self).__init__({**config, 'version_prefix': '/v3beta'}, dns_resolver, cache_ttl)
Expand Down Expand Up @@ -294,7 +288,7 @@ def call_rpc(self, method: str, fields: Dict[str, Any], retry: Optional[Retry] =
fields['retry'] = retry
return self.api_execute(self.version_prefix + method, self._MPOST, fields)

def authenticate(self, *, restart_watcher: bool = True, retry: Optional[Retry] = None) -> bool:
def authenticate(self, *, retry: Optional[Retry] = None) -> bool:
if self._use_proxies and not self._cluster_version:
kwargs = self._prepare_common_parameters(1)
self._ensure_version_prefix(self._base_uri, **kwargs)
Expand All @@ -316,20 +310,18 @@ def authenticate(self, *, restart_watcher: bool = True, retry: Optional[Retry] =

def handle_auth_errors(self: 'Etcd3Client', func: Callable[..., Any], *args: Any,
retry: Optional[Retry] = None, **kwargs: Any) -> Any:
reauthenticated = False
exc = None
while True:
if self._reauthenticate_reason:
if self._reauthenticate:
if self.username and self.password:
self.authenticate(
restart_watcher=self._reauthenticate_reason != ReAuthenticateMode.WITHOUT_WATCHER_RESTART,
retry=retry)
self._reauthenticate_reason = ReAuthenticateMode.NOT_REQUIRED
if retry:
retry.ensure_deadline(0)
self.authenticate(retry=retry)
self._reauthenticate = False
else:
msg = 'Username or password not set, authentication is not possible'
logger.fatal(msg)
raise exc or Etcd3Exception(msg)
reauthenticated = True

try:
return func(self, *args, retry=retry, **kwargs)
Expand All @@ -347,11 +339,12 @@ def handle_auth_errors(self: 'Etcd3Client', func: Callable[..., Any], *args: Any
except AuthOldRevision as e:
logger.error('Auth token is for old revision of auth store')
exc = e
self._reauthenticate_reason = ReAuthenticateMode.WITHOUT_WATCHER_RESTART \
if isinstance(exc, AuthOldRevision) else ReAuthenticateMode.REQUIRED
if not retry:
self._reauthenticate = True
if retry:
logger.error('retry = %s', retry)
retry.ensure_deadline(0.5, exc)
elif reauthenticated:
raise exc
retry.ensure_deadline(0.5, exc)

@_handle_auth_errors
def range(self, key: str, range_end: Union[bytes, str, None] = None, serializable: bool = True,
Expand Down Expand Up @@ -603,12 +596,6 @@ def set_base_uri(self, value: str) -> None:
super(PatroniEtcd3Client, self).set_base_uri(value)
self._restart_watcher()

def authenticate(self, *, restart_watcher: bool = True, retry: Optional[Retry] = None) -> bool:
ret = super(PatroniEtcd3Client, self).authenticate(restart_watcher=restart_watcher, retry=retry)
if ret and restart_watcher:
self._restart_watcher()
return ret

def _wait_cache(self, timeout: float) -> None:
stop_time = time.time() + timeout
while self._kv_cache and not self._kv_cache.is_ready():
Expand Down Expand Up @@ -866,14 +853,16 @@ def _retry(*args: Any, **kwargs: Any) -> Any:
try:
return _retry(self._client.put, self.leader_path, self._name, self._lease, create_revision='0')
except LeaseNotFound:
logger.error('Our lease disappeared from Etcd. Will try to get a new one and retry attempt')
self._lease = None
retry.ensure_deadline(0)
if not retry.ensure_deadline(0):
logger.error('Our lease disappeared from Etcd. Deadline exceeded, giving up')
return False

logger.error('Our lease disappeared from Etcd. Will try to get a new one and retry attempt')

_retry(self._do_refresh_lease)

retry.ensure_deadline(1, Etcd3Error('_do_attempt_to_acquire_leader timeout'))

return _retry(self._client.put, self.leader_path, self._name, self._lease, create_revision='0')

@catch_return_false_exception
Expand Down
5 changes: 2 additions & 3 deletions patroni/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ def stoptime(self) -> float:
return self._cur_stoptime or 0

def ensure_deadline(self, timeout: float, raise_ex: Optional[Exception] = None) -> bool:
"""Calculates, sets, and checks the remaining deadline time.
"""Calculates and checks the remaining deadline time.
:param timeout: if the *deadline* is smaller than the provided *timeout* value raise *raise_ex* exception.
:param raise_ex: the exception object that will be raised if the *deadline* is smaller than provided *timeout*.
Expand All @@ -727,8 +727,7 @@ def ensure_deadline(self, timeout: float, raise_ex: Optional[Exception] = None)
:raises:
:class:`Exception`: *raise_ex* if calculated deadline is smaller than provided *timeout*.
"""
self.deadline = self.stoptime - time.time()
if self.deadline < timeout:
if self.stoptime - time.time() < timeout:
if raise_ex:
raise raise_ex
return False
Expand Down
6 changes: 2 additions & 4 deletions tests/test_consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,8 @@ def test_take_leader(self):
self.c.set_ttl(20)
self.c._do_refresh_session = Mock()
self.assertFalse(self.c.take_leader())
with patch('time.time', Mock(side_effect=[0, 100])):
self.assertRaises(ConsulError, self.c.take_leader)
with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 0, 0, 100])):
self.assertRaises(ConsulError, self.c.take_leader)
with patch('time.time', Mock(side_effect=[0, 0, 0, 100, 100])):
self.assertFalse(self.c.take_leader())

@patch.object(consul.Consul.KV, 'put', Mock(return_value=True))
def test_set_failover_value(self):
Expand Down
12 changes: 7 additions & 5 deletions tests/test_etcd3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from patroni.dcs import get_dcs
from patroni.dcs.etcd import DnsCachingResolver
from patroni.dcs.etcd3 import PatroniEtcd3Client, Cluster, Etcd3, Etcd3Client, \
Etcd3Error, Etcd3ClientError, ReAuthenticateMode, RetryFailedError, InvalidAuthToken, Unavailable, \
Etcd3Error, Etcd3ClientError, RetryFailedError, InvalidAuthToken, Unavailable, \
Unknown, UnsupportedEtcdVersion, UserEmpty, AuthFailed, AuthOldRevision, base64_encode
from patroni.postgresql.mpp import get_mpp
from threading import Thread
Expand Down Expand Up @@ -166,12 +166,14 @@ def test__handle_auth_errors(self, mock_urlopen):
retry = self.etcd3._retry.copy()
with patch('time.time', Mock(side_effect=[0, 10, 20, 30, 40])):
self.assertRaises(InvalidAuthToken, retry, self.client.deleteprefix, 'foo', retry=retry)
with patch('time.time', Mock(side_effect=[0, 10])):
self.assertRaises(InvalidAuthToken, self.client.deleteprefix, 'foo')
self.client.username = None
self.client._reauthenticate_reason = ReAuthenticateMode.NOT_REQUIRED
self.client._reauthenticate = False
retry = self.etcd3._retry.copy()
self.assertRaises(InvalidAuthToken, retry, self.client.deleteprefix, 'foo', retry=retry)
mock_urlopen.return_value.content = '{"code":3,"error":"etcdserver: revision of auth store is old"}'
self.client._reauthenticate_reason = ReAuthenticateMode.NOT_REQUIRED
self.client._reauthenticate = False
self.assertRaises(AuthOldRevision, retry, self.client.deleteprefix, 'foo', retry=retry)

def test__handle_server_response(self):
Expand Down Expand Up @@ -271,8 +273,8 @@ def test_take_leader(self):

def test_attempt_to_acquire_leader(self):
self.assertFalse(self.etcd3.attempt_to_acquire_leader())
with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 200])):
self.assertRaises(Etcd3Error, self.etcd3.attempt_to_acquire_leader)
with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 0, 100, 200])):
self.assertFalse(self.etcd3.attempt_to_acquire_leader())
with patch('time.time', Mock(side_effect=[0, 100, 200, 300, 400])):
self.assertRaises(Etcd3Error, self.etcd3.attempt_to_acquire_leader)
with patch.object(PatroniEtcd3Client, 'put', Mock(return_value=False)):
Expand Down

0 comments on commit a8cfd46

Please sign in to comment.