From 781187340aee16090d43ea86ea43beaaf7417bf7 Mon Sep 17 00:00:00 2001
From: Yiyu Ni
Date: Mon, 4 Nov 2024 17:39:44 -0800
Subject: [PATCH] feat: performance improvement for large jobs (#38)

* remove channel list in logger

Signed-off-by: Yiyu Ni

* update FDSN and XML channel catalog

Signed-off-by: Yiyu Ni

* improve PNWRawDataStore channel query efficiency

Signed-off-by: Yiyu Ni

* minor update on format

Signed-off-by: Yiyu Ni

* add test

Signed-off-by: Yiyu Ni

* add test

Signed-off-by: Yiyu Ni

* Delete src/noisepy/seis/io/mseedstore.py

* important logic for single_frequency

Signed-off-by: Yiyu Ni

* important logic for single_frequency

Signed-off-by: Yiyu Ni

* add test

Signed-off-by: Yiyu Ni

---------

Signed-off-by: Yiyu Ni
---
 .gitignore                            |   1 +
 .pre-commit-config.yaml               | 121 --------------------------
 codecov.yml                           |   3 +
 src/noisepy/seis/io/channelcatalog.py |  74 ++++++++--------
 src/noisepy/seis/io/datatypes.py      |   6 +-
 src/noisepy/seis/io/pnwstore.py       |  23 +++--
 src/noisepy/seis/io/s3store.py        |   4 +-
 src/noisepy/seis/io/utils.py          |   2 +-
 tests/test_channelcatalog.py          |  16 +++-
 9 files changed, 83 insertions(+), 167 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5367475..a92355b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -139,6 +139,7 @@ dask-worker-space/
 
 # tmp directory
 tmp/
+dev/
 
 # Mac OS
 .DS_Store
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 8214508..1e644af 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -31,124 +31,3 @@ repos:
     rev: 23.7.0
     hooks:
       - id: black
-
-
-  # - repo: https://github.com/PyCQA/flake8
-  #   rev: 6.0.0
-  #   hooks:
-  #     - id: flake8
-  #       additional_dependencies:
-  #         - flake8-black
-  #       exclude: .git,__pycache__,build,dist
-# repos:
-
-#   # Compare the local template version to the latest remote template version
-#   # This hook should always pass. It will print a message if the local version
-#   # is out of date.
-#   - repo: https://github.com/lincc-frameworks/pre-commit-hooks
-#     rev: v0.1.1
-#     hooks:
-#       - id: check-lincc-frameworks-template-version
-#         name: Check template version
-#         description: Compare current template version against latest
-#         verbose: true
-
-#   # Clear output from jupyter notebooks so that only the input cells are committed.
-#   - repo: local
-#     hooks:
-#       - id: jupyter-nb-clear-output
-#         name: Clear output from Jupyter notebooks
-#         description: Clear output from Jupyter notebooks.
-#         files: \.ipynb$
-#         stages: [commit]
-#         language: system
-#         entry: jupyter nbconvert --clear-output
-
-#   # Prevents committing directly branches named 'main' and 'master'.
-#   - repo: https://github.com/pre-commit/pre-commit-hooks
-#     rev: v4.4.0
-#     hooks:
-#       - id: no-commit-to-branch
-#         name: Prevent main branch commits
-#         description: Prevent the user from committing directly to the primary branch.
-#       - id: check-added-large-files
-#         name: Check for large files
-#         description: Prevent the user from committing very large files.
-#         args: ['--maxkb=500']
-
-#   # Verify that pyproject.toml is well formed
-#   - repo: https://github.com/abravalheri/validate-pyproject
-#     rev: v0.12.1
-#     hooks:
-#       - id: validate-pyproject
-#         name: Validate pyproject.toml
-#         description: Verify that pyproject.toml adheres to the established schema.
-
-
-#   # Automatically sort the imports used in .py files
-#   - repo: https://github.com/pycqa/isort
-#     rev: 5.12.0
-#     hooks:
-#       - id: isort
-#         name: Run isort
-#         description: Sort and organize imports in .py and .pyi files.
-#         types_or: [python, pyi]
-
-
-
-#   # Analyze the code style and report code that doesn't adhere.
-#   - repo: https://github.com/psf/black
-#     rev: 23.7.0
-#     hooks:
-#       - id: black-jupyter
-#         name: Format code using black
-#         types_or: [python, pyi, jupyter]
-#         # It is recommended to specify the latest version of Python
-#         # supported by your project here, or alternatively use
-#         # pre-commit's default_language_version, see
-#         # https://pre-commit.com/#top_level-default_language_version
-#         language_version: python3.10
-
-
-
-#   # Run unit tests, verify that they pass. Note that coverage is run against
-#   # the ./src directory here because that is what will be committed. In the
-#   # github workflow script, the coverage is run against the installed package
-#   # and uploaded to Codecov by calling pytest like so:
-#   # `python -m pytest --cov= --cov-report=xml`
-#   - repo: local
-#     hooks:
-#       - id: pytest-check
-#         name: Run unit tests
-#         description: Run unit tests with pytest.
-#         entry: bash -c "if python -m pytest --co -qq; then python -m pytest --cov=./src --cov-report=html; fi"
-#         language: system
-#         pass_filenames: false
-#         always_run: true
-#   # Make sure Sphinx can build the documentation while explicitly omitting
-#   # notebooks from the docs, so users don't have to wait through the execution
-#   # of each notebook or each commit. By default, these will be checked in the
-#   # GitHub workflows.
-#   - repo: local
-#     hooks:
-#       - id: sphinx-build
-#         name: Build documentation with Sphinx
-#         entry: sphinx-build
-#         language: system
-#         always_run: true
-#         exclude_types: [file, symlink]
-#         args:
-#           [
-#             "-M",  # Run sphinx in make mode, so we can use -D flag later
-#             # Note: -M requires next 3 args to be builder, source, output
-#             "html",  # Specify builder
-#             "./docs",  # Source directory of documents
-#             "./_readthedocs",  # Output directory for rendered documents
-#             "-T",  # Show full trace back on exception
-#             "-E",  # Don't use saved env; always read all files
-#             "-d",  # Flag for cached environment and doctrees
-#             "./docs/_build/doctrees",  # Directory
-#             "-D",  # Flag to override settings in conf.py
-#             "exclude_patterns=notebooks/*",  # Exclude our notebooks from pre-commit
-#           ]
diff --git a/codecov.yml b/codecov.yml
index bb6575c..ed29d6b 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -21,3 +21,6 @@ coverage:
 
 # no comments in PRs
 comment: off
+
+ignore:
+  - "**/pnwstore.py"
diff --git a/src/noisepy/seis/io/channelcatalog.py b/src/noisepy/seis/io/channelcatalog.py
index e579b0a..2618863 100644
--- a/src/noisepy/seis/io/channelcatalog.py
+++ b/src/noisepy/seis/io/channelcatalog.py
@@ -1,6 +1,7 @@
 import glob
 import logging
 import os
+import time
 from abc import ABC, abstractmethod
 from functools import lru_cache
 
@@ -10,7 +11,7 @@
 import obspy.core.inventory as inventory
 import pandas as pd
 from datetimerange import DateTimeRange
-from obspy import UTCDateTime, read_inventory
+from obspy import read_inventory
 from obspy.clients.fdsn import Client
 
 from .datatypes import Channel, Station
@@ -94,46 +95,54 @@ def _get_inventory_from_file(self, xmlfile):
 
 
 class FDSNChannelCatalog(ChannelCatalog):
     """
-    A channel catalog that queries the FDSN service
+    A channel catalog that queries the FDSN web service
     FDSN ~ International Federation of Digital Seismograph Network
     """
 
-    def __init__(
-        self,
-        url_key: str,
-        cache_dir: str,
-    ):
+    def __init__(self, url_key: str, cache_dir: str, sleep_time: int = 10):
+        """
+        Constructs an FDSNChannelCatalog. A local directory will be used for inventory caching.
+
+        Args:
+            url_key (str): URL key for the obspy FDSN client, e.g., IRIS, SCEDC. See obspy.clients.fdsn
+            cache_dir (str): local directory for the metadata cache
+            sleep_time (int): maximum random wait time (seconds) before each FDSN request
+        """
         super().__init__()
         self.url_key = url_key
+        self.sleep_time = sleep_time
 
-        logger.info(f"Cache dir: ${cache_dir}")
+        logger.info(f"Using FDSN service by {self.url_key}")
+        logger.info(f"Cache dir: {cache_dir}")
         self.cache = dc.Cache(cache_dir)
 
     def get_full_channel(self, timespan: DateTimeRange, channel: Channel) -> Channel:
-        inv = self._get_inventory(str(timespan))
+        inv = self._get_inventory(channel.station)
         return self.populate_from_inventory(inv, channel)
 
     def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory:
-        return self._get_inventory(str(timespan))
-
-    def _get_cache_key(self, ts_str: str) -> str:
-        return f"{self.url_key}_{ts_str}"
+        return self._get_inventory(station)
 
     @lru_cache
-    # pass the timestamp (DateTimeRange) as string so that the method is cacheable
-    # since DateTimeRange is not hasheable
-    def _get_inventory(self, ts_str: str) -> obspy.Inventory:
-        ts = DateTimeRange.from_range_text(ts_str)
-        key = self._get_cache_key(ts_str)
-        inventory = self.cache.get(key, None)
+    def _get_inventory(self, station: Station) -> obspy.Inventory:
+        inventory = self.cache.get(str(station), None)  # check local cache
         if inventory is None:
-            logging.info(f"Inventory not found in cache for key: '{key}'. Fetching from {self.url_key}.")
-            bulk_station_request = [
-                ("*", "*", "*", "*", UTCDateTime(ts.start_datetime), UTCDateTime(ts.end_datetime))
-            ]
+            logging.info(f"Inventory not found in cache for '{station}'. Fetching from {self.url_key}.")
+            # Don't send requests too fast
+            time.sleep(np.random.uniform(self.sleep_time))
             client = Client(self.url_key)
-            inventory = client.get_stations_bulk(bulk_station_request, level="channel")
-            self.cache[key] = inventory
+            try:
+                inventory = client.get_stations(
+                    network=station.network,
+                    station=station.name,
+                    location="*",
+                    channel="?H?,?N?",
+                    level="channel",
+                )
+            except obspy.clients.fdsn.header.FDSNNoDataException:
+                logger.warning(f"FDSN returns no data for {station}. Returning empty Inventory()")
+                inventory = obspy.Inventory()
+            self.cache[str(station)] = inventory
         return inventory
 
 
@@ -179,10 +188,10 @@ def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inve
             station = inventory.Station(sta, lat, lon, elevation, channels=channels)
             stations.append(station)
         nets.append(inventory.Network(net, stations))
-    return inventory.Inventory(nets)
+    return obspy.Inventory(nets)
 
 
-def sta_info_from_inv(inv: inventory.Inventory):
+def sta_info_from_inv(inv: obspy.Inventory):
     """
     this function outputs station info from the obspy inventory object
     (used in S0B)
@@ -216,7 +225,7 @@ def sta_info_from_inv(inv: inventory.Inventory):
     return sta, net, lon, lat, elv, location
 
 
-def stats2inv_staxml(stats, respdir: str) -> inventory.Inventory:
+def stats2inv_staxml(stats, respdir: str) -> obspy.Inventory:
     if not respdir:
         raise ValueError("Abort! staxml is selected but no directory is given to access the files")
     else:
@@ -239,7 +248,7 @@ def stats2inv_staxml(stats, respdir: str) -> inventory.Inventory:
 
 
 def stats2inv_sac(stats):
-    inv = inventory.Inventory(networks=[], source="homegrown")
+    inv = obspy.Inventory(networks=[], source="homegrown")
     net = inventory.Network(
         # This is the network code according to the SEED standard.
         code=stats.network,
@@ -283,8 +292,8 @@ def stats2inv_sac(stats):
     return inv
 
 
-def stats2inv_mseed(stats, locs: pd.DataFrame) -> inventory.Inventory:
-    inv = inventory.Inventory(networks=[], source="homegrown")
+def stats2inv_mseed(stats, locs: pd.DataFrame) -> obspy.Inventory:
+    inv = obspy.Inventory(networks=[], source="homegrown")
     ista = locs[locs["station"] == stats.station].index.values.astype("int64")[0]
 
     net = inventory.Network(
@@ -370,6 +379,3 @@ def cc_parameters(cc_para, coor, tcorr, ncorr, comp):
         "comp": comp,
     }
     return parameters
-
-
-# TODO: A channel catalog that uses the files in the SCEDC S3 bucket: s3://scedc-pds/FDSNstationXML/
diff --git a/src/noisepy/seis/io/datatypes.py b/src/noisepy/seis/io/datatypes.py
index ac11e4c..4a25e69 100644
--- a/src/noisepy/seis/io/datatypes.py
+++ b/src/noisepy/seis/io/datatypes.py
@@ -152,8 +152,10 @@ class ConfigParameters(BaseModel):
     sampling_rate: float = Field(default=20.0)
     single_freq: bool = Field(
         default=True,
-        description="Filter to only data sampled at ONE frequency (the closest >= to sampling_rate). "
-        "If False, it will use all data sample at >=sampling_rate",
+        description="Filter to only data sampled at ONE frequency (the closest >= to sampling_rate)."
+        " Mismatched frequencies will be dropped."
+        " If False, it will use all frequencies."
+        " However, each station will still filter to only ONE frequency (the closest >= to sampling_rate)",
     )
 
     cc_len: int = Field(default=1800, description="basic unit of data length for fft (sec)")
diff --git a/src/noisepy/seis/io/pnwstore.py b/src/noisepy/seis/io/pnwstore.py
index e347a94..f8721d2 100644
--- a/src/noisepy/seis/io/pnwstore.py
+++ b/src/noisepy/seis/io/pnwstore.py
@@ -2,6 +2,7 @@
 import logging
 import os
 import sqlite3
+from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timedelta, timezone
 from typing import Callable, List, Tuple
 
@@ -64,12 +65,18 @@ def _load_channels(self, full_path: str, chan_filter: Callable[[Channel], bool])
         parts = full_path.split(os.path.sep)
         assert len(parts) >= 4
         net, year, doy = parts[-4:-1]
-
-        rst = self._dbquery(
+        cmd = (
             f"SELECT DISTINCT network, station, channel, location, filename "
-            f"FROM tsindex WHERE filename LIKE '%%/{net}/{year}/{doy}/%%'"
+            f"FROM tsindex WHERE filename LIKE '%%/{net}/{year}/{doy}/%%' "
+            "AND (channel LIKE '_H_' OR channel LIKE '_N_') "
         )
+        # if network is specified, the query will be faster
+        if net != "__":
+            cmd += f" AND network = '{net}'"
+        else:
+            logging.warning("Data path contains wildcards. Channel query might be slow.")
+        rst = self._dbquery(cmd)
         for i in rst:
             timespan = PNWDataStore._parse_timespan(os.path.basename(i[4]))
             self.paths[timespan.start_datetime] = full_path
@@ -82,9 +89,13 @@ def _load_channels(self, full_path: str, chan_filter: Callable[[Channel], bool])
         else:
             self.channels[key].append(channel)
 
-    def get_channels(self, timespan: DateTimeRange) -> List[Channel]:
-        tmp_channels = self.channels.get(str(timespan), [])
-        return list(map(lambda c: self.chan_catalog.get_full_channel(timespan, c), tmp_channels))
+    def get_channels(self, date_range: DateTimeRange) -> List[Channel]:
+        tmp_channels = self.channels.get(str(date_range), [])
+        executor = ThreadPoolExecutor()
+        stations = set(map(lambda c: c.station, tmp_channels))
+        _ = list(executor.map(lambda s: self.chan_catalog.get_inventory(date_range, s), stations))
+        logger.info(f"Getting {len(tmp_channels)} channels for {date_range}")
+        return list(executor.map(lambda c: self.chan_catalog.get_full_channel(date_range, c), tmp_channels))
 
     def get_timespans(self) -> List[DateTimeRange]:
         return list([DateTimeRange.from_range_text(d) for d in sorted(self.channels.keys())])
diff --git a/src/noisepy/seis/io/s3store.py b/src/noisepy/seis/io/s3store.py
index 33aa59a..c754a3c 100644
--- a/src/noisepy/seis/io/s3store.py
+++ b/src/noisepy/seis/io/s3store.py
@@ -62,7 +62,7 @@ def __init__(
     def _load_channels(self, full_path: str, chan_filter: Callable[[Channel], bool]):
         tlog = TimeLogger(logger=logger, level=logging.INFO)
         msfiles = [f for f in self.fs.glob(fs_join(full_path, "*")) if self.file_re.match(f) is not None]
-        tlog.log(f"Loading {len(msfiles)} files from {full_path}")
+        tlog.log(f"Listing {len(msfiles)} files from {full_path}")
         for f in msfiles:
             timespan = self._parse_timespan(f)
             self.paths[timespan.start_datetime] = full_path
@@ -93,7 +93,7 @@ def get_channels(self, date_range: DateTimeRange) -> List[Channel]:
         executor = ThreadPoolExecutor()
         stations = set(map(lambda c: c.station, tmp_channels))
         _ = list(executor.map(lambda s: self.chan_catalog.get_inventory(date_range, s), stations))
-        logger.info(f"Getting {len(tmp_channels)} channels for {date_range}: {tmp_channels}")
+        logger.info(f"Getting {len(tmp_channels)} channels for {date_range}")
         return list(executor.map(lambda c: self.chan_catalog.get_full_channel(date_range, c), tmp_channels))
 
     def get_timespans(self) -> List[DateTimeRange]:
diff --git a/src/noisepy/seis/io/utils.py b/src/noisepy/seis/io/utils.py
index e2e6a0d..6e3f2dc 100644
--- a/src/noisepy/seis/io/utils.py
+++ b/src/noisepy/seis/io/utils.py
@@ -115,7 +115,7 @@ def log(self, message: str = None, start: float = -1.0) -> float:
 
     def log_raw(self, message: str, dt: float):
         if self.enabled:
-            self.logger.log(self.level, f"TIMING{self.prefix}: {dt:6.4f} secs. for {message}")
+            self.logger.log(self.level, f"TIMING{self.prefix}: {dt:6.3f} secs for {message}")
         return self.time
 
 
diff --git a/tests/test_channelcatalog.py b/tests/test_channelcatalog.py
index 463bc3a..a7b12ab 100644
--- a/tests/test_channelcatalog.py
+++ b/tests/test_channelcatalog.py
@@ -9,6 +9,7 @@
 from noisepy.seis.io.channelcatalog import (
     ChannelCatalog,
     CSVChannelCatalog,
+    FDSNChannelCatalog,
     XMLStationChannelCatalog,
     stats2inv_mseed,
 )
@@ -20,7 +21,7 @@
 
 
 @pytest.mark.parametrize("stat,ch,lat,lon,elev", chan_data)
-def test_csv(stat: str, ch: str, lat: float, lon: float, elev: float):
+def test_CSVChannelCatalog(stat: str, ch: str, lat: float, lon: float, elev: float):
     cat = CSVChannelCatalog(file)
     chan = Channel(ChannelType(ch), Station("CI", stat))
     full_ch = cat.get_full_channel(DateTimeRange(), chan)
@@ -80,3 +81,16 @@ def test_XMLStationChannelCatalogCustomPath():
     yaq_inv = cat.get_inventory(DateTimeRange(), Station("CI", "YAQ"))
     assert len(yaq_inv) == 1
     assert len(yaq_inv.networks[0].stations) == 1
+
+
+def test_FDSNStationChannelCatalog(tmp_path: str):
+    cat = FDSNChannelCatalog("IRIS", tmp_path)
+    chan = Channel(ChannelType("BHZ"), Station("UW", "SEP"))
+    yaq_inv = cat.get_inventory(DateTimeRange(), chan.station)
+    assert len(yaq_inv) == 1
+    assert len(yaq_inv.networks[0].stations) == 1
+    _ = cat.get_full_channel(DateTimeRange(), chan)
+
+    chan = Channel(ChannelType("ABC"), Station("UW", "DEF"))
+    yaq_inv = cat.get_inventory(DateTimeRange(), chan.station)
+    assert len(yaq_inv) == 0