From aa21eac3a14e74d0cdee69db987b7e95adda9932 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Tue, 11 Jul 2023 15:10:53 +0200 Subject: [PATCH] [FLINK-32584] Make it possible to unset default catalog and/or database --- .../flink/table/api/TableEnvironment.java | 12 +- .../flink/table/catalog/CatalogManager.java | 68 +++++- .../apache/flink/table/catalog/Catalog.java | 6 + .../planner/delegation/PlannerContext.java | 19 +- .../planner/delegation/PlannerBase.scala | 2 +- .../planner/catalog/UnknownCatalogTest.java | 209 ++++++++++++++++++ 6 files changed, 296 insertions(+), 20 deletions(-) create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index 01d5f174dbfa0..2169a66b2aff4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -33,6 +33,8 @@ import org.apache.flink.table.resource.ResourceUri; import org.apache.flink.table.types.AbstractDataType; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Arrays; import java.util.List; @@ -1110,10 +1112,13 @@ default String explainSql(String statement, ExplainDetail... extraDetails) { * * * + *

You can unset the current catalog by passing a null value. If the current catalog is + * unset, you need to use fully qualified identifiers. + * * @param catalogName The name of the catalog to set as the current default catalog. * @see TableEnvironment#useDatabase(String) */ - void useCatalog(String catalogName); + void useCatalog(@Nullable String catalogName); /** * Gets the current default database name of the running session. @@ -1179,10 +1184,13 @@ default String explainSql(String statement, ExplainDetail... extraDetails) { * * * + *

You can unset the current database by passing a null value. If the current database is + * unset, you need to qualify identifiers at least with the database name. + * * @param databaseName The name of the database to set as the current database. * @see TableEnvironment#useCatalog(String) */ - void useDatabase(String databaseName); + void useDatabase(@Nullable String databaseName); /** Returns the table config that defines the runtime behavior of the Table API. */ TableConfig getConfig(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 09625fb658c06..2ec64890e9358 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -73,9 +73,9 @@ public final class CatalogManager implements CatalogRegistry { private final Map temporaryTables; // The name of the current catalog and database - private String currentCatalogName; + private @Nullable String currentCatalogName; - private String currentDatabaseName; + private @Nullable String currentDatabaseName; private DefaultSchemaResolver schemaResolver; @@ -226,7 +226,7 @@ public void unregisterCatalog(String catalogName, boolean ignoreIfNotExists) { "Catalog name cannot be null or empty."); if (catalogs.containsKey(catalogName)) { - if (currentCatalogName.equals(catalogName)) { + if (catalogName.equals(currentCatalogName)) { throw new CatalogException("Cannot drop a catalog which is currently in use."); } Catalog catalog = catalogs.remove(catalogName); @@ -282,10 +282,15 @@ public String getCurrentCatalog() { * @throws CatalogNotExistException thrown if the catalog doesn't exist * @see CatalogManager#qualifyIdentifier(UnresolvedIdentifier) */ - public void setCurrentCatalog(String catalogName) throws CatalogNotExistException { + public void setCurrentCatalog(@Nullable String catalogName) throws CatalogNotExistException { + if (catalogName == null) { + this.currentCatalogName = null; + this.currentDatabaseName = null; + return; + } + checkArgument( - !StringUtils.isNullOrWhitespaceOnly(catalogName), - "Catalog name cannot be null or empty."); + !StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be empty."); Catalog potentialCurrentCatalog = catalogs.get(catalogName); if (potentialCurrentCatalog == null) { @@ -293,7 +298,7 @@ public void setCurrentCatalog(String catalogName) throws CatalogNotExistExceptio format("A catalog with name [%s] does not exist.", catalogName)); } - if (!currentCatalogName.equals(catalogName)) { + if (!catalogName.equals(currentCatalogName)) { currentCatalogName = catalogName; currentDatabaseName = potentialCurrentCatalog.getDefaultDatabase(); @@ -323,10 +328,19 @@ public String getCurrentDatabase() { * @see CatalogManager#qualifyIdentifier(UnresolvedIdentifier) * @see CatalogManager#setCurrentCatalog(String) */ - public void setCurrentDatabase(String databaseName) { + public void setCurrentDatabase(@Nullable String databaseName) { + if (databaseName == null) { + this.currentDatabaseName = null; + return; + } + checkArgument( !StringUtils.isNullOrWhitespaceOnly(databaseName), - "The database name cannot be null or empty."); + "The database name cannot be empty."); + + if (currentCatalogName == null) { + throw new CatalogException("Current catalog has not been set."); + } if (!catalogs.get(currentCatalogName).databaseExists(databaseName)) { throw new CatalogException( @@ -335,7 +349,7 @@ public void setCurrentDatabase(String databaseName) { databaseName, currentCatalogName)); } - if (!currentDatabaseName.equals(databaseName)) { + if (!databaseName.equals(currentDatabaseName)) { currentDatabaseName = databaseName; LOG.info( @@ -681,8 +695,37 @@ private boolean permanentDatabaseExists(String catalogName, String databaseName) */ public ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier) { return ObjectIdentifier.of( - identifier.getCatalogName().orElseGet(this::getCurrentCatalog), - identifier.getDatabaseName().orElseGet(this::getCurrentDatabase), + identifier + .getCatalogName() + .orElseGet( + () -> { + final String currentCatalog = getCurrentCatalog(); + if (StringUtils.isNullOrWhitespaceOnly(currentCatalog)) { + throw new ValidationException( + "A current catalog has not been set. Please use a" + + " fully qualified identifier (such as" + + " 'my_catalog.my_database.my_table') or" + + " set a current catalog using" + + " 'USE CATALOG my_catalog'."); + } + return currentCatalog; + }), + identifier + .getDatabaseName() + .orElseGet( + () -> { + final String currentDatabase = getCurrentDatabase(); + if (StringUtils.isNullOrWhitespaceOnly(currentDatabase)) { + throw new ValidationException( + "A current database has not been set. Please use a" + + " fully qualified identifier (such as" + + " 'my_database.my_table' or" + + " 'my_catalog.my_database.my_table') or" + + " set a current database using" + + " 'USE my_database'."); + } + return currentDatabase; + }), identifier.getObjectName()); } @@ -941,6 +984,7 @@ private void dropTableInternal( * handling across different commands. */ private interface ModifyCatalog { + void execute(Catalog catalog, ObjectPath path) throws Exception; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index 48b3fcda6c244..8b137d2e51d41 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -42,6 +42,8 @@ import org.apache.flink.table.factories.TableFactory; import org.apache.flink.table.procedures.Procedure; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -120,9 +122,13 @@ default Optional getFunctionDefinitionFactory() { * value probably comes from configuration, will not change for the life time of the catalog * instance. * + *

If the default database is null, users will need to set a current database themselves or + * qualify identifiers at least with the database name when using the catalog. + * * @return the name of the current database * @throws CatalogException in case of any runtime exception */ + @Nullable String getDefaultDatabase() throws CatalogException; /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java index 66297964b351a..81f359a4fce7e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java @@ -49,6 +49,7 @@ import org.apache.flink.table.planner.plan.cost.FlinkCostFactory; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; +import org.apache.flink.util.StringUtils; import org.apache.calcite.config.Lex; import org.apache.calcite.jdbc.CalciteSchema; @@ -70,6 +71,7 @@ import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; +import java.util.ArrayList; import java.util.List; import static java.util.Arrays.asList; @@ -193,13 +195,20 @@ public FlinkCalciteCatalogReader createCatalogReader(boolean lenientCaseSensitiv final SchemaPlus finalRootSchema = getRootSchema(rootSchema.plus()); final CatalogManager catalogManager = context.getCatalogManager(); - return new FlinkCalciteCatalogReader( - CalciteSchema.from(finalRootSchema), - asList( + final List> paths = new ArrayList<>(); + if (!StringUtils.isNullOrWhitespaceOnly(catalogManager.getCurrentCatalog())) { + if (!StringUtils.isNullOrWhitespaceOnly(catalogManager.getCurrentDatabase())) { + paths.add( asList( catalogManager.getCurrentCatalog(), - catalogManager.getCurrentDatabase()), - singletonList(catalogManager.getCurrentCatalog())), + catalogManager.getCurrentDatabase())); + } + paths.add(singletonList(catalogManager.getCurrentCatalog())); + } + + return new FlinkCalciteCatalogReader( + CalciteSchema.from(finalRootSchema), + paths, typeFactory, CalciteConfig$.MODULE$.connectionConfig(newSqlParserConfig)); } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 83f81be2678f4..48fe89d53031d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -478,7 +478,7 @@ abstract class PlannerBase( TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig)).getOffset(epochTime) tableConfig.set(TABLE_QUERY_START_LOCAL_TIME, localTime) - val currentDatabase = catalogManager.getCurrentDatabase + val currentDatabase = Option(catalogManager.getCurrentDatabase).getOrElse("") tableConfig.set(TABLE_QUERY_CURRENT_DATABASE, currentDatabase) // We pass only the configuration to avoid reconfiguration with the rootConfiguration diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java new file mode 100644 index 0000000000000..250aa35c84f65 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/UnknownCatalogTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.catalog; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableDescriptor; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for no default catalog and/or database. */ +public class UnknownCatalogTest { + + public static final String BUILTIN_CATALOG = "cat"; + private static final String BUILTIN_DATABASE = "db"; + public static final EnvironmentSettings ENVIRONMENT_SETTINGS = + EnvironmentSettings.newInstance() + .inStreamingMode() + .withBuiltInCatalogName(BUILTIN_CATALOG) + .withBuiltInDatabaseName(BUILTIN_DATABASE) + .build(); + public static final ResolvedSchema EXPECTED_SCHEMA = + ResolvedSchema.of(Column.physical("i", INT()), Column.physical("s", STRING())); + + @Test + public void testUnsetCatalogWithFullyQualified() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS); + + tEnv.useCatalog(null); + final String tablePath = String.format("%s.%s.%s", BUILTIN_CATALOG, BUILTIN_DATABASE, "tb"); + registerTable(tEnv, tablePath); + + Table table = tEnv.sqlQuery(String.format("SELECT * FROM %s", tablePath)); + + assertThat(table.getResolvedSchema()).isEqualTo(EXPECTED_SCHEMA); + } + + @Test + public void testUnsetCatalogWithSingleIdentifier() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS); + + tEnv.useCatalog(null); + final String tableName = "tb"; + final String tablePath = + String.format("%s.%s.%s", BUILTIN_CATALOG, BUILTIN_DATABASE, tableName); + registerTable(tEnv, tablePath); + + assertThatThrownBy(() -> tEnv.sqlQuery("SELECT * FROM " + tableName)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining(String.format("Object '%s' not found", tableName)); + } + + @Test + public void testUsingUnknownDatabaseWithDatabaseQualified() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS); + tEnv.useDatabase(null); + + final String tableName = "tb"; + final String tablePath = + String.format("%s.%s.%s", BUILTIN_CATALOG, BUILTIN_DATABASE, tableName); + registerTable(tEnv, tablePath); + + Table table = + tEnv.sqlQuery(String.format("SELECT * FROM %s.%s", BUILTIN_DATABASE, tableName)); + + assertThat(table.getResolvedSchema()).isEqualTo(EXPECTED_SCHEMA); + } + + @Test + public void testUsingUnknownDatabaseWithSingleIdentifier() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS); + tEnv.useDatabase(null); + + final String tableName = "tb"; + final String tablePath = + String.format("%s.%s.%s", BUILTIN_CATALOG, BUILTIN_DATABASE, tableName); + registerTable(tEnv, tablePath); + + assertThatThrownBy(() -> tEnv.sqlQuery("SELECT * FROM " + tableName)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining(String.format("Object '%s' not found", tableName)); + } + + @Test + public void testUnsetCatalogWithAlterTable() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS); + + tEnv.useCatalog(null); + final String tableName = "tb"; + final String tablePath = + String.format("%s.%s.%s", BUILTIN_CATALOG, BUILTIN_DATABASE, tableName); + registerTable(tEnv, tablePath); + + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format("ALTER TABLE %s ADD (f STRING)", tableName))) + .isInstanceOf(ValidationException.class) + .hasMessage( + "A current catalog has not been set. Please use a fully qualified" + + " identifier (such as 'my_catalog.my_database.my_table') or set a" + + " current catalog using 'USE CATALOG my_catalog'."); + } + + @Test + public void testUnsetDatabaseWithAlterTable() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS); + + tEnv.useDatabase(null); + final String tableName = "tb"; + final String tablePath = + String.format("%s.%s.%s", BUILTIN_CATALOG, BUILTIN_DATABASE, tableName); + registerTable(tEnv, tablePath); + + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format("ALTER TABLE %s ADD (f STRING)", tableName))) + .isInstanceOf(ValidationException.class) + .hasMessage( + "A current database has not been set. Please use a fully qualified" + + " identifier (such as 'my_database.my_table' or" + + " 'my_catalog.my_database.my_table') or set a current database" + + " using 'USE my_database'."); + } + + @Test + public void testUnsetDatabaseComingFromCatalogWithAlterTable() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(ENVIRONMENT_SETTINGS); + + final String catalogName = "custom"; + final NullDefaultDatabaseCatalog catalog = new NullDefaultDatabaseCatalog(catalogName); + catalog.createDatabase( + BUILTIN_DATABASE, new CatalogDatabaseImpl(Collections.emptyMap(), null), false); + tEnv.registerCatalog(catalogName, catalog); + tEnv.useCatalog(catalogName); + final String tableName = "tb"; + final String tablePath = + String.format("%s.%s.%s", catalogName, BUILTIN_DATABASE, tableName); + registerTable(tEnv, tablePath); + + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format("ALTER TABLE %s ADD (f STRING)", tableName))) + .isInstanceOf(ValidationException.class) + .hasMessage( + "A current database has not been set. Please use a fully qualified" + + " identifier (such as 'my_database.my_table' or" + + " 'my_catalog.my_database.my_table') or set a current database" + + " using 'USE my_database'."); + } + + private static void registerTable(TableEnvironment tEnv, String tableName) { + final String input1DataId = + TestValuesTableFactory.registerData(Arrays.asList(Row.of(1, "a"), Row.of(2, "b"))); + tEnv.createTable( + tableName, + TableDescriptor.forConnector("values") + .option("data-id", input1DataId) + .schema(Schema.newBuilder().fromResolvedSchema(EXPECTED_SCHEMA).build()) + .build()); + } + + private static class NullDefaultDatabaseCatalog extends GenericInMemoryCatalog { + + public NullDefaultDatabaseCatalog(String name) { + super(name); + } + + @Override + public String getDefaultDatabase() { + return null; + } + } +}