Skip to content

Commit

Permalink
Fixes GEN-1994: Remove View Lineage from Metadata Ingestion flow (#18558
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ayush-shah authored Nov 12, 2024
1 parent b5f4aee commit 6fa03ee
Show file tree
Hide file tree
Showing 18 changed files with 74 additions and 283 deletions.
4 changes: 4 additions & 0 deletions bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ CREATE TABLE IF NOT EXISTS successful_sent_change_events (
-- Create an index on the event_subscription_id column in the successful_sent_change_events table
CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id);

-- Remove Override View Lineage
UPDATE ingestion_pipeline_entity
SET json = JSON_REMOVE(json, '$.sourceConfig.config.overrideViewLineage')
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata';
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ CREATE TABLE IF NOT EXISTS successful_sent_change_events (
-- Create an index on the event_subscription_id column in the successful_sent_change_events table
CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id);

-- Remove Override View Lineage
UPDATE ingestion_pipeline_entity
SET json = json::jsonb #- '{sourceConfig,config,overrideViewLineage}'
WHERE json #>> '{pipelineType}' = 'metadata';
118 changes: 1 addition & 117 deletions ingestion/src/metadata/ingestion/source/database/common_db_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@
Generic source to build SQL connectors.
"""
import copy
import math
import time
import traceback
from abc import ABC
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast
from typing import Any, Dict, Iterable, List, Optional, Tuple, cast

from pydantic import BaseModel
from sqlalchemy.engine import Connection
Expand All @@ -30,12 +27,10 @@
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Expand Down Expand Up @@ -64,9 +59,7 @@
from metadata.ingestion.api.models import Either
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest
from metadata.ingestion.models.topology import Queue
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import (
get_connection,
Expand All @@ -79,9 +72,7 @@
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.constraints import get_relationship_type
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.execution_time_tracker import (
ExecutionTimeTrackerContextMap,
calculate_execution_time,
calculate_execution_time_generator,
)
Expand Down Expand Up @@ -483,13 +474,6 @@ def yield_stored_procedure(
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""

@calculate_execution_time_generator()
def yield_procedure_lineage_and_queries(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""Not Implemented"""
yield from []

def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]:
"""
Method to fetch the location path of the table
Expand Down Expand Up @@ -618,106 +602,6 @@ def yield_table(
)
)

def multithread_process_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
"""Multithread Processing of a Node"""

views_list = list(self.context.get().table_views or [])
views_length = len(views_list)

if views_length != 0:
chunksize = int(math.ceil(views_length / self.source_config.threads))
chunks = [
views_list[i : i + chunksize] for i in range(0, views_length, chunksize)
]

thread_pool = ThreadPoolExecutor(max_workers=self.source_config.threads)
queue = Queue()

futures = [
thread_pool.submit(
self._process_view_def_chunk,
chunk,
queue,
self.context.get_current_thread_id(),
)
for chunk in chunks
]

while True:
if queue.has_tasks():
yield from queue.process()

else:
if not futures:
break

for i, future in enumerate(futures):
if future.done():
future.result()
futures.pop(i)

time.sleep(0.01)

def _process_view_def_chunk(
self, chunk: List[TableView], queue: Queue, thread_id: int
) -> None:
"""
Process a chunk of view definitions
"""
self.context.copy_from(thread_id)
ExecutionTimeTrackerContextMap().copy_from_parent(thread_id)
for view in [v for v in chunk if v.view_definition is not None]:
for lineage in get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.context.get().database_service,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
):
if lineage.right is not None:
queue.put(
Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideViewLineage,
)
)
)
else:
queue.put(lineage)

def _process_view_def_serial(self) -> Iterable[Either[OMetaLineageRequest]]:
"""
Process view definitions serially
"""
for view in [
v for v in self.context.get().table_views if v.view_definition is not None
]:
for lineage in get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.context.get().database_service,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
):
if lineage.right is not None:
yield Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideViewLineage,
)
)
else:
yield lineage

@calculate_execution_time_generator()
def yield_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
logger.info("Processing Lineage for Views")
if self.source_config.threads > 1:
yield from self.multithread_process_view_lineage()
else:
yield from self._process_view_def_serial()

def _prepare_foreign_constraints( # pylint: disable=too-many-arguments, too-many-locals
self,
supports_database: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Expand Down Expand Up @@ -272,12 +270,6 @@ def yield_table(
)
)

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
"""
views are not supported with NoSQL
"""
yield from []

def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand All @@ -296,12 +288,6 @@ def yield_stored_procedure(
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""

def yield_procedure_lineage_and_queries(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""Not Implemented"""
yield from []

def get_source_url(
self,
database_name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""
import traceback
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Optional, Set, Tuple, Union
from typing import Any, Iterable, List, Optional, Set, Tuple

from pydantic import BaseModel, Field
from sqlalchemy.engine import Inspector
Expand All @@ -23,7 +23,6 @@
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
Expand Down Expand Up @@ -112,11 +111,7 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
children=["database"],
# Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage`
# as post_processed. This is because we cannot ensure proper lineage processing
# until we have finished ingesting all the metadata from the source.
post_process=[
"yield_view_lineage",
"yield_external_table_lineage",
"yield_table_constraints",
],
Expand Down Expand Up @@ -345,13 +340,6 @@ def yield_database_tag_details(
if self.source_config.includeTags:
yield from self.yield_database_tag(database_name) or []

@abstractmethod
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
"""
From topology.
Parses view definition to get lineage information
"""

def update_table_constraints(
self,
table_name,
Expand Down Expand Up @@ -387,12 +375,6 @@ def yield_stored_procedure(
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Process the stored procedure information"""

@abstractmethod
def yield_procedure_lineage_and_queries(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""Extracts the lineage information from Stored Procedures"""

def get_raw_database_schema_names(self) -> Iterable[str]:
"""
fetch all schema names without any filtering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@
"""
import json
import traceback
from typing import Any, Iterable, Optional, Tuple, Union
from typing import Any, Iterable, Optional, Tuple

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Table, TableType
Expand Down Expand Up @@ -319,9 +317,6 @@ def yield_table(
)
)

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
yield from []

def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand All @@ -338,12 +333,6 @@ def yield_stored_procedure(
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""

def yield_procedure_lineage_and_queries(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""Not Implemented"""
yield from []

def standardize_table_name(
self, schema: str, table: str # pylint: disable=unused-argument
) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,16 @@
Deltalake source methods.
"""
import traceback
from typing import Any, Iterable, Optional, Tuple, Union
from typing import Any, Iterable, Optional, Tuple

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Table, TablePartition, TableType
Expand Down Expand Up @@ -283,9 +281,6 @@ def yield_table(
def prepare(self):
"""Nothing to prepare"""

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
yield from []

def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand All @@ -302,11 +297,5 @@ def yield_stored_procedure(
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""Not Implemented"""

def yield_procedure_lineage_and_queries(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""Not Implemented"""
yield from []

def close(self):
"""No client to close"""
Loading

0 comments on commit 6fa03ee

Please sign in to comment.