Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/datahub-source): Allow ingesting aspects from the entitiesV2 API #9089

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import Optional

from pydantic import Field, root_validator
Expand Down Expand Up @@ -67,9 +68,25 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):
),
)

pull_from_datahub_api: bool = Field(
default=False,
description="Use the DataHub API to fetch versioned aspects.",
hidden_from_docs=True,
)

max_workers: int = Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for datahub api ingestion.",
hidden_from_docs=True,
)

@root_validator
def check_ingesting_data(cls, values):
if not values.get("database_connection") and not values.get("kafka_connection"):
if (
not values.get("database_connection")
and not values.get("kafka_connection")
and not values.get("pull_from_datahub_api")
):
raise ValueError(
"Your current config will not ingest any data."
" Please specify at least one of `database_connection` or `kafka_connection`, ideally both."
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging
from concurrent import futures
from typing import Dict, Iterable, List

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.graph.filters import RemovedStatusFilter
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
from datahub.ingestion.source.datahub.report import DataHubSourceReport
from datahub.metadata._schema_classes import _Aspect

logger = logging.getLogger(__name__)

# Should work for at least mysql, mariadb, postgres
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


class DataHubApiReader:
def __init__(
self,
config: DataHubSourceConfig,
report: DataHubSourceReport,
graph: DataHubGraph,
):
self.config = config
self.report = report
self.graph = graph

def get_aspects(self) -> Iterable[MetadataChangeProposalWrapper]:
urns = self.graph.get_urns_by_filter(
status=RemovedStatusFilter.ALL,
batch_size=self.config.database_query_batch_size,
)
tasks: List[futures.Future[Iterable[MetadataChangeProposalWrapper]]] = []
with futures.ThreadPoolExecutor(
max_workers=self.config.max_workers
) as executor:
for urn in urns:
tasks.append(executor.submit(self._get_aspects_for_urn, urn))
for task in futures.as_completed(tasks):
yield from task.result()

def _get_aspects_for_urn(self, urn: str) -> Iterable[MetadataChangeProposalWrapper]:
aspects: Dict[str, _Aspect] = self.graph.get_entity_semityped(urn) # type: ignore
for aspect in aspects.values():
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=aspect,
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
from datahub.ingestion.source.datahub.datahub_api_reader import DataHubApiReader
from datahub.ingestion.source.datahub.datahub_database_reader import (
DataHubDatabaseReader,
)
Expand Down Expand Up @@ -58,6 +59,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
logger.info(f"Ingesting DataHub metadata up until {self.report.stop_time}")
state = self.stateful_ingestion_handler.get_last_run_state()

if self.config.pull_from_datahub_api:
yield from self._get_api_workunits()

if self.config.database_connection is not None:
yield from self._get_database_workunits(
from_createdon=state.database_createdon_datetime
Expand Down Expand Up @@ -139,6 +143,18 @@ def _get_kafka_workunits(
)
self._commit_progress(i)

def _get_api_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.ctx.graph is None:
self.report.report_failure(
"datahub_api",
"Specify datahub_api on your ingestion recipe to ingest from the DataHub API",
)
return

reader = DataHubApiReader(self.config, self.report, self.ctx.graph)
for mcp in reader.get_aspects():
yield mcp.as_workunit()

def _commit_progress(self, i: Optional[int] = None) -> None:
"""Commit progress to stateful storage, if there have been no errors.

Expand Down
Loading