From 95f780656d228493a70f0563b3d5520caf3e9b78 Mon Sep 17 00:00:00 2001 From: Ian Carroll Date: Mon, 30 Sep 2024 14:33:15 -0400 Subject: [PATCH 1/2] abstract file mixin --- earthaccess/api.py | 5 ++- earthaccess/store.py | 91 ++++++++++++++++++++++------------------ tests/unit/test_store.py | 39 +++++++++++++---- 3 files changed, 84 insertions(+), 51 deletions(-) diff --git a/earthaccess/api.py b/earthaccess/api.py index 4b4b1698..fce20c61 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -3,6 +3,7 @@ import requests import s3fs from fsspec import AbstractFileSystem +from fsspec.spec import AbstractBufferedFile from typing_extensions import Any, Dict, List, Optional, Union, deprecated import earthaccess @@ -11,7 +12,7 @@ from .auth import Auth from .results import DataCollection, DataGranule from .search import CollectionQuery, DataCollections, DataGranules, GranuleQuery -from .store import EarthAccessFile, Store +from .store import Store from .system import PROD, System from .utils import _validation as validate @@ -242,7 +243,7 @@ def download( def open( granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, -) -> List[EarthAccessFile]: +) -> List[AbstractBufferedFile]: """Returns a list of file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. diff --git a/earthaccess/store.py b/earthaccess/store.py index 61437542..4b3d7bc6 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -5,11 +5,11 @@ from functools import lru_cache from itertools import chain from pathlib import Path -from pickle import dumps, loads -from typing import Any, Dict, List, Mapping, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union from uuid import uuid4 import fsspec +import fsspec.implementations.http as httpfs import requests import s3fs from multimethod import multimethod as singledispatchmethod @@ -26,71 +26,82 @@ logger = logging.getLogger(__name__) -class EarthAccessFile: - """Handle for a file-like object pointing to an on-prem or Earthdata Cloud granule.""" - +class EarthaccessMixin: + # doc: explain explicit mapping def __init__( - self, f: fsspec.spec.AbstractBufferedFile, granule: DataGranule - ) -> None: - """EarthAccessFile connects an Earthdata search result with an open file-like object. - - No methods exist on the class, which passes all attribute and method calls - directly to the file-like object given during initialization. An instance of - this class can be treated like that file-like object itself. - - Parameters: - f: a file-like object - granule: a granule search result - """ - self.f = f + self, + f: fsspec.spec.AbstractBufferedFile, + granule: Union[DataGranule, None], + ): + self.__dict__.update(f.__dict__) self.granule = granule - def __getattr__(self, method: str) -> Any: - return getattr(self.f, method) - - def __reduce__(self) -> Any: + def __reduce__(self) -> fsspec.spec.AbstractBufferedFile: return make_instance, ( - type(self.f), + type(self), self.granule, earthaccess.__auth__, - dumps(self.f), + *super().__reduce__(), ) - def __repr__(self) -> str: - return repr(self.f) + +class EarthaccessS3File(EarthaccessMixin, s3fs.S3File): + pass + + +class EarthaccessHTTPFile(EarthaccessMixin, httpfs.HTTPFile): + pass + + +class EarthaccessHTTPStreamFile(EarthaccessMixin, httpfs.HTTPStreamFile): + pass + + +earthaccess_file_type = { + s3fs.S3File: EarthaccessS3File, + httpfs.HTTPFile: EarthaccessHTTPFile, + httpfs.HTTPStreamFile: EarthaccessHTTPStreamFile, +} def _open_files( url_mapping: Mapping[str, Union[DataGranule, None]], fs: fsspec.AbstractFileSystem, threads: Optional[int] = 8, -) -> List[EarthAccessFile]: - def multi_thread_open(data: tuple) -> EarthAccessFile: - urls, granule = data - return EarthAccessFile(fs.open(urls), granule) +) -> List[fsspec.spec.AbstractBufferedFile]: + def multi_thread_open(data: Tuple) -> fsspec.spec.AbstractBufferedFile: + url, granule = data + f = fs.open(url) + cls = earthaccess_file_type[type(f)] + return cls(f, granule) fileset = pqdm(url_mapping.items(), multi_thread_open, n_jobs=threads) return fileset def make_instance( - cls: Any, granule: DataGranule, auth: Auth, data: Any -) -> EarthAccessFile: + cls: type, + granule: DataGranule, + auth: Auth, + fun: Callable, + fun_args: Tuple, +) -> fsspec.spec.AbstractBufferedFile: + """Callable that re-creates an object from its serialized (pickled) self.""" # Attempt to re-authenticate if not earthaccess.__auth__.authenticated: earthaccess.__auth__ = auth earthaccess.login() - # When sending EarthAccessFiles between processes, it's possible that - # we will need to switch between s3 <--> https protocols. - if (earthaccess.__store__.in_region and cls is not s3fs.S3File) or ( - not earthaccess.__store__.in_region and cls is s3fs.S3File - ): + # When sending an AbstractBufferedFile between processes, it's possible that + # we will need to switch between s3 <--> https protocols. We will re-open if we + # could use s3 AND the current filesystem does not (OR the converse); an XOR. + if earthaccess.__store__.in_region != (cls is EarthaccessS3File): # NOTE: This uses the first data_link listed in the granule. That's not # guaranteed to be the right one. - return EarthAccessFile(earthaccess.open([granule])[0], granule) + return earthaccess.open([granule])[0] + # Otherwise, use fun and fun_args from super().__reduce__() else: - return EarthAccessFile(loads(data), granule) + return cls(fun(*fun_args), granule) def _get_url_granule_mapping( @@ -336,7 +347,7 @@ def open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, - ) -> List[EarthAccessFile]: + ) -> List[fsspec.spec.AbstractBufferedFile]: """Returns a list of file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. diff --git a/tests/unit/test_store.py b/tests/unit/test_store.py index 9b12c267..aa644dcb 100644 --- a/tests/unit/test_store.py +++ b/tests/unit/test_store.py @@ -1,13 +1,14 @@ -# package imports import os +import pickle import unittest +import earthaccess import fsspec import pytest import responses import s3fs from earthaccess import Auth, Store -from earthaccess.store import EarthAccessFile +from vcr.unittest import VCRTestCase # type: ignore[import-untyped] class TestStoreSessions(unittest.TestCase): @@ -129,10 +130,30 @@ def test_store_can_create_s3_fsspec_session(self): return None -def test_earthaccess_file_getattr(): - fs = fsspec.filesystem("memory") - with fs.open("/foo", "wb") as f: - earthaccess_file = EarthAccessFile(f, granule="foo") - assert f.tell() == earthaccess_file.tell() - # cleanup - fs.store.clear() +class TestStoreOpen(VCRTestCase): + def _get_vcr(self, **kwargs): + myvcr = super()._get_vcr(**kwargs) + myvcr.cassette_library_dir = "tests/unit/fixtures/vcr_cassettes" + return myvcr + + def test_round_trip_granules_pickle(self): + granules = earthaccess.search_data( + short_name="VIIRSJ1_L3m_CHL", + granule_name="*.9km.*", + count=1, + ) + open_files = earthaccess.open(granules) + serialized = pickle.dumps(open_files) + assert open_files == pickle.loads(serialized) + + def test_round_trip_urls_pickle(self): + granules = earthaccess.search_data( + short_name="VIIRSJ1_L3m_CHL", + granule_name="*.9km.*", + count=1, + ) + urls = [i.data_links(access="on_prem")[0] for i in granules] + open_files = earthaccess.open(urls) + serialized = pickle.dumps(open_files) + assert open_files == pickle.loads(serialized) + # TODO make the test attempt to change region From af8030d6abd6d1a679c0d0f6f206d6bd755f010a Mon Sep 17 00:00:00 2001 From: Ian Carroll Date: Mon, 30 Sep 2024 15:10:51 -0400 Subject: [PATCH 2/2] auth in test --- tests/unit/test_store.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/test_store.py b/tests/unit/test_store.py index aa644dcb..e60576a5 100644 --- a/tests/unit/test_store.py +++ b/tests/unit/test_store.py @@ -137,6 +137,7 @@ def _get_vcr(self, **kwargs): return myvcr def test_round_trip_granules_pickle(self): + earthaccess.login(strategy="netrc") granules = earthaccess.search_data( short_name="VIIRSJ1_L3m_CHL", granule_name="*.9km.*", @@ -147,6 +148,7 @@ def test_round_trip_granules_pickle(self): assert open_files == pickle.loads(serialized) def test_round_trip_urls_pickle(self): + earthaccess.login(strategy="netrc") granules = earthaccess.search_data( short_name="VIIRSJ1_L3m_CHL", granule_name="*.9km.*",