feat(ingestion): file-based state checkpoint provider #9029

Merged
Changes from 4 commits
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -645,6 +645,7 @@ def get_long_description():
],
"datahub.ingestion.checkpointing_provider.plugins": [
"datahub = datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:DatahubIngestionCheckpointingProvider",
"file = datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider:FileIngestionCheckpointingProvider",
],
"datahub.ingestion.reporting_provider.plugins": [
"datahub = datahub.ingestion.reporting.datahub_ingestion_run_summary_provider:DatahubIngestionRunSummaryProvider",
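With the provider registered under the `file` key of the `datahub.ingestion.checkpointing_provider.plugins` entry point, a recipe can select it through `stateful_ingestion.state_provider`. Below is a minimal sketch of such a config; only the `state_provider` block mirrors the updated lookml test further down, while the source and sink settings are placeholders and not part of this PR.

```python
# Hypothetical pipeline config selecting the new file-based checkpoint provider.
# Only the state_provider block is taken from this PR's lookml test; everything
# else is a placeholder.
pipeline_config = {
    "run_id": "example-run",
    "source": {
        "type": "lookml",
        "config": {
            "base_folder": "/path/to/lookml",  # placeholder
            "stateful_ingestion": {
                "enabled": True,
                "remove_stale_metadata": True,
                "state_provider": {
                    "type": "file",  # resolved via the entry point added above
                    "config": {"filename": "/tmp/checkpoint_mces.json"},
                },
            },
        },
    },
    "sink": {
        "type": "file",
        "config": {"filename": "/tmp/lookml_mces.json"},
    },
}
```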
13 changes: 8 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
@@ -420,8 +420,11 @@ def read_metadata_file(
]
]:
# This simplified version of the FileSource can be used for testing purposes.
with file.open("r") as f:
for obj in json.load(f):
item = _from_obj_for_file(obj)
if item:
yield item
try:
with file.open("r") as f:
for obj in json.load(f):
item = _from_obj_for_file(obj)
if item:
yield item
except FileNotFoundError:
return []
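The `try/except FileNotFoundError` above makes `read_metadata_file` yield nothing when the file does not exist, instead of raising. The new file-based checkpointing provider below depends on that behaviour on its first run, before any checkpoint has been committed. A minimal sketch, with a hypothetical path:

```python
import pathlib

from datahub.ingestion.source.file import read_metadata_file

# First run: the checkpoint file has not been written yet, so nothing is
# yielded and get_latest_checkpoint() falls through to returning None.
items = list(read_metadata_file(pathlib.Path("/tmp/no_such_checkpoint.json")))
assert items == []
```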
118 changes: 118 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/state_provider/file_ingestion_checkpointing_provider.py
@@ -0,0 +1,118 @@
import logging
import pathlib
from datetime import datetime
from typing import Any, Dict, List, Optional, cast

from datahub.configuration.common import ConfigurationError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
IngestionCheckpointingProviderBase,
IngestionCheckpointingProviderConfig,
JobId,
)
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.file import read_metadata_file
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass

logger = logging.getLogger(__name__)


class FileIngestionStateProviderConfig(IngestionCheckpointingProviderConfig):
filename: Optional[str] = None


class FileIngestionCheckpointingProvider(IngestionCheckpointingProviderBase):
orchestrator_name: str = "file"

def __init__(self, filename: str, name: str):
super().__init__(name)
self.filename = filename
self.committed = False

@classmethod
def create(
cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
) -> "FileIngestionCheckpointingProvider":
if config_dict is None:
raise ConfigurationError("Missing provider configuration.")
else:
provider_config = FileIngestionStateProviderConfig.parse_obj_allow_extras(
config_dict
)
if provider_config.filename:
return cls(provider_config.filename, name)
else:
raise ConfigurationError(
"Missing filename. Provide filename under the state_provider configuration."
)

def get_latest_checkpoint(
self,
pipeline_name: str,
job_name: JobId,
) -> Optional[DatahubIngestionCheckpointClass]:
logger.debug(
f"Querying for the latest ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" job_name:'{job_name}'"
)

data_job_urn = self.get_data_job_urn(
self.orchestrator_name, pipeline_name, job_name
)
latest_checkpoint: Optional[DatahubIngestionCheckpointClass] = None
for obj in read_metadata_file(pathlib.Path(self.filename)):
if isinstance(obj, MetadataChangeProposalWrapper) and obj.aspect:
if (
obj.entityUrn == data_job_urn
and isinstance(obj.aspect, DatahubIngestionCheckpointClass)
and obj.aspect.get("pipelineName", "") == pipeline_name
):
latest_checkpoint = cast(
Optional[DatahubIngestionCheckpointClass], obj.aspect
)
break

if latest_checkpoint:
logger.debug(
f"The last committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" job_name:'{job_name}' found with start_time:"
f" {datetime.utcfromtimestamp(latest_checkpoint.timestampMillis/1000)}"
)
return latest_checkpoint
else:
logger.debug(
f"No committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" job_name:'{job_name}' found"
)

return None

def commit(self) -> None:
if not self.state_to_commit:
logger.warning(f"No state available to commit for {self.name}")
return None

checkpoint_workunits: List[MetadataChangeProposalWrapper] = []
for job_name, checkpoint in self.state_to_commit.items():
# Emit the ingestion state for each job
logger.debug(
f"Committing ingestion checkpoint for pipeline:'{checkpoint.pipelineName}', "
f"job:'{job_name}'"
)
datajob_urn = self.get_data_job_urn(
self.orchestrator_name,
checkpoint.pipelineName,
job_name,
)
checkpoint_workunits.append(
MetadataChangeProposalWrapper(
entityUrn=datajob_urn,
aspect=checkpoint,
)
)
write_metadata_file(pathlib.Path(self.filename), checkpoint_workunits)
self.committed = True
logger.debug(
f"Committed all ingestion checkpoints for pipeline:'{checkpoint.pipelineName}'"
)
82 changes: 37 additions & 45 deletions metadata-ingestion/tests/integration/lookml/test_lookml.py
@@ -728,7 +728,7 @@ def test_hive_platform_drops_ids(pytestconfig, tmp_path, mock_time):


@freeze_time(FROZEN_TIME)
def test_lookml_ingest_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
def test_lookml_ingest_stateful(pytestconfig, tmp_path, mock_time):
output_file_name: str = "lookml_mces.json"
golden_file_name: str = "expected_output.json"
output_file_deleted_name: str = "lookml_mces_deleted_stateful.json"
@@ -754,8 +754,10 @@ def test_lookml_ingest_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_
"remove_stale_metadata": True,
"fail_safe_threshold": 100.0,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": GMS_SERVER}},
"type": "file",
"config": {
"filename": f"{tmp_path}/checkpoint_mces.json",
},
},
},
},
@@ -767,56 +767,46 @@
}

pipeline_run1 = None
with mock.patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint:
mock_checkpoint.return_value = mock_datahub_graph
pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore
base_pipeline_config # type: ignore
)
# Set the special properties for this run
pipeline_run1_config["source"]["config"]["emit_reachable_views_only"] = False
pipeline_run1_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_name}"
pipeline_run1 = Pipeline.create(pipeline_run1_config)
pipeline_run1.run()
pipeline_run1.raise_from_status()
pipeline_run1.pretty_print_summary()
pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore
base_pipeline_config # type: ignore
)
# Set the special properties for this run
pipeline_run1_config["source"]["config"]["emit_reachable_views_only"] = False
pipeline_run1_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_name}"
pipeline_run1 = Pipeline.create(pipeline_run1_config)
pipeline_run1.run()
pipeline_run1.raise_from_status()
pipeline_run1.pretty_print_summary()

mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / output_file_name,
golden_path=f"{test_resources_dir}/{golden_file_name}",
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / output_file_name,
golden_path=f"{test_resources_dir}/{golden_file_name}",
)

checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)
assert checkpoint1
assert checkpoint1.state

pipeline_run2 = None
with mock.patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint:
mock_checkpoint.return_value = mock_datahub_graph
pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config) # type: ignore
# Set the special properties for this run
pipeline_run2_config["source"]["config"]["emit_reachable_views_only"] = True
pipeline_run2_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_deleted_name}"
pipeline_run2 = Pipeline.create(pipeline_run2_config)
pipeline_run2.run()
pipeline_run2.raise_from_status()
pipeline_run2.pretty_print_summary()
pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config) # type: ignore
# Set the special properties for this run
pipeline_run2_config["source"]["config"]["emit_reachable_views_only"] = True
pipeline_run2_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_deleted_name}"
pipeline_run2 = Pipeline.create(pipeline_run2_config)
pipeline_run2.run()
pipeline_run2.raise_from_status()
pipeline_run2.pretty_print_summary()

mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / output_file_deleted_name,
golden_path=f"{test_resources_dir}/{golden_file_deleted_name}",
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / output_file_deleted_name,
golden_path=f"{test_resources_dir}/{golden_file_deleted_name}",
)
checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)
assert checkpoint2
assert checkpoint2.state
@@ -0,0 +1,86 @@
from typing import List

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state.usage_common_state import (
BaseTimeWindowCheckpointState,
)
from datahub.ingestion.source.state_provider.file_ingestion_checkpointing_provider import (
FileIngestionCheckpointingProvider,
)
from tests.test_helpers.type_helpers import assert_not_null

pipeline_name: str = "test_pipeline"
job_names: List[JobId] = [JobId("job1"), JobId("job2")]
run_id: str = "test_run"


def test_file_ingestion_checkpointing_provider(tmp_path):
ctx: PipelineContext = PipelineContext(run_id=run_id, pipeline_name=pipeline_name)
provider = FileIngestionCheckpointingProvider.create(
{"filename": str(tmp_path / "checkpoint_mces.json")},
ctx,
name=FileIngestionCheckpointingProvider.__name__,
)
# 1. Create the individual job checkpoints with appropriate states.
# Job1 - Checkpoint with a BaseSQLAlchemyCheckpointState state
job1_state_obj = BaseSQLAlchemyCheckpointState()
job1_checkpoint = Checkpoint(
job_name=job_names[0],
pipeline_name=pipeline_name,
run_id=run_id,
state=job1_state_obj,
)
# Job2 - Checkpoint with a BaseTimeWindowCheckpointState state
job2_state_obj = BaseTimeWindowCheckpointState(
begin_timestamp_millis=10, end_timestamp_millis=100
)
job2_checkpoint = Checkpoint(
job_name=job_names[1],
pipeline_name=pipeline_name,
run_id=run_id,
state=job2_state_obj,
)

# 2. Set the provider's state_to_commit.
provider.state_to_commit = {
# NOTE: state_to_commit accepts only the aspect version of the checkpoint.
job_names[0]: assert_not_null(
job1_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
),
job_names[1]: assert_not_null(
job2_checkpoint.to_checkpoint_aspect(max_allowed_state_size=2**20)
),
}

# 3. Perform the commit
# NOTE: This will commit the state to the pytest temp path json file.
provider.commit()
assert provider.committed

# 4. Get last committed state. This must match what has been committed earlier.
# NOTE: This will retrieve from pytest temp path json file.
job1_last_state = provider.get_latest_checkpoint(pipeline_name, job_names[0])
job2_last_state = provider.get_latest_checkpoint(pipeline_name, job_names[1])

# 5. Validate individual job checkpoint state values that have been committed and retrieved
# against the original values.
assert job1_last_state is not None
job1_last_checkpoint = Checkpoint.create_from_checkpoint_aspect(
job_name=job_names[0],
checkpoint_aspect=job1_last_state,
state_class=type(job1_state_obj),
)
assert job1_last_checkpoint == job1_checkpoint

assert job2_last_state is not None
job2_last_checkpoint = Checkpoint.create_from_checkpoint_aspect(
job_name=job_names[1],
checkpoint_aspect=job2_last_state,
state_class=type(job2_state_obj),
)
assert job2_last_checkpoint == job2_checkpoint