fix(ingest): stateful redundant run skip handler #8467

Merged · 33 commits · Aug 28, 2023

Commits
46babb6
fix(ingest): stateful redundant run skip handler WIP
mayurinehate Jul 17, 2023
02fc770
changes to consider suggested changes in time window
mayurinehate Jul 18, 2023
3f1487a
changes to update state only in case of complete success
mayurinehate Jul 19, 2023
d8be681
add unit tests for should_skip_this_run and suggest_run_time_window
mayurinehate Jul 20, 2023
da3904e
more tests, remove |
mayurinehate Jul 21, 2023
887d3b7
Merge branch 'master' into redundant_run_fix
anshbansal Jul 31, 2023
2a1ac2e
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 1, 2023
1b81436
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 3, 2023
6405f21
small refractor and test for snowflake stateful ingestion
mayurinehate Aug 3, 2023
e35203a
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 4, 2023
686e5f9
fix lint
mayurinehate Aug 4, 2023
b0dc114
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 8, 2023
201ccbc
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 14, 2023
3127e2e
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 17, 2023
8ceb21c
address review comments
mayurinehate Aug 17, 2023
9c275c9
add timestamps in report
mayurinehate Aug 18, 2023
45c09d9
rewrite redundant run conditions, more tests
mayurinehate Aug 18, 2023
39ceb6a
fix state for ignore_start_time_lineage and time reporting for lineage
mayurinehate Aug 18, 2023
c3fe256
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 18, 2023
b366ce6
refractor to prefer mypy check over runtime failure
mayurinehate Aug 18, 2023
73043c5
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 18, 2023
3013168
more changes, tests
mayurinehate Aug 18, 2023
75a7abc
Merge remote-tracking branch 'refs/remotes/origin/redundant_run_fix' …
mayurinehate Aug 18, 2023
126804c
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 22, 2023
7afc105
do not floor start time if absolute start time is specified
mayurinehate Aug 22, 2023
59b21d7
nit changes
mayurinehate Aug 23, 2023
8851ff2
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 23, 2023
663e5e2
suggested time in logs
mayurinehate Aug 23, 2023
fd09d3a
Merge remote-tracking branch 'refs/remotes/origin/redundant_run_fix' …
mayurinehate Aug 23, 2023
c286a8d
fix issue with zero usage aspects
mayurinehate Aug 23, 2023
a7a2c1c
Update metadata-ingestion/src/datahub/utilities/time.py
asikowitz Aug 23, 2023
8c79d75
Merge branch 'master' into redundant_run_fix
asikowitz Aug 25, 2023
fe4702c
Merge branch 'master' into redundant_run_fix
mayurinehate Aug 28, 2023
@@ -252,15 +252,15 @@ Example code:
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
         # Skip a redundant run
         if self.redundant_run_skip_handler.should_skip_this_run(
-            cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
+            cur_start_time_millis=self.config.start_time
         ):
             return
 
         # Generate the workunits.
         # <code for generating the workunits>
         # Update checkpoint state for this run.
         self.redundant_run_skip_handler.update_state(
-            start_time_millis=datetime_to_ts_millis(self.config.start_time),
-            end_time_millis=datetime_to_ts_millis(self.config.end_time),
+            start_time_millis=self.config.start_time,
+            end_time_millis=self.config.end_time,
         )
 ```
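
Conceptually, the skip decision in this example is a containment check over time windows: a run is skipped only if a previous successful run already covered the currently requested window, while a partial overlap yields a suggested, narrowed window instead. Below is a minimal Python sketch of that idea; the function names mirror the PR's `should_skip_this_run` and `suggest_run_time_window`, but the bodies are illustrative rather than the actual handler implementation.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Tuple


@dataclass
class LastRunWindow:
    # Time window persisted in checkpoint state by the previous successful run.
    start_time: datetime
    end_time: datetime


def should_skip_this_run(
    last_run: Optional[LastRunWindow], cur_start: datetime, cur_end: datetime
) -> bool:
    # Skip only when the stored window fully covers the requested one;
    # a partial overlap still warrants a (possibly narrowed) run.
    if last_run is None:
        return False
    return last_run.start_time <= cur_start and cur_end <= last_run.end_time


def suggest_run_time_window(
    last_run: LastRunWindow, cur_start: datetime, cur_end: datetime
) -> Tuple[datetime, datetime]:
    # When the requested window partially overlaps the stored one, shift the
    # start forward so only the uncovered suffix is re-ingested.
    if last_run.start_time <= cur_start <= last_run.end_time < cur_end:
        return last_run.end_time, cur_end
    return cur_start, cur_end
```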
@@ -65,11 +65,15 @@ def default_start_time(
         assert delta < timedelta(
             0
         ), "Relative start time should start with minus sign (-) e.g. '-2 days'."
-        assert abs(delta) > get_bucket_duration_delta(
+        assert abs(delta) >= get_bucket_duration_delta(
             values["bucket_duration"]
         ), "Relative start time should be in terms of configured bucket duration. e.g '-2 days' or '-2 hours'."
-        return values["end_time"] + delta
+        return get_time_bucket(
+            values["end_time"] + delta, values["bucket_duration"]
+        )
     except humanfriendly.InvalidTimespan:
+        # We do not floor start_time to the bucket start time if an absolute start time is specified.
+        # If the user has specified an absolute start time in the recipe, they most likely mean it.
         return parse_absolute_time(v)
 
     return v
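
With this change, a relative start time such as `-2 days` is floored to the start of its time bucket, while an absolute start time from the recipe is honored verbatim. A rough sketch of what bucket flooring does; `floor_to_bucket` is an illustrative stand-in for DataHub's `get_time_bucket` helper, not its exact code:

```python
from datetime import datetime, timezone
from enum import Enum


class BucketDuration(str, Enum):
    DAY = "DAY"
    HOUR = "HOUR"


def floor_to_bucket(dt: datetime, bucket: BucketDuration) -> datetime:
    # Truncate a timestamp to the start of its containing bucket so that
    # relative start times always land on a bucket boundary.
    if bucket == BucketDuration.DAY:
        return dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return dt.replace(minute=0, second=0, microsecond=0)


# "-2 days" relative to 2023-08-28T15:30Z yields 2023-08-26T15:30Z,
# which then floors to the bucket start 2023-08-26T00:00Z.
print(floor_to_bucket(datetime(2023, 8, 26, 15, 30, tzinfo=timezone.utc), BucketDuration.DAY))
```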
@@ -74,14 +74,20 @@
 )
 from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
 from datahub.ingestion.source.state.redundant_run_skip_handler import (
-    RedundantRunSkipHandler,
+    RedundantLineageRunSkipHandler,
+    RedundantUsageRunSkipHandler,
 )
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
 )
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionSourceBase,
 )
+from datahub.ingestion.source_report.ingestion_stage import (
+    LINEAGE_EXTRACTION,
+    METADATA_EXTRACTION,
+    PROFILING,
+)
 from datahub.metadata.com.linkedin.pegasus2avro.common import (
     Status,
     SubTypes,
@@ -122,7 +128,6 @@
 from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.registries.domain_registry import DomainRegistry
 from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage
-from datahub.utilities.time import datetime_to_ts_millis
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -228,10 +233,36 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
 
         set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase)
 
+        self.redundant_lineage_run_skip_handler: Optional[
+            RedundantLineageRunSkipHandler
+        ] = None
+        if self.config.enable_stateful_lineage_ingestion:
+            self.redundant_lineage_run_skip_handler = RedundantLineageRunSkipHandler(
+                source=self,
+                config=self.config,
+                pipeline_name=self.ctx.pipeline_name,
+                run_id=self.ctx.run_id,
+            )
+
         # For database, schema, tables, views, etc
-        self.lineage_extractor = BigqueryLineageExtractor(config, self.report)
+        self.lineage_extractor = BigqueryLineageExtractor(
+            config, self.report, self.redundant_lineage_run_skip_handler
+        )
+
+        redundant_usage_run_skip_handler: Optional[RedundantUsageRunSkipHandler] = None
+        if self.config.enable_stateful_usage_ingestion:
+            redundant_usage_run_skip_handler = RedundantUsageRunSkipHandler(
+                source=self,
+                config=self.config,
+                pipeline_name=self.ctx.pipeline_name,
+                run_id=self.ctx.run_id,
+            )
+
         self.usage_extractor = BigQueryUsageExtractor(
-            config, self.report, dataset_urn_builder=self.gen_dataset_urn_from_ref
+            config,
+            self.report,
+            dataset_urn_builder=self.gen_dataset_urn_from_ref,
+            redundant_run_skip_handler=redundant_usage_run_skip_handler,
         )
 
         self.domain_registry: Optional[DomainRegistry] = None
@@ -240,15 +271,8 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
             cached_domains=[k for k in self.config.domain], graph=self.ctx.graph
         )
 
-        self.redundant_run_skip_handler = RedundantRunSkipHandler(
-            source=self,
-            config=self.config,
-            pipeline_name=self.ctx.pipeline_name,
-            run_id=self.ctx.run_id,
-        )
-
         self.profiling_state_handler: Optional[ProfilingHandler] = None
-        if self.config.store_last_profiling_timestamps:
+        if self.config.enable_stateful_profiling:
             self.profiling_state_handler = ProfilingHandler(
                 source=self,
                 config=self.config,
@@ -271,7 +295,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
         self.sql_parser_schema_resolver = SchemaResolver(
             platform=self.platform, env=self.config.env
         )
-
+        self.add_config_to_report()
         atexit.register(cleanup, config)
 
     @classmethod
@@ -502,68 +526,50 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         conn: bigquery.Client = get_bigquery_client(self.config)
-        self.add_config_to_report()
 
         projects = self._get_projects(conn)
         if not projects:
             return
 
         for project_id in projects:
-            self.report.set_ingestion_stage(project_id.id, "Metadata Extraction")
+            self.report.set_ingestion_stage(project_id.id, METADATA_EXTRACTION)
             logger.info(f"Processing project: {project_id.id}")
             yield from self._process_project(conn, project_id)
 
-        if self._should_ingest_usage():
+        if self.config.include_usage_statistics:
             yield from self.usage_extractor.get_usage_workunits(
                 [p.id for p in projects], self.table_refs
             )
 
         if self._should_ingest_lineage():
             for project in projects:
-                self.report.set_ingestion_stage(project.id, "Lineage Extraction")
+                self.report.set_ingestion_stage(project.id, LINEAGE_EXTRACTION)
                 yield from self.generate_lineage(project.id)
 
-    def _should_ingest_usage(self) -> bool:
-        if not self.config.include_usage_statistics:
-            return False
-
-        if self.config.store_last_usage_extraction_timestamp:
-            if self.redundant_run_skip_handler.should_skip_this_run(
-                cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
-            ):
-                self.report.report_warning(
-                    "usage-extraction",
-                    f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
-                )
-                return False
-            else:
-                # Update the checkpoint state for this run.
-                self.redundant_run_skip_handler.update_state(
-                    start_time_millis=datetime_to_ts_millis(self.config.start_time),
-                    end_time_millis=datetime_to_ts_millis(self.config.end_time),
-                )
-        return True
+            if self.redundant_lineage_run_skip_handler:
+                # Update the checkpoint state for this run.
+                self.redundant_lineage_run_skip_handler.update_state(
+                    self.config.start_time, self.config.end_time
+                )
 
     def _should_ingest_lineage(self) -> bool:
         if not self.config.include_table_lineage:
             return False
 
-        if self.config.store_last_lineage_extraction_timestamp:
-            if self.redundant_run_skip_handler.should_skip_this_run(
-                cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
-            ):
-                # Skip this run
-                self.report.report_warning(
-                    "lineage-extraction",
-                    f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
-                )
-                return False
-            else:
-                # Update the checkpoint state for this run.
-                self.redundant_run_skip_handler.update_state(
-                    start_time_millis=datetime_to_ts_millis(self.config.start_time),
-                    end_time_millis=datetime_to_ts_millis(self.config.end_time),
-                )
+        if (
+            self.redundant_lineage_run_skip_handler
+            and self.redundant_lineage_run_skip_handler.should_skip_this_run(
+                cur_start_time=self.config.start_time,
+                cur_end_time=self.config.end_time,
+            )
+        ):
+            # Skip this run
+            self.report.report_warning(
+                "lineage-extraction",
+                "Skip this run as there was already a run for current ingestion window.",
+            )
+            return False
 
         return True
 
     def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
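
The wiring pattern in this file (build the handler only when the corresponding `enable_stateful_*` flag is set, hand it to the extractor, and persist the window only after extraction completes) generalizes beyond BigQuery. A simplified sketch under assumed signatures; the `Protocol` below captures just the surface used above and is not DataHub's actual class:

```python
from datetime import datetime
from typing import Callable, Iterable, Optional, Protocol


class RunSkipHandler(Protocol):
    # Minimal surface of the PR's Redundant*RunSkipHandler classes, as used above.
    def should_skip_this_run(self, cur_start_time: datetime, cur_end_time: datetime) -> bool: ...
    def update_state(self, start_time: datetime, end_time: datetime) -> None: ...


def run_extraction(
    handler: Optional[RunSkipHandler],
    start: datetime,
    end: datetime,
    extract: Callable[[datetime, datetime], Iterable[str]],
) -> Iterable[str]:
    # When stateful ingestion is disabled, the handler is None and we never skip.
    if handler and handler.should_skip_this_run(cur_start_time=start, cur_end_time=end):
        return
    yield from extract(start, end)
    # Update checkpoint state only after extraction finished without raising,
    # so a failed run is retried on the next invocation instead of skipped.
    if handler:
        handler.update_state(start, end)
```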
@@ -664,7 +670,7 @@ def _process_project(
 
         if self.config.is_profiling_enabled():
             logger.info(f"Starting profiling project {project_id}")
-            self.report.set_ingestion_stage(project_id, "Profiling")
+            self.report.set_ingestion_stage(project_id, PROFILING)
             yield from self.profiler.get_workunits(
                 project_id=project_id,
                 tables=db_tables,
@@ -1328,3 +1334,13 @@ def add_config_to_report(self):
         self.report.use_exported_bigquery_audit_metadata = (
             self.config.use_exported_bigquery_audit_metadata
         )
+        self.report.stateful_lineage_ingestion_enabled = (
+            self.config.enable_stateful_lineage_ingestion
+        )
+        self.report.stateful_usage_ingestion_enabled = (
+            self.config.enable_stateful_usage_ingestion
+        )
+        self.report.window_start_time, self.report.window_end_time = (
+            self.config.start_time,
+            self.config.end_time,
+        )
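
For reference, the old per-feature flags (`store_last_lineage_extraction_timestamp`, `store_last_usage_extraction_timestamp`, `store_last_profiling_timestamps`) give way to the `enable_stateful_*` names reported above. A hedged sketch of how the renamed flags might be toggled in a BigQuery source config, written as the Python dict a recipe would parse into; all fields besides the three renamed flags are illustrative:

```python
# Only the three enable_stateful_* names are taken from this diff;
# the surrounding fields are illustrative context, not a complete config.
bigquery_source_config = {
    "project_ids": ["my-project"],  # hypothetical project
    "include_table_lineage": True,
    "include_usage_statistics": True,
    "stateful_ingestion": {"enabled": True},
    "enable_stateful_lineage_ingestion": True,
    "enable_stateful_usage_ingestion": True,
    "enable_stateful_profiling": True,
}
```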
@@ -2,21 +2,22 @@
 import dataclasses
 import logging
 from dataclasses import dataclass, field
-from datetime import datetime, timezone
+from datetime import datetime
 from typing import Counter, Dict, List, Optional
 
 import pydantic
 
 from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
+from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
+from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
 from datahub.utilities.lossy_collections import LossyDict, LossyList
-from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.stats_collections import TopKDict, int_top_k_dict
 
 logger: logging.Logger = logging.getLogger(__name__)
 
 
 @dataclass
-class BigQueryV2Report(ProfilingSqlReport):
+class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
     num_total_lineage_entries: TopKDict[str, int] = field(default_factory=TopKDict)
     num_skipped_lineage_entries_missing_data: TopKDict[str, int] = field(
         default_factory=int_top_k_dict
@@ -52,7 +53,6 @@ class BigQueryV2Report(ProfilingSqlReport):
     use_date_sharded_audit_log_tables: Optional[bool] = None
     log_page_size: Optional[pydantic.PositiveInt] = None
     use_exported_bigquery_audit_metadata: Optional[bool] = None
-    end_time: Optional[datetime] = None
     log_entry_start_time: Optional[str] = None
     log_entry_end_time: Optional[str] = None
     audit_start_time: Optional[str] = None
@@ -88,23 +88,14 @@ class BigQueryV2Report(ProfilingSqlReport):
         default_factory=collections.Counter
     )
     usage_state_size: Optional[str] = None
-    ingestion_stage: Optional[str] = None
-    ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict)
+    lineage_start_time: Optional[datetime] = None
+    lineage_end_time: Optional[datetime] = None
+    stateful_lineage_ingestion_enabled: bool = False
 
-    _timer: Optional[PerfTimer] = field(
-        default=None, init=False, repr=False, compare=False
-    )
+    usage_start_time: Optional[datetime] = None
+    usage_end_time: Optional[datetime] = None
+    stateful_usage_ingestion_enabled: bool = False
 
-    def set_ingestion_stage(self, project: str, stage: str) -> None:
-        if self._timer:
-            elapsed = round(self._timer.elapsed_seconds(), 2)
-            logger.info(
-                f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds"
-            )
-            if self.ingestion_stage:
-                self.ingestion_stage_durations[self.ingestion_stage] = elapsed
-        else:
-            self._timer = PerfTimer()
-
-        self.ingestion_stage = f"{project}: {stage} at {datetime.now(timezone.utc)}"
-        self._timer.start()
+    def set_ingestion_stage(self, project_id: str, stage: str) -> None:
+        self.report_ingestion_stage_start(f"{project_id}: {stage}")
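
The inline timer logic deleted above now lives behind `report_ingestion_stage_start` in the shared `IngestionStageReport` mixin. A plausible reconstruction of what such a mixin does, inferred from the removed code rather than copied from DataHub's source:

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class IngestionStageReport:
    ingestion_stage: Optional[str] = None
    ingestion_stage_durations: Dict[str, float] = field(default_factory=dict)
    _stage_start: Optional[float] = field(
        default=None, init=False, repr=False, compare=False
    )

    def report_ingestion_stage_start(self, new_stage: str) -> None:
        # Close out the previous stage, recording how long it ran...
        now = time.perf_counter()
        if self.ingestion_stage is not None and self._stage_start is not None:
            self.ingestion_stage_durations[self.ingestion_stage] = round(
                now - self._stage_start, 2
            )
        # ...then begin timing the new stage.
        self.ingestion_stage = new_stage
        self._stage_start = now
```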