Commit

Merge pull request #16 from pbranson/fix_walk
Fix walk
pbranson authored Aug 9, 2021
2 parents 36a6ada + 004eaad commit 90fa079
Showing 6 changed files with 423 additions and 4 deletions.
File renamed without changes.
File renamed without changes.
415 changes: 415 additions & 0 deletions notebooks/catalog_basic_CSIRO_SWAN.ipynb

Large diffs are not rendered by default.

File renamed without changes.
2 changes: 1 addition & 1 deletion rompy/intake.py
@@ -151,7 +151,7 @@ def _open_dataset(self):
         futures = [__open_preprocess(url,self.chunks,self.ds_filters,self.xarray_kwargs) for url in self.urlpath]
         dsets = compute(*futures,traverse=False)
         if len(dsets[0].lead) == 1: # Assumes this indicates that each timestep of the forecast is a separate file
-            inits = [to_datetime(ds.init.values[0]) for ds in dsets]
+            inits = sorted([to_datetime(ds.init.values[0]) for ds in dsets])
             dsets_concat = []
             for i in set(inits):
                 subset = [ds for ds in dsets if ds.init.values[0] == i]
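
The single-line fix above sorts the forecast initialisation times before the per-cycle datasets are grouped and concatenated, so the combined init coordinate comes out in chronological order rather than in arbitrary file-listing order. A minimal sketch of the idea, using hypothetical single-timestep datasets (the variable name hs and the times are illustrative, not from the repository):

import numpy as np
import xarray as xr
from pandas import to_datetime

# Hypothetical per-file forecast datasets, deliberately listed out of order.
times = ["2021-08-09T12", "2021-08-09T00", "2021-08-09T06"]
dsets = [
    xr.Dataset({"hs": ("lead", np.zeros(1))},
               coords={"lead": [0], "init": [np.datetime64(t)]})
    for t in times
]

# Sorting first means each forecast cycle is appended chronologically.
inits = sorted(to_datetime(ds.init.values[0]) for ds in dsets)
ordered = [ds for i in inits for ds in dsets if to_datetime(ds.init.values[0]) == i]
combined = xr.concat(ordered, dim="init")
assert combined.init.to_index().is_monotonic_increasing
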
10 changes: 7 additions & 3 deletions rompy/utils.py
@@ -21,11 +21,9 @@ def dict_product(d):
         yield dict(zip(keys, element))

 def walk_server(urlpath, fn_fmt, fmt_fields, url_replace):
-    from os.path import dirname
     from functools import reduce
     from operator import iconcat
     from dask import delayed, compute
-    import dask.config as dc

     # Targeted scans of the file system based on date range
     test_urls = set([urlpath.format(**pv) for pv in dict_product(fmt_fields)])
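
For context, dict_product (whose tail is shown above) expands a dict of candidate field values into every combination, and each combination is substituted into the urlpath template to build the targeted scan list. A small sketch, with the generator body reconstructed from the fragment above and a hypothetical template and fields:

from itertools import product

def dict_product(d):
    keys = d.keys()
    for element in product(*d.values()):
        yield dict(zip(keys, element))

urlpath = "https://example.org/forecast/{year}/{month:02d}"  # hypothetical template
fmt_fields = {"year": [2021], "month": [7, 8]}
test_urls = set(urlpath.format(**pv) for pv in dict_product(fmt_fields))
# {'https://example.org/forecast/2021/07', 'https://example.org/forecast/2021/08'}
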
@@ -37,12 +35,16 @@ def walk_server(urlpath, fn_fmt, fmt_fields, url_replace):
     def check_url(test_url,test_fns):
         from fsspec import filesystem
         from fsspec.utils import get_protocol
+        from os.path import dirname
         fs = filesystem(get_protocol(test_url))
         logger.debug(f'testing {test_url}')
         urls = []
         if fs.exists(test_url):
             for url, _ , links in fs.walk(test_url):
-                urls += [dirname(url) + '/' + fn for fn in links if fn in test_fns]
+                # test for the case that url is a local file path, otherwise likely an HTTP URL
+                if fs.isfile(url):
+                    url = dirname(url)
+                urls += [url + '/' + fn for fn in links if fn in test_fns]
         return urls

     valid_urls = compute(*[check_url(test_url,test_fns) for test_url in test_urls],
@@ -55,6 +57,8 @@ def check_url(test_url,test_fns):

     for f,r in url_replace.items():
         valid_urls = [u.replace(f,r) for u in valid_urls]
+
+    logger.debug(f'valid_urls after replace : {valid_urls}')

     return valid_urls

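
Taken together, the utils.py changes defer the dirname import to where it is used, guard the walk against roots that are local file paths rather than directories, and log the final URL list. A standalone sketch of the patched lookup, without the dask delayed/compute wrapping used in walk_server (the function name list_matching and the values in the usage comment are hypothetical):

from os.path import dirname
from fsspec import filesystem
from fsspec.utils import get_protocol

def list_matching(test_url, test_fns):
    # Pick the fsspec implementation (local, http, s3, ...) from the URL protocol.
    fs = filesystem(get_protocol(test_url))
    urls = []
    if fs.exists(test_url):
        # fs.walk yields (root, dirs, files); per the fix above, a root that is
        # a local file path is reduced to its directory before joining names.
        for url, _, links in fs.walk(test_url):
            if fs.isfile(url):
                url = dirname(url)
            urls += [url + '/' + fn for fn in links if fn in test_fns]
    return urls

# e.g. list_matching('/tmp/forecasts', {'ww3.20210809.nc'})  # hypothetical inputs
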
