Skip to content

Commit

Permalink
refactor: remove lineage unused code; fix tests;
Browse files Browse the repository at this point in the history
  • Loading branch information
ayush-shah committed Nov 11, 2024
1 parent d79d177 commit 30bd949
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
Generic source to build SQL connectors.
"""
import copy
import math
import time
import traceback
from abc import ABC
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast

Expand Down Expand Up @@ -64,9 +61,7 @@
from metadata.ingestion.api.models import Either
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest
from metadata.ingestion.models.topology import Queue
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import (
get_connection,
Expand All @@ -79,9 +74,7 @@
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.constraints import get_relationship_type
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.execution_time_tracker import (
ExecutionTimeTrackerContextMap,
calculate_execution_time,
calculate_execution_time_generator,
)
Expand Down Expand Up @@ -618,98 +611,6 @@ def yield_table(
)
)

def multithread_process_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
    """Process the context's table views on worker threads and yield the results.

    The views stored in the current context are split into at most
    ``self.source_config.threads`` chunks. Each chunk is submitted to a
    thread pool running ``_process_view_def_chunk``; the workers push their
    results onto a shared queue that this generator drains.

    Yields:
        Every item the workers put on the shared queue.

    Raises:
        Any exception raised inside a worker; it is re-raised here through
        ``future.result()``.
    """
    views_list = list(self.context.get().table_views or [])
    if not views_list:
        return

    views_length = len(views_list)
    threads = self.source_config.threads

    # Ceil division so that no more than `threads` chunks are produced.
    chunksize = int(math.ceil(views_length / threads))
    chunks = [
        views_list[start : start + chunksize]
        for start in range(0, views_length, chunksize)
    ]

    thread_pool = ThreadPoolExecutor(max_workers=threads)
    try:
        queue = Queue()
        # Workers receive the submitting thread's id so they can copy its context.
        parent_thread_id = self.context.get_current_thread_id()

        futures = [
            thread_pool.submit(
                self._process_view_def_chunk,
                chunk,
                queue,
                parent_thread_id,
            )
            for chunk in chunks
        ]

        while True:
            if queue.has_tasks():
                yield from queue.process()
                continue

            # The queue is checked before this exit test, so results queued
            # by the last worker to finish are still drained.
            if not futures:
                break

            # Build a new list instead of popping while iterating: removing
            # items from a list during enumeration skips the next element.
            still_running = []
            for future in futures:
                if future.done():
                    # Surfaces any exception raised inside the worker.
                    future.result()
                else:
                    still_running.append(future)
            futures = still_running

            # Nothing to yield yet; back off briefly instead of busy-waiting.
            time.sleep(0.01)
    finally:
        # Release the pool on every exit path (normal end, worker error,
        # generator closed early). wait=False keeps error propagation
        # non-blocking.
        thread_pool.shutdown(wait=False)

def _process_view_def_chunk(
    self, chunk: List[TableView], queue: Queue, thread_id: int
) -> None:
    """
    Compute view lineage for one chunk of views and push the results to ``queue``.

    Before doing any work, the method calls ``self.context.copy_from`` and
    ``ExecutionTimeTrackerContextMap().copy_from_parent`` with ``thread_id``
    (presumably so this worker sees the submitting thread's state; confirm
    against those helpers).

    Views without a view definition are ignored. Results carrying a ``right``
    value are re-wrapped as ``OMetaLineageRequest``; all others are queued
    unchanged.
    """
    self.context.copy_from(thread_id)
    ExecutionTimeTrackerContextMap().copy_from_parent(thread_id)

    views_with_definition = [
        table_view for table_view in chunk if table_view.view_definition is not None
    ]
    for table_view in views_with_definition:
        lineage_results = get_view_lineage(
            view=table_view,
            metadata=self.metadata,
            service_name=self.context.get().database_service,
            connection_type=self.service_connection.type.value,
            timeout_seconds=self.source_config.queryParsingTimeoutLimit,
        )
        for result in lineage_results:
            if result.right is None:
                # No lineage payload: forward the result as-is.
                queue.put(result)
                continue

            request = OMetaLineageRequest(
                lineage_request=result.right,
                override_lineage=self.source_config.overrideViewLineage,
            )
            queue.put(Either(right=request))

def _process_view_def_serial(self) -> Iterable[Either[OMetaLineageRequest]]:
    """
    Compute view lineage for the context's table views one at a time.

    Views without a view definition are skipped. A missing (``None``)
    ``table_views`` value is treated as an empty collection, matching the
    guard used by ``multithread_process_view_lineage``.

    Yields:
        ``Either(right=OMetaLineageRequest(...))`` for each result that
        carries a ``right`` value, and the unchanged result otherwise.
    """
    # `or []` prevents a TypeError when the context holds no views.
    for view in self.context.get().table_views or []:
        if view.view_definition is None:
            continue

        for lineage in get_view_lineage(
            view=view,
            metadata=self.metadata,
            service_name=self.context.get().database_service,
            connection_type=self.service_connection.type.value,
            timeout_seconds=self.source_config.queryParsingTimeoutLimit,
        ):
            if lineage.right is None:
                # No lineage payload: forward the result as-is.
                yield lineage
                continue

            yield Either(
                right=OMetaLineageRequest(
                    lineage_request=lineage.right,
                    override_lineage=self.source_config.overrideViewLineage,
                )
            )

def _prepare_foreign_constraints( # pylint: disable=too-many-arguments, too-many-locals
self,
supports_database: bool,
Expand Down
3 changes: 1 addition & 2 deletions ingestion/tests/integration/postgres/test_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import pytest

from metadata.cli.lineage import LineageWorkflow
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
DatabaseLineageConfigType,
Expand Down Expand Up @@ -56,7 +55,7 @@ def test_native_lineage(
):
ingestion_config["source"]["sourceConfig"]["config"].update(source_config)
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(LineageWorkflow, native_lineage_config)
run_workflow(MetadataWorkflow, native_lineage_config)
film_actor_edges = metadata.get_lineage_by_name(
Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor"
)
Expand Down

0 comments on commit 30bd949

Please sign in to comment.