Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Nov 8, 2024
2 parents 07bef2a + 8ca8fd9 commit 24a1bca
Show file tree
Hide file tree
Showing 24 changed files with 676 additions and 542 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static List<UpgradeStep> generateSteps(
.getBootstrap()
.getTemplates()
.stream()
.map(cfg -> cfg.withOverride(opContext.getObjectMapper()))
.filter(cfg -> cfg.isBlocking() == isBlocking)
.map(cfg -> new BootstrapMCPStep(opContext, entityService, cfg))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps.model;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@AllArgsConstructor
@NoArgsConstructor
Expand All @@ -23,6 +26,7 @@ public static class Bootstrap {
private List<MCPTemplate> templates;
}

@Slf4j
@AllArgsConstructor
@NoArgsConstructor
@Data
Expand All @@ -36,5 +40,19 @@ public static class MCPTemplate {
@Builder.Default private boolean optional = false;
@Nonnull private String mcps_location;
@Nullable private String values_env;
@Nullable private String revision_env;

public MCPTemplate withOverride(ObjectMapper objectMapper) {
if (revision_env != null) {
String overrideJson = System.getenv().getOrDefault(revision_env, "{}");
try {
return objectMapper.readerForUpdating(this).readValue(overrideJson);
} catch (IOException e) {
log.error("Error applying override {} to {}", overrideJson, this);
throw new RuntimeException(e);
}
}
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.UrnUtils;
Expand All @@ -17,6 +18,7 @@
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.io.IOException;
import java.util.List;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Listeners;
import org.testng.annotations.Test;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
Expand All @@ -28,10 +30,17 @@ public class BootstrapMCPUtilTest {
static final OperationContext OP_CONTEXT =
TestOperationContexts.systemContextNoSearchAuthorization();
private static final String DATAHUB_TEST_VALUES_ENV = "DATAHUB_TEST_VALUES_ENV";
private static final String DATAHUB_TEST_REVISION_ENV = "DATAHUB_TEST_REVISION_ENV";
private static final AuditStamp TEST_AUDIT_STAMP = AuditStampUtils.createDefaultAuditStamp();

@SystemStub private EnvironmentVariables environmentVariables;

  /**
   * Clears the test-specific environment variables before each test method so that values set by
   * one test cannot leak into another.
   */
  @BeforeMethod
  private void resetEnvironment() {
    environmentVariables.remove(DATAHUB_TEST_VALUES_ENV);
    environmentVariables.remove(DATAHUB_TEST_REVISION_ENV);
  }

@Test
public void testResolveYamlConf() throws IOException {
BootstrapMCPConfigFile initConfig =
Expand All @@ -51,9 +60,28 @@ public void testResolveYamlConf() throws IOException {
}

@Test
public void testResolveMCPTemplateDefaults() throws IOException {
environmentVariables.remove(DATAHUB_TEST_VALUES_ENV);
  /**
   * Verifies that {@code MCPTemplate#withOverride} applies a JSON override taken from the
   * environment variable named by the template's {@code revision_env}: the {@code version} field
   * is replaced while all other fields keep their values from {@code bootstrapmcp/test.yaml}.
   */
  public void testResolveYamlConfOverride() throws IOException {
    // Simulate a deployment-supplied revision override (e.g. set by helm).
    environmentVariables.set(DATAHUB_TEST_REVISION_ENV, "{\"version\":\"2024110600\"}");

    BootstrapMCPConfigFile initConfig =
        BootstrapMCPUtil.resolveYamlConf(
            OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class);
    assertEquals(initConfig.getBootstrap().getTemplates().size(), 1);

    // Apply the environment override; only `version` should change.
    BootstrapMCPConfigFile.MCPTemplate template =
        initConfig.getBootstrap().getTemplates().get(0).withOverride(new ObjectMapper());
    assertEquals(template.getName(), "datahub-test");
    assertEquals(template.getVersion(), "2024110600");
    assertFalse(template.isForce());
    assertFalse(template.isBlocking());
    assertTrue(template.isAsync());
    assertFalse(template.isOptional());
    assertEquals(template.getMcps_location(), "bootstrapmcp/datahub-test-mcp.yaml");
    assertEquals(template.getValues_env(), "DATAHUB_TEST_VALUES_ENV");
  }

@Test
public void testResolveMCPTemplateDefaults() throws IOException {
BootstrapMCPConfigFile.MCPTemplate template =
BootstrapMCPUtil.resolveYamlConf(
OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class)
Expand Down Expand Up @@ -186,8 +214,6 @@ public void testResolveMCPTemplateOverride() throws IOException {

@Test
public void testMCPBatch() throws IOException {
environmentVariables.remove(DATAHUB_TEST_VALUES_ENV);

BootstrapMCPConfigFile.MCPTemplate template =
BootstrapMCPUtil.resolveYamlConf(
OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class)
Expand Down
3 changes: 2 additions & 1 deletion datahub-upgrade/src/test/resources/bootstrapmcp/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ bootstrap:
# blocking: false
# async: true
mcps_location: "bootstrapmcp/datahub-test-mcp.yaml"
values_env: "DATAHUB_TEST_VALUES_ENV"
values_env: "DATAHUB_TEST_VALUES_ENV"
revision_env: "DATAHUB_TEST_REVISION_ENV"
22 changes: 22 additions & 0 deletions docs/advanced/bootstrap-mcps.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,28 @@ to the required json structure and stored as a string.
executorId: default
```

## `bootstrap_mcps.yaml` Override

Additionally, the `bootstrap_mcps.yaml` configuration itself can be overridden.
This can be useful for applying changes to the template's version when using helm-defined template values.

```yaml
bootstrap:
templates:
- name: myMCPTemplate
version: v1
mcps_location: <classpath or file location>
values_env: <value environment variable>
revision_env: REVISION_ENV
```

In the above example, we've added a `revision_env`, which allows overriding the MCP bootstrap definition itself (excluding `revision_env`).

In this example, we could configure `REVISION_ENV` to contain a timestamp or hash, e.g. `{"version":"2024060600"}`.
This value can be changed or incremented each time the helm-supplied template values change. This ensures the MCP is updated
with the latest values during deployment.


## Known Limitations

* Supported change types:
Expand Down
23 changes: 11 additions & 12 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,17 @@ def gen_containers(
)

container_urn = container_key.as_urn()

if parent_container_key: # Yield Container aspect first for auto_browse_path_v2
parent_container_urn = make_container_urn(guid=parent_container_key.guid())

# Set database container
parent_container_mcp = MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
aspect=ContainerClass(container=parent_container_urn),
)
yield parent_container_mcp.as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
aspect=ContainerProperties(
Expand Down Expand Up @@ -276,18 +287,6 @@ def gen_containers(
tags=sorted(tags),
)

if parent_container_key:
parent_container_urn = make_container_urn(
guid=parent_container_key.guid(),
)

# Set database container
parent_container_mcp = MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
aspect=ContainerClass(container=parent_container_urn),
)
yield parent_container_mcp.as_workunit()


def add_dataset_to_container(
container_key: KeyType, dataset_urn: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,6 @@ def _process_schema(
logger.warning(
f"Could not create table ref for {table_item.path}: {e}"
)
yield from []
return

if self.config.include_tables:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,23 @@ def get_workunits_internal(
logger.info(f"Found {self.report.num_unique_queries} unique queries")

with self.report.audit_log_load_timer, queries_deduped:
i = 0
for _, query_instances in queries_deduped.items():
last_log_time = datetime.now()
last_report_time = datetime.now()
for i, (_, query_instances) in enumerate(queries_deduped.items()):
for query in query_instances.values():
if i > 0 and i % 10000 == 0:
now = datetime.now()
if (now - last_log_time).total_seconds() >= 60:
logger.info(
f"Added {i} query log equeries_dedupedntries to SQL aggregator"
f"Added {i} deduplicated query log entries to SQL aggregator"
)
last_log_time = now

if (now - last_report_time).total_seconds() >= 300:
if self.report.sql_aggregator:
logger.info(self.report.sql_aggregator.as_string())
last_report_time = now

self.aggregator.add(query)
i += 1

yield from auto_workunit(self.aggregator.gen_metadata())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class DremioSQLQueries:
SYS.JOBS_RECENT
WHERE
STATUS = 'COMPLETED'
AND LENGTH(queried_datasets)>0
AND ARRAY_SIZE(queried_datasets)>0
AND user_name != '$dremio$'
AND query_type not like '%INTERNAL%'
"""
Expand All @@ -251,10 +251,10 @@ class DremioSQLQueries:
SELECT
*
FROM
SYS.PROJECT.HISTORY.JOBS
sys.project.history.jobs
WHERE
STATUS = 'COMPLETED'
AND LENGTH(queried_datasets)>0
AND ARRAY_SIZE(queried_datasets)>0
AND user_name != '$dremio$'
AND query_type not like '%INTERNAL%'
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
DataProcessInstance,
InstanceRunResult,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand Down Expand Up @@ -43,7 +42,6 @@
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
)
from datahub.metadata.schema_classes import StatusClass
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn

Expand Down Expand Up @@ -281,15 +279,6 @@ def _get_connector_workunits(
for mcp in datajob.generate_mcp(materialize_iolets=False):
yield mcp.as_workunit()

# Materialize the upstream referenced datasets.
# We assume that the downstreams are materialized by other ingestion sources.
for iolet in datajob.inlets:
# We don't want these to be tracked by stateful ingestion.
yield MetadataChangeProposalWrapper(
entityUrn=str(iolet),
aspect=StatusClass(removed=False),
).as_workunit(is_primary_source=False)

# Map Fivetran's job/sync history entity with Datahub's data process entity
if len(connector.jobs) >= MAX_JOBS_PER_CONNECTOR:
self.report.warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class User:
groupUserAccessRight: Optional[str] = None

def get_urn_part(self, use_email: bool, remove_email_suffix: bool) -> str:
if use_email:
if use_email and self.emailAddress:
if remove_email_suffix:
return self.emailAddress.split("@")[0]
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,10 @@ def _recurse_into_query(
if upstream_query_ids:
for upstream_query_id in upstream_query_ids:
upstream_query = self._query_map.get(upstream_query_id)
if upstream_query:
if (
upstream_query
and upstream_query.query_id not in composed_of_queries
):
temp_query_lineage_info = _recurse_into_query(
upstream_query, recursion_path
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,38 +226,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,38 +234,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
Expand Down
Empty file.
Empty file.
Loading

0 comments on commit 24a1bca

Please sign in to comment.