Skip to content

Commit

Permalink
better logging if _handle_slots (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
munakoiso authored Nov 12, 2024
1 parent 0345dca commit 28c738f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
12 changes: 9 additions & 3 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1175,15 +1175,21 @@ def _handle_slots(self):

# create slots
slot_names = [helpers.app_name_from_fqdn(fqdn) for fqdn in slot_lock_holders]
actual_replication_slots = self.db.get_replication_slots()
if actual_replication_slots is None:
logging.warning('Failed to get actual replication slots')
# However, we can continue here and try to create slots. None of slots will be dropped, but some might be created
else:
logging.debug('Actual replication slots: %s', actual_replication_slots)

if not self.db.replication_slots('create', slot_names):
if not self.db.create_replication_slots(slot_names, verbose=False):
logging.warning('Could not create replication slots. %s', slot_names)

# drop slots
if my_hostname in non_holders_hosts:
non_holders_hosts.remove(my_hostname)
slot_names_to_drop = [helpers.app_name_from_fqdn(fqdn) for fqdn in non_holders_hosts]
if not self.db.replication_slots('drop', slot_names_to_drop):
if not self.db.drop_replication_slots(slot_names_to_drop, verbose=False):
logging.warning('Could not drop replication slots. %s', slot_names_to_drop)

def _get_db_state(self):
Expand Down Expand Up @@ -1350,7 +1356,7 @@ def _promote_handle_slots(self):
return False
# Create replication slots, regardless of whether replicas hold DCS locks for replication slots.
hosts = [helpers.app_name_from_fqdn(fqdn) for fqdn in hosts]
if not self.db.replication_slots('create', hosts):
if not self.db.create_replication_slots(hosts):
logging.error('Could not create replication slots. Releasing the lock in ZK.')
return False

Expand Down
36 changes: 20 additions & 16 deletions src/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,17 @@ def _offline_detect_pgdata(self):
logging.error(line.rstrip())

@helpers.return_none_on_error
def _get_replication_slots(self):
def get_replication_slots(self):
res = self._exec_query('SELECT slot_name FROM pg_replication_slots;').fetchall()
return [i[0] for i in res]

def _create_replication_slot(self, slot_name):
logging.info('Creating slot %s.', slot_name)
query = f"SELECT pg_create_physical_replication_slot('{slot_name}', true)"
return self._exec_without_result(query)

def _drop_replication_slot(self, slot_name):
logging.info('Dropping slot %s.', slot_name)
query = f"SELECT pg_drop_replication_slot('{slot_name}')"
return self._exec_without_result(query)

Expand Down Expand Up @@ -736,24 +738,26 @@ def stop_postgresql(self, timeout=60):
logging.warning(line.rstrip())
return self._cmd_manager.stop_postgresql(timeout, self.pgdata)

def replication_slots(self, action, slots):
"""
Perform replication slots action (create/drop)
"""
current = self._get_replication_slots()
def create_replication_slots(self, slots, verbose=True):
current = self.get_replication_slots()
for slot in slots:
if action == 'create':
if current and slot in current:
if current and slot in current:
if verbose:
logging.debug('Slot %s already exists.', slot)
continue
if not self._create_replication_slot(slot):
return False
else:
if current is not None and slot not in current:
continue
if not self._create_replication_slot(slot):
return False
return True

def drop_replication_slots(self, slots, verbose=True):
current = self.get_replication_slots()
for slot in slots:
if current is not None and slot not in current:
if verbose:
logging.debug('Slot %s does not exist.', slot)
continue
if not self._drop_replication_slot(slot):
return False
continue
if not self._drop_replication_slot(slot):
return False
return True

def is_replaying_wal(self, check_time):
Expand Down

0 comments on commit 28c738f

Please sign in to comment.