Skip to content

Commit

Permalink
fix(ingest/abs): split abs utils into multiple files (datahub-project…
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Jul 19, 2024
1 parent 348d449 commit 65ef858
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 67 deletions.
6 changes: 4 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/abs/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec
from datahub.ingestion.source.abs.report import DataLakeSourceReport
from datahub.ingestion.source.azure.abs_util import (
from datahub.ingestion.source.azure.abs_folder_utils import (
get_abs_properties,
get_abs_tags,
list_folders,
)
from datahub.ingestion.source.azure.abs_utils import (
get_container_name,
get_container_relative_path,
get_key_prefix,
list_folders,
strip_abs_prefix,
)
from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import logging
import os
import re
from typing import Dict, Iterable, List, Optional

from azure.storage.blob import BlobProperties
Expand All @@ -10,67 +8,10 @@
from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

ABS_PREFIXES_REGEX = re.compile(
r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)"
)

logging.getLogger("py4j").setLevel(logging.ERROR)
logger: logging.Logger = logging.getLogger(__name__)


def is_abs_uri(uri: str) -> bool:
return bool(ABS_PREFIXES_REGEX.match(uri))


def get_abs_prefix(abs_uri: str) -> Optional[str]:
result = re.search(ABS_PREFIXES_REGEX, abs_uri)
if result and result.groups():
return result.group(1)
return None


def strip_abs_prefix(abs_uri: str) -> str:
# remove abs prefix https://<storage-account>.blob.core.windows.net
abs_prefix = get_abs_prefix(abs_uri)
if not abs_prefix:
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
length_abs_prefix = len(abs_prefix)
return abs_uri[length_abs_prefix:]


def make_abs_urn(abs_uri: str, env: str) -> str:
abs_name = strip_abs_prefix(abs_uri)

if abs_name.endswith("/"):
abs_name = abs_name[:-1]

name, extension = os.path.splitext(abs_name)

if extension != "":
extension = extension[1:] # remove the dot
return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})"

return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})"


def get_container_name(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/")[0]


def get_key_prefix(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1]


def get_abs_properties(
container_name: str,
blob_name: Optional[str],
Expand Down Expand Up @@ -280,7 +221,3 @@ def list_folders(
this_dict[folder_name] = folder_name

yield f"{folder_name}"


def get_container_relative_path(abs_uri: str) -> str:
return "/".join(strip_abs_prefix(abs_uri).split("/")[1:])
66 changes: 66 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import re
from typing import Optional

# This file should not import any abs spectific modules as we import it in path_spec.py in datat_lake_common.py

ABS_PREFIXES_REGEX = re.compile(
r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)"
)


def is_abs_uri(uri: str) -> bool:
return bool(ABS_PREFIXES_REGEX.match(uri))


def get_abs_prefix(abs_uri: str) -> Optional[str]:
result = re.search(ABS_PREFIXES_REGEX, abs_uri)
if result and result.groups():
return result.group(1)
return None


def strip_abs_prefix(abs_uri: str) -> str:
# remove abs prefix https://<storage-account>.blob.core.windows.net
abs_prefix = get_abs_prefix(abs_uri)
if not abs_prefix:
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
length_abs_prefix = len(abs_prefix)
return abs_uri[length_abs_prefix:]


def make_abs_urn(abs_uri: str, env: str) -> str:
abs_name = strip_abs_prefix(abs_uri)

if abs_name.endswith("/"):
abs_name = abs_name[:-1]

name, extension = os.path.splitext(abs_name)

if extension != "":
extension = extension[1:] # remove the dot
return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})"

return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})"


def get_container_name(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/")[0]


def get_key_prefix(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1]


def get_container_relative_path(abs_uri: str) -> str:
return "/".join(strip_abs_prefix(abs_uri).split("/")[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
get_s3_prefix,
is_s3_uri,
)
from datahub.ingestion.source.azure.abs_util import (
from datahub.ingestion.source.azure.abs_utils import (
get_abs_prefix,
get_container_name,
get_container_relative_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from datahub.configuration.common import ConfigModel
from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.azure.abs_util import is_abs_uri
from datahub.ingestion.source.azure.abs_utils import is_abs_uri
from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri

# hide annoying debug errors from py4j
Expand Down

0 comments on commit 65ef858

Please sign in to comment.