feat(azure): support azure blob container storage (#93)
* feat(azure): support azure blob container storage

 - add azure implementation, and register with scheme "azure"
 - update test suite to use azure client when "azure" adapter is requested
 - add `pathy[azure]` extras to setup.py
 - remove "list_buckets" from BucketClient classes

BREAKING CHANGE: This removes an internal bit of code that allowed enumerating buckets in certain situations. The API was only reachable indirectly through the glob functionality, and it's unclear whether those code paths were ever exercised outside of specific unit tests. If there's an explicit need for listing buckets, we can add a top-level API for it.
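
If you do need to enumerate buckets today, one workaround is to drop down to the provider SDK rather than going through Pathy. A minimal sketch using google-cloud-storage, assuming default application credentials are configured:

```python
from google.cloud import storage

# List bucket names directly with the GCS SDK, since Pathy no longer
# exposes bucket enumeration after this change.
client = storage.Client()
for gcs_bucket in client.list_buckets():
    print(gcs_bucket.name)
```
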
justindujardin authored Nov 16, 2022
1 parent 567d137 commit 9624856
Showing 15 changed files with 478 additions and 109 deletions.
1 change: 1 addition & 0 deletions .github/workflows/python-package.yml
@@ -31,6 +31,7 @@ jobs:
GCS_CREDENTIALS: ${{ secrets.GCS_CREDENTIALS }}
PATHY_S3_ACCESS_ID: ${{ secrets.PATHY_S3_ACCESS_ID }}
PATHY_S3_ACCESS_SECRET: ${{ secrets.PATHY_S3_ACCESS_SECRET }}
PATHY_AZURE_CONNECTION_STRING: ${{ secrets.PATHY_AZURE_CONNECTION_STRING }}
run: rm -rf ./pathy/ && sh tools/test_package.sh
- name: Report Code Coverage
env:
54 changes: 49 additions & 5 deletions README.md
@@ -41,11 +41,55 @@ assert not greeting.exists()

The table below details the supported cloud provider APIs.

| Cloud Service        | Support |      Install Extras      |
| :------------------- | :-----: | :----------------------: |
| Google Cloud Storage |   ✅    | `pip install pathy[gcs]` |
| Amazon S3            |   ✅    | `pip install pathy[s3]`  |
| Azure                |   ❌    |                          |
| Cloud Service        | Support |       Install Extras       |
| :------------------- | :-----: | :------------------------: |
| Google Cloud Storage |   ✅    | `pip install pathy[gcs]`   |
| Amazon S3            |   ✅    | `pip install pathy[s3]`    |
| Azure                |   ✅    | `pip install pathy[azure]` |

### Google Cloud Storage

Google recommends using a JSON credentials file, which you can specify by path:

```python
from google.oauth2 import service_account
from pathy import set_client_params

credentials = service_account.Credentials.from_service_account_file("./my-creds.json")
set_client_params("gs", credentials=credentials)
```

### Amazon S3

S3 authenticates with an access key ID and secret, which you can pass directly:

```python
from pathy import set_client_params

set_client_params("s3", key_id="YOUR_ACCESS_KEY_ID", key_secret="YOUR_ACCESS_SECRET")
```

### Azure

Azure blob storage can be passed a `connection_string`:

```python
from pathy import set_client_params

set_client_params("azure", connection_string="YOUR_CONNECTION_STRING")
```

or a `BlobServiceClient` instance:

```python
from azure.storage.blob import BlobServiceClient
from pathy import set_client_params

service: BlobServiceClient = BlobServiceClient.from_connection_string(
"YOUR_CONNECTION_STRING"
)
set_client_params("azure", service=service)
```
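
Once configured, `azure://` paths behave like any other Pathy path. A minimal usage sketch, assuming a container named `my-container` already exists in your storage account:

```python
from pathy import Pathy

# Write and read back a blob through the configured Azure client
# (the container name here is hypothetical)
blob = Pathy("azure://my-container/greeting.txt")
blob.write_text("hello")
assert blob.read_text() == "hello"
```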

## Semantic Versioning

20 changes: 3 additions & 17 deletions pathy/__init__.py
@@ -196,9 +196,6 @@ def lookup_bucket(self, path: "Pathy") -> Optional[Bucket]:
def get_bucket(self, path: "Pathy") -> Bucket:
raise NotImplementedError(SUBCLASS_ERROR)

def list_buckets(self) -> Generator[Bucket, None, None]:
raise NotImplementedError(SUBCLASS_ERROR)

def list_blobs(
self,
path: "Pathy",
@@ -209,7 +206,7 @@ def list_blobs(

def scandir(
self,
path: Optional["Pathy"] = None,
path: "Pathy",
prefix: Optional[str] = None,
delimiter: Optional[str] = None,
) -> "PathyScanDir":
@@ -819,7 +816,7 @@ class PathyScanDir(Iterator[Any], ContextManager[Any], ABC):
def __init__(
self,
client: BucketClient,
path: Optional[PurePathy] = None,
path: PurePathy,
prefix: Optional[str] = None,
delimiter: Optional[str] = None,
) -> None:
@@ -1009,14 +1006,9 @@ def get_bucket(self, path: PurePathy) -> BucketFS:
return BucketFS(str(path.root), bucket=bucket_path)
raise FileNotFoundError(f"Bucket {path.root} does not exist!")

def list_buckets(self, **kwargs: Dict[str, Any]) -> Generator[BucketFS, None, None]:
for f in self.root.glob("*"):
if f.is_dir():
yield BucketFS(f.name, f)

def scandir(
self,
path: Optional[Pathy] = None,
path: Pathy,
prefix: Optional[str] = None,
delimiter: Optional[str] = None,
) -> PathyScanDir:
@@ -1072,12 +1064,6 @@ class ScanDirFS(PathyScanDir):
_client: BucketClientFS

def scandir(self) -> Generator[BucketEntry, None, None]:
if self._path is None or not self._path.root:
for bucket in self._client.list_buckets():
yield BucketEntryFS(bucket.name, is_dir=True, raw=None)
return
assert self._path is not None
assert self._path.root is not None
scan_path = self._client.root / self._path.root
if isinstance(self._path, BasePath):
scan_path = (
14 changes: 14 additions & 0 deletions pathy/_tests/__init__.py
@@ -23,3 +23,17 @@
except ImportError:
s3_installed = False
s3_testable = False


azure_testable: bool
azure_installed: bool


try:
from ..azure import BucketClientAzure

azure_installed = bool(BucketClientAzure)
azure_testable = azure_installed and "PATHY_AZURE_CONNECTION_STRING" in os.environ
except ImportError:
azure_installed = False
azure_testable = False
35 changes: 33 additions & 2 deletions pathy/_tests/conftest.py
@@ -1,5 +1,6 @@
import json
import os
import random
import shutil
import sys
import tempfile
@@ -10,14 +11,16 @@

from pathy import Pathy, set_client_params, use_fs, use_fs_cache

from . import gcs_testable, s3_testable
from . import azure_testable, gcs_testable, s3_testable

# Which adapters to use
TEST_ADAPTERS = ["fs"]
if gcs_testable:
TEST_ADAPTERS.append("gcs")
if s3_testable:
TEST_ADAPTERS.append("s3")
if azure_testable:
TEST_ADAPTERS.append("azure")

# A unique identifier used to allow each python version and OS to test
# with separate bucket paths. This makes it possible to parallelize the
@@ -97,14 +100,25 @@ def s3_credentials_from_env() -> Optional[Tuple[str, str]]:
return (access_key_id, access_secret)


def azure_credentials_from_env() -> Optional[str]:
"""Extract an access key ID and Secret from the environment."""
if not azure_testable:
return None

connection_string: Optional[str] = os.environ.get(
"PATHY_AZURE_CONNECTION_STRING", None
)
return connection_string


@pytest.fixture()
def with_adapter(
adapter: str, bucket: str, other_bucket: str
) -> Generator[str, None, None]:
tmp_dir = None
scheme = "gs"
if adapter == "gcs":
# Use GCS
# Use google-cloud-storage
use_fs(False)
credentials = gcs_credentials_from_env()
if credentials is not None:
@@ -117,6 +131,23 @@ def with_adapter(
if credentials is not None:
key_id, key_secret = credentials
set_client_params("s3", key_id=key_id, key_secret=key_secret)
elif adapter == "azure":
scheme = "azure"
# Use azure-storage-blob
use_fs(False)
connection_string = azure_credentials_from_env()
assert connection_string is not None, "expected valid connection_string in env"
# Sometimes pass BlobServiceClient, and sometimes a connection string
if bool(random.getrandbits(1)):
from azure.storage.blob import BlobServiceClient

service: BlobServiceClient = BlobServiceClient.from_connection_string(
connection_string
)
set_client_params("azure", service=service)
else:
set_client_params("azure", connection_string=connection_string)

elif adapter == "fs":
# Use local file-system in a temp folder
tmp_dir = tempfile.mkdtemp()
113 changes: 113 additions & 0 deletions pathy/_tests/test_azure.py
@@ -0,0 +1,113 @@
from dataclasses import dataclass

import mock
import pytest

from pathy import Pathy, get_client

from . import azure_testable
from .conftest import ENV_ID

AZURE_ADAPTER = ["azure"]


@pytest.mark.parametrize("adapter", AZURE_ADAPTER)
@pytest.mark.skipif(not azure_testable, reason="requires azure")
def test_azure_recreate_expected_args(with_adapter: str) -> None:
from pathy.azure import BucketClientAzure

# Must specify either service, or connection_string
with pytest.raises(ValueError):
BucketClientAzure()


@pytest.mark.parametrize("adapter", AZURE_ADAPTER)
@pytest.mark.skipif(not azure_testable, reason="requires azure")
def test_azure_bucket_get_blob_failure_cases(with_adapter: str, bucket: str) -> None:
root = Pathy(f"{with_adapter}://{bucket}")
client = root.client(root)
azure_bucket = client.get_bucket(root)
# Returns None if the blob name contains invalid characters
assert azure_bucket.get_blob("invlid_bkt_nme_18$%^@57582397?.___asd") is None


@pytest.mark.parametrize("adapter", AZURE_ADAPTER)
@pytest.mark.skipif(not azure_testable, reason="requires azure")
def test_azure_bucket_client_list_blobs(with_adapter: str, bucket: str) -> None:
"""Test corner-case in Azure client that isn't easily reachable from Pathy"""
from pathy.azure import BucketClientAzure

client: BucketClientAzure = get_client("azure")

# Invalid bucket
root = Pathy("azure://invalid_h3gE_ds5daEf_Sdf15487t2n4")
blobs = list(client.list_blobs(root))
assert blobs == []

# Invalid search prefix
root = Pathy(f"azure://{bucket}")
blobs = list(client.list_blobs(root, prefix="#@/:|*?%@^@$^@$@#$@#$"))
assert blobs == []


@dataclass
class MockAzureBlobClientCopyProperties:
status: str
id: str


@dataclass
class MockAzureBlobClientProperties:
copy: MockAzureBlobClientCopyProperties


@dataclass
class MockAzureBlobClient:
aborted = False

def start_copy_from_url(self, url: str) -> None:
pass

def get_blob_properties(self) -> MockAzureBlobClientProperties:
return MockAzureBlobClientProperties(
MockAzureBlobClientCopyProperties(status="failed", id="test")
)

def abort_copy(self, copy_id: str) -> None:
self.aborted = True


@dataclass
class MockAzureBlobServiceClient:
mock_client: MockAzureBlobClient

def get_blob_client(self, container: str, blob_name: str) -> MockAzureBlobClient:
return self.mock_client


@pytest.mark.parametrize("adapter", AZURE_ADAPTER)
@pytest.mark.skipif(not azure_testable, reason="requires azure")
def test_azure_bucket_copy_blob_aborts_copy_on_failure(
with_adapter: str, bucket: str
) -> None:
"""Verify abort_copy is called when copy status != 'success'"""
root: Pathy = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/azure_abort_copy")

# Create a source blob to copy
cool_blob = root / "file.txt"
cool_blob.write_text("cool")
client = root.client(root)

# Get the bucket client / source blob
azure_bucket = client.get_bucket(root)
azure_blob = azure_bucket.get_blob(cool_blob.prefix[:-1])
assert azure_blob is not None

# Mock out blob client to catch `abort_copy` call
mock_blob_client = MockAzureBlobClient()
mock_bucket = MockAzureBlobServiceClient(mock_blob_client)
assert mock_blob_client.aborted is False
with mock.patch.object(azure_bucket, "client", new=mock_bucket):
assert azure_bucket.copy_blob(azure_blob, azure_bucket, "file2.txt") is None
# abort_copy was called
assert mock_blob_client.aborted is True
2 changes: 0 additions & 2 deletions pathy/_tests/test_base.py
@@ -171,8 +171,6 @@ def test_client_base_bucket_client_raises_not_implemented() -> None:
client.is_dir(Pathy("gs://foo"))
with pytest.raises(NotImplementedError):
client.get_bucket(Pathy("gs://foo"))
with pytest.raises(NotImplementedError):
client.list_buckets()
with pytest.raises(NotImplementedError):
client.list_blobs(Pathy("gs://foo"))
with pytest.raises(NotImplementedError):
20 changes: 1 addition & 19 deletions pathy/_tests/test_file.py
@@ -4,15 +4,7 @@
import mock
import pytest

from pathy import (
BlobFS,
BucketClientFS,
BucketFS,
ClientError,
Pathy,
ScanDirFS,
get_client,
)
from pathy import BlobFS, BucketClientFS, BucketFS, ClientError, Pathy, get_client

FS_ADAPTER = ["fs"]

@@ -110,13 +102,3 @@ def test_file_bucket_client_fs_list_blobs(with_adapter: str) -> None:

blobs = [b.name for b in client.list_blobs(root, prefix="foo/bar/baz")]
assert len(blobs) == 1


@pytest.mark.parametrize("adapter", FS_ADAPTER)
def test_file_scandir_list_buckets(
with_adapter: str, bucket: str, other_bucket: str
) -> None:
root = Pathy()
client = root.client(root)
scandir = ScanDirFS(client=client, path=root)
assert sorted([s.name for s in scandir]) == sorted([bucket, other_bucket])
13 changes: 0 additions & 13 deletions pathy/_tests/test_gcs.py
@@ -43,19 +43,6 @@ def test_gcs_import_error_missing_deps() -> None:
get_client("gs")


@pytest.mark.parametrize("adapter", GCS_ADAPTER)
@pytest.mark.skipif(not gcs_testable, reason="requires gcs")
def test_gcs_scandir_list_buckets(
with_adapter: str, bucket: str, other_bucket: str
) -> None:
from pathy.gcs import ScanDirGCS

root = Pathy("gs://foo/bar")
client = root.client(root)
scandir = ScanDirGCS(client=client, path=Pathy())
assert sorted([s.name for s in scandir]) == sorted([bucket, other_bucket])


@pytest.mark.parametrize("adapter", GCS_ADAPTER)
@pytest.mark.skipif(not gcs_testable, reason="requires gcs")
def test_gcs_scandir_invalid_bucket_name(with_adapter: str) -> None: