From 50e8e13f6cd7c82e6ca1cd74a603ee1077976221 Mon Sep 17 00:00:00 2001 From: Carlos Garcia Jurado Suarez Date: Mon, 16 Oct 2023 16:34:44 -0700 Subject: [PATCH] refactor: Decouple data loading from plot_all_moveout --- src/noisepy/seis/datatypes.py | 11 +++++-- src/noisepy/seis/numpystore.py | 6 ++-- src/noisepy/seis/plotting_modules.py | 33 +++++++++---------- src/noisepy/seis/stores.py | 15 +++++++++ tutorials/get_started.ipynb | 24 +++++++++++--- tutorials/noisepy_pnwstore_tutorial.ipynb | 15 +++++++-- tutorials/noisepy_scedc_tutorial.ipynb | 15 +++++++-- tutorials/plot_stacks.ipynb | 40 +++++++++++++++++++++-- 8 files changed, 123 insertions(+), 36 deletions(-) diff --git a/src/noisepy/seis/datatypes.py b/src/noisepy/seis/datatypes.py index 417f863a..ea1a0802 100644 --- a/src/noisepy/seis/datatypes.py +++ b/src/noisepy/seis/datatypes.py @@ -393,10 +393,15 @@ def to_json_types(params: Dict[str, Any]) -> Dict[str, Any]: def _to_json_type(value: Any) -> Any: # special case since numpy arrays are not json serializable - if type(value) == np.ndarray: + if isinstance(value, np.ndarray): return list(map(_to_json_type, value)) - elif type(value) == np.float64 or type(value) == np.float32: + elif isinstance(value, np.float64) or isinstance(value, np.float32): return float(value) - elif type(value) == np.int64 or type(value) == np.int32 or type(value) == np.int16 or type(value) == np.int8: + elif ( + isinstance(value, np.int64) + or isinstance(value, np.int32) + or isinstance(value, np.int16) + or isinstance(value, np.int8) + ): return int(value) return value diff --git a/src/noisepy/seis/numpystore.py b/src/noisepy/seis/numpystore.py index 3ba407ca..2b719246 100644 --- a/src/noisepy/seis/numpystore.py +++ b/src/noisepy/seis/numpystore.py @@ -10,7 +10,7 @@ from .datatypes import CrossCorrelation, Stack, to_json_types from .hierarchicalstores import ArrayStore, HierarchicalStoreBase -from .stores import parse_timespan +from .stores import CrossCorrelationDataStore, StackStore, parse_timespan from .utils import fs_join logger = logging.getLogger(__name__) @@ -89,12 +89,12 @@ def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]: return None -class NumpyStackStore(HierarchicalStoreBase[Stack]): +class NumpyStackStore(HierarchicalStoreBase[Stack], StackStore): def __init__(self, root_dir: str, mode: str = "a", storage_options={}): super().__init__(NumpyArrayStore(root_dir, mode, storage_options=storage_options), Stack.load_instances) -class NumpyCCStore(HierarchicalStoreBase[CrossCorrelation]): +class NumpyCCStore(HierarchicalStoreBase[CrossCorrelation], CrossCorrelationDataStore): def __init__(self, root_dir: str, mode: str = "a", storage_options={}): super().__init__( NumpyArrayStore(root_dir, mode, storage_options=storage_options), CrossCorrelation.load_instances diff --git a/src/noisepy/seis/plotting_modules.py b/src/noisepy/seis/plotting_modules.py index 768d6914..bb3a35f7 100644 --- a/src/noisepy/seis/plotting_modules.py +++ b/src/noisepy/seis/plotting_modules.py @@ -1,6 +1,6 @@ import logging import os -from concurrent.futures import ThreadPoolExecutor +from typing import List, Tuple import matplotlib.pyplot as plt import numpy as np @@ -11,8 +11,8 @@ from obspy.signal.filter import bandpass from scipy.fftpack import next_fast_len -from noisepy.seis.stores import CrossCorrelationDataStore, StackStore -from noisepy.seis.utils import TimeLogger, get_results +from noisepy.seis.datatypes import Stack, Station +from noisepy.seis.stores import CrossCorrelationDataStore logging.getLogger("matplotlib.font_manager").disabled = True logger = logging.getLogger(__name__) @@ -630,7 +630,7 @@ def plot_substack_all_spect( def plot_all_moveout( - store: StackStore, + sta_stacks: List[Tuple[Station, Station, Stack]], stack_name, freqmin, freqmax, @@ -664,15 +664,16 @@ def plot_all_moveout( if sdir is None: raise ValueError("sdir argument must be provided if savefig=True") - sta_pairs = store.get_station_pairs() - ts = store.get_timespans(sta_pairs[0][0], sta_pairs[0][1]) - ts = ts[0] if len(ts) else None - if len(sta_pairs) == 0: + if len(sta_stacks) == 0: logger.error("No data available for plotting") return # Read some common arguments from the first available data set: - stacks = store.read(ts, sta_pairs[0][0], sta_pairs[0][1]) + stacks = sta_stacks[0][1] + stacks = list(filter(lambda x: x.name == stack_name and x.component == ccomp, stacks)) + if len(stacks) == 0: + logger.error(f"No data available for plotting {stack_name}/{ccomp}") + return dtmp = stacks[0].data params = stacks[0].parameters if len(params) == 0 or dtmp.size == 0: @@ -681,7 +682,7 @@ def plot_all_moveout( dt, maxlag = (params[p] for p in ["dt", "maxlag"]) stack_method = stack_name.split("0")[-1] - print(stack_name, stack_method) + logger.info(f"Plottting: {stack_method}, {len(sta_stacks)} station pairs") # lags for display if not disp_lag: @@ -693,15 +694,14 @@ def plot_all_moveout( indx2 = indx1 + 2 * int(disp_lag / dt) + 1 # cc matrix - nwin = len(sta_pairs) + nwin = len(sta_stacks) data = np.zeros(shape=(nwin, indx2 - indx1), dtype=np.float32) dist = np.zeros(nwin, dtype=np.float32) ngood = np.zeros(nwin, dtype=np.int16) # load cc and parameter matrix def load(ii): - (src, rec) = sta_pairs[ii] - stacks = store.read(ts, src, rec) + (src, rec), stacks = sta_stacks[ii] stacks = list(filter(lambda x: x.name == stack_name and x.component == ccomp, stacks)) if len(stacks) == 0: logger.warning(f"No data available for {src}_{rec}/{stack_name}/{ccomp}") @@ -712,11 +712,8 @@ def load(ii): crap = bandpass(all_data, freqmin, freqmax, int(1 / dt), corners=4, zerophase=True) data[ii] = crap[indx1:indx2] - tlog = TimeLogger(level=logging.INFO) - with ThreadPoolExecutor() as exec: - futures = [exec.submit(load, ii) for ii in range(nwin)] - _ = get_results(futures) - tlog.log(f"loading {nwin} stacks") + for i in range(nwin): + load(i) # average cc ntrace = int(np.round(np.max(dist) + 0.51) / dist_inc) diff --git a/src/noisepy/seis/stores.py b/src/noisepy/seis/stores.py index 84f90390..d90a487f 100644 --- a/src/noisepy/seis/stores.py +++ b/src/noisepy/seis/stores.py @@ -1,13 +1,16 @@ import datetime +import logging import os import re from abc import ABC, abstractmethod +from concurrent.futures import Executor, ThreadPoolExecutor from typing import Generic, List, Optional, Tuple, TypeVar import obspy from datetimerange import DateTimeRange from noisepy.seis.constants import DATE_FORMAT +from noisepy.seis.utils import TimeLogger, get_results from .datatypes import ( AnnotatedData, @@ -82,6 +85,18 @@ def get_station_pairs(self) -> List[Tuple[Station, Station]]: def read(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[T]: pass + def read_bulk( + self, timespan: DateTimeRange, pairs: List[Tuple[Station, Station]], executor: Executor = ThreadPoolExecutor() + ) -> List[Tuple[Tuple[Station, Station], List[T]]]: + """ + Reads the data for all the given station pairs (and timespan) in parallel. + """ + tlog = TimeLogger(level=logging.INFO) + futures = [executor.submit(self.read, timespan, p[0], p[1]) for p in pairs] + results = get_results(futures) + tlog.log(f"loading {len(pairs)} stacks") + return list(zip(pairs, results)) + class CrossCorrelationDataStore(ComputedDataStore[CrossCorrelation]): pass diff --git a/tutorials/get_started.ipynb b/tutorials/get_started.ipynb index 3667ff1f..26fdd3d0 100644 --- a/tutorials/get_started.ipynb +++ b/tutorials/get_started.ipynb @@ -174,8 +174,8 @@ "source": [ "config.stations = [\"A*\"]\n", "config.channels = [\"BHE\",\"BHN\",\"BHZ\"]\n", - "config.start_date = isoparse(\"2019-02-01\")\n", - "config.end_date = isoparse(\"2019-02-02\")\n", + "config.start_date = isoparse(\"2019-02-01T00:00:00Z\")\n", + "config.end_date = isoparse(\"2019-02-02T00:00:00Z\")\n", "\n", "# Download data locally. Enters raw data path, channel types, stations, config, and fdsn server.\n", "download(raw_data_path, config)" @@ -288,7 +288,8 @@ }, "outputs": [], "source": [ - "timespans = cc_store.get_timespans()\n", + "pairs = cc_store.get_station_pairs()\n", + "timespans = cc_store.get_timespans(*pairs[0])\n", "plotting_modules.plot_substack_cc(cc_store, timespans[0], 0.1, 1, 200, False)" ] }, @@ -314,10 +315,23 @@ "source": [ "# open a new cc store in read-only mode since we will be doing parallel access for stacking\n", "cc_store = ASDFCCStore(cc_data_path, mode=\"r\")\n", + "print(cc_store.get_station_pairs())\n", "stack_store = ASDFStackStore(stack_data_path)\n", + "config.stations = [\"*\"] # stacking doesn't support prefixes yet, so allow all stations\n", "stack_cross_correlations(cc_store, stack_store, config)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pairs = stack_store.get_station_pairs()\n", + "print(f\"Found {len(pairs)} station pairs\")\n", + "sta_stacks = stack_store.read_bulk(None, pairs) # no timestamp used in ASDFStackStore" + ] + }, { "cell_type": "markdown", "metadata": { @@ -347,7 +361,7 @@ }, "outputs": [], "source": [ - "plotting_modules.plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)" + "plotting_modules.plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)" ] } ], @@ -374,7 +388,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.10.13" } }, "nbformat": 4, diff --git a/tutorials/noisepy_pnwstore_tutorial.ipynb b/tutorials/noisepy_pnwstore_tutorial.ipynb index 0f8189cf..29e51b31 100644 --- a/tutorials/noisepy_pnwstore_tutorial.ipynb +++ b/tutorials/noisepy_pnwstore_tutorial.ipynb @@ -336,6 +336,17 @@ "print(os.listdir(stack_data_path))" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pairs = stack_store.get_station_pairs()\n", + "print(f\"Found {len(pairs)} station pairs\")\n", + "sta_stacks = stack_store.read_bulk(None, pairs) # no timestamp used in ASDFStackStore" + ] + }, { "cell_type": "code", "execution_count": null, @@ -344,7 +355,7 @@ }, "outputs": [], "source": [ - "plotting_modules.plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)" + "plotting_modules.plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)" ] } ], @@ -372,7 +383,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.10.13" } }, "nbformat": 4, diff --git a/tutorials/noisepy_scedc_tutorial.ipynb b/tutorials/noisepy_scedc_tutorial.ipynb index 85de56cb..5af5ec87 100644 --- a/tutorials/noisepy_scedc_tutorial.ipynb +++ b/tutorials/noisepy_scedc_tutorial.ipynb @@ -436,6 +436,17 @@ "cc_store.get_station_pairs()" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pairs = stack_store.get_station_pairs()\n", + "print(f\"Found {len(pairs)} station pairs\")\n", + "sta_stacks = stack_store.read_bulk(None, pairs) # no timestamp used in ASDFStackStore" + ] + }, { "cell_type": "code", "execution_count": null, @@ -444,7 +455,7 @@ }, "outputs": [], "source": [ - "plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)" + "plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)" ] }, { @@ -479,7 +490,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.10.13" } }, "nbformat": 4, diff --git a/tutorials/plot_stacks.ipynb b/tutorials/plot_stacks.ipynb index d55db254..e1a6f736 100644 --- a/tutorials/plot_stacks.ipynb +++ b/tutorials/plot_stacks.ipynb @@ -23,11 +23,12 @@ "from noisepy.seis import __version__ # noisepy core functions\n", "from noisepy.seis.plotting_modules import plot_all_moveout\n", "from noisepy.seis.numpystore import NumpyStackStore\n", - "from noisepy.seis.datatypes import Stack, Station\n", + "import random\n", "print(f\"Using NoisePy version {__version__}\")\n", "\n", "\n", - "stack_data_path = \"s3://scoped-noise/scedc_CI_stack_v2/\"\n", + "stack_data_path = \"s3://scoped-noise/scedc_CI_2022_stack/\"\n", + "\n", "S3_STORAGE_OPTIONS = {\"s3\": {\"anon\": False}}\n", "stack_store = NumpyStackStore(stack_data_path, storage_options=S3_STORAGE_OPTIONS)" ] @@ -38,7 +39,40 @@ "metadata": {}, "outputs": [], "source": [ - "plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)" + "pairs = stack_store.get_station_pairs()\n", + "print(f\"Found {len(pairs)} station pairs\")\n", + "# Get the first timespan available for the first pair\n", + "ts = stack_store.get_timespans(*pairs[0])[0]\n", + "print(f\"Timespan: {ts}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# load 10% of the data to plot\n", + "sample = random.sample(pairs, int(len(pairs)*.1))\n", + "print(len(sample))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sta_stacks = stack_store.read_bulk(ts, sample)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)" ] } ],