fix(ingest/glue): use resource_links in catalog correctly #1

Closed · wants to merge 14 commits
2 changes: 2 additions & 0 deletions datahub-frontend/app/auth/AuthModule.java
@@ -21,6 +21,7 @@
import com.linkedin.util.Configuration;
import config.ConfigurationProvider;
import controllers.SsoCallbackController;
import io.datahubproject.metadata.context.ValidationContext;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

@@ -187,6 +188,7 @@ protected OperationContext provideOperationContext(
.authorizationContext(AuthorizationContext.builder().authorizer(Authorizer.EMPTY).build())
.searchContext(SearchContext.EMPTY)
.entityRegistryContext(EntityRegistryContext.builder().build(EmptyEntityRegistry.EMPTY))
.validationContext(ValidationContext.builder().alternateValidation(false).build())
.build(systemAuthentication);
}

@@ -29,6 +29,7 @@
import io.datahubproject.metadata.context.OperationContextConfig;
import io.datahubproject.metadata.context.RetrieverContext;
import io.datahubproject.metadata.context.ServicesRegistryContext;
import io.datahubproject.metadata.context.ValidationContext;
import io.datahubproject.metadata.services.RestrictedService;
import java.util.List;
import javax.annotation.Nonnull;
@@ -161,7 +162,8 @@ protected OperationContext javaSystemOperationContext(
@Nonnull final GraphService graphService,
@Nonnull final SearchService searchService,
@Qualifier("baseElasticSearchComponents")
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) {
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components,
@Nonnull final ConfigurationProvider configurationProvider) {

EntityServiceAspectRetriever entityServiceAspectRetriever =
EntityServiceAspectRetriever.builder()
@@ -186,6 +188,10 @@ protected OperationContext javaSystemOperationContext(
.aspectRetriever(entityServiceAspectRetriever)
.graphRetriever(systemGraphRetriever)
.searchRetriever(searchServiceSearchRetriever)
.build(),
ValidationContext.builder()
.alternateValidation(
configurationProvider.getFeatureFlags().isAlternateMCPValidation())
.build());

entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext);
2 changes: 1 addition & 1 deletion docker/kafka-setup/Dockerfile
@@ -1,4 +1,4 @@
ARG KAFKA_DOCKER_VERSION=7.4.6
ARG KAFKA_DOCKER_VERSION=7.7.1

# Defining custom repo urls for use in enterprise environments. Re-used between stages below.
ARG ALPINE_REPO_URL=http://dl-cdn.alpinelinux.org/alpine
2 changes: 2 additions & 0 deletions docker/profiles/docker-compose.gms.yml
@@ -101,6 +101,7 @@ x-datahub-gms-service: &datahub-gms-service
environment: &datahub-gms-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE: ${ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE:-search_config.yaml}
ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true}
healthcheck:
test: curl -sS --fail http://datahub-gms:${DATAHUB_GMS_PORT:-8080}/health
start_period: 90s
@@ -182,6 +183,7 @@ x-datahub-mce-consumer-service: &datahub-mce-consumer-service
- ${DATAHUB_LOCAL_MCE_ENV:-empty2.env}
environment: &datahub-mce-consumer-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true}

x-datahub-mce-consumer-service-dev: &datahub-mce-consumer-service-dev
<<: *datahub-mce-consumer-service
2 changes: 1 addition & 1 deletion docs-website/docusaurus.config.js
@@ -68,7 +68,7 @@ module.exports = {
announcementBar: {
id: "announcement-2",
content:
'<div style="display: flex; justify-content: center; align-items: center;width: 100%;"><!--img src="/img/acryl-logo-white-mark.svg" / --><div style="font-size: .8rem; font-weight: 600; background-color: white; color: #111; padding: 0px 8px; border-radius: 4px; margin-right:12px;">NEW</div><p><span>Join us at Metadata & AI Summit, Oct. 29 & 30!</span></p><a href="http://www.acryldata.io/conference?utm_source=datahub_web&utm_medium=metadata_ai_2024&utm_campaign=home_banner" target="_blank" class="button">Register</a></div>',
'<div style="display: flex; justify-content: center; align-items: center;width: 100%;"><!--img src="/img/acryl-logo-white-mark.svg" / --><div style="font-size: .8rem; font-weight: 600; background-color: white; color: #111; padding: 0px 8px; border-radius: 4px; margin-right:12px;">NEW</div><p>Join us at Metadata & AI Summit, Oct. 29 & 30!</p><a href="http://www.acryldata.io/conference?utm_source=datahub_web&utm_medium=metadata_ai_2024&utm_campaign=home_banner" target="_blank" class="button">Register<span> →</span></a></div>',
backgroundColor: "#111",
textColor: "#ffffff",
isCloseable: false,
7 changes: 7 additions & 0 deletions docs-website/sidebars.js
@@ -101,12 +101,19 @@ module.exports = {
{
label: "Automations",
type: "category",
collapsed: false,
items: [
{
label: "Documentation Propagation",
type: "doc",
id: "docs/automations/docs-propagation",
},
{
label: "BigQuery Metadata Sync",
type: "doc",
id: "docs/automations/bigquery-metadata-sync",
className: "saasOnly",
},
{
label: "Snowflake Tag Sync",
type: "doc",
29 changes: 21 additions & 8 deletions docs-website/src/styles/global.scss
@@ -116,21 +116,34 @@ div[class^="announcementBar"] {
>div {
display: flex;
align-items: center;
> div {
@media (max-width: 580px) {
display: none;
}
}
a>span {
@media (max-width: 580px) {
display: none;
}
}

>p {
text-align: left;
line-height: 1.1rem;
margin: 0;

>span {
@media (max-width: 780px) {
display: none;
}
}

@media (max-width: 480px) {
display: none;
@media (max-width: 580px) {
font-size: .9rem;
}
// >span {
// @media (max-width: 780px) {
// display: none;
// }
// }

// @media (max-width: 480px) {
// display: none;
// }
}
}

177 changes: 177 additions & 0 deletions docs/automations/bigquery-metadata-sync.md
@@ -0,0 +1,177 @@
import FeatureAvailability from '@site/src/components/FeatureAvailability';

# BigQuery Metadata Sync Automation

<FeatureAvailability saasOnly />

## Introduction

BigQuery Metadata Sync is an automation that synchronizes DataHub Tags, Table and Column descriptions, and Column Glossary Terms with
BigQuery. This automation is exclusively available in DataHub Cloud (Acryl).

## Use-Cases

- Maintain consistent metadata across DataHub and BigQuery
- Improve data discovery by propagating rich descriptions back to BigQuery
- Enhance data governance by applying Policy Tags based on DataHub Glossary Terms
- Streamline data classification by syncing DataHub Tags to BigQuery Labels
- Facilitate compliance efforts by automatically tagging sensitive data columns
- Support data lineage tracking by keeping metadata aligned across platforms

## Capabilities

- Automatically add DataHub Tags as BigQuery Labels to tables
- Automatically add DataHub Table descriptions to BigQuery Tables
- Automatically add DataHub Column descriptions to BigQuery Columns
- Automatically add DataHub Glossary Terms as Policy Tags to BigQuery Columns (under a **DataHub** taxonomy created in BigQuery)
- Automatically remove Policy Tags/Table Labels when removed in DataHub


## Required BigQuery Permissions

| Action | Required Permission(s) |
|--------|------------------------|
| Create/update policy tags and taxonomies | `bigquery.taxonomies.create` <br/> `bigquery.taxonomies.update` |
| Assign/remove policy tags from columns | `bigquery.tables.updateTag` |
| Edit table description | `bigquery.tables.update` |
| Edit column description | `bigquery.tables.update` |
| Assign/remove labels from tables | `bigquery.tables.update` |
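
To make these permissions concrete, below is a minimal sketch of the table-level writes this automation performs on the BigQuery side, expressed with the `google-cloud-bigquery` Python client. It is illustrative only (the automation does not literally run this code), and the project, dataset, table, and label names are placeholders.

```python
# Illustrative sketch: the kinds of writes the automation issues,
# shown here via the google-cloud-bigquery client. All names are placeholders.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project
table = client.get_table("my-project.my_dataset.my_table")  # hypothetical table

# Sync a DataHub table description -> needs bigquery.tables.update
table.description = "Orders fact table (description synced from DataHub)"
client.update_table(table, ["description"])

# Sync a DataHub Tag as a BigQuery label -> needs bigquery.tables.update
table.labels = {**(table.labels or {}), "pii": "true"}
client.update_table(table, ["labels"])
```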

## Enabling BigQuery Sync Automation

1. **Navigate to Automations**: Click on 'Govern' > 'Automations' in the navigation bar.

<p align="left">
<img width="30%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/automation/saas/automations-nav-link.png"/>
</p>

2. **Create An Automation**: Click on 'Create' and select 'BigQuery Tag Propagation'.

<p align="left">
<img width="50%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/automation/saas/bigquery-propagation/automation-type.png"/>
</p>

3. **Configure Automation**:

1. **Select a Propagation Action**

<p align="left">
<img width="50%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/automation/saas/bigquery-propagation/automation-form.png"/>
</p>

| Propagation Type | DataHub Entity | BigQuery Entity | Note |
| -------- | ------- | ------- | ------- |
| Table Tags as Labels | [Table Tag](https://datahubproject.io/docs/tags/) | [BigQuery Label](https://cloud.google.com/bigquery/docs/labels-intro) | - |
| Column Glossary Terms as Policy Tags | [Glossary Term on Table Column](https://datahubproject.io/docs/0.14.0/glossary/business-glossary/) | [Policy Tag](https://cloud.google.com/bigquery/docs/best-practices-policy-tags) | <ul><li>Assigned Policy Tags are created under a **DataHub** taxonomy.</li><li>Only the most recently assigned Glossary Term is set as a Policy Tag, since BigQuery supports only one Policy Tag per column.</li><li>Policy Tags are not synced back from BigQuery to DataHub as Glossary Terms.</li></ul>
| Table Descriptions | [Table Description](https://datahubproject.io/docs/api/tutorials/descriptions/) | Table Description | - |
| Column Descriptions | [Column Description](https://datahubproject.io/docs/api/tutorials/descriptions/) | Column Description | - |

:::note

You can limit propagation to specific Tags and Glossary Terms. If none are selected, ALL Tags or Glossary Terms will be automatically propagated to BigQuery tables and columns. (To avoid inconsistent states, the recommended approach is not to specify a filter.)

:::

:::note

- BigQuery supports only one Policy Tag per table field. Consequently, the most recently assigned Glossary Term will be set as the Policy Tag for that field.
- Policy Tags cannot be applied to fields in External tables. Therefore, if a Glossary Term is assigned to a field in an External table, it will not be applied.

:::

2. **Fill in the required fields to connect to BigQuery, along with the name, description, and category**

<p align="left">
<img width="50%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/automation/saas/bigquery-propagation/connection_config.png"/>
</p>

3. **Finally, click 'Save and Run' to start the automation**

## Propagating for Existing Assets

To ensure that all existing table Tags and Column Glossary Terms are propagated to BigQuery, you can back-fill historical data for existing assets. Note that the initial back-filling process may take some time, depending on the number of BigQuery assets you have.

To do so, follow these steps:

1. Navigate to the Automation you created in Step 3 above
2. Click the 3-dot "More" menu

<p align="left">
<img width="30%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/automation/saas/automation-more-menu.png"/>
</p>

3. Click "Initialize"

<p align="left">
<img width="50%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/automation/saas/automation-initialize.png"/>
</p>

This one-time step will kick off the back-filling process for existing assets. If you only want to begin propagating changes going forward, you can skip this step.

## Viewing Propagated Tags

You can view propagated Tags inside the BigQuery UI to confirm the automation is working as expected.

<p align="left">
<img width="50%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/automation/saas/bigquery-propagation/labels.png"/>
</p>

## Troubleshooting BigQuery Propagation

### Q: What metadata elements support bi-directional syncing between DataHub and BigQuery?

A: The following metadata elements support bi-directional syncing:

- Tags (via BigQuery Labels): Changes made in either DataHub Table Tags or BigQuery Table Labels will be reflected in the other system.
- Descriptions: Both table and column descriptions are synced bi-directionally.

### Q: Are Policy Tags bi-directionally synced?

A: No, BigQuery Policy Tags are only propagated from DataHub to BigQuery, not vice versa. This means that Policy Tags should be mastered in DataHub using the [Business Glossary](https://datahubproject.io/docs/glossary/business-glossary/).

It is recommended to avoid enabling `extract_policy_tags_from_catalog` during
ingestion, as that option ingests BigQuery Policy Tags into DataHub as column
Tags. The sync process instead propagates Glossary Term assignments from
DataHub to BigQuery as Policy Tags.

In a future release, we plan to remove this restriction to support full bi-directional syncing.
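
For illustration, a minimal ingestion run that leaves the flag disabled might look like the sketch below, using DataHub's Python `Pipeline` API. The project ID and DataHub server URL are placeholders, and the flag is written out explicitly for clarity.

```python
# Sketch of a BigQuery ingestion recipe that keeps Policy Tags mastered
# in DataHub by leaving extract_policy_tags_from_catalog disabled.
# Project ID and server URL are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "bigquery",
            "config": {
                "project_ids": ["my-project"],  # hypothetical project
                "extract_policy_tags_from_catalog": False,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},  # placeholder server
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```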

### Q: What metadata is synced from BigQuery to DataHub during ingestion?

A: During ingestion from BigQuery:

- Tags and descriptions from BigQuery will be ingested into DataHub.
- Existing Policy Tags in BigQuery will not overwrite or create Business Glossary Terms in DataHub; the automation only syncs Glossary Terms assigned to columns from DataHub to BigQuery.

### Q: Where should I manage my Business Glossary?

A: The expectation is that you author and manage the glossary in DataHub. Policy tags in BigQuery should be treated as a reflection of the DataHub glossary, not as the primary source of truth.

### Q: Are there any limitations with Policy Tags in BigQuery?

A: Yes, BigQuery only supports one Policy Tag per column. If multiple glossary
terms are assigned to a column in DataHub, only the most recently assigned term
will be set as the policy tag in BigQuery. To reduce the scope of conflicts, you
can set up filters in the BigQuery Metadata Sync to only synchronize terms from
a specific area of the Business Glossary.
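
For context, BigQuery attaches a Policy Tag to a column through the table's schema, and each column carries at most one Policy Tag. Below is a rough sketch of such an update with the `google-cloud-bigquery` client; the taxonomy resource name and the `email` column are hypothetical.

```python
# Illustrative sketch: assigning a single Policy Tag to one column by
# rewriting the table schema. Resource names are placeholders.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
table = client.get_table("my-project.my_dataset.my_table")

policy_tag = "projects/my-project/locations/us/taxonomies/123/policyTags/456"

new_schema = []
for field in table.schema:
    if field.name == "email":  # hypothetical column
        field = bigquery.SchemaField(
            field.name,
            field.field_type,
            mode=field.mode,
            description=field.description,
            # BigQuery accepts a single policy tag per column, which is why
            # only the most recently assigned Glossary Term survives.
            policy_tags=bigquery.PolicyTagList(names=[policy_tag]),
        )
    new_schema.append(field)

table.schema = new_schema
client.update_table(table, ["schema"])
```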

### Q: How frequently are changes synced between DataHub and BigQuery?

A: From DataHub to BigQuery, the sync happens instantly (within a few seconds)
when the change occurs in DataHub.

From BigQuery to DataHub, changes are synced when ingestion runs; the frequency depends on your ingestion schedule, which is visible on the **Integrations** page.

### Q: What happens if there's a conflict between DataHub and BigQuery metadata?

A: In case of conflicts (e.g., a tag is modified in both systems between syncs), the DataHub version will typically take precedence. However, it's best to make changes in one system consistently to avoid potential conflicts.

### Q: What permissions are required for bi-directional syncing?

A: Ensure that the service account used for the automation has the necessary permissions in both DataHub and BigQuery to read and write metadata. See the required BigQuery permissions at the top of the page.

## Related Documentation

- [DataHub Tags Documentation](https://datahubproject.io/docs/tags/)
- [DataHub Glossary Documentation](https://datahubproject.io/docs/glossary/business-glossary/)
- [BigQuery Labels Documentation](https://cloud.google.com/bigquery/docs/labels-intro)
- [BigQuery Policy Tags Documentation](https://cloud.google.com/bigquery/docs/best-practices-policy-tags)
@@ -12,7 +12,7 @@
@AllArgsConstructor
@EqualsAndHashCode
public abstract class PluginSpec {
protected static String ENTITY_WILDCARD = "*";
protected static String WILDCARD = "*";

@Nonnull
public abstract AspectPluginConfig getConfig();
@@ -50,7 +50,7 @@ protected boolean isEntityAspectSupported(
return (getConfig().getSupportedEntityAspectNames().stream()
.anyMatch(
supported ->
ENTITY_WILDCARD.equals(supported.getEntityName())
WILDCARD.equals(supported.getEntityName())
|| supported.getEntityName().equals(entityName)))
&& isAspectSupported(aspectName);
}
@@ -59,13 +59,16 @@ protected boolean isAspectSupported(@Nonnull String aspectName) {
return getConfig().getSupportedEntityAspectNames().stream()
.anyMatch(
supported ->
ENTITY_WILDCARD.equals(supported.getAspectName())
WILDCARD.equals(supported.getAspectName())
|| supported.getAspectName().equals(aspectName));
}

protected boolean isChangeTypeSupported(@Nullable ChangeType changeType) {
return (changeType == null && getConfig().getSupportedOperations().isEmpty())
|| getConfig().getSupportedOperations().stream()
.anyMatch(supported -> supported.equalsIgnoreCase(String.valueOf(changeType)));
.anyMatch(
supported ->
WILDCARD.equals(supported)
|| supported.equalsIgnoreCase(String.valueOf(changeType)));
}
}
@@ -155,12 +155,11 @@ async def _get_flow_run_graph(self, flow_run_id: str) -> Optional[List[Dict]]:
The flow run graph in json format.
"""
try:
response = orchestration.get_client()._client.get(
response_coroutine = orchestration.get_client()._client.get(
f"/flow_runs/{flow_run_id}/graph"
)

if asyncio.iscoroutine(response):
response = await response
response = await response_coroutine

if hasattr(response, "json"):
response_json = response.json()
@@ -410,10 +409,9 @@ async def get_flow_run(flow_run_id: UUID) -> FlowRun:
if not hasattr(client, "read_flow_run"):
raise ValueError("Client does not support async read_flow_run method")

response = client.read_flow_run(flow_run_id=flow_run_id)
response_coroutine = client.read_flow_run(flow_run_id=flow_run_id)

if asyncio.iscoroutine(response):
response = await response
response = await response_coroutine

return FlowRun.parse_obj(response)

@@ -477,10 +475,9 @@ async def get_task_run(task_run_id: UUID) -> TaskRun:
if not hasattr(client, "read_task_run"):
raise ValueError("Client does not support async read_task_run method")

response = client.read_task_run(task_run_id=task_run_id)
response_coroutine = client.read_task_run(task_run_id=task_run_id)

if asyncio.iscoroutine(response):
response = await response
response = await response_coroutine

return TaskRun.parse_obj(response)
