Merge branch 'datahub-project:master' into master
treff7es authored Sep 9, 2024
2 parents d43053a + 5467481 commit f251b08
Showing 22 changed files with 7,657 additions and 401 deletions.
24 changes: 19 additions & 5 deletions datahub-web-react/yarn.lock
@@ -4510,7 +4510,14 @@ brace-expansion@^1.1.7:
balanced-match "^1.0.0"
concat-map "0.0.1"

braces@^3.0.2, braces@~3.0.2:
braces@^3.0.3:
version "3.0.3"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.3.tgz#490332f40919452272d55a8480adc0c441358789"
integrity sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==
dependencies:
fill-range "^7.1.1"

braces@~3.0.2:
version "3.0.2"
resolved "https://registry.yarnpkg.com/braces/-/braces-3.0.2.tgz#3454e1a462ee8d599e236df336cd9ea4f8afe107"
integrity sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==
@@ -6052,6 +6059,13 @@ fill-range@^7.0.1:
dependencies:
to-regex-range "^5.0.1"

fill-range@^7.1.1:
version "7.1.1"
resolved "https://registry.yarnpkg.com/fill-range/-/fill-range-7.1.1.tgz#44265d3cac07e3ea7dc247516380643754a05292"
integrity sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==
dependencies:
to-regex-range "^5.0.1"

filter-obj@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/filter-obj/-/filter-obj-1.1.0.tgz#9b311112bc6c6127a16e016c6c5d7f19e0805c5b"
@@ -7654,11 +7668,11 @@ micromark@^2.11.3, micromark@~2.11.0, micromark@~2.11.3:
parse-entities "^2.0.0"

micromatch@^4.0.4, micromatch@^4.0.5:
version "4.0.5"
resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-4.0.5.tgz#bc8999a7cbbf77cdc89f132f6e467051b49090c6"
integrity sha512-DMy+ERcEW2q8Z2Po+WNXuw3c5YaUSFjAO5GsJqfEl7UjvtIuFKO6ZrKvcItdy98dwFI2N1tg3zNIdKaQT+aNdA==
version "4.0.8"
resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-4.0.8.tgz#d66fa18f3a47076789320b9b1af32bd86d9fa202"
integrity sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA==
dependencies:
braces "^3.0.2"
braces "^3.0.3"
picomatch "^2.3.1"

[email protected]:
4 changes: 2 additions & 2 deletions docs-website/sidebars.js
@@ -105,12 +105,12 @@ module.exports = {
{
label: "Documentation Propagation",
type: "doc",
id: "docs/automation/docs-propagation",
id: "docs/automations/docs-propagation",
},
{
label: "Snowflake Tag Sync",
type: "doc",
id: "docs/automation/snowflake-tag-propagation",
id: "docs/automations/snowflake-tag-propagation",
className: "saasOnly",
},
],
File renamed without changes.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -31,6 +31,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
```
Re-running with stateful ingestion should automatically clear up the entities with old URNs and add entities with new URNs, thereby avoiding duplicate containers or jobs.

- #11313 - `datahub get` will no longer return a key aspect for entities that don't exist.

### Potential Downtime

### Deprecations
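To illustrate the `datahub get` change noted above: a minimal sketch (not part of this commit), assuming a reachable GMS configured for the local CLI; the URN is hypothetical, and the helpers are the same ones the patched `get_cli.py` uses.

```python
# Sketch only: demonstrates the behavior change described in updating-datahub.md.
from datahub.ingestion.graph.client import get_default_graph

client = get_default_graph()  # uses the server/token from the local DataHub CLI config
urn = "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"  # hypothetical URN

if not client.exists(urn):
    # Previously `datahub get --urn <urn>` could still print a synthesized key aspect here;
    # after #11313 it raises "urn ... not found" instead.
    print(f"{urn} does not exist; `datahub get` now fails instead of returning a key aspect")
else:
    print(f"{urn} exists; `datahub get` prints its aspects as JSON")
```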
24 changes: 13 additions & 11 deletions metadata-ingestion/docs/sources/dbt/README.md
@@ -2,17 +2,19 @@ Ingesting metadata from dbt requires either using the **dbt** module or the **db

### Concept Mapping

| Source Concept | DataHub Concept | Notes |
| --------------- | ------------------------------------------------------------- | ------------------ |
| `"dbt"` | [Data Platform](../../metamodel/entities/dataPlatform.md) | |
| dbt Source | [Dataset](../../metamodel/entities/dataset.md) | Subtype `Source` |
| dbt Seed | [Dataset](../../metamodel/entities/dataset.md) | Subtype `Seed` |
| dbt Model | [Dataset](../../metamodel/entities/dataset.md) | Subtype `Model` |
| dbt Snapshot | [Dataset](../../metamodel/entities/dataset.md) | Subtype `Snapshot` |
| dbt Test | [Assertion](../../metamodel/entities/assertion.md) | |
| dbt Test Result | [Assertion Run Result](../../metamodel/entities/assertion.md) | |
| Source Concept | DataHub Concept | Notes |
| -------------- | ---------------------------------------------------------------------- | ------------------ |
| Source | [Dataset](../../metamodel/entities/dataset.md) | Subtype `Source` |
| Seed | [Dataset](../../metamodel/entities/dataset.md) | Subtype `Seed` |
| Model | [Dataset](../../metamodel/entities/dataset.md) | Subtype `Model` |
| Snapshot | [Dataset](../../metamodel/entities/dataset.md) | Subtype `Snapshot` |
| Test | [Assertion](../../metamodel/entities/assertion.md) | |
| Test Result | [Assertion Run Result](../../metamodel/entities/assertion.md) | |
| Model Runs | [DataProcessInstance](../../metamodel/entities/dataProcessInstance.md) | |

Note:

1. It also generates lineage between the `dbt` nodes (e.g. ephemeral nodes that depend on other dbt sources) as well as lineage between the `dbt` nodes and the underlying (target) platform nodes (e.g. BigQuery Table -> dbt Source, dbt View -> BigQuery View).
2. We also support automated actions (like add a tag, term or owner) based on properties defined in dbt meta.
1. You must **run ingestion for both dbt and your data warehouse** (target platform). They can be run in any order.
2. It generates column lineage between the `dbt` nodes (e.g. when a model/snapshot depends on a dbt source or ephemeral model) as well as lineage between the `dbt` nodes and the underlying target platform nodes (e.g. BigQuery Table -> dbt source, dbt model -> BigQuery table/view).
3. It automatically generates "sibling" relationships between the dbt nodes and the target / data warehouse nodes. These nodes will show up in the UI with both platform logos.
4. We also support automated actions (like adding a tag, term, or owner) based on properties defined in dbt meta.
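As a minimal sketch of note 1 — running the dbt side of ingestion programmatically — the recipe below uses placeholder paths, target platform, and server URL; the warehouse itself still needs its own ingestion run so the sibling nodes line up.

```python
# Sketch only: a dbt ingestion recipe expressed via the Python Pipeline API.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "dbt",
            "config": {
                "manifest_path": "./target/manifest.json",  # from `dbt build` / `dbt compile`
                "catalog_path": "./target/catalog.json",    # from `dbt docs generate`
                "target_platform": "bigquery",              # the warehouse dbt materializes into
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},  # placeholder GMS endpoint
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```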
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.cfg
@@ -78,7 +78,7 @@ disallow_untyped_defs = yes

[tool:pytest]
asyncio_mode = auto
addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers
addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers -p no:faker
markers =
slow: marks tests that are slow to run, including all docker-based tests (deselect with '-m not slow')
integration: marks all integration tests, across all batches (deselect with '-m "not integration"')
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -36,7 +36,7 @@
"importlib_metadata>=4.0.0; python_version < '3.10'",
"docker",
"expandvars>=0.6.5",
"avro-gen3==0.7.13",
"avro-gen3==0.7.16",
# "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3",
"avro>=1.11.3,<1.12",
"python-dateutil>=2.8.0",
28 changes: 20 additions & 8 deletions metadata-ingestion/src/datahub/cli/get_cli.py
@@ -48,16 +48,28 @@ def urn(ctx: Any, urn: Optional[str], aspect: List[str], details: bool) -> None:

client = get_default_graph()

if aspect:
# If aspects are specified, we need to do the existence check first.
if not client.exists(urn):
raise click.ClickException(f"urn {urn} not found")

aspect_data = get_aspects_for_entity(
session=client._session,
gms_host=client.config.server,
entity_urn=urn,
aspects=aspect,
typed=False,
details=details,
)

if not aspect:
# If no aspects are specified and we only get a key aspect back, yield an error instead.
if len(aspect_data) == 1 and "key" in next(iter(aspect_data)).lower():
raise click.ClickException(f"urn {urn} not found")

click.echo(
json.dumps(
get_aspects_for_entity(
session=client._session,
gms_host=client.config.server,
entity_urn=urn,
aspects=aspect,
typed=False,
details=details,
),
aspect_data,
sort_keys=True,
indent=2,
)
19 changes: 14 additions & 5 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -307,12 +307,21 @@ def _emit_generic(self, url: str, payload: str) -> None:
except HTTPError as e:
try:
info: Dict = response.json()
logger.debug(
"Full stack trace from DataHub:\n%s", info.get("stackTrace")
)
info.pop("stackTrace", None)

if info.get("stackTrace"):
logger.debug(
"Full stack trace from DataHub:\n%s", info.get("stackTrace")
)
info.pop("stackTrace", None)

hint = ""
if "unrecognized field found but not allowed" in (
info.get("message") or ""
):
hint = ", likely because the server version is too old relative to the client"

raise OperationalError(
f"Unable to emit metadata to DataHub GMS: {info.get('message')}",
f"Unable to emit metadata to DataHub GMS{hint}: {info.get('message')}",
info,
) from e
except JSONDecodeError:
@@ -91,7 +91,7 @@ class AwsConnectionConfig(ConfigModel):
aws_role: Optional[Union[str, List[Union[str, AwsAssumeRoleConfig]]]] = Field(
default=None,
description="AWS roles to assume. If using the string format, the role ARN can be specified directly. "
"If using the object format, the role can be specified in the RoleArn field and additional available arguments are documented at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role",
"If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as [boto3's STS.Client.assume_role](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role).",
)
aws_profile: Optional[str] = Field(
default=None,
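A minimal sketch of the two `aws_role` formats described above — a bare role ARN string, or an object whose `RoleArn` plus any extra keys are forwarded to boto3's `assume_role`. The ARNs, the external ID, and the module path in the import are assumptions; this page does not show the file name.

```python
# Sketch only: hypothetical values for AwsConnectionConfig.aws_role.
from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig  # assumed module path

config = AwsConnectionConfig(
    aws_region="us-east-1",
    aws_role=[
        "arn:aws:iam::111122223333:role/datahub-ingestion",  # string form: role ARN directly
        {
            "RoleArn": "arn:aws:iam::444455556666:role/cross-account",
            "ExternalId": "datahub",  # extra keys are passed through to sts.assume_role
        },
    ],
)
```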
@@ -103,7 +103,7 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig):
include_lineage: bool = True
include_queries: bool = True
include_usage_statistics: bool = True
include_query_usage_statistics: bool = False
include_query_usage_statistics: bool = True
include_operations: bool = True

region_qualifiers: List[str] = Field(
@@ -12,7 +12,7 @@
import pydantic
from typing_extensions import Self

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.time_window_config import (
BaseTimeWindowConfig,
BucketDuration,
@@ -67,8 +67,16 @@ class SnowflakeQueriesExtractorConfig(ConfigModel):
# TODO: Support stateful ingestion for the time windows.
window: BaseTimeWindowConfig = BaseTimeWindowConfig()

# TODO: make this a proper allow/deny pattern
deny_usernames: List[str] = []
pushdown_deny_usernames: List[str] = pydantic.Field(
default=[],
description="List of snowflake usernames which will not be considered for lineage/usage/queries extraction. "
"This is primarily useful for improving performance by filtering out users with extremely high query volumes.",
)

user_email_pattern: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for user emails to filter in usage.",
)

temporary_tables_pattern: List[str] = pydantic.Field(
default=DEFAULT_TEMP_TABLES_PATTERNS,
@@ -88,7 +96,7 @@ class SnowflakeQueriesExtractorConfig(ConfigModel):
include_lineage: bool = True
include_queries: bool = True
include_usage_statistics: bool = True
include_query_usage_statistics: bool = False
include_query_usage_statistics: bool = True
include_operations: bool = True


@@ -150,6 +158,7 @@ def __init__(
bucket_duration=self.config.window.bucket_duration,
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
user_email_pattern=self.config.user_email_pattern,
# TODO make the rest of the fields configurable
),
generate_operations=self.config.include_operations,
@@ -281,7 +290,7 @@ def fetch_query_log(
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
bucket_duration=self.config.window.bucket_duration,
deny_usernames=self.config.deny_usernames,
deny_usernames=self.config.pushdown_deny_usernames,
)

with self.structured_reporter.report_exc(
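A minimal sketch (usernames, the email regex, and the module path are assumptions) contrasting the two filters added here: `pushdown_deny_usernames` drops users inside the audit-log query itself, while `user_email_pattern` is applied when usage statistics are aggregated.

```python
# Sketch only: hypothetical filter values for the new Snowflake queries extractor fields.
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.snowflake.snowflake_queries import (  # assumed module path
    SnowflakeQueriesExtractorConfig,
)

config = SnowflakeQueriesExtractorConfig(
    pushdown_deny_usernames=["FIVETRAN_LOADER", "DBT_CLOUD_SVC"],  # high-volume service accounts
    user_email_pattern=AllowDenyPattern(deny=[r".*@bots\.example\.com"]),
)
```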
@@ -521,6 +521,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_stats,
include_operations=self.config.include_operational_stats,
user_email_pattern=self.config.user_email_pattern,
),
structured_report=self.report,
filters=self.filters,
@@ -242,6 +242,7 @@ class SqlAggregatorReport(Report):
# Usage-related.
usage_skipped_missing_timestamp: int = 0
num_query_usage_stats_generated: int = 0
num_query_usage_stats_outside_window: int = 0

# Operation-related.
num_operations_generated: int = 0
@@ -432,6 +433,7 @@ def _need_schemas(self) -> bool:
or self.generate_usage_statistics
or self.generate_queries
or self.generate_operations
or self.generate_query_usage_statistics
)

def register_schema(
@@ -1033,9 +1035,9 @@ def gen_metadata(self) -> Iterable[MetadataChangeProposalWrapper]:
queries_generated: Set[QueryId] = set()

yield from self._gen_lineage_mcps(queries_generated)
yield from self._gen_remaining_queries(queries_generated)
yield from self._gen_usage_statistics_mcps()
yield from self._gen_operation_mcps(queries_generated)
yield from self._gen_remaining_queries(queries_generated)

def _gen_lineage_mcps(
self, queries_generated: Set[QueryId]
@@ -1322,9 +1324,15 @@ def _gen_query(
query_counter = self._query_usage_counts.get(query_id)
if not query_counter:
return
for bucket in self.usage_config.buckets():
count = query_counter.get(bucket)
if not count:

all_buckets = self.usage_config.buckets()

for bucket, count in query_counter.items():
if bucket not in all_buckets:
# What happens if we get a query with a timestamp that's outside our configured window?
# Theoretically this should never happen, since the audit logs are also fetched
# for the window. However, it's useful to have reporting for it, just in case.
self.report.num_query_usage_stats_outside_window += 1
continue

yield MetadataChangeProposalWrapper(