Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support for table names with capitals #422

Merged
merged 3 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions pgbelt/cmd/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,23 @@ async def _setup_src_node(

pglogical_tables = pkey_tables
if conf.tables:
pglogical_tables = [t for t in pkey_tables if t in conf.tables]
pglogical_tables = [
t
for t in pkey_tables
if t
in list(
map(str.lower, conf.tables)
) # Postgres returns table names in lowercase (in analyze_table_pkeys)
]

# Intentionally throw an error if no tables are found, so that the user can correct their config.
    # When reported by a certain user, errors showed when running the status command, but they were ignored,
# then the user ran setup and since that DIDN'T throw an error, they assumed everything was fine.

if not pglogical_tables:
raise ValueError(
f"No tables were targeted to replicate. Please check your config's schema and tables. DB: {conf.db} DC: {conf.dc}, SCHEMA: {conf.schema_name} TABLES: {conf.tables}.\nIf TABLES is [], all tables in the schema should be replicated, but pgbelt still found no tables.\nCheck the schema name or reach out to the pgbelt team for help."
)

await configure_replication_set(
src_root_pool, pglogical_tables, conf.schema_name, src_logger
Expand Down Expand Up @@ -145,7 +161,14 @@ async def setup_back_replication(config_future: Awaitable[DbupgradeConfig]) -> N

pglogical_tables = pkeys
if conf.tables:
pglogical_tables = [t for t in pkeys if t in conf.tables]
pglogical_tables = [
t
for t in pkeys
if t
in list(
map(str.lower, conf.tables)
) # Postgres returns table names in lowercase (in analyze_table_pkeys)
]

await configure_replication_set(
dst_root_pool, pglogical_tables, conf.schema_name, dst_logger
Expand Down
9 changes: 8 additions & 1 deletion pgbelt/cmd/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,14 @@ async def status(conf_future: Awaitable[DbupgradeConfig]) -> dict[str, str]:
all_tables = pkey_tables + non_pkey_tables
target_tables = all_tables
if conf.tables:
target_tables = [t for t in all_tables if t in conf.tables]
target_tables = [
t
for t in all_tables
if t
in list(
map(str.lower, conf.tables)
) # Postgres gave us lowercase table names in analyze_table_pkeys
]

if not target_tables:
raise ValueError(
Expand Down
18 changes: 16 additions & 2 deletions pgbelt/cmd/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ async def dump_tables(
_, tables, _ = await analyze_table_pkeys(src_pool, conf.schema_name, logger)

if conf.tables:
tables = [t for t in tables if t in conf.tables]
tables = [
t
for t in tables
if t
in list(
map(str.lower, conf.tables)
) # Postgres returns table names in lowercase (in analyze_table_pkeys)
]

await dump_source_tables(conf, tables, logger)

Expand Down Expand Up @@ -185,7 +192,14 @@ async def _dump_and_load_all_tables(
) -> None:
_, tables, _ = await analyze_table_pkeys(src_pool, conf.schema_name, src_logger)
if conf.tables:
tables = [t for t in tables if t in conf.tables]
tables = [
t
for t in tables
if t
in list(
map(str.lower, conf.tables)
) # Postgres returns table names in lowercase (in analyze_table_pkeys)
]
await dump_source_tables(conf, tables, src_logger)
await load_dumped_tables(conf, tables, dst_logger)

Expand Down
46 changes: 41 additions & 5 deletions pgbelt/util/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,16 @@ async def compare_data(
dst_old_extra_float_digits = await dst_pool.fetchval("SHOW extra_float_digits;")
await dst_pool.execute("SET extra_float_digits TO 0;")

has_run = False
for table in set(pkeys):
# If specific table list is defined and iterated table is not in that list, skip.
if tables and (table not in tables):
# If specific table list is defined and the iterated table is not in that list, skip.
# Note that the pkeys tables returned from Postgres are all lowercased, so we need to
# map the passed conf tables to lowercase.
if tables and (table not in list(map(str.lower, tables))):
continue

has_run = True # If this runs, we have at least one table to compare. We will use this flag to throw an error if no tables are found.

full_table_name = f"{schema}.{table}"

logger.debug(f"Validating table {full_table_name}...")
Expand Down Expand Up @@ -171,6 +177,13 @@ async def compare_data(
f"Dest Row: {dst_row}"
)

# Just a paranoia check. If this throws, then it's possible pgbelt didn't migrate any data.
# This was found in issue #420, and previous commands threw errors before this issue could arise.
if not has_run:
raise ValueError(
            "No tables were found to compare. Please reach out to the pgbelt team for help, and check if your data was migrated."
)

await src_pool.execute(f"SET extra_float_digits TO {src_old_extra_float_digits};")
await dst_pool.execute(f"SET extra_float_digits TO {dst_old_extra_float_digits};")
logger.info(
Expand Down Expand Up @@ -372,9 +385,22 @@ async def precheck_info(
AND n.nspname <> 'pglogical'
ORDER BY 1,2;"""
)

# We filter the table list if the user has specified a list of tables to target.
# Note, from issue #420, the above query will return the table names in lowercase,
# so we need to map the target_tables to lowercase.
if target_tables:
result["tables"] = [t for t in result["tables"] if t["Name"] in target_tables]

result["tables"] = [
t
for t in result["tables"]
if t["Name"] in list(map(str.lower, target_tables))
]

# We will not recapitalize the table names in the result["tables"] list,
# to preserve how Postgres sees those tables in its system catalog. Easy
# rabbit hole later if we keep patching the table names to match the user's
# input.

result["sequences"] = await pool.fetch(
"""
Expand All @@ -392,12 +418,22 @@ async def precheck_info(
ORDER BY 1,2;"""
)

# We filter the sequence list if the user has specified a list of sequences to target.
    # We filter the sequence list if the user has specified a list of sequences to target.
    # Note, from issue #420, the above query will return the sequence names in lowercase,
    # so we need to map the target_sequences to lowercase.
if target_sequences:

result["sequences"] = [
s for s in result["sequences"] if s["Name"] in target_sequences
t
for t in result["sequences"]
if t["Name"] in list(map(str.lower, target_sequences))
]

    # We will not recapitalize the sequence names in the result["sequences"] list,
    # to preserve how Postgres sees those sequences in its system catalog. Easy
    # rabbit hole later if we keep patching the sequence names to match the user's
    # input.

users = await pool.fetch(
f"""
SELECT r.rolname, r.rolsuper, r.rolinherit,
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async def _create_dbupgradeconfigs() -> dict[str, DbupgradeConfig]:
db_upgrade_config_kwargs["schema_name"] = (
"non_public_schema" if "nonpublic" in s else "public"
)
db_upgrade_config_kwargs["tables"] = ["users"] if "exodus" in s else None
db_upgrade_config_kwargs["tables"] = ["UsersCapital"] if "exodus" in s else None
db_upgrade_config_kwargs["sequences"] = (
["users_id_seq"] if "exodus" in s else None
)
Expand Down
40 changes: 20 additions & 20 deletions tests/integration/files/test_schema_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ CREATE TABLE public.fruits (
ALTER TABLE public.fruits OWNER TO owner;

--
-- Name: users; Type: TABLE; Schema: public; Owner: owner
-- Name: UsersCapital; Type: TABLE; Schema: public; Owner: owner
--

CREATE TABLE public.Users (
CREATE TABLE public.UsersCapital (
id bigint NOT NULL,
hash_firstname text NOT NULL,
hash_lastname text NOT NULL,
Expand All @@ -23,13 +23,13 @@ CREATE TABLE public.Users (
);


ALTER TABLE public.Users OWNER TO owner;
ALTER TABLE public.UsersCapital OWNER TO owner;

--
-- Name: users2; Type: TABLE; Schema: public; Owner: owner
-- Name: UsersCapital2; Type: TABLE; Schema: public; Owner: owner
--

CREATE TABLE public.Users2 (
CREATE TABLE public.UsersCapital2 (
id bigint NOT NULL,
hash_firstname text NOT NULL,
hash_lastname text NOT NULL,
Expand All @@ -38,13 +38,13 @@ CREATE TABLE public.Users2 (
);


ALTER TABLE public.Users2 OWNER TO owner;
ALTER TABLE public.UsersCapital2 OWNER TO owner;

--
-- Name: users_idx; Type: INDEX; Schema: public; Owner: owner
--

CREATE INDEX users_idx ON public.Users (
CREATE INDEX users_idx ON public.UsersCapital (
hash_firstname,
hash_lastname
);
Expand All @@ -53,24 +53,24 @@ CREATE INDEX users_idx ON public.Users (
-- Name: users2_idx; Type: INDEX; Schema: public; Owner: owner
--

CREATE INDEX users2_idx ON public.Users (
CREATE INDEX users2_idx ON public.UsersCapital (
hash_firstname,
hash_lastname
);

--
-- Name: users_id_seq; Type: SEQUENCE; Schema: public; Owner: owner
-- Name: userS_id_seq; Type: SEQUENCE; Schema: public; Owner: owner
--

CREATE SEQUENCE public.users_id_seq
CREATE SEQUENCE public.userS_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;


ALTER TABLE public.users_id_seq OWNER TO owner;
ALTER TABLE public.userS_id_seq OWNER TO owner;

--
-- Name: users2_id_seq; Type: SEQUENCE; Schema: public; Owner: owner
Expand All @@ -96,10 +96,10 @@ INSERT INTO public.fruits (id, name)
(4, 'grape');

--
-- Data for Name: Users; Type: TABLE DATA; Schema: public; Owner: owner
-- Data for Name: UsersCapital; Type: TABLE DATA; Schema: public; Owner: owner
--

INSERT INTO public.users (id, hash_firstname, hash_lastname, gender)
INSERT INTO public.UsersCapital (id, hash_firstname, hash_lastname, gender)
VALUES (1, 'garbagefirst', 'garbagelast', 'male'),
(2, 'garbagefirst1', 'garbagelast1', 'female'),
(3, 'sdgarbagefirst', 'dgsadsrbagelast', 'male'),
Expand All @@ -111,7 +111,7 @@ INSERT INTO public.users (id, hash_firstname, hash_lastname, gender)
-- Data for Name: UsersCapital2; Type: TABLE DATA; Schema: public; Owner: owner
--

INSERT INTO public.users2 (id, hash_firstname, hash_lastname, gender)
INSERT INTO public.UsersCapital2 (id, hash_firstname, hash_lastname, gender)
VALUES (1, 'garbagefirst', 'garbagelast', 'male'),
(2, 'garbagefirst1', 'garbagelast1', 'female'),
(3, 'sdgarbagefirst', 'dgsadsrbagelast', 'male'),
Expand All @@ -120,10 +120,10 @@ INSERT INTO public.users2 (id, hash_firstname, hash_lastname, gender)


--
-- Name: users_id_seq; Type: SEQUENCE SET; Schema: public; Owner: owner
-- Name: userS_id_seq; Type: SEQUENCE SET; Schema: public; Owner: owner
--

SELECT pg_catalog.setval('public.users_id_seq', 1, false);
SELECT pg_catalog.setval('public.userS_id_seq', 1, false);


--
Expand All @@ -134,16 +134,16 @@ SELECT pg_catalog.setval('public.users2_id_seq', 1, false);


--
-- Name: Users users_pkey; Type: CONSTRAINT; Schema: public; Owner: owner
-- Name: UsersCapital users_pkey; Type: CONSTRAINT; Schema: public; Owner: owner
--

ALTER TABLE ONLY public.Users
ALTER TABLE ONLY public.UsersCapital
ADD CONSTRAINT users_pkey PRIMARY KEY (id);


--
-- Name: Users users_pkey; Type: CONSTRAINT; Schema: public; Owner: owner
-- Name: UsersCapital users_pkey; Type: CONSTRAINT; Schema: public; Owner: owner
--

ALTER TABLE ONLY public.Users2
ALTER TABLE ONLY public.UsersCapital2
ADD CONSTRAINT users2_pkey PRIMARY KEY (id);
Loading