Skip to content

Commit

Permalink
refactor: Decouple data loading from plot_all_moveout
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosgjs committed Oct 16, 2023
1 parent 1fa1daf commit 50e8e13
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 36 deletions.
11 changes: 8 additions & 3 deletions src/noisepy/seis/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,15 @@ def to_json_types(params: Dict[str, Any]) -> Dict[str, Any]:

def _to_json_type(value: Any) -> Any:
# special case since numpy arrays are not json serializable
if type(value) == np.ndarray:
if isinstance(value, np.ndarray):
return list(map(_to_json_type, value))
elif type(value) == np.float64 or type(value) == np.float32:
elif isinstance(value, np.float64) or isinstance(value, np.float32):
return float(value)
elif type(value) == np.int64 or type(value) == np.int32 or type(value) == np.int16 or type(value) == np.int8:
elif (
isinstance(value, np.int64)
or isinstance(value, np.int32)
or isinstance(value, np.int16)
or isinstance(value, np.int8)
):
return int(value)
return value
6 changes: 3 additions & 3 deletions src/noisepy/seis/numpystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from .datatypes import CrossCorrelation, Stack, to_json_types
from .hierarchicalstores import ArrayStore, HierarchicalStoreBase
from .stores import parse_timespan
from .stores import CrossCorrelationDataStore, StackStore, parse_timespan
from .utils import fs_join

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -89,12 +89,12 @@ def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]:
return None


class NumpyStackStore(HierarchicalStoreBase[Stack]):
class NumpyStackStore(HierarchicalStoreBase[Stack], StackStore):
def __init__(self, root_dir: str, mode: str = "a", storage_options={}):
super().__init__(NumpyArrayStore(root_dir, mode, storage_options=storage_options), Stack.load_instances)


class NumpyCCStore(HierarchicalStoreBase[CrossCorrelation]):
class NumpyCCStore(HierarchicalStoreBase[CrossCorrelation], CrossCorrelationDataStore):
def __init__(self, root_dir: str, mode: str = "a", storage_options={}):
super().__init__(
NumpyArrayStore(root_dir, mode, storage_options=storage_options), CrossCorrelation.load_instances
Expand Down
33 changes: 15 additions & 18 deletions src/noisepy/seis/plotting_modules.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

import matplotlib.pyplot as plt
import numpy as np
Expand All @@ -11,8 +11,8 @@
from obspy.signal.filter import bandpass
from scipy.fftpack import next_fast_len

from noisepy.seis.stores import CrossCorrelationDataStore, StackStore
from noisepy.seis.utils import TimeLogger, get_results
from noisepy.seis.datatypes import Stack, Station
from noisepy.seis.stores import CrossCorrelationDataStore

logging.getLogger("matplotlib.font_manager").disabled = True
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -630,7 +630,7 @@ def plot_substack_all_spect(


def plot_all_moveout(
store: StackStore,
sta_stacks: List[Tuple[Station, Station, Stack]],
stack_name,
freqmin,
freqmax,
Expand Down Expand Up @@ -664,15 +664,16 @@ def plot_all_moveout(
if sdir is None:
raise ValueError("sdir argument must be provided if savefig=True")

sta_pairs = store.get_station_pairs()
ts = store.get_timespans(sta_pairs[0][0], sta_pairs[0][1])
ts = ts[0] if len(ts) else None
if len(sta_pairs) == 0:
if len(sta_stacks) == 0:
logger.error("No data available for plotting")
return

# Read some common arguments from the first available data set:
stacks = store.read(ts, sta_pairs[0][0], sta_pairs[0][1])
stacks = sta_stacks[0][1]
stacks = list(filter(lambda x: x.name == stack_name and x.component == ccomp, stacks))
if len(stacks) == 0:
logger.error(f"No data available for plotting {stack_name}/{ccomp}")
return
dtmp = stacks[0].data
params = stacks[0].parameters
if len(params) == 0 or dtmp.size == 0:
Expand All @@ -681,7 +682,7 @@ def plot_all_moveout(

dt, maxlag = (params[p] for p in ["dt", "maxlag"])
stack_method = stack_name.split("0")[-1]
print(stack_name, stack_method)
logger.info(f"Plottting: {stack_method}, {len(sta_stacks)} station pairs")

# lags for display
if not disp_lag:
Expand All @@ -693,15 +694,14 @@ def plot_all_moveout(
indx2 = indx1 + 2 * int(disp_lag / dt) + 1

# cc matrix
nwin = len(sta_pairs)
nwin = len(sta_stacks)
data = np.zeros(shape=(nwin, indx2 - indx1), dtype=np.float32)
dist = np.zeros(nwin, dtype=np.float32)
ngood = np.zeros(nwin, dtype=np.int16)

# load cc and parameter matrix
def load(ii):
(src, rec) = sta_pairs[ii]
stacks = store.read(ts, src, rec)
(src, rec), stacks = sta_stacks[ii]
stacks = list(filter(lambda x: x.name == stack_name and x.component == ccomp, stacks))
if len(stacks) == 0:
logger.warning(f"No data available for {src}_{rec}/{stack_name}/{ccomp}")
Expand All @@ -712,11 +712,8 @@ def load(ii):
crap = bandpass(all_data, freqmin, freqmax, int(1 / dt), corners=4, zerophase=True)
data[ii] = crap[indx1:indx2]

tlog = TimeLogger(level=logging.INFO)
with ThreadPoolExecutor() as exec:
futures = [exec.submit(load, ii) for ii in range(nwin)]
_ = get_results(futures)
tlog.log(f"loading {nwin} stacks")
for i in range(nwin):
load(i)

# average cc
ntrace = int(np.round(np.max(dist) + 0.51) / dist_inc)
Expand Down
15 changes: 15 additions & 0 deletions src/noisepy/seis/stores.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import datetime
import logging
import os
import re
from abc import ABC, abstractmethod
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Generic, List, Optional, Tuple, TypeVar

import obspy
from datetimerange import DateTimeRange

from noisepy.seis.constants import DATE_FORMAT
from noisepy.seis.utils import TimeLogger, get_results

from .datatypes import (
AnnotatedData,
Expand Down Expand Up @@ -82,6 +85,18 @@ def get_station_pairs(self) -> List[Tuple[Station, Station]]:
def read(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[T]:
pass

def read_bulk(
self, timespan: DateTimeRange, pairs: List[Tuple[Station, Station]], executor: Executor = ThreadPoolExecutor()
) -> List[Tuple[Tuple[Station, Station], List[T]]]:
"""
Reads the data for all the given station pairs (and timespan) in parallel.
"""
tlog = TimeLogger(level=logging.INFO)

Check warning on line 94 in src/noisepy/seis/stores.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/stores.py#L94

Added line #L94 was not covered by tests
futures = [executor.submit(self.read, timespan, p[0], p[1]) for p in pairs]
results = get_results(futures)
tlog.log(f"loading {len(pairs)} stacks")
return list(zip(pairs, results))

Check warning on line 98 in src/noisepy/seis/stores.py

View check run for this annotation

Codecov / codecov/patch

src/noisepy/seis/stores.py#L96-L98

Added lines #L96 - L98 were not covered by tests


class CrossCorrelationDataStore(ComputedDataStore[CrossCorrelation]):
pass
Expand Down
24 changes: 19 additions & 5 deletions tutorials/get_started.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@
"source": [
"config.stations = [\"A*\"]\n",
"config.channels = [\"BHE\",\"BHN\",\"BHZ\"]\n",
"config.start_date = isoparse(\"2019-02-01\")\n",
"config.end_date = isoparse(\"2019-02-02\")\n",
"config.start_date = isoparse(\"2019-02-01T00:00:00Z\")\n",
"config.end_date = isoparse(\"2019-02-02T00:00:00Z\")\n",
"\n",
"# Download data locally. Enters raw data path, channel types, stations, config, and fdsn server.\n",
"download(raw_data_path, config)"
Expand Down Expand Up @@ -288,7 +288,8 @@
},
"outputs": [],
"source": [
"timespans = cc_store.get_timespans()\n",
"pairs = cc_store.get_station_pairs()\n",
"timespans = cc_store.get_timespans(*pairs[0])\n",
"plotting_modules.plot_substack_cc(cc_store, timespans[0], 0.1, 1, 200, False)"
]
},
Expand All @@ -314,10 +315,23 @@
"source": [
"# open a new cc store in read-only mode since we will be doing parallel access for stacking\n",
"cc_store = ASDFCCStore(cc_data_path, mode=\"r\")\n",
"print(cc_store.get_station_pairs())\n",
"stack_store = ASDFStackStore(stack_data_path)\n",
"config.stations = [\"*\"] # stacking doesn't support prefixes yet, so allow all stations\n",
"stack_cross_correlations(cc_store, stack_store, config)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pairs = stack_store.get_station_pairs()\n",
"print(f\"Found {len(pairs)} station pairs\")\n",
"sta_stacks = stack_store.read_bulk(None, pairs) # no timestamp used in ASDFStackStore"
]
},
{
"cell_type": "markdown",
"metadata": {
Expand Down Expand Up @@ -347,7 +361,7 @@
},
"outputs": [],
"source": [
"plotting_modules.plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
"plotting_modules.plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
]
}
],
Expand All @@ -374,7 +388,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.10.13"
}
},
"nbformat": 4,
Expand Down
15 changes: 13 additions & 2 deletions tutorials/noisepy_pnwstore_tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,17 @@
"print(os.listdir(stack_data_path))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pairs = stack_store.get_station_pairs()\n",
"print(f\"Found {len(pairs)} station pairs\")\n",
"sta_stacks = stack_store.read_bulk(None, pairs) # no timestamp used in ASDFStackStore"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -344,7 +355,7 @@
},
"outputs": [],
"source": [
"plotting_modules.plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
"plotting_modules.plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
]
}
],
Expand Down Expand Up @@ -372,7 +383,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.10.13"
}
},
"nbformat": 4,
Expand Down
15 changes: 13 additions & 2 deletions tutorials/noisepy_scedc_tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,17 @@
"cc_store.get_station_pairs()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pairs = stack_store.get_station_pairs()\n",
"print(f\"Found {len(pairs)} station pairs\")\n",
"sta_stacks = stack_store.read_bulk(None, pairs) # no timestamp used in ASDFStackStore"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -444,7 +455,7 @@
},
"outputs": [],
"source": [
"plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
"plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
]
},
{
Expand Down Expand Up @@ -479,7 +490,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.10.13"
}
},
"nbformat": 4,
Expand Down
40 changes: 37 additions & 3 deletions tutorials/plot_stacks.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
"from noisepy.seis import __version__ # noisepy core functions\n",
"from noisepy.seis.plotting_modules import plot_all_moveout\n",
"from noisepy.seis.numpystore import NumpyStackStore\n",
"from noisepy.seis.datatypes import Stack, Station\n",
"import random\n",
"print(f\"Using NoisePy version {__version__}\")\n",
"\n",
"\n",
"stack_data_path = \"s3://scoped-noise/scedc_CI_stack_v2/\"\n",
"stack_data_path = \"s3://scoped-noise/scedc_CI_2022_stack/\"\n",
"\n",
"S3_STORAGE_OPTIONS = {\"s3\": {\"anon\": False}}\n",
"stack_store = NumpyStackStore(stack_data_path, storage_options=S3_STORAGE_OPTIONS)"
]
Expand All @@ -38,7 +39,40 @@
"metadata": {},
"outputs": [],
"source": [
"plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
"pairs = stack_store.get_station_pairs()\n",
"print(f\"Found {len(pairs)} station pairs\")\n",
"# Get the first timespan available for the first pair\n",
"ts = stack_store.get_timespans(*pairs[0])[0]\n",
"print(f\"Timespan: {ts}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# load 10% of the data to plot\n",
"sample = random.sample(pairs, int(len(pairs)*.1))\n",
"print(len(sample))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sta_stacks = stack_store.read_bulk(ts, sample)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
]
}
],
Expand Down

0 comments on commit 50e8e13

Please sign in to comment.