diff --git a/metadata-ingestion/docs/sources/delta-lake/delta-lake.md b/metadata-ingestion/docs/sources/delta-lake/delta-lake.md index 9c620d9840e34..16ceb159d70c9 100644 --- a/metadata-ingestion/docs/sources/delta-lake/delta-lake.md +++ b/metadata-ingestion/docs/sources/delta-lake/delta-lake.md @@ -137,6 +137,93 @@ Execute the ingestion recipe: datahub ingest -c delta.s3.dhub.yaml ``` +### Delta Table on Azure Storage + +#### Step 1: Create a Delta Table +Use the following PySpark code to create a sample Delta table in Azure Storage: + +```python +from pyspark.sql import SparkSession +from delta.tables import DeltaTable +import uuid +import random + +def generate_data(): + return [(y, m, d, str(uuid.uuid4()), str(random.randrange(10000) % 26 + 65) * 3, random.random()*10000) + for d in range(1, 29) + for m in range(1, 13) + for y in range(2000, 2021)] + +# Configure Spark with Delta Lake and Azure Storage support +jar_packages = [ + "io.delta:delta-core_2.12:1.2.1", + "org.apache.hadoop:hadoop-azure:3.2.0", + "com.microsoft.azure:azure-storage:8.6.6" +] + +spark = SparkSession.builder \ + .appName("delta-azure-quickstart") \ + .master("local[*]") \ + .config("spark.jars.packages", ",".join(jar_packages)) \ + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ + .getOrCreate() + +# Configure Azure Storage access +storage_account = "your-storage-account" +container_name = "your-container" + +# Configure Spark properties for Azure Storage +spark.conf.set( + f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", + "your-account-key" +) + +# Create and write sample data +table_path = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/sales-table" +columns = ["year", "month", "day", "sale_id", "customer", "total_cost"] + +spark.sparkContext.parallelize(generate_data()) \ + .toDF(columns) \ + .repartition(1) \ + .write \ + .format("delta") \ + .save(table_path) + +# Read and verify the data +df = spark.read.format("delta").load(table_path) +df.show() +``` + +#### Step 2: Create DataHub Ingestion Recipe +Create a YAML file (delta.azure.dhub.yaml) with the following configuration: + +```yaml +source: + type: "delta-lake" + config: + base_path: "https://your-storage-account.dfs.core.windows.net/your-container/sales-table" + azure: + azure_config: + account_name: "your-storage-account" + container_name: "your-container" + account_key: "*****" + + # Optional: Enable Azure metadata ingestion + use_abs_blob_tags: true + +sink: + type: "datahub-rest" + config: + server: "http://localhost:8080" +``` + +#### Step 3: Execute the Ingestion +Run the following command to start the ingestion: +```bash +datahub ingest -c delta.azure.dhub.yaml +``` + ### Note The above recipes are minimal recipes. Please refer to [Config Details](#config-details) section for the full configuration. 
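Note: the Step 2 recipe above uses the single `base_path` form. This patch also introduces a `base_paths` list on `DeltaLakeSourceConfig` (see `config.py` below), and the new integration tests drive an equivalent configuration programmatically through `Pipeline.create`. The following is a minimal sketch of that list form, not part of the patch; the storage account, container, key, and table names are placeholders, and it writes to a local file sink the way the tests do:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Placeholder storage account, container, key, and table names; replace before running.
pipeline = Pipeline.create(
    {
        "run_id": "delta-lake-azure-docs-example",
        "source": {
            "type": "delta-lake",
            "config": {
                # base_paths accepts several tables (or folders to scan) at once;
                # all entries must target the same storage backend.
                "base_paths": [
                    "https://your-storage-account.dfs.core.windows.net/your-container/sales-table",
                    "https://your-storage-account.dfs.core.windows.net/your-container/returns-table",
                ],
                "azure": {
                    "azure_config": {
                        "account_name": "your-storage-account",
                        "container_name": "your-container",
                        "account_key": "your-account-key",
                    },
                    "use_abs_blob_tags": True,
                },
            },
        },
        # Write MCEs to a local file instead of a DataHub server, as the integration tests do.
        "sink": {
            "type": "file",
            "config": {"filename": "delta_lake_azure_mces.json"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```

Swapping the file sink for the `datahub-rest` sink from Step 2 yields the same ingestion against a live DataHub instance.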
diff --git a/metadata-ingestion/src/datahub/ingestion/source/azure/azure_common.py b/metadata-ingestion/src/datahub/ingestion/source/azure/azure_common.py index 46de4e09d7ee5..f12d859ce3fbc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/azure/azure_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/azure/azure_common.py @@ -1,4 +1,5 @@ from typing import Dict, Optional, Union +from urllib.parse import urlparse from azure.identity import ClientSecretCredential from azure.storage.blob import BlobServiceClient @@ -59,10 +60,19 @@ def get_filesystem_client(self) -> FileSystemClient: ) def get_blob_service_client(self): - return BlobServiceClient( - account_url=f"https://{self.account_name}.blob.core.windows.net", - credential=f"{self.get_credentials()}", - ) + if self.base_path.startswith("http://"): # Azurite + connection_string = ( + f"DefaultEndpointsProtocol=http;" + f"AccountName={self.account_name};" + f"AccountKey={self.account_key};" + f"BlobEndpoint=http://{urlparse(self.base_path).netloc}/{self.account_name}" + ) + return BlobServiceClient.from_connection_string(connection_string) + else: # Real Azure Storage + return BlobServiceClient( + account_url=f"https://{self.account_name}.blob.core.windows.net", + credential=self.get_credentials(), + ) def get_data_lake_service_client(self) -> DataLakeServiceClient: return DataLakeServiceClient( diff --git a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/config.py b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/config.py index 81a54d1327d05..7e22c227b6743 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/config.py @@ -1,9 +1,9 @@ import logging -from typing import Optional +from typing import Any, Dict, List, Optional import pydantic from cached_property import cached_property -from pydantic import Field +from pydantic import Field, root_validator from typing_extensions import Literal from datahub.configuration.common import AllowDenyPattern @@ -14,6 +14,8 @@ ) from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig from datahub.ingestion.source.aws.s3_util import is_s3_uri +from datahub.ingestion.source.azure.abs_utils import is_abs_uri +from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig # hide annoying debug errors from py4j logging.getLogger("py4j").setLevel(logging.ERROR) @@ -36,10 +38,28 @@ class S3(ConfigModel): ) +class Azure(ConfigModel): + """Azure configuration for Delta Lake source""" + + azure_config: Optional[AzureConnectionConfig] = Field( + default=None, description="Azure configuration" + ) + use_abs_blob_tags: Optional[bool] = Field( + False, + description="Whether or not to create tags in datahub from Azure blob metadata", + ) + + class DeltaLakeSourceConfig(PlatformInstanceConfigMixin, EnvConfigMixin): - base_path: str = Field( - description="Path to table (s3 or local file system). If path is not a delta table path " - "then all subfolders will be scanned to detect and ingest delta tables." + base_paths: Optional[List[str]] = Field( + default=None, + description="List of paths to tables (s3, abfss, or local file system). 
If a path is not a delta table path " + "then all subfolders will be scanned to detect and ingest delta tables.", + ) + # Not included in documentation, but kept for backwards compatibility + base_path: Optional[str] = Field( + default=None, + exclude=True, ) relative_path: Optional[str] = Field( default=None, @@ -72,21 +92,105 @@ class DeltaLakeSourceConfig(PlatformInstanceConfigMixin, EnvConfigMixin): "When set to `False`, number_of_files in delta table can not be reported.", ) - s3: Optional[S3] = Field() + s3: Optional[S3] = Field(default=None) + azure: Optional[Azure] = Field(default=None) - @cached_property - def is_s3(self): - return is_s3_uri(self.base_path or "") + @cached_property # type: ignore + def is_s3(self) -> bool: + table_path = next(iter(self.paths_to_scan), "") + return bool( + self.s3 is not None + and self.s3.aws_config is not None + and is_s3_uri(table_path) + ) - @cached_property - def complete_path(self): - complete_path = self.base_path + @cached_property # type: ignore + def is_azure(self) -> bool: + table_path = next(iter(self.paths_to_scan), "") + return bool( + self.azure is not None + and self.azure.azure_config is not None + and is_abs_uri(table_path) + ) + + @property + def complete_paths(self) -> List[str]: + paths: List[str] = [] + base_paths = self.paths_to_scan if self.relative_path is not None: - complete_path = ( - f"{complete_path.rstrip('/')}/{self.relative_path.lstrip('/')}" + paths.extend( + f"{path.rstrip('/')}/{self.relative_path.lstrip('/')}" + for path in base_paths ) + else: + paths.extend(base_paths) + return paths + + @property + def paths_to_scan(self) -> List[str]: + paths: List[str] = [] + if self.base_paths is not None: + paths.extend(self.base_paths) + if self.base_path is not None: + paths.append(self.base_path) + if not paths: + raise ValueError("At least one path must be specified via base_paths") + return paths - return complete_path + @root_validator + def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]: + base_path = values.get("base_path") + base_paths = values.get("base_paths", []) or [] + + # Ensure at least one path is provided + if not base_path and not base_paths: + raise ValueError("Either base_path or base_paths must be specified") + + # Combine paths for validation + paths = [] + if base_paths: + paths.extend(base_paths) + if base_path is not None: + paths.append(base_path) + + has_s3 = any(is_s3_uri(path) for path in paths) + has_azure = any(is_abs_uri(path) for path in paths) + + # Validate S3 configuration + s3_config = values.get("s3") + if has_s3: + if not s3_config: + raise ValueError("S3 configuration required for S3 paths") + if not getattr(s3_config, "aws_config", None): + raise ValueError("AWS configuration required for S3 paths") + + # Validate Azure configuration + azure_config = values.get("azure") + if has_azure: + if not azure_config: + raise ValueError( + "Azure configuration required for Azure Blob Storage paths" + ) + if not getattr(azure_config, "azure_config", None): + raise ValueError( + "Azure connection config required for Azure Blob Storage paths" + ) + + # Validate that all paths are of compatible types + if has_s3 and has_azure: + raise ValueError("Cannot mix S3 and Azure paths in the same source") + + return values + + @pydantic.validator("base_path") + def warn_base_path_deprecated(cls, v: Optional[str]) -> Optional[str]: + if v is not None: + logger.warning( + "The 'base_path' option is deprecated and will be removed in a future release. " + "Please use 'base_paths' instead. 
" + "Example: base_paths: ['{path}']".format(path=v) + ) + return v @pydantic.validator("version_history_lookback") def negative_version_history_implies_no_limit(cls, v): diff --git a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/delta_lake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/delta_lake_utils.py index 071dacae4a8c5..732f64c8ae26a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/delta_lake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/delta_lake_utils.py @@ -19,7 +19,10 @@ def read_delta_table( path: str, opts: Dict[str, str], delta_lake_config: DeltaLakeSourceConfig ) -> Optional[DeltaTable]: - if not delta_lake_config.is_s3 and not pathlib.Path(path).exists(): + if ( + not (delta_lake_config.is_s3 or delta_lake_config.is_azure) + and not pathlib.Path(path).exists() + ): # The DeltaTable() constructor will create the path if it doesn't exist. # Hence we need an extra, manual check here. return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py index 98133ca69011e..8d18ddb798678 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py @@ -2,10 +2,13 @@ import logging import os import time -from typing import Dict, Iterable, List +from typing import Dict, Iterable, List, Optional from urllib.parse import urlparse +from azure.identity import ClientSecretCredential +from azure.storage.blob import BlobServiceClient from deltalake import DeltaTable +from mypy_boto3_s3 import S3Client from datahub.emitter.mce_builder import ( make_data_platform_urn, @@ -27,8 +30,16 @@ from datahub.ingestion.source.aws.s3_util import ( get_bucket_name, get_key_prefix, + is_s3_uri, strip_s3_prefix, ) +from datahub.ingestion.source.azure.abs_folder_utils import get_abs_tags +from datahub.ingestion.source.azure.abs_utils import ( + get_abs_prefix, + get_container_name, + is_abs_uri, + strip_abs_prefix, +) from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator from datahub.ingestion.source.delta_lake.config import DeltaLakeSourceConfig from datahub.ingestion.source.delta_lake.delta_lake_utils import ( @@ -103,13 +114,33 @@ def __init__(self, config: DeltaLakeSourceConfig, ctx: PipelineContext): super().__init__(ctx) self.source_config = config self.report = DeltaLakeSourceReport() - if self.source_config.is_s3: + + s3_client: Optional[S3Client] + azure_client: Optional[BlobServiceClient] + + paths: List[str] = getattr(self.source_config, "paths_to_scan", []) or [] + + if not paths: + return + + if any(path and is_s3_uri(path) for path in paths): if ( self.source_config.s3 is None or self.source_config.s3.aws_config is None ): - raise ValueError("AWS Config must be provided for S3 base path.") + raise ValueError("AWS Config must be provided for S3 base paths.") self.s3_client = self.source_config.s3.aws_config.get_s3_client() + elif any(path and is_abs_uri(path) for path in paths): + if ( + self.source_config.azure is None + or self.source_config.azure.azure_config is None + ): + raise ValueError( + "Azure Config must be provided for Azure Blob Storage paths" + ) + self.azure_client = ( + self.source_config.azure.azure_config.get_blob_service_client() + ) # self.profiling_times_taken = [] config_report = { @@ -200,14 +231,29 @@ def ingest_table( logger.debug( f"Skipping table 
({table_name}) present at location {path} as table pattern does not match" ) + return logger.debug(f"Ingesting table {table_name} from location {path}") if self.source_config.relative_path is None: - browse_path: str = ( - strip_s3_prefix(path) if self.source_config.is_s3 else path.strip("/") - ) + if self.source_config.is_s3: + browse_path = strip_s3_prefix(path) + elif self.source_config.is_azure: + if path.startswith(("http://", "https://")): + container_name = get_container_name(path) + prefix = get_key_prefix(path) + browse_path = ( + f"{container_name}/{prefix}" if prefix else container_name + ) + else: + browse_path = strip_abs_prefix(path) + else: + browse_path = path.strip("/") else: - browse_path = path.split(self.source_config.base_path)[1].strip("/") + base_path = next(iter(self.source_config.paths_to_scan), "") + if base_path: + browse_path = path.split(base_path)[1].strip("/") + else: + browse_path = path.strip("/") data_platform_urn = make_data_platform_urn(self.source_config.platform) logger.info(f"Creating dataset urn with name: {browse_path}") @@ -227,7 +273,7 @@ def ingest_table( "table_creation_time": str(delta_table.metadata().created_time), "id": str(delta_table.metadata().id), "version": str(delta_table.version()), - "location": self.source_config.complete_path, + "location": path, } if self.source_config.require_files: customProperties["number_of_files"] = str(get_file_count(delta_table)) @@ -271,6 +317,23 @@ def ingest_table( ) if s3_tags is not None: dataset_snapshot.aspects.append(s3_tags) + if ( + self.source_config.is_azure + and self.source_config.azure + and self.source_config.azure.use_abs_blob_tags + ): + container_name = get_container_name(path) + abs_prefix = get_abs_prefix(path) + abs_tags = get_abs_tags( + container_name, + abs_prefix, + dataset_urn, + self.source_config.azure.azure_config, + self.ctx, + self.source_config.azure.use_abs_blob_tags, + ) + if abs_tags is not None: + dataset_snapshot.aspects.append(abs_tags) mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) yield MetadataWorkUnit(id=str(delta_table.metadata().id), mce=mce) @@ -301,6 +364,69 @@ def get_storage_options(self) -> Dict[str, str]: if aws_config.aws_endpoint_url: opts["AWS_ENDPOINT_URL"] = aws_config.aws_endpoint_url return opts + elif ( + self.source_config.is_azure + and self.source_config.azure + and self.source_config.azure.azure_config + ): + azure_config = self.source_config.azure.azure_config + creds = azure_config.get_credentials() + table_path = next(iter(self.source_config.paths_to_scan), "") + parsed_url = urlparse(table_path) + + opts: Dict[str, str] = {} + + if azure_config.account_name: + opts["fs.azure.account.name"] = str(azure_config.account_name) + + connection_parts = [] + if parsed_url.scheme: + connection_parts.append(f"DefaultEndpointsProtocol={parsed_url.scheme}") + if azure_config.account_name: + connection_parts.append(f"AccountName={azure_config.account_name}") + + if isinstance(creds, ClientSecretCredential): + if all( + [ + azure_config.client_id, + azure_config.client_secret, + azure_config.tenant_id, + ] + ): + opts.update( + { + "fs.azure.account.oauth2.client.id": str( + azure_config.client_id + ), + "fs.azure.account.oauth2.client.secret": str( + azure_config.client_secret + ), + "fs.azure.account.oauth2.client.endpoint": ( + f"https://login.microsoftonline.com/{azure_config.tenant_id}/oauth2/token" + ), + } + ) + else: + if azure_config.account_key: + connection_parts.append(f"AccountKey={azure_config.account_key}") + elif 
azure_config.sas_token: + connection_parts.append( + f"SharedAccessSignature={str(azure_config.sas_token or '').lstrip('?')}" + ) + + if ( + not isinstance(creds, ClientSecretCredential) + and azure_config.container_name + and table_path + ): + container_part = f"/{azure_config.container_name}" + endpoint = table_path.split(container_part)[0] + "/" + connection_parts.append(f"BlobEndpoint={endpoint}") + opts["fs.azure.account.connection.string"] = ";".join( + filter(None, connection_parts) + ) + + return {k: str(v) for k, v in opts.items() if v is not None} else: return {} @@ -317,10 +443,14 @@ def process_folder(self, path: str) -> Iterable[MetadataWorkUnit]: def get_folders(self, path: str) -> Iterable[str]: if self.source_config.is_s3: return self.s3_get_folders(path) + elif self.source_config.is_azure: + return self.azure_get_folders(path) else: return self.local_get_folders(path) def s3_get_folders(self, path: str) -> Iterable[str]: + if self.s3_client is None: + return [] parse_result = urlparse(path) for page in self.s3_client.get_paginator("list_objects_v2").paginate( Bucket=parse_result.netloc, Prefix=parse_result.path[1:], Delimiter="/" @@ -328,6 +458,34 @@ def s3_get_folders(self, path: str) -> Iterable[str]: for o in page.get("CommonPrefixes", []): yield f"{parse_result.scheme}://{parse_result.netloc}/{o.get('Prefix')}" + def azure_get_folders(self, path: str) -> Iterable[str]: + """List folders from Azure Blob Storage.""" + + if self.azure_client is None: + return [] + try: + container_name = get_container_name(path) + prefix = get_key_prefix(path) + parsed_url = urlparse(path) + + container_client = self.azure_client.get_container_client(container_name) + + for blob in container_client.walk_blobs( + name_starts_with=prefix, delimiter="/" + ): + if hasattr(blob, "prefix"): + folder_path = ( + f"{parsed_url.scheme}://{parsed_url.netloc}/" + f"{container_name}/{blob.prefix}" + ) + yield folder_path + + except Exception as e: + self.report.report_failure( + "azure-folders", f"Failed to list Azure Blob Storage folders: {e}" + ) + logger.error(f"Error listing Azure folders: {str(e)}") + def local_get_folders(self, path: str) -> Iterable[str]: if not os.path.isdir(path): raise FileNotFoundError( @@ -343,7 +501,10 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.source_config.env, ) self.storage_options = self.get_storage_options() - yield from self.process_folder(self.source_config.complete_path) + + for path in self.source_config.complete_paths: + if path: + yield from self.process_folder(path) def get_report(self) -> SourceReport: return self.report diff --git a/metadata-ingestion/tests/integration/delta_lake/conftest.py b/metadata-ingestion/tests/integration/delta_lake/conftest.py new file mode 100644 index 0000000000000..ebb139420f45c --- /dev/null +++ b/metadata-ingestion/tests/integration/delta_lake/conftest.py @@ -0,0 +1,11 @@ +import pytest +import warnings + + +@pytest.fixture(autouse=True) +def handle_base_path_warning(): + warnings.filterwarnings( + "ignore", + message="The 'base_path' option is deprecated", + module="datahub.ingestion.source.delta_lake.config", + ) diff --git a/metadata-ingestion/tests/integration/delta_lake/delta_lake_azure_mces_golden.json b/metadata-ingestion/tests/integration/delta_lake/delta_lake_azure_mces_golden.json new file mode 100644 index 0000000000000..95671b4f5a09c --- /dev/null +++ b/metadata-ingestion/tests/integration/delta_lake/delta_lake_azure_mces_golden.json @@ -0,0 +1,426 @@ +[ +{ + "proposedSnapshot": { + 
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,my-test-bucket/delta_tables/sales,DEV)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "number_of_files": "3", + "partition_columns": "[]", + "table_creation_time": "1655664813952", + "id": "eca9d2a0-4ce6-4ace-a732-75fda0157fb8", + "version": "0", + "location": "s3://my-test-bucket/delta_tables/sales" + }, + "name": "my_table", + "description": "my table description", + "tags": [] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "my_table", + "platform": "urn:li:dataPlatform:delta-lake", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[type=float].total_cost", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "float", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"float\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=int].day", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"integer\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=int].month", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"integer\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=int].year", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"integer\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].customer", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].sale_id", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + } + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "delta-lake", + "env": "DEV", + "folder_abs_path": "my-test-bucket" + }, + "name": "my-test-bucket", + "env": "DEV" + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": 
"delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:delta-lake" + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:acebf8bcf966274632d3d2b710ef4947", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "delta-lake", + "env": "DEV", + "folder_abs_path": "my-test-bucket/delta_tables" + }, + "name": "delta_tables", + "env": "DEV" + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:acebf8bcf966274632d3d2b710ef4947", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:acebf8bcf966274632d3d2b710ef4947", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:delta-lake" + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:acebf8bcf966274632d3d2b710ef4947", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:acebf8bcf966274632d3d2b710ef4947", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd" + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:acebf8bcf966274632d3d2b710ef4947", + "changeType": "UPSERT", + "aspectName": 
"browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd", + "urn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,my-test-bucket/delta_tables/sales,DEV)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:acebf8bcf966274632d3d2b710ef4947" + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,my-test-bucket/delta_tables/sales,DEV)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1672531200000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "operationType": "CUSTOM", + "customOperationType": "CONVERT", + "customProperties": { + "engineInfo": "local Delta-Standalone/0.4.0", + "isBlindAppend": "True", + "isolationLevel": "Serializable", + "version": "0" + }, + "lastUpdatedTimestamp": 1655664815399 + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,my-test-bucket/delta_tables/sales,DEV)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd", + "urn": "urn:li:container:34fc0473e206bb1f4307aadf4177b2fd" + }, + { + "id": "urn:li:container:acebf8bcf966274632d3d2b710ef4947", + "urn": "urn:li:container:acebf8bcf966274632d3d2b710ef4947" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,my-test-bucket/delta_tables/sales,DEV)", + "changeType": "PATCH", + "aspectName": "datasetProperties", + "aspect": { + "json": [ + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1655664815399 + } + } + ] + }, + "systemMetadata": { + "lastObserved": 1672531200000, + "runId": "delta-lake-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/delta_lake/docker-compose.yml b/metadata-ingestion/tests/integration/delta_lake/docker-compose.yml index 0b27218d23b96..d0772e1032c7e 100644 --- a/metadata-ingestion/tests/integration/delta_lake/docker-compose.yml +++ b/metadata-ingestion/tests/integration/delta_lake/docker-compose.yml @@ -10,3 +10,18 @@ services: - 9000:9000 # S3 API - 9001:9001 # Web UI command: server /data --console-address ":9001" + + azurite: + image: mcr.microsoft.com/azure-storage/azurite:3.23.0 + container_name: "azurite_test" + ports: + - 10000:10000 + - 10001:10001 + - 10002:10002 + command: azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0 --loose + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:10000" ] + interval: 5s + timeout: 5s + retries: 3 + diff --git a/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_abs.py 
b/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_abs.py new file mode 100644 index 0000000000000..6721693fca258 --- /dev/null +++ b/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_abs.py @@ -0,0 +1,179 @@ +import logging +import os +import subprocess + +import freezegun +import pytest +from azure.storage.blob import BlobServiceClient + +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import mce_helpers +from tests.test_helpers.docker_helpers import wait_for_port + +logger = logging.getLogger(__name__) +pytestmark = pytest.mark.integration_batch_2 + +FROZEN_TIME = "2020-04-14 07:00:00" +AZURITE_BLOB_PORT = 10000 + + +def is_azurite_up(container_name: str) -> bool: + """Check if Azurite blob storage is responsive on a container""" + cmd = f"docker logs {container_name} 2>&1 | grep 'Azurite Blob service is successfully listening'" + ret = subprocess.run( + cmd, + shell=True, + ) + return ret.returncode == 0 + + +@pytest.fixture(scope="module") +def test_resources_dir(pytestconfig): + return pytestconfig.rootpath / "tests/integration/delta_lake" + + +@pytest.fixture(scope="module") +def azurite_runner(docker_compose_runner, pytestconfig, test_resources_dir): + container_name = "azurite_test" + with docker_compose_runner( + test_resources_dir / "docker-compose.yml", container_name + ) as docker_services: + wait_for_port( + docker_services, + container_name, + AZURITE_BLOB_PORT, + timeout=120, + checker=lambda: is_azurite_up(container_name), + ) + yield docker_services + + +@pytest.fixture(scope="module", autouse=True) +def azure_container(azurite_runner): + connection_string = ( + "DefaultEndpointsProtocol=http;" + "AccountName=devstoreaccount1;" + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" + f"BlobEndpoint=http://localhost:{AZURITE_BLOB_PORT}/devstoreaccount1" + ) + + blob_service_client = BlobServiceClient.from_connection_string(connection_string) + container_name = "test-container" + container_client = blob_service_client.create_container(container_name) + return container_client + + +@pytest.fixture(scope="module", autouse=True) +def populate_azure_storage(pytestconfig, azure_container): + test_resources_dir = ( + pytestconfig.rootpath / "tests/integration/delta_lake/test_data/" + ) + + logger.info(f"Files in test_resources_dir {test_resources_dir}:") + for root, _dirs, files in os.walk(test_resources_dir): + for file in files: + logger.info(f"Found file: {os.path.join(root, file)}") + + for root, _dirs, files in os.walk(test_resources_dir): + for file in files: + full_path = os.path.join(root, file) + rel_path = os.path.relpath(full_path, test_resources_dir) + try: + with open(full_path, "rb") as data: + azure_container.upload_blob( + name=rel_path, data=data, overwrite=True + ) + logger.info(f"Uploaded {rel_path}") + except Exception as e: + logger.error(f"Failed to upload {rel_path}: {e}") + + blobs = list(azure_container.list_blobs()) + logger.info("Uploaded blobs:") + for blob in blobs: + logger.info(f" {blob.name}") + + yield + + +@freezegun.freeze_time("2023-01-01 00:00:00+00:00") +def test_delta_lake_ingest_azure(pytestconfig, tmp_path, test_resources_dir): + base_path = f"http://localhost:{AZURITE_BLOB_PORT}/devstoreaccount1/test-container/delta_tables/sales" + + # Run the metadata ingestion pipeline. 
+ pipeline = Pipeline.create( + { + "run_id": "delta-lake-azure-test", + "source": { + "type": "delta-lake", + "config": { + "env": "DEV", + "base_path": base_path, + "azure": { + "azure_config": { + "account_name": "devstoreaccount1", + "container_name": "test-container", + "account_key": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", + }, + }, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/delta_lake_azure_mces.json", + }, + }, + } + ) + + logger.info(f"Starting pipeline run with base_path: {base_path}") + pipeline.run() + pipeline.raise_from_status() + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "delta_lake_azure_mces.json", + golden_path=test_resources_dir / "delta_lake_azure_mces_golden.json", + ) + + +@freezegun.freeze_time("2023-01-01 00:00:00+00:00") +def test_delta_lake_ingest_azure_base_paths(pytestconfig, tmp_path, test_resources_dir): + base_path = f"http://localhost:{AZURITE_BLOB_PORT}/devstoreaccount1/test-container/delta_tables/sales" + + # Run the metadata ingestion pipeline. + pipeline = Pipeline.create( + { + "run_id": "delta-lake-azure-test", + "source": { + "type": "delta-lake", + "config": { + "env": "DEV", + "base_paths": [base_path], + "azure": { + "azure_config": { + "account_name": "devstoreaccount1", + "container_name": "test-container", + "account_key": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", + }, + }, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/delta_lake_azure_mces.json", + }, + }, + } + ) + + logger.info(f"Starting pipeline run with base_path: {base_path}") + pipeline.run() + pipeline.raise_from_status() + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "delta_lake_azure_mces.json", + golden_path=test_resources_dir / "delta_lake_azure_mces_golden.json", + ) diff --git a/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_minio.py b/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_minio.py index 6146c6d1a948c..a36643a00dac5 100644 --- a/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_minio.py +++ b/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_minio.py @@ -112,3 +112,43 @@ def test_delta_lake_ingest(pytestconfig, tmp_path, test_resources_dir): output_path=tmp_path / "delta_lake_minio_mces.json", golden_path=test_resources_dir / "delta_lake_minio_mces_golden.json", ) + + +@freezegun.freeze_time("2023-01-01 00:00:00+00:00") +def test_delta_lake_ingest_base_paths(pytestconfig, tmp_path, test_resources_dir): + # Run the metadata ingestion pipeline. + pipeline = Pipeline.create( + { + "run_id": "delta-lake-test", + "source": { + "type": "delta-lake", + "config": { + "env": "DEV", + "base_paths": ["s3://my-test-bucket/delta_tables/sales"], + "s3": { + "aws_config": { + "aws_access_key_id": "miniouser", + "aws_secret_access_key": "miniopassword", + "aws_endpoint_url": f"http://localhost:{MINIO_PORT}", + "aws_region": "us-east-1", + }, + }, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/delta_lake_minio_mces.json", + }, + }, + } + ) + pipeline.run() + pipeline.raise_from_status() + + # Verify the output. + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "delta_lake_minio_mces.json", + golden_path=test_resources_dir / "delta_lake_minio_mces_golden.json", + )
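A short sketch (not part of the patch) of how the new `DeltaLakeSourceConfig` options resolve paths and enforce validation, assuming a placeholder `abfss://` table path and the same minimal Azure connection keys the Azurite test recipe passes:

```python
from datahub.ingestion.source.delta_lake.config import DeltaLakeSourceConfig

# New-style config: a list of Azure paths plus the matching `azure` block
# (account_name / container_name / account_key, as in the integration tests above).
cfg = DeltaLakeSourceConfig.parse_obj(
    {
        "base_paths": ["abfss://container@account.dfs.core.windows.net/sales-table"],
        "relative_path": "2023/",
        "azure": {
            "azure_config": {
                "account_name": "account",
                "container_name": "container",
                "account_key": "***",
            }
        },
    }
)
print(cfg.paths_to_scan)
# ['abfss://container@account.dfs.core.windows.net/sales-table']
print(cfg.complete_paths)
# ['abfss://container@account.dfs.core.windows.net/sales-table/2023/'] (relative_path is appended to each entry)

# The root validator rejects cloud paths that lack the matching connection config.
try:
    DeltaLakeSourceConfig.parse_obj({"base_paths": ["s3://my-bucket/delta_tables/sales"]})
except ValueError as err:
    print(err)  # includes "S3 configuration required for S3 paths"
```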