diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index c9ffddb50b52..197c3b01d824 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -12,11 +12,8 @@ Generic source to build SQL connectors. """ import copy -import math -import time import traceback from abc import ABC -from concurrent.futures import ThreadPoolExecutor from copy import deepcopy from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast @@ -64,9 +61,7 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.connections.session import create_and_bind_thread_safe_session from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification -from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest -from metadata.ingestion.models.topology import Queue from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import ( get_connection, @@ -79,9 +74,7 @@ from metadata.ingestion.source.models import TableView from metadata.utils import fqn from metadata.utils.constraints import get_relationship_type -from metadata.utils.db_utils import get_view_lineage from metadata.utils.execution_time_tracker import ( - ExecutionTimeTrackerContextMap, calculate_execution_time, calculate_execution_time_generator, ) @@ -618,98 +611,6 @@ def yield_table( ) ) - def multithread_process_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]: - """Multithread Processing of a Node""" - - views_list = list(self.context.get().table_views or []) - views_length = len(views_list) - - if views_length != 0: - chunksize = int(math.ceil(views_length / self.source_config.threads)) - chunks = [ - views_list[i : i + chunksize] for i in range(0, views_length, chunksize) - ] - - thread_pool = ThreadPoolExecutor(max_workers=self.source_config.threads) - queue = Queue() - - futures = [ - thread_pool.submit( - self._process_view_def_chunk, - chunk, - queue, - self.context.get_current_thread_id(), - ) - for chunk in chunks - ] - - while True: - if queue.has_tasks(): - yield from queue.process() - - else: - if not futures: - break - - for i, future in enumerate(futures): - if future.done(): - future.result() - futures.pop(i) - - time.sleep(0.01) - - def _process_view_def_chunk( - self, chunk: List[TableView], queue: Queue, thread_id: int - ) -> None: - """ - Process a chunk of view definitions - """ - self.context.copy_from(thread_id) - ExecutionTimeTrackerContextMap().copy_from_parent(thread_id) - for view in [v for v in chunk if v.view_definition is not None]: - for lineage in get_view_lineage( - view=view, - metadata=self.metadata, - service_name=self.context.get().database_service, - connection_type=self.service_connection.type.value, - timeout_seconds=self.source_config.queryParsingTimeoutLimit, - ): - if lineage.right is not None: - queue.put( - Either( - right=OMetaLineageRequest( - lineage_request=lineage.right, - override_lineage=self.source_config.overrideViewLineage, - ) - ) - ) - else: - queue.put(lineage) - - def _process_view_def_serial(self) -> Iterable[Either[OMetaLineageRequest]]: - """ - Process view definitions serially - """ - for view in [ - v for v in self.context.get().table_views if v.view_definition is not None - ]: - for lineage in get_view_lineage( - view=view, - metadata=self.metadata, - service_name=self.context.get().database_service, - connection_type=self.service_connection.type.value, - timeout_seconds=self.source_config.queryParsingTimeoutLimit, - ): - if lineage.right is not None: - yield Either( - right=OMetaLineageRequest( - lineage_request=lineage.right, - override_lineage=self.source_config.overrideViewLineage, - ) - ) - else: - yield lineage - def _prepare_foreign_constraints( # pylint: disable=too-many-arguments, too-many-locals self, supports_database: bool, diff --git a/ingestion/tests/integration/postgres/test_lineage.py b/ingestion/tests/integration/postgres/test_lineage.py index d663adee8b90..7b688ce17196 100644 --- a/ingestion/tests/integration/postgres/test_lineage.py +++ b/ingestion/tests/integration/postgres/test_lineage.py @@ -4,7 +4,6 @@ import pytest -from metadata.cli.lineage import LineageWorkflow from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import ( DatabaseLineageConfigType, @@ -56,7 +55,7 @@ def test_native_lineage( ): ingestion_config["source"]["sourceConfig"]["config"].update(source_config) run_workflow(MetadataWorkflow, ingestion_config) - run_workflow(LineageWorkflow, native_lineage_config) + run_workflow(MetadataWorkflow, native_lineage_config) film_actor_edges = metadata.get_lineage_by_name( Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor" )