diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index e75bafdac9628..66a08dc63aa0d 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -54,6 +54,9 @@ jobs: - python-version: "3.11" extra_pip_requirements: "apache-airflow~=2.9.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt" extra_pip_extras: plugin-v2 + - python-version: "3.11" + extra_pip_requirements: "apache-airflow~=2.10.2 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt" + extra_pip_extras: plugin-v2 fail-fast: false steps: - name: Set up JDK 17 diff --git a/datahub-web-react/src/app/entity/shared/containers/profile/EntityProfile.tsx b/datahub-web-react/src/app/entity/shared/containers/profile/EntityProfile.tsx index a9737c9698f7b..1deeed076d8d6 100644 --- a/datahub-web-react/src/app/entity/shared/containers/profile/EntityProfile.tsx +++ b/datahub-web-react/src/app/entity/shared/containers/profile/EntityProfile.tsx @@ -323,7 +323,13 @@ export const EntityProfile = ({ {showBrowseBar && } {entityData?.status?.removed === true && ( + This entity is marked as soft-deleted, likely due to stateful ingestion or a manual + deletion command, and will not appear in search or lineage graphs. Contact your DataHub + admin for more information. + + } banner /> )} diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 65da1fd5251dc..aca6d30619ea8 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -141,6 +141,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default | capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. | | capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. | | materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. 
| +| render_templates | true | If true, jinja-templated fields will be automatically rendered to improve the accuracy of SQL statement extraction. | | datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. | | | | graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. | diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 1587b4793fbf1..0d5ceefd989dc 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -44,7 +44,7 @@ def get_long_description(): # We remain restrictive on the versions allowed here to prevent # us from being broken by backwards-incompatible changes in the # underlying package. - "openlineage-airflow>=1.2.0,<=1.18.0", + "openlineage-airflow>=1.2.0,<=1.22.0", }, } diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index c37a1b334ed37..8deba22a107ce 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -43,19 +43,24 @@ class DatahubLineageConfig(ConfigModel): capture_executions: bool = False + datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE + + # Note that this field is only respected by the lineage backend. + # The Airflow plugin v2 behaves as if it were set to True. + graceful_exceptions: bool = True + + # The remaining config fields are only relevant for the v2 plugin. enable_extractors: bool = True + # If true, ti.render_templates() will be called in the listener. + # Makes extraction of jinja-templated fields more accurate. 
+ render_templates: bool = True + log_level: Optional[str] = None debug_emitter: bool = False disable_openlineage_plugin: bool = True - # Note that this field is only respected by the lineage backend. - # The Airflow plugin behaves as if it were set to True. - graceful_exceptions: bool = True - - datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE - def make_emitter_hook(self) -> "DatahubGenericHook": # This is necessary to avoid issues with circular imports. from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook @@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig: disable_openlineage_plugin = conf.get( "datahub", "disable_openlineage_plugin", fallback=True ) + render_templates = conf.get("datahub", "render_templates", fallback=True) datajob_url_link = conf.get( "datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value ) @@ -102,4 +108,5 @@ def get_lineage_config() -> DatahubLineageConfig: debug_emitter=debug_emitter, disable_openlineage_plugin=disable_openlineage_plugin, datajob_url_link=datajob_url_link, + render_templates=render_templates, ) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 123b74fee74b5..b818b76de9f7f 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -386,7 +386,8 @@ def on_task_instance_running( f"DataHub listener got notification about task instance start for {task_instance.task_id}" ) - task_instance = _render_templates(task_instance) + if self.config.render_templates: + task_instance = _render_templates(task_instance) # The type ignore is to placate mypy on Airflow 2.1.x. 
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] @@ -478,7 +479,8 @@ def on_task_instance_finish( ) -> None: dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] - task_instance = _render_templates(task_instance) + if self.config.render_templates: + task_instance = _render_templates(task_instance) # We must prefer the task attribute, in case modifications to the task's inlets/outlets # were made by the execute() method. diff --git a/metadata-ingestion-modules/airflow-plugin/tox.ini b/metadata-ingestion-modules/airflow-plugin/tox.ini index 9e0a30df6fcbd..2e4596a24c2a6 100644 --- a/metadata-ingestion-modules/airflow-plugin/tox.ini +++ b/metadata-ingestion-modules/airflow-plugin/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29 +envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29, py311-airflow210 [testenv] use_develop = true @@ -20,6 +20,7 @@ deps = airflow27: apache-airflow~=2.7.0 airflow28: apache-airflow~=2.8.0 airflow29: apache-airflow~=2.9.0 + airflow210: apache-airflow~=2.10.0 # Respect the Airflow constraints files. # We can't make ourselves work with the constraints of Airflow < 2.3. @@ -30,6 +31,7 @@ deps = py310-airflow27: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt py310-airflow28: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt py311-airflow29: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt + py311-airflow210: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt # Before pinning to the constraint files, we previously left the dependencies # more open. There were a number of packages for which this caused issues. 
@@ -57,6 +59,6 @@ commands = [testenv:py310-airflow24] extras = dev,integration-tests,plugin-v2,test-airflow24 -[testenv:py310-airflow{26,27,28},py311-airflow{29}] +[testenv:py310-airflow{26,27,28},py311-airflow{29,210}] extras = dev,integration-tests,plugin-v2 diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index bf80172441405..f14c080df644a 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -722,6 +722,7 @@ "snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource", "snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource", "superset = datahub.ingestion.source.superset:SupersetSource", + "preset = datahub.ingestion.source.preset:PresetSource", "tableau = datahub.ingestion.source.tableau.tableau:TableauSource", "openapi = datahub.ingestion.source.openapi:OpenApiSource", "metabase = datahub.ingestion.source.metabase:MetabaseSource", diff --git a/metadata-ingestion/src/datahub/cli/config_utils.py b/metadata-ingestion/src/datahub/cli/config_utils.py index bb85809174ea9..5d9604de7836f 100644 --- a/metadata-ingestion/src/datahub/cli/config_utils.py +++ b/metadata-ingestion/src/datahub/cli/config_utils.py @@ -84,6 +84,13 @@ def _get_config_from_env() -> Tuple[Optional[str], Optional[str]]: return url or host, token +def require_config_from_env() -> Tuple[str, Optional[str]]: + host, token = _get_config_from_env() + if host is None: + raise MissingConfigError("No GMS host was provided in env variables.") + return host, token + + def load_client_config() -> DatahubClientConfig: gms_host_env, gms_token_env = _get_config_from_env() if gms_host_env: diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index e370ad3562a06..948060c3c4f44 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -76,6 +76,12 
@@ def __init__( ): if not gms_server: raise ConfigurationError("gms server is required") + if gms_server == "__from_env__" and token is None: + # HACK: similar to what we do with system auth, we transparently + # inject the config in here. Ideally this should be done in the + # config loader or by the caller, but it gets the job done for now. + gms_server, token = config_utils.require_config_from_env() + self._gms_server = fixup_gms_url(gms_server) self._token = token self.server_config: Dict[str, Any] = {} diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 0fdb7bb537457..b9b0ed556e66c 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -214,27 +214,28 @@ def _get_generic(self, url: str, params: Optional[Dict] = None) -> Dict: def _post_generic(self, url: str, payload_dict: Dict) -> Dict: return self._send_restli_request("POST", url, json=payload_dict) - def _make_rest_sink_config(self) -> "DatahubRestSinkConfig": - from datahub.ingestion.sink.datahub_rest import ( - DatahubRestSinkConfig, - RestSinkMode, - ) + def _make_rest_sink_config( + self, extra_config: Optional[Dict] = None + ) -> "DatahubRestSinkConfig": + from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig # This is a bit convoluted - this DataHubGraph class is a subclass of DatahubRestEmitter, # but initializing the rest sink creates another rest emitter. # TODO: We should refactor out the multithreading functionality of the sink # into a separate class that can be used by both the sink and the graph client # e.g. a DatahubBulkRestEmitter that both the sink and the graph client use. 
- return DatahubRestSinkConfig(**self.config.dict(), mode=RestSinkMode.ASYNC) + return DatahubRestSinkConfig(**self.config.dict(), **(extra_config or {})) @contextlib.contextmanager def make_rest_sink( - self, run_id: str = _GRAPH_DUMMY_RUN_ID + self, + run_id: str = _GRAPH_DUMMY_RUN_ID, + extra_sink_config: Optional[Dict] = None, ) -> Iterator["DatahubRestSink"]: from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.sink.datahub_rest import DatahubRestSink - sink_config = self._make_rest_sink_config() + sink_config = self._make_rest_sink_config(extra_config=extra_sink_config) with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink: yield sink if sink.report.failures: diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index 9059dcca3e2b8..5b4d3fe38ecd9 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -79,6 +79,7 @@ class DataHubRestSinkReport(SinkReport): gms_version: Optional[str] = None pending_requests: int = 0 + async_batches_prepared: int = 0 async_batches_split: int = 0 main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer) @@ -260,6 +261,7 @@ def _emit_batch_wrapper( events.append(event) chunks = self.emitter.emit_mcps(events) + self.report.async_batches_prepared += 1 if chunks > 1: self.report.async_batches_split += chunks logger.info( diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py index 0b5c164a6b2c7..ab55d4e15e5de 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py @@ -196,11 +196,25 @@ def folder_ancestors( fields: Union[str, List[str]] = ["id", "name", 
"parent_id"], ) -> Sequence[Folder]: self.client_stats.folder_calls += 1 - return self.client.folder_ancestors( - folder_id, - self.__fields_mapper(fields), - transport_options=self.transport_options, - ) + try: + return self.client.folder_ancestors( + folder_id, + self.__fields_mapper(fields), + transport_options=self.transport_options, + ) + except SDKError as e: + if "Looker Not Found (404)" in str(e): + # Folder ancestors not found + logger.info( + f"Could not find ancestors for folder with id {folder_id}: 404 error" + ) + else: + logger.warning( + f"Could not find ancestors for folder with id {folder_id}" + ) + logger.warning(f"Failure was {e}") + # Folder ancestors not found + return [] def all_connections(self): self.client_stats.all_connections_calls += 1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/preset.py b/metadata-ingestion/src/datahub/ingestion/source/preset.py new file mode 100644 index 0000000000000..e51520898103d --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/preset.py @@ -0,0 +1,114 @@ +import logging +from typing import Dict, Optional + +import requests +from pydantic.class_validators import root_validator, validator +from pydantic.fields import Field + +from datahub.emitter.mce_builder import DEFAULT_ENV +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SourceCapability, + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityRemovalSourceReport, + StatefulStaleMetadataRemovalConfig, +) +from datahub.ingestion.source.superset import SupersetConfig, SupersetSource +from datahub.utilities import config_clean + +logger = logging.getLogger(__name__) + + +class PresetConfig(SupersetConfig): + manager_uri: str = Field( + default="https://api.app.preset.io", description="Preset.io API URL" + ) + connect_uri: str = Field(default="", 
description="Preset workspace URL.") + display_uri: Optional[str] = Field( + default=None, + description="optional URL to use in links (if `connect_uri` is only for ingestion)", + ) + api_key: Optional[str] = Field(default=None, description="Preset.io API key.") + api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.") + + # Configuration for stateful ingestion + stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field( + default=None, description="Preset Stateful Ingestion Config." + ) + + options: Dict = Field(default={}, description="") + env: str = Field( + default=DEFAULT_ENV, + description="Environment to use in namespace when constructing URNs", + ) + database_alias: Dict[str, str] = Field( + default={}, + description="Can be used to change mapping for database names in superset to what you have in datahub", + ) + + @validator("connect_uri", "display_uri") + def remove_trailing_slash(cls, v): + return config_clean.remove_trailing_slashes(v) + + @root_validator + def default_display_uri_to_connect_uri(cls, values): + base = values.get("display_uri") + if base is None: + values["display_uri"] = values.get("connect_uri") + return values + + +@platform_name("Preset") +@config_class(PresetConfig) +@support_status(SupportStatus.TESTING) +@capability( + SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion" +) +class PresetSource(SupersetSource): + """ + Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS). 
+ """ + + config: PresetConfig + report: StaleEntityRemovalSourceReport + platform = "preset" + + def __init__(self, ctx: PipelineContext, config: PresetConfig): + logger.info(f"ctx is {ctx}") + + super().__init__(ctx, config) + self.config = config + self.report = StaleEntityRemovalSourceReport() + + def login(self): + try: + login_response = requests.post( + f"{self.config.manager_uri}/v1/auth/", + json={"name": self.config.api_key, "secret": self.config.api_secret}, + ) + except requests.exceptions.RequestException as e: + logger.error(f"Failed to authenticate with Preset: {e}") + raise e + + self.access_token = login_response.json()["payload"]["access_token"] + logger.debug("Got access token from Preset") + + requests_session = requests.Session() + requests_session.headers.update( + { + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json", + "Accept": "*/*", + } + ) + # Test the connection + test_response = requests_session.get(f"{self.config.connect_uri}/version") + if not test_response.ok: + logger.error("Unable to connect to workspace") + return requests_session diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index c73472f1b8041..9d77e13a0f3c2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -29,6 +29,7 @@ STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = { "dataProcessInstance", + "query", } @@ -75,7 +76,10 @@ def auto_stale_entity_removal( if wu.is_primary_source: entity_type = guess_entity_type(urn) - if entity_type is not None: + if ( + entity_type is not None + and entity_type not in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES + ): stale_entity_removal_handler.add_entity_to_state(entity_type, urn) else: stale_entity_removal_handler.add_urn_to_skip(urn) 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index e563a806446c4..858281f880359 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -101,7 +101,11 @@ class SupersetConfig( ) username: Optional[str] = Field(default=None, description="Superset username.") password: Optional[str] = Field(default=None, description="Superset password.") - + api_key: Optional[str] = Field(default=None, description="Preset.io API key.") + api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.") + manager_uri: str = Field( + default="https://api.app.preset.io", description="Preset.io API URL" + ) # Configuration for stateful ingestion stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field( default=None, description="Superset Stateful Ingestion Config." @@ -179,7 +183,14 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig): super().__init__(config, ctx) self.config = config self.report = StaleEntityRemovalSourceReport() + if self.config.domain: + self.domain_registry = DomainRegistry( + cached_domains=[domain_id for domain_id in self.config.domain], + graph=self.ctx.graph, + ) + self.session = self.login() + def login(self) -> requests.Session: login_response = requests.post( f"{self.config.connect_uri}/api/v1/security/login", json={ @@ -193,8 +204,8 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig): self.access_token = login_response.json()["access_token"] logger.debug("Got access token from superset") - self.session = requests.Session() - self.session.headers.update( + requests_session = requests.Session() + requests_session.headers.update( { "Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json", @@ -202,17 +213,14 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig): } ) - if 
self.config.domain: - self.domain_registry = DomainRegistry( - cached_domains=[domain_id for domain_id in self.config.domain], - graph=self.ctx.graph, - ) - # Test the connection - test_response = self.session.get(f"{self.config.connect_uri}/api/v1/dashboard/") + test_response = requests_session.get( + f"{self.config.connect_uri}/api/v1/dashboard/" + ) if test_response.status_code == 200: pass # TODO(Gabe): how should we message about this error? + return requests_session @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: diff --git a/metadata-ingestion/tests/integration/preset/golden_test_ingest.json b/metadata-ingestion/tests/integration/preset/golden_test_ingest.json new file mode 100644 index 0000000000000..5aca7f3e5bd14 --- /dev/null +++ b/metadata-ingestion/tests/integration/preset/golden_test_ingest.json @@ -0,0 +1,286 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": { + "urn": "urn:li:dashboard:(preset,1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dashboard.DashboardInfo": { + "customProperties": { + "Status": "published", + "IsPublished": "true", + "Owners": "test_username_1, test_username_2", + "IsCertified": "true", + "CertifiedBy": "Certification team", + "CertificationDetails": "Approved" + }, + "title": "test_dashboard_title_1", + "description": "", + "charts": [ + "urn:li:chart:(preset,10)", + "urn:li:chart:(preset,11)" + ], + "datasets": [], + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "dashboardUrl": "mock://mock-domain.preset.io/dashboard/test_dashboard_url_1" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + 
"com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": { + "urn": "urn:li:dashboard:(preset,2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dashboard.DashboardInfo": { + "customProperties": { + "Status": "draft", + "IsPublished": "false", + "Owners": "unknown", + "IsCertified": "false" + }, + "title": "test_dashboard_title_2", + "description": "", + "charts": [ + "urn:li:chart:(preset,12)", + "urn:li:chart:(preset,13)" + ], + "datasets": [], + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "dashboardUrl": "mock://mock-domain.preset.io/dashboard/test_dashboard_url_2" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,10)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_1", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_10", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "BAR" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + 
"com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,11)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_2", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_11", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "PIE" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,12)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_3", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_12", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "AREA" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + 
"com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,13)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_4", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_13", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "HISTOGRAM" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/preset/golden_test_stateful_ingest.json b/metadata-ingestion/tests/integration/preset/golden_test_stateful_ingest.json new file mode 100644 index 0000000000000..719f0a78fb7d7 --- /dev/null +++ b/metadata-ingestion/tests/integration/preset/golden_test_stateful_ingest.json @@ -0,0 +1,261 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": { + "urn": "urn:li:dashboard:(preset,1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dashboard.DashboardInfo": { + "customProperties": { + "Status": "published", + "IsPublished": "true", + "Owners": "test_username_1, test_username_2", + "IsCertified": "true", + "CertifiedBy": "Certification team", + "CertificationDetails": "Approved" + }, + "title": "test_dashboard_title_1", + "description": "", + "charts": [ + "urn:li:chart:(preset,10)", + "urn:li:chart:(preset,11)" + ], + "datasets": 
[], + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "dashboardUrl": "mock://mock-domain.preset.io/dashboard/test_dashboard_url_1" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,10)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_1", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_10", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "BAR" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,11)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_2", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, 
+ "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_1" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_11", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "PIE" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,12)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_3", + "description": "", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_12", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "AREA" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { + "urn": "urn:li:chart:(preset,13)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.chart.ChartInfo": { + "customProperties": { + "Metrics": "", + "Filters": "", + "Dimensions": "" + }, + "title": "test_chart_title_4", + "description": "", + "lastModified": { + 
"created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 1720594800000, + "actor": "urn:li:corpuser:test_username_2" + } + }, + "chartUrl": "mock://mock-domain.preset.io/explore/test_chart_url_13", + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + } + ], + "type": "HISTOGRAM" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(preset,2)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": true + } + }, + "systemMetadata": { + "lastObserved": 1720594800000, + "runId": "preset-2024_07_10-07_00_00", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/preset/test_preset.py b/metadata-ingestion/tests/integration/preset/test_preset.py new file mode 100644 index 0000000000000..f926a762e6a07 --- /dev/null +++ b/metadata-ingestion/tests/integration/preset/test_preset.py @@ -0,0 +1,366 @@ +from typing import Any, Dict, Optional +from unittest.mock import patch + +import pytest +from freezegun import freeze_time + +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import mce_helpers +from tests.test_helpers.state_helpers import ( + get_current_checkpoint_from_pipeline, + run_and_get_pipeline, + validate_all_providers_have_committed_successfully, +) + +FROZEN_TIME = "2024-07-10 07:00:00" +GMS_PORT = 8080 +GMS_SERVER = f"http://localhost:{GMS_PORT}" + + +def register_mock_api(request_mock: Any, override_data: Optional[dict] = None) -> None: + if override_data is None: + override_data = {} + + api_vs_response = { + "mock://mock-domain.preset.io/v1/auth/": { + "method": "POST", 
+ "status_code": 200, + "json": { + "payload": { + "access_token": "test_token", + } + }, + }, + "mock://mock-domain.preset.io/version": { + "method": "GET", + "status_code": 200, + "json": { + "ci": { + "built_at": "Tue Jul 10 00:00:00 UTC 2024", + "build_num": "1", + "triggered_by": "Not triggered by a user", + }, + "git": { + "branch": "4.0.1.6", + "sha": "test_sha", + "sha_superset": "test_sha_superset", + "release_name": "test_release_name", + }, + "chart_version": "1.16.1", + "start_time": "2024-07-10 00:00:00", + "mt_deployment": True, + }, + }, + "mock://mock-domain.preset.io/api/v1/dashboard/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 2, + "result": [ + { + "id": "1", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "dashboard_title": "test_dashboard_title_1", + "url": "/dashboard/test_dashboard_url_1", + "position_json": '{"CHART-test-1": {"meta": { "chartId": "10" }}, "CHART-test-2": {"meta": { "chartId": "11" }}}', + "status": "published", + "published": True, + "owners": [ + { + "username": "test_username_1", + }, + { + "username": "test_username_2", + }, + ], + "certified_by": "Certification team", + "certification_details": "Approved", + }, + { + "id": "2", + "changed_by": { + "username": "test_username_2", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "dashboard_title": "test_dashboard_title_2", + "url": "/dashboard/test_dashboard_url_2", + "position_json": '{"CHART-test-3": {"meta": { "chartId": "12" }}, "CHART-test-4": {"meta": { "chartId": "13" }}}', + "status": "draft", + "published": False, + "owners": [ + { + "first_name": "name", + }, + ], + "certified_by": "", + "certification_details": "", + }, + ], + }, + }, + "mock://mock-domain.preset.io/api/v1/chart/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 4, + "result": [ + { + "id": "10", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": 
"2024-07-10T07:00:00.000000+0000", + "slice_name": "test_chart_title_1", + "viz_type": "box_plot", + "url": "/explore/test_chart_url_10", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "11", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "slice_name": "test_chart_title_2", + "viz_type": "pie", + "url": "/explore/test_chart_url_11", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "12", + "changed_by": { + "username": "test_username_2", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "slice_name": "test_chart_title_3", + "viz_type": "treemap", + "url": "/explore/test_chart_url_12", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "13", + "changed_by": { + "username": "test_username_2", + }, + "changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "slice_name": "test_chart_title_4", + "viz_type": "histogram", + "url": "/explore/test_chart_url_13", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + ], + }, + }, + "mock://mock-domain.preset.io/api/v1/dataset/20": { + "method": "GET", + "status_code": 200, + "json": { + "result": { + "schema": "test_schema_name", + "table_name": "test_table_name", + "database": { + "id": "30", + "database_name": "test_database_name", + }, + }, + }, + }, + "mock://mock-domain.preset.io/api/v1/database/30": { + "method": "GET", + "status_code": 200, + "json": { + "result": { + "sqlalchemy_uri": "test_sqlalchemy_uri", + }, + }, + }, + } + + api_vs_response.update(override_data) + + for url in api_vs_response: + request_mock.register_uri( + api_vs_response[url]["method"], + url, + json=api_vs_response[url]["json"], + status_code=api_vs_response[url]["status_code"], + ) + + +@freeze_time(FROZEN_TIME) +@pytest.mark.integration +def test_preset_ingest(pytestconfig, tmp_path, mock_time, 
requests_mock): + test_resources_dir = pytestconfig.rootpath / "tests/integration/preset" + + register_mock_api(request_mock=requests_mock) + + pipeline = Pipeline.create( + { + "run_id": "preset-test", + "source": { + "type": "preset", + "config": { + "connect_uri": "mock://mock-domain.preset.io/", + "manager_uri": "mock://mock-domain.preset.io", + "api_key": "test_key", + "api_secret": "test_secret", + "provider": "db", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/preset_mces.json", + }, + }, + } + ) + + pipeline.run() + pipeline.raise_from_status() + golden_file = "golden_test_ingest.json" + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "preset_mces.json", + golden_path=f"{test_resources_dir}/{golden_file}", + ) + + +@freeze_time(FROZEN_TIME) +@pytest.mark.integration +def test_preset_stateful_ingest( + pytestconfig, tmp_path, mock_time, requests_mock, mock_datahub_graph +): + test_resources_dir = pytestconfig.rootpath / "tests/integration/preset" + + register_mock_api(request_mock=requests_mock) + + pipeline_config_dict: Dict[str, Any] = { + "source": { + "type": "preset", + "config": { + "connect_uri": "mock://mock-domain.preset.io/", + "manager_uri": "mock://mock-domain.preset.io", + "api_key": "test_key", + "api_secret": "test_secret", + "provider": "db", + # enable stateful ingestion + "stateful_ingestion": { + "enabled": True, + "remove_stale_metadata": True, + "fail_safe_threshold": 100.0, + "state_provider": { + "type": "datahub", + "config": {"datahub_api": {"server": GMS_SERVER}}, + }, + }, + }, + }, + "sink": { + # we are not really interested in the resulting events for this test + "type": "console" + }, + "pipeline_name": "test_pipeline", + } + + dashboard_endpoint_override = { + "mock://mock-domain.preset.io/api/v1/dashboard/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 1, + "result": [ + { + "id": "1", + "changed_by": { + "username": "test_username_1", + }, + 
"changed_on_utc": "2024-07-10T07:00:00.000000+0000", + "dashboard_title": "test_dashboard_title_1", + "url": "/dashboard/test_dashboard_url_1", + "position_json": '{"CHART-test-1": {"meta": { "chartId": "10" }}, "CHART-test-2": {"meta": { "chartId": "11" }}}', + "status": "published", + "published": True, + "owners": [ + { + "username": "test_username_1", + }, + { + "username": "test_username_2", + }, + ], + "certified_by": "Certification team", + "certification_details": "Approved", + }, + ], + }, + }, + } + + with patch( + "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", + mock_datahub_graph, + ) as mock_checkpoint: + # Both checkpoint and reporting will use the same mocked graph instance. + mock_checkpoint.return_value = mock_datahub_graph + + # Do the first run of the pipeline and get the default job's checkpoint. + pipeline_run1 = run_and_get_pipeline(pipeline_config_dict) + checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1) + + assert checkpoint1 + assert checkpoint1.state + + # Remove one dashboard from the preset config. + register_mock_api( + request_mock=requests_mock, override_data=dashboard_endpoint_override + ) + + # Capture MCEs of second run to validate Status(removed=true) + deleted_mces_path = f"{tmp_path}/preset_deleted_mces.json" + pipeline_config_dict["sink"]["type"] = "file" + pipeline_config_dict["sink"]["config"] = {"filename": deleted_mces_path} + + # Do the second run of the pipeline. + pipeline_run2 = run_and_get_pipeline(pipeline_config_dict) + checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2) + + assert checkpoint2 + assert checkpoint2.state + + # Perform all assertions on the states. 
The deleted dashboard should not be + # part of the second state + state1 = checkpoint1.state + state2 = checkpoint2.state + difference_urns = list( + state1.get_urns_not_in(type="dashboard", other_checkpoint_state=state2) + ) + + assert len(difference_urns) == 1 + + urn1 = "urn:li:dashboard:(preset,2)" + + assert urn1 in difference_urns + + # Validate that all providers have committed successfully. + validate_all_providers_have_committed_successfully( + pipeline=pipeline_run1, expected_providers=1 + ) + validate_all_providers_have_committed_successfully( + pipeline=pipeline_run2, expected_providers=1 + ) + + # Verify the output. + mce_helpers.check_golden_file( + pytestconfig, + output_path=deleted_mces_path, + golden_path=test_resources_dir / "golden_test_stateful_ingest.json", + ) diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json index ce03804279097..22f63da8ecb95 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json @@ -17,7 +17,7 @@ "state": { "formatVersion": "1.0", "serde": "utf-8", - "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\"]}" + "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\", \"urn:li:query:query1\"]}" }, "runId": "dummy-test-stateful-ingestion" } diff --git 
a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json index 6a00e67a2ca21..a155c4cf1dbbb 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json @@ -8,8 +8,8 @@ "json": { "timestampMillis": 1586847600000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "pipelineName": "dummy_stateful", "platformInstanceId": "", @@ -17,7 +17,7 @@ "state": { "formatVersion": "1.0", "serde": "utf-8", - "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26\"]}" + "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}" }, "runId": "dummy-test-stateful-ingestion" } diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json index adf11a2833914..5cb8576594db3 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json @@ -69,6 +69,23 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:query1", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 
1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", @@ -84,5 +101,22 @@ "runId": "dummy-test-stateful-ingestion", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:query1", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json index e4893642d61ae..5300743f23ca8 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json @@ -47,12 +47,46 @@ } } }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": true + } + }, "systemMetadata": { "lastObserved": 1586847600000, "runId": "dummy-test-stateful-ingestion", "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:query2", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + 
"systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26", @@ -66,23 +100,25 @@ "systemMetadata": { "lastObserved": 1586847600000, "runId": "dummy-test-stateful-ingestion", - "lastRunId": "no-run-id-provided" + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", + "entityType": "query", + "entityUrn": "urn:li:query:query2", "changeType": "UPSERT", "aspectName": "status", "aspect": { "json": { - "removed": true + "removed": false } }, "systemMetadata": { "lastObserved": 1586847600000, "runId": "dummy-test-stateful-ingestion", - "lastRunId": "no-run-id-provided" + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py index e3a2a6cccea79..66564dc856aba 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py @@ -29,7 +29,12 @@ from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import ( DataProcessInstanceProperties, ) -from datahub.metadata.schema_classes import AuditStampClass, StatusClass +from datahub.metadata.schema_classes import ( + AuditStampClass, + DataPlatformInstanceClass, + StatusClass, +) +from datahub.metadata.urns import DataPlatformUrn, QueryUrn from datahub.utilities.urns.dataset_urn import DatasetUrn from tests.test_helpers import mce_helpers from tests.test_helpers.state_helpers import ( @@ -71,6 +76,9 @@ class 
DummySourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): default=None, description="Data process instance id to ingest.", ) + query_id_to_ingest: Optional[str] = Field( + default=None, description="Query id to ingest" + ) class DummySource(StatefulIngestionSourceBase): @@ -136,6 +144,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ), ).as_workunit() + if self.source_config.query_id_to_ingest: + yield MetadataChangeProposalWrapper( + entityUrn=QueryUrn(self.source_config.query_id_to_ingest).urn(), + aspect=DataPlatformInstanceClass( + platform=DataPlatformUrn("bigquery").urn() + ), + ).as_workunit() + if self.source_config.report_failure: self.reporter.report_failure("Dummy error", "Error") @@ -188,6 +204,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): }, }, "dpi_id_to_ingest": "job1", + "query_id_to_ingest": "query1", }, }, "sink": { @@ -198,7 +215,11 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): with mock.patch( "datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj" - ) as mock_state: + ) as mock_state, mock.patch( + "datahub.ingestion.source.state.stale_entity_removal_handler.STATEFUL_INGESTION_IGNORED_ENTITY_TYPES", + {}, + # Second mock is to imitate earlier behavior where entity type check was not present when adding entity to state + ): mock_state.return_value = GenericCheckpointState(serde="utf-8") pipeline_run1 = None pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore @@ -237,6 +258,8 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): "allow": ["dummy_dataset1", "dummy_dataset2"], } pipeline_run2_config["source"]["config"]["dpi_id_to_ingest"] = "job2" + pipeline_run2_config["source"]["config"]["query_id_to_ingest"] = "query2" + pipeline_run2_config["sink"]["config"][ "filename" ] = f"{tmp_path}/{output_file_name_after_deleted}" @@ -288,6 +311,7 @@ def 
test_stateful_ingestion(pytestconfig, tmp_path, mock_time): # assert report last ingestion state non_deletable entity urns non_deletable_urns: List[str] = [ "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", + "urn:li:query:query1", ] assert sorted(non_deletable_urns) == sorted( report.last_state_non_deletable_entities diff --git a/metadata-ingestion/tests/unit/test_preset_source.py b/metadata-ingestion/tests/unit/test_preset_source.py new file mode 100644 index 0000000000000..d97db651f4c79 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_preset_source.py @@ -0,0 +1,22 @@ +from datahub.ingestion.source.preset import PresetConfig + + +def test_default_values(): + config = PresetConfig.parse_obj({}) + + assert config.connect_uri == "" + assert config.manager_uri == "https://api.app.preset.io" + assert config.display_uri == "" + assert config.env == "PROD" + assert config.api_key is None + assert config.api_secret is None + + +def test_set_display_uri(): + display_uri = "some_host:1234" + + config = PresetConfig.parse_obj({"display_uri": display_uri}) + + assert config.connect_uri == "" + assert config.manager_uri == "https://api.app.preset.io" + assert config.display_uri == display_uri diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java index 7d1fdd34026f9..775770d28b4a2 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java @@ -9,7 +9,6 @@ import com.linkedin.metadata.EbeanTestUtils; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.config.EbeanConfiguration; -import com.linkedin.metadata.entity.EbeanEntityServiceTest; import io.ebean.Database; import io.ebean.test.LoggedSql; import java.util.List; @@ -24,7 +23,7 @@ public class EbeanAspectDaoTest { 
@BeforeMethod public void setupTest() { - Database server = EbeanTestUtils.createTestServer(EbeanEntityServiceTest.class.getSimpleName()); + Database server = EbeanTestUtils.createTestServer(EbeanAspectDaoTest.class.getSimpleName()); testDao = new EbeanAspectDao(server, EbeanConfiguration.testDefault); } @@ -34,7 +33,8 @@ public void testGetNextVersionForUpdate() { testDao.runInTransactionWithRetryUnlocked( (txContext) -> { - testDao.getNextVersions(Map.of("urn:li:corpuser:test", Set.of("status"))); + testDao.getNextVersions( + Map.of("urn:li:corpuser:testGetNextVersionForUpdate", Set.of("status"))); return ""; }, mock(AspectsBatch.class), @@ -43,9 +43,9 @@ public void testGetNextVersionForUpdate() { // Get the captured SQL statements List sql = LoggedSql.stop().stream() - .filter(str -> str.contains("(t0.urn,t0.aspect,t0.version)")) + .filter(str -> str.contains("testGetNextVersionForUpdate")) .toList(); - assertEquals(sql.size(), 1, String.format("Found: %s", sql)); + assertEquals(sql.size(), 2, String.format("Found: %s", sql)); assertTrue( sql.get(0).contains("for update;"), String.format("Did not find `for update` in %s ", sql)); } diff --git a/metadata-models/src/main/pegasus/com/linkedin/dashboard/DashboardInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/dashboard/DashboardInfo.pdl index c436011eb58db..0ce19b32c8930 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/dashboard/DashboardInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/dashboard/DashboardInfo.pdl @@ -100,6 +100,26 @@ record DashboardInfo includes CustomProperties, ExternalReference { } datasetEdges: optional array[Edge] + /** + * Dashboards included by this dashboard. + * Some dashboard entities (e.g. PowerBI Apps) can contain other dashboards. + * + * The Edge's sourceUrn should never be set, as it will always be the base dashboard. 
+ */ + @Relationship = { + "/*/destinationUrn": { + "name": "DashboardContainsDashboard", + "entityTypes": [ "dashboard" ], + "isLineage": true, + "createdOn": "dashboards/*/created/time" + "createdActor": "dashboards/*/created/actor" + "updatedOn": "dashboards/*/lastModified/time" + "updatedActor": "dashboards/*/lastModified/actor" + "properties": "dashboards/*/properties" + } + } + dashboards: array[Edge] = [ ] + /** * Captures information about who created/last modified/deleted this dashboard and when */ diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json index 30d0e9a09cdf4..e9e2778a479d3 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json @@ -1473,6 +1473,26 @@ "updatedOn" : "datasetEdges/*/lastModified/time" } } + }, { + "name" : "dashboards", + "type" : { + "type" : "array", + "items" : "com.linkedin.common.Edge" + }, + "doc" : "Dashboards included by this dashboard.\nSome dashboard entities (e.g.
PowerBI Apps) can contain other dashboards.\n\nThe Edge's sourceUrn should never be set, as it will always be the base dashboard.", + "default" : [ ], + "Relationship" : { + "/*/destinationUrn" : { + "createdActor" : "dashboards/*/created/actor", + "createdOn" : "dashboards/*/created/time", + "entityTypes" : [ "dashboard" ], + "isLineage" : true, + "name" : "DashboardContainsDashboard", + "properties" : "dashboards/*/properties", + "updatedActor" : "dashboards/*/lastModified/actor", + "updatedOn" : "dashboards/*/lastModified/time" + } + } }, { "name" : "lastModified", "type" : "com.linkedin.common.ChangeAuditStamps", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index 8cf02a768ecae..959cb5381fd9b 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -1500,6 +1500,26 @@ "updatedOn" : "datasetEdges/*/lastModified/time" } } + }, { + "name" : "dashboards", + "type" : { + "type" : "array", + "items" : "com.linkedin.common.Edge" + }, + "doc" : "Dashboards included by this dashboard.\nSome dashboard entities (e.g.
PowerBI Apps) can contain other dashboards.\n\nThe Edge's sourceUrn should never be set, as it will always be the base dashboard.", + "default" : [ ], + "Relationship" : { + "/*/destinationUrn" : { + "createdActor" : "dashboards/*/created/actor", + "createdOn" : "dashboards/*/created/time", + "entityTypes" : [ "dashboard" ], + "isLineage" : true, + "name" : "DashboardContainsDashboard", + "properties" : "dashboards/*/properties", + "updatedActor" : "dashboards/*/lastModified/actor", + "updatedOn" : "dashboards/*/lastModified/time" + } + } }, { "name" : "lastModified", "type" : "com.linkedin.common.ChangeAuditStamps", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json index d06f3b737a3e1..3e0cd46aba0c0 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json @@ -1206,6 +1206,26 @@ "updatedOn" : "datasetEdges/*/lastModified/time" } } + }, { + "name" : "dashboards", + "type" : { + "type" : "array", + "items" : "com.linkedin.common.Edge" + }, + "doc" : "Dashboards included by this dashboard.\nSome dashboard entities (e.g.
PowerBI Apps) can contain other dashboards.\n\nThe Edge's sourceUrn should never be set, as it will always be the base dashboard.", + "default" : [ ], + "Relationship" : { + "/*/destinationUrn" : { + "createdActor" : "dashboards/*/created/actor", + "createdOn" : "dashboards/*/created/time", + "entityTypes" : [ "dashboard" ], + "isLineage" : true, + "name" : "DashboardContainsDashboard", + "properties" : "dashboards/*/properties", + "updatedActor" : "dashboards/*/lastModified/actor", + "updatedOn" : "dashboards/*/lastModified/time" + } + } }, { "name" : "lastModified", "type" : "com.linkedin.common.ChangeAuditStamps", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json index 56562ff49ff8d..7f651a10139e2 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json @@ -1206,6 +1206,26 @@ "updatedOn" : "datasetEdges/*/lastModified/time" } } + }, { + "name" : "dashboards", + "type" : { + "type" : "array", + "items" : "com.linkedin.common.Edge" + }, + "doc" : "Dashboards included by this dashboard.\nSome dashboard entities (e.g.
PowerBI Apps) can contain other dashboards.\n\nThe Edge's sourceUrn should never be set, as it will always be the base dashboard.", + "default" : [ ], + "Relationship" : { + "/*/destinationUrn" : { + "createdActor" : "dashboards/*/created/actor", + "createdOn" : "dashboards/*/created/time", + "entityTypes" : [ "dashboard" ], + "isLineage" : true, + "name" : "DashboardContainsDashboard", + "properties" : "dashboards/*/properties", + "updatedActor" : "dashboards/*/lastModified/actor", + "updatedOn" : "dashboards/*/lastModified/time" + } + } }, { "name" : "lastModified", "type" : "com.linkedin.common.ChangeAuditStamps", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json index b90543745c65f..c3e04add825c9 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json @@ -1500,6 +1500,26 @@ "updatedOn" : "datasetEdges/*/lastModified/time" } } + }, { + "name" : "dashboards", + "type" : { + "type" : "array", + "items" : "com.linkedin.common.Edge" + }, + "doc" : "Dashboards included by this dashboard.\nSome dashboard entities (e.g. PowerBI Apps) can contain other dashboards.\n\nThe Edge's sourceUrn should never be set, as it will always be the base dashboard.", + "default" : [ ], + "Relationship" : { + "/*/destinationUrn" : { + "createdActor" : "dashboards/*/created/actor", + "createdOn" : "dashboards/*/created/time", + "entityTypes" : [ "dashboard" ], + "isLineage" : true, + "name" : "DashboardContainsDashboard", + "properties" : "dashboards/*/properties", + "updatedActor" : "dashboards/*/lastModified/actor", + "updatedOn" : "dashboards/*/lastModified/time" + } + } }, { "name" : "lastModified", "type" : "com.linkedin.common.ChangeAuditStamps",