feat (ingest/delta-lake): Support ABS file location for delta lake ta… #11779

Closed
Commits
52 commits
d89b107
feat (ingest/delta-lake): Support ABS file location for delta lake ta…
acrylJonny Nov 3, 2024
ba2a05e
Update source.py
acrylJonny Nov 3, 2024
a011e72
Update source.py
acrylJonny Nov 3, 2024
9a00b63
linting
acrylJonny Nov 4, 2024
6f7461f
Update source.py
acrylJonny Nov 4, 2024
c841f39
adding tests
acrylJonny Nov 4, 2024
6234836
linting test
acrylJonny Nov 4, 2024
66cacf5
Update test_delta_lake_abs.py
acrylJonny Nov 4, 2024
ec12968
linting
acrylJonny Nov 4, 2024
2f4112b
documentation and linting
acrylJonny Nov 4, 2024
9dd373f
Merge branch 'master' into azure-support-delta-lake
acrylJonny Nov 4, 2024
80f626c
Update source.py
acrylJonny Nov 4, 2024
c556f14
Update source.py
acrylJonny Nov 4, 2024
7648d88
Update source.py
acrylJonny Nov 4, 2024
5eb8a1c
test update
acrylJonny Nov 4, 2024
a2277e0
test mces
acrylJonny Nov 5, 2024
e62759e
azurite support for tests
acrylJonny Nov 5, 2024
34c0fb8
Merge branch 'master' into azure-support-delta-lake
acrylJonny Nov 5, 2024
9eb7d3f
Update source.py
acrylJonny Nov 5, 2024
f243c37
Merge branch 'azure-support-delta-lake' of https://github.com/acrylJo…
acrylJonny Nov 5, 2024
0808bdf
Update source.py
acrylJonny Nov 5, 2024
0309764
Update source.py
acrylJonny Nov 5, 2024
53527d8
Update source.py
acrylJonny Nov 5, 2024
6d9d4b3
Update source.py
acrylJonny Nov 5, 2024
75dc55d
Update test_delta_lake_abs.py
acrylJonny Nov 5, 2024
763a620
Merge branch 'master' into azure-support-delta-lake
acrylJonny Nov 5, 2024
e5a968b
Update test_delta_lake_abs.py
acrylJonny Nov 5, 2024
41e242f
Update test_delta_lake_abs.py
acrylJonny Nov 5, 2024
258b386
test update
acrylJonny Nov 6, 2024
cf71799
Update source.py
acrylJonny Nov 6, 2024
985462d
Update source.py
acrylJonny Nov 6, 2024
28d2041
Update config.py
acrylJonny Nov 6, 2024
2575a54
Update config.py
acrylJonny Nov 6, 2024
83fe86c
Update config.py
acrylJonny Nov 6, 2024
c9319b3
Update config.py
acrylJonny Nov 6, 2024
fef8233
support multiple browse paths
acrylJonny Nov 6, 2024
44242cf
Update source.py
acrylJonny Nov 6, 2024
12d2544
Update config.py
acrylJonny Nov 6, 2024
7197525
tests for multiple paths
acrylJonny Nov 6, 2024
6fad686
test linting
acrylJonny Nov 6, 2024
b52b23b
Update test_delta_lake_minio.py
acrylJonny Nov 6, 2024
b39dc95
linting
acrylJonny Nov 6, 2024
b6446e1
Update config.py
acrylJonny Nov 6, 2024
690327c
Update config.py
acrylJonny Nov 6, 2024
846d224
Update config.py
acrylJonny Nov 6, 2024
a7913f5
Update source.py
acrylJonny Nov 6, 2024
227d767
updates
acrylJonny Nov 6, 2024
95c628f
Update config.py
acrylJonny Nov 6, 2024
0b9ec02
testing updates
acrylJonny Nov 6, 2024
b477747
Update test_delta_lake_abs.py
acrylJonny Nov 6, 2024
dda748b
Update conftest.py
acrylJonny Nov 6, 2024
847ca0c
test updates
acrylJonny Nov 6, 2024
87 changes: 87 additions & 0 deletions metadata-ingestion/docs/sources/delta-lake/delta-lake.md
@@ -137,6 +137,93 @@ Execute the ingestion recipe:
datahub ingest -c delta.s3.dhub.yaml
```

### Delta Table on Azure Storage

#### Step 1: Create a Delta Table
Use the following PySpark code to create a sample Delta table in Azure Storage:

```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import uuid
import random

def generate_data():
    return [(y, m, d, str(uuid.uuid4()), chr(random.randrange(10000) % 26 + 65) * 3, random.random() * 10000)
            for d in range(1, 29)
            for m in range(1, 13)
            for y in range(2000, 2021)]

# Configure Spark with Delta Lake and Azure Storage support
jar_packages = [
    "io.delta:delta-core_2.12:1.2.1",
    "org.apache.hadoop:hadoop-azure:3.2.0",
    "com.microsoft.azure:azure-storage:8.6.6"
]

spark = SparkSession.builder \
    .appName("delta-azure-quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Configure Azure Storage access
storage_account = "your-storage-account"
container_name = "your-container"

# Configure Spark properties for Azure Storage
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    "your-account-key"
)

# Create and write sample data
table_path = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/sales-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]

spark.sparkContext.parallelize(generate_data()) \
    .toDF(columns) \
    .repartition(1) \
    .write \
    .format("delta") \
    .save(table_path)

# Read and verify the data
df = spark.read.format("delta").load(table_path)
df.show()
```
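
To confirm the write produced a Delta table rather than plain Parquet, you can also inspect its commit history from the same session. A minimal sketch reusing the `spark` session and `table_path` defined above:

```python
from delta.tables import DeltaTable

# Reuses the spark session and table_path from the snippet above.
delta_table = DeltaTable.forPath(spark, table_path)
delta_table.history().select("version", "operation", "operationParameters").show(truncate=False)
```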

#### Step 2: Create DataHub Ingestion Recipe
Create a YAML file (delta.azure.dhub.yaml) with the following configuration:

```yaml
source:
  type: "delta-lake"
  config:
    base_paths:
      - "https://your-storage-account.dfs.core.windows.net/your-container/sales-table"
    azure:
      azure_config:
        account_name: "your-storage-account"
        container_name: "your-container"
        account_key: "*****"

      # Optional: create DataHub tags from Azure blob metadata
      use_abs_blob_tags: true

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
```
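
Before running the recipe, you can optionally sanity-check it by parsing the same configuration with the source's config class. This is only a rough sketch with placeholder account details; it assumes the `azure_config` fields shown above are the ones your auth setup needs, and it illustrates that `base_paths` is a list, so several table locations can be ingested from one recipe.

```python
from datahub.ingestion.source.delta_lake.config import DeltaLakeSourceConfig

# Placeholder values mirroring the recipe above (not a real account or key).
config = DeltaLakeSourceConfig.parse_obj(
    {
        "base_paths": [
            # More than one table location can be listed here.
            "https://your-storage-account.dfs.core.windows.net/your-container/sales-table",
        ],
        "azure": {
            "azure_config": {
                "account_name": "your-storage-account",
                "container_name": "your-container",
                "account_key": "*****",
            },
            "use_abs_blob_tags": True,
        },
    }
)
print(config.paths_to_scan)
```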

#### Step 3: Execute the Ingestion
Run the following command to start the ingestion:
```bash
datahub ingest -c delta.azure.dhub.yaml
```

### Note

The recipes above are minimal. Refer to the [Config Details](#config-details) section for the full set of configuration options.
@@ -1,4 +1,5 @@
from typing import Dict, Optional, Union
from urllib.parse import urlparse

from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient
@@ -59,10 +60,19 @@ def get_filesystem_client(self) -> FileSystemClient:
)

def get_blob_service_client(self):
return BlobServiceClient(
account_url=f"https://{self.account_name}.blob.core.windows.net",
credential=f"{self.get_credentials()}",
)
if self.base_path.startswith("http://"): # Azurite
connection_string = (
f"DefaultEndpointsProtocol=http;"
f"AccountName={self.account_name};"
f"AccountKey={self.account_key};"
f"BlobEndpoint=http://{urlparse(self.base_path).netloc}/{self.account_name}"
)
return BlobServiceClient.from_connection_string(connection_string)
else: # Real Azure Storage
return BlobServiceClient(
account_url=f"https://{self.account_name}.blob.core.windows.net",
credential=self.get_credentials(),
)

def get_data_lake_service_client(self) -> DataLakeServiceClient:
return DataLakeServiceClient(
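
For reviewers who want to poke at the Azurite branch above, here is a rough sketch of how it might be exercised against a local emulator. It assumes `AzureConnectionConfig` exposes the field names visible in this diff and in the new docs (`account_name`, `account_key`, `container_name`, `base_path`); the account name and endpoint are the usual Azurite development defaults, and the key and container are placeholders.

```python
from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig

# Hypothetical local Azurite setup (replace the key with the emulator's
# well-known development-storage account key).
azurite_config = AzureConnectionConfig(
    account_name="devstoreaccount1",
    account_key="<azurite-default-account-key>",
    container_name="test-container",
    base_path="http://127.0.0.1:10000/devstoreaccount1/test-container/delta-table",
)

# Because base_path starts with "http://", get_blob_service_client() takes the
# Azurite branch and builds a connection string with an explicit BlobEndpoint
# instead of the public {account}.blob.core.windows.net account URL.
client = azurite_config.get_blob_service_client()
print(client.url)
```
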
134 changes: 119 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/source/delta_lake/config.py
@@ -1,9 +1,9 @@
import logging
from typing import Optional
from typing import Any, Dict, List, Optional

import pydantic
from cached_property import cached_property
from pydantic import Field
from pydantic import Field, root_validator
from typing_extensions import Literal

from datahub.configuration.common import AllowDenyPattern
@@ -14,6 +14,8 @@
)
from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig
from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.azure.abs_utils import is_abs_uri
from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig

# hide annoying debug errors from py4j
logging.getLogger("py4j").setLevel(logging.ERROR)
@@ -36,10 +38,28 @@ class S3(ConfigModel):
)


class Azure(ConfigModel):
"""Azure configuration for Delta Lake source"""

azure_config: Optional[AzureConnectionConfig] = Field(
default=None, description="Azure configuration"
)
use_abs_blob_tags: Optional[bool] = Field(
False,
description="Whether or not to create tags in datahub from Azure blob metadata",
)


class DeltaLakeSourceConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
base_path: str = Field(
description="Path to table (s3 or local file system). If path is not a delta table path "
"then all subfolders will be scanned to detect and ingest delta tables."
base_paths: Optional[List[str]] = Field(
default=None,
description="List of paths to tables (s3, abfss, or local file system). If a path is not a delta table path "
"then all subfolders will be scanned to detect and ingest delta tables.",
)
# Not included in documentation, but kept for backwards compatibility
base_path: Optional[str] = Field(
default=None,
exclude=True,
)
relative_path: Optional[str] = Field(
default=None,
@@ -72,21 +92,105 @@ class DeltaLakeSourceConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
"When set to `False`, number_of_files in delta table can not be reported.",
)

s3: Optional[S3] = Field()
s3: Optional[S3] = Field(default=None)
azure: Optional[Azure] = Field(default=None)

@cached_property
def is_s3(self):
return is_s3_uri(self.base_path or "")
@cached_property # type: ignore
def is_s3(self) -> bool:
table_path = next(iter(self.paths_to_scan), "")
return bool(
self.s3 is not None
and self.s3.aws_config is not None
and is_s3_uri(table_path)
)

@cached_property
def complete_path(self):
complete_path = self.base_path
@cached_property # type: ignore
def is_azure(self) -> bool:
table_path = next(iter(self.paths_to_scan), "")
return bool(
self.azure is not None
and self.azure.azure_config is not None
and is_abs_uri(table_path)
)

@property
def complete_paths(self) -> List[str]:
paths: List[str] = []
base_paths = self.paths_to_scan
if self.relative_path is not None:
complete_path = (
f"{complete_path.rstrip('/')}/{self.relative_path.lstrip('/')}"
paths.extend(
f"{path.rstrip('/')}/{self.relative_path.lstrip('/')}"
for path in base_paths
)
else:
paths.extend(base_paths)
return paths

@property
def paths_to_scan(self) -> List[str]:
paths: List[str] = []
if self.base_paths is not None:
paths.extend(self.base_paths)
if self.base_path is not None:
paths.append(self.base_path)
if not paths:
raise ValueError("At least one path must be specified via base_paths")
return paths

return complete_path
@root_validator
def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]:
base_path = values.get("base_path")
base_paths = values.get("base_paths", []) or []

# Ensure at least one path is provided
if not base_path and not base_paths:
raise ValueError("Either base_path or base_paths must be specified")

# Combine paths for validation
paths = []
if base_paths:
paths.extend(base_paths)
if base_path is not None:
paths.append(base_path)

has_s3 = any(is_s3_uri(path) for path in paths)
has_azure = any(is_abs_uri(path) for path in paths)

# Validate S3 configuration
s3_config = values.get("s3")
if has_s3:
if not s3_config:
raise ValueError("S3 configuration required for S3 paths")
if not getattr(s3_config, "aws_config", None):
raise ValueError("AWS configuration required for S3 paths")

# Validate Azure configuration
azure_config = values.get("azure")
if has_azure:
if not azure_config:
raise ValueError(
"Azure configuration required for Azure Blob Storage paths"
)
if not getattr(azure_config, "azure_config", None):
raise ValueError(
"Azure connection config required for Azure Blob Storage paths"
)

# Validate that all paths are of compatible types
if has_s3 and has_azure:
raise ValueError("Cannot mix S3 and Azure paths in the same source")

return values

@pydantic.validator("base_path")
def warn_base_path_deprecated(cls, v: Optional[str]) -> Optional[str]:
if v is not None:
logger.warning(
"The 'base_path' option is deprecated and will be removed in a future release. "
"Please use 'base_paths' instead. "
"Example: base_paths: ['{path}']".format(path=v)
)
return v

@pydantic.validator("version_history_lookback")
def negative_version_history_implies_no_limit(cls, v):
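
To make the new path handling concrete, a minimal sketch of how `base_paths` and `relative_path` resolve through the properties added above (local directories are used so neither the `s3` nor the `azure` block is required; the paths are made up):

```python
from datahub.ingestion.source.delta_lake.config import DeltaLakeSourceConfig

# Local filesystem locations: the root validator requires no cloud config here.
cfg = DeltaLakeSourceConfig.parse_obj(
    {
        "base_paths": ["/data/delta/sales", "/data/delta/customers"],
        "relative_path": "2024/",
    }
)

print(cfg.paths_to_scan)
# ['/data/delta/sales', '/data/delta/customers']
print(cfg.complete_paths)
# ['/data/delta/sales/2024/', '/data/delta/customers/2024/']

# Mixing S3 and Azure locations in one source is rejected by the root validator,
# and the legacy base_path option still parses but logs a deprecation warning.
```
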
@@ -19,7 +19,10 @@
def read_delta_table(
path: str, opts: Dict[str, str], delta_lake_config: DeltaLakeSourceConfig
) -> Optional[DeltaTable]:
if not delta_lake_config.is_s3 and not pathlib.Path(path).exists():
if (
not (delta_lake_config.is_s3 or delta_lake_config.is_azure)
and not pathlib.Path(path).exists()
):
# The DeltaTable() constructor will create the path if it doesn't exist.
# Hence we need an extra, manual check here.
return None
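
A quick illustration of why the extra guard is needed (the URI is a placeholder): `pathlib` treats a remote URI as a non-existent local path, so without the new `is_azure` check every Azure-hosted table would be filtered out before `DeltaTable` was ever constructed.

```python
import pathlib

# Remote URIs never exist on the local filesystem, so this prints False.
print(pathlib.Path("abfss://container@account.dfs.core.windows.net/table").exists())
```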