Merge branch 'master' into feat(ingestion/neo4j)
k-bartlett authored Oct 21, 2024
2 parents 9360ce6 + 554288b commit 44fad3a
Showing 31 changed files with 971 additions and 275 deletions.
11 changes: 10 additions & 1 deletion datahub-web-react/src/app/ingest/source/builder/constants.ts
@@ -35,6 +35,7 @@ import csvLogo from '../../../../images/csv-logo.png';
import qlikLogo from '../../../../images/qliklogo.png';
import sigmaLogo from '../../../../images/sigmalogo.png';
import sacLogo from '../../../../images/saclogo.svg';
+import datahubLogo from '../../../../images/datahublogo.png';

export const ATHENA = 'athena';
export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`;
@@ -125,6 +126,11 @@ export const SIGMA = 'sigma';
export const SIGMA_URN = `urn:li:dataPlatform:${SIGMA}`;
export const SAC = 'sac';
export const SAC_URN = `urn:li:dataPlatform:${SAC}`;
+export const DATAHUB = 'datahub';
+export const DATAHUB_GC = 'datahub-gc';
+export const DATAHUB_LINEAGE_FILE = 'datahub-lineage-file';
+export const DATAHUB_BUSINESS_GLOSSARY = 'datahub-business-glossary';
+export const DATAHUB_URN = `urn:li:dataPlatform:${DATAHUB}`;

export const PLATFORM_URN_TO_LOGO = {
[ATHENA_URN]: athenaLogo,
@@ -165,6 +171,7 @@ export const PLATFORM_URN_TO_LOGO = {
[QLIK_SENSE_URN]: qlikLogo,
[SIGMA_URN]: sigmaLogo,
[SAC_URN]: sacLogo,
+[DATAHUB_URN]: datahubLogo,
};

export const SOURCE_TO_PLATFORM_URN = {
@@ -178,5 +185,7 @@ export const SOURCE_TO_PLATFORM_URN = {
[SNOWFLAKE_USAGE]: SNOWFLAKE_URN,
[STARBURST_TRINO_USAGE]: TRINO_URN,
[DBT_CLOUD]: DBT_URN,
-[VERTICA]: VERTICA_URN,
+[DATAHUB_GC]: DATAHUB_URN,
+[DATAHUB_LINEAGE_FILE]: DATAHUB_URN,
+[DATAHUB_BUSINESS_GLOSSARY]: DATAHUB_URN,
};
1 change: 0 additions & 1 deletion docs/businessattributes.md
@@ -28,7 +28,6 @@ Taking the example of "United States- Social Security Number", if an application
What you need to create/update and associate business attributes to dataset schema field

* **Manage Business Attributes** platform privilege to create/update/delete business attributes.
-* **Edit Dataset Column Business Attribute** metadata privilege to associate business attributes to dataset schema field.

## Using Business Attributes
As of now Business Attributes can only be created through UI
28 changes: 15 additions & 13 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -122,12 +122,13 @@ transformers:
```
## Simple Add Dataset ownership
### Config Details
-| Field              | Required | Type         | Default     | Description                                                          |
-|--------------------|----------|--------------|-------------|----------------------------------------------------------------------|
-| `owner_urns`       | ✅        | list[string] |             | List of owner urns.                                                  |
-| `ownership_type`   |          | string       | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn)  |
-| `replace_existing` |          | boolean      | `false`     | Whether to remove ownership from entity sent by ingestion source.    |
-| `semantics`        |          | enum         | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.     |
+| Field              | Required | Type         | Default     | Description                                                                                                   |
+|--------------------|----------|--------------|-------------|---------------------------------------------------------------------------------------------------------------|
+| `owner_urns`       | ✅        | list[string] |             | List of owner urns.                                                                                           |
+| `ownership_type`   |          | string       | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn)                                           |
+| `replace_existing` |          | boolean      | `false`     | Whether to remove ownership from entity sent by ingestion source.                                             |
+| `semantics`        |          | enum         | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.                                              |
+| `on_conflict`      |          | enum         | `DO_UPDATE` | Whether to make changes if owners already exist. If set to DO_NOTHING, the `semantics` setting is irrelevant. |

For transformer behaviour on `replace_existing` and `semantics`, please refer section [Relationship Between replace_existing And semantics](#relationship-between-replace_existing-and-semantics).
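
To make the new `on_conflict` option concrete, here is a minimal sketch of a recipe driven through DataHub's Python `Pipeline` API. The input file, sink address, and owner urn are placeholders, not part of this commit:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe: add an owner only to datasets that have no owners yet.
# The file source path and the sink server are placeholder values.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "file",
            "config": {"path": "./mces.json"},  # placeholder input file
        },
        "transformers": [
            {
                "type": "simple_add_dataset_ownership",
                "config": {
                    "owner_urns": ["urn:li:corpuser:username1"],
                    "ownership_type": "PRODUCER",
                    "on_conflict": "DO_NOTHING",  # skip datasets that already have owners
                },
            }
        ],
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```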

@@ -191,13 +192,14 @@

## Pattern Add Dataset ownership
### Config Details
-| Field              | Required | Type                 | Default     | Description                                                                               |
-|--------------------|----------|----------------------|-------------|-------------------------------------------------------------------------------------------|
-| `owner_pattern`    | ✅        | map[regx, list[urn]] |             | entity urn with regular expression and list of owners urn apply to matching entity urn.  |
-| `ownership_type`   |          | string               | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn)                      |
-| `replace_existing` |          | boolean              | `false`     | Whether to remove owners from entity sent by ingestion source.                            |
-| `semantics`        |          | enum                 | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.                          |
-| `is_container`     |          | bool                 | `false`     | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. |
+| Field              | Required | Type                  | Default     | Description                                                                                                                   |
+|--------------------|----------|-----------------------|-------------|--------------------------------------------------------------------------------------------------------------------------------|
+| `owner_pattern`    | ✅        | map[regex, list[urn]] |             | Map of entity urn regex patterns to the lists of owner urns applied to matching entities.                                    |
+| `ownership_type`   |          | string                | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn)                                                          |
+| `replace_existing` |          | boolean               | `false`     | Whether to remove owners from entity sent by ingestion source.                                                               |
+| `semantics`        |          | enum                  | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.                                                             |
+| `is_container`     |          | bool                  | `false`     | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. |
+| `on_conflict`      |          | enum                  | `DO_UPDATE` | Whether to make changes if owners already exist. If set to DO_NOTHING, the `semantics` setting is irrelevant.                |

Let’s suppose we’d like to append a series of users who we know own certain datasets from a data source but aren’t detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern against the `urn` of the dataset and assign the respective owners.
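
For intuition, here is a small self-contained sketch of the matching behavior described above. The patterns, urns, and owner lists are hypothetical; the real logic lives inside the `pattern_add_dataset_ownership` transformer:

```python
import re
from typing import Dict, List

# Hypothetical owner_pattern config: regex over dataset urn -> owner urns.
owner_pattern: Dict[str, List[str]] = {
    ".*example1.*": ["urn:li:corpuser:username1"],
    ".*example2.*": ["urn:li:corpuser:username2"],
}

dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,db.schema.example1,PROD)"

# Collect owners from every pattern that matches the dataset urn.
owners = [
    owner
    for regex, urns in owner_pattern.items()
    if re.search(regex, dataset_urn)
    for owner in urns
]
print(owners)  # ['urn:li:corpuser:username1']
```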

1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -351,6 +351,7 @@ def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]:
def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]:
return self.get_aspect(entity_urn=entity_urn, aspect_type=GlossaryTermsClass)

+@functools.lru_cache(maxsize=1)
def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
return self.get_aspect(entity_urn=entity_urn, aspect_type=DomainsClass)
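
For context on the one-line change above: `functools.lru_cache(maxsize=1)` memoizes only the most recent call, which is sufficient when the same entity urn is looked up many times in a row. A standalone sketch of that behavior (not DataHub code):

```python
import functools

call_count = 0

@functools.lru_cache(maxsize=1)
def get_domain(entity_urn: str) -> str:
    global call_count
    call_count += 1  # count actual (non-cached) invocations
    return f"domain-for-{entity_urn}"

get_domain("urn:li:dataset:a")
get_domain("urn:li:dataset:a")  # cache hit: same argument as the previous call
get_domain("urn:li:dataset:b")  # cache miss: evicts the entry for "a"
get_domain("urn:li:dataset:a")  # recomputed, since maxsize=1 kept only "b"
print(call_count)  # 3
```

A size-1 cache keeps memory flat while still avoiding repeated round-trips for back-to-back lookups of the same urn.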

19 changes: 10 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
@@ -1,6 +1,6 @@
+import dataclasses
import logging
-from dataclasses import dataclass, field as dataclass_field
-from typing import Dict, List, Optional
+from typing import Dict, Optional

import pydantic
from pydantic import Field, root_validator
@@ -23,6 +23,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
+from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)
@@ -114,24 +115,24 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
return values


-@dataclass
+@dataclasses.dataclass
class MetadataExtractionPerfReport(Report):
-connectors_metadata_extraction_sec: PerfTimer = dataclass_field(
+connectors_metadata_extraction_sec: PerfTimer = dataclasses.field(
default_factory=PerfTimer
)
-connectors_lineage_extraction_sec: PerfTimer = dataclass_field(
+connectors_lineage_extraction_sec: PerfTimer = dataclasses.field(
default_factory=PerfTimer
)
-connectors_jobs_extraction_sec: PerfTimer = dataclass_field(
+connectors_jobs_extraction_sec: PerfTimer = dataclasses.field(
default_factory=PerfTimer
)


-@dataclass
+@dataclasses.dataclass
class FivetranSourceReport(StaleEntityRemovalSourceReport):
connectors_scanned: int = 0
-filtered_connectors: List[str] = dataclass_field(default_factory=list)
-metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field(
+filtered_connectors: LossyList[str] = dataclasses.field(default_factory=LossyList)
+metadata_extraction_perf: MetadataExtractionPerfReport = dataclasses.field(
default_factory=MetadataExtractionPerfReport
)
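
The switch from `List[str]` to `LossyList[str]` keeps the source report bounded when a connector filters out a very large number of entries. A toy sketch of the underlying idea, using a hypothetical `BoundedReportList` rather than DataHub's actual `LossyList` implementation:

```python
from typing import List

class BoundedReportList:
    """Keeps a sample of items plus the true total, so reports stay small."""

    def __init__(self, max_elements: int = 10) -> None:
        self.max_elements = max_elements
        self.items: List[str] = []
        self.total = 0

    def append(self, item: str) -> None:
        self.total += 1
        if len(self.items) < self.max_elements:
            self.items.append(item)  # keep only the first max_elements items

    def __repr__(self) -> str:
        if self.total <= len(self.items):
            return repr(self.items)
        return f"{self.items!r} ... (showing {len(self.items)} of {self.total})"

filtered = BoundedReportList(max_elements=2)
for name in ["conn_a", "conn_b", "conn_c", "conn_d"]:
    filtered.append(name)
print(filtered)  # ['conn_a', 'conn_b'] ... (showing 2 of 4)
```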

metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py
@@ -24,7 +24,7 @@ class Connector:
sync_frequency: int
destination_id: str
user_id: str
-table_lineage: List[TableLineage]
+lineage: List[TableLineage]
jobs: List["Job"]


metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
@@ -28,6 +28,10 @@
)
from datahub.ingestion.source.fivetran.data_classes import Connector, Job
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
+from datahub.ingestion.source.fivetran.fivetran_query import (
+MAX_JOBS_PER_CONNECTOR,
+MAX_TABLE_LINEAGE_PER_CONNECTOR,
+)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
@@ -72,11 +76,6 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):

self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)

-# Create and register the stateful ingestion use-case handler.
-self.stale_entity_removal_handler = StaleEntityRemovalHandler.create(
-self, self.config, self.ctx
-)

def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
input_dataset_urn_list: List[DatasetUrn] = []
output_dataset_urn_list: List[DatasetUrn] = []
@@ -108,13 +107,21 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
)

-for table_lineage in connector.table_lineage:
+if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
+self.report.warning(
+title="Table lineage truncated",
+message=f"The connector had more than {MAX_TABLE_LINEAGE_PER_CONNECTOR} table lineage entries. "
+f"Only the most recent {MAX_TABLE_LINEAGE_PER_CONNECTOR} entries were ingested.",
+context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
+)
+
+for lineage in connector.lineage:
input_dataset_urn = DatasetUrn.create_from_ids(
platform_id=source_platform,
table_name=(
f"{source_database.lower()}.{table_lineage.source_table}"
f"{source_database.lower()}.{lineage.source_table}"
if source_database
else table_lineage.source_table
else lineage.source_table
),
env=source_platform_detail.env,
platform_instance=source_platform_detail.platform_instance,
@@ -123,14 +130,14 @@

output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=self.config.fivetran_log_config.destination_platform,
table_name=f"{self.audit_log.fivetran_log_database.lower()}.{table_lineage.destination_table}",
table_name=f"{self.audit_log.fivetran_log_database.lower()}.{lineage.destination_table}",
env=destination_platform_detail.env,
platform_instance=destination_platform_detail.platform_instance,
)
output_dataset_urn_list.append(output_dataset_urn)

if self.config.include_column_lineage:
-for column_lineage in table_lineage.column_lineage:
+for column_lineage in lineage.column_lineage:
fine_grained_lineage.append(
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
@@ -267,6 +274,13 @@ def _get_connector_workunits(
).as_workunit(is_primary_source=False)

# Map Fivetran's job/sync history entity with Datahub's data process entity
+if len(connector.jobs) >= MAX_JOBS_PER_CONNECTOR:
+self.report.warning(
+title="Not all sync history was captured",
+message=f"The connector had more than {MAX_JOBS_PER_CONNECTOR} sync runs in the past {self.config.history_sync_lookback_period} days. "
+f"Only the most recent {MAX_JOBS_PER_CONNECTOR} syncs were ingested.",
+context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
+)
for job in connector.jobs:
dpi = self._generate_dpi_from_job(job, datajob)
yield from self._get_dpi_workunits(job, dpi)
@@ -279,7 +293,9 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
-self.stale_entity_removal_handler.workunit_processor,
+StaleEntityRemovalHandler.create(
+self, self.config, self.ctx
+).workunit_processor,
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: