Skip to content

Commit

Permalink
[FLINK-35016][table] Add model to catalog interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
lihaosky authored Oct 21, 2024
1 parent 2c83089 commit 2d17f61
Show file tree
Hide file tree
Showing 26 changed files with 2,355 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,11 @@ public CatalogPartition createPartition() {
throw new UnsupportedOperationException();
}

@Override
protected boolean supportsModels() {
return false;
}

@Override
protected CatalogFunction createFunction() {
return new CatalogFunctionImpl(TestGenericUDF.class.getCanonicalName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,4 +671,9 @@ protected CatalogFunction createFunction() {
protected CatalogFunction createAnotherFunction() {
return new CatalogFunctionImpl(UDFRand.class.getName());
}

@Override
protected boolean supportsModels() {
return false;
}
}
161 changes: 159 additions & 2 deletions flink-python/pyflink/table/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Dict, List, Optional

from py4j.java_gateway import java_import

from pyflink.java_gateway import get_gateway
from pyflink.table.schema import Schema
from pyflink.table.table_schema import TableSchema
from typing import Dict, List, Optional

__all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 'CatalogPartition', 'CatalogFunction',
'Procedure', 'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
Expand Down Expand Up @@ -449,6 +448,101 @@ def drop_function(self, function_path: 'ObjectPath', ignore_if_not_exists: bool)
"""
self._j_catalog.dropFunction(function_path._j_object_path, ignore_if_not_exists)

def list_models(self, database_name: str) -> List[str]:
"""
List the names of all models in the given database. An empty list is returned if none is
registered.
:param database_name: Name of the database.
:return: A list of the names of the models in this database.
:raise: CatalogException in case of any runtime exception.
DatabaseNotExistException if the database does not exist.
"""
return list(self._j_catalog.listModels(database_name))

def get_model(self, model_path: 'ObjectPath') -> 'CatalogModel':
"""
Get the model.
:param model_path: Path :class:`ObjectPath` of the model.
:return: The requested :class:`CatalogModel`.
:raise: CatalogException in case of any runtime exception.
ModelNotExistException if the model does not exist in the catalog.
"""
return CatalogModel._get(self._j_catalog.getModel(model_path._j_object_path))

def model_exists(self, model_path: 'ObjectPath') -> bool:
"""
Check whether a model exists or not.
:param model_path: Path :class:`ObjectPath` of the model.
:return: true if the model exists in the catalog false otherwise.
:raise: CatalogException in case of any runtime exception.
"""
return self._j_catalog.modelExists(model_path._j_object_path)

def drop_model(self, model_path: 'ObjectPath', ignore_if_not_exists: bool):
"""
Drop a model.
:param model_path: Path :class:`ObjectPath` of the model to be dropped.
:param ignore_if_not_exists: Flag to specify behavior if the model does not exist:
if set to false, throw an exception
if set to true, nothing happens.
:raise: CatalogException in case of any runtime exception.
ModelNotExistException if the model does not exist.
"""
self._j_catalog.dropModel(model_path._j_object_path, ignore_if_not_exists)

def rename_model(self, model_path: 'ObjectPath', new_model_name: str,
ignore_if_not_exists: bool):
"""
Rename an existing model.
:param model_path: Path :class:`ObjectPath` of the model to be renamed.
:param new_model_name: The new name of the model.
:param ignore_if_not_exists: Flag to specify behavior when the model does not exist:
if set to false, throw an exception,
if set to true, do nothing.
:raise: CatalogException in case of any runtime exception.
ModelNotExistException if the model does not exist.
"""
self._j_catalog.renameModel(model_path._j_object_path, new_model_name, ignore_if_not_exists)

def create_model(self, model_path: 'ObjectPath', model: 'CatalogModel',
ignore_if_exists: bool):
"""
Create a new model.
:param model_path: Path :class:`ObjectPath` of the model to be created.
:param model: The model definition :class:`CatalogModel`.
:param ignore_if_exists: Flag to specify behavior when a model already exists at
the given path:
if set to false, it throws a ModelAlreadyExistException,
if set to true, do nothing.
:raise: CatalogException in case of any runtime exception.
DatabaseNotExistException if the database in tablePath doesn't exist.
ModelAlreadyExistException if model already exists and ignoreIfExists is false.
"""
self._j_catalog.createModel(model_path._j_object_path, model._j_catalog_model,
ignore_if_exists)

def alter_model(self, model_path: 'ObjectPath', new_model: 'CatalogModel',
ignore_if_not_exists):
"""
Modify an existing model.
:param model_path: Path :class:`ObjectPath` of the model to be modified.
:param new_model: The new model definition :class:`CatalogModel`.
:param ignore_if_not_exists: Flag to specify behavior when the model does not exist:
if set to false, throw an exception,
if set to true, do nothing.
:raise: CatalogException in case of any runtime exception.
ModelNotExistException if the model does not exist.
"""
self._j_catalog.alterModel(model_path._j_object_path, new_model._j_catalog_model,
ignore_if_not_exists)

def get_table_statistics(self, table_path: 'ObjectPath') -> 'CatalogTableStatistics':
"""
Get the statistics of a table.
Expand Down Expand Up @@ -1016,6 +1110,69 @@ def get_function_language(self):
return self._j_catalog_function.getFunctionLanguage()


class CatalogModel(object):
"""
Interface for a model in a catalog.
"""

def __init__(self, j_catalog_model):
self._j_catalog_model = j_catalog_model

@staticmethod
def create_model(
input_schema: Schema,
output_schema: Schema,
options: Dict[str, str] = {},
comment: str = None
) -> "CatalogModel":
"""
Create an instance of CatalogModel for the catalog model.
:param input_schema: the model input schema
:param output_schema: the model output schema
:param options: the properties of the catalog model
:param comment: the comment of the catalog model
"""
assert input_schema is not None
assert output_schema is not None
assert options is not None

gateway = get_gateway()
return CatalogModel(
gateway.jvm.org.apache.flink.table.catalog.CatalogModel.of(
input_schema._j_schema, output_schema._j_schema, options, comment))

@staticmethod
def _get(j_catalog_model):
return CatalogModel(j_catalog_model)

def copy(self) -> 'CatalogModel':
"""
Create a deep copy of the model.
:return: A deep copy of "this" instance.
"""
return CatalogModel(self._j_catalog_model.copy())

def get_comment(self) -> str:
"""
Get comment of the model.
:return: Comment of model.
"""
return self._j_catalog_model.getComment()

def get_options(self):
"""
Returns a map of string-based options.
:return: Property map of the model.
.. versionadded:: 1.11.0
"""
return dict(self._j_catalog_model.getOptions())


class Procedure(object):
"""
Interface for a procedure in a catalog.
Expand Down
Loading

0 comments on commit 2d17f61

Please sign in to comment.