Skip to content

Commit

Permalink
[FLINK-32584] Make it possible to unset default catalog and/or database
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys authored and twalthr committed Jul 17, 2023
1 parent bcdbc51 commit aa21eac
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.types.AbstractDataType;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -1110,10 +1112,13 @@ default String explainSql(String statement, ExplainDetail... extraDetails) {
* </tbody>
* </table>
*
* <p>You can unset the current catalog by passing a null value. If the current catalog is
* unset, you need to use fully qualified identifiers.
*
* @param catalogName The name of the catalog to set as the current default catalog.
* @see TableEnvironment#useDatabase(String)
*/
void useCatalog(String catalogName);
void useCatalog(@Nullable String catalogName);

/**
* Gets the current default database name of the running session.
Expand Down Expand Up @@ -1179,10 +1184,13 @@ default String explainSql(String statement, ExplainDetail... extraDetails) {
* </tbody>
* </table>
*
* <p>You can unset the current database by passing a null value. If the current database is
* unset, you need to qualify identifiers at least with the database name.
*
* @param databaseName The name of the database to set as the current database.
* @see TableEnvironment#useCatalog(String)
*/
void useDatabase(String databaseName);
void useDatabase(@Nullable String databaseName);

/** Returns the table config that defines the runtime behavior of the Table API. */
TableConfig getConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ public final class CatalogManager implements CatalogRegistry {
private final Map<ObjectIdentifier, CatalogBaseTable> temporaryTables;

// The name of the current catalog and database
private String currentCatalogName;
private @Nullable String currentCatalogName;

private String currentDatabaseName;
private @Nullable String currentDatabaseName;

private DefaultSchemaResolver schemaResolver;

Expand Down Expand Up @@ -226,7 +226,7 @@ public void unregisterCatalog(String catalogName, boolean ignoreIfNotExists) {
"Catalog name cannot be null or empty.");

if (catalogs.containsKey(catalogName)) {
if (currentCatalogName.equals(catalogName)) {
if (catalogName.equals(currentCatalogName)) {
throw new CatalogException("Cannot drop a catalog which is currently in use.");
}
Catalog catalog = catalogs.remove(catalogName);
Expand Down Expand Up @@ -282,18 +282,23 @@ public String getCurrentCatalog() {
* @throws CatalogNotExistException thrown if the catalog doesn't exist
* @see CatalogManager#qualifyIdentifier(UnresolvedIdentifier)
*/
public void setCurrentCatalog(String catalogName) throws CatalogNotExistException {
public void setCurrentCatalog(@Nullable String catalogName) throws CatalogNotExistException {
if (catalogName == null) {
this.currentCatalogName = null;
this.currentDatabaseName = null;
return;
}

checkArgument(
!StringUtils.isNullOrWhitespaceOnly(catalogName),
"Catalog name cannot be null or empty.");
!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be empty.");

Catalog potentialCurrentCatalog = catalogs.get(catalogName);
if (potentialCurrentCatalog == null) {
throw new CatalogException(
format("A catalog with name [%s] does not exist.", catalogName));
}

if (!currentCatalogName.equals(catalogName)) {
if (!catalogName.equals(currentCatalogName)) {
currentCatalogName = catalogName;
currentDatabaseName = potentialCurrentCatalog.getDefaultDatabase();

Expand Down Expand Up @@ -323,10 +328,19 @@ public String getCurrentDatabase() {
* @see CatalogManager#qualifyIdentifier(UnresolvedIdentifier)
* @see CatalogManager#setCurrentCatalog(String)
*/
public void setCurrentDatabase(String databaseName) {
public void setCurrentDatabase(@Nullable String databaseName) {
if (databaseName == null) {
this.currentDatabaseName = null;
return;
}

checkArgument(
!StringUtils.isNullOrWhitespaceOnly(databaseName),
"The database name cannot be null or empty.");
"The database name cannot be empty.");

if (currentCatalogName == null) {
throw new CatalogException("Current catalog has not been set.");
}

if (!catalogs.get(currentCatalogName).databaseExists(databaseName)) {
throw new CatalogException(
Expand All @@ -335,7 +349,7 @@ public void setCurrentDatabase(String databaseName) {
databaseName, currentCatalogName));
}

if (!currentDatabaseName.equals(databaseName)) {
if (!databaseName.equals(currentDatabaseName)) {
currentDatabaseName = databaseName;

LOG.info(
Expand Down Expand Up @@ -681,8 +695,37 @@ private boolean permanentDatabaseExists(String catalogName, String databaseName)
*/
public ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier) {
return ObjectIdentifier.of(
identifier.getCatalogName().orElseGet(this::getCurrentCatalog),
identifier.getDatabaseName().orElseGet(this::getCurrentDatabase),
identifier
.getCatalogName()
.orElseGet(
() -> {
final String currentCatalog = getCurrentCatalog();
if (StringUtils.isNullOrWhitespaceOnly(currentCatalog)) {
throw new ValidationException(
"A current catalog has not been set. Please use a"
+ " fully qualified identifier (such as"
+ " 'my_catalog.my_database.my_table') or"
+ " set a current catalog using"
+ " 'USE CATALOG my_catalog'.");
}
return currentCatalog;
}),
identifier
.getDatabaseName()
.orElseGet(
() -> {
final String currentDatabase = getCurrentDatabase();
if (StringUtils.isNullOrWhitespaceOnly(currentDatabase)) {
throw new ValidationException(
"A current database has not been set. Please use a"
+ " fully qualified identifier (such as"
+ " 'my_database.my_table' or"
+ " 'my_catalog.my_database.my_table') or"
+ " set a current database using"
+ " 'USE my_database'.");
}
return currentDatabase;
}),
identifier.getObjectName());
}

Expand Down Expand Up @@ -941,6 +984,7 @@ private void dropTableInternal(
* handling across different commands.
*/
private interface ModifyCatalog {

void execute(Catalog catalog, ObjectPath path) throws Exception;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.procedures.Procedure;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -120,9 +122,13 @@ default Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
* value probably comes from configuration, will not change for the life time of the catalog
* instance.
*
* <p>If the default database is null, users will need to set a current database themselves or
* qualify identifiers at least with the database name when using the catalog.
*
* @return the name of the current database
* @throws CatalogException in case of any runtime exception
*/
@Nullable
String getDefaultDatabase() throws CatalogException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.flink.table.planner.plan.cost.FlinkCostFactory;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.util.StringUtils;

import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteSchema;
Expand All @@ -70,6 +71,7 @@
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;

import java.util.ArrayList;
import java.util.List;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -193,13 +195,20 @@ public FlinkCalciteCatalogReader createCatalogReader(boolean lenientCaseSensitiv
final SchemaPlus finalRootSchema = getRootSchema(rootSchema.plus());

final CatalogManager catalogManager = context.getCatalogManager();
return new FlinkCalciteCatalogReader(
CalciteSchema.from(finalRootSchema),
asList(
final List<List<String>> paths = new ArrayList<>();
if (!StringUtils.isNullOrWhitespaceOnly(catalogManager.getCurrentCatalog())) {
if (!StringUtils.isNullOrWhitespaceOnly(catalogManager.getCurrentDatabase())) {
paths.add(
asList(
catalogManager.getCurrentCatalog(),
catalogManager.getCurrentDatabase()),
singletonList(catalogManager.getCurrentCatalog())),
catalogManager.getCurrentDatabase()));
}
paths.add(singletonList(catalogManager.getCurrentCatalog()));
}

return new FlinkCalciteCatalogReader(
CalciteSchema.from(finalRootSchema),
paths,
typeFactory,
CalciteConfig$.MODULE$.connectionConfig(newSqlParserConfig));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ abstract class PlannerBase(
TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig)).getOffset(epochTime)
tableConfig.set(TABLE_QUERY_START_LOCAL_TIME, localTime)

val currentDatabase = catalogManager.getCurrentDatabase
val currentDatabase = Option(catalogManager.getCurrentDatabase).getOrElse("")
tableConfig.set(TABLE_QUERY_CURRENT_DATABASE, currentDatabase)

// We pass only the configuration to avoid reconfiguration with the rootConfiguration
Expand Down
Loading

0 comments on commit aa21eac

Please sign in to comment.