Merge branch 'datahub-project:master' into master
hsheth2 authored Apr 18, 2024
2 parents 8db7cc2 + d3fb698 commit 733c403
Showing 34 changed files with 269 additions and 85 deletions.
@@ -55,6 +55,7 @@
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetStatsSummary;
import com.linkedin.datahub.graphql.generated.Domain;
import com.linkedin.datahub.graphql.generated.ERModelRelationship;
import com.linkedin.datahub.graphql.generated.ERModelRelationshipProperties;
import com.linkedin.datahub.graphql.generated.EntityPath;
import com.linkedin.datahub.graphql.generated.EntityRelationship;
@@ -987,8 +988,10 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("listUsers", new ListUsersResolver(this.entityClient))
.dataFetcher("listGroups", new ListGroupsResolver(this.entityClient))
.dataFetcher(
"listRecommendations", new ListRecommendationsResolver(recommendationsService))
.dataFetcher("getEntityCounts", new EntityCountsResolver(this.entityClient))
"listRecommendations",
new ListRecommendationsResolver(recommendationsService, viewService))
.dataFetcher(
"getEntityCounts", new EntityCountsResolver(this.entityClient, viewService))
.dataFetcher("getAccessToken", new GetAccessTokenResolver(statefulTokenService))
.dataFetcher("listAccessTokens", new ListAccessTokensResolver(this.entityClient))
.dataFetcher(
@@ -1,17 +1,20 @@
package com.linkedin.datahub.graphql.resolvers;

import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.*;
import static com.linkedin.metadata.Constants.*;

import com.datahub.authentication.Authentication;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.StringArray;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.ValidationException;
import com.linkedin.datahub.graphql.generated.AndFilterInput;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.resolvers.search.SearchUtils;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
@@ -20,7 +23,10 @@
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.search.utils.QueryUtils;
import com.linkedin.metadata.service.ViewService;
import com.linkedin.view.DataHubViewInfo;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -226,4 +232,14 @@ public static Filter buildFilterWithUrns(@Nonnull Set<Urn> urns, @Nullable Filte
}
return QueryUtils.newFilter(urnMatchCriterion);
}

public static Filter viewFilter(
OperationContext opContext, ViewService viewService, String viewUrn) {
if (viewUrn == null) {
return null;
}
DataHubViewInfo viewInfo = resolveView(opContext, viewService, UrnUtils.getUrn(viewUrn));
Filter result = SearchUtils.combineFilters(null, viewInfo.getDefinition().getFilter());
return result;
}
}
@@ -8,6 +8,7 @@
import com.linkedin.datahub.graphql.generated.EntityCountResults;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.service.ViewService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.opentelemetry.extension.annotations.WithSpan;
@@ -20,8 +21,11 @@ public class EntityCountsResolver implements DataFetcher<CompletableFuture<Entit

private final EntityClient _entityClient;

public EntityCountsResolver(final EntityClient entityClient) {
private final ViewService _viewService;

public EntityCountsResolver(final EntityClient entityClient, final ViewService viewService) {
_entityClient = entityClient;
_viewService = viewService;
}

@Override
@@ -44,7 +48,8 @@ public CompletableFuture<EntityCountResults> get(final DataFetchingEnvironment e
context.getOperationContext(),
input.getTypes().stream()
.map(EntityTypeMapper::getName)
.collect(Collectors.toList()));
.collect(Collectors.toList()),
viewFilter(context.getOperationContext(), _viewService, input.getViewUrn()));

// bind to a result.
List<EntityCountResult> resultList =
@@ -22,6 +22,7 @@
import com.linkedin.metadata.recommendation.EntityRequestContext;
import com.linkedin.metadata.recommendation.RecommendationsService;
import com.linkedin.metadata.recommendation.SearchRequestContext;
import com.linkedin.metadata.service.ViewService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.opentelemetry.extension.annotations.WithSpan;
@@ -44,6 +45,7 @@ public class ListRecommendationsResolver
new ListRecommendationsResult(Collections.emptyList());

private final RecommendationsService _recommendationsService;
private final ViewService _viewService;

@WithSpan
@Override
@@ -60,6 +62,7 @@ public CompletableFuture<ListRecommendationsResult> get(DataFetchingEnvironment
_recommendationsService.listRecommendations(
context.getOperationContext(),
mapRequestContext(input.getRequestContext()),
viewFilter(context.getOperationContext(), _viewService, input.getViewUrn()),
input.getLimit());
return ListRecommendationsResult.builder()
.setModules(
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
@@ -1194,6 +1194,11 @@ Input for the get entity counts endpoint
"""
input EntityCountInput {
types: [EntityType!]

"""
Optional - A View to apply when generating results
"""
viewUrn: String
}

"""
@@ -23,6 +23,11 @@ input ListRecommendationsInput {
Max number of modules to return
"""
limit: Int

"""
Optional - A View to apply when generating results
"""
viewUrn: String
}

"""
6 changes: 6 additions & 0 deletions datahub-web-react/src/app/home/HomePageRecommendations.tsx
@@ -22,6 +22,7 @@ import {
} from '../onboarding/config/HomePageOnboardingConfig';
import { useToggleEducationStepIdsAllowList } from '../onboarding/useToggleEducationStepIdsAllowList';
import { useBusinessAttributesFlag } from '../useAppConfig';
import { useUserContext } from '../context/useUserContext';

const PLATFORMS_MODULE_ID = 'Platforms';
const MOST_POPULAR_MODULE_ID = 'HighUsageEntities';
@@ -105,6 +106,9 @@ export const HomePageRecommendations = ({ user }: Props) => {
const browseEntityList = entityRegistry.getBrowseEntityTypes();
const userUrn = user?.urn;

const userContext = useUserContext();
const viewUrn = userContext.localState?.selectedViewUrn;

const businessAttributesFlag = useBusinessAttributesFlag();

const showSimplifiedHomepage = user?.settings?.appearance?.showSimplifiedHomepage;
@@ -113,6 +117,7 @@ export const HomePageRecommendations = ({ user }: Props) => {
variables: {
input: {
types: browseEntityList,
viewUrn
},
},
});
@@ -133,6 +138,7 @@ export const HomePageRecommendations = ({ user }: Props) => {
scenario,
},
limit: 10,
viewUrn
},
},
fetchPolicy: 'no-cache',
2 changes: 2 additions & 0 deletions docs/lineage/airflow.md
@@ -45,6 +45,8 @@ Set up a DataHub connection in Airflow, either via command line or the Airflow U
airflow connections add --conn-type 'datahub-rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' --conn-password '<optional datahub auth token>'
```

If you are using hosted Acryl DataHub, then please use `https://YOUR_PREFIX.acryl.io/gms` as the `--conn-host` parameter.
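
As a hedged illustration (the host prefix and token below are placeholders, not values from this change), the same connection can also be defined programmatically, mirroring the `Connection` fixtures updated later in this diff:

```python
from airflow.models import Connection

# Hypothetical hosted-DataHub connection; "your-prefix" and the token are placeholders.
datahub_hosted_conn = Connection(
    conn_id="datahub_rest_default",
    conn_type="datahub_rest",
    host="https://your-prefix.acryl.io/gms",
    password="<your datahub auth token>",
)
```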

#### Airflow UI

On the Airflow UI, go to Admin -> Connections and click the "+" symbol to create a new connection. Select "DataHub REST Server" from the dropdown for "Connection Type" and enter the appropriate values.
@@ -41,13 +41,13 @@
datahub_rest_connection_config = Connection(
conn_id="datahub_rest_test",
conn_type="datahub_rest",
host="http://test_host:8080/",
host="http://test_host:8080",
extra=None,
)
datahub_rest_connection_config_with_timeout = Connection(
conn_id="datahub_rest_test",
conn_type="datahub_rest",
host="http://test_host:8080/",
host="http://test_host:8080",
extra=json.dumps({"timeout_sec": 5}),
)

4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -10,7 +10,7 @@
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import HTTPError, RequestException

from datahub.cli.cli_utils import get_system_auth
from datahub.cli.cli_utils import fixup_gms_url, get_system_auth
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.generic_emitter import Emitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -72,7 +72,7 @@ def __init__(
):
if not gms_server:
raise ConfigurationError("gms server is required")
self._gms_server = gms_server
self._gms_server = fixup_gms_url(gms_server)
self._token = token
self.server_config: Dict[str, Any] = {}
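
For context, a minimal usage sketch (not part of this commit; the address is a placeholder): with `fixup_gms_url` applied in the constructor, callers can keep passing their configured GMS address and rely on the emitter to normalize it before it is stored as `self._gms_server`.

```python
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Placeholder address written with a trailing slash; the constructor now passes it
# through fixup_gms_url before storing it, so the exact normalization (e.g.
# trailing-slash handling) is delegated to that helper rather than to the caller.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080/")
```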

34 changes: 24 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -689,9 +689,28 @@ def generate_dataset_profile( # noqa: C901 (complexity)
logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
self.query_combiner.flush()

assert profile.rowCount is not None
full_row_count = profile.rowCount

if self.config.use_sampling and not self.config.limit:
self.update_dataset_batch_use_sampling(profile)

# Note that this row count may be different from the full_row_count if we are using sampling.
row_count: int = profile.rowCount
if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
# Querying exact row count of sample using `_get_dataset_rows`.
# We are not using `self.config.sample_size` directly as the actual row count
# in the sample may be different than configured `sample_size`. For BigQuery,
# we've even seen 160k rows returned for a sample size of 10k.
logger.debug("Recomputing row count for the sample")

# Note that we can't just call `self._get_dataset_rows(profile)` here because
# there's some sort of caching happening that will return the full table row count
# instead of the sample row count.
row_count = self.dataset.get_row_count(str(self.dataset._table))

profile.partitionSpec.partition += f" (sample rows {row_count})"

columns_profiling_queue: List[_SingleColumnSpec] = []
if columns_to_profile:
for column in all_columns:
@@ -708,16 +727,6 @@ def generate_dataset_profile( # noqa: C901 (complexity)
logger.debug(f"profiling {self.dataset_name}: flushing stage 2 queries")
self.query_combiner.flush()

assert profile.rowCount is not None
row_count: int # used for null counts calculation
if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
# Querying exact row count of sample using `_get_dataset_rows`.
# We are not using `self.config.sample_size` directly as actual row count
# in sample may be slightly different (more or less) than configured `sample_size`.
self._get_dataset_rows(profile)

row_count = profile.rowCount

for column_spec in columns_profiling_queue:
column = column_spec.column
column_profile = column_spec.column_profile
@@ -825,6 +834,10 @@ def generate_dataset_profile( # noqa: C901 (complexity)

logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries")
self.query_combiner.flush()

# Reset the row count to the original value.
profile.rowCount = full_row_count

return profile

def init_profile(self):
@@ -1274,6 +1287,7 @@ def create_bigquery_temp_table(
try:
cursor: "BigQueryCursor" = cast("BigQueryCursor", raw_connection.cursor())
try:
logger.debug(f"Creating temporary table for {table_pretty_name}: {bq_sql}")
cursor.execute(bq_sql)
except Exception as e:
if not instance.config.catch_exceptions:
@@ -11,6 +11,7 @@
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
@@ -84,3 +85,5 @@ class SigmaSourceConfig(
default={},
description="A mapping of the sigma workspace/workbook/chart folder path to all chart's data sources platform details present inside that folder path.",
)

stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
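
A rough recipe sketch of what the new field enables (illustrative only: the credential field names and connection details are assumptions, not taken from this commit; only `stateful_ingestion` is the new option):

```python
from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical sigma recipe; credentials and server address are placeholders.
pipeline = Pipeline.create(
    {
        "pipeline_name": "sigma_ingestion",  # stateful ingestion needs a stable pipeline name
        "source": {
            "type": "sigma",
            "config": {
                # credential fields shown here are assumptions for illustration
                "client_id": "<client id>",
                "client_secret": "<client secret>",
                "stateful_ingestion": {"enabled": True},
            },
        },
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }
)
pipeline.run()
```
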
@@ -105,7 +105,7 @@ class SigmaSource(StatefulIngestionSourceBase, TestableSource):
platform: str = "sigma"

def __init__(self, config: SigmaSourceConfig, ctx: PipelineContext):
super(SigmaSource, self).__init__(config, ctx)
super().__init__(config, ctx)
self.config = config
self.reporter = SigmaSourceReport()
self.dataset_upstream_urn_mapping: Dict[str, List[str]] = {}
@@ -159,7 +159,7 @@ def get_profile_request(
rows_count=table.rows_count,
):
logger.debug(
f"Dataset {dataset_name} was not eliagable for profiling due to last_altered, size in bytes or count of rows limit"
f"Dataset {dataset_name} was not eligible for profiling due to last_altered, size in bytes or count of rows limit"
)
# Profile only table level if dataset is filtered from profiling
# due to size limits alone
@@ -170,7 +170,6 @@ class SupersetSource(StatefulIngestionSourceBase):
config: SupersetConfig
report: StaleEntityRemovalSourceReport
platform = "superset"
stale_entity_removal_handler: StaleEntityRemovalHandler

def __hash__(self):
return id(self)
@@ -1,7 +1,6 @@
import logging
from typing import Callable, List, Optional, Union, cast
from typing import Callable, Dict, List, Optional, Union, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import (
KeyValuePattern,
TransformerSemanticsConfigModel,
@@ -15,7 +14,6 @@
GlobalTagsClass,
MetadataChangeProposalClass,
TagAssociationClass,
TagKeyClass,
)
from datahub.utilities.urns.tag_urn import TagUrn

@@ -33,13 +31,13 @@ class AddDatasetTags(DatasetTagsTransformer):

ctx: PipelineContext
config: AddDatasetTagsConfig
processed_tags: List[TagAssociationClass]
processed_tags: Dict[str, TagAssociationClass]

def __init__(self, config: AddDatasetTagsConfig, ctx: PipelineContext):
super().__init__()
self.ctx = ctx
self.config = config
self.processed_tags = []
self.processed_tags = {}

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags":
@@ -58,9 +56,9 @@ def transform_aspect(
tags_to_add = self.config.get_tags_to_add(entity_urn)
if tags_to_add is not None:
out_global_tags_aspect.tags.extend(tags_to_add)
self.processed_tags.extend(
tags_to_add
) # Keep track of tags added so that we can create them in handle_end_of_stream
# Keep track of tags added so that we can create them in handle_end_of_stream
for tag in tags_to_add:
self.processed_tags.setdefault(tag.tag, tag)

return self.get_result_semantics(
self.config, self.ctx.graph, entity_urn, out_global_tags_aspect
@@ -76,19 +74,12 @@ def handle_end_of_stream(

logger.debug("Generating tags")

for tag_association in self.processed_tags:
ids: List[str] = TagUrn.create_from_string(
tag_association.tag
).get_entity_id()

assert len(ids) == 1, "Invalid Tag Urn"

tag_name: str = ids[0]

for tag_association in self.processed_tags.values():
tag_urn = TagUrn.create_from_string(tag_association.tag)
mcps.append(
MetadataChangeProposalWrapper(
entityUrn=builder.make_tag_urn(tag=tag_name),
aspect=TagKeyClass(name=tag_name),
entityUrn=tag_urn.urn(),
aspect=tag_urn.to_key_aspect(),
)
)
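
A quick illustration of the URN-based key-aspect construction used above (the tag name is an arbitrary example, not one referenced by this commit):

```python
from datahub.utilities.urns.tag_urn import TagUrn

# Arbitrary example tag.
tag_urn = TagUrn.create_from_string("urn:li:tag:pii")
print(tag_urn.urn())            # entity urn used as entityUrn in the MCP
print(tag_urn.to_key_aspect())  # key aspect emitted for that urn
```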
