From 1147c4db9dd3a8af938a2466ac4526ee3d62e2ce Mon Sep 17 00:00:00 2001 From: munakoiso Date: Tue, 12 Nov 2024 17:59:30 +0500 Subject: [PATCH] MDB-32028: acquire_replication_source_slot_lock in all awaits (#48) --- src/main.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main.py b/src/main.py index 8e1d5e5..7d95b42 100644 --- a/src/main.py +++ b/src/main.py @@ -644,7 +644,7 @@ def replica_return(self, db_state, zk_state): # Try to resume WAL replaying, it can be paused earlier self.db.pg_wal_replay_resume() - if not self._check_archive_recovery(limit) and not self._wait_for_streaming(holder, limit): + if not self._check_archive_recovery(holder, limit) and not self._wait_for_streaming(holder, limit): # Wal receiver is not running and # postgresql isn't in archive recovery # We should try to restart @@ -1068,7 +1068,7 @@ def _simple_primary_switch(self, limit, new_primary, is_dead): if self.db.start_postgresql() != 0: logging.error('Could not start PostgreSQL. Skipping it.') - if self._wait_for_recovery(limit) and self._check_archive_recovery(limit): + if self._wait_for_recovery(new_primary, limit) and self._check_archive_recovery(new_primary, limit): # # We have reached consistent state but there is a small # chance that we are not streaming changes from new primary @@ -1128,7 +1128,7 @@ def _attach_to_primary(self, new_primary, limit): if self.db.start_postgresql() != 0: logging.error('Could not start PostgreSQL. Skipping it.') - if not self._wait_for_recovery(limit): + if not self._wait_for_recovery(new_primary, limit): self.checks['primary_switch'] = 0 return None @@ -1196,6 +1196,7 @@ def _get_db_state(self): def _acquire_replication_source_slot_lock(self, source): if not self.config.getboolean('global', 'replication_slots_polling'): return + self.re_init_zk() # We need to drop the slot in the old primary. # But we don't know who the primary was (probably there are many of them). # So, we need to release the lock on all hosts. @@ -1566,13 +1567,14 @@ def _do_failover(self): self._replication_manager.leave_sync_group() return True - def _wait_for_recovery(self, limit=-1): + def _wait_for_recovery(self, new_primary, limit=-1): """ Stop until postgresql complete recovery. With limit=-1 the loop here can be infinite. """ def check_recovery_completion(): + self._acquire_replication_source_slot_lock(new_primary) is_db_alive, terminal_state = self.db.is_alive_and_in_terminal_state() if not terminal_state: logging.debug('PostgreSQL in nonterminal state.') @@ -1587,14 +1589,14 @@ def check_recovery_completion(): return helpers.await_for_value(check_recovery_completion, limit, "PostgreSQL has completed recovery") - def _check_archive_recovery(self, limit): + def _check_archive_recovery(self, new_primary, limit): """ Returns True if postgresql is in recovery from archive and False if it hasn't started recovery within `limit` seconds """ def check_recovery_start(): - if self._check_postgresql_streaming(): + if self._check_postgresql_streaming(new_primary): logging.debug('PostgreSQL is already streaming from primary') return True @@ -1630,7 +1632,8 @@ def _is_caught_up(replica_infos): return True return False - def _check_postgresql_streaming(self, primary=None): + def _check_postgresql_streaming(self, primary): + self._acquire_replication_source_slot_lock(primary) is_db_alive, terminal_state = self.db.is_alive_and_in_terminal_state() if not terminal_state: logging.debug('PostgreSQL in nonterminal state.') @@ -1664,7 +1667,6 @@ def _wait_for_streaming(self, primary, limit=-1): Stop until postgresql start streaming from primary. With limit=-1 the loop here can be infinite. """ - self._acquire_replication_source_slot_lock(primary) check_streaming = functools.partial(self._check_postgresql_streaming, primary) return helpers.await_for_value(check_streaming, limit, 'PostgreSQL started streaming from primary')