Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(view): supports materialization of view #4

Merged
merged 11 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README-zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,13 @@ dbt-oceanbase 对表对象进行了针对性的兼容,允许用户最大限度
| 主键约束(列/表) | ✅ | `constraints.type='primary_key'` |
| 外键约束(表) | ✅ | `constraints.type='foreign_key'` |
| 表级注释 | ✅ | `models.description='this is the comment'` |
| 索引 | ✅ | `{{ config(indexes=[{"columns": ["id"],"algorithm": "BTREE", "unique": True, "options": ['GLOBAL'], "name": "idx", "column_groups": ['all columns', 'r_name(col1, col2)']}] }}` |
| 索引 | ✅ | `{{ config(indexes=[{"columns": ["id"],"algorithm": "BTREE", "unique": True, "options": ['GLOBAL'], "name": "idx", "column_groups": ['all columns', 'r_name(col1, col2)']}] }}` |

### View

| 特性 |是否支持| 使用示例 |
|:-------------|:----|:--------------------------------------------------|
| contract | ✅ | ` {{ config(contract={'enforced': True}) }}` |
| columns | ✅ | ` {{ config(columns=['col1', 'col2']) }}` |
| check option | ✅ | ` {{ config(check_option="with check option") }}` |
| 表级注释 | ✅ | `models.description='this is the comment'` |
14 changes: 14 additions & 0 deletions dbt/adapters/oceanbase_mysql/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict

from dbt.adapters.base import Column


class OBMySQLColumn(Column):

_CAST_TYPE_ALIAS: Dict[str, str] = {
"int": "signed integer",
"bigint": "signed integer",
"nchar": "character",
"char": "character",
"varchar": "character",
}

@property
def quoted(self) -> str:
return "`{}`".format(self.column)

@classmethod
def translate_cast_type(cls, dtype: str) -> str:
return cls._CAST_TYPE_ALIAS.get(dtype, dtype)
4 changes: 4 additions & 0 deletions dbt/adapters/oceanbase_mysql/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ def parse_index(self, raw_index: Dict[str, Any]) -> OBMySQLIndex:
return OBMySQLIndex.from_dict(raw_index)
except Exception as e:
raise DbtValidationError(f"Could not parse constraint: {raw_index}")

@available
def translate_cast_type(self, dtype: str) -> str:
return OBMySQLColumn.translate_cast_type(dtype)
75 changes: 24 additions & 51 deletions dbt/include/oceanbase_mysql/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,69 +20,32 @@
{% macro oceanbase_mysql__list_relations_without_caching(schema_relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
select
'{{ schema_relation.schema }}' as `schema`,
'{{ schema_relation.database }}' as `database`,
table_name as name, 'table' as type
table_name as name,
'{{ schema_relation.schema }}' as `schema`,
'table' as type
from information_schema.tables
where table_schema = '{{ schema_relation.schema }}' and table_type = 'BASE TABLE'
union all
select
'{{ schema_relation.schema }}' as `schema`,
'{{ schema_relation.database }}' as `database`,
table_name as name, 'view' as type
table_name as name,
'{{ schema_relation.schema }}' as `schema`,
'view' as type
from information_schema.tables
where table_schema = '{{ schema_relation.schema }}' and table_type like '%VIEW%'
union all
select
'{{ schema_relation.schema }}' as `schema`,
'{{ schema_relation.database }}' as `database`,
mview_name as name, 'materialized_view' as type
mview_name as name,
'{{ schema_relation.schema }}' as `schema`,
'materialized_view' as type
from oceanbase.DBA_MVIEWS
where owner = '{{ schema_relation.schema }}'
{% endcall %}
{{ return(load_result('list_relations_without_caching').table) }}
{% endmacro %}

{% macro oceanbase_mysql__create_table_as(temporary, relation, sql) -%}
{%- set external = config.get('external', default=false) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set column_groups = config.get('column_groups', none) -%}

{{ sql_header if sql_header is not none }}

create {% if temporary -%}
temporary
{%- elif external -%}
external
{%- endif %} table {{ relation.include(schema=False) }}

{% if column_groups is not none %}
with column group (
{%- for column_group in column_groups -%}
{{- column_group -}}
{{ ", " if not loop.last }}
{%- endfor -%}
)
{% endif %}

{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{% endif -%}
{% if contract_config.enforced and (not temporary) -%}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation.include(schema=False) }} (
{{ adapter.dispatch('get_column_names', 'dbt')() }}
)
{%- set sql = get_select_subquery(sql) %}
{% else %}
as
{% endif %}
(
{{ sql }}
);
{%- endmacro %}

{% macro oceanbase_mysql__get_empty_schema_sql(columns) %}
{%- set col_err = [] -%}
select
Expand All @@ -92,11 +55,7 @@
{%- do col_err.append(col['name']) -%}
{%- endif -%}
{% set col_name = adapter.quote(col['name']) if col.get('quote') else col['name'] %}
{%- if col['data_type'].strip().lower() in ('int', 'bigint') -%}
cast(null as signed integer) as {{ col_name }}{{ ", " if not loop.last }}
{% else %}
cast(null as {{ col['data_type'] }}) as {{ col_name }}{{ ", " if not loop.last }}
{%- endif -%}
cast(null as {{ adapter.translate_cast_type(col['data_type']) }}) as {{ col_name }}{{ ", " if not loop.last }}
{%- endfor -%}
{%- if (col_err | length) > 0 -%}
{{ exceptions.column_type_missing(column_names=col_err) }}
Expand Down Expand Up @@ -135,4 +94,18 @@
alter table {%- if external -%}
external
{%- endif %} {{ relation }} set comment='{{ comment }}';
{% endmacro %}

{% macro oceanbase_mysql__rename_relation(from_relation, to_relation) -%}
{#
2-step process is needed:
1. Drop the existing relation
2. Rename the new relation to existing relation
#}
{% call statement('drop_relation') %}
drop {{ to_relation.type }} if exists {{ to_relation }} cascade
{% endcall %}
{% call statement('rename_relation') %}
rename table {{ from_relation }} to {{ to_relation }}
{% endcall %}
{% endmacro %}
39 changes: 39 additions & 0 deletions dbt/include/oceanbase_mysql/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{% macro oceanbase_mysql__create_table_as(temporary, relation, sql) -%}
{%- set external = config.get('external', default=false) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set column_groups = config.get('column_groups', none) -%}

{{ sql_header if sql_header is not none }}

create {% if temporary -%}
temporary
{%- elif external -%}
external
{%- endif %} table {{ relation.include(schema=False) }}

{% if column_groups is not none %}
with column group (
{%- for column_group in column_groups -%}
{{- column_group -}}
{{ ", " if not loop.last }}
{%- endfor -%}
)
{% endif %}

{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{% endif -%}
{% if contract_config.enforced and (not temporary) -%}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation.include(schema=False) }} (
{{ adapter.dispatch('get_column_names', 'dbt')() }}
)
{%- set sql = get_select_subquery(sql) %}
{% else %}
as
{% endif %}
(
{{ sql }}
);
{%- endmacro %}
28 changes: 28 additions & 0 deletions dbt/include/oceanbase_mysql/macros/relations/view/create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{% macro oceanbase_mysql__create_view_as(relation, sql) -%}
{{ get_create_or_replace_view_as(relation, sql, False) }}
{%- endmacro %}

{% macro get_create_or_replace_view_as(relation, sql, replace=False) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set columns = config.get('columns', none) -%}
{%- set check_option = config.get('check_option', none) -%}

{{ sql_header if sql_header is not none }}
create {% if replace %} or replace {% endif %} view {{ relation }}
{% if columns is not none %}
(
{{ ", ".join(columns) }}
)
{% endif %}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{%- endif %}
as (
{{ sql }}
)
{% if check_option is not none %}
{{ check_option }}
{% endif %}
;
{%- endmacro %}
2 changes: 1 addition & 1 deletion dbt/include/oceanbase_mysql/macros/relations/view/drop.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{% macro oceanbase_mysql__drop_view(relation) -%}
drop view id exists {{ relation.include(schema=False) }}
drop view if exists {{ relation.include(schema=False) }}
{%- endmacro %}
5 changes: 5 additions & 0 deletions tests/functional/basic/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from dbt.adapters.base import BaseAdapter
from dbt.cli.main import dbtRunner, dbtRunnerResult
from dbt.tests.adapter.basic.test_empty import BaseEmpty
from tests.functional.utils import BaseOBMySQLTestCase

_MODEL_SQL = """
Expand Down Expand Up @@ -72,3 +73,7 @@ def test_simple_model_run_succeed(self, project, dbt: dbtRunner):
with adapter.connection_named("test"):
_, table = adapter.execute("show tables", fetch=True)
assert ["my_first_model"] == [row.values()[0] for row in table.rows]


class TestEmpty(BaseEmpty, BaseOBMySQLTestCase):
pass
112 changes: 112 additions & 0 deletions tests/functional/materializations/test_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) 2023 OceanBase.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

import pytest

from dbt.adapters.base import BaseAdapter, BaseRelation
from dbt.adapters.contracts.relation import RelationType
from dbt.adapters.oceanbase_mysql.relation import OBMySQLRelation
from dbt.tests.adapter.column_types.test_column_types import run_dbt
from tests.functional.utils import BaseOBMySQLTestCase

_MODEL_SQL = """

{{ config(
materialized='view',
persist_docs={"relation": True},
contract={'enforced': True}) }}

select 1 as id union all select 2 as id

"""

_MODEL_CHAR_YML = """

models:
- name: my_first_model
config:
materialized: view
contract:
enforced: true
description: "this is comment"
columns:
- name: id
quote: True
data_type: varchar
constraints:
- type: not_null
- type: unique

"""

_MODEL_INT_YML = """

models:
- name: my_first_model
config:
materialized: view
contract:
enforced: true
description: "this is comment"
columns:
- name: id
quote: True
data_type: bigint
constraints:
- type: not_null
- type: unique

"""


class TestViewWithValidContract(BaseOBMySQLTestCase):

@pytest.fixture(scope="class")
def models(self):
return {
"my_first_model.sql": _MODEL_SQL,
"my_first_model.yml": _MODEL_INT_YML,
}

def test_materialization_as_view_run_succeed(self, project, dbt_profile_target):
run_dbt(args=["run"])
adapter: BaseAdapter = project.adapter
database = dbt_profile_target["database"]
with adapter.connection_named("test"):
expect = OBMySQLRelation.create(
**{
"database": database,
"schema": database,
"identifier": "my_first_model",
"type": RelationType.View,
}
)
relations: List[BaseRelation] = adapter.list_relations_without_caching(expect)
actual = [r for r in relations if r.type == RelationType.View]
assert [expect] == actual


class TestViewWithInValidContract(BaseOBMySQLTestCase):

@pytest.fixture(scope="class")
def models(self):
return {
"my_first_model.sql": _MODEL_SQL,
"my_first_model.yml": _MODEL_CHAR_YML,
}

def test_materialization_as_view_exp_thrown(self, project, dbt_profile_target):
with pytest.raises(AssertionError):
run_dbt(args=["run"])
Loading