From 2c9ba537eb14d718ff7cb106717bf36f68f0e239 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Mon, 8 May 2023 14:52:53 +0530 Subject: [PATCH] Fix min max on rowversion/timestamp mssql (#11455) --- .../src/metadata/profiler/api/workflow.py | 4 -- .../metadata/profiler/metrics/static/min.py | 1 - .../src/metadata/profiler/orm/converter.py | 2 +- .../src/metadata/profiler/orm/registry.py | 2 + .../profiler/orm/types/custom_timestamp.py | 50 +++++++++++++++++++ .../orm_profiler/test_converter.py | 3 +- 6 files changed, 55 insertions(+), 7 deletions(-) create mode 100644 ingestion/src/metadata/profiler/orm/types/custom_timestamp.py diff --git a/ingestion/src/metadata/profiler/api/workflow.py b/ingestion/src/metadata/profiler/api/workflow.py index 84cd004281fa..f3cb1eb55764 100644 --- a/ingestion/src/metadata/profiler/api/workflow.py +++ b/ingestion/src/metadata/profiler/api/workflow.py @@ -491,10 +491,6 @@ def _raise_from_status_internal(self, raise_warnings=False): raise WorkflowExecutionError( "Source reported warnings", self.source_status ) - if self.source_status.warnings: - raise WorkflowExecutionError( - "Processor reported warnings", self.source_status - ) if hasattr(self, "sink") and self.sink.get_status().warnings: raise WorkflowExecutionError( "Sink reported warnings", self.sink.get_status() diff --git a/ingestion/src/metadata/profiler/metrics/static/min.py b/ingestion/src/metadata/profiler/metrics/static/min.py index 89f949149679..32871f7f207d 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min.py +++ b/ingestion/src/metadata/profiler/metrics/static/min.py @@ -14,7 +14,6 @@ """ # pylint: disable=duplicate-code - from sqlalchemy import column from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.functions import GenericFunction diff --git a/ingestion/src/metadata/profiler/orm/converter.py b/ingestion/src/metadata/profiler/orm/converter.py index 60725af8dd2d..7d5602e4564c 100644 --- a/ingestion/src/metadata/profiler/orm/converter.py +++ b/ingestion/src/metadata/profiler/orm/converter.py @@ -41,7 +41,7 @@ DataType.DOUBLE: sqlalchemy.DECIMAL, DataType.DECIMAL: sqlalchemy.DECIMAL, DataType.NUMERIC: sqlalchemy.NUMERIC, - DataType.TIMESTAMP: sqlalchemy.TIMESTAMP, + DataType.TIMESTAMP: CustomTypes.TIMESTAMP.value, DataType.TIME: sqlalchemy.TIME, DataType.DATE: sqlalchemy.DATE, DataType.DATETIME: sqlalchemy.DATETIME, diff --git a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index 3b2ee0a17223..eee00e29e84a 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -21,6 +21,7 @@ from metadata.ingestion.source import sqa_types from metadata.profiler.orm.types.bytea_to_string import ByteaToHex from metadata.profiler.orm.types.custom_array import CustomArray +from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp from metadata.profiler.orm.types.hex_byte_string import HexByteString from metadata.profiler.orm.types.uuid import UUIDString from metadata.profiler.registry import TypeRegistry @@ -32,6 +33,7 @@ class CustomTypes(TypeRegistry): UUID = UUIDString BYTEA = ByteaToHex ARRAY = CustomArray + TIMESTAMP = CustomTimestamp class Dialects(Enum): diff --git a/ingestion/src/metadata/profiler/orm/types/custom_timestamp.py b/ingestion/src/metadata/profiler/orm/types/custom_timestamp.py new file mode 100644 index 000000000000..00946d210c33 --- /dev/null +++ b/ingestion/src/metadata/profiler/orm/types/custom_timestamp.py @@ -0,0 +1,50 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=abstract-method + +""" +Expand sqlalchemy types to map them to OpenMetadata DataType +""" +from sqlalchemy.sql.sqltypes import TIMESTAMP, TypeDecorator + +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() + + +class CustomTimestamp(TypeDecorator): + """ + Convert RowVersion + """ + + impl = TIMESTAMP + cache_ok = True + + @property + def python_type(self): + return str + + def process_result_value(self, value, dialect): + """This is executed during result retrieval + + Args: + value: database record + dialect: database dialect + Returns: + python rowversion conversion to timestamp + """ + import struct # pylint: disable=import-outside-toplevel + + if dialect.name == "mssql" and isinstance(value, bytes): + bytes_to_int = struct.unpack(">Q", value)[0] + return bytes_to_int + return value diff --git a/ingestion/tests/integration/orm_profiler/test_converter.py b/ingestion/tests/integration/orm_profiler/test_converter.py index a5d1b5158dbd..1fa126175969 100644 --- a/ingestion/tests/integration/orm_profiler/test_converter.py +++ b/ingestion/tests/integration/orm_profiler/test_converter.py @@ -44,6 +44,7 @@ ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.profiler.orm.converter import ometa_to_sqa_orm +from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp class ProfilerWorkflowTest(TestCase): @@ -121,7 +122,7 @@ def test_no_db_conversion(self): assert isinstance(orm_table.id.type, sqlalchemy.BIGINT) assert isinstance(orm_table.name.type, sqlalchemy.String) assert isinstance(orm_table.age.type, sqlalchemy.INTEGER) - assert isinstance(orm_table.last_updated.type, sqlalchemy.TIMESTAMP) + assert isinstance(orm_table.last_updated.type, CustomTimestamp) assert isinstance(orm_table.created_date.type, sqlalchemy.DATE) assert isinstance(orm_table.group.type, sqlalchemy.CHAR) assert isinstance(orm_table.savings.type, sqlalchemy.DECIMAL)