Skip to content

Commit

Permalink
Add pending restart reason information (patroni#2978)
Browse files Browse the repository at this point in the history
Provide info about the PG parameters that caused the "pending restart"
flag to be set. Both `patronictl list` and the `/patroni` REST API endpoint
now show the parameter names and the diff of their values as the "pending
restart reason".
  • Loading branch information
hughcapet authored Feb 14, 2024
1 parent 7adfc0d commit bdd0232
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 52 deletions.
7 changes: 5 additions & 2 deletions patroni/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def _write_status_response(self, status_code: int, response: Dict[str, Any]) ->
* ``tags``: tags that were set through Patroni configuration merged with dynamically applied tags;
* ``database_system_identifier``: ``Database system identifier`` from ``pg_controldata`` output;
* ``pending_restart``: ``True`` if PostgreSQL is pending to be restarted;
* ``pending_restart_reason``: dictionary where each key is the parameter that caused "pending restart" flag
to be set and the value is a dictionary with the old and the new value.
* ``scheduled_restart``: a dictionary with a single key ``schedule``, which is the timestamp for the
scheduled restart;
* ``watchdog_failed``: ``True`` if watchdog device is unhealthy;
Expand All @@ -196,8 +198,9 @@ def _write_status_response(self, status_code: int, response: Dict[str, Any]) ->
response['tags'] = tags
if patroni.postgresql.sysid:
response['database_system_identifier'] = patroni.postgresql.sysid
if patroni.postgresql.pending_restart:
if patroni.postgresql.pending_restart_reason:
response['pending_restart'] = True
response['pending_restart_reason'] = dict(patroni.postgresql.pending_restart_reason)
response['patroni'] = {
'version': patroni.version,
'scope': patroni.postgresql.scope,
Expand Down Expand Up @@ -634,7 +637,7 @@ def do_GET_metrics(self) -> None:
metrics.append("# HELP patroni_pending_restart Value is 1 if the node needs a restart, 0 otherwise.")
metrics.append("# TYPE patroni_pending_restart gauge")
metrics.append("patroni_pending_restart{0} {1}"
.format(labels, int(patroni.postgresql.pending_restart)))
.format(labels, int(bool(patroni.postgresql.pending_restart_reason))))

metrics.append("# HELP patroni_is_paused Value is 1 if auto failover is disabled, 0 otherwise.")
metrics.append("# TYPE patroni_is_paused gauge")
Expand Down
12 changes: 10 additions & 2 deletions patroni/ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@ def output_members(cluster: Cluster, name: str, extended: bool = False,

all_members = [m for c in clusters.values() for m in c['members'] if 'host' in m]

for c in ('Pending restart', 'Scheduled restart', 'Tags'):
for c in ('Pending restart', 'Pending restart reason', 'Scheduled restart', 'Tags'):
if extended or any(m.get(c.lower().replace(' ', '_')) for m in all_members):
columns.append(c)

Expand All @@ -1572,11 +1572,19 @@ def output_members(cluster: Cluster, name: str, extended: bool = False,
logging.debug(member)

lag = member.get('lag', '')

def format_diff(param: str, values: Dict[str, str], hide_long: bool, max_length: int = 50) -> str:
    """Render one pending-restart parameter change as ``name: old->new``.

    :param param: name of the parameter.
    :param values: mapping that must contain the ``old_value`` and ``new_value`` keys.
    :param hide_long: if ``True``, a rendering longer than *max_length* characters is replaced
                      with a ``[hidden - too long]`` placeholder.
    :param max_length: longest rendering (in characters) still shown in full when *hide_long* is set.

    :returns: the formatted diff line, or the placeholder line when the diff is hidden.
    """
    # f-string formatting (instead of ``+``) keeps the output identical for strings
    # and does not blow up if a value happens to be a non-string.
    full_diff = f"{param}: {values['old_value']}->{values['new_value']}"
    if hide_long and len(full_diff) > max_length:
        return f'{param}: [hidden - too long]'
    return full_diff
restart_reason = '\n'.join([format_diff(k, v, fmt in ('pretty', 'topology'))
for k, v in member.get('pending_restart_reason', {}).items()]) or ''

member.update(cluster=name, member=member['name'], group=g,
host=member.get('host', ''), tl=member.get('timeline', ''),
role=member['role'].replace('_', ' ').title(),
lag_in_mb=round(lag / 1024 / 1024) if isinstance(lag, int) else lag,
pending_restart='*' if member.get('pending_restart') else '')
pending_restart='*' if member.get('pending_restart') else '',
pending_restart_reason=restart_reason)

if append_port and member['host'] and member.get('port'):
member['host'] = ':'.join([member['host'], str(member['port'])])
Expand Down
5 changes: 3 additions & 2 deletions patroni/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,9 @@ def touch_member(self) -> bool:
tags = self.get_effective_tags()
if tags:
data['tags'] = tags
if self.state_handler.pending_restart:
if self.state_handler.pending_restart_reason:
data['pending_restart'] = True
data['pending_restart_reason'] = dict(self.state_handler.pending_restart_reason)
if self._async_executor.scheduled_action in (None, 'promote') \
and data['state'] in ['running', 'restarting', 'starting']:
try:
Expand Down Expand Up @@ -1485,7 +1486,7 @@ def restart_matches(self, role: Optional[str], postgres_version: Optional[str],
if postgres_version and postgres_version_to_int(postgres_version) <= int(self.state_handler.server_version):
reason_to_cancel = "postgres version mismatch"

if pending_restart and not self.state_handler.pending_restart:
if pending_restart and not self.state_handler.pending_restart_reason:
reason_to_cancel = "pending restart flag is not set"

if not reason_to_cancel:
Expand Down
25 changes: 18 additions & 7 deletions patroni/postgresql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .sync import SyncHandler
from .. import global_config, psycopg
from ..async_executor import CriticalTask
from ..collections import CaseInsensitiveSet
from ..collections import CaseInsensitiveSet, CaseInsensitiveDict
from ..dcs import Cluster, Leader, Member, SLOT_ADVANCE_AVAILABLE_VERSION
from ..exceptions import PostgresConnectionException
from ..utils import Retry, RetryFailedError, polling_loop, data_directory_is_empty, parse_int
Expand Down Expand Up @@ -77,7 +77,7 @@ def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None:
self._state_lock = Lock()
self.set_state('stopped')

self._pending_restart = False
self._pending_restart_reason = CaseInsensitiveDict()
self.connection_pool = ConnectionPool()
self._connection = self.connection_pool.get('heartbeat')
self.citus_handler = mpp.get_handler_impl(self)
Expand Down Expand Up @@ -321,11 +321,22 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
self._is_leader_retry.deadline = self.retry.deadline = config['retry_timeout'] / 2.0

@property
def pending_restart(self) -> bool:
return self._pending_restart
def pending_restart_reason(self) -> CaseInsensitiveDict:
"""Get :attr:`_pending_restart_reason` value.
def set_pending_restart(self, value: bool) -> None:
self._pending_restart = value
:attr:`_pending_restart_reason` is a :class:`CaseInsensitiveDict` object of the PG parameters that are
causing pending restart state. Every key is a parameter name, value - a dictionary containing the old
and the new value (see :func:`~patroni.postgresql.config.get_param_diff`).
"""
return self._pending_restart_reason

def set_pending_restart_reason(self, diff_dict: CaseInsensitiveDict) -> None:
    """Replace :attr:`_pending_restart_reason` with *diff_dict*.

    The passed object is stored as is (no copy is made), so any previously stored reasons are discarded.

    :param diff_dict: :class:`CaseInsensitiveDict` object with the parameters that are causing pending restart
                      state with the diff of their values. Passing an empty one resets the pending restart state.
    """
    self._pending_restart_reason = diff_dict

@property
def sysid(self) -> str:
Expand Down Expand Up @@ -727,7 +738,7 @@ def start(self, timeout: Optional[float] = None, task: Optional[CriticalTask] =
self.set_role(role or self.get_postgres_role_from_data_directory())

self.set_state('starting')
self._pending_restart = False
self.set_pending_restart_reason(CaseInsensitiveDict())

try:
if not self.ensure_major_version_is_known():
Expand Down
2 changes: 1 addition & 1 deletion patroni/postgresql/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ def post_bootstrap(self, config: Dict[str, Any], task: CriticalTask) -> Optional
postgresql.restart()
else:
postgresql.config.replace_pg_hba()
if postgresql.pending_restart:
if postgresql.pending_restart_reason:
postgresql.restart()
else:
postgresql.reload()
Expand Down
63 changes: 50 additions & 13 deletions patroni/postgresql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from contextlib import contextmanager
from urllib.parse import urlparse, parse_qsl, unquote
from types import TracebackType
from typing import Any, Collection, Dict, Iterator, List, Optional, Union, Tuple, Type, TYPE_CHECKING
from typing import Any, Callable, Collection, Dict, Iterator, List, Optional, Union, Tuple, Type, TYPE_CHECKING

from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value
from .. import global_config
Expand Down Expand Up @@ -271,6 +271,29 @@ def _bool_is_true_validator(value: Any) -> bool:
return parse_bool(value) is True


def get_param_diff(old_value: Any, new_value: Any,
                   vartype: Optional[str] = None, unit: Optional[str] = None) -> Dict[str, str]:
    """Get a dictionary representing a single PG parameter's value diff.

    :param old_value: current parameter value (``None`` is rendered as an empty string).
    :param new_value: value of the parameter after a restart (``None`` is rendered as an empty string).
    :param vartype: the target type to parse *old_value*/*new_value*. See ``vartype`` argument of
                    :func:`~patroni.utils.maybe_convert_from_base_unit`. When not given (or empty) the values
                    are only cast to :class:`str` and no unit conversion is attempted.
    :param unit: unit of *old_value*/*new_value*. See ``base_unit`` argument of
                 :func:`~patroni.utils.maybe_convert_from_base_unit`.

    :returns: a :class:`dict` object that contains two keys: ``old_value`` and ``new_value``
              with their values casted to :class:`str` and converted from base units (if possible).
    """
    def _render(value: Any) -> str:
        # Both sides of the diff go through exactly the same stringify-then-convert path.
        text = '' if value is None else str(value)
        return maybe_convert_from_base_unit(text, vartype, unit) if vartype else text

    return {'old_value': _render(old_value), 'new_value': _render(new_value)}


class ConfigHandler(object):

# List of parameters which must be always passed to postmaster as command line options
Expand Down Expand Up @@ -1080,7 +1103,8 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
server_parameters = self.get_server_parameters(config)
params_skip_changes = CaseInsensitiveSet((*self._RECOVERY_PARAMETERS, 'hot_standby', 'wal_log_hints'))

conf_changed = hba_changed = ident_changed = local_connection_address_changed = pending_restart = False
conf_changed = hba_changed = ident_changed = local_connection_address_changed = False
param_diff = CaseInsensitiveDict()
if self._postgresql.state == 'running':
changes = CaseInsensitiveDict({p: v for p, v in server_parameters.items()
if p not in params_skip_changes})
Expand All @@ -1103,16 +1127,16 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
new_value = changes.pop(r[0])
if new_value is None or not compare_values(r[3], r[2], r[1], new_value):
conf_changed = True
old_value = maybe_convert_from_base_unit(r[1], r[3], r[2])
if r[4] == 'postmaster':
pending_restart = True
param_diff[r[0]] = get_param_diff(r[1], new_value, r[3], r[2])
logger.info("Changed %s from '%s' to '%s' (restart might be required)",
r[0], old_value, new_value)
r[0], param_diff[r[0]]['old_value'], new_value)
if config.get('use_unix_socket') and r[0] == 'unix_socket_directories'\
or r[0] in ('listen_addresses', 'port'):
local_connection_address_changed = True
else:
logger.info("Changed %s from '%s' to '%s'", r[0], old_value, new_value)
logger.info("Changed %s from '%s' to '%s'",
r[0], maybe_convert_from_base_unit(r[1], r[3], r[2]), new_value)
elif r[0] in self._server_parameters \
and not compare_values(r[3], r[2], r[1], self._server_parameters[r[0]]):
# Check if any parameter was set back to the current pg_settings value
Expand Down Expand Up @@ -1140,7 +1164,6 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
ident_changed = self._config.get('pg_ident', []) != config['pg_ident']

self._config = config
self._postgresql.set_pending_restart(pending_restart)
self._server_parameters = server_parameters
self._adjust_recovery_parameters()
self._krbsrvname = config.get('krbsrvname')
Expand Down Expand Up @@ -1170,16 +1193,28 @@ def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
if self._postgresql.major_version >= 90500:
time.sleep(1)
try:
pending_restart = self._postgresql.query(
'SELECT COUNT(*) FROM pg_catalog.pg_settings'
' WHERE pg_catalog.lower(name) != ALL(%s) AND pending_restart',
[n.lower() for n in params_skip_changes])[0][0] > 0
self._postgresql.set_pending_restart(pending_restart)
settings_diff: CaseInsensitiveDict = CaseInsensitiveDict()
for param, value, unit, vartype in self._postgresql.query(
'SELECT name, pg_catalog.current_setting(name), unit, vartype FROM pg_catalog.pg_settings'
' WHERE pg_catalog.lower(name) != ALL(%s) AND pending_restart',
[n.lower() for n in params_skip_changes]):
new_value = self._postgresql.get_guc_value(param)
new_value = '?' if new_value is None else new_value
settings_diff[param] = get_param_diff(value, new_value, vartype, unit)
external_change = {param: value for param, value in settings_diff.items()
if param not in param_diff or value != param_diff[param]}
if external_change:
logger.info("PostgreSQL configuration parameters requiring restart"
" (%s) seem to be changed bypassing Patroni config."
" Setting 'Pending restart' flag", ', '.join(external_change))
param_diff = settings_diff
except Exception as e:
logger.warning('Exception %r when running query', e)
else:
logger.info('No PostgreSQL configuration items changed, nothing to reload.')

self._postgresql.set_pending_restart_reason(param_diff)

def set_synchronous_standby_names(self, value: Optional[str]) -> Optional[bool]:
"""Updates synchronous_standby_names and reloads if necessary.
:returns: True if value was updated."""
Expand Down Expand Up @@ -1222,6 +1257,7 @@ def effective_configuration(self) -> CaseInsensitiveDict:
data = self._postgresql.controldata()
effective_configuration = self._server_parameters.copy()

param_diff = CaseInsensitiveDict()
for name, cname in options_mapping.items():
value = parse_int(effective_configuration[name])
if cname not in data:
Expand All @@ -1233,7 +1269,8 @@ def effective_configuration(self) -> CaseInsensitiveDict:
effective_configuration[name] = cvalue
logger.info("%s value in pg_controldata: %d, in the global configuration: %d."
" pg_controldata value will be used. Setting 'Pending restart' flag", name, cvalue, value)
self._postgresql.set_pending_restart(True)
param_diff[name] = get_param_diff(cvalue, value)
self._postgresql.set_pending_restart_reason(param_diff)

# If we are using custom bootstrap with PITR it could fail when values like max_connections
# are increased, therefore we disable hot_standby if recovery_target_action == 'promote'.
Expand Down
2 changes: 1 addition & 1 deletion patroni/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:
member['host'] = conn_kwargs['host']
if conn_kwargs.get('port'):
member['port'] = int(conn_kwargs['port'])
optional_attributes = ('timeline', 'pending_restart', 'scheduled_restart', 'tags')
optional_attributes = ('timeline', 'pending_restart', 'pending_restart_reason', 'scheduled_restart', 'tags')
member.update({n: m.data[n] for n in optional_attributes if n in m.data})

if m.name != leader_name:
Expand Down
4 changes: 2 additions & 2 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ def execute(self, sql, *params):
self.results = [(False, 2)]
elif sql.startswith('SELECT pg_catalog.pg_postmaster_start_time'):
self.results = [(datetime.datetime.now(tzutc),)]
elif sql.endswith('AND pending_restart'):
self.results = []
elif sql.startswith('SELECT name, pg_catalog.current_setting(name) FROM pg_catalog.pg_settings'):
self.results = [('data_directory', 'data'),
('hba_file', os.path.join('data', 'pg_hba.conf')),
Expand All @@ -168,8 +170,6 @@ def execute(self, sql, *params):
('cluster_name', 'my_cluster')]
elif sql.startswith('SELECT name, setting'):
self.results = GET_PG_SETTINGS_RESULT
elif sql.startswith('SELECT COUNT(*) FROM pg_catalog.pg_settings'):
self.results = [(0,)]
elif sql.startswith('IDENTIFY_SYSTEM'):
self.results = [('1', 3, '0/402EEC0', '')]
elif sql.startswith('TIMELINE_HISTORY '):
Expand Down
4 changes: 3 additions & 1 deletion tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from patroni.dcs import ClusterConfig, Member
from patroni.exceptions import PostgresConnectionException
from patroni.ha import _MemberStatus
from patroni.postgresql.config import get_param_diff
from patroni.psycopg import OperationalError
from patroni.utils import RetryFailedError, tzutc

Expand Down Expand Up @@ -54,7 +55,7 @@ class MockPostgresql:
major_version = 90600
sysid = 'dummysysid'
scope = 'dummy'
pending_restart = True
pending_restart_reason = {}
wal_name = 'wal'
lsn_name = 'lsn'
wal_flush = '_flush'
Expand Down Expand Up @@ -202,6 +203,7 @@ class TestRestApiHandler(unittest.TestCase):
_authorization = '\nAuthorization: Basic dGVzdDp0ZXN0'

def test_do_GET(self):
MockPostgresql.pending_restart_reason = {'max_connections': get_param_diff('200', '100')}
MockPatroni.dcs.cluster.last_lsn = 20
MockPatroni.dcs.cluster.sync.members = [MockPostgresql.name]
with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
Expand Down
8 changes: 5 additions & 3 deletions tests/test_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from mock import Mock, PropertyMock, patch

from patroni.async_executor import CriticalTask
from patroni.collections import CaseInsensitiveDict
from patroni.postgresql import Postgresql
from patroni.postgresql.bootstrap import Bootstrap
from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.config import ConfigHandler
from patroni.postgresql.config import ConfigHandler, get_param_diff

from . import psycopg_connect, BaseTestPostgresql, mock_available_gucs

Expand Down Expand Up @@ -245,8 +246,9 @@ def test_post_bootstrap(self):
self.assertTrue(task.result)

self.b.bootstrap(config)
with patch.object(Postgresql, 'pending_restart', PropertyMock(return_value=True)), \
patch.object(Postgresql, 'restart', Mock()) as mock_restart:
with patch.object(Postgresql, 'pending_restart_reason',
PropertyMock(CaseInsensitiveDict({'max_connections': get_param_diff('200', '100')}))), \
patch.object(Postgresql, 'restart', Mock()) as mock_restart:
self.b.post_bootstrap({}, task)
mock_restart.assert_called_once()

Expand Down
Loading

0 comments on commit bdd0232

Please sign in to comment.