Commit 68a26b4

fix(ingest/delta-lake): support parsing nested types correctly (datah…

dushayntAW authored Mar 6, 2024
1 parent 4fbe814 commit 68a26b4
Showing 56 changed files with 3,351 additions and 163 deletions.
@@ -1,7 +1,8 @@
+import json
 import logging
 import os
 import time
-from typing import Dict, Iterable, List
+from typing import Any, Dict, Iterable, List
 from urllib.parse import urlparse

 from deltalake import DeltaTable
@@ -35,23 +36,22 @@
     read_delta_table,
 )
 from datahub.ingestion.source.delta_lake.report import DeltaLakeSourceReport
-from datahub.ingestion.source.schema_inference.csv_tsv import tableschema_type_map
+from datahub.metadata._schema_classes import SchemaFieldClass
 from datahub.metadata.com.linkedin.pegasus2avro.common import Status
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
     SchemaField,
-    SchemaFieldDataType,
     SchemaMetadata,
 )
 from datahub.metadata.schema_classes import (
     DatasetPropertiesClass,
-    NullTypeClass,
     OperationClass,
     OperationTypeClass,
     OtherSchemaClass,
 )
 from datahub.telemetry import telemetry
+from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column

 logging.getLogger("py4j").setLevel(logging.ERROR)
 logger: logging.Logger = logging.getLogger(__name__)
@@ -126,26 +126,57 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
         config = DeltaLakeSourceConfig.parse_obj(config_dict)
         return cls(config, ctx)

+    def delta_type_to_hive_type(self, field_type: Any) -> str:
+        if isinstance(field_type, str):
+            # primitive type: return it unchanged
+            return field_type
+        else:
+            if field_type.get("type") == "array":
+                # if the array holds a complex type, recursively parse the
+                # element type and build the native datatype
+                return (
+                    "array<"
+                    + self.delta_type_to_hive_type(field_type.get("elementType"))
+                    + ">"
+                )
+            elif field_type.get("type") == "struct":
+                parsed_struct = ""
+                for field in field_type.get("fields"):
+                    # if a member field is of a complex type, recursively
+                    # parse it and build the native datatype
+                    parsed_struct += (
+                        "{0}:{1}".format(
+                            field.get("name"),
+                            self.delta_type_to_hive_type(field.get("type")),
+                        )
+                        + ","
+                    )
+                return "struct<" + parsed_struct.rstrip(",") + ">"
+            return ""
+
+    def _parse_datatype(self, raw_field_json_str: str) -> List[SchemaFieldClass]:
+        raw_field_json = json.loads(raw_field_json_str)
+
+        # get the parent field name and type
+        field_name = raw_field_json.get("name")
+        field_type = self.delta_type_to_hive_type(raw_field_json.get("type"))
+
+        return get_schema_fields_for_hive_column(field_name, field_type)
+
     def get_fields(self, delta_table: DeltaTable) -> List[SchemaField]:
         fields: List[SchemaField] = []

         for raw_field in delta_table.schema().fields:
-            field = SchemaField(
-                fieldPath=raw_field.name,
-                type=SchemaFieldDataType(
-                    tableschema_type_map.get(raw_field.type.type, NullTypeClass)()
-                ),
-                nativeDataType=raw_field.type.type,
-                recursive=False,
-                nullable=raw_field.nullable,
-                description=str(raw_field.metadata),
-                isPartitioningKey=True
-                if raw_field.name in delta_table.metadata().partition_columns
-                else False,
-            )
-            fields.append(field)
-        fields = sorted(fields, key=lambda f: f.fieldPath)
+            parsed_data_list = self._parse_datatype(raw_field.to_json())
+            fields = fields + parsed_data_list
+
+        fields = sorted(fields, key=lambda f: f.fieldPath)
         return fields

     def _create_operation_aspect_wu(
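The new helper walks Delta's JSON schema representation and renders it as a Hive type string, which get_schema_fields_for_hive_column then expands into nested SchemaFields. A minimal, self-contained sketch of the recursion (condensed from the method above into a free function; the sample schema below is hypothetical but follows the elementType/fields shape the method expects):

    from typing import Any

    def delta_type_to_hive_type(field_type: Any) -> str:
        # Primitive types arrive as plain strings ("string", "integer", ...).
        if isinstance(field_type, str):
            return field_type
        if field_type.get("type") == "array":
            # array<element-type>, recursing into the element type
            return "array<" + delta_type_to_hive_type(field_type.get("elementType")) + ">"
        if field_type.get("type") == "struct":
            # struct<name:type,...>, recursing into each member
            members = ",".join(
                f'{f.get("name")}:{delta_type_to_hive_type(f.get("type"))}'
                for f in field_type.get("fields")
            )
            return "struct<" + members + ">"
        return ""

    # Hypothetical nested field type, shaped like deltalake's serialized schema.
    nested = {
        "type": "struct",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "orders", "type": {"type": "array", "elementType": "integer"}},
        ],
    }

    print(delta_type_to_hive_type(nested))
    # struct<id:string,orders:array<integer>>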
@@ -45,23 +45,21 @@
       },
       "fields": [
         {
-          "fieldPath": "customer",
+          "fieldPath": "[version=2.0].[type=float].total_cost",
           "nullable": true,
-          "description": "{}",
           "type": {
             "type": {
-              "com.linkedin.pegasus2avro.schema.StringType": {}
+              "com.linkedin.pegasus2avro.schema.NumberType": {}
            }
          },
-          "nativeDataType": "string",
+          "nativeDataType": "float",
           "recursive": false,
           "isPartOfKey": false,
-          "isPartitioningKey": false
+          "jsonProps": "{\"native_data_type\": \"float\", \"_nullable\": true}"
         },
         {
-          "fieldPath": "day",
+          "fieldPath": "[version=2.0].[type=int].day",
           "nullable": true,
-          "description": "{}",
           "type": {
             "type": {
               "com.linkedin.pegasus2avro.schema.NumberType": {}
@@ -70,12 +68,11 @@
           "nativeDataType": "integer",
           "recursive": false,
           "isPartOfKey": false,
-          "isPartitioningKey": false
+          "jsonProps": "{\"native_data_type\": \"integer\", \"_nullable\": true}"
         },
         {
-          "fieldPath": "month",
+          "fieldPath": "[version=2.0].[type=int].month",
           "nullable": true,
-          "description": "{}",
           "type": {
             "type": {
               "com.linkedin.pegasus2avro.schema.NumberType": {}
@@ -84,49 +81,46 @@
           "nativeDataType": "integer",
           "recursive": false,
           "isPartOfKey": false,
-          "isPartitioningKey": false
+          "jsonProps": "{\"native_data_type\": \"integer\", \"_nullable\": true}"
         },
         {
-          "fieldPath": "sale_id",
+          "fieldPath": "[version=2.0].[type=int].year",
           "nullable": true,
-          "description": "{}",
           "type": {
             "type": {
-              "com.linkedin.pegasus2avro.schema.StringType": {}
+              "com.linkedin.pegasus2avro.schema.NumberType": {}
            }
          },
-          "nativeDataType": "string",
+          "nativeDataType": "integer",
           "recursive": false,
           "isPartOfKey": false,
-          "isPartitioningKey": false
+          "jsonProps": "{\"native_data_type\": \"integer\", \"_nullable\": true}"
         },
         {
-          "fieldPath": "total_cost",
+          "fieldPath": "[version=2.0].[type=string].customer",
           "nullable": true,
-          "description": "{}",
           "type": {
             "type": {
-              "com.linkedin.pegasus2avro.schema.NullType": {}
+              "com.linkedin.pegasus2avro.schema.StringType": {}
            }
          },
-          "nativeDataType": "float",
+          "nativeDataType": "string",
           "recursive": false,
           "isPartOfKey": false,
-          "isPartitioningKey": false
+          "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
         },
         {
-          "fieldPath": "year",
+          "fieldPath": "[version=2.0].[type=string].sale_id",
           "nullable": true,
-          "description": "{}",
           "type": {
             "type": {
-              "com.linkedin.pegasus2avro.schema.NumberType": {}
+              "com.linkedin.pegasus2avro.schema.StringType": {}
            }
          },
-          "nativeDataType": "integer",
+          "nativeDataType": "string",
           "recursive": false,
           "isPartOfKey": false,
-          "isPartitioningKey": false
+          "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
         }
       ]
     }
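The expected values above follow from routing every column through get_schema_fields_for_hive_column, which emits DataHub v2 field paths and carries the native type and nullability in jsonProps. A hedged illustration of the call shape used by _parse_datatype (the printed path is copied from the expected output above, not re-derived):

    from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column

    # total_cost is a top-level float column in the sample sales table.
    for field in get_schema_fields_for_hive_column("total_cost", "float"):
        print(field.fieldPath)
    # [version=2.0].[type=float].total_cost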
@@ -380,6 +374,68 @@
     "lastRunId": "no-run-id-provided"
   }
 },
+{
+  "entityType": "dataset",
+  "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,my-test-bucket/delta_tables/sales,DEV)",
+  "changeType": "UPSERT",
+  "aspectName": "browsePathsV2",
+  "aspect": {
+    "json": {
+      "path": [
+        {
+          "id": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd",
+          "urn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd"
+        },
+        {
+          "id": "urn:li:container:acebf8bcf966274632d3d2b710ef4947",
+          "urn": "urn:li:container:acebf8bcf966274632d3d2b710ef4947"
+        }
+      ]
+    }
+  },
+  "systemMetadata": {
+    "lastObserved": 1672531200000,
+    "runId": "delta-lake-test",
+    "lastRunId": "no-run-id-provided"
+  }
+},
+{
+  "entityType": "container",
+  "entityUrn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd",
+  "changeType": "UPSERT",
+  "aspectName": "browsePathsV2",
+  "aspect": {
+    "json": {
+      "path": []
+    }
+  },
+  "systemMetadata": {
+    "lastObserved": 1672531200000,
+    "runId": "delta-lake-test",
+    "lastRunId": "no-run-id-provided"
+  }
+},
+{
+  "entityType": "container",
+  "entityUrn": "urn:li:container:acebf8bcf966274632d3d2b710ef4947",
+  "changeType": "UPSERT",
+  "aspectName": "browsePathsV2",
+  "aspect": {
+    "json": {
+      "path": [
+        {
+          "id": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd",
+          "urn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd"
+        }
+      ]
+    }
+  },
+  "systemMetadata": {
+    "lastObserved": 1672531200000,
+    "runId": "delta-lake-test",
+    "lastRunId": "no-run-id-provided"
+  }
+},
 {
   "entityType": "dataset",
   "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,my-test-bucket/delta_tables/sales,DEV)",
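The new browsePathsV2 expectations encode the dataset's browse hierarchy: a bucket container with an empty path, a folder container pointing at the bucket, and the dataset listing both. A minimal sketch of building the dataset-level aspect with DataHub's generated classes (assuming the standard BrowsePathsV2Class/BrowsePathEntryClass names; the container URNs are copied from the golden file above):

    from datahub.metadata.schema_classes import BrowsePathEntryClass, BrowsePathsV2Class

    bucket = "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd"
    folder = "urn:li:container:acebf8bcf966274632d3d2b710ef4947"

    # Each entity lists its ancestors from the root down, so the dataset's
    # path is the bucket container followed by the folder container.
    dataset_browse_paths = BrowsePathsV2Class(
        path=[
            BrowsePathEntryClass(id=bucket, urn=bucket),
            BrowsePathEntryClass(id=folder, urn=folder),
        ]
    )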