Better retries for IO, logging (#243)
* Better retries for IO, logging

* Use a non-platform specific error code in test

* Fix test on windows
carlosgjs authored Oct 16, 2023
1 parent 1fa1daf commit b55b0f2
Showing 6 changed files with 90 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .github/actions/setup/action.yaml
@@ -19,7 +19,7 @@ runs:
       shell: sh
       run: |
         python3 -m ensurepip
-        pip3 install --upgrade pip
+        python3 -m pip install --upgrade pip
     - name: Setup MPI
       if: ${{ inputs.mpi == 'true' }}
       uses: mpi4py/setup-mpi@v1
39 changes: 24 additions & 15 deletions src/noisepy/seis/correlate.py
@@ -94,14 +94,20 @@ def init() -> List:
         return [timespans]
 
     [timespans] = scheduler.initialize(init, 1)
-    errors = False
+    failed = []
     for its in scheduler.get_indices(timespans):
         ts = timespans[its]
-        errors = errors or cc_timespan(raw_store, fft_params, cc_store, ts, pair_filter)
+        failed_pairs = cc_timespan(raw_store, fft_params, cc_store, ts, pair_filter)
+        if len(failed_pairs) > 0:
+            failed.extend((ts, failed_pairs))
 
     tlog.log(f"Step 1 in total with {os.cpu_count()} cores", t_s1_total)
-    if errors:
-        raise RuntimeError("Errors occurred during cross-correlation. Check logs for details")
+    if len(failed):
+        failed_str = "\n".join(map(str, failed))
+        logger.error(
+            "Errors occurred during cross-correlation. Check logs for details. "
+            f"The following pairs failed:\n{failed_str}"
+        )
 
 
 def cc_timespan(
Expand All @@ -110,8 +116,7 @@ def cc_timespan(
cc_store: CrossCorrelationDataStore,
ts: DateTimeRange,
pair_filter: Callable[[Channel, Channel], bool] = lambda src, rec: True,
) -> bool:
errors = False
) -> List[Tuple[Station, Station]]:
executor = ThreadPoolExecutor()
tlog = TimeLogger(logger, logging.INFO, prefix="CC Main")
"""
@@ -155,7 +160,7 @@
     )
     if len(missing_channels) == 0:
         logger.warning(f"{ts} already completed")
-        return False
+        return []
 
     ch_data_tuples = _read_channels(
         executor, ts, raw_store, missing_channels, fft_params.samp_freq, fft_params.single_freq
@@ -164,15 +169,15 @@
 
     if len(ch_data_tuples) == 0:
         logger.warning(f"No data available for {ts}")
-        return False
+        return missing_pairs
 
     tlog.log(f"Read channel data: {len(ch_data_tuples)} channels")
     ch_data_tuples_pre = preprocess_all(executor, ch_data_tuples, raw_store, fft_params, ts)
     del ch_data_tuples
     tlog.log(f"Preprocess: {len(ch_data_tuples_pre)} channels")
     if len(ch_data_tuples_pre) == 0:
         logger.warning(f"No data available for {ts} after preprocessing")
-        return False
+        return missing_pairs
 
     nchannels = len(ch_data_tuples_pre)
     nseg_chunk = check_memory(fft_params, nchannels)
@@ -201,7 +206,7 @@
     Nfft = max(map(lambda d: d.length, fft_datas))
     if Nfft == 0:
         logger.error(f"No FFT data available for any channel in {ts}, skipping")
-        return True
+        return missing_pairs
 
     if len(ffts) != nchannels:
         logger.warning("it seems some stations miss data in download step, but it is OKAY!")
@@ -231,11 +236,15 @@
             )
             tasks.append(t)
     compute_results = get_results(tasks, "Cross correlation")
-    successes, save_tasks = zip(*compute_results)
-    errors = errors or not all(successes)
+    _, save_tasks = zip(*compute_results)
     save_tasks = [t for t in save_tasks if t]
-    save_results = get_results(save_tasks, "Save correlations")
-    errors = errors or not all(save_results)
+    _ = get_results(save_tasks, "Save correlations")
+    failed_pairs = [
+        pair[0]
+        for pair, (comp_res, save_task) in zip(work_items, compute_results)
+        if not (comp_res and (save_task is None or save_task.result()))
+    ]
 
     save_exec.shutdown()
     tlog.log("Correlate and write to store")

@@ -244,7 +253,7 @@
 
     tlog.log(f"Process the chunk of {ts}", t_chunk)
     executor.shutdown()
-    return errors
+    return failed_pairs
 
 
 def create_pairs(
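The net effect in correlate.py is a shift from a single boolean error flag to returning the failed station pairs themselves, so one bad timespan no longer aborts the run and the final log names exactly which pairs need a re-run. A minimal sketch of the pattern, with process standing in for cc_timespan (it is not the actual API):

    import logging
    from typing import Callable, List, Tuple

    logger = logging.getLogger(__name__)

    def run_all(timespans: List[str], process: Callable[[str], List[str]]) -> List[Tuple[str, List[str]]]:
        # Collect failures instead of raising on the first error, so the
        # remaining timespans still get processed.
        failed: List[Tuple[str, List[str]]] = []
        for ts in timespans:
            failed_pairs = process(ts)  # returns the pairs that failed, [] on success
            if failed_pairs:
                failed.append((ts, failed_pairs))
        if failed:
            failed_str = "\n".join(map(str, failed))
            logger.error(f"Errors occurred during processing. The following pairs failed:\n{failed_str}")
        return failed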
35 changes: 11 additions & 24 deletions src/noisepy/seis/hierarchicalstores.py
@@ -6,23 +6,21 @@
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timezone
 from pathlib import Path
-from time import sleep
 from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar
 
 import fsspec
 import numpy as np
-from botocore.exceptions import ClientError
 from datetimerange import DateTimeRange
 
+from noisepy.seis.utils import io_retry
+
 from .datatypes import AnnotatedData, Station
 from .stores import timespan_str
 from .utils import TimeLogger, fs_join, get_filesystem, unstack
 
 META_ATTR = "metadata"
 VERSION_ATTR = "version"
-FIND_RETRIES = 5
-FIND_RETRY_SLEEP = 0.05  # (seconds)
-ERR_SLOWDOWN = "SlowDown"
+FAKE_STA = "FAKE_STATION"
 
 logger = logging.getLogger(__name__)

@@ -219,32 +217,21 @@ def contains(self, src_sta: Station, rec_sta: Station, timespan: DateTimeRange)
     def _load_src(self, src: str):
         if self.dir_cache.is_src_loaded(src):
             return
-        paths = self._find(src)
+        logger.info(f"Loading directory cache for {src} - ix: {self.dir_cache.stations_idx.get(src, -4)}")
+        paths = io_retry(self._fs_find, src)
 
         grouped_paths = defaultdict(list)
         for rec_sta, timespan in [p for p in paths if p]:
             grouped_paths[rec_sta].append(timespan)
         for rec_sta, timespans in grouped_paths.items():
             self.dir_cache.add(src, rec_sta, sorted(timespans, key=lambda t: t.start_datetime.timestamp()))
+        # if we didn't find any paths, add a fake entry so we don't try again and is_src_loaded returns True
+        if len(grouped_paths) == 0:
+            self.dir_cache.add(src, FAKE_STA, [])
 
-    def _find(self, src):
-        i = 0
-        while i < FIND_RETRIES:
-            try:
-                i += 1
-                paths = [
-                    self.helper.parse_path(p)
-                    for p in self.helper.get_fs().find(fs_join(self.helper.get_root_dir(), src))
-                ]
-                return paths
-            except ClientError as e:
-                # when using S3 we sometimes get a SlowDown client error so back off if that happens
-                if e.response.get("Error", {}).get("Code", None) != ERR_SLOWDOWN:
-                    logger.error(f"Got ClientError while listing {src}: {e}")
-                    raise
-                logger.warning(f"Got ClientError while listing {src}: {e}. Retry {i} of {FIND_RETRIES}")
-                sleep(FIND_RETRY_SLEEP * i)
-        raise RuntimeError(f"Could not get directory listing for {src} after {FIND_RETRIES} retries")
+    def _fs_find(self, src):
+        paths = [self.helper.parse_path(p) for p in self.helper.get_fs().find(fs_join(self.helper.get_root_dir(), src))]
+        return paths
 
     def append(self, timespan: DateTimeRange, src: Station, rec: Station, data: List[T]):
         path = self._get_path(src, rec, timespan)
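Two things changed in _load_src: the raw listing call is delegated to io_retry, and an empty result is negatively cached under the FAKE_STA sentinel so that is_src_loaded returns True and the (potentially slow) remote find is not repeated for a prefix already known to be empty. A simplified sketch of that negative-caching idea, with a plain dict standing in for PairDirectoryCache:

    from typing import Callable, Dict, List

    from noisepy.seis.utils import io_retry

    FAKE_STA = "FAKE_STATION"  # sentinel meaning "already listed, nothing there"

    def load_listing(cache: Dict[str, List[str]], src: str, find: Callable[[str], List[str]]) -> List[str]:
        if src not in cache:  # stand-in for dir_cache.is_src_loaded(src)
            paths = io_retry(find, src)  # retried on OSError(EBUSY)
            # Cache even an empty listing (via the sentinel) so the next
            # lookup skips the remote call entirely.
            cache[src] = paths if paths else [FAKE_STA]
        return [p for p in cache[src] if p != FAKE_STA]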
4 changes: 2 additions & 2 deletions src/noisepy/seis/main.py
@@ -28,7 +28,7 @@
     SingleNodeScheduler,
 )
 from .stack import stack_cross_correlations
-from .utils import fs_join, get_filesystem
+from .utils import fs_join, get_filesystem, io_retry
 from .zarrstore import ZarrCCStore, ZarrStackStore
 
 logger = logging.getLogger(__name__)
@@ -236,7 +236,7 @@ def cmd_wrapper(cmd: Callable[[ConfigParameters], None], src_dir: str, tgt_dir:
         storage_options = params.storage_options
         makedir(tgt_dir, storage_options)
         logger.info(f"Running {args.cmd.name}. Start: {params.start_date}. End: {params.end_date}")
-        params.save_yaml(fs_join(tgt_dir, CONFIG_FILE))
+        io_retry(params.save_yaml, fs_join(tgt_dir, CONFIG_FILE))
         cmd(params)
     except Exception as e:
         logger.exception(e)
25 changes: 23 additions & 2 deletions src/noisepy/seis/utils.py
@@ -1,9 +1,10 @@
+import errno
 import logging
 import os
 import posixpath
 import time
 from concurrent.futures import Future
-from typing import Any, Iterable, List
+from typing import Any, Callable, Iterable, List
 from urllib.parse import urlparse
 
 import fsspec
@@ -15,6 +16,8 @@
 from noisepy.seis.constants import AWS_EXECUTION_ENV
 
 S3_SCHEME = "s3"
+FIND_RETRIES = 5
+FIND_RETRY_SLEEP = 0.1  # (100ms)
 utils_logger = logging.getLogger(__name__)


@@ -47,6 +50,23 @@ def get_fs_sep(path1: str) -> str:
     return os.sep
 
 
+def io_retry(func: Callable, *args, **kwargs) -> Any:
+    """
+    Retry the given function up to FIND_RETRIES times if an OSError(EBUSY) is raised, sleeping between retries.
+    If the function still fails, the exception is raised.
+    """
+    for i in range(1, FIND_RETRIES + 1):
+        try:
+            return func(*args, **kwargs)
+        except OSError as e:
+            # when using S3 we sometimes get a SlowDown client error which is turned into
+            # an OSError(EBUSY) so back off if that happens
+            if e.errno != errno.EBUSY or i >= FIND_RETRIES:
+                raise
+            utils_logger.warning(f"Retrying {func.__name__} after error: {e}. Retry {i} of {FIND_RETRIES}")
+            time.sleep(FIND_RETRY_SLEEP * i)
+
+
 class TimeLogger:
     """
     A utility class to measure and log the time spent in code fragments. The basic usage is to call::
@@ -122,7 +142,8 @@ def pbar_update(_: Future):
 
     # Show a progress bar when processing futures
     with logging_redirect_tqdm():
-        with tqdm(total=len(futures), desc=task_name, position=position) as pbar:
+        size = len(futures)
+        with tqdm(total=size, desc=task_name, position=position, miniters=int(size / 100)) as pbar:
             for f in futures:
                 f.add_done_callback(pbar_update)
             return [f.result() for f in futures]
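Because io_retry simply forwards *args and **kwargs, any flaky filesystem call can be wrapped at the call site without its own loop, which is how main.py and hierarchicalstores.py use it above. A usage sketch, assuming an fsspec S3 filesystem where the SlowDown throttling response surfaces as OSError(EBUSY), as the diff's own comment notes; the bucket and prefix are made up:

    import fsspec

    from noisepy.seis.utils import io_retry

    fs = fsspec.filesystem("s3")
    # fs.find lists a prefix recursively. Under throttling it raises
    # OSError(EBUSY), which io_retry catches, sleeping FIND_RETRY_SLEEP * i
    # after failed attempt i; any other error propagates immediately.
    paths = io_retry(fs.find, "my-bucket/raw_data")

The same file also throttles the progress bar in get_results: passing miniters=int(size / 100) to tqdm caps redraws at roughly one per percent of completed tasks, which keeps the redirected log output readable for large task lists.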
49 changes: 29 additions & 20 deletions tests/test_hierarchicalstores.py
@@ -1,18 +1,16 @@
+import errno
 from concurrent.futures import ThreadPoolExecutor
 from typing import Tuple
 from unittest import mock
 
 import pytest
-from botocore.exceptions import ClientError
 from datetimerange import DateTimeRange
 from fsspec.implementations.local import LocalFileSystem  # noqa F401
 from utils import date_range
 
-from noisepy.seis.hierarchicalstores import (
-    ERR_SLOWDOWN,
-    FIND_RETRIES,
-    PairDirectoryCache,
-)
+from noisepy.seis.hierarchicalstores import PairDirectoryCache
 from noisepy.seis.numpystore import NumpyArrayStore, NumpyCCStore
+from noisepy.seis.utils import FIND_RETRIES, io_retry
 from noisepy.seis.zarrstore import ZarrStoreHelper


@@ -59,6 +57,20 @@ def check_1day():
     assert cache.contains("src", "rec", tsh1)
 
 
+def test_concurrent():
+    cache = PairDirectoryCache()
+    ts1 = date_range(4, 1, 2)
+
+    sta = [f"s{i}" for i in range(100)]
+    exec = ThreadPoolExecutor()
+    res = list(exec.map(lambda s: cache.add(s, "rec", [ts1]), sta))
+    assert len(res) == len(sta)
+    assert not any(res)
+    for s in sta:
+        assert cache.is_src_loaded(s)
+        assert cache.contains(s, "rec", ts1)
+
+
 numpy_paths = [
     (
         "some/path/CI.BAK/CI.ARV/2021_07_01_00_00_00T2021_07_02_00_00_00.tar.gz",
Expand Down Expand Up @@ -98,26 +110,23 @@ def test_zarr_parse_path(tmp_path, path: str, expected: Tuple[str, DateTimeRange

@mock.patch("fsspec.implementations.local.LocalFileSystem.find")
def test_find(find_mock, tmp_path):
def retry_find():
io_retry(store._fs_find, "foo")

store = NumpyCCStore(str(tmp_path), "r")
find_mock.side_effect = ClientError({"Error": {"Code": ERR_SLOWDOWN}}, "ListObjectsV2")
with pytest.raises(RuntimeError):
store._find("foo")
find_mock.side_effect = OSError(errno.EBUSY, "busy")
with pytest.raises(OSError):
retry_find()
assert FIND_RETRIES == find_mock.call_count

# if it's not a SlowDown error then we shouldn't retry
find_mock.side_effect = ClientError({"Error": {"Code": "other error"}}, "ListObjectsV2")
with pytest.raises(ClientError):
store._find("foo")
find_mock.side_effect = OSError(errno.EFAULT, "auth")
with pytest.raises(OSError):
retry_find()
assert FIND_RETRIES + 1 == find_mock.call_count

# same with other type of ClientError
find_mock.side_effect = ClientError({}, "operation")
with pytest.raises(ClientError):
store._find("foo")
assert FIND_RETRIES + 2 == find_mock.call_count

# same with other type of exceptoins
find_mock.side_effect = Exception()
with pytest.raises(Exception):
store._find("foo")
assert FIND_RETRIES + 3 == find_mock.call_count
retry_find()
assert FIND_RETRIES + 2 == find_mock.call_count
