diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 7f7826abe2095..2f89316752520 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -661,6 +661,7 @@ ], "datahub.ingestion.checkpointing_provider.plugins": [ "datahub = datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:DatahubIngestionCheckpointingProvider", + "file = datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider:FileIngestionCheckpointingProvider", ], "datahub.ingestion.reporting_provider.plugins": [ "datahub = datahub.ingestion.reporting.datahub_ingestion_run_summary_provider:DatahubIngestionRunSummaryProvider", diff --git a/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py b/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py index ca02b88ab6324..285ad9c088447 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py +++ b/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py @@ -35,7 +35,7 @@ def __init__( @classmethod @abstractmethod def create( - cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext, name: str + cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext ) -> "_Self": pass diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index ccff677c3a471..d91165ac9777c 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -756,7 +756,7 @@ def get_latest_pipeline_checkpoint( DatahubIngestionCheckpointingProvider, ) - checkpoint_provider = DatahubIngestionCheckpointingProvider(self, "graph") + checkpoint_provider = DatahubIngestionCheckpointingProvider(self) job_name = StaleEntityRemovalHandler.compute_job_id(platform) raw_checkpoint = checkpoint_provider.get_latest_checkpoint( diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index 7fb2cf9813cab..df702c1cc13f1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -1,6 +1,6 @@ import logging from dataclasses import dataclass -from typing import Any, Dict, Generic, Optional, Type, TypeVar, cast +from typing import Any, Dict, Generic, Optional, Type, TypeVar import pydantic from pydantic import root_validator @@ -38,10 +38,8 @@ class DynamicTypedStateProviderConfig(DynamicTypedConfig): type: str = Field( description="The type of the state provider to use. For DataHub use `datahub`", ) - # This config type is declared Optional[Any] here. The eventual parser for the - # specified type is responsible for further validation. - config: Optional[Any] = Field( - default=None, + config: Dict[str, Any] = Field( + default={}, description="The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. 
See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).", ) @@ -81,7 +79,7 @@ def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]: if values.get("enabled"): if values.get("state_provider") is None: values["state_provider"] = DynamicTypedStateProviderConfig( - type="datahub", config=None + type="datahub", config={} ) return values @@ -246,15 +244,10 @@ def _initialize_checkpointing_state_provider(self) -> None: f"Cannot find checkpoint provider class of type={self.stateful_ingestion_config.state_provider.type} " " in the registry! Please check the type of the checkpointing provider in your config." ) - config_dict: Dict[str, Any] = cast( - Dict[str, Any], - self.stateful_ingestion_config.state_provider.dict().get("config", {}), - ) self.ingestion_checkpointing_state_provider = ( checkpointing_state_provider_class.create( - config_dict=config_dict, + config_dict=self.stateful_ingestion_config.state_provider.config, ctx=self.ctx, - name=checkpointing_state_provider_class.__name__, ) ) assert self.ingestion_checkpointing_state_provider diff --git a/metadata-ingestion/src/datahub/ingestion/source/state_provider/datahub_ingestion_checkpointing_provider.py b/metadata-ingestion/src/datahub/ingestion/source/state_provider/datahub_ingestion_checkpointing_provider.py index d7ebcba2c6695..442abb3aaf4cf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state_provider/datahub_ingestion_checkpointing_provider.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state_provider/datahub_ingestion_checkpointing_provider.py @@ -17,14 +17,17 @@ class DatahubIngestionStateProviderConfig(IngestionCheckpointingProviderConfig): - datahub_api: Optional[DatahubClientConfig] = DatahubClientConfig() + datahub_api: DatahubClientConfig = DatahubClientConfig() class DatahubIngestionCheckpointingProvider(IngestionCheckpointingProviderBase): orchestrator_name: str = "datahub" - def __init__(self, graph: DataHubGraph, name: str): - super().__init__(name) + def __init__( + self, + graph: DataHubGraph, + ): + super().__init__(self.__class__.__name__) self.graph = graph if not self._is_server_stateful_ingestion_capable(): raise ConfigurationError( @@ -34,24 +37,14 @@ def __init__(self, graph: DataHubGraph, name: str): @classmethod def create( - cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str + cls, config_dict: Dict[str, Any], ctx: PipelineContext ) -> "DatahubIngestionCheckpointingProvider": + config = DatahubIngestionStateProviderConfig.parse_obj(config_dict) if ctx.graph: # Use the pipeline-level graph if set - return cls(ctx.graph, name) - elif config_dict is None: - raise ConfigurationError("Missing provider configuration.") + return cls(ctx.graph) else: - provider_config = ( - DatahubIngestionStateProviderConfig.parse_obj_allow_extras(config_dict) - ) - if provider_config.datahub_api: - graph = DataHubGraph(provider_config.datahub_api) - return cls(graph, name) - else: - raise ConfigurationError( - "Missing datahub_api. Provide either a global one or under the state_provider." 
- ) + return cls(DataHubGraph(config.datahub_api)) def _is_server_stateful_ingestion_capable(self) -> bool: server_config = self.graph.get_config() if self.graph else None diff --git a/metadata-ingestion/src/datahub/ingestion/source/state_provider/file_ingestion_checkpointing_provider.py b/metadata-ingestion/src/datahub/ingestion/source/state_provider/file_ingestion_checkpointing_provider.py new file mode 100644 index 0000000000000..a37774773b84d --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/state_provider/file_ingestion_checkpointing_provider.py @@ -0,0 +1,108 @@ +import logging +import pathlib +from datetime import datetime +from typing import Any, Dict, List, Optional + +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import ( + IngestionCheckpointingProviderBase, + IngestionCheckpointingProviderConfig, + JobId, +) +from datahub.ingestion.sink.file import write_metadata_file +from datahub.ingestion.source.file import read_metadata_file +from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass + +logger = logging.getLogger(__name__) + + +class FileIngestionStateProviderConfig(IngestionCheckpointingProviderConfig): + filename: str + + +class FileIngestionCheckpointingProvider(IngestionCheckpointingProviderBase): + orchestrator_name: str = "file" + + def __init__(self, config: FileIngestionStateProviderConfig): + super().__init__(self.__class__.__name__) + self.config = config + + @classmethod + def create( + cls, config_dict: Dict[str, Any], ctx: PipelineContext + ) -> "FileIngestionCheckpointingProvider": + config = FileIngestionStateProviderConfig.parse_obj(config_dict) + return cls(config) + + def get_latest_checkpoint( + self, + pipeline_name: str, + job_name: JobId, + ) -> Optional[DatahubIngestionCheckpointClass]: + logger.debug( + f"Querying for the latest ingestion checkpoint for pipelineName:'{pipeline_name}'," + f" job_name:'{job_name}'" + ) + + data_job_urn = self.get_data_job_urn( + self.orchestrator_name, pipeline_name, job_name + ) + latest_checkpoint: Optional[DatahubIngestionCheckpointClass] = None + try: + for obj in read_metadata_file(pathlib.Path(self.config.filename)): + if ( + isinstance(obj, MetadataChangeProposalWrapper) + and obj.entityUrn == data_job_urn + and obj.aspect + and isinstance(obj.aspect, DatahubIngestionCheckpointClass) + and obj.aspect.get("pipelineName", "") == pipeline_name + ): + latest_checkpoint = obj.aspect + break + except FileNotFoundError: + logger.debug(f"File {self.config.filename} not found") + + if latest_checkpoint: + logger.debug( + f"The last committed ingestion checkpoint for pipelineName:'{pipeline_name}'," + f" job_name:'{job_name}' found with start_time:" + f" {datetime.utcfromtimestamp(latest_checkpoint.timestampMillis/1000)}" + ) + return latest_checkpoint + else: + logger.debug( + f"No committed ingestion checkpoint for pipelineName:'{pipeline_name}'," + f" job_name:'{job_name}' found" + ) + + return None + + def commit(self) -> None: + if not self.state_to_commit: + logger.warning(f"No state available to commit for {self.name}") + return None + + checkpoint_workunits: List[MetadataChangeProposalWrapper] = [] + for job_name, checkpoint in self.state_to_commit.items(): + # Emit the ingestion state for each job + logger.debug( + f"Committing ingestion checkpoint for pipeline:'{checkpoint.pipelineName}', " + f"job:'{job_name}'" + ) + datajob_urn = 
self.get_data_job_urn( + self.orchestrator_name, + checkpoint.pipelineName, + job_name, + ) + checkpoint_workunits.append( + MetadataChangeProposalWrapper( + entityUrn=datajob_urn, + aspect=checkpoint, + ) + ) + write_metadata_file(pathlib.Path(self.config.filename), checkpoint_workunits) + self.committed = True + logger.debug( + f"Committed all ingestion checkpoints for pipeline:'{checkpoint.pipelineName}'" + ) diff --git a/metadata-ingestion/tests/integration/lookml/golden_test_state.json b/metadata-ingestion/tests/integration/lookml/golden_test_state.json new file mode 100644 index 0000000000000..c62106ac10089 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/golden_test_state.json @@ -0,0 +1,26 @@ +[ +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,lookml_stateful,prod),lookml_stale_entity_removal)", + "changeType": "UPSERT", + "aspectName": "datahubIngestionCheckpoint", + "aspect": { + "json": { + "timestampMillis": 1586847600000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "pipelineName": "lookml_stateful", + "platformInstanceId": "", + "config": "", + "state": { + "formatVersion": "1.0", + "serde": "base85-bz2-json", + "payload": "LRx4!F+o`-Q(4)<4JiNuUmt)_WdINa0@Mn>@BivB0a-v1sF;Ar&}h&A0K-EjK*+=xnKU%Oib;?JVrrXB7?aRqCarWwpZm8v5Yh+DsN{|c*msMh9%WJXjKPvIPsDn^@g3;DD9Q9kBh?*|=8M4uRW$_0HKn3XhN;RhAcLIBhLnO2%UA@Ykl;h&Xx(^@2;Y9C#d4g3K_2CA-I*M)h{NMA8Nu4C3XjEQYdh{nR--&lfRUsTL}OOkOO435f=1nKzYJ^9)mbBljM0}gaqy26URw1=q<80Eb9y)y?Vl88kG;g~MToq#r%6trK9U`U?k}RS<@^?i@1M1@9*%tk}1N3hRzUaNB" + }, + "runId": "lookml-test" + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/lookml_mces_golden_deleted_stateful.json b/metadata-ingestion/tests/integration/lookml/lookml_mces_golden_deleted_stateful.json deleted file mode 100644 index a323118666940..0000000000000 --- a/metadata-ingestion/tests/integration/lookml/lookml_mces_golden_deleted_stateful.json +++ /dev/null @@ -1,650 +0,0 @@ -[ -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.BrowsePaths": { - "paths": [ - "/prod/looker/lkml_samples/views" - ] - } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { - "upstreams": [ - { - "auditStamp": { - "time": 1586847600000, - "actor": "urn:li:corpuser:datahub" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:conn,..my_table,PROD)", - "type": "VIEW" - } - ] - } - }, - { - "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "my_view", - "platform": "urn:li:dataPlatform:looker", - "version": 0, - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "hash": "", - "platformSchema": { - "com.linkedin.pegasus2avro.schema.OtherSchema": { - "rawSchema": "" - } - }, - "fields": [ - { - "fieldPath": "country", - "nullable": false, - "description": "The country", - "label": "", - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } - }, - "nativeDataType": "string", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Dimension" - } - ] - }, - "isPartOfKey": false - }, - { - "fieldPath": "city", - "nullable": false, - "description": "City", - "label": "", 
- "type": { - "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } - }, - "nativeDataType": "string", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Dimension" - } - ] - }, - "isPartOfKey": false - }, - { - "fieldPath": "is_latest", - "nullable": false, - "description": "Is latest data", - "label": "", - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.BooleanType": {} - } - }, - "nativeDataType": "yesno", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Dimension" - } - ] - }, - "isPartOfKey": false - }, - { - "fieldPath": "timestamp", - "nullable": false, - "description": "Timestamp of measurement", - "label": "", - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.TimeType": {} - } - }, - "nativeDataType": "time", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Dimension" - }, - { - "tag": "urn:li:tag:Temporal" - } - ] - }, - "isPartOfKey": false - }, - { - "fieldPath": "average_measurement", - "nullable": false, - "description": "My measurement", - "label": "", - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.NumberType": {} - } - }, - "nativeDataType": "average", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Measure" - } - ] - }, - "isPartOfKey": false - } - ], - "primaryKeys": [] - } - }, - { - "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "customProperties": { - "looker.file.path": "foo.view.lkml" - }, - "name": "my_view", - "tags": [] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)", - "changeType": "UPSERT", - "aspectName": "subTypes", - "aspect": { - "json": { - "typeNames": [ - "View" - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)", - "changeType": "UPSERT", - "aspectName": "viewProperties", - "aspect": { - "json": { - "materialized": false, - "viewLogic": "SELECT\n is_latest,\n country,\n city,\n timestamp,\n measurement\n FROM\n my_table", - "viewLanguage": "sql" - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)", - "changeType": "UPSERT", - "aspectName": "browsePathsV2", - "aspect": { - "json": { - "path": [ - { - "id": "looker" - }, - { - "id": "lkml_samples" - }, - { - "id": "views" - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.owners,PROD)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.BrowsePaths": { - "paths": [ - "/prod/looker/lkml_samples/views" - ] - } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { - "upstreams": [ - { - "auditStamp": { - "time": 1586847600000, - "actor": "urn:li:corpuser:datahub" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:conn,..owners,PROD)", - "type": "VIEW" - } - ], - "fineGrainedLineages": [ - { - "upstreamType": "FIELD_SET", 
- "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:conn,..owners,PROD),id)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.owners,PROD),id)" - ], - "confidenceScore": 1.0 - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:conn,..owners,PROD),owner_name)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.owners,PROD),owner_name)" - ], - "confidenceScore": 1.0 - } - ] - } - }, - { - "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "owners", - "platform": "urn:li:dataPlatform:looker", - "version": 0, - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "hash": "", - "platformSchema": { - "com.linkedin.pegasus2avro.schema.OtherSchema": { - "rawSchema": "" - } - }, - "fields": [ - { - "fieldPath": "id", - "nullable": false, - "description": "", - "label": "", - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } - }, - "nativeDataType": "string", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Dimension" - } - ] - }, - "isPartOfKey": true - }, - { - "fieldPath": "owner_name", - "nullable": false, - "description": "", - "label": "", - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } - }, - "nativeDataType": "string", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Dimension" - } - ] - }, - "isPartOfKey": false - } - ], - "primaryKeys": [ - "id" - ] - } - }, - { - "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "customProperties": { - "looker.file.path": "owners.view.lkml" - }, - "name": "owners", - "tags": [] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.owners,PROD)", - "changeType": "UPSERT", - "aspectName": "subTypes", - "aspect": { - "json": { - "typeNames": [ - "View" - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.owners,PROD)", - "changeType": "UPSERT", - "aspectName": "viewProperties", - "aspect": { - "json": { - "materialized": false, - "viewLogic": "view: owners {\n dimension: id {\n primary_key: yes\n sql: ${TABLE}.id ;;\n }\n dimension: owner_name {\n sql: ${TABLE}.owner_name ;;\n }\n}", - "viewLanguage": "lookml" - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.owners,PROD)", - "changeType": "UPSERT", - "aspectName": "browsePathsV2", - "aspect": { - "json": { - "path": [ - { - "id": "looker" - }, - { - "id": "lkml_samples" - }, - { - "id": "views" - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "tag", - "entityUrn": "urn:li:tag:Dimension", - "changeType": "UPSERT", - "aspectName": "tagKey", - "aspect": { - "json": { - "name": "Dimension" - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - 
"entityType": "tag", - "entityUrn": "urn:li:tag:Measure", - "changeType": "UPSERT", - "aspectName": "tagKey", - "aspect": { - "json": { - "name": "Measure" - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "tag", - "entityUrn": "urn:li:tag:Temporal", - "changeType": "UPSERT", - "aspectName": "tagKey", - "aspect": { - "json": { - "name": "Temporal" - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.flights,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.include_able_view,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.customer_facts,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.ability,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.looker_events,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.extending_looker_events,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.autodetect_sql_name_based_on_view_name,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_derived_view,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.fragment_derived_view,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": 
"urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.view_derived_explore,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.test_include_external_view,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": true - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "lookml-test" - } -} -] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 21a0b19849d97..b1853cfa2b3c0 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -1,6 +1,6 @@ import logging import pathlib -from typing import Any, Dict, List, cast +from typing import Any, List from unittest import mock import pydantic @@ -17,17 +17,13 @@ LookerRefinementResolver, LookMLSourceConfig, ) -from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState from datahub.metadata.schema_classes import ( DatasetSnapshotClass, MetadataChangeEventClass, UpstreamLineageClass, ) from tests.test_helpers import mce_helpers -from tests.test_helpers.state_helpers import ( - get_current_checkpoint_from_pipeline, - validate_all_providers_have_committed_successfully, -) +from tests.test_helpers.state_helpers import get_current_checkpoint_from_pipeline logging.getLogger("lkml").setLevel(logging.INFO) @@ -728,11 +724,10 @@ def test_hive_platform_drops_ids(pytestconfig, tmp_path, mock_time): @freeze_time(FROZEN_TIME) -def test_lookml_ingest_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph): +def test_lookml_stateful_ingestion(pytestconfig, tmp_path, mock_time): output_file_name: str = "lookml_mces.json" - golden_file_name: str = "expected_output.json" - output_file_deleted_name: str = "lookml_mces_deleted_stateful.json" - golden_file_deleted_name: str = "lookml_mces_golden_deleted_stateful.json" + state_file_name: str = "lookml_state_mces.json" + golden_file_name: str = "golden_test_state.json" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" @@ -754,106 +749,37 @@ def test_lookml_ingest_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_ "remove_stale_metadata": True, "fail_safe_threshold": 100.0, "state_provider": { - "type": "datahub", - "config": {"datahub_api": {"server": GMS_SERVER}}, + "type": "file", + "config": { + "filename": f"{tmp_path}/{state_file_name}", + }, }, }, }, }, "sink": { "type": "file", - "config": {}, + "config": { + "filename": f"{tmp_path}/{output_file_name}", + }, }, } - pipeline_run1 = None - with mock.patch( - "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", - mock_datahub_graph, - ) as mock_checkpoint: - mock_checkpoint.return_value = mock_datahub_graph - pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore - base_pipeline_config # type: ignore - ) - # Set the special properties for this run - pipeline_run1_config["source"]["config"]["emit_reachable_views_only"] = False - pipeline_run1_config["sink"]["config"][ - "filename" - ] = f"{tmp_path}/{output_file_name}" - pipeline_run1 = Pipeline.create(pipeline_run1_config) - pipeline_run1.run() - 
pipeline_run1.raise_from_status() - pipeline_run1.pretty_print_summary() + pipeline_run1 = Pipeline.create(base_pipeline_config) + pipeline_run1.run() + pipeline_run1.raise_from_status() + pipeline_run1.pretty_print_summary() - mce_helpers.check_golden_file( - pytestconfig, - output_path=tmp_path / output_file_name, - golden_path=f"{test_resources_dir}/{golden_file_name}", - ) + mce_helpers.check_golden_file( + pytestconfig, + output_path=f"{tmp_path}/{state_file_name}", + golden_path=f"{test_resources_dir}/{golden_file_name}", + ) checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1) assert checkpoint1 assert checkpoint1.state - pipeline_run2 = None - with mock.patch( - "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", - mock_datahub_graph, - ) as mock_checkpoint: - mock_checkpoint.return_value = mock_datahub_graph - pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config) # type: ignore - # Set the special properties for this run - pipeline_run2_config["source"]["config"]["emit_reachable_views_only"] = True - pipeline_run2_config["sink"]["config"][ - "filename" - ] = f"{tmp_path}/{output_file_deleted_name}" - pipeline_run2 = Pipeline.create(pipeline_run2_config) - pipeline_run2.run() - pipeline_run2.raise_from_status() - pipeline_run2.pretty_print_summary() - - mce_helpers.check_golden_file( - pytestconfig, - output_path=tmp_path / output_file_deleted_name, - golden_path=f"{test_resources_dir}/{golden_file_deleted_name}", - ) - checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2) - assert checkpoint2 - assert checkpoint2.state - - # Validate that all providers have committed successfully. - validate_all_providers_have_committed_successfully( - pipeline=pipeline_run1, expected_providers=1 - ) - validate_all_providers_have_committed_successfully( - pipeline=pipeline_run2, expected_providers=1 - ) - - # Perform all assertions on the states. 
The deleted table should not be - # part of the second state - state1 = cast(GenericCheckpointState, checkpoint1.state) - state2 = cast(GenericCheckpointState, checkpoint2.state) - - difference_dataset_urns = list( - state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2) - ) - # the difference in dataset urns are all the views that are not reachable from the model file - assert len(difference_dataset_urns) == 11 - deleted_dataset_urns: List[str] = [ - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.fragment_derived_view,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_derived_view,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.test_include_external_view,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.extending_looker_events,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.customer_facts,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.include_able_view,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.autodetect_sql_name_based_on_view_name,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.ability,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.looker_events,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.view_derived_explore,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.flights,PROD)", - ] - assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns) - def test_lookml_base_folder(): fake_api = { diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_datahub_ingestion_checkpointing_provider.py b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_datahub_ingestion_checkpointing_provider.py deleted file mode 100644 index 600985266043b..0000000000000 --- a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_datahub_ingestion_checkpointing_provider.py +++ /dev/null @@ -1,170 +0,0 @@ -import types -import unittest -from typing import Dict, List, Optional, Type -from unittest.mock import MagicMock, patch - -from avrogen.dict_wrapper import DictWrapper - -from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import ( - CheckpointJobStateType, - JobId, -) -from datahub.ingestion.source.state.checkpoint import Checkpoint -from datahub.ingestion.source.state.sql_common_state import ( - BaseSQLAlchemyCheckpointState, -) -from datahub.ingestion.source.state.usage_common_state import ( - BaseTimeWindowCheckpointState, -) -from datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider import ( - DatahubIngestionCheckpointingProvider, -) -from tests.test_helpers.type_helpers import assert_not_null - - -class TestDatahubIngestionCheckpointProvider(unittest.TestCase): - # Static members for the tests - pipeline_name: str = "test_pipeline" - job_names: List[JobId] = [JobId("job1"), JobId("job2")] - run_id: str = "test_run" - - def setUp(self) -> None: - self._setup_mock_graph() - self.provider = self._create_provider() - assert self.provider - - def _setup_mock_graph(self) -> None: - """ - Setup monkey-patched graph client. 
- """ - self.patcher = patch( - "datahub.ingestion.graph.client.DataHubGraph", autospec=True - ) - self.addCleanup(self.patcher.stop) - self.mock_graph = self.patcher.start() - # Make server stateful ingestion capable - self.mock_graph.get_config.return_value = {"statefulIngestionCapable": True} - # Bind mock_graph's emit_mcp to testcase's monkey_patch_emit_mcp so that we can emulate emits. - self.mock_graph.emit_mcp = types.MethodType( - self.monkey_patch_emit_mcp, self.mock_graph - ) - # Bind mock_graph's get_latest_timeseries_value to monkey_patch_get_latest_timeseries_value - self.mock_graph.get_latest_timeseries_value = types.MethodType( - self.monkey_patch_get_latest_timeseries_value, self.mock_graph - ) - # Tracking for emitted mcps. - self.mcps_emitted: Dict[str, MetadataChangeProposalWrapper] = {} - - def _create_provider(self) -> DatahubIngestionCheckpointingProvider: - ctx: PipelineContext = PipelineContext( - run_id=self.run_id, pipeline_name=self.pipeline_name - ) - ctx.graph = self.mock_graph - return DatahubIngestionCheckpointingProvider.create( - {}, ctx, name=DatahubIngestionCheckpointingProvider.__name__ - ) - - def monkey_patch_emit_mcp( - self, graph_ref: MagicMock, mcpw: MetadataChangeProposalWrapper - ) -> None: - """ - Mockey patched implementation of DatahubGraph.emit_mcp that caches the mcp locally in memory. - """ - self.assertIsNotNone(graph_ref) - if mcpw.aspectName != "status": - self.assertEqual(mcpw.entityType, "dataJob") - self.assertEqual(mcpw.aspectName, "datahubIngestionCheckpoint") - # Cache the mcpw against the entityUrn - assert mcpw.entityUrn is not None - self.mcps_emitted[mcpw.entityUrn] = mcpw - - def monkey_patch_get_latest_timeseries_value( - self, - graph_ref: MagicMock, - entity_urn: str, - aspect_type: Type[DictWrapper], - filter_criteria_map: Dict[str, str], - ) -> Optional[DictWrapper]: - """ - Monkey patched implementation of DatahubGraph.get_latest_timeseries_value that returns the latest cached aspect - for a given entity urn. - """ - self.assertIsNotNone(graph_ref) - self.assertEqual(aspect_type, CheckpointJobStateType) - self.assertEqual( - filter_criteria_map, - { - "pipelineName": self.pipeline_name, - }, - ) - # Retrieve the cached mcpw and return its aspect value. - mcpw = self.mcps_emitted.get(entity_urn) - if mcpw: - return mcpw.aspect - return None - - def test_provider(self): - # 1. Create the individual job checkpoints with appropriate states. - # Job1 - Checkpoint with a BaseSQLAlchemyCheckpointState state - job1_state_obj = BaseSQLAlchemyCheckpointState() - job1_checkpoint = Checkpoint( - job_name=self.job_names[0], - pipeline_name=self.pipeline_name, - run_id=self.run_id, - state=job1_state_obj, - ) - # Job2 - Checkpoint with a BaseTimeWindowCheckpointState state - job2_state_obj = BaseTimeWindowCheckpointState( - begin_timestamp_millis=10, end_timestamp_millis=100 - ) - job2_checkpoint = Checkpoint( - job_name=self.job_names[1], - pipeline_name=self.pipeline_name, - run_id=self.run_id, - state=job2_state_obj, - ) - - # 2. Set the provider's state_to_commit. - self.provider.state_to_commit = { - # NOTE: state_to_commit accepts only the aspect version of the checkpoint. - self.job_names[0]: assert_not_null( - job1_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20) - ), - self.job_names[1]: assert_not_null( - job2_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20) - ), - } - - # 3. Perform the commit - # NOTE: This will commit the state to the in-memory self.mcps_emitted because of the monkey-patching. 
- self.provider.commit() - self.assertTrue(self.provider.committed) - - # 4. Get last committed state. This must match what has been committed earlier. - # NOTE: This will retrieve from in-memory self.mcps_emitted because of the monkey-patching. - job1_last_state = self.provider.get_latest_checkpoint( - self.pipeline_name, self.job_names[0] - ) - job2_last_state = self.provider.get_latest_checkpoint( - self.pipeline_name, self.job_names[1] - ) - - # 5. Validate individual job checkpoint state values that have been committed and retrieved - # against the original values. - self.assertIsNotNone(job1_last_state) - job1_last_checkpoint = Checkpoint.create_from_checkpoint_aspect( - job_name=self.job_names[0], - checkpoint_aspect=job1_last_state, - state_class=type(job1_state_obj), - ) - self.assertEqual(job1_last_checkpoint, job1_checkpoint) - - self.assertIsNotNone(job2_last_state) - job2_last_checkpoint = Checkpoint.create_from_checkpoint_aspect( - job_name=self.job_names[1], - checkpoint_aspect=job2_last_state, - state_class=type(job2_state_obj), - ) - self.assertEqual(job2_last_checkpoint, job2_checkpoint) diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py new file mode 100644 index 0000000000000..4387e5a17790f --- /dev/null +++ b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py @@ -0,0 +1,183 @@ +import tempfile +import types +import unittest +from typing import Dict, List, Optional, Type +from unittest.mock import MagicMock, patch + +from avrogen.dict_wrapper import DictWrapper + +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import ( + CheckpointJobStateType, + IngestionCheckpointingProviderBase, + JobId, +) +from datahub.ingestion.source.state.checkpoint import Checkpoint +from datahub.ingestion.source.state.sql_common_state import ( + BaseSQLAlchemyCheckpointState, +) +from datahub.ingestion.source.state.usage_common_state import ( + BaseTimeWindowCheckpointState, +) +from datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider import ( + DatahubIngestionCheckpointingProvider, +) +from datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider import ( + FileIngestionCheckpointingProvider, +) +from tests.test_helpers.type_helpers import assert_not_null + + +class TestIngestionCheckpointProviders(unittest.TestCase): + # Static members for the tests + pipeline_name: str = "test_pipeline" + job_names: List[JobId] = [JobId("job1"), JobId("job2")] + run_id: str = "test_run" + + def setUp(self) -> None: + self._setup_mock_graph() + self._create_providers() + + def _setup_mock_graph(self) -> None: + """ + Setup monkey-patched graph client. + """ + self.patcher = patch( + "datahub.ingestion.graph.client.DataHubGraph", autospec=True + ) + self.addCleanup(self.patcher.stop) + self.mock_graph = self.patcher.start() + # Make server stateful ingestion capable + self.mock_graph.get_config.return_value = {"statefulIngestionCapable": True} + # Bind mock_graph's emit_mcp to testcase's monkey_patch_emit_mcp so that we can emulate emits. 
+        self.mock_graph.emit_mcp = types.MethodType(
+            self.monkey_patch_emit_mcp, self.mock_graph
+        )
+        # Bind mock_graph's get_latest_timeseries_value to monkey_patch_get_latest_timeseries_value
+        self.mock_graph.get_latest_timeseries_value = types.MethodType(
+            self.monkey_patch_get_latest_timeseries_value, self.mock_graph
+        )
+        # Tracking for emitted mcps.
+        self.mcps_emitted: Dict[str, MetadataChangeProposalWrapper] = {}
+
+    def _create_providers(self) -> None:
+        ctx: PipelineContext = PipelineContext(
+            run_id=self.run_id, pipeline_name=self.pipeline_name
+        )
+        ctx.graph = self.mock_graph
+        self.providers: List[IngestionCheckpointingProviderBase] = [
+            DatahubIngestionCheckpointingProvider.create({}, ctx),
+            FileIngestionCheckpointingProvider.create(
+                {"filename": f"{tempfile.mkdtemp()}/checkpoint_mces.json"},
+                ctx,
+            ),
+        ]
+
+    def monkey_patch_emit_mcp(
+        self, graph_ref: MagicMock, mcpw: MetadataChangeProposalWrapper
+    ) -> None:
+        """
+        Monkey-patched implementation of DatahubGraph.emit_mcp that caches the mcp locally in memory.
+        """
+        self.assertIsNotNone(graph_ref)
+        if mcpw.aspectName != "status":
+            self.assertEqual(mcpw.entityType, "dataJob")
+            self.assertEqual(mcpw.aspectName, "datahubIngestionCheckpoint")
+        # Cache the mcpw against the entityUrn
+        assert mcpw.entityUrn is not None
+        self.mcps_emitted[mcpw.entityUrn] = mcpw
+
+    def monkey_patch_get_latest_timeseries_value(
+        self,
+        graph_ref: MagicMock,
+        entity_urn: str,
+        aspect_type: Type[DictWrapper],
+        filter_criteria_map: Dict[str, str],
+    ) -> Optional[DictWrapper]:
+        """
+        Monkey-patched implementation of DatahubGraph.get_latest_timeseries_value that returns the latest cached aspect
+        for a given entity urn.
+        """
+        self.assertIsNotNone(graph_ref)
+        self.assertEqual(aspect_type, CheckpointJobStateType)
+        self.assertEqual(
+            filter_criteria_map,
+            {
+                "pipelineName": self.pipeline_name,
+            },
+        )
+        # Retrieve the cached mcpw and return its aspect value.
+        mcpw = self.mcps_emitted.get(entity_urn)
+        if mcpw:
+            return mcpw.aspect
+        return None
+
+    def test_providers(self):
+        self.assertEqual(len(self.providers), 2)
+        for provider in self.providers:
+            assert provider
+            # 1. Create the individual job checkpoints with appropriate states.
+            # Job1 - Checkpoint with a BaseSQLAlchemyCheckpointState state
+            job1_state_obj = BaseSQLAlchemyCheckpointState()
+            job1_checkpoint = Checkpoint(
+                job_name=self.job_names[0],
+                pipeline_name=self.pipeline_name,
+                run_id=self.run_id,
+                state=job1_state_obj,
+            )
+            # Job2 - Checkpoint with a BaseTimeWindowCheckpointState state
+            job2_state_obj = BaseTimeWindowCheckpointState(
+                begin_timestamp_millis=10, end_timestamp_millis=100
+            )
+            job2_checkpoint = Checkpoint(
+                job_name=self.job_names[1],
+                pipeline_name=self.pipeline_name,
+                run_id=self.run_id,
+                state=job2_state_obj,
+            )
+
+            # 2. Set the provider's state_to_commit.
+            provider.state_to_commit = {
+                # NOTE: state_to_commit accepts only the aspect version of the checkpoint.
+                self.job_names[0]: assert_not_null(
+                    job1_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
+                ),
+                self.job_names[1]: assert_not_null(
+                    job2_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
+                ),
+            }
+
+            # 3. Perform the commit
+            # NOTE: The datahub checkpointing provider commits the state to the
+            # in-memory self.mcps_emitted (via the monkey-patching), while the file
+            # checkpointing provider writes it to a JSON file in a temp directory.
+            provider.commit()
+            self.assertTrue(provider.committed)
+
+            # 4. Get last committed state. This must match what has been committed earlier.
+            # NOTE: This will retrieve the state from where it was committed.
+            job1_last_state = provider.get_latest_checkpoint(
+                self.pipeline_name, self.job_names[0]
+            )
+            job2_last_state = provider.get_latest_checkpoint(
+                self.pipeline_name, self.job_names[1]
+            )
+
+            # 5. Validate individual job checkpoint state values that have been committed and retrieved
+            # against the original values.
+            self.assertIsNotNone(job1_last_state)
+            job1_last_checkpoint = Checkpoint.create_from_checkpoint_aspect(
+                job_name=self.job_names[0],
+                checkpoint_aspect=job1_last_state,
+                state_class=type(job1_state_obj),
+            )
+            self.assertEqual(job1_last_checkpoint, job1_checkpoint)
+
+            self.assertIsNotNone(job2_last_state)
+            job2_last_checkpoint = Checkpoint.create_from_checkpoint_aspect(
+                job_name=self.job_names[1],
+                checkpoint_aspect=job2_last_state,
+                state_class=type(job2_state_obj),
+            )
+            self.assertEqual(job2_last_checkpoint, job2_checkpoint)
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json
new file mode 100644
index 0000000000000..4e62492918bfb
--- /dev/null
+++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json
@@ -0,0 +1,26 @@
+[
+{
+  "entityType": "dataJob",
+  "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,dummy_stateful,prod),default_stale_entity_removal)",
+  "changeType": "UPSERT",
+  "aspectName": "datahubIngestionCheckpoint",
+  "aspect": {
+    "json": {
+      "timestampMillis": 1586847600000,
+      "partitionSpec": {
+        "type": "FULL_TABLE",
+        "partition": "FULL_TABLE_SNAPSHOT"
+      },
+      "pipelineName": "dummy_stateful",
+      "platformInstanceId": "",
+      "config": "",
+      "state": {
+        "formatVersion": "1.0",
+        "serde": "base85-bz2-json",
+        "payload": "LRx4!F+o`-Q(1w>5G4QrYoCBnWH=B60MH7jr`{?c0BA?5L)2-AGyu>6y;V<9hz%Mv0Bt1*)lOMzr>a0|Iq-4VtTsYONQsFPLn1EpdQS;HIy|&CvSAlRvAJwmtCEM+Rx(v_)~sVvkx3V@WX4O`=losC6yZWb2OL0@"
+      },
+      "runId": "dummy-test-stateful-ingestion"
+    }
+  }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json
new file mode 100644
index 0000000000000..6ecd43483d948
--- /dev/null
+++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json
@@ -0,0 +1,26 @@
+[
+{
+  "entityType": "dataJob",
+  "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,dummy_stateful,prod),default_stale_entity_removal)",
+  "changeType": "UPSERT",
+  "aspectName": "datahubIngestionCheckpoint",
+  "aspect": {
+    "json": {
+      "timestampMillis": 1586847600000,
+      "partitionSpec": {
+        "type": "FULL_TABLE",
+        "partition": "FULL_TABLE_SNAPSHOT"
+      },
+      "pipelineName": "dummy_stateful",
+      "platformInstanceId": "",
+      "config": "",
+      "state": {
+        "formatVersion": "1.0",
+        "serde": "base85-bz2-json",
+        "payload": "LRx4!F+o`-Q(317h`0a%NgsevWH1l}0MH7jr`{?c0B9vdZ9%mLfYG4P6;f$2G%+v`9z&~6n|e(JEPC2_Iix~CA_im)jR-zsjEK*yo|HQz#IUUHtf@DYVEme-lUW9{Xmmt~y^2jCdyY95az!{$kf#WUxB"
+      },
+      "runId": "dummy-test-stateful-ingestion"
+    }
+  }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json
new file mode 100644 index
0000000000000..4a77651c93066 --- /dev/null +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json @@ -0,0 +1,50 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json new file mode 100644 index 0000000000000..9d6f755374462 --- /dev/null +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json @@ -0,0 +1,50 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": true + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py new file mode 100644 index 0000000000000..2b811d5e5e3a3 --- /dev/null +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py @@ -0,0 +1,227 @@ +from dataclasses import dataclass, field as dataclass_field +from typing import Any, Dict, Iterable, List, Optional, cast + +import pydantic +from freezegun import freeze_time +from pydantic import Field + +from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin +from 
datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.run.pipeline import Pipeline
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
+from datahub.ingestion.source.state.stale_entity_removal_handler import (
+    StaleEntityRemovalHandler,
+    StaleEntityRemovalSourceReport,
+    StatefulStaleMetadataRemovalConfig,
+)
+from datahub.ingestion.source.state.stateful_ingestion_base import (
+    StatefulIngestionConfigBase,
+    StatefulIngestionSourceBase,
+)
+from datahub.metadata.schema_classes import StatusClass
+from datahub.utilities.urns.dataset_urn import DatasetUrn
+from tests.test_helpers import mce_helpers
+from tests.test_helpers.state_helpers import (
+    get_current_checkpoint_from_pipeline,
+    validate_all_providers_have_committed_successfully,
+)
+
+FROZEN_TIME = "2020-04-14 07:00:00"
+
+dummy_datasets: List = ["dummy_dataset1", "dummy_dataset2", "dummy_dataset3"]
+
+
+@dataclass
+class DummySourceReport(StaleEntityRemovalSourceReport):
+    datasets_scanned: int = 0
+    filtered_datasets: List[str] = dataclass_field(default_factory=list)
+
+    def report_datasets_scanned(self, count: int = 1) -> None:
+        self.datasets_scanned += count
+
+    def report_datasets_dropped(self, model: str) -> None:
+        self.filtered_datasets.append(model)
+
+
+class DummySourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
+    dataset_patterns: AllowDenyPattern = Field(
+        default=AllowDenyPattern.allow_all(),
+        description="Regex patterns for datasets to filter in ingestion.",
+    )
+    # Configuration for stateful ingestion
+    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
+        default=None, description="Dummy source ingestion config."
+    )
+
+
+class DummySource(StatefulIngestionSourceBase):
+    """
+    This is a dummy source which only extracts dummy datasets
+    """
+
+    source_config: DummySourceConfig
+    reporter: DummySourceReport
+
+    def __init__(self, config: DummySourceConfig, ctx: PipelineContext):
+        super(DummySource, self).__init__(config, ctx)
+        self.source_config = config
+        self.reporter = DummySourceReport()
+        # Create and register the stateful ingestion use-case handler.
+ self.stale_entity_removal_handler = StaleEntityRemovalHandler.create( + self, self.source_config, self.ctx + ) + + @classmethod + def create(cls, config_dict, ctx): + config = DummySourceConfig.parse_obj(config_dict) + return cls(config, ctx) + + def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: + return [ + *super().get_workunit_processors(), + self.stale_entity_removal_handler.workunit_processor, + ] + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + for dataset in dummy_datasets: + if not self.source_config.dataset_patterns.allowed(dataset): + self.reporter.report_datasets_dropped(dataset) + continue + else: + self.reporter.report_datasets_scanned() + dataset_urn = DatasetUrn.create_from_ids( + platform_id="postgres", + table_name=dataset, + env=DEFAULT_ENV, + ) + yield MetadataChangeProposalWrapper( + entityUrn=str(dataset_urn), + aspect=StatusClass(removed=False), + ).as_workunit() + + def get_report(self) -> SourceReport: + return self.reporter + + +@freeze_time(FROZEN_TIME) +def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): + # test stateful ingestion using dummy source + state_file_name: str = "checkpoint_state_mces.json" + golden_state_file_name: str = "golden_test_checkpoint_state.json" + golden_state_file_name_after_deleted: str = ( + "golden_test_checkpoint_state_after_deleted.json" + ) + output_file_name: str = "dummy_mces.json" + golden_file_name: str = "golden_test_stateful_ingestion.json" + output_file_name_after_deleted: str = "dummy_mces_stateful_after_deleted.json" + golden_file_name_after_deleted: str = ( + "golden_test_stateful_ingestion_after_deleted.json" + ) + + test_resources_dir = pytestconfig.rootpath / "tests/unit/stateful_ingestion/state" + + base_pipeline_config = { + "run_id": "dummy-test-stateful-ingestion", + "pipeline_name": "dummy_stateful", + "source": { + "type": "tests.unit.stateful_ingestion.state.test_stateful_ingestion.DummySource", + "config": { + "stateful_ingestion": { + "enabled": True, + "remove_stale_metadata": True, + "state_provider": { + "type": "file", + "config": { + "filename": f"{tmp_path}/{state_file_name}", + }, + }, + }, + }, + }, + "sink": { + "type": "file", + "config": {}, + }, + } + + pipeline_run1 = None + pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore + base_pipeline_config # type: ignore + ) + pipeline_run1_config["sink"]["config"][ + "filename" + ] = f"{tmp_path}/{output_file_name}" + pipeline_run1 = Pipeline.create(pipeline_run1_config) + pipeline_run1.run() + pipeline_run1.raise_from_status() + pipeline_run1.pretty_print_summary() + + # validate both dummy source mces and checkpoint state mces files + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / output_file_name, + golden_path=f"{test_resources_dir}/{golden_file_name}", + ) + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / state_file_name, + golden_path=f"{test_resources_dir}/{golden_state_file_name}", + ) + checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1) + assert checkpoint1 + assert checkpoint1.state + + pipeline_run2 = None + pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config) # type: ignore + pipeline_run2_config["source"]["config"]["dataset_patterns"] = { + "allow": ["dummy_dataset1", "dummy_dataset2"], + } + pipeline_run2_config["sink"]["config"][ + "filename" + ] = f"{tmp_path}/{output_file_name_after_deleted}" + pipeline_run2 = Pipeline.create(pipeline_run2_config) 
+    pipeline_run2.run()
+    pipeline_run2.raise_from_status()
+    pipeline_run2.pretty_print_summary()
+
+    # validate both the updated dummy source mces and the checkpoint state mces files after the dataset is dropped
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=tmp_path / output_file_name_after_deleted,
+        golden_path=f"{test_resources_dir}/{golden_file_name_after_deleted}",
+    )
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=tmp_path / state_file_name,
+        golden_path=f"{test_resources_dir}/{golden_state_file_name_after_deleted}",
+    )
+    checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)
+    assert checkpoint2
+    assert checkpoint2.state
+
+    # Validate that all providers have committed successfully.
+    validate_all_providers_have_committed_successfully(
+        pipeline=pipeline_run1, expected_providers=1
+    )
+    validate_all_providers_have_committed_successfully(
+        pipeline=pipeline_run2, expected_providers=1
+    )
+
+    # Perform all assertions on the states. The filtered-out dataset should not be
+    # part of the second state
+    state1 = cast(GenericCheckpointState, checkpoint1.state)
+    state2 = cast(GenericCheckpointState, checkpoint2.state)
+
+    difference_dataset_urns = list(
+        state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
+    )
+    # the difference in dataset urns is the dataset that is no longer allowed by the patterns
+    assert len(difference_dataset_urns) == 1
+    deleted_dataset_urns: List[str] = [
+        "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
+    ]
+    assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns)
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/test_configs.py b/metadata-ingestion/tests/unit/stateful_ingestion/test_configs.py
index 9edfe8c4a957b..0e6d60e3440b2 100644
--- a/metadata-ingestion/tests/unit/stateful_ingestion/test_configs.py
+++ b/metadata-ingestion/tests/unit/stateful_ingestion/test_configs.py
@@ -3,9 +3,10 @@
 import pytest
 from pydantic import ValidationError
 
-from datahub.configuration.common import ConfigModel, DynamicTypedConfig
+from datahub.configuration.common import ConfigModel
 from datahub.ingestion.graph.client import DatahubClientConfig
 from datahub.ingestion.source.state.stateful_ingestion_base import (
+    DynamicTypedStateProviderConfig,
     StatefulIngestionConfig,
 )
 from datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider import (
@@ -23,7 +24,6 @@
     },
     "simple": {},
     "default": {},
-    "none": None,
 }
 
 
@@ -81,13 +81,6 @@
         ),
         False,
     ),
-    # None
-    "checkpointing_bad_config": (
-        DatahubIngestionStateProviderConfig,
-        datahub_client_configs["none"],
-        None,
-        True,
-    ),
 }
 
 
@@ -119,7 +112,7 @@
                 max_checkpoint_state_size=1024,
                 ignore_old_state=True,
                 ignore_new_state=True,
-                state_provider=DynamicTypedConfig(
+                state_provider=DynamicTypedStateProviderConfig(
                     type="datahub",
                     config=datahub_client_configs["full"],
                 ),
@@ -148,7 +141,7 @@
                 max_checkpoint_state_size=2**24,
                 ignore_old_state=False,
                 ignore_new_state=False,
-                state_provider=DynamicTypedConfig(type="datahub", config=None),
+                state_provider=DynamicTypedStateProviderConfig(type="datahub"),
             ),
             False,
         ),
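Two usage sketches follow for reviewers who want to try the new provider. First, wiring the `file` checkpointing provider into a pipeline recipe. This mirrors the configuration exercised in test_stateful_ingestion.py above; the run id, pipeline name, and file paths are illustrative placeholders, and any stateful-ingestion-capable source can stand in for the DummySource test helper.

# A minimal sketch, assuming the DummySource helper added in this change is importable.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "run_id": "demo-run",  # placeholder
        "pipeline_name": "demo_stateful",  # checkpoints are keyed by pipeline name
        "source": {
            "type": "tests.unit.stateful_ingestion.state.test_stateful_ingestion.DummySource",
            "config": {
                "stateful_ingestion": {
                    "enabled": True,
                    "remove_stale_metadata": True,
                    # "file" resolves through the entry point registered in setup.py above.
                    "state_provider": {
                        "type": "file",
                        "config": {"filename": "/tmp/checkpoint_state_mces.json"},  # placeholder path
                    },
                },
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": "/tmp/demo_mces.json"},  # placeholder path
        },
    }
)
pipeline.run()
pipeline.raise_from_status()

Unlike the `datahub` provider, this needs no running GMS instance, which is what lets the new unit test above exercise stale-entity removal end to end against plain files.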
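Second, a sketch of driving FileIngestionCheckpointingProvider directly, following the commit/read-back round trip from test_provider.py. Note that create() now takes only (config_dict, ctx); the old `name` argument is gone. The file path and job/pipeline names here are placeholders.

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
    BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider import (
    FileIngestionCheckpointingProvider,
)

ctx = PipelineContext(run_id="demo_run", pipeline_name="demo_pipeline")
provider = FileIngestionCheckpointingProvider.create(
    {"filename": "/tmp/demo_checkpoints.json"}, ctx  # placeholder path
)

# state_to_commit accepts only the aspect form of a checkpoint.
checkpoint = Checkpoint(
    job_name=JobId("demo_job"),
    pipeline_name="demo_pipeline",
    run_id="demo_run",
    state=BaseSQLAlchemyCheckpointState(),
)
aspect = checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
assert aspect is not None
provider.state_to_commit = {JobId("demo_job"): aspect}

# commit() serializes all pending checkpoints to the configured JSON file.
provider.commit()
assert provider.committed

# get_latest_checkpoint() scans that file for a matching dataJob entry and
# returns None when none exists (e.g. on the very first run).
latest = provider.get_latest_checkpoint("demo_pipeline", JobId("demo_job"))
assert latest is not None and latest.pipelineName == "demo_pipeline"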