feat(ingest): add DataHubGraph.emit_all method (datahub-project#10002)

hsheth2 authored Mar 11, 2024
1 parent 7e2076e commit 5937472

Showing 10 changed files with 176 additions and 38 deletions.
28 changes: 27 additions & 1 deletion docs/how/updating-datahub.md
@@ -1,11 +1,38 @@
# Updating DataHub

<!--
## <version number>
### Breaking Changes
### Potential Downtime
### Deprecations
### Other Notable Changes
-->

This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.

## Next

### Breaking Changes

- #9934 - Stateful ingestion is now enabled by default if the `datahub-rest` sink is used or if a `datahub_api` is specified. It will still be disabled by default when any other sink type is used.
- #10002 - The `DataHubGraph` client no longer makes a request to the backend during initialization. If you want to preserve the old behavior, call `graph.test_connection()` after constructing the client (a short sketch follows this diff).

### Potential Downtime

### Deprecations

### Other Notable Changes

## 0.13.0

### Breaking Changes

- The MySQL version used by quickstarts has been updated to 8.2, which may cause issues for existing quickstart instances.
- Neo4j has been updated to 5.x, which may require a migration from 4.x.
- Building requires JDK 17 (the runtime remains Java 11).
@@ -36,7 +63,6 @@ This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.

- #9601 - The Unity Catalog (UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires the config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting the legacy Hive metastore catalog in Databricks.
- #9904 - The default Redshift `table_lineage_mode` is now `MIXED` instead of `STL_SCAN_BASED`. Improved lineage generation is also available by enabling `use_lineage_v2`. This v2 implementation will become the default in a future release.
-- #9934 - The stateful_ingestion is now enabled by default, if datahub-rest sink is used or if a `datahub_api` is specified

### Potential Downtime

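To make the #10002 entry above concrete, here is a minimal migration sketch; the server URL is a placeholder:

```python
# A sketch of preserving the old fail-fast behavior; the URL is a placeholder.
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# The constructor no longer issues a request to the backend, so call
# test_connection() explicitly to surface connectivity problems eagerly.
graph.test_connection()
```
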
12 changes: 7 additions & 5 deletions metadata-ingestion/src/datahub/cli/specific/user_cli.py
@@ -42,12 +42,14 @@ def upsert(file: Path, override_editable: bool) -> None:
    for user_config in user_configs:
        try:
            datahub_user: CorpUser = CorpUser.parse_obj(user_config)
-            for mcp in datahub_user.generate_mcp(
-                generation_config=CorpUserGenerationConfig(
-                    override_editable=override_editable
-                )
-            ):
-                emitter.emit(mcp)
+
+            emitter.emit_all(
+                datahub_user.generate_mcp(
+                    generation_config=CorpUserGenerationConfig(
+                        override_editable=override_editable
+                    )
+                )
+            )
            click.secho(f"Update succeeded for urn {datahub_user.urn}.", fg="green")
        except Exception as e:
            click.secho(
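Outside the CLI, the same pattern applies. A sketch under assumptions: the user fields below are illustrative, and `get_default_graph()` picks up the standard CLI credentials:

```python
# Sketch of the emit_all pattern the CLI now uses; the user spec is made up.
from datahub.api.entities.corpuser.corpuser import CorpUser, CorpUserGenerationConfig
from datahub.ingestion.graph.client import get_default_graph

user = CorpUser.parse_obj({"id": "jdoe", "display_name": "Jane Doe"})  # hypothetical spec
graph = get_default_graph()

# emit_all consumes the MCP generator and emits on multiple threads,
# replacing the per-MCP emit() loop removed above.
graph.emit_all(
    user.generate_mcp(
        generation_config=CorpUserGenerationConfig(override_editable=False)
    )
)
```
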
@@ -38,7 +38,8 @@ class EnvConfigMixin(ConfigModel):

    _env_deprecation = pydantic_field_deprecated(
        "env",
-        message="env is deprecated and will be removed in a future release. Please use platform_instance instead.",
+        message="We recommend using platform_instance instead of env. "
+        "While specifying env does still work, we intend to deprecate it in the future.",
    )

    @validator("env")
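A small sketch of the softened deprecation; `DemoSourceConfig` is a hypothetical subclass, and the import path assumes `EnvConfigMixin`'s usual home in `datahub.configuration.source_common`:

```python
# Setting env still works but now emits the gentler warning above.
from datahub.configuration.source_common import EnvConfigMixin


class DemoSourceConfig(EnvConfigMixin):  # hypothetical subclass for illustration
    pass


config = DemoSourceConfig.parse_obj({"env": "PROD"})  # triggers the warning
print(config.env)
```
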
18 changes: 3 additions & 15 deletions metadata-ingestion/src/datahub/emitter/mcp.py
@@ -2,7 +2,7 @@
import json
from typing import TYPE_CHECKING, List, Optional, Sequence, Tuple, Union

-from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE, TIMESERIES_ASPECT_MAP
+from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
@@ -244,21 +244,9 @@ def as_workunit(
) -> "MetadataWorkUnit":
from datahub.ingestion.api.workunit import MetadataWorkUnit

if self.aspect and self.aspectName in TIMESERIES_ASPECT_MAP:
# TODO: Make this a cleaner interface.
ts = getattr(self.aspect, "timestampMillis", None)
assert ts is not None

# If the aspect is a timeseries aspect, include the timestampMillis in the ID.
return MetadataWorkUnit(
id=f"{self.entityUrn}-{self.aspectName}-{ts}",
mcp=self,
treat_errors_as_warnings=treat_errors_as_warnings,
is_primary_source=is_primary_source,
)

id = MetadataWorkUnit.generate_workunit_id(self)
return MetadataWorkUnit(
id=f"{self.entityUrn}-{self.aspectName}",
id=id,
mcp=self,
treat_errors_as_warnings=treat_errors_as_warnings,
is_primary_source=is_primary_source,
8 changes: 6 additions & 2 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -158,14 +158,14 @@ def __init__(
            timeout=(self._connect_timeout_sec, self._read_timeout_sec),
        )

-    def test_connection(self) -> dict:
+    def test_connection(self) -> None:
        url = f"{self._gms_server}/config"
        response = self._session.get(url)
        if response.status_code == 200:
            config: dict = response.json()
            if config.get("noCode") == "true":
                self.server_config = config
-                return config
+                return

            else:
                # Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error
@@ -195,6 +195,10 @@ def test_connection(self) -> dict:
            message += "\nPlease check your configuration and make sure you are talking to the DataHub GMS (usually <datahub-gms-host>:8080) or Frontend GMS API (usually <frontend>:9002/api/gms)."
            raise ConfigurationError(message)

+    def get_server_config(self) -> dict:
+        self.test_connection()
+        return self.server_config
+
    def to_graph(self) -> "DataHubGraph":
        from datahub.ingestion.graph.client import DataHubGraph

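A sketch of the resulting split between checking connectivity and reading the server config; the GMS URL is a placeholder:

```python
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://localhost:8080")  # placeholder URL

# test_connection() now returns None and raises ConfigurationError on failure;
# get_server_config() runs the same check and returns the cached config dict.
emitter.test_connection()
config = emitter.get_server_config()
print(config.get("noCode"))
```
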
@@ -55,7 +55,10 @@ def auto_workunit(

    for item in stream:
        if isinstance(item, MetadataChangeEventClass):
-            yield MetadataWorkUnit(id=f"{item.proposedSnapshot.urn}/mce", mce=item)
+            yield MetadataWorkUnit(
+                id=MetadataWorkUnit.generate_workunit_id(item),
+                mce=item,
+            )
        else:
            yield item.as_workunit()

23 changes: 23 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/workunit.py
@@ -4,6 +4,7 @@

from deprecated import deprecated

+from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import WorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
@@ -97,6 +98,28 @@ def get_urn(self) -> str:
        assert self.metadata.entityUrn
        return self.metadata.entityUrn

+    @classmethod
+    def generate_workunit_id(
+        cls,
+        item: Union[
+            MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
+        ],
+    ) -> str:
+        if isinstance(item, MetadataChangeEvent):
+            return f"{item.proposedSnapshot.urn}/mce"
+        elif isinstance(item, (MetadataChangeProposalWrapper, MetadataChangeProposal)):
+            if item.aspect and item.aspectName in TIMESERIES_ASPECT_MAP:
+                # TODO: Make this a cleaner interface.
+                ts = getattr(item.aspect, "timestampMillis", None)
+                assert ts is not None
+
+                # If the aspect is a timeseries aspect, include the timestampMillis in the ID.
+                return f"{item.entityUrn}-{item.aspectName}-{ts}"
+
+            return f"{item.entityUrn}-{item.aspectName}"
+        else:
+            raise ValueError(f"Unexpected type {type(item)}")
+
    def get_aspect_of_type(self, aspect_cls: Type[T_Aspect]) -> Optional[T_Aspect]:
        aspects: list
        if isinstance(self.metadata, MetadataChangeEvent):
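A sketch of the IDs this classmethod produces, using a made-up dataset urn:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import StatusClass

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.tbl,PROD)",  # made up
    aspect=StatusClass(removed=False),
)

# Non-timeseries aspects yield "<urn>-<aspectName>"; timeseries aspects
# additionally append timestampMillis, per the branch above.
print(MetadataWorkUnit.generate_workunit_id(mcp))
# -> urn:li:dataset:(urn:li:dataPlatform:hive,example.tbl,PROD)-status
```
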
91 changes: 83 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -1,3 +1,4 @@
+import contextlib
import enum
import functools
import json
@@ -7,7 +8,18 @@
from dataclasses import dataclass
from datetime import datetime
from json.decoder import JSONDecodeError
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Tuple,
+    Type,
+    Union,
+)

from avro.schema import RecordSchema
from deprecated import deprecated
@@ -26,6 +38,10 @@
    generate_filter,
)
from datahub.ingestion.source.state.checkpoint import Checkpoint
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
+    MetadataChangeEvent,
+    MetadataChangeProposal,
+)
from datahub.metadata.schema_classes import (
    ASPECT_NAME_MAP,
    KEY_ASPECTS,
@@ -47,6 +63,7 @@
from datahub.utilities.urns.urn import Urn, guess_entity_type

if TYPE_CHECKING:
+    from datahub.ingestion.sink.datahub_rest import DatahubRestSink
    from datahub.ingestion.source.state.entity_removal_state import (
        GenericCheckpointState,
    )
@@ -58,6 +75,8 @@

logger = logging.getLogger(__name__)
+_MISSING_SERVER_ID = "missing"
+_GRAPH_DUMMY_RUN_ID = "__datahub-graph-client"


class DatahubClientConfig(ConfigModel):
@@ -122,21 +141,25 @@ def __init__(self, config: DatahubClientConfig) -> None:
            client_certificate_path=self.config.client_certificate_path,
            disable_ssl_verification=self.config.disable_ssl_verification,
        )
-        self.test_connection()
+
+        self.server_id = _MISSING_SERVER_ID
+
+    def test_connection(self) -> None:
+        super().test_connection()

        # Cache the server id for telemetry.
        from datahub.telemetry.telemetry import telemetry_instance

        if not telemetry_instance.enabled:
-            self.server_id = "missing"
+            self.server_id = _MISSING_SERVER_ID
            return
        try:
            client_id: Optional[TelemetryClientIdClass] = self.get_aspect(
                "urn:li:telemetry:clientId", TelemetryClientIdClass
            )
-            self.server_id = client_id.clientId if client_id else "missing"
+            self.server_id = client_id.clientId if client_id else _MISSING_SERVER_ID
        except Exception as e:
-            self.server_id = "missing"
+            self.server_id = _MISSING_SERVER_ID
            logger.debug(f"Failed to get server id due to {e}")

    @classmethod
@@ -179,6 +202,56 @@ def _get_generic(self, url: str, params: Optional[Dict] = None) -> Dict:
    def _post_generic(self, url: str, payload_dict: Dict) -> Dict:
        return self._send_restli_request("POST", url, json=payload_dict)

+    @contextlib.contextmanager
+    def make_rest_sink(
+        self, run_id: str = _GRAPH_DUMMY_RUN_ID
+    ) -> Iterator["DatahubRestSink"]:
+        from datahub.ingestion.api.common import PipelineContext
+        from datahub.ingestion.sink.datahub_rest import (
+            DatahubRestSink,
+            DatahubRestSinkConfig,
+            SyncOrAsync,
+        )
+
+        # This is a bit convoluted - this DataHubGraph class is a subclass of DatahubRestEmitter,
+        # but initializing the rest sink creates another rest emitter.
+        # TODO: We should refactor out the multithreading functionality of the sink
+        # into a separate class that can be used by both the sink and the graph client
+        # e.g. a DatahubBulkRestEmitter that both the sink and the graph client use.
+        sink_config = DatahubRestSinkConfig(
+            **self.config.dict(), mode=SyncOrAsync.ASYNC
+        )
+
+        with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink:
+            yield sink
+        if sink.report.failures:
+            raise OperationalError(
+                f"Failed to emit {len(sink.report.failures)} records",
+                info=sink.report.as_obj(),
+            )
+
+    def emit_all(
+        self,
+        items: Iterable[
+            Union[
+                MetadataChangeEvent,
+                MetadataChangeProposal,
+                MetadataChangeProposalWrapper,
+            ]
+        ],
+        run_id: str = _GRAPH_DUMMY_RUN_ID,
+    ) -> None:
+        """Emit all items in the iterable using multiple threads."""
+
+        with self.make_rest_sink(run_id=run_id) as sink:
+            for item in items:
+                sink.emit_async(item)
+        if sink.report.failures:
+            raise OperationalError(
+                f"Failed to emit {len(sink.report.failures)} records",
+                info=sink.report.as_obj(),
+            )
+
    def get_aspect(
        self,
        entity_urn: str,
@@ -861,7 +934,7 @@ def exists(self, entity_urn: str) -> bool:
    def soft_delete_entity(
        self,
        urn: str,
-        run_id: str = "__datahub-graph-client",
+        run_id: str = _GRAPH_DUMMY_RUN_ID,
        deletion_timestamp: Optional[int] = None,
    ) -> None:
        """Soft-delete an entity by urn.
@@ -873,7 +946,7 @@ def soft_delete_entity(
        assert urn

        deletion_timestamp = deletion_timestamp or int(time.time() * 1000)
-        self.emit_mcp(
+        self.emit(
            MetadataChangeProposalWrapper(
                entityUrn=urn,
                aspect=StatusClass(removed=True),
@@ -1098,4 +1171,6 @@ def close(self) -> None:

def get_default_graph() -> DataHubGraph:
    (url, token) = get_url_and_token()
-    return DataHubGraph(DatahubClientConfig(server=url, token=token))
+    graph = DataHubGraph(DatahubClientConfig(server=url, token=token))
+    graph.test_connection()
+    return graph
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -213,6 +213,7 @@ def __init__(
        with _add_init_error_context("connect to DataHub"):
            if self.config.datahub_api:
                self.graph = DataHubGraph(self.config.datahub_api)
+                self.graph.test_connection()

                telemetry.telemetry_instance.update_capture_exception_context(
                    server=self.graph