Skip to content

Commit

Permalink
feat(ingest): allow lower freq profiling
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal committed Jul 24, 2023
1 parent c0dbea8 commit 97e24db
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source_config.operation_config import (
OperationConfig,
is_profiling_enabled,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
SchemaField,
Expand Down Expand Up @@ -200,6 +204,12 @@ class ElasticProfiling(ConfigModel):
default=False,
description="Whether to enable profiling for the elastic search source.",
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig, description="To specify operation configs."
)

def is_profiling_enabld(self) -> bool:
    """Report whether profiling should run for this source.

    The ``enabled`` flag is checked first; only when it is truthy are the
    ``operation_config`` checks consulted through ``is_profiling_enabled``.
    """
    enabled = self.enabled
    if not enabled:
        # Hand back the flag itself, exactly as a short-circuiting `and` would.
        return enabled
    return is_profiling_enabled(self.operation_config)


class CollapseUrns(ConfigModel):
Expand Down Expand Up @@ -515,7 +525,7 @@ def _extract_mcps(
),
)

if self.source_config.profiling.enabled:
if self.source_config.profiling.is_profiling_enabld():
if self.cat_response is None:
self.cat_response = self.client.cat.indices(
params={
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import datetime
import logging
from typing import Optional

import click
import pydantic
from pydantic.fields import Field

from datahub.configuration.common import ConfigModel, ConfigurationError

logger = logging.getLogger(__name__)


class OperationConfig(ConfigModel):
    # Settings that restrict profiling to particular days.
    #
    # Nothing here schedules anything: the fields are only read to decide
    # whether a given run should skip profiling. Both day fields are optional
    # and default to None; each is range-checked by its validator below.

    lower_freq_profile_enabled: bool = Field(
        description="Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling.",
        default=False,
    )
    profile_day_of_week: Optional[int] = Field(
        description="Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect.",
        default=None,
    )
    profile_date_of_month: Optional[int] = Field(
        description="Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect.",
        default=None,
    )

    @pydantic.validator("profile_day_of_week")
    def validate_profile_day_of_week(cls, v) -> Optional[int]:
        """Range-check the configured weekday.

        Returns the value unchanged when it is ``None`` or lies in 0..6.

        Raises:
            ConfigurationError: if the value is below 0 or above 6.
        """
        profile_day_of_week = v
        if profile_day_of_week is not None and not (0 <= profile_day_of_week <= 6):
            raise ConfigurationError(
                f"Invalid value {profile_day_of_week} for profile_day_of_week. Must be between 0 to 6 (both inclusive)."
            )
        return profile_day_of_week

    @pydantic.validator("profile_date_of_month")
    def validate_profile_date_of_month(cls, v) -> Optional[int]:
        """Range-check the configured date of month.

        Returns the value unchanged when it is ``None`` or lies in 1..31.

        Raises:
            ConfigurationError: if the value is below 1 or above 31.
        """
        profile_date_of_month = v
        if profile_date_of_month is not None and not (
            1 <= profile_date_of_month <= 31
        ):
            raise ConfigurationError(
                f"Invalid value {profile_date_of_month} for profile_date_of_month. Must be between 1 to 31 (both inclusive)."
            )
        return profile_date_of_month


def is_profiling_enabled(operation_config: OperationConfig) -> bool:
    """Decide whether profiling should run today under ``operation_config``.

    Lower-frequency profiling performs no scheduling of its own; it only
    vetoes profiling on days that do not match the configured day of week
    and/or date of month. When both are configured, both must match.

    Args:
        operation_config: Settings providing ``lower_freq_profile_enabled``,
            ``profile_day_of_week`` and ``profile_date_of_month``.

    Returns:
        ``True`` when lower-frequency profiling is disabled, when no day
        constraint is configured, or when every configured constraint matches
        today's date; ``False`` otherwise.
    """
    if not operation_config.lower_freq_profile_enabled:
        return True

    day_of_week = operation_config.profile_day_of_week
    date_of_month = operation_config.profile_date_of_month

    if day_of_week is None and date_of_month is None:
        # Nothing to restrict on: warn, then fall through so profiling runs.
        click.secho(
            "Lower freq profiling setting is enabled but no day of week or date of month is specified. Profiling will be done.",
            fg="yellow",
        )
    logger.info("Lower freq profiling setting is enabled.")

    # date.today() uses the local date of the machine running ingestion.
    today = datetime.date.today()

    # Compare the configured *weekday* with today's weekday (0 = Monday ...
    # 6 = Sunday). The previous code compared profile_date_of_month here, so a
    # weekday-only config never profiled and the weekday was otherwise ignored.
    if day_of_week is not None and day_of_week != today.weekday():
        click.secho(
            "Profiling won't be done because weekday does not match config profile_day_of_week.",
            fg="yellow",
        )
        return False

    if date_of_month is not None and date_of_month != today.day:
        click.secho(
            "Profiling won't be done because date of month does not match config profile_date_of_month.",
            fg="yellow",
        )
        return False

    return True

0 comments on commit 97e24db

Please sign in to comment.