Skip to content

Commit

Permalink
impl for table
Browse files Browse the repository at this point in the history
  • Loading branch information
yhilmare committed Jun 17, 2024
1 parent 0c40a3e commit af58c4d
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 11 deletions.
55 changes: 54 additions & 1 deletion dbt/adapters/oceanbase_mysql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, ContextManager, Dict, Tuple
from typing import Any, ContextManager, Dict, Optional, Tuple, Union

import mysql
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.utils import cast_to_str

from dbt.adapters.contracts.connection import (
AdapterResponse,
Expand All @@ -25,6 +29,7 @@
Credentials,
)
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus
from dbt.adapters.sql import SQLConnectionManager

log = AdapterLogger("OceanBase")
Expand Down Expand Up @@ -158,6 +163,48 @@ def cancel(self, connection: Connection):
res = cursor.fetchone()
log.debug("Cancel query '{}': {}".format(connection_name, res))

def add_query(
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
) -> Tuple[Connection, Any]:
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()
fire_event(
ConnectionUsed(
conn_type=self.TYPE,
conn_name=cast_to_str(connection.name),
node_info=get_node_info(),
)
)
with self.exception_handler(sql):
if abridge_sql_log:
log_sql = "{}...".format(sql[:512])
else:
log_sql = sql
fire_event(
SQLQuery(
conn_name=cast_to_str(connection.name),
sql=log_sql,
node_info=get_node_info(),
)
)
pre = time.time()
cursor = connection.handle.cursor()
for item in cursor.execute(sql, bindings, multi=True):
last_cursor = item
fire_event(
SQLQueryStatus(
status=str(self.get_response(cursor)),
elapsed=round((time.time() - pre)),
node_info=get_node_info(),
)
)
return connection, last_cursor

@classmethod
def get_response(cls, cursor: Any) -> AdapterResponse:
# there is no way to get info from cursor before fetch
Expand All @@ -166,3 +213,9 @@ def get_response(cls, cursor: Any) -> AdapterResponse:
return AdapterResponse(
_message="{0}-{1}".format(code, rows_affected), rows_affected=rows_affected, code=code
)

@classmethod
def data_type_code_to_name(cls, type_code: Union[int, str]) -> str:
field_type_values = mysql.connector.constants.FieldType.desc.values()
mapping = {code: name for code, name in field_type_values}
return mapping[type_code]
72 changes: 62 additions & 10 deletions dbt/adapters/oceanbase_mysql/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,48 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Type
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Type

import agate
from dbt_common.contracts.constraints import ConstraintType

from dbt.adapters.base import (
BaseConnectionManager,
BaseRelation,
Column,
ConstraintSupport,
)
from dbt_common.contracts.constraints import ColumnLevelConstraint, ConstraintType
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.exceptions import DbtRuntimeError, DbtValidationError

from dbt.adapters.base import BaseConnectionManager, BaseRelation
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.base import ConstraintSupport, available
from dbt.adapters.oceanbase_mysql.column import OBMySQLColumn
from dbt.adapters.oceanbase_mysql.connections import OBMySQLConnectionManager
from dbt.adapters.oceanbase_mysql.relation import OBMySQLRelation
from dbt.adapters.sql import SQLAdapter


@dataclass
class OBMySQLIndex(dbtClassMixin):

columns: List[str]
algorithm: str = field(default=None)
unique: Optional[bool] = None
name: str = field(default=None)
options: List[str] = field(default=None)
column_groups: List[str] = field(default=None)

def get_name(self, relation: BaseRelation):
if self.name is not None:
return self.name
if self.columns is None:
raise DbtRuntimeError("Index columns can not be empty")
return "dbt_idx_{0}_{1}".format(
relation.quote(identifier=False).include(database=False, schema=False),
"_".join([name for name in self.columns]),
)


class OBMySQLAdapter(SQLAdapter):

Relation: Type[BaseRelation] = OBMySQLRelation
Column: Type[Column] = OBMySQLColumn
Column: Type[BaseColumn] = OBMySQLColumn
ConnectionManager: Type[BaseConnectionManager] = OBMySQLConnectionManager

CONSTRAINT_SUPPORT = {
Expand All @@ -56,3 +77,34 @@ def convert_datetime_type(cls, agate_table: agate.Table, col_idx: int) -> str:
@classmethod
def quote(cls, identifier: str) -> str:
return "`{}`".format(identifier)

def get_column_schema_from_query(self, sql: str) -> List[BaseColumn]:
_, cursor = self.connections.add_select_query(sql)
cursor.fetchone()
return [
self.Column.create(
column_name, self.connections.data_type_code_to_name(column_type_code)
)
for column_name, column_type_code, *_ in cursor.description
]

@classmethod
def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional[str]:
constraint_expression = constraint.expression or ""
if constraint.type == ConstraintType.unique:
rendered_column_constraint = f"unique key {constraint_expression}"
elif constraint.type == ConstraintType.foreign_key:
raise DbtRuntimeError("Foreign references should not appear in a column")
else:
return super().render_column_constraint(constraint)
if rendered_column_constraint:
rendered_column_constraint = rendered_column_constraint.strip()
return rendered_column_constraint

@available
def parse_index(self, raw_index: Dict[str, Any]) -> OBMySQLIndex:
try:
OBMySQLIndex.validate(raw_index)
return OBMySQLIndex.from_dict(raw_index)
except Exception as e:
raise DbtValidationError(f"Could not parse constraint: {raw_index}")
7 changes: 7 additions & 0 deletions dbt/adapters/oceanbase_mysql/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,10 @@ def __validate_path(path: Path):
raise DbtRuntimeError(
f"The schema '{path.schema}' is not equals to the database '{path.database}'"
)

def __str__(self) -> str:
relation = self.include(schema=False)
return relation.render() if self.limit is None else relation.render_limited()

def __repr__(self) -> str:
return "<{} {}>".format(self.__class__.__name__, self.include(schema=False).render())
127 changes: 127 additions & 0 deletions dbt/include/oceanbase_mysql/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,131 @@
{%- call statement('drop_schema') -%}
drop database if exists {{ relation.without_identifier().include(schema=False) }}
{%- endcall -%}
{% endmacro %}

{% macro oceanbase_mysql__list_schemas(database) %}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) %}
show databases
{% endcall %}
{{ return(load_result('list_schemas').table) }}
{% endmacro %}

{% macro oceanbase_mysql__list_relations_without_caching(schema_relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
select
'{{ schema_relation.schema }}' as `schema`,
'{{ schema_relation.database }}' as `database`,
table_name as name, 'table' as type
from information_schema.tables
where table_schema = '{{ schema_relation.schema }}' and table_type = 'BASE TABLE'
union all
select
'{{ schema_relation.schema }}' as `schema`,
'{{ schema_relation.database }}' as `database`,
table_name as name, 'view' as type
from information_schema.tables
where table_schema = '{{ schema_relation.schema }}' and table_type like '%VIEW%'
union all
select
'{{ schema_relation.schema }}' as `schema`,
'{{ schema_relation.database }}' as `database`,
mview_name as name, 'materialized_view' as type
from oceanbase.DBA_MVIEWS
where owner = '{{ schema_relation.schema }}'
{% endcall %}
{{ return(load_result('list_relations_without_caching').table) }}
{% endmacro %}

{% macro oceanbase_mysql__create_table_as(temporary, relation, sql) -%}
{%- set external = config.get('external', default=false) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set column_groups = config.get('column_groups', none) -%}

{{ sql_header if sql_header is not none }}

create {% if temporary -%}
temporary
{%- elif external -%}
external
{%- endif %} table {{ relation.include(schema=False) }}

{% if column_groups is not none %}
with column group (
{%- for column_group in column_groups -%}
{{- column_group -}}
{{ ", " if not loop.last }}
{%- endfor -%}
)
{% endif %}

{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{% endif -%}
{% if contract_config.enforced and (not temporary) -%}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation.include(schema=False) }} (
{{ adapter.dispatch('get_column_names', 'dbt')() }}
)
{%- set sql = get_select_subquery(sql) %}
{% else %}
as
{% endif %}
(
{{ sql }}
);
{%- endmacro %}

{% macro oceanbase_mysql__get_empty_schema_sql(columns) %}
{%- set col_err = [] -%}
select
{% for i in columns %}
{%- set col = columns[i] -%}
{%- if col['data_type'] is not defined -%}
{%- do col_err.append(col['name']) -%}
{%- endif -%}
{% set col_name = adapter.quote(col['name']) if col.get('quote') else col['name'] %}
{%- if col['data_type'].strip().lower() in ('int', 'bigint') -%}
cast(null as signed integer) as {{ col_name }}{{ ", " if not loop.last }}
{% else %}
cast(null as {{ col['data_type'] }}) as {{ col_name }}{{ ", " if not loop.last }}
{%- endif -%}
{%- endfor -%}
{%- if (col_err | length) > 0 -%}
{{ exceptions.column_type_missing(column_names=col_err) }}
{%- endif -%}
{% endmacro %}

{% macro oceanbase_mysql__get_create_index_sql(relation, index_dict) -%}
{%- set index_config = adapter.parse_index(index_dict) -%}
{%- set comma_separated_columns = ", ".join(index_config.columns) -%}
{%- set index_name = index_config.get_name(relation) -%}

create {% if index_config.unique -%}
unique
{%- endif %} index if not exists `{{ index_name }}`
{% if index_config.algorithm -%}
using {{ index_config.algorithm }}
{%- endif %}
on {{ relation }}
({{ comma_separated_columns }})
{% if index_config.options is not none %}
{{ " ".join(index_config.options) }}
{% endif %}
{% if index_config.column_groups is not none %}
with column group (
{{ ", ".join(index_config.column_groups) }}
)
{% endif %};
{%- endmacro %}

{% macro oceanbase_mysql__copy_grants() %}
{{ return(False) }}
{% endmacro %}

{% macro oceanbase_mysql__alter_relation_comment(relation, comment) %}
{%- set external = config.get('external', default=false) -%}
alter table {%- if external -%}
external
{%- endif %} {{ relation }} set comment='{{ comment }}';
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{% macro oceanbase_mysql__drop_materialized_view(relation) -%}
drop materialized view if exists {{ relation.include(schema=False) }}
{%- endmacro %}
3 changes: 3 additions & 0 deletions dbt/include/oceanbase_mysql/macros/relations/table/drop.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{% macro oceanbase_mysql__drop_table(relation) -%}
drop table if exists {{ relation.include(schema=False) }}
{%- endmacro %}
3 changes: 3 additions & 0 deletions dbt/include/oceanbase_mysql/macros/relations/view/drop.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{% macro oceanbase_mysql__drop_view(relation) -%}
drop view id exists {{ relation.include(schema=False) }}
{%- endmacro %}

0 comments on commit af58c4d

Please sign in to comment.