Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes GEN-1994: Remove View Lineage from Metadata Ingestion flow #18558

Merged
merged 7 commits into from
Nov 12, 2024
4 changes: 4 additions & 0 deletions bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ CREATE TABLE IF NOT EXISTS successful_sent_change_events (
-- Create an index on the event_subscription_id column in the successful_sent_change_events table
CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id);

-- Remove Override View Lineage
UPDATE ingestion_pipeline_entity
SET json = JSON_REMOVE(json, '$.sourceConfig.config.overrideViewLineage')
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata';
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ CREATE TABLE IF NOT EXISTS successful_sent_change_events (
-- Create an index on the event_subscription_id column in the successful_sent_change_events table
CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id);

-- Remove Override View Lineage
UPDATE ingestion_pipeline_entity
SET json = json::jsonb #- '{sourceConfig,config,overrideViewLineage}'
WHERE json #>> '{pipelineType}' = 'metadata';
107 changes: 0 additions & 107 deletions ingestion/src/metadata/ingestion/source/database/common_db_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
Generic source to build SQL connectors.
"""
import copy
import math
import time
import traceback
from abc import ABC
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast

Expand Down Expand Up @@ -64,9 +61,7 @@
from metadata.ingestion.api.models import Either
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest
from metadata.ingestion.models.topology import Queue
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import (
get_connection,
Expand All @@ -79,9 +74,7 @@
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.constraints import get_relationship_type
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.execution_time_tracker import (
ExecutionTimeTrackerContextMap,
calculate_execution_time,
calculate_execution_time_generator,
)
Expand Down Expand Up @@ -618,106 +611,6 @@ def yield_table(
)
)

def multithread_process_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
"""Multithread Processing of a Node"""

views_list = list(self.context.get().table_views or [])
views_length = len(views_list)

if views_length != 0:
chunksize = int(math.ceil(views_length / self.source_config.threads))
chunks = [
views_list[i : i + chunksize] for i in range(0, views_length, chunksize)
]

thread_pool = ThreadPoolExecutor(max_workers=self.source_config.threads)
queue = Queue()

futures = [
thread_pool.submit(
self._process_view_def_chunk,
chunk,
queue,
self.context.get_current_thread_id(),
)
for chunk in chunks
]

while True:
if queue.has_tasks():
yield from queue.process()

else:
if not futures:
break

for i, future in enumerate(futures):
if future.done():
future.result()
futures.pop(i)

time.sleep(0.01)

def _process_view_def_chunk(
self, chunk: List[TableView], queue: Queue, thread_id: int
) -> None:
"""
Process a chunk of view definitions
"""
self.context.copy_from(thread_id)
ExecutionTimeTrackerContextMap().copy_from_parent(thread_id)
for view in [v for v in chunk if v.view_definition is not None]:
for lineage in get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.context.get().database_service,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
):
if lineage.right is not None:
queue.put(
Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideViewLineage,
)
)
)
else:
queue.put(lineage)

def _process_view_def_serial(self) -> Iterable[Either[OMetaLineageRequest]]:
"""
Process view definitions serially
"""
for view in [
v for v in self.context.get().table_views if v.view_definition is not None
]:
for lineage in get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.context.get().database_service,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
):
if lineage.right is not None:
yield Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideViewLineage,
)
)
else:
yield lineage

@calculate_execution_time_generator()
def yield_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
logger.info("Processing Lineage for Views")
if self.source_config.threads > 1:
yield from self.multithread_process_view_lineage()
else:
yield from self._process_view_def_serial()

def _prepare_foreign_constraints( # pylint: disable=too-many-arguments, too-many-locals
self,
supports_database: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,6 @@ def yield_table(
)
)

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
"""
views are not supported with NoSQL
"""
yield from []

def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
children=["database"],
# Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage`
# as post_processed. This is because we cannot ensure proper lineage processing
# until we have finished ingesting all the metadata from the source.
post_process=[
"yield_view_lineage",
"yield_external_table_lineage",
"yield_table_constraints",
],
Expand Down Expand Up @@ -345,13 +341,6 @@ def yield_database_tag_details(
if self.source_config.includeTags:
yield from self.yield_database_tag(database_name) or []

@abstractmethod
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
"""
From topology.
Parses view definition to get lineage information
"""

def update_table_constraints(
self,
table_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,6 @@ def yield_table(
)
)

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
yield from []

def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,6 @@ def yield_table(
def prepare(self):
"""Nothing to prepare"""

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
yield from []

def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,6 @@ def yield_procedure_lineage_and_queries(
"""Not Implemented"""
yield from []

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
yield from []

def get_source_url(
self,
table_name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,6 @@ def get_format(cls, storage: StorageDetails) -> Optional[FileFormat]:
def standardize_table_name(self, _: str, table: str) -> str:
return table[:128]

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
yield from []

def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,6 @@ def yield_tag(
"""
yield from []

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
"""
From topology.
Parses view definition to get lineage information
"""
yield from []

def get_stored_procedures(self) -> Iterable[Any]:
"""Not Implemented"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,6 @@ def column_type(self, column_type: str):
return DataType.VARCHAR.value
return DataType.UNKNOWN.value

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
yield from []

def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,9 +881,6 @@ def yield_tag(
) -> Iterable[Either[OMetaTagAndClassification]]:
"""No tags to send"""

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
yield from []

def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, list]]]:
"""Not implemented"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
)
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger

Expand Down Expand Up @@ -521,18 +520,6 @@ def get_columns(self, column_data: List[ColumnInfo]) -> Iterable[Column]:
)
yield parsed_column

def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
logger.info("Processing Lineage for Views")
for view in [
v for v in self.context.get().table_views if v.view_definition is not None
]:
yield from get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.context.get().database_service,
connection_type=self.service_connection.type.value,
)

def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand Down
13 changes: 3 additions & 10 deletions ingestion/tests/cli_e2e/common/test_cli_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,10 @@ def assert_for_table_with_profiler(
self.expected_profiled_tables(),
)
sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
lineage = self.retrieve_lineage(self.fqn_created_table())
self.assertEqual(len(sample_data.rows), self.inserted_rows_count())
if self.view_column_lineage_count() is not None:
self.assertEqual(
len(
lineage["downstreamEdges"][0]["lineageDetails"][
"columnsLineage"
]
),
self.view_column_lineage_count(),
)
# Since we removed view lineage from metadata workflow as part
# of https://github.com/open-metadata/OpenMetadata/pull/18558
# we need to introduce Lineage E2E base and add view lineage check there.

def assert_for_table_with_profiler_time_partition(
self, source_status: Status, sink_status: Status
Expand Down
1 change: 1 addition & 0 deletions ingestion/tests/integration/postgres/test_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def test_native_lineage(
):
ingestion_config["source"]["sourceConfig"]["config"].update(source_config)
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(MetadataWorkflow, native_lineage_config)
film_actor_edges = metadata.get_lineage_by_name(
Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
title: Upgrade 1.5.x to 1.6.x
slug: /deployment/upgrade/versions/150-to-160
collate: false
---

# Upgrade from 1.5.x to 1.6.x

Upgrading from version 1.5.x to 1.6.x can be executed directly on your instances. Below are important details and considerations for a smooth upgrade process.


## Deprecation Notice

## Breaking Changes in 1.6.x Stable Release

### View Lineage Transition to Lineage Workflow

The View Lineage feature has been relocated to the Lineage Workflow. This adjustment aims to enhance user experience and streamline access to lineage information.

#### Key Changes in YAML Configuration

As part of this upgrade, please note the modifications required in your YAML files for metadata ingestion:

- The `overrideViewLineage` configuration has been deprecated in the DatabaseMetadata source configuration.

#### Old Configuration Example

```yaml
....
sourceConfig:
config:
type: DatabaseMetadata
.....
overrideViewLineage: true # deprecated
.....
```

#### New Configuration Requirement
The `overrideViewLineage` setting will now be part of the DatabaseLineage configuration within the Lineage Workflow:


```yaml
....
sourceConfig:
config:
type: DatabaseLineage
.....
lineageInformation:
overrideViewLineage: true
.....
```

Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
children=["database"],
# Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage`
# as post_processed. This is because we cannot ensure proper lineage processing
# until we have finished ingesting all the metadata from the source.
post_process=["yield_view_lineage", "yield_procedure_lineage_and_queries"],
post_process=["yield_procedure_lineage_and_queries"],
ayush-shah marked this conversation as resolved.
Show resolved Hide resolved
)
database = TopologyNode(
producer="get_database_names",
Expand Down
Loading
Loading