Commit

Merge branch 'datahub-project:master' into master
anshbansal authored Apr 4, 2024
2 parents 18cf30d + bad96ed commit bed014c
Showing 13 changed files with 213 additions and 108 deletions.
4 changes: 2 additions & 2 deletions docker/profiles/docker-compose.actions.yml
@@ -4,8 +4,8 @@ x-datahub-actions-service: &datahub-actions-service
image: ${DATAHUB_ACTIONS_IMAGE:-${DATAHUB_ACTIONS_REPO:-acryldata}/datahub-actions}:${ACTIONS_VERSION:-v0.0.14}
env_file:
- datahub-actions/env/docker.env
- ${DATAHUB_LOCAL_COMMON_ENV:-datahub-actions/env/docker.env}
- ${DATAHUB_LOCAL_ACTIONS_ENV:-datahub-actions/env/docker.env}
- ${DATAHUB_LOCAL_COMMON_ENV:-empty.env}
- ${DATAHUB_LOCAL_ACTIONS_ENV:-empty.env}
environment:
ACTIONS_EXTRA_PACKAGES: ${ACTIONS_EXTRA_PACKAGES:-}
ACTIONS_CONFIG: ${ACTIONS_CONFIG:-}
4 changes: 2 additions & 2 deletions docker/profiles/docker-compose.frontend.yml
@@ -6,8 +6,8 @@ x-datahub-frontend-service: &datahub-frontend-service
- ${DATAHUB_MAPPED_FRONTEND_PORT:-9002}:9002
env_file:
- datahub-frontend/env/docker.env
- ${DATAHUB_LOCAL_COMMON_ENV:-datahub-frontend/env/docker.env}
- ${DATAHUB_LOCAL_FRONTEND_ENV:-datahub-frontend/env/docker.env}
- ${DATAHUB_LOCAL_COMMON_ENV:-empty.env}
- ${DATAHUB_LOCAL_FRONTEND_ENV:-empty.env}
environment: &datahub-frontend-service-env
KAFKA_BOOTSTRAP_SERVER: broker:29092
volumes:
16 changes: 8 additions & 8 deletions docker/profiles/docker-compose.gms.yml
@@ -60,8 +60,8 @@ x-datahub-system-update-service: &datahub-system-update-service
- SystemUpdate
env_file:
- datahub-upgrade/env/docker.env
- ${DATAHUB_LOCAL_COMMON_ENV:-datahub-upgrade/env/docker.env}
- ${DATAHUB_LOCAL_SYS_UPDATE_ENV:-datahub-upgrade/env/docker.env}
- ${DATAHUB_LOCAL_COMMON_ENV:-empty.env}
- ${DATAHUB_LOCAL_SYS_UPDATE_ENV:-empty.env}
environment: &datahub-system-update-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *kafka-env]
SCHEMA_REGISTRY_SYSTEM_UPDATE: ${SCHEMA_REGISTRY_SYSTEM_UPDATE:-true}
@@ -95,8 +95,8 @@ x-datahub-gms-service: &datahub-gms-service
- ${DATAHUB_MAPPED_GMS_PORT:-8080}:8080
env_file:
- datahub-gms/env/docker.env
- ${DATAHUB_LOCAL_COMMON_ENV:-datahub-gms/env/docker.env}
- ${DATAHUB_LOCAL_GMS_ENV:-datahub-gms/env/docker.env}
- ${DATAHUB_LOCAL_COMMON_ENV:-empty.env}
- ${DATAHUB_LOCAL_GMS_ENV:-empty.env}
environment: &datahub-gms-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
healthcheck:
@@ -142,8 +142,8 @@ x-datahub-mae-consumer-service: &datahub-mae-consumer-service
- 9091:9091
env_file:
- datahub-mae-consumer/env/docker.env
- ${DATAHUB_LOCAL_COMMON_ENV:-datahub-mae-consumer/env/docker.env}
- ${DATAHUB_LOCAL_MAE_ENV:-datahub-mae-consumer/env/docker.env}
- ${DATAHUB_LOCAL_COMMON_ENV:-empty.env}
- ${DATAHUB_LOCAL_MAE_ENV:-empty.env}
environment: &datahub-mae-consumer-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *kafka-env]

@@ -168,8 +168,8 @@ x-datahub-mce-consumer-service: &datahub-mce-consumer-service
- 9090:9090
env_file:
- datahub-mce-consumer/env/docker.env
- ${DATAHUB_LOCAL_COMMON_ENV:-datahub-mce-consumer/env/docker.env}
- ${DATAHUB_LOCAL_MCE_ENV:-datahub-mce-consumer/env/docker.env}
- ${DATAHUB_LOCAL_COMMON_ENV:-empty.env}
- ${DATAHUB_LOCAL_MCE_ENV:-empty.env}
environment: &datahub-mce-consumer-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]

4 changes: 4 additions & 0 deletions docker/profiles/empty.env
@@ -0,0 +1,4 @@
# Docker compose requires that all env_file entries exist.
# Because we have some optional env_file entries that can be set
# as environment variables, we need a default file to point at
# when those are not set.
@@ -179,4 +179,6 @@ static <T> Map<String, Map<String, T>> merge(
Collectors.mapping(
Pair::getValue, Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
}

String toAbbreviatedString(int maxWidth);
}
199 changes: 133 additions & 66 deletions metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -45,13 +45,16 @@
from sqlalchemy.exc import ProgrammingError
from typing_extensions import Concatenate, ParamSpec

from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import get_sys_time
from datahub.ingestion.graph.client import get_default_graph
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.profiling.common import (
Cardinality,
convert_to_cardinality,
)
from datahub.ingestion.source.sql.sql_common import SQLSourceReport
from datahub.metadata.com.linkedin.pegasus2avro.schema import EditableSchemaMetadata
from datahub.metadata.schema_classes import (
DatasetFieldProfileClass,
DatasetProfileClass,
@@ -296,6 +299,9 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):

query_combiner: SQLAlchemyQueryCombiner

platform: str
env: str

def _get_columns_to_profile(self) -> List[str]:
if not self.config.any_field_level_metrics_enabled():
return []
@@ -670,6 +676,16 @@ def generate_dataset_profile( # noqa: C901 (complexity)
profile.columnCount = len(all_columns)
columns_to_profile = set(self._get_columns_to_profile())

(
ignore_table_sampling,
columns_list_to_ignore_sampling,
) = _get_columns_to_ignore_sampling(
self.dataset_name,
self.config.tags_to_ignore_sampling,
self.platform,
self.env,
)

logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
self.query_combiner.flush()

@@ -732,76 +748,80 @@ def generate_dataset_profile( # noqa: C901 (complexity)
if not profile.rowCount:
continue

self._get_dataset_column_sample_values(column_profile, column)

if (
type_ == ProfilerDataType.INT
or type_ == ProfilerDataType.FLOAT
or type_ == ProfilerDataType.NUMERIC
not ignore_table_sampling
and column not in columns_list_to_ignore_sampling
):
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)
self._get_dataset_column_mean(column_profile, column)
self._get_dataset_column_median(column_profile, column)
self._get_dataset_column_stdev(column_profile, column)

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
if cardinality in {
Cardinality.FEW,
Cardinality.MANY,
Cardinality.VERY_MANY,
}:
self._get_dataset_column_quantiles(column_profile, column)
self._get_dataset_column_histogram(column_profile, column)

elif type_ == ProfilerDataType.STRING:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
self._get_dataset_column_sample_values(column_profile, column)

elif type_ == ProfilerDataType.DATETIME:
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)

# FIXME: Re-add histogram once kl_divergence has been modified to support datetimes

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
if (
type_ == ProfilerDataType.INT
or type_ == ProfilerDataType.FLOAT
or type_ == ProfilerDataType.NUMERIC
):
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)
self._get_dataset_column_mean(column_profile, column)
self._get_dataset_column_median(column_profile, column)
self._get_dataset_column_stdev(column_profile, column)

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
if cardinality in {
Cardinality.FEW,
Cardinality.MANY,
Cardinality.VERY_MANY,
}:
self._get_dataset_column_quantiles(column_profile, column)
self._get_dataset_column_histogram(column_profile, column)

elif type_ == ProfilerDataType.STRING:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)

else:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
elif type_ == ProfilerDataType.DATETIME:
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)

# FIXME: Re-add histogram once kl_divergence has been modified to support datetimes

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)

else:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
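Condensed as a sketch (this is an illustration, not the source itself), the gate this hunk introduces boils down to: sampling-based column metrics are computed only when neither the table nor the column carries an ignore tag.

```python
from typing import List


def should_compute_sampling_metrics(
    column: str, ignore_table_sampling: bool, columns_to_skip: List[str]
) -> bool:
    # Expensive, sampling-based metrics run only when neither the table nor
    # the column has been tagged for exclusion.
    return not ignore_table_sampling and column not in columns_to_skip


# The table itself is not excluded, but a column hypothetically tagged "ssn"
# keeps only the cheaper metrics computed earlier in the loop.
assert should_compute_sampling_metrics("amount", False, ["ssn"])
assert not should_compute_sampling_metrics("ssn", False, ["ssn"])
```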

logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries")
self.query_combiner.flush()
@@ -896,6 +916,7 @@ class DatahubGEProfiler:

base_engine: Engine
platform: str # passed from parent source config
env: str

# The actual value doesn't matter, it just matters that we use it consistently throughout.
_datasource_name_base: str = "my_sqlalchemy_datasource"
@@ -906,12 +927,15 @@ def __init__(
report: SQLSourceReport,
config: GEProfilingConfig,
platform: str,
env: str = "PROD",
):
self.report = report
self.config = config
self.times_taken = []
self.total_row_count = 0

self.env = env

# TRICKY: The call to `.engine` is quite important here. Connection.connect()
# returns a "branched" connection, which does not actually use a new underlying
# DB-API object from the connection pool. Engine.connect() does what we want to
@@ -1151,6 +1175,8 @@ def _generate_single_profile(
self.report,
custom_sql,
query_combiner,
self.platform,
self.env,
).generate_dataset_profile()

time_taken = timer.elapsed_seconds()
@@ -1309,3 +1335,44 @@ def create_bigquery_temp_table(
return bigquery_temp_table
finally:
raw_connection.close()


def _get_columns_to_ignore_sampling(
dataset_name: str, tags_to_ignore: Optional[List[str]], platform: str, env: str
) -> Tuple[bool, List[str]]:
logger.debug("Collecting columns to ignore for sampling")

ignore_table: bool = False
columns_to_ignore: List[str] = []

if not tags_to_ignore:
return ignore_table, columns_to_ignore

dataset_urn = mce_builder.make_dataset_urn(
name=dataset_name, platform=platform, env=env
)

datahub_graph = get_default_graph()

dataset_tags = datahub_graph.get_tags(dataset_urn)
if dataset_tags:
ignore_table = any(
tag_association.tag.split("urn:li:tag:")[1] in tags_to_ignore
for tag_association in dataset_tags.tags
)

if not ignore_table:
metadata = datahub_graph.get_aspect(
entity_urn=dataset_urn, aspect_type=EditableSchemaMetadata
)

if metadata:
for schemaField in metadata.editableSchemaFieldInfo:
if schemaField.globalTags:
columns_to_ignore.extend(
schemaField.fieldPath
for tag_association in schemaField.globalTags.tags
if tag_association.tag.split("urn:li:tag:")[1] in tags_to_ignore
)

return ignore_table, columns_to_ignore
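A minimal usage sketch of the new helper, assuming a reachable DataHub instance behind `get_default_graph()`; the dataset name, platform, and tag below are placeholders rather than values from this commit.

```python
# Hypothetical call, mirroring the arguments now passed from
# _generate_single_profile. It needs a configured DataHub connection, because
# get_default_graph() is used to look up the dataset's tags and editable
# schema metadata.
ignore_table, skip_columns = _get_columns_to_ignore_sampling(
    dataset_name="analytics.public.orders",  # placeholder dataset name
    tags_to_ignore=["no_sampling"],          # placeholder tag name
    platform="postgres",                     # placeholder platform
    env="PROD",
)

if ignore_table:
    print("Table is tagged: sampling-based metrics are skipped for every column")
else:
    print(f"Columns excluded from sampling-based metrics: {skip_columns}")
```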
metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -172,6 +172,14 @@ class GEProfilingConfig(ConfigModel):
description="Whether to profile external tables. Only Snowflake and Redshift supports this.",
)

tags_to_ignore_sampling: Optional[List[str]] = pydantic.Field(
default=None,
description=(
"Fixed list of tags to ignore sampling."
" If not specified, tables will be sampled based on `use_sampling`."
),
)
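For context, a hedged sketch of enabling the new option from Python: the tag names are invented, and it assumes `GEProfilingConfig`'s remaining fields keep their defaults (in a YAML ingestion recipe the same keys would sit under the source's `profiling` section).

```python
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig

# Hypothetical settings: keep profiling on, but let datasets or columns tagged
# "PII" or "no_sampling" (made-up tag names) opt out of sampling-based metrics.
profiling_config = GEProfilingConfig(
    enabled=True,
    tags_to_ignore_sampling=["PII", "no_sampling"],
)

print(profiling_config.tags_to_ignore_sampling)
```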

@pydantic.root_validator(pre=True)
def deprecate_bigquery_temp_table_schema(cls, values):
# TODO: Update docs to remove mention of this field.
