Skip to content

Commit

Permalink
Refactor replication slots handling (patroni#2851)
Browse files Browse the repository at this point in the history
1. make _get_members_slots() method return data in the same format as _get_permanent_slots() method
2. move conflicting name handling from get_replication_slots() to _get_members_slots() method
3. enrich structure returned by get_replication_slots() with the LSN of permanent logical slots reported by primary
4. use the added information in the SlotsHandler instead of fetching it from the Cluster.slots
5. bugfix: don't try to advance logical slot that doesn't match required configuration
  • Loading branch information
CyberDem0n authored Sep 7, 2023
1 parent 30f0f13 commit 19f20ec
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 44 deletions.
68 changes: 38 additions & 30 deletions patroni/dcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,8 +903,16 @@ def get_clone_member(self, exclude_name: str) -> Union[Member, Leader, None]:

@property
def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]:
"""Dictionary of permanent replication slots."""
return self.config and self.config.permanent_slots or {}
"""Dictionary of permanent replication slots with their known LSN."""
ret = deepcopy(self.config.permanent_slots if self.config else {})
# If primary reported flush LSN for permanent slots we want to enrich our structure with it
for name, lsn in (self.slots or {}).items():
if name in ret:
if not ret[name]:
ret[name] = {}
if isinstance(ret[name], dict):
ret[name]['lsn'] = lsn
return ret

@property
def __permanent_physical_slots(self) -> Dict[str, Any]:
Expand All @@ -929,7 +937,6 @@ def get_replication_slots(self, my_name: str, role: str, nofailover: bool, major
Will log an error if:
* Conflicting slot names between members are found
* Any logical slots are disabled, due to version compatibility, and *show_error* is ``True``.
:param my_name: name of this node.
Expand All @@ -942,21 +949,9 @@ def get_replication_slots(self, my_name: str, role: str, nofailover: bool, major
:returns: final dictionary of slot names, after merging with permanent slots and performing sanity checks.
"""
slot_members: List[str] = self._get_slot_members(my_name, role)

slots: Dict[str, Dict[str, str]] = {slot_name_from_member_name(name): {'type': 'physical'}
for name in slot_members}

if len(slots) < len(slot_members):
# Find which names are conflicting for a nicer error message
slot_conflicts: Dict[str, List[str]] = defaultdict(list)
for name in slot_members:
slot_conflicts[slot_name_from_member_name(name)].append(name)
logger.error("Following cluster members share a replication slot name: %s",
"; ".join(f"{', '.join(v)} map to {k}"
for k, v in slot_conflicts.items() if len(v) > 1))

slots: Dict[str, Dict[str, str]] = self._get_members_slots(my_name, role)
permanent_slots: Dict[str, Any] = self._get_permanent_slots(is_standby_cluster, role, nofailover)

disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots(
slots, permanent_slots, my_name, major_version)

Expand Down Expand Up @@ -1016,7 +1011,7 @@ def _merge_permanent_slots(slots: Dict[str, Dict[str, str]], permanent_slots: Di
return disabled_permanent_logical_slots

def _get_permanent_slots(self, is_standby_cluster: bool, role: str, nofailover: bool) -> Dict[str, Any]:
"""Get configured permanent slot names.
"""Get configured permanent replication slots.
.. note::
Permanent replication slots are only considered if ``use_slots`` configuration is enabled.
Expand All @@ -1042,35 +1037,48 @@ def _get_permanent_slots(self, is_standby_cluster: bool, role: str, nofailover:

return self.__permanent_slots if role in ('master', 'primary') else self.__permanent_logical_slots

def _get_slot_members(self, my_name: str, role: str) -> List[str]:
"""Get a list of member names that have replication slots sourcing from this node.
def _get_members_slots(self, my_name: str, role: str) -> Dict[str, Dict[str, str]]:
"""Get physical replication slots configuration for members that sourcing from this node.
If the ``replicatefrom`` tag is set on the member - we should not create the replication slot for it on
the current primary, because that member would replicate from elsewhere. We still create the slot if
the ``replicatefrom`` destination member is currently not a member of the cluster (fallback to the
primary), or if ``replicatefrom`` destination member happens to be the current primary.
Will log an error if:
* Conflicting slot names between members are found
:param my_name: name of this node.
:param role: role of this node, if this is a ``primary`` or ``standby_leader`` return list of members
replicating from this node. If not then return a list of members replicating as cascaded
replicas from this node.
:returns: list of member names.
:returns: dictionary of physical replication slots that should exist on a given node.
"""
if not self.use_slots:
return []
return {}

# we always want to exclude the member with our name from the list
members = filter(lambda m: m.name != my_name, self.members)

if role in ('master', 'primary', 'standby_leader'):
slot_members = [m.name for m in self.members
if m.name != my_name
and (m.replicatefrom is None
or m.replicatefrom == my_name
or not self.has_member(m.replicatefrom))]
members = [m for m in members if m.replicatefrom is None
or m.replicatefrom == my_name or not self.has_member(m.replicatefrom)]
else:
# only manage slots for replicas that replicate from this one, except for the leader among them
slot_members = [m.name for m in self.members
if m.replicatefrom == my_name and m.name != self.leader_name]
return slot_members
members = [m for m in members if m.replicatefrom == my_name and m.name != self.leader_name]

slots = {slot_name_from_member_name(m.name): {'type': 'physical'} for m in members}
if len(slots) < len(members):
# Find which names are conflicting for a nicer error message
slot_conflicts: Dict[str, List[str]] = defaultdict(list)
for member in members:
slot_conflicts[slot_name_from_member_name(member.name)].append(member.name)
logger.error("Following cluster members share a replication slot name: %s",
"; ".join(f"{', '.join(v)} map to {k}"
for k, v in slot_conflicts.items() if len(v) > 1))
return slots

def has_permanent_logical_slots(self, my_name: str, nofailover: bool, major_version: int = 110000) -> bool:
"""Check if the given member node has permanent ``logical`` replication slots configured.
Expand Down
21 changes: 10 additions & 11 deletions patroni/postgresql/slots.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def schedule_advance_slots(self, slots: Dict[str, Dict[str, int]]) -> Tuple[bool
self._advance = SlotsAdvanceThread(self)
return self._advance.schedule(slots)

def _ensure_logical_slots_replica(self, cluster: Cluster, slots: Dict[str, Any]) -> List[str]:
def _ensure_logical_slots_replica(self, slots: Dict[str, Any]) -> List[str]:
"""Update logical *slots* on replicas.
If the logical slot already exists, copy state information into the replication slots structure stored in the
Expand All @@ -444,7 +444,6 @@ class instance. Slots that exist are also advanced if their ``confirmed_flush_ls
As logical slots can only be created when the primary is available, pass the list of slots that need to be
copied back to the caller. They will be created on replicas with :meth:`SlotsHandler.copy_logical_slots`.
:param cluster: object containing stateful information for the cluster.
:param slots: A dictionary mapping slot name to slot attributes. This method only considers a slot
if the value is a dictionary with the key ``type`` and a value of ``logical``.
Expand All @@ -459,15 +458,16 @@ class instance. Slots that exist are also advanced if their ``confirmed_flush_ls
continue

# If the logical already exists, copy some information about it into the original structure
if self._replication_slots.get(name, {}).get('datoid'):
if name in self._replication_slots and compare_slots(value, self._replication_slots[name]):
self._copy_items(self._replication_slots[name], value)
if cluster.slots and name in cluster.slots:
if 'lsn' in value: # The slot has feedback in DCS
try: # Skip slots that don't need to be advanced
if value['confirmed_flush_lsn'] < int(cluster.slots[name]):
advance_slots[value['database']][name] = int(cluster.slots[name])
if value['confirmed_flush_lsn'] < int(value['lsn']):
advance_slots[value['database']][name] = int(value['lsn'])
except Exception as e:
logger.error('Failed to parse "%s": %r', cluster.slots[name], e)
elif cluster.slots and name in cluster.slots: # We want to copy only slots with feedback in a DCS
logger.error('Failed to parse "%s": %r', value['lsn'], e)
elif name not in self._replication_slots and 'lsn' in value:
# We want to copy only slots with feedback in a DCS
create_slots.append(name)

# Slots to be copied from the primary should be removed from the *slots* structure,
Expand Down Expand Up @@ -512,10 +512,9 @@ def sync_replication_slots(self, cluster: Cluster, nofailover: bool,
if self._postgresql.is_primary():
self._logical_slots_processing_queue.clear()
self._ensure_logical_slots_primary(slots)
elif cluster.slots and slots:
else:
self.check_logical_slots_readiness(cluster, replicatefrom)

ret = self._ensure_logical_slots_replica(cluster, slots)
ret = self._ensure_logical_slots_replica(slots)

self._replication_slots = slots
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def execute(self, sql, *params):
elif sql.startswith('SELECT slot_name, slot_type, datname, plugin, catalog_xmin'):
self.results = [('ls', 'logical', 'a', 'b', 100, 500, b'123456')]
elif sql.startswith('SELECT slot_name'):
self.results = [('blabla', 'physical'), ('foobar', 'physical'), ('ls', 'logical', 'a', 'b', 5, 100, 500)]
self.results = [('blabla', 'physical'), ('foobar', 'physical'), ('ls', 'logical', 'b', 'a', 5, 100, 500)]
elif sql.startswith('WITH slots AS (SELECT slot_name, active'):
self.results = [(False, True)] if self.rowcount == 1 else []
elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'):
Expand Down
5 changes: 3 additions & 2 deletions tests/test_slots.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ def setUp(self):
self.p._global_config = GlobalConfig({})
self.s = self.p.slots_handler
self.p.start()
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}}}, 1)
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}, 'ls2': None}}, 1)
self.cluster = Cluster(True, config, self.leader, 0, [self.me, self.other, self.leadermem],
None, SyncState.empty(), None, {'ls': 12345}, None)
None, SyncState.empty(), None, {'ls': 12345, 'ls2': 12345}, None)

def test_sync_replication_slots(self):
config = ClusterConfig(1, {'slots': {'test_3': {'database': 'a', 'plugin': 'b'},
Expand Down Expand Up @@ -123,6 +123,7 @@ def test__ensure_logical_slots_replica(self):
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), ['ls'])
self.cluster.slots['ls'] = 'a'
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), [])
self.cluster.config.data['slots']['ls']['database'] = 'b'
with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):
self.assertEqual(self.s.sync_replication_slots(self.cluster, False), ['ls'])

Expand Down

0 comments on commit 19f20ec

Please sign in to comment.