diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 098bd0104..311aed667 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -174,7 +174,7 @@ jobs: - uses: jakebailey/pyright-action@v1 with: - version: 1.1.347 + version: 1.1.356 docs: runs-on: ubuntu-latest diff --git a/docs/releases.rst b/docs/releases.rst index ac167bd0a..f30aacbc4 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -3,6 +3,76 @@ Release notes ============= +Version 3.3.0 +------------- + +.. warning:: + All older Partoni versions are not compatible with ``ydiff>=1.3``. + + There are the following options available to "fix" the problem: + + 1. upgrade Patroni to the latest version + 2. install ``ydiff<1.3`` after installing Patroni + 3. install ``cdiff`` module + + +**New features** + +- Add ability to pass ``auth_data`` to Zookeeper client (Aras Mumcuyan) + + It allows to specify the authentication credentials to use for the connection. + +- Add a contrib script for ``Barman`` integration (Israel Barth Rubio) + + Provide an application ``patroni_barman`` that allows to perform ``Barman`` operations remotely and can be used as a custom bootstrap/custom replica method or as an ``on_role_change`` callback. Please check :ref:`here ` for more information. + +- Support ``JSON`` log format (alisalemmi) + + Apart from ``plain`` (default), Patroni now also supports ``json`` log format. Requires ``python-json-logger>=2.0.2`` library to be installed. + +- Show ``pending_restart_reason`` information (Polina Bungina) + + Provide extended information about the PostgreSQL parameters that caused ``pending_restart`` flag to be set. Both ``patronictl list`` and ``/patroni`` REST API endpoint now show the parameters names and their "diff" as ``pending_restart_reason``. + +- Implement ``nostream`` tag (Grigory Smolkin) + + If ``nostream`` tag is set to ``true``, the node will not use replication protocol to stream WAL but instead rely on archive recovery (if ``restore_command`` is configured). It also disables copying and synchronization of permanent logical replication slots on the node itself and all its cascading replicas. + + +**Improvements** + +- Implement validation of the ``log`` section (Alexander Kukushkin) + + Until now validator was not checking the correctness of the logging configuration provided. + +- Improve logging for PostgreSQL parameters change (Polina Bungina) + + Convert old values to a human-readable format and log information about the ``pg_controldata`` vs Patroni global configuration mismatch. + + +**Bugfixes** + +- Properly filter out not allowed ``pg_basebackup`` options (Israel Barth Rubio) + + Due to a bug, Patroni was not properly filtering out the not allowed options configured for the ``basebackup`` replica bootstrap method, when provided in the ``- setting: value`` format. + +- Fix ``etcd3`` authentication error handling (Alexander Kukushkin) + + Always retry one time on ``etcd3`` authentication error if authentication was not done right before executing the request. Also, do not restart watchers on reauthentication. + +- Improve logic of the validator files discovery (Waynerv) + + Use ``importlib`` library to discover the files with available configuration parameters when possible (for Python 3.9+). This implementation is more stable and doesn't break the Patroni distributions based on ``zip`` archives. + +- Use ``target_session_attrs`` only when multiple hosts are specified in the ``standby_cluster`` section (Alexander Kukushkin) + + ``target_session_attrs=read-write`` is now added to the ``primary_conninfo`` on the standby leader node only when ``standby_cluster.host`` section contains multiple hosts separated by commas. + +- Add compatibility code for ``ydiff`` library version 1.3+ (Alexander Kukushkin) + + Patroni is relying on some API from ``ydiff`` that is not public because it is supposed to be just a terminal tool rather than a python module. Unfortunately, the API change in 1.3 broke old Patroni versions. + + Version 3.2.2 ------------- diff --git a/docs/tools_integration.rst b/docs/tools_integration.rst index de7d3c6d4..1419e58d3 100644 --- a/docs/tools_integration.rst +++ b/docs/tools_integration.rst @@ -1,3 +1,5 @@ +.. _tools_integration: + Integration with other tools ============================ diff --git a/patroni.spec b/patroni.spec index b5c414a2e..ad9d2d7b4 100644 --- a/patroni.spec +++ b/patroni.spec @@ -13,13 +13,17 @@ def hiddenimports(): sys.path.pop(0) +def resources(): + import os + res_dir = 'patroni/postgresql/available_parameters/' + exts = set(f.split('.')[-1] for f in os.listdir(res_dir)) + return [(res_dir + '*.' + e, res_dir) for e in exts if e.lower() in {'yml', 'yaml'}] + + a = Analysis(['patroni/__main__.py'], pathex=[], binaries=None, - datas=[ - ('patroni/postgresql/available_parameters/*.yml', 'patroni/postgresql/available_parameters'), - ('patroni/postgresql/available_parameters/*.yaml', 'patroni/postgresql/available_parameters'), - ], + datas=resources(), hiddenimports=hiddenimports(), hookspath=[], runtime_hooks=[], diff --git a/patroni/collections.py b/patroni/collections.py index 2d38e8d7f..94dff7bb9 100644 --- a/patroni/collections.py +++ b/patroni/collections.py @@ -1,9 +1,10 @@ """Patroni custom object types somewhat like :mod:`collections` module. -Provides a case insensitive :class:`dict` and :class:`set` object types. +Provides a case insensitive :class:`dict` and :class:`set` object types, and `EMPTY_DICT` frozen dictionary object. """ from collections import OrderedDict -from typing import Any, Collection, Dict, Iterator, KeysView, MutableMapping, MutableSet, Optional +from copy import deepcopy +from typing import Any, Collection, Dict, Iterator, KeysView, Mapping, MutableMapping, MutableSet, Optional class CaseInsensitiveSet(MutableSet[str]): @@ -48,7 +49,7 @@ def __str__(self) -> str: """ return str(set(self._values.values())) - def __contains__(self, value: str) -> bool: + def __contains__(self, value: object) -> bool: """Check if set contains *value*. The check is performed case-insensitively. @@ -57,7 +58,7 @@ def __contains__(self, value: str) -> bool: :returns: ``True`` if *value* is already in the set, ``False`` otherwise. """ - return value.lower() in self._values + return isinstance(value, str) and value.lower() in self._values def __iter__(self) -> Iterator[str]: """Iterate over the values in this set. @@ -207,3 +208,47 @@ def __repr__(self) -> str: "'.format(type(self).__name__, dict(self.items()), id(self)) + + +class _FrozenDict(Mapping[str, Any]): + """Frozen dictionary object.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Create a new instance of :class:`_FrozenDict` with given data.""" + self.__values: Dict[str, Any] = dict(*args, **kwargs) + + def __iter__(self) -> Iterator[str]: + """Iterate over keys of this dict. + + :yields: each key present in the dict. Yields each key with its last case that has been stored. + """ + return iter(self.__values) + + def __len__(self) -> int: + """Get the length of this dict. + + :returns: number of keys in the dict. + + :Example: + + >>> len(_FrozenDict()) + 0 + """ + return len(self.__values) + + def __getitem__(self, key: str) -> Any: + """Get the value corresponding to *key*. + + :returns: value corresponding to *key*. + """ + return self.__values[key] + + def copy(self) -> Dict[str, Any]: + """Create a copy of this dict. + + :return: a new dict object with the same keys and values of this dict. + """ + return deepcopy(self.__values) + + +EMPTY_DICT = _FrozenDict() diff --git a/patroni/config.py b/patroni/config.py index baca2a001..662047f87 100644 --- a/patroni/config.py +++ b/patroni/config.py @@ -12,7 +12,7 @@ from typing import Any, Callable, Collection, Dict, List, Optional, Union, TYPE_CHECKING from . import PATRONI_ENV_PREFIX -from .collections import CaseInsensitiveDict +from .collections import CaseInsensitiveDict, EMPTY_DICT from .dcs import ClusterConfig from .exceptions import ConfigParseError from .file_perm import pg_perm @@ -445,14 +445,14 @@ def _safe_copy_dynamic_configuration(self, dynamic_configuration: Dict[str, Any] for name, value in dynamic_configuration.items(): if name == 'postgresql': - for name, value in (value or {}).items(): + for name, value in (value or EMPTY_DICT).items(): if name == 'parameters': config['postgresql'][name].update(self._process_postgresql_parameters(value)) elif name not in ('connect_address', 'proxy_address', 'listen', 'config_dir', 'data_dir', 'pgpass', 'authentication'): config['postgresql'][name] = deepcopy(value) elif name == 'standby_cluster': - for name, value in (value or {}).items(): + for name, value in (value or EMPTY_DICT).items(): if name in self.__DEFAULT_CONFIG['standby_cluster']: config['standby_cluster'][name] = deepcopy(value) elif name in config: # only variables present in __DEFAULT_CONFIG allowed to be overridden from DCS diff --git a/patroni/config_generator.py b/patroni/config_generator.py index cf0b02a52..1ff4dce0b 100644 --- a/patroni/config_generator.py +++ b/patroni/config_generator.py @@ -15,6 +15,7 @@ from psycopg2 import cursor from . import psycopg +from .collections import EMPTY_DICT from .config import Config from .exceptions import PatroniException from .log import PatroniLogger @@ -244,7 +245,8 @@ def _get_int_major_version(self) -> int: See :func:`~patroni.postgresql.misc.postgres_major_version_to_int` and :func:`~patroni.utils.get_major_version`. """ - postgres_bin = ((self.config.get('postgresql') or {}).get('bin_name') or {}).get('postgres', 'postgres') + postgres_bin = ((self.config.get('postgresql') + or EMPTY_DICT).get('bin_name') or EMPTY_DICT).get('postgres', 'postgres') return postgres_major_version_to_int(get_major_version(self.config['postgresql'].get('bin_dir'), postgres_bin)) def generate(self) -> None: @@ -411,8 +413,10 @@ def _set_su_params(self) -> None: val = self.parsed_dsn.get(conn_param, os.getenv(env_var)) if val: su_params[conn_param] = val - patroni_env_su_username = ((self.config.get('authentication') or {}).get('superuser') or {}).get('username') - patroni_env_su_pwd = ((self.config.get('authentication') or {}).get('superuser') or {}).get('password') + patroni_env_su_username = ((self.config.get('authentication') + or EMPTY_DICT).get('superuser') or EMPTY_DICT).get('username') + patroni_env_su_pwd = ((self.config.get('authentication') + or EMPTY_DICT).get('superuser') or EMPTY_DICT).get('password') # because we use "username" in the config for some reason su_params['username'] = su_params.pop('user', patroni_env_su_username) or getuser() su_params['password'] = su_params.get('password', patroni_env_su_pwd) or \ diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index 21eba1bd6..971cc3c91 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -85,6 +85,8 @@ def dcs_modules() -> List[str]: :returns: list of known module names with absolute python module path namespace, e.g. ``patroni.dcs.etcd``. """ + if TYPE_CHECKING: # pragma: no cover + assert isinstance(__package__, str) return iter_modules(__package__) @@ -101,6 +103,8 @@ def iter_dcs_classes( :returns: an iterator of tuples, each containing the module ``name`` and the imported DCS class object. """ + if TYPE_CHECKING: # pragma: no cover + assert isinstance(__package__, str) return iter_classes(__package__, AbstractDCS, config) diff --git a/patroni/dcs/consul.py b/patroni/dcs/consul.py index c6747d060..deedfdb14 100644 --- a/patroni/dcs/consul.py +++ b/patroni/dcs/consul.py @@ -444,8 +444,9 @@ def _mpp_cluster_loader(self, path: str) -> Dict[int, Cluster]: :returns: all MPP groups as :class:`dict`, with group IDs as keys and :class:`Cluster` objects as values. """ + results: Optional[List[Dict[str, Any]]] _, results = self.retry(self._client.kv.get, path, recurse=True, consistency=self._consistency) - clusters: Dict[int, Dict[str, Cluster]] = defaultdict(dict) + clusters: Dict[int, Dict[str, Dict[str, Any]]] = defaultdict(dict) for node in results or []: key = node['Key'][len(path):].split('/', 1) if len(key) == 2 and self._mpp.group_re.match(key[0]): diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 4b32130d4..9590bb98f 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -20,6 +20,7 @@ from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, Union, TYPE_CHECKING from . import AbstractDCS, Cluster, ClusterConfig, Failover, Leader, Member, Status, SyncState, TimelineHistory +from ..collections import EMPTY_DICT from ..exceptions import DCSError from ..postgresql.mpp import AbstractMPP from ..utils import deep_compare, iter_response_objects, keepalive_socket_options, \ @@ -470,7 +471,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Union[urllib3.HTTPResponse, K8sObject] if len(args) == 3: # name, namespace, body body = args[2] elif action == 'create': # namespace, body - body = args[1] + body = args[1] # pyright: ignore [reportGeneralTypeIssues] elif action == 'delete': # name, namespace body = kwargs.pop('body', None) else: @@ -509,7 +510,7 @@ def __init__(self, orig: K8sClient.rest.ApiException) -> None: @property def sleeptime(self) -> Optional[int]: try: - return int((self.headers or {}).get('retry-after', '')) + return int((self.headers or EMPTY_DICT).get('retry-after', '')) except Exception: return None @@ -654,7 +655,7 @@ def _process_event(self, event: Dict[str, Union[Any, Dict[str, Union[Any, Dict[s obj = K8sObject(obj) success, old_value = self.set(name, obj) if success: - new_value = (obj.metadata.annotations or {}).get(self._annotations_map.get(name)) + new_value = (obj.metadata.annotations or EMPTY_DICT).get(self._annotations_map.get(name, '')) elif ev_type == 'DELETED': success, old_value = self.delete(name, obj['metadata']['resourceVersion']) else: @@ -662,7 +663,7 @@ def _process_event(self, event: Dict[str, Union[Any, Dict[str, Union[Any, Dict[s if success and obj.get('kind') != 'Pod': if old_value: - old_value = (old_value.metadata.annotations or {}).get(self._annotations_map.get(name)) + old_value = (old_value.metadata.annotations or EMPTY_DICT).get(self._annotations_map.get(name, '')) value_changed = old_value != new_value and \ (name != self._dcs.config_path or old_value is not None and new_value is not None) @@ -844,7 +845,7 @@ def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None: @staticmethod def member(pod: K8sObject) -> Member: - annotations = pod.metadata.annotations or {} + annotations = pod.metadata.annotations or EMPTY_DICT member = Member.from_node(pod.metadata.resource_version, pod.metadata.name, None, annotations.get('status', '')) member.data['pod_labels'] = pod.metadata.labels return member @@ -925,7 +926,7 @@ def _cluster_from_nodes(self, group: str, nodes: Dict[str, K8sObject], pods: Col failover = nodes.get(path + self._FAILOVER) metadata = failover and failover.metadata failover = metadata and Failover.from_node(metadata.resource_version, - (metadata.annotations or {}).copy()) + (metadata.annotations or EMPTY_DICT).copy()) # get synchronization state sync = nodes.get(path + self._SYNC) @@ -1047,8 +1048,9 @@ def subsets_changed(last_observed_subsets: List[K8sObject], ip: str, ports: List def __target_ref(self, leader_ip: str, latest_subsets: List[K8sObject], pod: K8sObject) -> K8sObject: # we want to re-use existing target_ref if possible + empty_addresses: List[K8sObject] = [] for subset in latest_subsets: - for address in subset.addresses or []: + for address in subset.addresses or empty_addresses: if address.ip == leader_ip and address.target_ref and address.target_ref.name == self._name: return address.target_ref return k8s_client.V1ObjectReference(kind='Pod', uid=pod.metadata.uid, namespace=self._namespace, @@ -1056,7 +1058,8 @@ def __target_ref(self, leader_ip: str, latest_subsets: List[K8sObject], pod: K8s def _map_subsets(self, endpoints: Dict[str, Any], ips: List[str]) -> None: leader = self._kinds.get(self.leader_path) - latest_subsets = leader and leader.subsets or [] + empty_addresses: List[K8sObject] = [] + latest_subsets = leader and leader.subsets or empty_addresses if not ips: # We want to have subsets empty if latest_subsets: @@ -1212,7 +1215,7 @@ def _retry(*args: Any, **kwargs: Any) -> Any: if not retry.ensure_deadline(0.5): return False - kind_annotations = kind and kind.metadata.annotations or {} + kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT kind_resource_version = kind and kind.metadata.resource_version # There is different leader or resource_version in cache didn't change @@ -1225,7 +1228,7 @@ def _retry(*args: Any, **kwargs: Any) -> Any: def update_leader(self, leader: Leader, last_lsn: Optional[int], slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool: kind = self._kinds.get(self.leader_path) - kind_annotations = kind and kind.metadata.annotations or {} + kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT if kind and kind_annotations.get(self._LEADER) != self._name: return False @@ -1346,7 +1349,7 @@ def _delete_leader(self, leader: Leader) -> bool: def delete_leader(self, leader: Optional[Leader], last_lsn: Optional[int] = None) -> bool: ret = False kind = self._kinds.get(self.leader_path) - if kind and (kind.metadata.annotations or {}).get(self._LEADER) == self._name: + if kind and (kind.metadata.annotations or EMPTY_DICT).get(self._LEADER) == self._name: annotations: Dict[str, Optional[str]] = {self._LEADER: None} if last_lsn: annotations[self._OPTIME] = str(last_lsn) diff --git a/patroni/global_config.py b/patroni/global_config.py index ded66f6fe..5423d7e58 100644 --- a/patroni/global_config.py +++ b/patroni/global_config.py @@ -10,6 +10,7 @@ from copy import deepcopy from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING +from .collections import EMPTY_DICT from .utils import parse_bool, parse_int if TYPE_CHECKING: # pragma: no cover @@ -214,7 +215,7 @@ def max_timelines_history(self) -> int: @property def use_slots(self) -> bool: """``True`` if cluster is configured to use replication slots.""" - return bool(parse_bool((self.get('postgresql') or {}).get('use_slots', True))) + return bool(parse_bool((self.get('postgresql') or EMPTY_DICT).get('use_slots', True))) @property def permanent_slots(self) -> Dict[str, Any]: @@ -222,7 +223,7 @@ def permanent_slots(self) -> Dict[str, Any]: return deepcopy(self.get('permanent_replication_slots') or self.get('permanent_slots') or self.get('slots') - or {}) + or EMPTY_DICT.copy()) sys.modules[__name__] = GlobalConfig() diff --git a/patroni/log.py b/patroni/log.py index beaacf4e8..177520566 100644 --- a/patroni/log.py +++ b/patroni/log.py @@ -413,7 +413,8 @@ def reload_config(self, config: Dict[str, Any]) -> None: if not isinstance(handler, RotatingFileHandler): handler = RotatingFileHandler(os.path.join(config['dir'], __name__)) - handler.maxBytes = int(config.get('file_size', 25000000)) # pyright: ignore [reportGeneralTypeIssues] + max_file_size = int(config.get('file_size', 25000000)) + handler.maxBytes = max_file_size # pyright: ignore [reportAttributeAccessIssue] handler.backupCount = int(config.get('file_num', 4)) # we can't use `if not isinstance(handler, logging.StreamHandler)` below, # because RotatingFileHandler is a child of StreamHandler!!! diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index 059c8034f..f6deb97de 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -26,7 +26,7 @@ from .sync import SyncHandler from .. import global_config, psycopg from ..async_executor import CriticalTask -from ..collections import CaseInsensitiveSet, CaseInsensitiveDict +from ..collections import CaseInsensitiveSet, CaseInsensitiveDict, EMPTY_DICT from ..dcs import Cluster, Leader, Member, SLOT_ADVANCE_AVAILABLE_VERSION from ..exceptions import PostgresConnectionException from ..utils import Retry, RetryFailedError, polling_loop, data_directory_is_empty, parse_int @@ -272,7 +272,7 @@ def pgcommand(self, cmd: str) -> str: :returns: path to Postgres binary named *cmd*. """ - return os.path.join(self._bin_dir, (self.config.get('bin_name', {}) or {}).get(cmd, cmd)) + return os.path.join(self._bin_dir, (self.config.get('bin_name', {}) or EMPTY_DICT).get(cmd, cmd)) def pg_ctl(self, cmd: str, *args: str, **kwargs: Any) -> bool: """Builds and executes pg_ctl command @@ -414,7 +414,7 @@ def data_directory_empty(self) -> bool: return data_directory_is_empty(self._data_dir) def replica_method_options(self, method: str) -> Dict[str, Any]: - return deepcopy(self.config.get(method, {}) or {}) + return deepcopy(self.config.get(method, {}) or EMPTY_DICT.copy()) def replica_method_can_work_without_replication_connection(self, method: str) -> bool: return method != 'basebackup' and bool(self.replica_method_options(method).get('no_master') diff --git a/patroni/postgresql/available_parameters/__init__.py b/patroni/postgresql/available_parameters/__init__.py index 8af63cd97..8ab4a20fb 100644 --- a/patroni/postgresql/available_parameters/__init__.py +++ b/patroni/postgresql/available_parameters/__init__.py @@ -1,12 +1,13 @@ import logging import sys -from pathlib import Path from typing import Iterator logger = logging.getLogger(__name__) -if sys.version_info < (3, 9): +if sys.version_info < (3, 9): # pragma: no cover + from pathlib import Path + PathLikeObj = Path conf_dir = Path(__file__).parent else: diff --git a/patroni/postgresql/bootstrap.py b/patroni/postgresql/bootstrap.py index d112dbe61..779410c14 100644 --- a/patroni/postgresql/bootstrap.py +++ b/patroni/postgresql/bootstrap.py @@ -7,6 +7,7 @@ from typing import Any, Callable, Dict, List, Optional, Union, Tuple, TYPE_CHECKING from ..async_executor import CriticalTask +from ..collections import EMPTY_DICT from ..dcs import Leader, Member, RemoteMember from ..psycopg import quote_ident, quote_literal from ..utils import deep_compare, unquote @@ -146,7 +147,7 @@ def _post_restore(self) -> None: # make sure there is no trigger file or postgres will be automatically promoted trigger_file = self._postgresql.config.triggerfile_good_name - trigger_file = (self._postgresql.config.get('recovery_conf') or {}).get(trigger_file) or 'promote' + trigger_file = (self._postgresql.config.get('recovery_conf') or EMPTY_DICT).get(trigger_file) or 'promote' trigger_file = os.path.abspath(os.path.join(self._postgresql.data_dir, trigger_file)) if os.path.exists(trigger_file): os.unlink(trigger_file) @@ -441,7 +442,7 @@ def post_bootstrap(self, config: Dict[str, Any], task: CriticalTask) -> Optional if config.get('users'): logger.warning('User creation via "bootstrap.users" will be removed in v4.0.0') - for name, value in (config.get('users') or {}).items(): + for name, value in (config.get('users') or EMPTY_DICT).items(): if all(name != a.get('username') for a in (superuser, replication, rewind)): self.create_or_update_role(name, value.get('password'), value.get('options', [])) diff --git a/patroni/postgresql/cancellable.py b/patroni/postgresql/cancellable.py index 83423808c..e44729b07 100644 --- a/patroni/postgresql/cancellable.py +++ b/patroni/postgresql/cancellable.py @@ -100,7 +100,8 @@ def call(self, *args: Any, **kwargs: Union[Any, Dict[str, str]]) -> Optional[int if started and self._process is not None: if isinstance(communicate, dict): - communicate['stdout'], communicate['stderr'] = self._process.communicate(input_data) + communicate['stdout'], communicate['stderr'] = \ + self._process.communicate(input_data) # pyright: ignore [reportGeneralTypeIssues] return self._process.wait() finally: with self._lock: diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py index ca8c8e75e..8b7d03286 100644 --- a/patroni/postgresql/config.py +++ b/patroni/postgresql/config.py @@ -13,7 +13,7 @@ from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value from .. import global_config -from ..collections import CaseInsensitiveDict, CaseInsensitiveSet +from ..collections import CaseInsensitiveDict, CaseInsensitiveSet, EMPTY_DICT from ..dcs import Leader, Member, RemoteMember, slot_name_from_member_name from ..exceptions import PatroniFatalException, PostgresConnectionException from ..file_perm import pg_perm @@ -619,7 +619,8 @@ def _write_recovery_params(self, fd: ConfigWriter, recovery_params: CaseInsensit fd.write_param(name, value) def build_recovery_params(self, member: Union[Leader, Member, None]) -> CaseInsensitiveDict: - recovery_params = CaseInsensitiveDict({p: v for p, v in (self.get('recovery_conf') or {}).items() + default: Dict[str, Any] = {} + recovery_params = CaseInsensitiveDict({p: v for p, v in (self.get('recovery_conf') or default).items() if not p.lower().startswith('recovery_target') and p.lower() not in ('primary_conninfo', 'primary_slot_name')}) recovery_params.update({'standby_mode': 'on', 'recovery_target_timeline': 'latest'}) @@ -845,7 +846,7 @@ def record_missmatch(mtype: bool) -> None: required['restart' if mtype else 'reload'] += 1 wanted_recovery_params = self.build_recovery_params(member) - for param, value in (self._current_recovery_params or {}).items(): + for param, value in (self._current_recovery_params or EMPTY_DICT).items(): # Skip certain parameters defined in the included postgres config files # if we know that they are not specified in the patroni configuration. if len(value) > 2 and value[2] not in (self._postgresql_conf, self._auto_conf) and \ @@ -1324,4 +1325,4 @@ def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]: return self._config.get(key, default) def restore_command(self) -> Optional[str]: - return (self.get('recovery_conf') or {}).get('restore_command') + return (self.get('recovery_conf') or EMPTY_DICT).get('restore_command') diff --git a/patroni/postgresql/mpp/__init__.py b/patroni/postgresql/mpp/__init__.py index bb180d8a5..5120db35a 100644 --- a/patroni/postgresql/mpp/__init__.py +++ b/patroni/postgresql/mpp/__init__.py @@ -299,6 +299,8 @@ def iter_mpp_classes( :yields: tuples, each containing the module ``name`` and the imported MPP class object. """ + if TYPE_CHECKING: # pragma: no cover + assert isinstance(__package__, str) yield from iter_classes(__package__, AbstractMPP, config) diff --git a/patroni/postgresql/rewind.py b/patroni/postgresql/rewind.py index 0639866cc..c3461630d 100644 --- a/patroni/postgresql/rewind.py +++ b/patroni/postgresql/rewind.py @@ -13,6 +13,7 @@ from .connection import get_connection_cursor from .misc import format_lsn, fsync_dir, parse_history, parse_lsn from ..async_executor import CriticalTask +from ..collections import EMPTY_DICT from ..dcs import Leader, RemoteMember logger = logging.getLogger(__name__) @@ -418,7 +419,7 @@ def pg_rewind(self, r: Dict[str, Any]) -> bool: dsn = self._postgresql.config.format_dsn(r, True) logger.info('running pg_rewind from %s', dsn) - restore_command = (self._postgresql.config.get('recovery_conf') or {}).get('restore_command') \ + restore_command = (self._postgresql.config.get('recovery_conf') or EMPTY_DICT).get('restore_command') \ if self._postgresql.major_version < 120000 else self._postgresql.get_guc_value('restore_command') # Until v15 pg_rewind expected postgresql.conf to be inside $PGDATA, which is not the case on e.g. Debian diff --git a/patroni/psycopg.py b/patroni/psycopg.py index 4a92047ca..556244143 100644 --- a/patroni/psycopg.py +++ b/patroni/psycopg.py @@ -42,7 +42,8 @@ def quote_literal(value: Any, conn: Optional[Any] = None) -> str: value.prepare(conn) return value.getquoted().decode('utf-8') except ImportError: - from psycopg import connect as __connect, sql, Error, DatabaseError, OperationalError, ProgrammingError + from psycopg import connect as __connect # pyright: ignore [reportUnknownVariableType] + from psycopg import sql, Error, DatabaseError, OperationalError, ProgrammingError def _connect(dsn: Optional[str] = None, **kwargs: Any) -> 'Connection[Any]': """Call :func:`psycopg.connect` with *dsn* and ``**kwargs``. @@ -56,7 +57,7 @@ def _connect(dsn: Optional[str] = None, **kwargs: Any) -> 'Connection[Any]': :returns: a connection to the database. """ - ret = __connect(dsn or "", **kwargs) + ret: 'Connection[Any]' = __connect(dsn or "", **kwargs) setattr(ret, 'server_version', ret.pgconn.server_version) # compatibility with psycopg2 return ret diff --git a/patroni/validator.py b/patroni/validator.py index 204c81f0d..233b05b72 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -11,8 +11,7 @@ from typing import Any, Dict, Union, Iterator, List, Optional as OptionalType, Tuple, TYPE_CHECKING -from .collections import CaseInsensitiveSet - +from .collections import CaseInsensitiveSet, EMPTY_DICT from .dcs import dcs_modules from .exceptions import ConfigParseError from .utils import parse_int, split_host_port, data_directory_is_empty, get_major_version @@ -245,7 +244,7 @@ def get_bin_name(bin_name: str) -> str: """ if TYPE_CHECKING: # pragma: no cover assert isinstance(schema.data, dict) - return (schema.data.get('postgresql', {}).get('bin_name', {}) or {}).get(bin_name, bin_name) + return (schema.data.get('postgresql', {}).get('bin_name', {}) or EMPTY_DICT).get(bin_name, bin_name) def validate_data_dir(data_dir: str) -> bool: diff --git a/patroni/version.py b/patroni/version.py index 232dc4a90..46b5acc04 100644 --- a/patroni/version.py +++ b/patroni/version.py @@ -2,4 +2,4 @@ :var __version__: the current Patroni version. """ -__version__ = '3.2.2' +__version__ = '3.3.0' diff --git a/tests/test_config.py b/tests/test_config.py index 7808be6c4..2ca38cbab 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -160,12 +160,10 @@ def test_invalid_path(self): @patch('patroni.config.logger') def test__validate_failover_tags(self, mock_logger, mock_get): """Ensures that only one of `nofailover` or `failover_priority` can be provided""" - config = Config("postgres0.yml") - # Providing one of `nofailover` or `failover_priority` is fine for single_param in ({"nofailover": True}, {"failover_priority": 1}, {"failover_priority": 0}): mock_get.side_effect = [single_param] * 2 - self.assertIsNone(config._validate_failover_tags()) + self.assertIsNone(self.config._validate_failover_tags()) mock_logger.warning.assert_not_called() # Providing both `nofailover` and `failover_priority` is fine if consistent @@ -175,7 +173,7 @@ def test__validate_failover_tags(self, mock_logger, mock_get): {"nofailover": "False", "failover_priority": 0} ): mock_get.side_effect = [consistent_state] * 2 - self.assertIsNone(config._validate_failover_tags()) + self.assertIsNone(self.config._validate_failover_tags()) mock_logger.warning.assert_not_called() # Providing both inconsistently should log a warning @@ -186,7 +184,7 @@ def test__validate_failover_tags(self, mock_logger, mock_get): {"nofailover": "", "failover_priority": 0} ): mock_get.side_effect = [inconsistent_state] * 2 - self.assertIsNone(config._validate_failover_tags()) + self.assertIsNone(self.config._validate_failover_tags()) mock_logger.warning.assert_called_once_with( 'Conflicting configuration between nofailover: %s and failover_priority: %s.' + ' Defaulting to nofailover: %s', diff --git a/tests/test_consul.py b/tests/test_consul.py index 87fb47045..8c7eae1d5 100644 --- a/tests/test_consul.py +++ b/tests/test_consul.py @@ -160,7 +160,7 @@ def test_take_leader(self): self.c.set_ttl(20) self.c._do_refresh_session = Mock() self.assertFalse(self.c.take_leader()) - with patch('time.time', Mock(side_effect=[0, 0, 0, 100, 100])): + with patch('time.time', Mock(side_effect=[0, 0, 0, 100, 100, 100])): self.assertFalse(self.c.take_leader()) @patch.object(consul.Consul.KV, 'put', Mock(return_value=True)) diff --git a/tests/test_etcd3.py b/tests/test_etcd3.py index 3e1bd9e83..ffcabd50c 100644 --- a/tests/test_etcd3.py +++ b/tests/test_etcd3.py @@ -273,7 +273,7 @@ def test_take_leader(self): def test_attempt_to_acquire_leader(self): self.assertFalse(self.etcd3.attempt_to_acquire_leader()) - with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 0, 100, 200])): + with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 0, 100, 200, 300])): self.assertFalse(self.etcd3.attempt_to_acquire_leader()) with patch('time.time', Mock(side_effect=[0, 100, 200, 300, 400])): self.assertRaises(Etcd3Error, self.etcd3.attempt_to_acquire_leader) diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index 235d24b20..59887e305 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -80,7 +80,7 @@ def mock_namespaced_kind(*args, **kwargs): def mock_load_k8s_config(self, *args, **kwargs): - self._server = '' + self._server = 'http://localhost' class TestK8sConfig(unittest.TestCase): @@ -242,6 +242,7 @@ def setUp(self, config=None): self.k.get_cluster() +@patch('urllib3.PoolManager.request', Mock()) @patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', mock_namespaced_kind, create=True) class TestKubernetesConfigMaps(BaseTestKubernetes): @@ -374,6 +375,7 @@ def test_reload_config(self, mock_warning): mock_warning.assert_called_once() +@patch('urllib3.PoolManager.request', Mock()) class TestKubernetesEndpointsNoPodIP(BaseTestKubernetes): @patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True) def setUp(self, config=None): @@ -388,6 +390,7 @@ def test_update_leader(self, mock_patch_namespaced_endpoints): self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.1') +@patch('urllib3.PoolManager.request', Mock()) class TestKubernetesEndpoints(BaseTestKubernetes): @patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True) @@ -478,6 +481,7 @@ def mock_watch(*args): return urllib3.HTTPResponse() +@patch('urllib3.PoolManager.request', Mock()) class TestCacheBuilder(BaseTestKubernetes): @patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)