Merge branch 'master' into ingestion/glue
aviv-julienjehannet authored Oct 10, 2024
2 parents c507982 + a219316 commit 3ba0b7b
Showing 20 changed files with 1,231 additions and 24 deletions.
@@ -323,7 +323,13 @@ export const EntityProfile = <T, U>({
{showBrowseBar && <EntityProfileNavBar urn={urn} entityType={entityType} />}
{entityData?.status?.removed === true && (
<Alert
message="This entity is not discoverable via search or lineage graph. Contact your DataHub admin for more information."
message={
<>
This entity is marked as soft-deleted, likely due to stateful ingestion or a manual
deletion command, and will not appear in search or lineage graphs. Contact your DataHub
admin for more information.
</>
}
banner
/>
)}
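For context on when this banner renders: an entity is soft-deleted when its Status aspect carries removed=True, which is exactly what stateful ingestion or a manual deletion command writes. A minimal Python sketch of producing that state (not part of this commit; the server address and URN are placeholders):

    # Emit a Status aspect with removed=True to soft-delete an entity.
    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.emitter.rest_emitter import DatahubRestEmitter
    from datahub.metadata.schema_classes import StatusClass

    emitter = DatahubRestEmitter(gms_server="http://localhost:8080")  # placeholder server
    emitter.emit(
        MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)",  # placeholder URN
            aspect=StatusClass(removed=True),  # hides the entity from search and lineage
        )
    )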
1 change: 1 addition & 0 deletions docs/lineage/airflow.md
@@ -141,6 +141,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| render_templates | true | If true, jinja-templated fields will be automatically rendered to improve the accuracy of SQL statement extraction. |
| datajob_url_link | taskinstance | If set to taskinstance, the DataJob URL links to the task instance page in Airflow; it can also be set to grid. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
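These options live in the [datahub] section of airflow.cfg (or the matching AIRFLOW__DATAHUB__* environment variables). A short sketch of reading the new flag the same way the plugin does, via Airflow's config API:

    # Read plugin settings from the [datahub] section; the fallbacks mirror
    # the defaults documented in the table above.
    from airflow.configuration import conf

    conn_id = conf.get("datahub", "conn_id", fallback="datahub_rest_default")
    render_templates = conf.getboolean("datahub", "render_templates", fallback=True)
    capture_tags_info = conf.getboolean("datahub", "capture_tags_info", fallback=True)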
@@ -43,19 +43,24 @@ class DatahubLineageConfig(ConfigModel):

capture_executions: bool = False

datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

# Note that this field is only respected by the lineage backend.
# The Airflow plugin v2 behaves as if it were set to True.
graceful_exceptions: bool = True

# The remaining config fields are only relevant for the v2 plugin.
enable_extractors: bool = True

# If true, ti.render_templates() will be called in the listener.
# Makes extraction of jinja-templated fields more accurate.
render_templates: bool = True

log_level: Optional[str] = None
debug_emitter: bool = False

disable_openlineage_plugin: bool = True

# Note that this field is only respected by the lineage backend.
# The Airflow plugin behaves as if it were set to True.
graceful_exceptions: bool = True

datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

def make_emitter_hook(self) -> "DatahubGenericHook":
# This is necessary to avoid issues with circular imports.
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig:
disable_openlineage_plugin = conf.get(
"datahub", "disable_openlineage_plugin", fallback=True
)
render_templates = conf.get("datahub", "render_templates", fallback=True)
datajob_url_link = conf.get(
"datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
)
@@ -102,4 +108,5 @@ def get_lineage_config() -> DatahubLineageConfig:
debug_emitter=debug_emitter,
disable_openlineage_plugin=disable_openlineage_plugin,
datajob_url_link=datajob_url_link,
render_templates=render_templates,
)
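A hedged sketch of the resulting config object; the import path and the exact set of required fields are assumptions, since the diff viewer omits this file's name:

    # Assumed module path; adjust to wherever DatahubLineageConfig actually lives.
    from datahub_airflow_plugin._config import DatahubLineageConfig

    config = DatahubLineageConfig(
        enabled=True,
        datahub_conn_id="datahub_rest_default",
        render_templates=False,  # opt out of ti.render_templates() in the listener
    )
    assert config.graceful_exceptions  # default unchanged; still lineage-backend-only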
@@ -386,7 +386,8 @@ def on_task_instance_running(
f"DataHub listener got notification about task instance start for {task_instance.task_id}"
)

task_instance = _render_templates(task_instance)
if self.config.render_templates:
task_instance = _render_templates(task_instance)

# The type ignore is to placate mypy on Airflow 2.1.x.
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]
@@ -478,7 +479,8 @@ def on_task_instance_finish(
) -> None:
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]

task_instance = _render_templates(task_instance)
if self.config.render_templates:
task_instance = _render_templates(task_instance)

# We must prefer the task attribute, in case modifications to the task's inlets/outlets
# were made by the execute() method.
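The new guard matters because rendering is what turns a Jinja-templated SQL string into something a SQL parser can read. A toy illustration with plain jinja2, not plugin code:

    from jinja2 import Template

    raw_sql = "SELECT * FROM {{ params.schema }}.orders WHERE ds = '{{ ds }}'"
    rendered = Template(raw_sql).render(params={"schema": "analytics"}, ds="2024-10-10")
    # Extraction on raw_sql sees template noise; on rendered it sees
    # analytics.orders, yielding accurate upstream lineage.
    print(rendered)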
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -722,6 +722,7 @@
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"preset = datahub.ingestion.source.preset:PresetSource",
"tableau = datahub.ingestion.source.tableau.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
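Registering the source under DataHub's plugin entry-point group is what makes type: preset resolvable in an ingestion recipe. A hedged check; the group name is inferred from the setup.py section this hunk sits in, and entry_points(group=...) requires Python 3.10+:

    from importlib.metadata import entry_points

    # Group name is an assumption based on DataHub's plugin registry.
    plugins = entry_points(group="datahub.ingestion.source.plugins")
    print(any(ep.name == "preset" for ep in plugins))  # True once the package is installed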
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub/cli/config_utils.py
@@ -84,6 +84,13 @@ def _get_config_from_env() -> Tuple[Optional[str], Optional[str]]:
return url or host, token


def require_config_from_env() -> Tuple[str, Optional[str]]:
host, token = _get_config_from_env()
if host is None:
raise MissingConfigError("No GMS host was provided in env variables.")
return host, token


def load_client_config() -> DatahubClientConfig:
gms_host_env, gms_token_env = _get_config_from_env()
if gms_host_env:
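From the caller's side, the new helper behaves like this sketch; the DATAHUB_GMS_URL and DATAHUB_GMS_TOKEN variable names are assumptions about what _get_config_from_env reads:

    import os

    os.environ["DATAHUB_GMS_URL"] = "http://localhost:8080"  # assumed env var name
    os.environ["DATAHUB_GMS_TOKEN"] = "my-token"  # assumed env var name

    from datahub.cli import config_utils

    host, token = config_utils.require_config_from_env()  # raises MissingConfigError if no host is set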
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -76,6 +76,12 @@ def __init__(
):
if not gms_server:
raise ConfigurationError("gms server is required")
if gms_server == "__from_env__" and token is None:
# HACK: similar to what we do with system auth, we transparently
# inject the config in here. Ideally this should be done in the
# config loader or by the caller, but it gets the job done for now.
gms_server, token = config_utils.require_config_from_env()

self._gms_server = fixup_gms_url(gms_server)
self._token = token
self.server_config: Dict[str, Any] = {}
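In effect, the sentinel enables this construction pattern, shown here as a hedged sketch assuming the GMS environment variables above are set:

    from datahub.emitter.rest_emitter import DatahubRestEmitter

    # "__from_env__" with no explicit token triggers require_config_from_env(),
    # so both the server URL and the token come from the environment.
    emitter = DatahubRestEmitter(gms_server="__from_env__")
    emitter.test_connection()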
114 changes: 114 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
@@ -0,0 +1,114 @@
import logging
from typing import Dict, Optional

import requests
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field

from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.superset import SupersetConfig, SupersetSource
from datahub.utilities import config_clean

logger = logging.getLogger(__name__)


class PresetConfig(SupersetConfig):
manager_uri: str = Field(
default="https://api.app.preset.io", description="Preset.io API URL"
)
connect_uri: str = Field(default="", description="Preset workspace URL.")
display_uri: Optional[str] = Field(
default=None,
description="optional URL to use in links (if `connect_uri` is only for ingestion)",
)
api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")

# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Preset Stateful Ingestion Config."
)

options: Dict = Field(default={}, description="")
env: str = Field(
default=DEFAULT_ENV,
description="Environment to use in namespace when constructing URNs",
)
database_alias: Dict[str, str] = Field(
default={},
description="Can be used to change mapping for database names in superset to what you have in datahub",
)

@validator("connect_uri", "display_uri")
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)

@root_validator
def default_display_uri_to_connect_uri(cls, values):
base = values.get("display_uri")
if base is None:
values["display_uri"] = values.get("connect_uri")
return values


@platform_name("Preset")
@config_class(PresetConfig)
@support_status(SupportStatus.TESTING)
@capability(
SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class PresetSource(SupersetSource):
"""
Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS).
"""

config: PresetConfig
report: StaleEntityRemovalSourceReport
platform = "preset"

def __init__(self, ctx: PipelineContext, config: PresetConfig):
logger.info(f"ctx is {ctx}")

super().__init__(ctx, config)
self.config = config
self.report = StaleEntityRemovalSourceReport()

def login(self):
try:
login_response = requests.post(
f"{self.config.manager_uri}/v1/auth/",
json={"name": self.config.api_key, "secret": self.config.api_secret},
)
except requests.exceptions.RequestException as e:
logger.error(f"Failed to authenticate with Preset: {e}")
raise e

self.access_token = login_response.json()["payload"]["access_token"]
logger.debug("Got access token from Preset")

requests_session = requests.Session()
requests_session.headers.update(
{
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "*/*",
}
)
# Test the connection
test_response = requests_session.get(f"{self.config.connect_uri}/version")
if not test_response.ok:
logger.error("Unable to connect to workspace")
return requests_session
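An end-to-end sketch of running the new source through the Python API; the workspace URL and credentials are placeholders, and the recipe keys mirror PresetConfig above:

    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(
        {
            "source": {
                "type": "preset",
                "config": {
                    "connect_uri": "https://<workspace-id>.preset.io",  # placeholder workspace URL
                    "api_key": "<PRESET_API_KEY>",  # placeholder credential
                    "api_secret": "<PRESET_API_SECRET>",  # placeholder credential
                },
            },
            "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
        }
    )
    pipeline.run()
    pipeline.raise_from_status()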
28 changes: 18 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -101,7 +101,11 @@ class SupersetConfig(
)
username: Optional[str] = Field(default=None, description="Superset username.")
password: Optional[str] = Field(default=None, description="Superset password.")

api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")
manager_uri: str = Field(
default="https://api.app.preset.io", description="Preset.io API URL"
)
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Superset Stateful Ingestion Config."
@@ -179,7 +183,14 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
super().__init__(config, ctx)
self.config = config
self.report = StaleEntityRemovalSourceReport()
if self.config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[domain_id for domain_id in self.config.domain],
graph=self.ctx.graph,
)
self.session = self.login()

def login(self) -> requests.Session:
login_response = requests.post(
f"{self.config.connect_uri}/api/v1/security/login",
json={
@@ -193,26 +204,23 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
self.access_token = login_response.json()["access_token"]
logger.debug("Got access token from superset")

self.session = requests.Session()
self.session.headers.update(
requests_session = requests.Session()
requests_session.headers.update(
{
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "*/*",
}
)

if self.config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[domain_id for domain_id in self.config.domain],
graph=self.ctx.graph,
)

# Test the connection
test_response = self.session.get(f"{self.config.connect_uri}/api/v1/dashboard/")
test_response = requests_session.get(
f"{self.config.connect_uri}/api/v1/dashboard/"
)
if test_response.status_code == 200:
pass
# TODO(Gabe): how should we message about this error?
return requests_session

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
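The payoff of extracting login() from __init__ is that PresetSource can swap Superset's username/password exchange for Preset's key/secret exchange while inheriting everything else. The pattern in miniature, as a toy sketch rather than DataHub code:

    import requests

    class PasswordAuthSource:
        def __init__(self) -> None:
            self.session = self.login()  # subclass hook

        def login(self) -> requests.Session:
            session = requests.Session()
            # username/password exchange would set the auth header here
            return session

    class TokenAuthSource(PasswordAuthSource):
        def login(self) -> requests.Session:
            session = requests.Session()
            session.headers["Authorization"] = "Bearer <token>"  # key/secret exchange instead
            return session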
(The remaining changed files are not shown.)
