From b55b0f2693bac5b2a1807e9823b0d20fef810429 Mon Sep 17 00:00:00 2001
From: carlosgjs
Date: Mon, 16 Oct 2023 16:22:35 -0700
Subject: [PATCH] Better retries for IO, logging (#243)

* Better retries for IO, logging

* Use a non-platform specific error code in test

* Fix test on windows
---
 .github/actions/setup/action.yaml      |  2 +-
 src/noisepy/seis/correlate.py          | 39 ++++++++++++--------
 src/noisepy/seis/hierarchicalstores.py | 35 ++++++------------
 src/noisepy/seis/main.py               |  4 +--
 src/noisepy/seis/utils.py              | 25 +++++++++++--
 tests/test_hierarchicalstores.py       | 49 +++++++++++++++-----------
 6 files changed, 90 insertions(+), 64 deletions(-)

diff --git a/.github/actions/setup/action.yaml b/.github/actions/setup/action.yaml
index e8d670a2..1e0d4f30 100644
--- a/.github/actions/setup/action.yaml
+++ b/.github/actions/setup/action.yaml
@@ -19,7 +19,7 @@ runs:
       shell: sh
       run: |
         python3 -m ensurepip
-        pip3 install --upgrade pip
+        python3 -m pip install --upgrade pip
     - name: Setup MPI
       if: ${{ inputs.mpi == 'true' }}
       uses: mpi4py/setup-mpi@v1
diff --git a/src/noisepy/seis/correlate.py b/src/noisepy/seis/correlate.py
index 9fd48698..02258c93 100644
--- a/src/noisepy/seis/correlate.py
+++ b/src/noisepy/seis/correlate.py
@@ -94,14 +94,20 @@ def init() -> List:
         return [timespans]
 
     [timespans] = scheduler.initialize(init, 1)
-    errors = False
+    failed = []
     for its in scheduler.get_indices(timespans):
         ts = timespans[its]
-        errors = errors or cc_timespan(raw_store, fft_params, cc_store, ts, pair_filter)
+        failed_pairs = cc_timespan(raw_store, fft_params, cc_store, ts, pair_filter)
+        if len(failed_pairs) > 0:
+            failed.append((ts, failed_pairs))
 
     tlog.log(f"Step 1 in total with {os.cpu_count()} cores", t_s1_total)
-    if errors:
-        raise RuntimeError("Errors occurred during cross-correlation. Check logs for details")
+    if len(failed):
+        failed_str = "\n".join(map(str, failed))
+        logger.error(
+            "Errors occurred during cross-correlation. Check logs for details. "
+            f"The following pairs failed:\n{failed_str}"
+        )
 
 
 def cc_timespan(
@@ -110,8 +116,7 @@
     cc_store: CrossCorrelationDataStore,
     ts: DateTimeRange,
     pair_filter: Callable[[Channel, Channel], bool] = lambda src, rec: True,
-) -> bool:
-    errors = False
+) -> List[Tuple[Station, Station]]:
     executor = ThreadPoolExecutor()
     tlog = TimeLogger(logger, logging.INFO, prefix="CC Main")
     """
@@ -155,7 +160,7 @@
     )
     if len(missing_channels) == 0:
         logger.warning(f"{ts} already completed")
-        return False
+        return []
 
     ch_data_tuples = _read_channels(
         executor, ts, raw_store, missing_channels, fft_params.samp_freq, fft_params.single_freq
@@ -164,7 +169,7 @@
 
     if len(ch_data_tuples) == 0:
         logger.warning(f"No data available for {ts}")
-        return False
+        return missing_pairs
 
     tlog.log(f"Read channel data: {len(ch_data_tuples)} channels")
     ch_data_tuples_pre = preprocess_all(executor, ch_data_tuples, raw_store, fft_params, ts)
@@ -172,7 +177,7 @@
     tlog.log(f"Preprocess: {len(ch_data_tuples_pre)} channels")
     if len(ch_data_tuples_pre) == 0:
         logger.warning(f"No data available for {ts} after preprocessing")
-        return False
+        return missing_pairs
 
     nchannels = len(ch_data_tuples_pre)
     nseg_chunk = check_memory(fft_params, nchannels)
@@ -201,7 +206,7 @@
     Nfft = max(map(lambda d: d.length, fft_datas))
     if Nfft == 0:
         logger.error(f"No FFT data available for any channel in {ts}, skipping")
-        return True
+        return missing_pairs
 
     if len(ffts) != nchannels:
         logger.warning("it seems some stations miss data in download step, but it is OKAY!")
@@ -231,11 +236,15 @@
             )
             tasks.append(t)
         compute_results = get_results(tasks, "Cross correlation")
-        successes, save_tasks = zip(*compute_results)
-        errors = errors or not all(successes)
+        _, save_tasks = zip(*compute_results)
         save_tasks = [t for t in save_tasks if t]
-        save_results = get_results(save_tasks, "Save correlations")
-        errors = errors or not all(save_results)
+        _ = get_results(save_tasks, "Save correlations")
+        failed_pairs = [
+            pair[0]
+            for pair, (comp_res, save_task) in zip(work_items, compute_results)
+            if not (comp_res and (save_task is None or save_task.result()))
+        ]
+
 
     save_exec.shutdown()
     tlog.log("Correlate and write to store")
@@ -244,7 +253,7 @@
     tlog.log(f"Process the chunk of {ts}", t_chunk)
 
     executor.shutdown()
-    return errors
+    return failed_pairs
 
 
 def create_pairs(
diff --git a/src/noisepy/seis/hierarchicalstores.py b/src/noisepy/seis/hierarchicalstores.py
index 8c1bc3b6..c5bdfdfb 100644
--- a/src/noisepy/seis/hierarchicalstores.py
+++ b/src/noisepy/seis/hierarchicalstores.py
@@ -6,23 +6,21 @@
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timezone
 from pathlib import Path
-from time import sleep
 from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar
 
 import fsspec
 import numpy as np
-from botocore.exceptions import ClientError
 from datetimerange import DateTimeRange
 
+from noisepy.seis.utils import io_retry
+
 from .datatypes import AnnotatedData, Station
 from .stores import timespan_str
 from .utils import TimeLogger, fs_join, get_filesystem, unstack
 
 META_ATTR = "metadata"
 VERSION_ATTR = "version"
-FIND_RETRIES = 5
-FIND_RETRY_SLEEP = 0.05  # (seconds)
-ERR_SLOWDOWN = "SlowDown"
+FAKE_STA = "FAKE_STATION"
 
 logger = logging.getLogger(__name__)
@@ -219,32 +217,21 @@ def contains(self, src_sta: Station, rec_sta: Station, timespan: DateTimeRange)
 
     def _load_src(self, src: str):
         if self.dir_cache.is_src_loaded(src):
             return
-        paths = self._find(src)
+        logger.info(f"Loading directory cache for {src} - ix: {self.dir_cache.stations_idx.get(src, -4)}")
+        paths = io_retry(self._fs_find, src)
         grouped_paths = defaultdict(list)
         for rec_sta, timespan in [p for p in paths if p]:
             grouped_paths[rec_sta].append(timespan)
         for rec_sta, timespans in grouped_paths.items():
             self.dir_cache.add(src, rec_sta, sorted(timespans, key=lambda t: t.start_datetime.timestamp()))
+        # if we didn't find any paths, add a fake entry so we don't try again and is_src_loaded returns True
+        if len(grouped_paths) == 0:
+            self.dir_cache.add(src, FAKE_STA, [])
 
-    def _find(self, src):
-        i = 0
-        while i < FIND_RETRIES:
-            try:
-                i += 1
-                paths = [
-                    self.helper.parse_path(p)
-                    for p in self.helper.get_fs().find(fs_join(self.helper.get_root_dir(), src))
-                ]
-                return paths
-            except ClientError as e:
-                # when using S3 we sometimes get a SlowDown client error so back off if that happens
-                if e.response.get("Error", {}).get("Code", None) != ERR_SLOWDOWN:
-                    logger.error(f"Got ClientError while listing {src}: {e}")
-                    raise
-                logger.warning(f"Got ClientError while listing {src}: {e}. Retry {i} of {FIND_RETRIES}")
-                sleep(FIND_RETRY_SLEEP * i)
-        raise RuntimeError(f"Could not get directory listing for {src} after {FIND_RETRIES} retries")
+    def _fs_find(self, src):
+        paths = [self.helper.parse_path(p) for p in self.helper.get_fs().find(fs_join(self.helper.get_root_dir(), src))]
+        return paths
 
     def append(self, timespan: DateTimeRange, src: Station, rec: Station, data: List[T]):
         path = self._get_path(src, rec, timespan)
diff --git a/src/noisepy/seis/main.py b/src/noisepy/seis/main.py
index 218aea6c..6bac9f53 100644
--- a/src/noisepy/seis/main.py
+++ b/src/noisepy/seis/main.py
@@ -28,7 +28,7 @@
     SingleNodeScheduler,
 )
 from .stack import stack_cross_correlations
-from .utils import fs_join, get_filesystem
+from .utils import fs_join, get_filesystem, io_retry
 from .zarrstore import ZarrCCStore, ZarrStackStore
 
 logger = logging.getLogger(__name__)
@@ -236,7 +236,7 @@ def cmd_wrapper(cmd: Callable[[ConfigParameters], None], src_dir: str, tgt_dir:
         storage_options = params.storage_options
         makedir(tgt_dir, storage_options)
         logger.info(f"Running {args.cmd.name}. Start: {params.start_date}. End: {params.end_date}")
-        params.save_yaml(fs_join(tgt_dir, CONFIG_FILE))
+        io_retry(params.save_yaml, fs_join(tgt_dir, CONFIG_FILE))
         cmd(params)
     except Exception as e:
         logger.exception(e)
diff --git a/src/noisepy/seis/utils.py b/src/noisepy/seis/utils.py
index 07498930..adb426cf 100644
--- a/src/noisepy/seis/utils.py
+++ b/src/noisepy/seis/utils.py
@@ -1,9 +1,10 @@
+import errno
 import logging
 import os
 import posixpath
 import time
 from concurrent.futures import Future
-from typing import Any, Iterable, List
+from typing import Any, Callable, Iterable, List
 from urllib.parse import urlparse
 
 import fsspec
@@ -15,6 +16,8 @@
 from noisepy.seis.constants import AWS_EXECUTION_ENV
 
 S3_SCHEME = "s3"
+FIND_RETRIES = 5
+FIND_RETRY_SLEEP = 0.1  # (100ms)
 
 utils_logger = logging.getLogger(__name__)
@@ -47,6 +50,23 @@ def get_fs_sep(path1: str) -> str:
         return os.sep
 
 
+def io_retry(func: Callable, *args, **kwargs) -> Any:
+    """
+    Retry the given function up to FIND_RETRIES times if an OSError(EBUSY) is raised, sleeping between retries.
+    If the function still fails, the exception is raised.
+    """
+    for i in range(1, FIND_RETRIES + 1):
+        try:
+            return func(*args, **kwargs)
+        except OSError as e:
+            # when using S3 we sometimes get a SlowDown client error which is turned into
+            # an OSError(EBUSY) so back off if that happens
+            if e.errno != errno.EBUSY or i >= FIND_RETRIES:
+                raise
+            utils_logger.warning(f"Retrying {func.__name__} after error: {e}. Retry {i} of {FIND_RETRIES}")
+            time.sleep(FIND_RETRY_SLEEP * i)
+
+
 class TimeLogger:
     """
     A utility class to measure and log the time spent in code fragments. The basic usage is to call::
@@ -122,7 +142,8 @@ def pbar_update(_: Future):
 
     # Show a progress bar when processing futures
     with logging_redirect_tqdm():
-        with tqdm(total=len(futures), desc=task_name, position=position) as pbar:
+        size = len(futures)
+        with tqdm(total=size, desc=task_name, position=position, miniters=int(size / 100)) as pbar:
             for f in futures:
                 f.add_done_callback(pbar_update)
             return [f.result() for f in futures]
diff --git a/tests/test_hierarchicalstores.py b/tests/test_hierarchicalstores.py
index 772c06cb..b3be9cd4 100644
--- a/tests/test_hierarchicalstores.py
+++ b/tests/test_hierarchicalstores.py
@@ -1,18 +1,16 @@
+import errno
+from concurrent.futures import ThreadPoolExecutor
 from typing import Tuple
 from unittest import mock
 
 import pytest
-from botocore.exceptions import ClientError
 from datetimerange import DateTimeRange
 from fsspec.implementations.local import LocalFileSystem  # noqa F401
 from utils import date_range
 
-from noisepy.seis.hierarchicalstores import (
-    ERR_SLOWDOWN,
-    FIND_RETRIES,
-    PairDirectoryCache,
-)
+from noisepy.seis.hierarchicalstores import PairDirectoryCache
 from noisepy.seis.numpystore import NumpyArrayStore, NumpyCCStore
+from noisepy.seis.utils import FIND_RETRIES, io_retry
 from noisepy.seis.zarrstore import ZarrStoreHelper
 
 
@@ -59,6 +57,20 @@ def check_1day():
         assert cache.contains("src", "rec", tsh1)
 
 
+def test_concurrent():
+    cache = PairDirectoryCache()
+    ts1 = date_range(4, 1, 2)
+
+    sta = [f"s{i}" for i in range(100)]
+    exec = ThreadPoolExecutor()
+    res = list(exec.map(lambda s: cache.add(s, "rec", [ts1]), sta))
+    assert len(res) == len(sta)
+    assert not any(res)
+    for s in sta:
+        assert cache.is_src_loaded(s)
+        assert cache.contains(s, "rec", ts1)
+
+
 numpy_paths = [
     (
         "some/path/CI.BAK/CI.ARV/2021_07_01_00_00_00T2021_07_02_00_00_00.tar.gz",
@@ -98,26 +110,23 @@ def test_zarr_parse_path(tmp_path, path: str, expected: Tuple[str, DateTimeRange
 
 @mock.patch("fsspec.implementations.local.LocalFileSystem.find")
 def test_find(find_mock, tmp_path):
+    def retry_find():
+        io_retry(store._fs_find, "foo")
+
     store = NumpyCCStore(str(tmp_path), "r")
-    find_mock.side_effect = ClientError({"Error": {"Code": ERR_SLOWDOWN}}, "ListObjectsV2")
-    with pytest.raises(RuntimeError):
-        store._find("foo")
+    find_mock.side_effect = OSError(errno.EBUSY, "busy")
+    with pytest.raises(OSError):
+        retry_find()
     assert FIND_RETRIES == find_mock.call_count
 
     # if it's not a SlowDown error then we shouldn't retry
-    find_mock.side_effect = ClientError({"Error": {"Code": "other error"}}, "ListObjectsV2")
-    with pytest.raises(ClientError):
-        store._find("foo")
+    find_mock.side_effect = OSError(errno.EFAULT, "auth")
+    with pytest.raises(OSError):
+        retry_find()
    assert FIND_RETRIES + 1 == find_mock.call_count
 
-    # same with other type of ClientError
-    find_mock.side_effect = ClientError({}, "operation")
-    with pytest.raises(ClientError):
-        store._find("foo")
-    assert FIND_RETRIES + 2 == find_mock.call_count
-
     # same with other types of exceptions
     find_mock.side_effect = Exception()
     with pytest.raises(Exception):
-        store._find("foo")
-    assert FIND_RETRIES + 3 == find_mock.call_count
+        retry_find()
+    assert FIND_RETRIES + 2 == find_mock.call_count
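
Note: a minimal usage sketch of the new io_retry helper. Everything here except
io_retry, FIND_RETRIES and FIND_RETRY_SLEEP is hypothetical: flaky_find and the
call counter are stand-ins for a throttled fsspec find() call. Per the comment in
utils.py, S3 "SlowDown" responses surface as OSError with errno EBUSY, which is
the only error the helper retries, with a linearly growing sleep of
FIND_RETRY_SLEEP * attempt.

    import errno

    from noisepy.seis.utils import io_retry

    calls = {"n": 0}

    def flaky_find():
        # hypothetical stand-in for a throttled listing call: fails twice, then succeeds
        calls["n"] += 1
        if calls["n"] < 3:
            raise OSError(errno.EBUSY, "SlowDown")
        return ["CI.BAK/CI.ARV/2021_07_01_00_00_00T2021_07_02_00_00_00.tar.gz"]

    paths = io_retry(flaky_find)  # logs two retry warnings, sleeps, then returns the listing
    assert calls["n"] == 3

Any other errno, or an exhausted retry budget, re-raises immediately; that is the
behavior the updated test_find asserts.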
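Note: the driver loop in correlate.py now collects (timespan, failed_pairs) tuples
and logs one aggregated error instead of raising. A small sketch of that contract,
with plain strings standing in for the DateTimeRange and Station types from the
codebase:

    # stand-in results, as if returned by cc_timespan for two timespans
    results = [
        ("2021-07-01T2021-07-02", [("CI.BAK", "CI.ARV")]),
        ("2021-07-02T2021-07-03", []),
    ]

    failed = []
    for ts, failed_pairs in results:
        if len(failed_pairs) > 0:
            failed.append((ts, failed_pairs))  # append keeps each (ts, pairs) tuple intact

    if len(failed):
        failed_str = "\n".join(map(str, failed))
        print(f"The following pairs failed:\n{failed_str}")

Using append rather than extend keeps each timespan grouped with its failing
station pairs, which is the shape the aggregated log message formats.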
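Note: a sketch of the empty-listing behavior added to _load_src. When the remote
find() returns nothing for a source station, a placeholder receiver entry marks the
source as loaded so the listing is not re-fetched. FAKE_STA and PairDirectoryCache
come from this patch; the station name "CI.BAK" is illustrative only.

    from noisepy.seis.hierarchicalstores import FAKE_STA, PairDirectoryCache

    cache = PairDirectoryCache()
    assert not cache.is_src_loaded("CI.BAK")
    # what _load_src does when grouped_paths is empty: a fake entry with no timespans
    cache.add("CI.BAK", FAKE_STA, [])
    assert cache.is_src_loaded("CI.BAK")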