Skip to content

Commit

Permalink
MDB-32028: acquire_replication_source_slot_lock in all awaits (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
munakoiso authored Nov 12, 2024
1 parent d2420d1 commit 1147c4d
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ def replica_return(self, db_state, zk_state):
# Try to resume WAL replaying, it can be paused earlier
self.db.pg_wal_replay_resume()

if not self._check_archive_recovery(limit) and not self._wait_for_streaming(holder, limit):
if not self._check_archive_recovery(holder, limit) and not self._wait_for_streaming(holder, limit):
# Wal receiver is not running and
# postgresql isn't in archive recovery
# We should try to restart
Expand Down Expand Up @@ -1068,7 +1068,7 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
if self.db.start_postgresql() != 0:
logging.error('Could not start PostgreSQL. Skipping it.')

if self._wait_for_recovery(limit) and self._check_archive_recovery(limit):
if self._wait_for_recovery(new_primary, limit) and self._check_archive_recovery(new_primary, limit):
#
# We have reached consistent state but there is a small
# chance that we are not streaming changes from new primary
Expand Down Expand Up @@ -1128,7 +1128,7 @@ def _attach_to_primary(self, new_primary, limit):
if self.db.start_postgresql() != 0:
logging.error('Could not start PostgreSQL. Skipping it.')

if not self._wait_for_recovery(limit):
if not self._wait_for_recovery(new_primary, limit):
self.checks['primary_switch'] = 0
return None

Expand Down Expand Up @@ -1196,6 +1196,7 @@ def _get_db_state(self):
def _acquire_replication_source_slot_lock(self, source):
if not self.config.getboolean('global', 'replication_slots_polling'):
return
self.re_init_zk()
# We need to drop the replication slot on the old primary.
# However, we don't know which host was the primary (there may even have been several).
# So we release the lock on all hosts.
Expand Down Expand Up @@ -1566,13 +1567,14 @@ def _do_failover(self):
self._replication_manager.leave_sync_group()
return True

def _wait_for_recovery(self, limit=-1):
def _wait_for_recovery(self, new_primary, limit=-1):
"""
Wait until PostgreSQL completes recovery.
With limit=-1 the loop here can be infinite.
"""

def check_recovery_completion():
self._acquire_replication_source_slot_lock(new_primary)
is_db_alive, terminal_state = self.db.is_alive_and_in_terminal_state()
if not terminal_state:
logging.debug('PostgreSQL in nonterminal state.')
Expand All @@ -1587,14 +1589,14 @@ def check_recovery_completion():

return helpers.await_for_value(check_recovery_completion, limit, "PostgreSQL has completed recovery")

def _check_archive_recovery(self, limit):
def _check_archive_recovery(self, new_primary, limit):
"""
Returns True if PostgreSQL is performing archive recovery,
and False if it has not started recovery within `limit` seconds
"""

def check_recovery_start():
if self._check_postgresql_streaming():
if self._check_postgresql_streaming(new_primary):
logging.debug('PostgreSQL is already streaming from primary')
return True

Expand Down Expand Up @@ -1630,7 +1632,8 @@ def _is_caught_up(replica_infos):
return True
return False

def _check_postgresql_streaming(self, primary=None):
def _check_postgresql_streaming(self, primary):
self._acquire_replication_source_slot_lock(primary)
is_db_alive, terminal_state = self.db.is_alive_and_in_terminal_state()
if not terminal_state:
logging.debug('PostgreSQL in nonterminal state.')
Expand Down Expand Up @@ -1664,7 +1667,6 @@ def _wait_for_streaming(self, primary, limit=-1):
Wait until PostgreSQL starts streaming from the primary.
With limit=-1 the loop here can be infinite.
"""
self._acquire_replication_source_slot_lock(primary)
check_streaming = functools.partial(self._check_postgresql_streaming, primary)
return helpers.await_for_value(check_streaming, limit, 'PostgreSQL started streaming from primary')

Expand Down

0 comments on commit 1147c4d

Please sign in to comment.