From d201671a08097f9030a5e5eb57ca46cad5a517d5 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Tue, 21 May 2024 17:34:57 +0200 Subject: [PATCH] [columnar] Fix for case sensitive schema names (#264) (#265) * We should be able to convert case sensitive schema name and table names. For case sensitive names function argument for `alter_table_set_access_method` should be like "Schema"."Table". --- .../src/backend/columnar/columnar.control | 2 +- .../sql/columnar--11.1-11--11.1-12.sql | 1 + .../alter_table_set_access_method/11.1-12.sql | 259 ++++++++++++++++++ .../alter_table_set_access_method/latest.sql | 22 +- ...columnar_alter_table_set_access_method.out | 118 ++++++++ ...columnar_alter_table_set_access_method.sql | 63 ++++- 6 files changed, 456 insertions(+), 9 deletions(-) create mode 100644 columnar/src/backend/columnar/sql/columnar--11.1-11--11.1-12.sql create mode 100644 columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/11.1-12.sql diff --git a/columnar/src/backend/columnar/columnar.control b/columnar/src/backend/columnar/columnar.control index 3d2e779..e5439eb 100644 --- a/columnar/src/backend/columnar/columnar.control +++ b/columnar/src/backend/columnar/columnar.control @@ -1,4 +1,4 @@ comment = 'Hydra Columnar extension' -default_version = '11.1-11' +default_version = '11.1-12' module_pathname = '$libdir/columnar' relocatable = false diff --git a/columnar/src/backend/columnar/sql/columnar--11.1-11--11.1-12.sql b/columnar/src/backend/columnar/sql/columnar--11.1-11--11.1-12.sql new file mode 100644 index 0000000..b16b986 --- /dev/null +++ b/columnar/src/backend/columnar/sql/columnar--11.1-11--11.1-12.sql @@ -0,0 +1 @@ +#include "udfs/alter_table_set_access_method/11.1-12.sql" \ No newline at end of file diff --git a/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/11.1-12.sql b/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/11.1-12.sql new file mode 100644 index 0000000..221943b --- /dev/null +++ b/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/11.1-12.sql @@ -0,0 +1,259 @@ +CREATE OR REPLACE FUNCTION columnar.alter_table_set_access_method(t TEXT, method TEXT) + RETURNS BOOLEAN LANGUAGE plpgsql +AS $func$ + +DECLARE + + tbl_exists BOOLEAN; + tbl_schema TEXT = 'public'; + tbl_name TEXT; + tbl_array TEXT[] = (parse_ident(t)); + tbl_oid INT; + tbl_am_oid INT; + temp_tbl_name TEXT; + + is_case_sensitive BOOLEAN; + tbl_name_original TEXT; + tbl_schema_original TEXT; + + trigger_list_definition TEXT[]; + trigger TEXT; + + index_list_definition TEXT[]; + idx TEXT; + + saved_search_path TEXT; + + constraint_list_name_and_definition TEXT[]; + constraint_name_and_definition TEXT; + constraint_name_and_definition_split TEXT[]; + +BEGIN + CASE + WHEN CARDINALITY(tbl_array) = 1 THEN + SELECT tbl_array[1] INTO tbl_name; + WHEN CARDINALITY(tbl_array) = 2 THEN + SELECT tbl_array[1] INTO tbl_schema; + SELECT tbl_array[2] INTO tbl_name; + ELSE + RAISE WARNING 'Argument should provided as table or schema.table.'; + RETURN 0; + END CASE; + + -- Allow only convert to columnar / heap access method + + IF method NOT IN ('columnar', 'heap') THEN + RAISE WARNING 'Cannot convert table: Allowed access methods are heap and columnar.'; + RETURN 0; + END IF; + + -- Check if table exists + + SELECT EXISTS + (SELECT FROM pg_catalog.pg_tables WHERE schemaname = tbl_schema AND tablename = tbl_name) + INTO tbl_exists; + + IF tbl_exists = False THEN + RAISE WARNING 'Table %.% does not exist.', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Case senstivitiy + + SELECT EXISTS (SELECT regexp_matches(tbl_name,'[A-Z]')) INTO is_case_sensitive; + + SELECT tbl_name INTO tbl_name_original; + + IF is_case_sensitive = True THEN + SELECT quote_ident(tbl_name) INTO tbl_name; + END IF; + + SELECT EXISTS (SELECT regexp_matches(tbl_schema,'[A-Z]')) INTO is_case_sensitive; + + SELECT tbl_schema INTO tbl_schema_original; + + IF is_case_sensitive = True THEN + SELECT quote_ident(tbl_schema) INTO tbl_schema; + END IF; + + SELECT current_setting('search_path') INTO saved_search_path; + + EXECUTE FORMAT('SET search_path TO %s'::text, tbl_schema); + + -- Get table OID + + EXECUTE FORMAT('SELECT %L::regclass::oid'::text, tbl_schema || '.' || tbl_name) INTO tbl_oid; + + -- Get table AM oid + + SELECT relam FROM pg_class WHERE oid = tbl_oid INTO tbl_am_oid; + + -- Check that table is heap or columnar + + IF (tbl_am_oid != (SELECT oid FROM pg_am WHERE amname = 'columnar')) AND + (tbl_am_oid != (SELECT oid FROM pg_am WHERE amname = 'heap')) THEN + RAISE WARNING 'Cannot convert table: table %.% is not heap or colummnar', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Check that we can convert only from 'heap' to 'columnar' and vice versa + + IF tbl_am_oid = (SELECT oid FROM pg_am WHERE amname = method) THEN + RAISE WARNING 'Cannot convert table: conversion to same access method.'; + RETURN 0; + END IF; + + -- Check if table has FOREIGN KEY + + IF (SELECT COUNT(1) FROM pg_constraint WHERE contype = 'f' AND conrelid = tbl_oid) > 0 THEN + RAISE WARNING 'Cannot convert table: table %.% has a FOREIGN KEY constraint.', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Check if table is REFERENCED by FOREIGN KEY + + IF (SELECT COUNT(1) FROM pg_constraint WHERE contype = 'f' AND confrelid = tbl_oid) > 0 THEN + RAISE WARNING 'Cannot convert table: table %.% is referenced by FOREIGN KEY.', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Check if table has identity columns + + IF (SELECT COUNT(1) FROM pg_attribute WHERE attrelid = tbl_oid AND attidentity <> '') > 0 THEN + RAISE WARNING 'Cannot convert table: table %.% must not use GENERATED ... AS IDENTITY.', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Collect triggers definitions + + SELECT ARRAY_AGG(pg_get_triggerdef(oid)) FROM pg_trigger + WHERE tgrelid = tbl_oid INTO trigger_list_definition; + + -- Collect constraint names and definitions (delimiter is `?`) + -- Search for constraints that depend on index AM which is supported by columnar AM + + SELECT ARRAY_AGG(pg_constraint.conname || '?' || pg_get_constraintdef(pg_constraint.oid)) + + FROM pg_constraint, pg_class + + WHERE + pg_constraint.conindid = pg_class.oid + AND + pg_constraint.conrelid = tbl_oid + AND + pg_class.relam IN (SELECT oid FROM pg_am WHERE amname IN ('btree', 'hash')) + + INTO constraint_list_name_and_definition; + + -- Collect index definitions which are not constraints + + SELECT ARRAY_AGG(indexdef) FROM pg_indexes + + WHERE + + schemaname = tbl_schema_original AND tablename = tbl_name_original + + AND + + quote_ident(indexname)::regclass::oid IN + ( + SELECT indexrelid FROM pg_index + + WHERE + indexrelid IN + (SELECT quote_ident(indexname)::regclass::oid FROM pg_indexes + WHERE schemaname = tbl_schema_original AND tablename = tbl_name_original) + + AND + + indexrelid NOT IN + (SELECT conindid FROM pg_constraint + WHERE pg_constraint.conrelid = tbl_oid) + ) + + INTO index_list_definition; + + -- Generate random name for new table + + SELECT 't_' || substr(md5(random()::text), 0, 25) INTO temp_tbl_name; + + -- Create new table + + EXECUTE FORMAT(' + CREATE TABLE %s.%s (LIKE %s.%s + INCLUDING GENERATED + INCLUDING DEFAULTS + ) USING %s'::text, tbl_schema, temp_tbl_name, tbl_schema, tbl_name, method); + + -- Insert all data from original table + + EXECUTE FORMAT('INSERT INTO %s.%s SELECT * FROM %s.%s'::text, tbl_schema, temp_tbl_name, tbl_schema, tbl_name); + + -- Drop original table + + EXECUTE FORMAT('DROP TABLE %s.%s'::text, tbl_schema, tbl_name); + + -- Rename new table to original name + + EXECUTE FORMAT('ALTER TABLE %s.%s RENAME TO %s;'::text, tbl_schema, temp_tbl_name, tbl_name); + + -- Since we inserted rows before they are not flushed so trigger flushing + + EXECUTE FORMAT('SELECT COUNT(1) FROM %s.%s LIMIT 1;'::text, tbl_schema, tbl_name); + + -- Set indexes + + IF CARDINALITY(index_list_definition) <> 0 THEN + FOREACH idx IN ARRAY index_list_definition + LOOP + BEGIN + EXECUTE idx; + EXCEPTION WHEN feature_not_supported THEN + RAISE WARNING 'Index `%` cannot be created.', idx; + END; + END LOOP; + END IF; + + -- Set constraints + + IF CARDINALITY(constraint_list_name_and_definition) <> 0 THEN + FOREACH constraint_name_and_definition IN ARRAY constraint_list_name_and_definition + LOOP + SELECT string_to_array(constraint_name_and_definition, '?') INTO constraint_name_and_definition_split; + BEGIN + EXECUTE 'ALTER TABLE ' || tbl_name || ' ADD CONSTRAINT ' + || constraint_name_and_definition_split[1] || ' ' + || constraint_name_and_definition_split[2]; + EXCEPTION WHEN feature_not_supported THEN + RAISE WARNING 'Constraint `%` cannot be added.', constraint_name_and_definition_split[2]; + END; + END LOOP; + END IF; + + -- Set triggers + + IF CARDINALITY(trigger_list_definition) <> 0 THEN + FOREACH trigger IN ARRAY trigger_list_definition + LOOP + BEGIN + EXECUTE trigger; + EXCEPTION WHEN feature_not_supported THEN + RAISE WARNING 'Trigger `%` cannot be applied.', trigger; + RAISE WARNING + 'Foreign keys and AFTER ROW triggers are not supported for columnar tables.' + ' Consider an AFTER STATEMENT trigger instead.'; + END; + END LOOP; + END IF; + + -- Restore search_path + EXECUTE FORMAT('SET search_path TO %s'::text, saved_search_path); + + RETURN 1; + +END; + +$func$; + +COMMENT ON FUNCTION columnar.alter_table_set_access_method(t text, method text) + IS 'alters a table''s access method'; \ No newline at end of file diff --git a/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/latest.sql b/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/latest.sql index 2c3bbb1..fbdad5e 100644 --- a/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/latest.sql +++ b/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/latest.sql @@ -22,12 +22,13 @@ DECLARE index_list_definition TEXT[]; idx TEXT; + saved_search_path TEXT; + constraint_list_name_and_definition TEXT[]; constraint_name_and_definition TEXT; constraint_name_and_definition_split TEXT[]; BEGIN - CASE WHEN CARDINALITY(tbl_array) = 1 THEN SELECT tbl_array[1] INTO tbl_name; @@ -75,6 +76,10 @@ BEGIN SELECT quote_ident(tbl_schema) INTO tbl_schema; END IF; + SELECT current_setting('search_path') INTO saved_search_path; + + EXECUTE FORMAT('SET search_path TO %s'::text, tbl_schema); + -- Get table OID EXECUTE FORMAT('SELECT %L::regclass::oid'::text, tbl_schema || '.' || tbl_name) INTO tbl_oid; @@ -175,26 +180,26 @@ BEGIN -- Create new table EXECUTE FORMAT(' - CREATE TABLE %s (LIKE %s.%s + CREATE TABLE %s.%s (LIKE %s.%s INCLUDING GENERATED INCLUDING DEFAULTS - ) USING %s'::text, temp_tbl_name, tbl_schema, tbl_name, method); + ) USING %s'::text, tbl_schema, temp_tbl_name, tbl_schema, tbl_name, method); -- Insert all data from original table - EXECUTE FORMAT('INSERT INTO %s SELECT * FROM %s.%s'::text, temp_tbl_name, tbl_schema, tbl_name); + EXECUTE FORMAT('INSERT INTO %s.%s SELECT * FROM %s.%s'::text, tbl_schema, temp_tbl_name, tbl_schema, tbl_name); -- Drop original table - EXECUTE FORMAT('DROP TABLE %s'::text, tbl_name); + EXECUTE FORMAT('DROP TABLE %s.%s'::text, tbl_schema, tbl_name); -- Rename new table to original name - EXECUTE FORMAT('ALTER TABLE %s RENAME TO %s;'::text, temp_tbl_name, tbl_name); + EXECUTE FORMAT('ALTER TABLE %s.%s RENAME TO %s;'::text, tbl_schema, temp_tbl_name, tbl_name); -- Since we inserted rows before they are not flushed so trigger flushing - EXECUTE FORMAT('SELECT COUNT(1) FROM %s LIMIT 1;'::text, tbl_name); + EXECUTE FORMAT('SELECT COUNT(1) FROM %s.%s LIMIT 1;'::text, tbl_schema, tbl_name); -- Set indexes @@ -241,6 +246,9 @@ BEGIN END LOOP; END IF; + -- Restore search_path + EXECUTE FORMAT('SET search_path TO %s'::text, saved_search_path); + RETURN 1; END; diff --git a/columnar/src/test/regress/expected/columnar_alter_table_set_access_method.out b/columnar/src/test/regress/expected/columnar_alter_table_set_access_method.out index 7bb7feb..4017fa7 100644 --- a/columnar/src/test/regress/expected/columnar_alter_table_set_access_method.out +++ b/columnar/src/test/regress/expected/columnar_alter_table_set_access_method.out @@ -545,3 +545,121 @@ ORDER BY conname; (3 rows) DROP TABLE "tBl"; +-- 11. Check case sensitivity and schema sensitivity +CREATE SCHEMA "tEST"; +CREATE TABLE "tEST"."tBl" ( + c1 CIRCLE, + "C2" TEXT, + i int4[], + p point, + a int, + EXCLUDE USING gist + (c1 WITH &&, ("C2"::circle) WITH &&) + WHERE (circle_center(c1) <> '(0,0)'), + EXCLUDE USING btree + (a WITH =) + INCLUDE(p) + WHERE ("C2" < 'astring') +); +CREATE INDEX "TBL_GIN" ON "tEST"."tBl" USING gin (i); +CREATE INDEX tbl_gist ON "tEST"."tBl" USING gist(p); +CREATE INDEX tbl_brin ON "tEST"."tBl" USING brin (a) WITH (pages_per_range = 1); +CREATE INDEX tbl_hash ON "tEST"."tBl" USING hash ("C2"); +ALTER TABLE "tEST"."tBl" ADD CONSTRAINT tbl_unique UNIQUE ("C2"); +CREATE UNIQUE INDEX tbl_btree ON "tEST"."tBl" USING btree (a); +ALTER TABLE "tEST"."tBl" ADD CONSTRAINT tbl_pkey PRIMARY KEY USING INDEX tbl_btree; +NOTICE: ALTER TABLE / ADD CONSTRAINT USING INDEX will rename index "tbl_btree" to "tbl_pkey" +SELECT indexname, indexdef FROM pg_indexes +WHERE tablename = 'tBl' +ORDER BY indexname; + indexname | indexdef +----------------+----------------------------------------------------------------------------------------------------------------------------- + TBL_GIN | CREATE INDEX "TBL_GIN" ON "tEST"."tBl" USING gin (i) + tBl_a_p_excl | CREATE INDEX "tBl_a_p_excl" ON "tEST"."tBl" USING btree (a) INCLUDE (p) WHERE ("C2" < 'astring'::text) + tBl_c1_C2_excl | CREATE INDEX "tBl_c1_C2_excl" ON "tEST"."tBl" USING gist (c1, (("C2")::circle)) WHERE (circle_center(c1) <> '(0,0)'::point) + tbl_brin | CREATE INDEX tbl_brin ON "tEST"."tBl" USING brin (a) WITH (pages_per_range='1') + tbl_gist | CREATE INDEX tbl_gist ON "tEST"."tBl" USING gist (p) + tbl_hash | CREATE INDEX tbl_hash ON "tEST"."tBl" USING hash ("C2") + tbl_pkey | CREATE UNIQUE INDEX tbl_pkey ON "tEST"."tBl" USING btree (a) + tbl_unique | CREATE UNIQUE INDEX tbl_unique ON "tEST"."tBl" USING btree ("C2") +(8 rows) + +SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = '"tEST"."tBl"'::regclass; + pg_get_constraintdef +--------------------------------------------------------------------------------------------------------- + EXCLUDE USING gist (c1 WITH &&, (("C2")::circle) WITH &&) WHERE ((circle_center(c1) <> '(0,0)'::point)) + EXCLUDE USING btree (a WITH =) INCLUDE (p) WHERE (("C2" < 'astring'::text)) + UNIQUE ("C2") + PRIMARY KEY (a) +(4 rows) + +SELECT columnar.alter_table_set_access_method('"tEST"."tBl"', 'columnar'); +WARNING: Index `CREATE INDEX tbl_brin ON "tEST"."tBl" USING brin (a) WITH (pages_per_range='1')` cannot be created. + alter_table_set_access_method +------------------------------- + t +(1 row) + +SELECT COUNT(1) FROM pg_class WHERE relname = 'tBl' AND relam = (SELECT oid FROM pg_am WHERE amname = 'columnar'); + count +------- + 1 +(1 row) + +SELECT indexname FROM pg_indexes WHERE tablename = 'tBl' ORDER BY indexname; + indexname +-------------- + TBL_GIN + tbl_a_p_excl + tbl_gist + tbl_hash + tbl_pkey + tbl_unique +(6 rows) + +SELECT conname FROM pg_constraint +WHERE conrelid = '"tEST"."tBl"'::regclass +ORDER BY conname; + conname +-------------- + tbl_a_p_excl + tbl_pkey + tbl_unique +(3 rows) + +-- Convert back to 'heap' +SELECT columnar.alter_table_set_access_method('"tEST"."tBl"', 'heap'); + alter_table_set_access_method +------------------------------- + t +(1 row) + +SELECT COUNT(1) FROM pg_class WHERE relname = 'tBl' AND relam = (SELECT oid FROM pg_am WHERE amname = 'heap'); + count +------- + 1 +(1 row) + +SELECT indexname FROM pg_indexes WHERE tablename = 'tBl' ORDER BY indexname; + indexname +-------------- + TBL_GIN + tbl_a_p_excl + tbl_gist + tbl_hash + tbl_pkey + tbl_unique +(6 rows) + +SELECT conname FROM pg_constraint +WHERE conrelid = '"tEST"."tBl"'::regclass +ORDER BY conname; + conname +-------------- + tbl_a_p_excl + tbl_pkey + tbl_unique +(3 rows) + +DROP TABLE "tEST"."tBl"; +DROP SCHEMA "tEST"; diff --git a/columnar/src/test/regress/sql/columnar_alter_table_set_access_method.sql b/columnar/src/test/regress/sql/columnar_alter_table_set_access_method.sql index a7c5816..d176b8b 100644 --- a/columnar/src/test/regress/sql/columnar_alter_table_set_access_method.sql +++ b/columnar/src/test/regress/sql/columnar_alter_table_set_access_method.sql @@ -287,4 +287,65 @@ SELECT conname FROM pg_constraint WHERE conrelid = '"tBl"'::regclass ORDER BY conname; -DROP TABLE "tBl"; \ No newline at end of file +DROP TABLE "tBl"; + +-- 11. Check case sensitivity and schema sensitivity + +CREATE SCHEMA "tEST"; + +CREATE TABLE "tEST"."tBl" ( + c1 CIRCLE, + "C2" TEXT, + i int4[], + p point, + a int, + EXCLUDE USING gist + (c1 WITH &&, ("C2"::circle) WITH &&) + WHERE (circle_center(c1) <> '(0,0)'), + EXCLUDE USING btree + (a WITH =) + INCLUDE(p) + WHERE ("C2" < 'astring') +); + +CREATE INDEX "TBL_GIN" ON "tEST"."tBl" USING gin (i); +CREATE INDEX tbl_gist ON "tEST"."tBl" USING gist(p); +CREATE INDEX tbl_brin ON "tEST"."tBl" USING brin (a) WITH (pages_per_range = 1); + +CREATE INDEX tbl_hash ON "tEST"."tBl" USING hash ("C2"); +ALTER TABLE "tEST"."tBl" ADD CONSTRAINT tbl_unique UNIQUE ("C2"); + +CREATE UNIQUE INDEX tbl_btree ON "tEST"."tBl" USING btree (a); +ALTER TABLE "tEST"."tBl" ADD CONSTRAINT tbl_pkey PRIMARY KEY USING INDEX tbl_btree; + +SELECT indexname, indexdef FROM pg_indexes +WHERE tablename = 'tBl' +ORDER BY indexname; + +SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = '"tEST"."tBl"'::regclass; + +SELECT columnar.alter_table_set_access_method('"tEST"."tBl"', 'columnar'); + +SELECT COUNT(1) FROM pg_class WHERE relname = 'tBl' AND relam = (SELECT oid FROM pg_am WHERE amname = 'columnar'); + +SELECT indexname FROM pg_indexes WHERE tablename = 'tBl' ORDER BY indexname; + +SELECT conname FROM pg_constraint +WHERE conrelid = '"tEST"."tBl"'::regclass +ORDER BY conname; + +-- Convert back to 'heap' + +SELECT columnar.alter_table_set_access_method('"tEST"."tBl"', 'heap'); + +SELECT COUNT(1) FROM pg_class WHERE relname = 'tBl' AND relam = (SELECT oid FROM pg_am WHERE amname = 'heap'); + +SELECT indexname FROM pg_indexes WHERE tablename = 'tBl' ORDER BY indexname; + +SELECT conname FROM pg_constraint +WHERE conrelid = '"tEST"."tBl"'::regclass +ORDER BY conname; + +DROP TABLE "tEST"."tBl"; + +DROP SCHEMA "tEST"; \ No newline at end of file