diff --git a/metadata-ingestion/src/datahub/ingestion/source/abs/source.py b/metadata-ingestion/src/datahub/ingestion/source/abs/source.py index 07cc694e1b162..c9833f6982599 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/abs/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/abs/source.py @@ -52,13 +52,15 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec from datahub.ingestion.source.abs.report import DataLakeSourceReport -from datahub.ingestion.source.azure.abs_util import ( +from datahub.ingestion.source.azure.abs_folder_utils import ( get_abs_properties, get_abs_tags, + list_folders, +) +from datahub.ingestion.source.azure.abs_utils import ( get_container_name, get_container_relative_path, get_key_prefix, - list_folders, strip_abs_prefix, ) from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator diff --git a/metadata-ingestion/src/datahub/ingestion/source/azure/abs_util.py b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_folder_utils.py similarity index 79% rename from metadata-ingestion/src/datahub/ingestion/source/azure/abs_util.py rename to metadata-ingestion/src/datahub/ingestion/source/azure/abs_folder_utils.py index 34faa0f0979ef..ce166f2942dac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/azure/abs_util.py +++ b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_folder_utils.py @@ -1,6 +1,4 @@ import logging -import os -import re from typing import Dict, Iterable, List, Optional from azure.storage.blob import BlobProperties @@ -10,67 +8,10 @@ from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass -ABS_PREFIXES_REGEX = re.compile( - r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)" -) - logging.getLogger("py4j").setLevel(logging.ERROR) logger: logging.Logger = logging.getLogger(__name__) -def is_abs_uri(uri: str) -> bool: - return bool(ABS_PREFIXES_REGEX.match(uri)) - - -def get_abs_prefix(abs_uri: str) -> Optional[str]: - result = re.search(ABS_PREFIXES_REGEX, abs_uri) - if result and result.groups(): - return result.group(1) - return None - - -def strip_abs_prefix(abs_uri: str) -> str: - # remove abs prefix https://.blob.core.windows.net - abs_prefix = get_abs_prefix(abs_uri) - if not abs_prefix: - raise ValueError( - f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" - ) - length_abs_prefix = len(abs_prefix) - return abs_uri[length_abs_prefix:] - - -def make_abs_urn(abs_uri: str, env: str) -> str: - abs_name = strip_abs_prefix(abs_uri) - - if abs_name.endswith("/"): - abs_name = abs_name[:-1] - - name, extension = os.path.splitext(abs_name) - - if extension != "": - extension = extension[1:] # remove the dot - return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})" - - return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})" - - -def get_container_name(abs_uri: str) -> str: - if not is_abs_uri(abs_uri): - raise ValueError( - f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" - ) - return strip_abs_prefix(abs_uri).split("/")[0] - - -def get_key_prefix(abs_uri: str) -> str: - if not is_abs_uri(abs_uri): - raise ValueError( - f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" - ) - return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1] - - def get_abs_properties( container_name: str, blob_name: Optional[str], @@ -280,7 +221,3 @@ def list_folders( this_dict[folder_name] = folder_name yield f"{folder_name}" - - -def get_container_relative_path(abs_uri: str) -> str: - return "/".join(strip_abs_prefix(abs_uri).split("/")[1:]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py new file mode 100644 index 0000000000000..042e1b4ef921f --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py @@ -0,0 +1,66 @@ +import os +import re +from typing import Optional + +# This file should not import any abs spectific modules as we import it in path_spec.py in datat_lake_common.py + +ABS_PREFIXES_REGEX = re.compile( + r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)" +) + + +def is_abs_uri(uri: str) -> bool: + return bool(ABS_PREFIXES_REGEX.match(uri)) + + +def get_abs_prefix(abs_uri: str) -> Optional[str]: + result = re.search(ABS_PREFIXES_REGEX, abs_uri) + if result and result.groups(): + return result.group(1) + return None + + +def strip_abs_prefix(abs_uri: str) -> str: + # remove abs prefix https://.blob.core.windows.net + abs_prefix = get_abs_prefix(abs_uri) + if not abs_prefix: + raise ValueError( + f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" + ) + length_abs_prefix = len(abs_prefix) + return abs_uri[length_abs_prefix:] + + +def make_abs_urn(abs_uri: str, env: str) -> str: + abs_name = strip_abs_prefix(abs_uri) + + if abs_name.endswith("/"): + abs_name = abs_name[:-1] + + name, extension = os.path.splitext(abs_name) + + if extension != "": + extension = extension[1:] # remove the dot + return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})" + + return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})" + + +def get_container_name(abs_uri: str) -> str: + if not is_abs_uri(abs_uri): + raise ValueError( + f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" + ) + return strip_abs_prefix(abs_uri).split("/")[0] + + +def get_key_prefix(abs_uri: str) -> str: + if not is_abs_uri(abs_uri): + raise ValueError( + f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" + ) + return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1] + + +def get_container_relative_path(abs_uri: str) -> str: + return "/".join(strip_abs_prefix(abs_uri).split("/")[1:]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py index 2ebdd2b4126bb..f594c61f4e5ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py @@ -16,7 +16,7 @@ get_s3_prefix, is_s3_uri, ) -from datahub.ingestion.source.azure.abs_util import ( +from datahub.ingestion.source.azure.abs_utils import ( get_abs_prefix, get_container_name, get_container_relative_path, diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index e21cdac1edf75..71765f9be5e32 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -11,7 +11,7 @@ from datahub.configuration.common import ConfigModel from datahub.ingestion.source.aws.s3_util import is_s3_uri -from datahub.ingestion.source.azure.abs_util import is_abs_uri +from datahub.ingestion.source.azure.abs_utils import is_abs_uri from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri # hide annoying debug errors from py4j