From 3457f1c3704929f284d10ed77a3a2a1b1195359c Mon Sep 17 00:00:00 2001 From: yh263208 Date: Thu, 13 Jun 2024 11:57:46 +0800 Subject: [PATCH 01/16] process schema and database --- dbt/adapters/oceanbase_mysql/connections.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/oceanbase_mysql/connections.py b/dbt/adapters/oceanbase_mysql/connections.py index 92b48ba..ade8a8a 100644 --- a/dbt/adapters/oceanbase_mysql/connections.py +++ b/dbt/adapters/oceanbase_mysql/connections.py @@ -13,7 +13,7 @@ # limitations under the License. from contextlib import contextmanager from dataclasses import dataclass -from typing import Any, ContextManager, Tuple +from typing import Any, ContextManager, Dict, Tuple import mysql from dbt_common.exceptions import DbtRuntimeError @@ -78,6 +78,23 @@ def _connection_keys(self) -> Tuple[str, ...]: "connect_timeout_seconds", ) + def __post_init__(self): + if self.schema is None and self.database is None: + raise DbtRuntimeError("The schema and database can not be both null") + if self.schema is None: + self.schema = self.database + elif self.database is None: + self.database = self.schema + + @classmethod + def translate_aliases(cls, kwargs: Dict[str, Any], recurse: bool = False) -> Dict[str, Any]: + data = super().translate_aliases(kwargs, recurse) + if "schema" not in data: + data.update({"schema": data.get("database", None)}) + elif "database" not in data: + data.update({"database": data.get("schema", None)}) + return data + class OBMySQLConnectionManager(SQLConnectionManager): From 8dda285316b5820fd1c3cb939c693119de497fc0 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Thu, 13 Jun 2024 12:03:22 +0800 Subject: [PATCH 02/16] add parse test cases --- tests/functional/test_parse.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 tests/functional/test_parse.py diff --git a/tests/functional/test_parse.py b/tests/functional/test_parse.py new file mode 100644 index 0000000..8289a08 --- /dev/null +++ b/tests/functional/test_parse.py @@ -0,0 +1,22 @@ +# Copyright (c) 2023 OceanBase. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from dbt.cli.main import dbtRunner, dbtRunnerResult +from tests.functional.utils import BaseOBMySQLTestCase + + +class TestParse(BaseOBMySQLTestCase): + + def test_no_content_parse_succeed(self, project, dbt: dbtRunner): + res: dbtRunnerResult = dbt.invoke(args=["parse"]) + assert res.success == True From a641f65c574fccca05a76fc91b190eee600c2399 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Thu, 13 Jun 2024 14:57:44 +0800 Subject: [PATCH 03/16] fix test error --- tests/functional/test_init.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/functional/test_init.py b/tests/functional/test_init.py index b7bc8f1..ae73776 100644 --- a/tests/functional/test_init.py +++ b/tests/functional/test_init.py @@ -36,6 +36,7 @@ def test_init_project_with_profile_existing_init_succeed( ob_mysql_credentials: OBMySQLCredentials, ): project_name = "test_init_project" + cur_dir = os.getcwd() try: mock_get_adapter.return_value = ["oceanbase_mysql"] mock_confirm.return_value = "y" @@ -77,3 +78,4 @@ def test_init_project_with_profile_existing_init_succeed( finally: if os.getcwd().endswith(project_name): shutil.rmtree(os.getcwd()) + os.chdir(cur_dir) From ac53022242deeb687dd7ac7e4420d3b24ed54a04 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Thu, 13 Jun 2024 15:01:42 +0800 Subject: [PATCH 04/16] opt codes --- tests/conftest.py | 5 +++++ tests/functional/utils.py | 4 ---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index ec68c53..c01a89c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -81,6 +81,11 @@ def ob_mysql_connection(ob_mysql_credentials: OBMySQLCredentials): conn.close() +@pytest.fixture(scope="class") +def project_config_update(): + return {"models": {"+materialized": "view"}} + + def generate_tmp_schema_name(): hostname = socket.gethostname() hostname = re.sub(r"[\.\-\s]", "", hostname) diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 2b37b3c..605abf4 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -36,7 +36,3 @@ def dbt_profile_target(self, ob_mysql_credentials: OBMySQLCredentials): } ) return kwargs - - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"+materialized": "view"}} From 5742321294e89bbe0c94aab137ba6df3f0df3830 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Thu, 13 Jun 2024 15:22:47 +0800 Subject: [PATCH 05/16] add test cases --- tests/functional/test_parse.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/functional/test_parse.py b/tests/functional/test_parse.py index 8289a08..a940c99 100644 --- a/tests/functional/test_parse.py +++ b/tests/functional/test_parse.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import pytest + from dbt.cli.main import dbtRunner, dbtRunnerResult from tests.functional.utils import BaseOBMySQLTestCase @@ -20,3 +22,23 @@ class TestParse(BaseOBMySQLTestCase): def test_no_content_parse_succeed(self, project, dbt: dbtRunner): res: dbtRunnerResult = dbt.invoke(args=["parse"]) assert res.success == True + + +class TestParseWithModels(BaseOBMySQLTestCase): + + __MODEL_SQL = """ + select 1 from dual + """ + + @pytest.fixture(scope="class") + def models(self): + return { + "model_one.sql": self.__MODEL_SQL, + "model_two.sql": self.__MODEL_SQL, + "model_three.sql": self.__MODEL_SQL, + } + + def test_three_models_parse_succeed(self, project, dbt: dbtRunner, models): + res: dbtRunnerResult = dbt.invoke(args=["parse"]) + assert res.success == True + assert len(res.result.nodes) == len(models) From c63aac90a510118b9466045936c2928e0c760d82 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Thu, 13 Jun 2024 16:18:06 +0800 Subject: [PATCH 06/16] fix unit test error --- tests/functional/utils.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 605abf4..5303b66 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -17,21 +17,31 @@ OBMySQL_DIALECT_TYPE, OBMySQLCredentials, ) +from tests.conftest import generate_tmp_schema_name class BaseOBMySQLTestCase: + __COUNT = 0 + @pytest.fixture(scope="class") - def unique_schema(self, ob_mysql_credentials: OBMySQLCredentials) -> str: - return ob_mysql_credentials.schema + def unique_schema(self, dbt_profile_target) -> str: + return dbt_profile_target["schema"] @pytest.fixture(scope="class") - def dbt_profile_target(self, ob_mysql_credentials: OBMySQLCredentials): + def dbt_profile_target(self, ob_mysql_credentials: OBMySQLCredentials, ob_mysql_connection): kwargs = ob_mysql_credentials.to_dict() for k in OBMySQLCredentials._ALIASES.keys(): kwargs.pop(k, {}) + self.__COUNT = self.__COUNT + 1 + database = f"{generate_tmp_schema_name()}_{self.__COUNT}" + with ob_mysql_connection.cursor() as cursor: + cursor.execute("create database {};".format(database)) + cursor.fetchone() kwargs.update( { + "database": database, + "schema": database, "type": OBMySQL_DIALECT_TYPE, } ) From 8dd32d16992c948ed021bb24855198dca5227c8c Mon Sep 17 00:00:00 2001 From: yh263208 Date: Thu, 13 Jun 2024 16:19:37 +0800 Subject: [PATCH 07/16] opt codes --- tests/functional/utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 5303b66..4138914 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -17,7 +17,6 @@ OBMySQL_DIALECT_TYPE, OBMySQLCredentials, ) -from tests.conftest import generate_tmp_schema_name class BaseOBMySQLTestCase: @@ -34,7 +33,7 @@ def dbt_profile_target(self, ob_mysql_credentials: OBMySQLCredentials, ob_mysql_ for k in OBMySQLCredentials._ALIASES.keys(): kwargs.pop(k, {}) self.__COUNT = self.__COUNT + 1 - database = f"{generate_tmp_schema_name()}_{self.__COUNT}" + database = f"{ob_mysql_credentials.database}_{self.__COUNT}" with ob_mysql_connection.cursor() as cursor: cursor.execute("create database {};".format(database)) cursor.fetchone() From c61880fdfd831ef1af19349e663df5fb58f12e20 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Thu, 13 Jun 2024 16:25:01 +0800 Subject: [PATCH 08/16] opt codes --- tests/functional/utils.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 4138914..3f18e9c 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -18,10 +18,10 @@ OBMySQLCredentials, ) +COUNT = 0 -class BaseOBMySQLTestCase: - __COUNT = 0 +class BaseOBMySQLTestCase: @pytest.fixture(scope="class") def unique_schema(self, dbt_profile_target) -> str: @@ -32,8 +32,9 @@ def dbt_profile_target(self, ob_mysql_credentials: OBMySQLCredentials, ob_mysql_ kwargs = ob_mysql_credentials.to_dict() for k in OBMySQLCredentials._ALIASES.keys(): kwargs.pop(k, {}) - self.__COUNT = self.__COUNT + 1 - database = f"{ob_mysql_credentials.database}_{self.__COUNT}" + global COUNT + COUNT = COUNT + 1 + database = f"{ob_mysql_credentials.database}_{COUNT}" with ob_mysql_connection.cursor() as cursor: cursor.execute("create database {};".format(database)) cursor.fetchone() From 0c40a3ec0501e57850702aa5efded5831f229ba5 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Thu, 13 Jun 2024 16:26:43 +0800 Subject: [PATCH 09/16] opt codes --- tests/functional/utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 3f18e9c..17a624c 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -18,7 +18,7 @@ OBMySQLCredentials, ) -COUNT = 0 +_COUNT = 0 class BaseOBMySQLTestCase: @@ -32,9 +32,9 @@ def dbt_profile_target(self, ob_mysql_credentials: OBMySQLCredentials, ob_mysql_ kwargs = ob_mysql_credentials.to_dict() for k in OBMySQLCredentials._ALIASES.keys(): kwargs.pop(k, {}) - global COUNT - COUNT = COUNT + 1 - database = f"{ob_mysql_credentials.database}_{COUNT}" + global _COUNT + _COUNT = _COUNT + 1 + database = f"{ob_mysql_credentials.database}_{_COUNT}" with ob_mysql_connection.cursor() as cursor: cursor.execute("create database {};".format(database)) cursor.fetchone() From af58c4d2120def36e9fc3d43002841ffc0e855d4 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Mon, 17 Jun 2024 19:24:07 +0800 Subject: [PATCH 10/16] impl for table --- dbt/adapters/oceanbase_mysql/connections.py | 55 +++++++- dbt/adapters/oceanbase_mysql/impl.py | 72 ++++++++-- dbt/adapters/oceanbase_mysql/relation.py | 7 + .../oceanbase_mysql/macros/adapters.sql | 127 ++++++++++++++++++ .../relations/materialized_view/drop.sql | 3 + .../macros/relations/table/drop.sql | 3 + .../macros/relations/view/drop.sql | 3 + 7 files changed, 259 insertions(+), 11 deletions(-) create mode 100644 dbt/include/oceanbase_mysql/macros/relations/materialized_view/drop.sql create mode 100644 dbt/include/oceanbase_mysql/macros/relations/table/drop.sql create mode 100644 dbt/include/oceanbase_mysql/macros/relations/view/drop.sql diff --git a/dbt/adapters/oceanbase_mysql/connections.py b/dbt/adapters/oceanbase_mysql/connections.py index ade8a8a..13fd823 100644 --- a/dbt/adapters/oceanbase_mysql/connections.py +++ b/dbt/adapters/oceanbase_mysql/connections.py @@ -11,12 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import time from contextlib import contextmanager from dataclasses import dataclass -from typing import Any, ContextManager, Dict, Tuple +from typing import Any, ContextManager, Dict, Optional, Tuple, Union import mysql +from dbt_common.events.contextvars import get_node_info +from dbt_common.events.functions import fire_event from dbt_common.exceptions import DbtRuntimeError +from dbt_common.utils import cast_to_str from dbt.adapters.contracts.connection import ( AdapterResponse, @@ -25,6 +29,7 @@ Credentials, ) from dbt.adapters.events.logging import AdapterLogger +from dbt.adapters.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus from dbt.adapters.sql import SQLConnectionManager log = AdapterLogger("OceanBase") @@ -158,6 +163,48 @@ def cancel(self, connection: Connection): res = cursor.fetchone() log.debug("Cancel query '{}': {}".format(connection_name, res)) + def add_query( + self, + sql: str, + auto_begin: bool = True, + bindings: Optional[Any] = None, + abridge_sql_log: bool = False, + ) -> Tuple[Connection, Any]: + connection = self.get_thread_connection() + if auto_begin and connection.transaction_open is False: + self.begin() + fire_event( + ConnectionUsed( + conn_type=self.TYPE, + conn_name=cast_to_str(connection.name), + node_info=get_node_info(), + ) + ) + with self.exception_handler(sql): + if abridge_sql_log: + log_sql = "{}...".format(sql[:512]) + else: + log_sql = sql + fire_event( + SQLQuery( + conn_name=cast_to_str(connection.name), + sql=log_sql, + node_info=get_node_info(), + ) + ) + pre = time.time() + cursor = connection.handle.cursor() + for item in cursor.execute(sql, bindings, multi=True): + last_cursor = item + fire_event( + SQLQueryStatus( + status=str(self.get_response(cursor)), + elapsed=round((time.time() - pre)), + node_info=get_node_info(), + ) + ) + return connection, last_cursor + @classmethod def get_response(cls, cursor: Any) -> AdapterResponse: # there is no way to get info from cursor before fetch @@ -166,3 +213,9 @@ def get_response(cls, cursor: Any) -> AdapterResponse: return AdapterResponse( _message="{0}-{1}".format(code, rows_affected), rows_affected=rows_affected, code=code ) + + @classmethod + def data_type_code_to_name(cls, type_code: Union[int, str]) -> str: + field_type_values = mysql.connector.constants.FieldType.desc.values() + mapping = {code: name for code, name in field_type_values} + return mapping[type_code] diff --git a/dbt/adapters/oceanbase_mysql/impl.py b/dbt/adapters/oceanbase_mysql/impl.py index 28ddc36..1507ebd 100644 --- a/dbt/adapters/oceanbase_mysql/impl.py +++ b/dbt/adapters/oceanbase_mysql/impl.py @@ -11,27 +11,48 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Type +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Type import agate -from dbt_common.contracts.constraints import ConstraintType - -from dbt.adapters.base import ( - BaseConnectionManager, - BaseRelation, - Column, - ConstraintSupport, -) +from dbt_common.contracts.constraints import ColumnLevelConstraint, ConstraintType +from dbt_common.dataclass_schema import dbtClassMixin +from dbt_common.exceptions import DbtRuntimeError, DbtValidationError + +from dbt.adapters.base import BaseConnectionManager, BaseRelation +from dbt.adapters.base import Column as BaseColumn +from dbt.adapters.base import ConstraintSupport, available from dbt.adapters.oceanbase_mysql.column import OBMySQLColumn from dbt.adapters.oceanbase_mysql.connections import OBMySQLConnectionManager from dbt.adapters.oceanbase_mysql.relation import OBMySQLRelation from dbt.adapters.sql import SQLAdapter +@dataclass +class OBMySQLIndex(dbtClassMixin): + + columns: List[str] + algorithm: str = field(default=None) + unique: Optional[bool] = None + name: str = field(default=None) + options: List[str] = field(default=None) + column_groups: List[str] = field(default=None) + + def get_name(self, relation: BaseRelation): + if self.name is not None: + return self.name + if self.columns is None: + raise DbtRuntimeError("Index columns can not be empty") + return "dbt_idx_{0}_{1}".format( + relation.quote(identifier=False).include(database=False, schema=False), + "_".join([name for name in self.columns]), + ) + + class OBMySQLAdapter(SQLAdapter): Relation: Type[BaseRelation] = OBMySQLRelation - Column: Type[Column] = OBMySQLColumn + Column: Type[BaseColumn] = OBMySQLColumn ConnectionManager: Type[BaseConnectionManager] = OBMySQLConnectionManager CONSTRAINT_SUPPORT = { @@ -56,3 +77,34 @@ def convert_datetime_type(cls, agate_table: agate.Table, col_idx: int) -> str: @classmethod def quote(cls, identifier: str) -> str: return "`{}`".format(identifier) + + def get_column_schema_from_query(self, sql: str) -> List[BaseColumn]: + _, cursor = self.connections.add_select_query(sql) + cursor.fetchone() + return [ + self.Column.create( + column_name, self.connections.data_type_code_to_name(column_type_code) + ) + for column_name, column_type_code, *_ in cursor.description + ] + + @classmethod + def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional[str]: + constraint_expression = constraint.expression or "" + if constraint.type == ConstraintType.unique: + rendered_column_constraint = f"unique key {constraint_expression}" + elif constraint.type == ConstraintType.foreign_key: + raise DbtRuntimeError("Foreign references should not appear in a column") + else: + return super().render_column_constraint(constraint) + if rendered_column_constraint: + rendered_column_constraint = rendered_column_constraint.strip() + return rendered_column_constraint + + @available + def parse_index(self, raw_index: Dict[str, Any]) -> OBMySQLIndex: + try: + OBMySQLIndex.validate(raw_index) + return OBMySQLIndex.from_dict(raw_index) + except Exception as e: + raise DbtValidationError(f"Could not parse constraint: {raw_index}") diff --git a/dbt/adapters/oceanbase_mysql/relation.py b/dbt/adapters/oceanbase_mysql/relation.py index cbedb3f..ca6e653 100644 --- a/dbt/adapters/oceanbase_mysql/relation.py +++ b/dbt/adapters/oceanbase_mysql/relation.py @@ -52,3 +52,10 @@ def __validate_path(path: Path): raise DbtRuntimeError( f"The schema '{path.schema}' is not equals to the database '{path.database}'" ) + + def __str__(self) -> str: + relation = self.include(schema=False) + return relation.render() if self.limit is None else relation.render_limited() + + def __repr__(self) -> str: + return "<{} {}>".format(self.__class__.__name__, self.include(schema=False).render()) diff --git a/dbt/include/oceanbase_mysql/macros/adapters.sql b/dbt/include/oceanbase_mysql/macros/adapters.sql index bc5371e..57244f4 100644 --- a/dbt/include/oceanbase_mysql/macros/adapters.sql +++ b/dbt/include/oceanbase_mysql/macros/adapters.sql @@ -8,4 +8,131 @@ {%- call statement('drop_schema') -%} drop database if exists {{ relation.without_identifier().include(schema=False) }} {%- endcall -%} +{% endmacro %} + +{% macro oceanbase_mysql__list_schemas(database) %} + {% call statement('list_schemas', fetch_result=True, auto_begin=False) %} + show databases + {% endcall %} + {{ return(load_result('list_schemas').table) }} +{% endmacro %} + +{% macro oceanbase_mysql__list_relations_without_caching(schema_relation) %} + {% call statement('list_relations_without_caching', fetch_result=True) -%} + select + '{{ schema_relation.schema }}' as `schema`, + '{{ schema_relation.database }}' as `database`, + table_name as name, 'table' as type + from information_schema.tables + where table_schema = '{{ schema_relation.schema }}' and table_type = 'BASE TABLE' + union all + select + '{{ schema_relation.schema }}' as `schema`, + '{{ schema_relation.database }}' as `database`, + table_name as name, 'view' as type + from information_schema.tables + where table_schema = '{{ schema_relation.schema }}' and table_type like '%VIEW%' + union all + select + '{{ schema_relation.schema }}' as `schema`, + '{{ schema_relation.database }}' as `database`, + mview_name as name, 'materialized_view' as type + from oceanbase.DBA_MVIEWS + where owner = '{{ schema_relation.schema }}' + {% endcall %} + {{ return(load_result('list_relations_without_caching').table) }} +{% endmacro %} + +{% macro oceanbase_mysql__create_table_as(temporary, relation, sql) -%} + {%- set external = config.get('external', default=false) -%} + {%- set sql_header = config.get('sql_header', none) -%} + {%- set column_groups = config.get('column_groups', none) -%} + + {{ sql_header if sql_header is not none }} + + create {% if temporary -%} + temporary + {%- elif external -%} + external + {%- endif %} table {{ relation.include(schema=False) }} + + {% if column_groups is not none %} + with column group ( + {%- for column_group in column_groups -%} + {{- column_group -}} + {{ ", " if not loop.last }} + {%- endfor -%} + ) + {% endif %} + + {% set contract_config = config.get('contract') %} + {% if contract_config.enforced %} + {{ get_assert_columns_equivalent(sql) }} + {% endif -%} + {% if contract_config.enforced and (not temporary) -%} + {{ get_table_columns_and_constraints() }} ; + insert into {{ relation.include(schema=False) }} ( + {{ adapter.dispatch('get_column_names', 'dbt')() }} + ) + {%- set sql = get_select_subquery(sql) %} + {% else %} + as + {% endif %} + ( + {{ sql }} + ); +{%- endmacro %} + +{% macro oceanbase_mysql__get_empty_schema_sql(columns) %} + {%- set col_err = [] -%} + select + {% for i in columns %} + {%- set col = columns[i] -%} + {%- if col['data_type'] is not defined -%} + {%- do col_err.append(col['name']) -%} + {%- endif -%} + {% set col_name = adapter.quote(col['name']) if col.get('quote') else col['name'] %} + {%- if col['data_type'].strip().lower() in ('int', 'bigint') -%} + cast(null as signed integer) as {{ col_name }}{{ ", " if not loop.last }} + {% else %} + cast(null as {{ col['data_type'] }}) as {{ col_name }}{{ ", " if not loop.last }} + {%- endif -%} + {%- endfor -%} + {%- if (col_err | length) > 0 -%} + {{ exceptions.column_type_missing(column_names=col_err) }} + {%- endif -%} +{% endmacro %} + +{% macro oceanbase_mysql__get_create_index_sql(relation, index_dict) -%} + {%- set index_config = adapter.parse_index(index_dict) -%} + {%- set comma_separated_columns = ", ".join(index_config.columns) -%} + {%- set index_name = index_config.get_name(relation) -%} + + create {% if index_config.unique -%} + unique + {%- endif %} index if not exists `{{ index_name }}` + {% if index_config.algorithm -%} + using {{ index_config.algorithm }} + {%- endif %} + on {{ relation }} + ({{ comma_separated_columns }}) + {% if index_config.options is not none %} + {{ " ".join(index_config.options) }} + {% endif %} + {% if index_config.column_groups is not none %} + with column group ( + {{ ", ".join(index_config.column_groups) }} + ) + {% endif %}; +{%- endmacro %} + +{% macro oceanbase_mysql__copy_grants() %} + {{ return(False) }} +{% endmacro %} + +{% macro oceanbase_mysql__alter_relation_comment(relation, comment) %} + {%- set external = config.get('external', default=false) -%} + alter table {%- if external -%} + external + {%- endif %} {{ relation }} set comment='{{ comment }}'; {% endmacro %} \ No newline at end of file diff --git a/dbt/include/oceanbase_mysql/macros/relations/materialized_view/drop.sql b/dbt/include/oceanbase_mysql/macros/relations/materialized_view/drop.sql new file mode 100644 index 0000000..3969c37 --- /dev/null +++ b/dbt/include/oceanbase_mysql/macros/relations/materialized_view/drop.sql @@ -0,0 +1,3 @@ +{% macro oceanbase_mysql__drop_materialized_view(relation) -%} + drop materialized view if exists {{ relation.include(schema=False) }} +{%- endmacro %} diff --git a/dbt/include/oceanbase_mysql/macros/relations/table/drop.sql b/dbt/include/oceanbase_mysql/macros/relations/table/drop.sql new file mode 100644 index 0000000..1e7699e --- /dev/null +++ b/dbt/include/oceanbase_mysql/macros/relations/table/drop.sql @@ -0,0 +1,3 @@ +{% macro oceanbase_mysql__drop_table(relation) -%} + drop table if exists {{ relation.include(schema=False) }} +{%- endmacro %} diff --git a/dbt/include/oceanbase_mysql/macros/relations/view/drop.sql b/dbt/include/oceanbase_mysql/macros/relations/view/drop.sql new file mode 100644 index 0000000..37519b4 --- /dev/null +++ b/dbt/include/oceanbase_mysql/macros/relations/view/drop.sql @@ -0,0 +1,3 @@ +{% macro oceanbase_mysql__drop_view(relation) -%} + drop view id exists {{ relation.include(schema=False) }} +{%- endmacro %} From 4d0035f48d59af23e44b170f69c93eb17e8f70b5 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Mon, 17 Jun 2024 19:49:50 +0800 Subject: [PATCH 11/16] fix unit test cases --- tests/functional/test_init.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/functional/test_init.py b/tests/functional/test_init.py index ae73776..6bee17f 100644 --- a/tests/functional/test_init.py +++ b/tests/functional/test_init.py @@ -37,6 +37,7 @@ def test_init_project_with_profile_existing_init_succeed( ): project_name = "test_init_project" cur_dir = os.getcwd() + profiles_dir = os.getcwd() try: mock_get_adapter.return_value = ["oceanbase_mysql"] mock_confirm.return_value = "y" @@ -53,6 +54,7 @@ def test_init_project_with_profile_existing_init_succeed( args=["init"], **{ "project_name": project_name, + "profiles_dir": profiles_dir, }, ) assert dbtRunnerResult(success=True) == actual @@ -79,3 +81,4 @@ def test_init_project_with_profile_existing_init_succeed( if os.getcwd().endswith(project_name): shutil.rmtree(os.getcwd()) os.chdir(cur_dir) + os.remove(os.path.join(profiles_dir, "profiles.yml")) From 9e7a5cd395a41bb9f93160411addae85658e8471 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Mon, 17 Jun 2024 20:33:40 +0800 Subject: [PATCH 12/16] add a basic test cases --- tests/functional/basic/test_basic.py | 74 ++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 tests/functional/basic/test_basic.py diff --git a/tests/functional/basic/test_basic.py b/tests/functional/basic/test_basic.py new file mode 100644 index 0000000..56f4839 --- /dev/null +++ b/tests/functional/basic/test_basic.py @@ -0,0 +1,74 @@ +# Copyright (c) 2023 OceanBase. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest + +from dbt.adapters.base import BaseAdapter +from dbt.cli.main import dbtRunner, dbtRunnerResult +from tests.functional.utils import BaseOBMySQLTestCase + +_MODEL_SQL = """ + +{{ config( + persist_docs={"relation": True}, + materialized='table', + indexes=[ + { + "columns": ["id"], + } + ], + contract={'enforced': True}) }} + + select 1 as id union all select 2 as id + +""" + +_MODEL_YML = """ + +models: + - name: my_first_model + config: + materialized: table + contract: + enforced: true + constraints: + - type: unique + columns: ['id'] + description: "this is comment" + columns: + - name: id + quote: True + data_type: bigint + constraints: + - type: not_null + - type: unique + +""" + + +class TestRunSimpleModel(BaseOBMySQLTestCase): + + @pytest.fixture(scope="class") + def models(self): + return { + "my_first_model.sql": _MODEL_SQL, + "my_first_model.yml": _MODEL_YML, + } + + def test_simple_model_run_succeed(self, project, dbt: dbtRunner): + res: dbtRunnerResult = dbt.invoke(args=["run"]) + assert res.success == True + adapter: BaseAdapter = project.adapter + with adapter.connection_named("test"): + _, table = adapter.execute("show tables", fetch=True) + assert ["my_first_model"] == [row.values()[0] for row in table.rows] From d494fbf437c926b50e233d16d87736538d1fa765 Mon Sep 17 00:00:00 2001 From: yh263208 Date: Mon, 17 Jun 2024 21:05:44 +0800 Subject: [PATCH 13/16] add a test case --- .../basic/test_invalid_reference.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 tests/functional/basic/test_invalid_reference.py diff --git a/tests/functional/basic/test_invalid_reference.py b/tests/functional/basic/test_invalid_reference.py new file mode 100644 index 0000000..1565acc --- /dev/null +++ b/tests/functional/basic/test_invalid_reference.py @@ -0,0 +1,43 @@ +# Copyright (c) 2023 OceanBase. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest +from dbt_common.exceptions import CompilationError + +from dbt.cli.main import dbtRunner +from dbt.tests.adapter.column_types.test_column_types import run_dbt +from tests.functional.utils import BaseOBMySQLTestCase + +descendant_sql = """ +-- should be ref('model') +select * from {{ ref(model) }} +""" + + +model_sql = """ +select 1 as id +""" + + +class TestInvalidReference(BaseOBMySQLTestCase): + + @pytest.fixture(scope="class") + def models(self): + return { + "descendant.sql": descendant_sql, + "model.sql": model_sql, + } + + def test_undefined_value(self, project, dbt: dbtRunner): + with pytest.raises(CompilationError): + run_dbt(args=["compile"]) From aabeb84ee127179a60a11a3e284141ce978d143c Mon Sep 17 00:00:00 2001 From: yh263208 Date: Mon, 17 Jun 2024 22:24:20 +0800 Subject: [PATCH 14/16] add docs --- README-zh-CN.md | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/README-zh-CN.md b/README-zh-CN.md index abcc3a7..aad2248 100644 --- a/README-zh-CN.md +++ b/README-zh-CN.md @@ -103,4 +103,22 @@ dbt run | Materializaed View |✅| - | | Docs |✅| 生成文档 | | Seed |✅| 导入数据 | -| Snaspshot |✅| 生成快照 | \ No newline at end of file +| Snaspshot |✅| 生成快照 | + +### Table + +dbt-oceanbase 对表对象进行了针对性的兼容,允许用户最大限度地使用 OceanBase 表地特性。 + +| 特性 |是否支持| 使用示例 | +|:----------|:----|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 外表 | ✅ | ` {{ config(external=True) }}` | +| 列存 | ✅ | ` {{ config(column_groups=['all columns', 'r_name(col1, col2)']) }}` | +| 临时表 | ✅ | ` {{ config(temporary=True) }}` | +| contract | ✅ | ` {{ config(contract={'enforced': True}) }}` | +| 检查约束(列/表) | ✅ | `constraints.type='check'` | +| 非空约束(列/表) | ✅ | `constraints.type='not_null'` | +| 唯一约束(列/表) | ✅ | `constraints.type='unique'` | +| 主键约束(列/表) | ✅ | `constraints.type='primary_key'` | +| 外键约束(表) | ✅ | `constraints.type='foreign_key'` | +| 表级注释 | ✅ | `models.description='this is the comment'` | +| 索引 | ✅ | `{{ config(indexes=[{"columns": ["id"],"algorithm": "BTREE", "unique": True, "options": ['GLOBAL'], "name": "idx", "column_groups": ['all columns', 'r_name(col1, col2)']}] }}` | \ No newline at end of file From c7e438fdd4f63ee82b738bf79a9d6cbff96952ca Mon Sep 17 00:00:00 2001 From: yh263208 Date: Tue, 18 Jun 2024 10:19:22 +0800 Subject: [PATCH 15/16] opt test cases --- tests/functional/basic/test_invalid_reference.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/functional/basic/test_invalid_reference.py b/tests/functional/basic/test_invalid_reference.py index 1565acc..91aa28a 100644 --- a/tests/functional/basic/test_invalid_reference.py +++ b/tests/functional/basic/test_invalid_reference.py @@ -38,6 +38,12 @@ def models(self): "model.sql": model_sql, } - def test_undefined_value(self, project, dbt: dbtRunner): - with pytest.raises(CompilationError): - run_dbt(args=["compile"]) + def test_undefined_value( + self, project, dbt: dbtRunner, dbt_profile_target, ob_mysql_connection + ): + try: + with pytest.raises(CompilationError): + run_dbt(args=["compile"]) + finally: + with ob_mysql_connection.cursor() as cursor: + cursor.execute(f"drop database {dbt_profile_target['database']}") From f72ea675b1d85051525ff51338aa41953852fc7f Mon Sep 17 00:00:00 2001 From: yh263208 Date: Tue, 18 Jun 2024 10:25:03 +0800 Subject: [PATCH 16/16] opt test cases --- tests/functional/basic/test_invalid_reference.py | 12 +++--------- tests/functional/test_debug.py | 10 +++++++--- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/tests/functional/basic/test_invalid_reference.py b/tests/functional/basic/test_invalid_reference.py index 91aa28a..1565acc 100644 --- a/tests/functional/basic/test_invalid_reference.py +++ b/tests/functional/basic/test_invalid_reference.py @@ -38,12 +38,6 @@ def models(self): "model.sql": model_sql, } - def test_undefined_value( - self, project, dbt: dbtRunner, dbt_profile_target, ob_mysql_connection - ): - try: - with pytest.raises(CompilationError): - run_dbt(args=["compile"]) - finally: - with ob_mysql_connection.cursor() as cursor: - cursor.execute(f"drop database {dbt_profile_target['database']}") + def test_undefined_value(self, project, dbt: dbtRunner): + with pytest.raises(CompilationError): + run_dbt(args=["compile"]) diff --git a/tests/functional/test_debug.py b/tests/functional/test_debug.py index d306674..aee7f22 100644 --- a/tests/functional/test_debug.py +++ b/tests/functional/test_debug.py @@ -17,6 +17,10 @@ class TestDebug(BaseOBMySQLTestCase): - def test_debug_succeed(self, project, dbt: dbtRunner): - res = dbt.invoke(args=["debug"]) - assert dbtRunnerResult(success=True, result=True) == res + def test_debug_succeed(self, project, dbt: dbtRunner, dbt_profile_target, ob_mysql_connection): + try: + res = dbt.invoke(args=["debug"]) + assert dbtRunnerResult(success=True, result=True) == res + finally: + with ob_mysql_connection.cursor() as cursor: + cursor.execute(f"drop database {dbt_profile_target['database']}")