Commit
Merge branch 'master' into kafka-sink-errors
hsheth2 authored Oct 25, 2023
2 parents b9b1437 + 2d1584b commit 0c5e832
Showing 12 changed files with 205 additions and 54 deletions.
68 changes: 36 additions & 32 deletions docs-website/docusaurus.config.js
@@ -13,6 +13,13 @@ module.exports = {
projectName: "datahub", // Usually your repo name.
staticDirectories: ["static", "genStatic"],
stylesheets: ["https://fonts.googleapis.com/css2?family=Manrope:wght@400;500;700&display=swap"],
scripts: [
{
src: "https://tools.luckyorange.com/core/lo.js?site-id=28ea8a38",
async: true,
defer: true,
},
],
noIndex: isSaas,
customFields: {
isSaas: isSaas,
@@ -50,44 +57,41 @@ module.exports = {
position: "right",
},
{
to: "https://demo.datahubproject.io/",
label: "Demo",
position: "right",
},
{
href: "https://blog.datahubproject.io/",
label: "Blog",
position: "right",
},
{
href: "https://feature-requests.datahubproject.io/roadmap",
label: "Roadmap",
type: "dropdown",
label: "Resources",
position: "right",
items: [
{
href: "https://demo.datahubproject.io/",
label: "Demo",
},
{
href: "https://blog.datahubproject.io/",
label: "Blog",
},
{
href: "https://feature-requests.datahubproject.io/roadmap",
label: "Roadmap",
},
{
href: "https://slack.datahubproject.io",
label: "Slack",
},
{
href: "https://github.com/datahub-project/datahub",
label: "GitHub",
},
{
href: "https://www.youtube.com/channel/UC3qFQC5IiwR5fvWEqi_tJ5w",
label: "YouTube",
},
],
},
{
type: "docsVersionDropdown",
position: "right",
position: "left",
dropdownActiveClassDisabled: true,
},
{
href: "https://slack.datahubproject.io",
"aria-label": "Slack",
position: "right",
className: "item__icon item__slack",
},
{
href: "https://github.com/datahub-project/datahub",
"aria-label": "GitHub",
position: "right",
className: "item__icon item__github",
},

{
href: "https://www.youtube.com/channel/UC3qFQC5IiwR5fvWEqi_tJ5w",
"aria-label": "YouTube",
position: "right",
className: "item__icon item__youtube",
},
],
},
footer: {
27 changes: 18 additions & 9 deletions docs-website/src/styles/global.scss
@@ -144,20 +144,29 @@ div[class^="announcementBar"] {

/** Navbar */

@media only screen and (max-width: 1050px) {
.navbar__toggle {
display: inherit;
}
.navbar__item {
display: none;
}
}

.navbar {
.navbar__logo {
height: 3rem;
}

.navbar__link {
align-items: center;
margin: 0 1rem 0;
padding: 0;
border-bottom: 2px solid transparent;
}

.dropdown > .navbar__link:after {
top: -1px;
border-width: 0.3em 0.3em 0;
margin-left: 0.4em;
}

.navbar__link--active {
border-bottom-color: var(--ifm-navbar-link-hover-color);
}
.navbar__item {
padding: 0.25rem 0;
svg[class*="iconExternalLink"] {
display: none;
}
@@ -6,6 +6,9 @@ import { translate } from "@docusaurus/Translate";
import { useLocation } from "@docusaurus/router";
import DefaultNavbarItem from "@theme/NavbarItem/DefaultNavbarItem";
import DropdownNavbarItem from "@theme/NavbarItem/DropdownNavbarItem";

import styles from "./styles.module.scss";

const getVersionMainDoc = (version) => version.docs.find((doc) => doc.id === version.mainDocId);
export default function DocsVersionDropdownNavbarItem({
mobile,
@@ -60,6 +63,7 @@ export default function DocsVersionDropdownNavbarItem({
return (
<DropdownNavbarItem
{...props}
className={styles.versionNavItem}
mobile={mobile}
label={dropdownLabel}
to={false} // This component is Swizzled to disable the link here
8 changes: 8 additions & 0 deletions docs-website/src/theme/NavbarItem/styles.module.scss
@@ -0,0 +1,8 @@
.versionNavItem {
margin-left: 0 !important;
padding: 0.2em 1em !important;
display: block;
border-radius: var(--ifm-button-border-radius) !important;
color: var(--ifm-menu-color-active);
background: var(--ifm-menu-color-background-active);
}
41 changes: 40 additions & 1 deletion docs/how/updating-datahub.md
@@ -11,11 +11,50 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
by Looker and LookML source connectors.
- #8853 - The Airflow plugin no longer supports Airflow 2.0.x or Python 3.7. See the docs for more details.
- #8853 - Introduced the Airflow plugin v2. If you're using Airflow 2.3+, the v2 plugin will be enabled by default, and so you'll need to switch your requirements to include `pip install 'acryl-datahub-airflow-plugin[plugin-v2]'`. To continue using the v1 plugin, set the `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN` environment variable to `true`.
- #8943 The Unity Catalog ingestion source has a new option `include_metastore`, which will cause all urns to be changed when disabled.
- #8943 - The Unity Catalog ingestion source has a new option `include_metastore`, which will cause all urns to be changed when disabled.
This is currently enabled by default to preserve compatibility, but will be disabled by default and then removed in the future.
If stateful ingestion is enabled, simply setting `include_metastore: false` will perform all required cleanup.
Otherwise, we recommend soft deleting all databricks data via the DataHub CLI:
`datahub delete --platform databricks --soft` and then reingesting with `include_metastore: false`.
- #8846 - Changed enum values in resource filters used by policies. `RESOURCE_TYPE` became `TYPE` and `RESOURCE_URN` became `URN`.
Any existing policies using these filters (i.e. defined for particular `urns` or `types` such as `dataset`) need to be upgraded
manually, for example by retrieving their respective `dataHubPolicyInfo` aspect and changing the filter criteria, i.e. turning
```yaml
"resources": {
"filter": {
"criteria": [
{
"field": "RESOURCE_TYPE",
"condition": "EQUALS",
"values": [
"dataset"
]
}
]
}
```
into
```yaml
"resources": {
"filter": {
"criteria": [
{
"field": "TYPE",
"condition": "EQUALS",
"values": [
"dataset"
]
}
]
}
```
for example using the `datahub put` command. Policies can also be removed and re-created via the UI.
- #9077 - The BigQuery ingestion source now sets `match_fully_qualified_names: true` by default.
This means that any `dataset_pattern` or `schema_pattern` you specify is matched against the fully
qualified dataset name, i.e. `<project_name>.<dataset_name>`. If your existing patterns only match
the dataset part, update them (e.g. prepend your old dataset pattern with `.*\.`, which matches the
project part), or set `match_fully_qualified_names: false` in your recipe. Note, however, that
setting this flag to `false` is deprecated and it will be removed entirely in a future release.
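
As a companion to the #9077 note, a minimal, hypothetical sketch of an updated recipe run programmatically via `Pipeline.create` (auth and project settings are omitted, and the `marketing_.*` pattern is an invented example):

```python
from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe: the old pattern matched only the dataset name, so we
# prepend ".*\." to also match the "<project_name>." prefix that is now used
# by default (match_fully_qualified_names: true).
pipeline = Pipeline.create(
    {
        "source": {
            "type": "bigquery",
            "config": {
                # credentials / project settings omitted for brevity
                "dataset_pattern": {"allow": [r".*\.marketing_.*"]},
                # Deprecated escape hatch instead of updating the pattern:
                # "match_fully_qualified_names": False,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```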

### Potential Downtime

@@ -1,3 +1,4 @@
import os
from typing import Optional

from pydantic import Field, root_validator
@@ -67,9 +68,25 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):
),
)

pull_from_datahub_api: bool = Field(
default=False,
description="Use the DataHub API to fetch versioned aspects.",
hidden_from_docs=True,
)

max_workers: int = Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for datahub api ingestion.",
hidden_from_docs=True,
)

@root_validator
def check_ingesting_data(cls, values):
if not values.get("database_connection") and not values.get("kafka_connection"):
if (
not values.get("database_connection")
and not values.get("kafka_connection")
and not values.get("pull_from_datahub_api")
):
raise ValueError(
"Your current config will not ingest any data."
" Please specify at least one of `database_connection` or `kafka_connection`, ideally both."
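
To illustrate the relaxed validator above, a small sketch (assuming the remaining connection fields keep their `None` defaults; pydantic reports the failure as a validation error, which subclasses `ValueError`):

```python
from datahub.ingestion.source.datahub.config import DataHubSourceConfig

# No database_connection, no kafka_connection, API flag left off:
# check_ingesting_data still rejects the config.
try:
    DataHubSourceConfig()
except ValueError as e:
    print(f"rejected: {e}")

# Enabling the new (hidden) API-based path satisfies the validator on its own.
config = DataHubSourceConfig(pull_from_datahub_api=True)
print(config.max_workers)  # 5 * os.cpu_count(), or 20 if the CPU count is unknown
```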
@@ -0,0 +1,49 @@
import logging
from concurrent import futures
from typing import Dict, Iterable, List

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.graph.filters import RemovedStatusFilter
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
from datahub.ingestion.source.datahub.report import DataHubSourceReport
from datahub.metadata._schema_classes import _Aspect

logger = logging.getLogger(__name__)

# Should work for at least mysql, mariadb, postgres
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


class DataHubApiReader:
def __init__(
self,
config: DataHubSourceConfig,
report: DataHubSourceReport,
graph: DataHubGraph,
):
self.config = config
self.report = report
self.graph = graph

def get_aspects(self) -> Iterable[MetadataChangeProposalWrapper]:
urns = self.graph.get_urns_by_filter(
status=RemovedStatusFilter.ALL,
batch_size=self.config.database_query_batch_size,
)
tasks: List[futures.Future[Iterable[MetadataChangeProposalWrapper]]] = []
with futures.ThreadPoolExecutor(
max_workers=self.config.max_workers
) as executor:
for urn in urns:
tasks.append(executor.submit(self._get_aspects_for_urn, urn))
for task in futures.as_completed(tasks):
yield from task.result()

def _get_aspects_for_urn(self, urn: str) -> Iterable[MetadataChangeProposalWrapper]:
aspects: Dict[str, _Aspect] = self.graph.get_entity_semityped(urn) # type: ignore
for aspect in aspects.values():
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=aspect,
)
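
For context, a hedged sketch of how this reader could be exercised on its own, outside of the source class (it assumes a GMS instance reachable at localhost:8080 and that `DataHubSourceReport` can be constructed with defaults):

```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
from datahub.ingestion.source.datahub.datahub_api_reader import DataHubApiReader
from datahub.ingestion.source.datahub.report import DataHubSourceReport

# Connect to a running GMS instance (server URL / token are deployment-specific).
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

config = DataHubSourceConfig(pull_from_datahub_api=True)
report = DataHubSourceReport()
reader = DataHubApiReader(config, report, graph)

# get_aspects() streams MCPs as the thread pool finishes each urn's aspect fetch.
for mcp in reader.get_aspects():
    print(mcp.entityUrn, type(mcp.aspect).__name__)
```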
@@ -15,6 +15,7 @@
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
from datahub.ingestion.source.datahub.datahub_api_reader import DataHubApiReader
from datahub.ingestion.source.datahub.datahub_database_reader import (
DataHubDatabaseReader,
)
@@ -58,6 +59,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
logger.info(f"Ingesting DataHub metadata up until {self.report.stop_time}")
state = self.stateful_ingestion_handler.get_last_run_state()

if self.config.pull_from_datahub_api:
yield from self._get_api_workunits()

if self.config.database_connection is not None:
yield from self._get_database_workunits(
from_createdon=state.database_createdon_datetime
@@ -139,6 +143,18 @@ def _get_kafka_workunits(
)
self._commit_progress(i)

def _get_api_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.ctx.graph is None:
self.report.report_failure(
"datahub_api",
"Specify datahub_api on your ingestion recipe to ingest from the DataHub API",
)
return

reader = DataHubApiReader(self.config, self.report, self.ctx.graph)
for mcp in reader.get_aspects():
yield mcp.as_workunit()

def _commit_progress(self, i: Optional[int] = None) -> None:
"""Commit progress to stateful storage, if there have been no errors.
@@ -37,7 +37,7 @@
gen_database_key,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
from datahub.metadata.schema_classes import RecordTypeClass
from datahub.metadata.schema_classes import MapTypeClass, RecordTypeClass
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column
from datahub.utilities.sqlalchemy_type_converter import (
MapType,
@@ -46,7 +46,9 @@

logger = logging.getLogger(__name__)

assert STRUCT, "required type modules are not available"
register_custom_type(STRUCT, RecordTypeClass)
register_custom_type(MapType, MapTypeClass)


class CustomAthenaRestDialect(AthenaRestDialect):
@@ -80,7 +80,6 @@
DatasetLineageTypeClass,
DatasetPropertiesClass,
GlobalTagsClass,
MapTypeClass,
SubTypesClass,
TagAssociationClass,
UpstreamClass,
@@ -90,7 +89,6 @@
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport
from datahub.utilities.sqlalchemy_type_converter import MapType

if TYPE_CHECKING:
from datahub.ingestion.source.ge_data_profiler import (
@@ -140,6 +138,7 @@ class SqlWorkUnit(MetadataWorkUnit):


_field_type_mapping: Dict[Type[TypeEngine], Type] = {
# Note: to add dialect-specific types to this mapping, use the `register_custom_type` function.
types.Integer: NumberTypeClass,
types.Numeric: NumberTypeClass,
types.Boolean: BooleanTypeClass,
Expand All @@ -156,8 +155,6 @@ class SqlWorkUnit(MetadataWorkUnit):
types.DATETIME: TimeTypeClass,
types.TIMESTAMP: TimeTypeClass,
types.JSON: RecordTypeClass,
# additional type definitions that are used by the Athena source
MapType: MapTypeClass, # type: ignore
# Because the postgresql dialect is used internally by many other dialects,
# we add some postgres types here. This is ok to do because the postgresql
# dialect is built-in to sqlalchemy.
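
A hedged sketch of the pattern the new comment in `_field_type_mapping` points to: a dialect module registering its own SQLAlchemy type via `register_custom_type` rather than editing the mapping directly (the `GEOMETRY` type below is hypothetical, mirroring how the Athena change registers `STRUCT` and `MapType`):

```python
from sqlalchemy import types

from datahub.ingestion.source.sql.sql_common import register_custom_type
from datahub.metadata.schema_classes import RecordTypeClass


class GEOMETRY(types.TypeEngine):
    """Hypothetical dialect-specific SQLAlchemy type."""

    __visit_name__ = "GEOMETRY"


# Map the dialect type to a DataHub field type class; this feeds the same
# mapping shown above without modifying it in place.
register_custom_type(GEOMETRY, RecordTypeClass)
```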
