Skip to content

Commit

Permalink
fix: stop using default replication set (#371)
Browse files Browse the repository at this point in the history
* fix: stop using default replication set

* fix: update teardown to drop 'pgbelt' replication setl

* fix: typos

* fix: typo again

* fix: forgot to get postgres conn from pool

* fix: another typo

* fix: didn't include new replication set in subscription call
  • Loading branch information
vjeeva authored Feb 2, 2024
1 parent c2fe3eb commit cfa6276
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions pgbelt/util/pglogical.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,25 @@ async def configure_replication_set(
"""
Add each table in the given list to the default replication set
"""
logger.info(f"Configuring default replication set with tables: {tables}")
logger.info("Creating new replication set 'pgbelt'")
async with pool.acquire() as conn:
try:
await conn.execute("SELECT pglogical.create_replication_set('pgbelt');")
logger.debug("Created the 'pgbelt' replication set")
except Exception as e:
logger.debug(f"Could not create replication set 'pgbelt': {e}")

logger.info(f"Configuring 'pgbelt' replication set with tables: {tables}")
for table in tables:
async with pool.acquire() as conn:
async with conn.transaction():
try:
await conn.execute(
f"SELECT pglogical.replication_set_add_table('default', '\"{table}\"');"
f"SELECT pglogical.replication_set_add_table('pgbelt', '\"{table}\"');"
)
logger.debug(f"{table} added to default replication set")
logger.debug(f"Table '{table}' added to 'pgbelt' replication set")
except UniqueViolationError:
logger.debug(f"{table} already in default replication set")
logger.debug(f"Table '{table}' already in 'pgbelt' replication set")


async def configure_node(pool: Pool, name: str, dsn: str, logger: Logger) -> None:
Expand Down Expand Up @@ -145,6 +153,7 @@ async def configure_subscription(
await conn.execute(
f"""SELECT pglogical.create_subscription(
subscription_name:='{name}',
replication_sets:='{{pgbelt}}',
provider_dsn:='{provider_dsn}',
synchronize_structure:=false,
synchronize_data:={'true' if name.startswith('pg1') else 'false'},
Expand Down Expand Up @@ -197,8 +206,8 @@ async def teardown_replication_set(pool: Pool, logger: Logger) -> None:
async with pool.acquire() as conn:
async with conn.transaction():
try:
await conn.execute("SELECT pglogical.drop_replication_set('default');")
logger.debug("Replication set 'default' dropped")
await conn.execute("SELECT pglogical.drop_replication_set('pgbelt');")
logger.debug("Replication set 'pgbelt' dropped")
except (
InvalidSchemaNameError,
UndefinedFunctionError,
Expand Down

0 comments on commit cfa6276

Please sign in to comment.