From 6fa03ee66a9e42a069e81d8564f6114c147f09ba Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Wed, 13 Nov 2024 00:08:55 +0530 Subject: [PATCH] Fixes GEN-1994: Remove View Lineage from Metadata Ingestion flow (#18558) --- .../native/1.6.0/mysql/schemaChanges.sql | 4 + .../native/1.6.0/postgres/schemaChanges.sql | 4 + .../source/database/common_db_source.py | 118 +----------------- .../source/database/common_nosql_source.py | 14 --- .../source/database/database_service.py | 20 +-- .../source/database/datalake/metadata.py | 13 +- .../source/database/deltalake/metadata.py | 13 +- .../source/database/domodatabase/metadata.py | 13 +- .../source/database/glue/metadata.py | 13 +- .../source/database/iceberg/metadata.py | 17 +-- .../source/database/salesforce/metadata.py | 13 +- .../ingestion/source/database/sas/metadata.py | 11 +- .../source/database/unitycatalog/metadata.py | 23 +--- ingestion/tests/cli_e2e/common/test_cli_db.py | 13 +- .../integration/postgres/test_lineage.py | 1 + .../deployment/upgrade/versions/150-to-160.md | 52 ++++++++ .../develop-ingestion-code.md | 9 -- .../databaseServiceMetadataPipeline.json | 6 - 18 files changed, 74 insertions(+), 283 deletions(-) create mode 100644 openmetadata-docs/content/v1.6.x-SNAPSHOT/deployment/upgrade/versions/150-to-160.md diff --git a/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql index 70914e8086a0..fcb818c365c4 100644 --- a/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql @@ -81,3 +81,7 @@ CREATE TABLE IF NOT EXISTS successful_sent_change_events ( -- Create an index on the event_subscription_id column in the successful_sent_change_events table CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id); +-- Remove Override View Lineage +UPDATE ingestion_pipeline_entity +SET json = JSON_REMOVE(json, '$.sourceConfig.config.overrideViewLineage') +WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata'; diff --git a/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql index 31f80b8d578d..242793bc4dfc 100644 --- a/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.6.0/postgres/schemaChanges.sql @@ -60,3 +60,7 @@ CREATE TABLE IF NOT EXISTS successful_sent_change_events ( -- Create an index on the event_subscription_id column in the successful_sent_change_events table CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id); +-- Remove Override View Lineage +UPDATE ingestion_pipeline_entity +SET json = json::jsonb #- '{sourceConfig,config,overrideViewLineage}' +WHERE json #>> '{pipelineType}' = 'metadata'; diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index ddb53f713d89..663b21628cbc 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -12,13 +12,10 @@ Generic source to build SQL connectors. """ import copy -import math -import time import traceback from abc import ABC -from concurrent.futures import ThreadPoolExecutor from copy import deepcopy -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast +from typing import Any, Dict, Iterable, List, Optional, Tuple, cast from pydantic import BaseModel from sqlalchemy.engine import Connection @@ -30,12 +27,10 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( @@ -64,9 +59,7 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.connections.session import create_and_bind_thread_safe_session from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification -from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest -from metadata.ingestion.models.topology import Queue from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import ( get_connection, @@ -79,9 +72,7 @@ from metadata.ingestion.source.models import TableView from metadata.utils import fqn from metadata.utils.constraints import get_relationship_type -from metadata.utils.db_utils import get_view_lineage from metadata.utils.execution_time_tracker import ( - ExecutionTimeTrackerContextMap, calculate_execution_time, calculate_execution_time_generator, ) @@ -483,13 +474,6 @@ def yield_stored_procedure( def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """Not Implemented""" - @calculate_execution_time_generator() - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Not Implemented""" - yield from [] - def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]: """ Method to fetch the location path of the table @@ -618,106 +602,6 @@ def yield_table( ) ) - def multithread_process_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]: - """Multithread Processing of a Node""" - - views_list = list(self.context.get().table_views or []) - views_length = len(views_list) - - if views_length != 0: - chunksize = int(math.ceil(views_length / self.source_config.threads)) - chunks = [ - views_list[i : i + chunksize] for i in range(0, views_length, chunksize) - ] - - thread_pool = ThreadPoolExecutor(max_workers=self.source_config.threads) - queue = Queue() - - futures = [ - thread_pool.submit( - self._process_view_def_chunk, - chunk, - queue, - self.context.get_current_thread_id(), - ) - for chunk in chunks - ] - - while True: - if queue.has_tasks(): - yield from queue.process() - - else: - if not futures: - break - - for i, future in enumerate(futures): - if future.done(): - future.result() - futures.pop(i) - - time.sleep(0.01) - - def _process_view_def_chunk( - self, chunk: List[TableView], queue: Queue, thread_id: int - ) -> None: - """ - Process a chunk of view definitions - """ - self.context.copy_from(thread_id) - ExecutionTimeTrackerContextMap().copy_from_parent(thread_id) - for view in [v for v in chunk if v.view_definition is not None]: - for lineage in get_view_lineage( - view=view, - metadata=self.metadata, - service_name=self.context.get().database_service, - connection_type=self.service_connection.type.value, - timeout_seconds=self.source_config.queryParsingTimeoutLimit, - ): - if lineage.right is not None: - queue.put( - Either( - right=OMetaLineageRequest( - lineage_request=lineage.right, - override_lineage=self.source_config.overrideViewLineage, - ) - ) - ) - else: - queue.put(lineage) - - def _process_view_def_serial(self) -> Iterable[Either[OMetaLineageRequest]]: - """ - Process view definitions serially - """ - for view in [ - v for v in self.context.get().table_views if v.view_definition is not None - ]: - for lineage in get_view_lineage( - view=view, - metadata=self.metadata, - service_name=self.context.get().database_service, - connection_type=self.service_connection.type.value, - timeout_seconds=self.source_config.queryParsingTimeoutLimit, - ): - if lineage.right is not None: - yield Either( - right=OMetaLineageRequest( - lineage_request=lineage.right, - override_lineage=self.source_config.overrideViewLineage, - ) - ) - else: - yield lineage - - @calculate_execution_time_generator() - def yield_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]: - logger.info("Processing Lineage for Views") - if self.source_config.threads > 1: - yield from self.multithread_process_view_lineage() - else: - yield from self._process_view_def_serial() - def _prepare_foreign_constraints( # pylint: disable=too-many-arguments, too-many-locals self, supports_database: bool, diff --git a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py index c0b968ddd978..81ae7da59d49 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py @@ -20,12 +20,10 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( @@ -272,12 +270,6 @@ def yield_table( ) ) - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - """ - views are not supported with NoSQL - """ - yield from [] - def yield_tag( self, schema_name: str ) -> Iterable[Either[OMetaTagAndClassification]]: @@ -296,12 +288,6 @@ def yield_stored_procedure( def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """Not Implemented""" - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Not Implemented""" - yield from [] - def get_source_url( self, database_name: Optional[str] = None, diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index c2dd0368b3b5..594d6b3c17d1 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -13,7 +13,7 @@ """ import traceback from abc import ABC, abstractmethod -from typing import Any, Iterable, List, Optional, Set, Tuple, Union +from typing import Any, Iterable, List, Optional, Set, Tuple from pydantic import BaseModel, Field from sqlalchemy.engine import Inspector @@ -23,7 +23,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) @@ -112,11 +111,7 @@ class DatabaseServiceTopology(ServiceTopology): ), ], children=["database"], - # Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage` - # as post_processed. This is because we cannot ensure proper lineage processing - # until we have finished ingesting all the metadata from the source. post_process=[ - "yield_view_lineage", "yield_external_table_lineage", "yield_table_constraints", ], @@ -345,13 +340,6 @@ def yield_database_tag_details( if self.source_config.includeTags: yield from self.yield_database_tag(database_name) or [] - @abstractmethod - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - """ - From topology. - Parses view definition to get lineage information - """ - def update_table_constraints( self, table_name, @@ -387,12 +375,6 @@ def yield_stored_procedure( ) -> Iterable[Either[CreateStoredProcedureRequest]]: """Process the stored procedure information""" - @abstractmethod - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Extracts the lineage information from Stored Procedures""" - def get_raw_database_schema_names(self) -> Iterable[str]: """ fetch all schema names without any filtering. diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index ee4516fad315..20c3f0a57847 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -14,18 +14,16 @@ """ import json import traceback -from typing import Any, Iterable, Optional, Tuple, Union +from typing import Any, Iterable, Optional, Tuple from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table, TableType @@ -319,9 +317,6 @@ def yield_table( ) ) - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - yield from [] - def yield_tag( self, schema_name: str ) -> Iterable[Either[OMetaTagAndClassification]]: @@ -338,12 +333,6 @@ def yield_stored_procedure( def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """Not Implemented""" - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Not Implemented""" - yield from [] - def standardize_table_name( self, schema: str, table: str # pylint: disable=unused-argument ) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py index 37ae60633882..41b95e4571a8 100644 --- a/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/deltalake/metadata.py @@ -12,18 +12,16 @@ Deltalake source methods. """ import traceback -from typing import Any, Iterable, Optional, Tuple, Union +from typing import Any, Iterable, Optional, Tuple from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table, TablePartition, TableType @@ -283,9 +281,6 @@ def yield_table( def prepare(self): """Nothing to prepare""" - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - yield from [] - def yield_tag( self, schema_name: str ) -> Iterable[Either[OMetaTagAndClassification]]: @@ -302,11 +297,5 @@ def yield_stored_procedure( def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """Not Implemented""" - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Not Implemented""" - yield from [] - def close(self): """No client to close""" diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py index 5485250d76a1..970a66eb5782 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py @@ -14,18 +14,16 @@ """ import traceback -from typing import Any, Iterable, Optional, Tuple, Union +from typing import Any, Iterable, Optional, Tuple from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( @@ -294,15 +292,6 @@ def yield_stored_procedure( def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """Not Implemented""" - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Not Implemented""" - yield from [] - - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - yield from [] - def get_source_url( self, table_name: Optional[str] = None, diff --git a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py index 35be904cd70b..7111ce099292 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py @@ -12,18 +12,16 @@ Glue source methods. """ import traceback -from typing import Any, Iterable, Optional, Tuple, Union +from typing import Any, Iterable, Optional, Tuple from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( @@ -404,9 +402,6 @@ def get_format(cls, storage: StorageDetails) -> Optional[FileFormat]: def standardize_table_name(self, _: str, table: str) -> str: return table[:128] - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - yield from [] - def yield_tag( self, schema_name: str ) -> Iterable[Either[OMetaTagAndClassification]]: @@ -423,12 +418,6 @@ def yield_stored_procedure( def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """Not Implemented""" - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Not Implemented""" - yield from [] - def get_source_url( self, database_name: Optional[str], diff --git a/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py b/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py index e7336be8f1c5..6a9d0863a58f 100644 --- a/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/iceberg/metadata.py @@ -12,7 +12,7 @@ Iceberg source methods. """ import traceback -from typing import Any, Iterable, Optional, Tuple, Union +from typing import Any, Iterable, Optional, Tuple import pyiceberg import pyiceberg.exceptions @@ -21,12 +21,10 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table, TableType @@ -293,13 +291,6 @@ def yield_tag( """ yield from [] - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - """ - From topology. - Parses view definition to get lineage information - """ - yield from [] - def get_stored_procedures(self) -> Iterable[Any]: """Not Implemented""" @@ -309,11 +300,5 @@ def yield_stored_procedure( """Process the stored procedure information""" yield from [] - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Extracts the lineage information from Stored Procedures""" - yield from [] - def close(self): """There is no connection to close.""" diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py index a40ac76fa694..0e68053ab90c 100644 --- a/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/salesforce/metadata.py @@ -12,18 +12,16 @@ Salesforce source ingestion """ import traceback -from typing import Any, Iterable, Optional, Tuple, Union +from typing import Any, Iterable, Optional, Tuple from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( @@ -324,9 +322,6 @@ def column_type(self, column_type: str): return DataType.VARCHAR.value return DataType.UNKNOWN.value - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - yield from [] - def yield_tag( self, schema_name: str ) -> Iterable[Either[OMetaTagAndClassification]]: @@ -343,12 +338,6 @@ def yield_stored_procedure( def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """Not Implemented""" - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Not Implemented""" - yield from [] - def standardize_table_name( # pylint: disable=unused-argument self, schema: str, table: str ) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py index c6fbc8fb0c29..3ce2270e85f5 100644 --- a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py @@ -19,7 +19,7 @@ import re import traceback from datetime import datetime, timezone -from typing import Any, Iterable, Optional, Tuple, Union +from typing import Any, Iterable, Optional, Tuple from requests.exceptions import HTTPError @@ -28,7 +28,6 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) @@ -881,9 +880,6 @@ def yield_tag( ) -> Iterable[Either[OMetaTagAndClassification]]: """No tags to send""" - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - yield from [] - def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, list]]]: """Not implemented""" @@ -900,11 +896,6 @@ def yield_stored_procedure( ) -> Iterable[Either[CreateStoredProcedureRequest]]: """Not implemented""" - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - yield from [] - def close(self) -> None: pass diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index f738e3c883a9..6eb03388cf2c 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -13,7 +13,7 @@ """ import json import traceback -from typing import Any, Iterable, List, Optional, Tuple, Union +from typing import Any, Iterable, List, Optional, Tuple from databricks.sdk.service.catalog import ColumnInfo from databricks.sdk.service.catalog import TableConstraint as DBTableConstraint @@ -22,12 +22,10 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.data.createTable import CreateTableRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( @@ -76,7 +74,6 @@ ) from metadata.ingestion.source.models import TableView from metadata.utils import fqn -from metadata.utils.db_utils import get_view_lineage from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.logger import ingestion_logger @@ -521,18 +518,6 @@ def get_columns(self, column_data: List[ColumnInfo]) -> Iterable[Column]: ) yield parsed_column - def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: - logger.info("Processing Lineage for Views") - for view in [ - v for v in self.context.get().table_views if v.view_definition is not None - ]: - yield from get_view_lineage( - view=view, - metadata=self.metadata, - service_name=self.context.get().database_service, - connection_type=self.service_connection.type.value, - ) - def yield_tag( self, schema_name: str ) -> Iterable[Either[OMetaTagAndClassification]]: @@ -549,12 +534,6 @@ def yield_stored_procedure( def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """Not Implemented""" - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Not Implemented""" - yield from [] - def close(self): """Nothing to close""" diff --git a/ingestion/tests/cli_e2e/common/test_cli_db.py b/ingestion/tests/cli_e2e/common/test_cli_db.py index 981e092ef407..17fa666477d2 100644 --- a/ingestion/tests/cli_e2e/common/test_cli_db.py +++ b/ingestion/tests/cli_e2e/common/test_cli_db.py @@ -98,17 +98,10 @@ def assert_for_table_with_profiler( self.expected_profiled_tables(), ) sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData - lineage = self.retrieve_lineage(self.fqn_created_table()) self.assertEqual(len(sample_data.rows), self.inserted_rows_count()) - if self.view_column_lineage_count() is not None: - self.assertEqual( - len( - lineage["downstreamEdges"][0]["lineageDetails"][ - "columnsLineage" - ] - ), - self.view_column_lineage_count(), - ) + # Since we removed view lineage from metadata workflow as part + # of https://github.com/open-metadata/OpenMetadata/pull/18558 + # we need to introduce Lineage E2E base and add view lineage check there. def assert_for_table_with_profiler_time_partition( self, source_status: Status, sink_status: Status diff --git a/ingestion/tests/integration/postgres/test_lineage.py b/ingestion/tests/integration/postgres/test_lineage.py index 15aab0f766af..7b688ce17196 100644 --- a/ingestion/tests/integration/postgres/test_lineage.py +++ b/ingestion/tests/integration/postgres/test_lineage.py @@ -55,6 +55,7 @@ def test_native_lineage( ): ingestion_config["source"]["sourceConfig"]["config"].update(source_config) run_workflow(MetadataWorkflow, ingestion_config) + run_workflow(MetadataWorkflow, native_lineage_config) film_actor_edges = metadata.get_lineage_by_name( Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor" ) diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/deployment/upgrade/versions/150-to-160.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/deployment/upgrade/versions/150-to-160.md new file mode 100644 index 000000000000..8fa855fc5caa --- /dev/null +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/deployment/upgrade/versions/150-to-160.md @@ -0,0 +1,52 @@ +--- +title: Upgrade 1.5.x to 1.6.x +slug: /deployment/upgrade/versions/150-to-160 +collate: false +--- + +# Upgrade from 1.5.x to 1.6.x + +Upgrading from version 1.5.x to 1.6.x can be executed directly on your instances. Below are important details and considerations for a smooth upgrade process. + + +## Deprecation Notice + +## Breaking Changes in 1.6.x Stable Release + +### View Lineage Transition to Lineage Workflow + +The View Lineage feature has been relocated to the Lineage Workflow. This adjustment aims to enhance user experience and streamline access to lineage information. + +#### Key Changes in YAML Configuration + +As part of this upgrade, please note the modifications required in your YAML files for metadata ingestion: + +- The `overrideViewLineage` configuration has been deprecated in the DatabaseMetadata source configuration. + +#### Old Configuration Example + +```yaml + .... + sourceConfig: + config: + type: DatabaseMetadata + ..... + overrideViewLineage: true # deprecated + ..... +``` + +#### New Configuration Requirement +The `overrideViewLineage` setting will now be part of the DatabaseLineage configuration within the Lineage Workflow: + + +```yaml + .... + sourceConfig: + config: + type: DatabaseLineage + ..... + lineageInformation: + overrideViewLineage: true + ..... +``` + \ No newline at end of file diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/developers/contribute/developing-a-new-connector/develop-ingestion-code.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/developers/contribute/developing-a-new-connector/develop-ingestion-code.md index dc7cbedc551f..8b180b477ce5 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/developers/contribute/developing-a-new-connector/develop-ingestion-code.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/developers/contribute/developing-a-new-connector/develop-ingestion-code.md @@ -55,10 +55,6 @@ class DatabaseServiceTopology(ServiceTopology): ), ], children=["database"], - # Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage` - # as post_processed. This is because we cannot ensure proper lineage processing - # until we have finished ingesting all the metadata from the source. - post_process=["yield_view_lineage", "yield_procedure_lineage_and_queries"], ) database = TopologyNode( producer="get_database_names", @@ -330,11 +326,6 @@ class DatabaseServiceSource( ) -> Iterable[Either[CreateStoredProcedureRequest]]: """Process the stored procedure information""" - @abstractmethod - def yield_procedure_lineage_and_queries( - self, - ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: - """Extracts the lineage information from Stored Procedures""" def get_raw_database_schema_names(self) -> Iterable[str]: """ diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index 7a6a39f54bf2..f02d9af7d8e4 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -102,12 +102,6 @@ "type": "boolean", "default": false }, - "overrideViewLineage":{ - "title": "Override View Lineage", - "description": "Set the 'Override View Lineage' toggle to control whether to override the existing view lineage.", - "type": "boolean", - "default": false - }, "queryLogDuration": { "description": "Configuration to tune how far we want to look back in query logs to process Stored Procedures results.", "type": "integer",