Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove dump-dst-indexes, useless and add 0 statement timeout to creating indexes #629

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions pgbelt/cmd/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pgbelt.util.dump import apply_target_schema
from pgbelt.util.dump import create_target_indexes
from pgbelt.util.dump import dump_source_schema
from pgbelt.util.dump import dump_dst_create_index
from pgbelt.util.dump import remove_dst_not_valid_constraints
from pgbelt.util.dump import remove_dst_indexes
from pgbelt.util.logs import get_logger
Expand Down Expand Up @@ -69,17 +68,6 @@ async def remove_constraints(config_future: Awaitable[DbupgradeConfig]) -> None:
await remove_dst_not_valid_constraints(conf, logger)


@run_with_configs(skip_src=True)
async def dump_indexes(config_future: Awaitable[DbupgradeConfig]) -> None:
"""
Dumps the CREATE INDEX statements from the target database onto disk, in
the schemas directory.
"""
conf = await config_future
logger = get_logger(conf.db, conf.dc, "schema.dst")
await dump_dst_create_index(conf, logger)


@run_with_configs(skip_src=True)
async def remove_indexes(config_future: Awaitable[DbupgradeConfig]) -> None:
"""
Expand Down Expand Up @@ -123,7 +111,6 @@ async def create_indexes(config_future: Awaitable[DbupgradeConfig]) -> None:
load_schema,
load_constraints,
remove_constraints,
dump_indexes,
remove_indexes,
create_indexes,
]
52 changes: 4 additions & 48 deletions pgbelt/util/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,53 +339,6 @@ async def apply_target_constraints(config: DbupgradeConfig, logger: Logger) -> N
)


async def dump_dst_create_index(config: DbupgradeConfig, logger: Logger) -> None:
"""
Dump CREATE INDEX statements from the target database.
Used when schema is loaded in outside of pgbelt.
"""

logger.info("Dumping target CREATE INDEX statements...")

command = [
"pg_dump",
"--schema-only",
"--no-owner",
"-n",
config.schema_name,
config.dst.pglogical_dsn,
]

out = await _execute_subprocess(command, "Retrieved target schema", logger)

# No username replacement needs to be done, so replace dst user with the same.
commands_raw = _parse_dump_commands(
out.decode("utf-8"), config.dst.owner_user.name, config.dst.owner_user.name
)

commands = []
for c in commands_raw:
if "CREATE" in command and "INDEX" in command:
regex_matches = search(
r"CREATE [UNIQUE ]*INDEX (?P<index>[a-zA-Z0-9._]+)+.*",
c,
)
if not regex_matches:
continue
commands.append(c)

try:
await makedirs(schema_dir(config.db, config.dc))
except FileExistsError:
pass

async with aopen(schema_file(config.db, config.dc, ONLY_INDEXES), "w") as out:
for command in commands:
await out.write(command)

logger.debug("Finished dumping CREATE INDEX statements from the target.")


async def remove_dst_indexes(config: DbupgradeConfig, logger: Logger) -> None:
"""
Remove the INDEXes from the schema of the target database.
Expand Down Expand Up @@ -451,7 +404,10 @@ async def create_target_indexes(
index = regex_matches.groupdict()["index"]

# Create the index
command = ["psql", config.dst.owner_dsn, "-c", f"{c};"]
# Note that the host DSN must have a statement timeout of 0.
# Example DSN: `host=server-hostname user=user dbname=db_name options='-c statement_timeout=3600000'`
host_dsn = config.dst.owner_dsn + " options='-c statement_timeout=0'"
command = ["psql", host_dsn, "-c", f"{c};"]
logger.info(f"Creating index {index} on the target...")
try:
await _execute_subprocess(
Expand Down