Skip to content

Commit

Permalink
abstract file mixin
Browse files Browse the repository at this point in the history
  • Loading branch information
itcarroll committed Sep 30, 2024
1 parent 4aa1223 commit 95f7806
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 51 deletions.
5 changes: 3 additions & 2 deletions earthaccess/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import requests
import s3fs
from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile
from typing_extensions import Any, Dict, List, Optional, Union, deprecated

import earthaccess
Expand All @@ -11,7 +12,7 @@
from .auth import Auth
from .results import DataCollection, DataGranule
from .search import CollectionQuery, DataCollections, DataGranules, GranuleQuery
from .store import EarthAccessFile, Store
from .store import Store
from .system import PROD, System
from .utils import _validation as validate

Expand Down Expand Up @@ -242,7 +243,7 @@ def download(
def open(
granules: Union[List[str], List[DataGranule]],
provider: Optional[str] = None,
) -> List[EarthAccessFile]:
) -> List[AbstractBufferedFile]:
"""Returns a list of file-like objects that can be used to access files
hosted on S3 or HTTPS by third party libraries like xarray.
Expand Down
91 changes: 51 additions & 40 deletions earthaccess/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
from functools import lru_cache
from itertools import chain
from pathlib import Path
from pickle import dumps, loads
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
from uuid import uuid4

import fsspec
import fsspec.implementations.http as httpfs
import requests
import s3fs
from multimethod import multimethod as singledispatchmethod
Expand All @@ -26,71 +26,82 @@
logger = logging.getLogger(__name__)


class EarthAccessFile:
"""Handle for a file-like object pointing to an on-prem or Earthdata Cloud granule."""

class EarthaccessMixin:
# doc: explain explicit mapping
def __init__(
self, f: fsspec.spec.AbstractBufferedFile, granule: DataGranule
) -> None:
"""EarthAccessFile connects an Earthdata search result with an open file-like object.
No methods exist on the class, which passes all attribute and method calls
directly to the file-like object given during initialization. An instance of
this class can be treated like that file-like object itself.
Parameters:
f: a file-like object
granule: a granule search result
"""
self.f = f
self,
f: fsspec.spec.AbstractBufferedFile,
granule: Union[DataGranule, None],
):
self.__dict__.update(f.__dict__)
self.granule = granule

def __getattr__(self, method: str) -> Any:
return getattr(self.f, method)

def __reduce__(self) -> Any:
def __reduce__(self) -> fsspec.spec.AbstractBufferedFile:
return make_instance, (
type(self.f),
type(self),
self.granule,
earthaccess.__auth__,
dumps(self.f),
*super().__reduce__(),
)

def __repr__(self) -> str:
return repr(self.f)

class EarthaccessS3File(EarthaccessMixin, s3fs.S3File):
pass


class EarthaccessHTTPFile(EarthaccessMixin, httpfs.HTTPFile):
pass


class EarthaccessHTTPStreamFile(EarthaccessMixin, httpfs.HTTPStreamFile):
pass


earthaccess_file_type = {
s3fs.S3File: EarthaccessS3File,
httpfs.HTTPFile: EarthaccessHTTPFile,
httpfs.HTTPStreamFile: EarthaccessHTTPStreamFile,
}


def _open_files(
url_mapping: Mapping[str, Union[DataGranule, None]],
fs: fsspec.AbstractFileSystem,
threads: Optional[int] = 8,
) -> List[EarthAccessFile]:
def multi_thread_open(data: tuple) -> EarthAccessFile:
urls, granule = data
return EarthAccessFile(fs.open(urls), granule)
) -> List[fsspec.spec.AbstractBufferedFile]:
def multi_thread_open(data: Tuple) -> fsspec.spec.AbstractBufferedFile:
url, granule = data
f = fs.open(url)
cls = earthaccess_file_type[type(f)]
return cls(f, granule)

fileset = pqdm(url_mapping.items(), multi_thread_open, n_jobs=threads)
return fileset


def make_instance(
cls: Any, granule: DataGranule, auth: Auth, data: Any
) -> EarthAccessFile:
cls: type,
granule: DataGranule,
auth: Auth,
fun: Callable,
fun_args: Tuple,
) -> fsspec.spec.AbstractBufferedFile:
"""Callable that re-creates an object from its serialized (pickled) self."""
# Attempt to re-authenticate
if not earthaccess.__auth__.authenticated:
earthaccess.__auth__ = auth
earthaccess.login()

# When sending EarthAccessFiles between processes, it's possible that
# we will need to switch between s3 <--> https protocols.
if (earthaccess.__store__.in_region and cls is not s3fs.S3File) or (
not earthaccess.__store__.in_region and cls is s3fs.S3File
):
# When sending an AbstractBufferedFile between processes, it's possible that
# we will need to switch between s3 <--> https protocols. We will re-open if we
# could use s3 AND the current filesystem does not (OR the converse); an XOR.
if earthaccess.__store__.in_region != (cls is EarthaccessS3File):
# NOTE: This uses the first data_link listed in the granule. That's not
# guaranteed to be the right one.
return EarthAccessFile(earthaccess.open([granule])[0], granule)
return earthaccess.open([granule])[0]
# Otherwise, use fun and fun_args from super().__reduce__()
else:
return EarthAccessFile(loads(data), granule)
return cls(fun(*fun_args), granule)


def _get_url_granule_mapping(
Expand Down Expand Up @@ -336,7 +347,7 @@ def open(
self,
granules: Union[List[str], List[DataGranule]],
provider: Optional[str] = None,
) -> List[EarthAccessFile]:
) -> List[fsspec.spec.AbstractBufferedFile]:
"""Returns a list of file-like objects that can be used to access files
hosted on S3 or HTTPS by third party libraries like xarray.
Expand Down
39 changes: 30 additions & 9 deletions tests/unit/test_store.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# package imports
import os
import pickle
import unittest

import earthaccess
import fsspec
import pytest
import responses
import s3fs
from earthaccess import Auth, Store
from earthaccess.store import EarthAccessFile
from vcr.unittest import VCRTestCase # type: ignore[import-untyped]


class TestStoreSessions(unittest.TestCase):
Expand Down Expand Up @@ -129,10 +130,30 @@ def test_store_can_create_s3_fsspec_session(self):
return None


def test_earthaccess_file_getattr():
fs = fsspec.filesystem("memory")
with fs.open("/foo", "wb") as f:
earthaccess_file = EarthAccessFile(f, granule="foo")
assert f.tell() == earthaccess_file.tell()
# cleanup
fs.store.clear()
class TestStoreOpen(VCRTestCase):
def _get_vcr(self, **kwargs):
myvcr = super()._get_vcr(**kwargs)
myvcr.cassette_library_dir = "tests/unit/fixtures/vcr_cassettes"
return myvcr

def test_round_trip_granules_pickle(self):
granules = earthaccess.search_data(
short_name="VIIRSJ1_L3m_CHL",
granule_name="*.9km.*",
count=1,
)
open_files = earthaccess.open(granules)
serialized = pickle.dumps(open_files)
assert open_files == pickle.loads(serialized)

def test_round_trip_urls_pickle(self):
granules = earthaccess.search_data(
short_name="VIIRSJ1_L3m_CHL",
granule_name="*.9km.*",
count=1,
)
urls = [i.data_links(access="on_prem")[0] for i in granules]
open_files = earthaccess.open(urls)
serialized = pickle.dumps(open_files)
assert open_files == pickle.loads(serialized)
# TODO make the test attempt to change region

0 comments on commit 95f7806

Please sign in to comment.