From 19f20ec2ebb574aefe78d14eafd17f02555217c4 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Thu, 7 Sep 2023 12:56:07 +0200 Subject: [PATCH] Refactor replication slots handling (#2851) 1. make _get_members_slots() method return data in the same format as _get_permanent_slots() method 2. move conflicting name handling from get_replication_slots() to _get_members_slots() method 3. enrich structure returned by get_replication_slots() with the LSN of permanent logical slots reported by primary 4. use the added information in the SlotsHandler instead of fetching it from the Cluster.slots 5. bugfix: don't try to advance logical slot that doesn't match required configuration --- patroni/dcs/__init__.py | 68 +++++++++++++++++++++---------------- patroni/postgresql/slots.py | 21 ++++++------ tests/__init__.py | 2 +- tests/test_slots.py | 5 +-- 4 files changed, 52 insertions(+), 44 deletions(-) diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index d28a59ce3..4cef65ea2 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -903,8 +903,16 @@ def get_clone_member(self, exclude_name: str) -> Union[Member, Leader, None]: @property def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]: - """Dictionary of permanent replication slots.""" - return self.config and self.config.permanent_slots or {} + """Dictionary of permanent replication slots with their known LSN.""" + ret = deepcopy(self.config.permanent_slots if self.config else {}) + # If primary reported flush LSN for permanent slots we want to enrich our structure with it + for name, lsn in (self.slots or {}).items(): + if name in ret: + if not ret[name]: + ret[name] = {} + if isinstance(ret[name], dict): + ret[name]['lsn'] = lsn + return ret @property def __permanent_physical_slots(self) -> Dict[str, Any]: @@ -929,7 +937,6 @@ def get_replication_slots(self, my_name: str, role: str, nofailover: bool, major Will log an error if: - * Conflicting slot names between members are 
found * Any logical slots are disabled, due to version compatibility, and *show_error* is ``True``. :param my_name: name of this node. @@ -942,21 +949,9 @@ def get_replication_slots(self, my_name: str, role: str, nofailover: bool, major :returns: final dictionary of slot names, after merging with permanent slots and performing sanity checks. """ - slot_members: List[str] = self._get_slot_members(my_name, role) - - slots: Dict[str, Dict[str, str]] = {slot_name_from_member_name(name): {'type': 'physical'} - for name in slot_members} - - if len(slots) < len(slot_members): - # Find which names are conflicting for a nicer error message - slot_conflicts: Dict[str, List[str]] = defaultdict(list) - for name in slot_members: - slot_conflicts[slot_name_from_member_name(name)].append(name) - logger.error("Following cluster members share a replication slot name: %s", - "; ".join(f"{', '.join(v)} map to {k}" - for k, v in slot_conflicts.items() if len(v) > 1)) - + slots: Dict[str, Dict[str, str]] = self._get_members_slots(my_name, role) permanent_slots: Dict[str, Any] = self._get_permanent_slots(is_standby_cluster, role, nofailover) + disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots( slots, permanent_slots, my_name, major_version) @@ -1016,7 +1011,7 @@ def _merge_permanent_slots(slots: Dict[str, Dict[str, str]], permanent_slots: Di return disabled_permanent_logical_slots def _get_permanent_slots(self, is_standby_cluster: bool, role: str, nofailover: bool) -> Dict[str, Any]: - """Get configured permanent slot names. + """Get configured permanent replication slots. .. note:: Permanent replication slots are only considered if ``use_slots`` configuration is enabled. 
@@ -1042,35 +1037,48 @@ def _get_permanent_slots(self, is_standby_cluster: bool, role: str, nofailover: return self.__permanent_slots if role in ('master', 'primary') else self.__permanent_logical_slots - def _get_slot_members(self, my_name: str, role: str) -> List[str]: - """Get a list of member names that have replication slots sourcing from this node. + def _get_members_slots(self, my_name: str, role: str) -> Dict[str, Dict[str, str]]: + """Get physical replication slots configuration for members that are sourcing from this node. If the ``replicatefrom`` tag is set on the member - we should not create the replication slot for it on the current primary, because that member would replicate from elsewhere. We still create the slot if the ``replicatefrom`` destination member is currently not a member of the cluster (fallback to the primary), or if ``replicatefrom`` destination member happens to be the current primary. + Will log an error if: + + * Conflicting slot names between members are found + :param my_name: name of this node. :param role: role of this node, if this is a ``primary`` or ``standby_leader`` return list of members replicating from this node. If not then return a list of members replicating as cascaded replicas from this node. - :returns: list of member names. + :returns: dictionary of physical replication slots that should exist on a given node. 
""" if not self.use_slots: - return [] + return {} + + # we always want to exclude the member with our name from the list + members = filter(lambda m: m.name != my_name, self.members) if role in ('master', 'primary', 'standby_leader'): - slot_members = [m.name for m in self.members - if m.name != my_name - and (m.replicatefrom is None - or m.replicatefrom == my_name - or not self.has_member(m.replicatefrom))] + members = [m for m in members if m.replicatefrom is None + or m.replicatefrom == my_name or not self.has_member(m.replicatefrom)] else: # only manage slots for replicas that replicate from this one, except for the leader among them - slot_members = [m.name for m in self.members - if m.replicatefrom == my_name and m.name != self.leader_name] - return slot_members + members = [m for m in members if m.replicatefrom == my_name and m.name != self.leader_name] + + slots = {slot_name_from_member_name(m.name): {'type': 'physical'} for m in members} + if len(slots) < len(members): + # Find which names are conflicting for a nicer error message + slot_conflicts: Dict[str, List[str]] = defaultdict(list) + for member in members: + slot_conflicts[slot_name_from_member_name(member.name)].append(member.name) + logger.error("Following cluster members share a replication slot name: %s", + "; ".join(f"{', '.join(v)} map to {k}" + for k, v in slot_conflicts.items() if len(v) > 1)) + return slots def has_permanent_logical_slots(self, my_name: str, nofailover: bool, major_version: int = 110000) -> bool: """Check if the given member node has permanent ``logical`` replication slots configured. 
diff --git a/patroni/postgresql/slots.py b/patroni/postgresql/slots.py index 51a8fc5ad..7391f5438 100644 --- a/patroni/postgresql/slots.py +++ b/patroni/postgresql/slots.py @@ -434,7 +434,7 @@ def schedule_advance_slots(self, slots: Dict[str, Dict[str, int]]) -> Tuple[bool self._advance = SlotsAdvanceThread(self) return self._advance.schedule(slots) - def _ensure_logical_slots_replica(self, cluster: Cluster, slots: Dict[str, Any]) -> List[str]: + def _ensure_logical_slots_replica(self, slots: Dict[str, Any]) -> List[str]: """Update logical *slots* on replicas. If the logical slot already exists, copy state information into the replication slots structure stored in the @@ -444,7 +444,6 @@ class instance. Slots that exist are also advanced if their ``confirmed_flush_ls As logical slots can only be created when the primary is available, pass the list of slots that need to be copied back to the caller. They will be created on replicas with :meth:`SlotsHandler.copy_logical_slots`. - :param cluster: object containing stateful information for the cluster. :param slots: A dictionary mapping slot name to slot attributes. This method only considers a slot if the value is a dictionary with the key ``type`` and a value of ``logical``. @@ -459,15 +458,16 @@ class instance. 
Slots that exist are also advanced if their ``confirmed_flush_ls continue # If the logical already exists, copy some information about it into the original structure - if self._replication_slots.get(name, {}).get('datoid'): + if name in self._replication_slots and compare_slots(value, self._replication_slots[name]): self._copy_items(self._replication_slots[name], value) - if cluster.slots and name in cluster.slots: + if 'lsn' in value: # The slot has feedback in DCS try: # Skip slots that don't need to be advanced - if value['confirmed_flush_lsn'] < int(cluster.slots[name]): - advance_slots[value['database']][name] = int(cluster.slots[name]) + if value['confirmed_flush_lsn'] < int(value['lsn']): + advance_slots[value['database']][name] = int(value['lsn']) except Exception as e: - logger.error('Failed to parse "%s": %r', cluster.slots[name], e) - elif cluster.slots and name in cluster.slots: # We want to copy only slots with feedback in a DCS + logger.error('Failed to parse "%s": %r', value['lsn'], e) + elif name not in self._replication_slots and 'lsn' in value: + # We want to copy only slots with feedback in a DCS create_slots.append(name) # Slots to be copied from the primary should be removed from the *slots* structure, @@ -512,10 +512,9 @@ def sync_replication_slots(self, cluster: Cluster, nofailover: bool, if self._postgresql.is_primary(): self._logical_slots_processing_queue.clear() self._ensure_logical_slots_primary(slots) - elif cluster.slots and slots: + else: self.check_logical_slots_readiness(cluster, replicatefrom) - - ret = self._ensure_logical_slots_replica(cluster, slots) + ret = self._ensure_logical_slots_replica(slots) self._replication_slots = slots except Exception: diff --git a/tests/__init__.py b/tests/__init__.py index 79ef02297..68961af72 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -104,7 +104,7 @@ def execute(self, sql, *params): elif sql.startswith('SELECT slot_name, slot_type, datname, plugin, catalog_xmin'): self.results = 
[('ls', 'logical', 'a', 'b', 100, 500, b'123456')] elif sql.startswith('SELECT slot_name'): - self.results = [('blabla', 'physical'), ('foobar', 'physical'), ('ls', 'logical', 'a', 'b', 5, 100, 500)] + self.results = [('blabla', 'physical'), ('foobar', 'physical'), ('ls', 'logical', 'b', 'a', 5, 100, 500)] elif sql.startswith('WITH slots AS (SELECT slot_name, active'): self.results = [(False, True)] if self.rowcount == 1 else [] elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'): diff --git a/tests/test_slots.py b/tests/test_slots.py index add0fdf79..a35d465c7 100644 --- a/tests/test_slots.py +++ b/tests/test_slots.py @@ -32,9 +32,9 @@ def setUp(self): self.p._global_config = GlobalConfig({}) self.s = self.p.slots_handler self.p.start() - config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}}}, 1) + config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}, 'ls2': None}}, 1) self.cluster = Cluster(True, config, self.leader, 0, [self.me, self.other, self.leadermem], - None, SyncState.empty(), None, {'ls': 12345}, None) + None, SyncState.empty(), None, {'ls': 12345, 'ls2': 12345}, None) def test_sync_replication_slots(self): config = ClusterConfig(1, {'slots': {'test_3': {'database': 'a', 'plugin': 'b'}, @@ -123,6 +123,7 @@ def test__ensure_logical_slots_replica(self): self.assertEqual(self.s.sync_replication_slots(self.cluster, False), ['ls']) self.cluster.slots['ls'] = 'a' self.assertEqual(self.s.sync_replication_slots(self.cluster, False), []) + self.cluster.config.data['slots']['ls']['database'] = 'b' with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True): self.assertEqual(self.s.sync_replication_slots(self.cluster, False), ['ls'])