Merge branch 'master' into master+ing-200-view-lineage-for-sql-sources
mayurinehate authored Oct 26, 2023
2 parents 337ca34 + 2ebf33e commit e60a7f4
Showing 30 changed files with 645 additions and 112 deletions.
1 change: 1 addition & 0 deletions docs-website/sidebars.js
@@ -446,6 +446,7 @@ module.exports = {
"docs/managed-datahub/observe/freshness-assertions",
"docs/managed-datahub/observe/volume-assertions",
"docs/managed-datahub/observe/custom-sql-assertions",
"docs/managed-datahub/observe/column-assertions",
],
},
{
10 changes: 6 additions & 4 deletions docs/how/updating-datahub.md
@@ -4,6 +4,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

## Next

- #9010 - In the Redshift source's config, `incremental_lineage` now defaults to off.
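To keep the previous behavior, the flag can be re-enabled explicitly. A minimal sketch using the programmatic `Pipeline` API (the connection values are placeholders):

```python
from datahub.ingestion.run.pipeline import Pipeline

# Equivalent to a YAML recipe that pins the old lineage behavior.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "redshift",
            "config": {
                "host_port": "my-cluster.example.com:5439",  # placeholder
                "database": "dev",  # placeholder
                "incremental_lineage": True,  # explicitly restore the old default
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()
```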

### Breaking Changes

- #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.
@@ -51,10 +53,10 @@ into
for example, using the `datahub put` command. Policies can also be removed and re-created via the UI.
- #9077 - The BigQuery ingestion source by default sets `match_fully_qualified_names: true`.
This means that any `dataset_pattern` or `schema_pattern` specified will be matched on the fully
qualified dataset name, i.e. `<project_name>.<dataset_name>`. If this is not the case, please
update your pattern (e.g. prepend your old dataset pattern with `.*\.` which matches the project part),
or set `match_fully_qualified_names: false` in your recipe. However, note that
setting this to `false` is deprecated and this flag will be removed entirely in a future release.
qualified dataset name, i.e. `<project_name>.<dataset_name>`. We attempt to support the old
pattern format by prepending `.*\\.` to dataset patterns lacking a period, so in most cases this
should not cause any issues. However, if you have a complex dataset pattern, we recommend you
manually convert it to the fully qualified format to avoid any potential issues.
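The effect of the auto-prepended `.*\\.` can be sanity-checked with plain `re` (a sketch that assumes `re.match`-style anchoring at the start of the string, as DataHub's `AllowDenyPattern` uses):

```python
import re

old_pattern = "marketing"         # previously matched the bare dataset name
migrated = r".*\." + old_pattern  # auto-prepended form, matches <project>.<dataset>

assert re.match(migrated, "my-project.marketing")  # fully qualified name matches
assert not re.match(migrated, "marketing")         # bare name alone no longer matches
```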

### Potential Downtime

358 changes: 358 additions & 0 deletions docs/managed-datahub/observe/column-assertions.md

Large diffs are not rendered by default.

@@ -4,6 +4,19 @@
* List of entity field types to fetch for a given entity
*/
public enum EntityFieldType {

/**
 * Urn of the entity
 * @deprecated
 */
@Deprecated
RESOURCE_URN,
/**
 * Type of the entity (e.g. dataset, chart)
 * @deprecated
 */
@Deprecated
RESOURCE_TYPE,
/**
* Type of the entity (e.g. dataset, chart)
*/
3 changes: 2 additions & 1 deletion metadata-ingestion/scripts/avro_codegen.py
@@ -152,7 +152,8 @@ def add_name(self, name_attr, space_attr, new_schema):
return encoded


autogen_header = """# flake8: noqa
autogen_header = """# mypy: ignore-errors
# flake8: noqa
# This file is autogenerated by /metadata-ingestion/scripts/avro_codegen.py
# Do not modify manually!
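The added `# mypy: ignore-errors` line is mypy's standard per-module inline configuration: placed in a file's header comment block, it suppresses all type errors reported for that module, which suits autogenerated code. A trivial illustration in a hypothetical module:

```python
# mypy: ignore-errors
# flake8: noqa
# Because of the directive above, mypy skips error reporting for this module,
# so the deliberately ill-typed assignment below is not flagged.
x: int = "not an int"
```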
4 changes: 2 additions & 2 deletions metadata-ingestion/scripts/modeldocgen.py
@@ -351,8 +351,8 @@ def strip_types(field_path: str) -> str:
field_objects = []
for f in entity_fields:
field = avro.schema.Field(
type=f["type"],
name=f["name"],
f["type"],
f["name"],
has_default=False,
)
field_objects.append(field)
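Switching from the `type=`/`name=` keywords to positional arguments here is presumably forced by the `avro` 1.11 upgrade in `setup.py` below: the `Field` constructor's parameter names changed across that release line, so the 1.10-era keyword names no longer resolve.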
10 changes: 5 additions & 5 deletions metadata-ingestion/setup.py
@@ -32,7 +32,7 @@
"expandvars>=0.6.5",
"avro-gen3==0.7.11",
# "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3",
"avro>=1.10.2,<1.11",
"avro>=1.11.3,<1.12",
"python-dateutil>=2.8.0",
"tabulate",
"progressbar2",
@@ -359,10 +359,10 @@
"redshift": sql_common
| redshift_common
| usage_common
| sqlglot_lib
| {"redshift-connector"},
"redshift-legacy": sql_common | redshift_common,
"redshift-usage-legacy": sql_common | usage_common | redshift_common,
| {"redshift-connector"}
| sqlglot_lib,
"redshift-legacy": sql_common | redshift_common | sqlglot_lib,
"redshift-usage-legacy": sql_common | redshift_common | sqlglot_lib | usage_common,
"s3": {*s3_base, *data_lake_profiling},
"gcs": {*s3_base, *data_lake_profiling},
"sagemaker": aws_common,
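All three Redshift extras now pull in `sqlglot_lib`, consistent with this branch's SQL-parsing-based view lineage. As a rough illustration of what the parser enables (plain `sqlglot`, not DataHub's actual lineage code; the table names are made up):

```python
import sqlglot
from sqlglot import exp

view_sql = "CREATE VIEW analytics.active_users AS SELECT id FROM raw.users WHERE active"

# Parse the view definition and collect the tables referenced by its body.
select = sqlglot.parse_one(view_sql, read="redshift").find(exp.Select)
upstreams = [table.sql() for table in select.find_all(exp.Table)]
print(upstreams)  # ['raw.users']
```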
109 changes: 71 additions & 38 deletions metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
@@ -1,6 +1,18 @@
import json
import logging
from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
Optional,
Type,
Union,
cast,
overload,
)

import avro.schema

@@ -54,6 +66,8 @@
avro.schema.PrimitiveSchema,
]

SchemaOrField = Union[avro.schema.Schema, avro.schema.Field]

FieldStack = List[avro.schema.Field]

# The latest avro code contains this type definition in a compatibility module,
@@ -124,16 +138,22 @@ def __init__(
self._meta_mapping_processor = meta_mapping_processor
self._schema_tags_field = schema_tags_field
self._tag_prefix = tag_prefix

# Map of avro schema type to the conversion handler
self._avro_type_to_mce_converter_map: Dict[
avro.schema.Schema,
Callable[[ExtendedAvroNestedSchemas], Generator[SchemaField, None, None]],
# TODO: Clean up this type... perhaps refactor
self._avro_type_to_mce_converter_map: Mapping[
Union[
Type[avro.schema.Schema],
Type[avro.schema.Field],
Type[avro.schema.LogicalSchema],
],
Callable[[SchemaOrField], Iterable[SchemaField]],
] = {
avro.schema.RecordSchema: self._gen_from_non_field_nested_schemas,
avro.schema.UnionSchema: self._gen_from_non_field_nested_schemas,
avro.schema.ArraySchema: self._gen_from_non_field_nested_schemas,
avro.schema.MapSchema: self._gen_from_non_field_nested_schemas,
avro.schema.Field: self._gen_nested_schema_from_field,
avro.schema.Field: self._gen_nested_schema_from_field, # type: ignore
avro.schema.PrimitiveSchema: self._gen_non_nested_to_mce_fields,
avro.schema.FixedSchema: self._gen_non_nested_to_mce_fields,
avro.schema.EnumSchema: self._gen_non_nested_to_mce_fields,
@@ -142,20 +162,22 @@ def __init__(

@staticmethod
def _get_type_name(
avro_schema: avro.schema.Schema, logical_if_present: bool = False
avro_schema: SchemaOrField, logical_if_present: bool = False
) -> str:
logical_type_name: Optional[str] = None
if logical_if_present:
logical_type_name = getattr(
avro_schema, "logical_type", None
) or avro_schema.props.get("logicalType")
logical_type_name = cast(
Optional[str],
getattr(avro_schema, "logical_type", None)
or avro_schema.props.get("logicalType"),
)
return logical_type_name or str(
getattr(avro_schema.type, "type", avro_schema.type)
)

@staticmethod
def _get_column_type(
avro_schema: avro.schema.Schema, logical_type: Optional[str]
avro_schema: SchemaOrField, logical_type: Optional[str]
) -> SchemaFieldDataType:
type_name: str = AvroToMceSchemaConverter._get_type_name(avro_schema)
TypeClass: Optional[Type] = AvroToMceSchemaConverter.field_type_mapping.get(
@@ -186,7 +208,7 @@ def _get_column_type(
)
return dt

def _is_nullable(self, schema: avro.schema.Schema) -> bool:
def _is_nullable(self, schema: SchemaOrField) -> bool:
if isinstance(schema, avro.schema.Field):
return self._is_nullable(schema.type)
if isinstance(schema, avro.schema.UnionSchema):
@@ -208,7 +230,7 @@ def _strip_namespace(name_or_fullname: str) -> str:
return name_or_fullname.rsplit(".", maxsplit=1)[-1]

@staticmethod
def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str:
def _get_simple_native_type(schema: SchemaOrField) -> str:
if isinstance(schema, (avro.schema.RecordSchema, avro.schema.Field)):
# For Records, fields, always return the name.
return AvroToMceSchemaConverter._strip_namespace(schema.name)
@@ -226,7 +248,7 @@ def _get_simple_native_type(schema: ExtendedAvroNestedSchemas) -> str:
return schema.type

@staticmethod
def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str:
def _get_type_annotation(schema: SchemaOrField) -> str:
simple_native_type = AvroToMceSchemaConverter._get_simple_native_type(schema)
if simple_native_type.startswith("__struct_"):
simple_native_type = "struct"
@@ -237,10 +259,24 @@ def _get_type_annotation(schema: ExtendedAvroNestedSchemas) -> str:
else:
return f"[type={simple_native_type}]"

@staticmethod
@overload
def _get_underlying_type_if_option_as_union(
schema: SchemaOrField, default: SchemaOrField
) -> SchemaOrField:
...

@staticmethod
@overload
def _get_underlying_type_if_option_as_union(
schema: SchemaOrField, default: Optional[SchemaOrField] = None
) -> Optional[SchemaOrField]:
...

@staticmethod
def _get_underlying_type_if_option_as_union(
schema: AvroNestedSchemas, default: Optional[AvroNestedSchemas] = None
) -> AvroNestedSchemas:
schema: SchemaOrField, default: Optional[SchemaOrField] = None
) -> Optional[SchemaOrField]:
if isinstance(schema, avro.schema.UnionSchema) and len(schema.schemas) == 2:
(first, second) = schema.schemas
if first.type == AVRO_TYPE_NULL:
@@ -258,8 +294,8 @@ class SchemaFieldEmissionContextManager:

def __init__(
self,
schema: avro.schema.Schema,
actual_schema: avro.schema.Schema,
schema: SchemaOrField,
actual_schema: SchemaOrField,
converter: "AvroToMceSchemaConverter",
description: Optional[str] = None,
default_value: Optional[str] = None,
@@ -275,7 +311,7 @@ def __enter__(self):
self._converter._prefix_name_stack.append(type_annotation)
return self

def emit(self) -> Generator[SchemaField, None, None]:
def emit(self) -> Iterable[SchemaField]:
if (
not isinstance(
self._actual_schema,
@@ -307,7 +343,7 @@ def emit(self) -> Generator[SchemaField, None, None]:

description = self._description
if not description and actual_schema.props.get("doc"):
description = actual_schema.props.get("doc")
description = cast(Optional[str], actual_schema.props.get("doc"))

if self._default_value is not None:
description = f"{description if description else ''}\nField default value: {self._default_value}"
@@ -320,12 +356,12 @@ def emit(self) -> Generator[SchemaField, None, None]:
native_data_type = native_data_type[
slice(len(type_prefix), len(native_data_type) - 1)
]
native_data_type = actual_schema.props.get(
"native_data_type", native_data_type
native_data_type = cast(
str, actual_schema.props.get("native_data_type", native_data_type)
)

field_path = self._converter._get_cur_field_path()
merged_props = {}
merged_props: Dict[str, Any] = {}
merged_props.update(self._schema.other_props)
merged_props.update(schema.other_props)

@@ -363,12 +399,13 @@ def emit(self) -> Generator[SchemaField, None, None]:

meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION)

logical_type_name: Optional[str] = (
logical_type_name: Optional[str] = cast(
Optional[str],
# logicalType nested inside type
getattr(actual_schema, "logical_type", None)
or actual_schema.props.get("logicalType")
# bare logicalType
or self._actual_schema.props.get("logicalType")
or self._actual_schema.props.get("logicalType"),
)

field = SchemaField(
@@ -392,14 +429,12 @@ def emit(self) -> Generator[SchemaField, None, None]:
def __exit__(self, exc_type, exc_val, exc_tb):
self._converter._prefix_name_stack.pop()

def _get_sub_schemas(
self, schema: ExtendedAvroNestedSchemas
) -> Generator[avro.schema.Schema, None, None]:
def _get_sub_schemas(self, schema: SchemaOrField) -> Iterable[SchemaOrField]:
"""Responsible for generation for appropriate sub-schemas for every nested AVRO type."""

def gen_items_from_list_tuple_or_scalar(
val: Any,
) -> Generator[avro.schema.Schema, None, None]:
) -> Iterable[avro.schema.Schema]:
if isinstance(val, (list, tuple)):
for i in val:
yield i
@@ -433,7 +468,7 @@ def gen_items_from_list_tuple_or_scalar(
def _gen_nested_schema_from_field(
self,
field: avro.schema.Field,
) -> Generator[SchemaField, None, None]:
) -> Iterable[SchemaField]:
"""Handles generation of MCE SchemaFields for an AVRO Field type."""
# NOTE: Here we only manage the field stack and trigger MCE Field generation from this field's type.
# The actual emitting of a field happens when
@@ -447,7 +482,7 @@ def _gen_nested_schema_from_field(

def _gen_from_last_field(
self, schema_to_recurse: Optional[AvroNestedSchemas] = None
) -> Generator[SchemaField, None, None]:
) -> Iterable[SchemaField]:
"""Emits the field most-recent field, optionally triggering sub-schema generation under the field."""
last_field_schema = self._fields_stack[-1]
# Generate the custom-description for the field.
@@ -467,8 +502,8 @@ def _gen_from_last_field(
yield from self._to_mce_fields(sub_schema)

def _gen_from_non_field_nested_schemas(
self, schema: AvroNestedSchemas
) -> Generator[SchemaField, None, None]:
self, schema: SchemaOrField
) -> Iterable[SchemaField]:
"""Handles generation of MCE SchemaFields for all standard AVRO nested types."""
# Handle recursive record definitions
recurse: bool = True
@@ -511,8 +546,8 @@ def _gen_from_non_field_nested_schemas(
yield from self._to_mce_fields(sub_schema)

def _gen_non_nested_to_mce_fields(
self, schema: AvroNonNestedSchemas
) -> Generator[SchemaField, None, None]:
self, schema: SchemaOrField
) -> Iterable[SchemaField]:
"""Handles generation of MCE SchemaFields for non-nested AVRO types."""
with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager(
schema,
@@ -521,9 +556,7 @@ def _gen_non_nested_to_mce_fields(
) as non_nested_emitter:
yield from non_nested_emitter.emit()

def _to_mce_fields(
self, avro_schema: avro.schema.Schema
) -> Generator[SchemaField, None, None]:
def _to_mce_fields(self, avro_schema: SchemaOrField) -> Iterable[SchemaField]:
# Invoke the relevant conversion handler for the schema element type.
schema_type = (
type(avro_schema)
@@ -541,7 +574,7 @@ def to_mce_fields(
meta_mapping_processor: Optional[OperationProcessor] = None,
schema_tags_field: Optional[str] = None,
tag_prefix: Optional[str] = None,
) -> Generator[SchemaField, None, None]:
) -> Iterable[SchemaField]:
"""
Converts a key or value type AVRO schema string to appropriate MCE SchemaFields.
:param avro_schema_string: String representation of the AVRO schema.
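The paired `@overload` declarations for `_get_underlying_type_if_option_as_union` encode that passing a non-None `default` guarantees a non-optional result. The same pattern in a self-contained sketch:

```python
from typing import List, Optional, overload


@overload
def first_or(items: List[str], default: str) -> str:
    ...


@overload
def first_or(items: List[str], default: None = None) -> Optional[str]:
    ...


def first_or(items: List[str], default: Optional[str] = None) -> Optional[str]:
    # One runtime implementation; the overloads above only refine the
    # signatures that type checkers see at each call site.
    return items[0] if items else default


always_str = first_or([], "fallback")  # type checkers infer: str
maybe_str = first_or([])               # type checkers infer: Optional[str]
```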
10 changes: 9 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -390,7 +390,15 @@ def run(self) -> None:
record_envelopes = self.extractor.get_records(wu)
for record_envelope in self.transform(record_envelopes):
if not self.dry_run:
self.sink.write_record_async(record_envelope, callback)
try:
self.sink.write_record_async(
record_envelope, callback
)
except Exception as e:
# In case the sink's error handling is bad, we still want to report the error.
self.sink.report.report_failure(
f"Failed to write record: {e}"
)

except RuntimeError:
raise
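The new try/except means a sink whose write path raises (instead of routing errors through its callback) can no longer abort the whole run; the failure is recorded instead. The shape of the pattern with stand-in classes (the real `Sink` interface is richer):

```python
class FlakySink:
    """Stand-in for a sink whose write path raises rather than using its callback."""

    def __init__(self) -> None:
        self.failures: list = []

    def write_record_async(self, record: object) -> None:
        raise RuntimeError("connection lost, and the error handler is broken too")

    def report_failure(self, message: str) -> None:
        self.failures.append(message)


sink = FlakySink()
for record in ["rec-1", "rec-2"]:
    try:
        sink.write_record_async(record)
    except Exception as e:
        # Mirrors pipeline.py above: report the failure rather than crash the run.
        sink.report_failure(f"Failed to write record: {e}")

assert len(sink.failures) == 2
```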