feat: performance improvement for large jobs (#38)
* remove channel list in logger

Signed-off-by: Yiyu Ni <[email protected]>

* update FDSN and XML channel catalog

Signed-off-by: Yiyu Ni <[email protected]>

* improve PNWRawDataStore channel query efficiency

Signed-off-by: Yiyu Ni <[email protected]>

* minor update on format

Signed-off-by: Yiyu Ni <[email protected]>

* add test

Signed-off-by: Yiyu Ni <[email protected]>

* add test

Signed-off-by: Yiyu Ni <[email protected]>

* Delete src/noisepy/seis/io/mseedstore.py

* important logic for single_frequency

Signed-off-by: Yiyu Ni <[email protected]>

* important logic for single_frequency

Signed-off-by: Yiyu Ni <[email protected]>

* add test

Signed-off-by: Yiyu Ni <[email protected]>

---------

Signed-off-by: Yiyu Ni <[email protected]>
niyiyu authored Nov 5, 2024
1 parent dc30ff6 commit 7811873
Showing 9 changed files with 83 additions and 167 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -139,6 +139,7 @@ dask-worker-space/

# tmp directory
tmp/
dev/

# Mac OS
.DS_Store
121 changes: 0 additions & 121 deletions .pre-commit-config.yaml
@@ -31,124 +31,3 @@ repos:
rev: 23.7.0
hooks:
- id: black


# - repo: https://github.com/PyCQA/flake8
# rev: 6.0.0
# hooks:
# - id: flake8
# additional_dependencies:
# - flake8-black
# exclude: .git,__pycache__,build,dist
# repos:

# # Compare the local template version to the latest remote template version
# # This hook should always pass. It will print a message if the local version
# # is out of date.
# - repo: https://github.com/lincc-frameworks/pre-commit-hooks
# rev: v0.1.1
# hooks:
# - id: check-lincc-frameworks-template-version
# name: Check template version
# description: Compare current template version against latest
# verbose: true

# # Clear output from jupyter notebooks so that only the input cells are committed.
# - repo: local
# hooks:
# - id: jupyter-nb-clear-output
# name: Clear output from Jupyter notebooks
# description: Clear output from Jupyter notebooks.
# files: \.ipynb$
# stages: [commit]
# language: system
# entry: jupyter nbconvert --clear-output

# # Prevents committing directly branches named 'main' and 'master'.
# - repo: https://github.com/pre-commit/pre-commit-hooks
# rev: v4.4.0
# hooks:
# - id: no-commit-to-branch
# name: Prevent main branch commits
# description: Prevent the user from committing directly to the primary branch.
# - id: check-added-large-files
# name: Check for large files
# description: Prevent the user from committing very large files.
# args: ['--maxkb=500']

# # Verify that pyproject.toml is well formed
# - repo: https://github.com/abravalheri/validate-pyproject
# rev: v0.12.1
# hooks:
# - id: validate-pyproject
# name: Validate pyproject.toml
# description: Verify that pyproject.toml adheres to the established schema.


# # Automatically sort the imports used in .py files
# - repo: https://github.com/pycqa/isort
# rev: 5.12.0
# hooks:
# - id: isort
# name: Run isort
# description: Sort and organize imports in .py and .pyi files.
# types_or: [python, pyi]



# # Analyze the code style and report code that doesn't adhere.
# - repo: https://github.com/psf/black
# rev: 23.7.0
# hooks:
# - id: black-jupyter
# name: Format code using black
# types_or: [python, pyi, jupyter]
# # It is recommended to specify the latest version of Python
# # supported by your project here, or alternatively use
# # pre-commit's default_language_version, see
# # https://pre-commit.com/#top_level-default_language_version
# language_version: python3.10




# # Run unit tests, verify that they pass. Note that coverage is run against
# # the ./src directory here because that is what will be committed. In the
# # github workflow script, the coverage is run against the installed package
# # and uploaded to Codecov by calling pytest like so:
# # `python -m pytest --cov=<package_name> --cov-report=xml`
# - repo: local
# hooks:
# - id: pytest-check
# name: Run unit tests
# description: Run unit tests with pytest.
# entry: bash -c "if python -m pytest --co -qq; then python -m pytest --cov=./src --cov-report=html; fi"
# language: system
# pass_filenames: false
# always_run: true
# # Make sure Sphinx can build the documentation while explicitly omitting
# # notebooks from the docs, so users don't have to wait through the execution
# # of each notebook or each commit. By default, these will be checked in the
# # GitHub workflows.
# - repo: local
# hooks:
# - id: sphinx-build
# name: Build documentation with Sphinx
# entry: sphinx-build
# language: system
# always_run: true
# exclude_types: [file, symlink]
# args:
# [
# "-M", # Run sphinx in make mode, so we can use -D flag later
# # Note: -M requires next 3 args to be builder, source, output
# "html", # Specify builder
# "./docs", # Source directory of documents
# "./_readthedocs", # Output directory for rendered documents
# "-T", # Show full trace back on exception
# "-E", # Don't use saved env; always read all files
# "-d", # Flag for cached environment and doctrees
# "./docs/_build/doctrees", # Directory
# "-D", # Flag to override settings in conf.py
# "exclude_patterns=notebooks/*", # Exclude our notebooks from pre-commit
# ]
3 changes: 3 additions & 0 deletions codecov.yml
@@ -21,3 +21,6 @@ coverage:

# no comments in PRs
comment: off

ignore:
- "**/pnwstore.py"
74 changes: 40 additions & 34 deletions src/noisepy/seis/io/channelcatalog.py
@@ -1,6 +1,7 @@
import glob
import logging
import os
import time
from abc import ABC, abstractmethod
from functools import lru_cache

@@ -10,7 +11,7 @@
import obspy.core.inventory as inventory
import pandas as pd
from datetimerange import DateTimeRange
from obspy import UTCDateTime, read_inventory
from obspy import read_inventory
from obspy.clients.fdsn import Client

from .datatypes import Channel, Station
@@ -94,46 +95,54 @@ def _get_inventory_from_file(self, xmlfile):

class FDSNChannelCatalog(ChannelCatalog):
"""
A channel catalog that queries the FDSN service
A channel catalog that queries the FDSN web service
FDSN ~ International Federation of Digital Seismograph Network
"""

def __init__(
self,
url_key: str,
cache_dir: str,
):
def __init__(self, url_key: str, cache_dir: str, sleep_time: int = 10):
"""
Constructs a FDSNChannelCatalog. A local directory will be used for inventory caching.
Args:
url_key (str): URL key for the obspy FDSN client, e.g., IRIS, SCEDC. See obspy.clients.fdsn
cache_dir (str): local directory for the metadata cache
sleep_time (int): maximum random wait time in seconds applied before each FDSN request
"""
super().__init__()
self.url_key = url_key
self.sleep_time = sleep_time

logger.info(f"Cache dir: ${cache_dir}")
logger.info(f"Using FDSN service by {self.url_key}")
logger.info(f"Cache dir: {cache_dir}")
self.cache = dc.Cache(cache_dir)

def get_full_channel(self, timespan: DateTimeRange, channel: Channel) -> Channel:
inv = self._get_inventory(str(timespan))
inv = self._get_inventory(channel.station)
return self.populate_from_inventory(inv, channel)

def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory:
return self._get_inventory(str(timespan))

def _get_cache_key(self, ts_str: str) -> str:
return f"{self.url_key}_{ts_str}"
return self._get_inventory(station)

@lru_cache
# pass the timestamp (DateTimeRange) as string so that the method is cacheable
# since DateTimeRange is not hasheable
def _get_inventory(self, ts_str: str) -> obspy.Inventory:
ts = DateTimeRange.from_range_text(ts_str)
key = self._get_cache_key(ts_str)
inventory = self.cache.get(key, None)
def _get_inventory(self, station: Station) -> obspy.Inventory:
inventory = self.cache.get(str(station), None) # check local cache
if inventory is None:
logging.info(f"Inventory not found in cache for key: '{key}'. Fetching from {self.url_key}.")
bulk_station_request = [
("*", "*", "*", "*", UTCDateTime(ts.start_datetime), UTCDateTime(ts.end_datetime))
]
logging.info(f"Inventory not found in cache for '{station}'. Fetching from {self.url_key}.")
# Don't send requests too fast
time.sleep(np.random.uniform(self.sleep_time))
client = Client(self.url_key)
inventory = client.get_stations_bulk(bulk_station_request, level="channel")
self.cache[key] = inventory
try:
inventory = client.get_stations(
network=station.network,
station=station.name,
location="*",
channel="?H?,?N?",
level="channel",
)
except obspy.clients.fdsn.header.FDSNNoDataException:
logger.warning(f"FDSN returns no data for {station}. Returning empty Inventory()")
inventory = obspy.Inventory()
self.cache[str(station)] = inventory
return inventory
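
For reference (not part of this commit), a minimal self-contained sketch of the per-station inventory caching introduced above, assuming diskcache and obspy are installed; the cache path, the "IRIS" client key, and the UW/RATT example are placeholder values.

# Illustrative sketch only; "IRIS", "./inv_cache", and UW/RATT are placeholders.
import time

import diskcache as dc
import numpy as np
import obspy
from obspy.clients.fdsn import Client
from obspy.clients.fdsn.header import FDSNNoDataException

cache = dc.Cache("./inv_cache")   # hypothetical local metadata cache
client = Client("IRIS")           # hypothetical FDSN data-center key

def get_inventory(network: str, station: str, sleep_time: float = 10.0) -> obspy.Inventory:
    key = f"{network}.{station}"
    inventory = cache.get(key, None)                  # check the local cache first
    if inventory is None:
        time.sleep(np.random.uniform(0, sleep_time))  # spread requests out in time
        try:
            inventory = client.get_stations(
                network=network,
                station=station,
                location="*",
                channel="?H?,?N?",                    # seismic + strong-motion channels only
                level="channel",
            )
        except FDSNNoDataException:
            inventory = obspy.Inventory()             # cache the empty result as well
        cache[key] = inventory
    return inventory

# inv = get_inventory("UW", "RATT")   # example station, for illustration only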


@@ -179,10 +188,10 @@ def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory:
station = inventory.Station(sta, lat, lon, elevation, channels=channels)
stations.append(station)
nets.append(inventory.Network(net, stations))
return inventory.Inventory(nets)
return obspy.Inventory(nets)


def sta_info_from_inv(inv: inventory.Inventory):
def sta_info_from_inv(inv: obspy.Inventory):
"""
this function outputs station info from the obspy inventory object
(used in S0B)
@@ -216,7 +225,7 @@ def sta_info_from_inv(inv: inventory.Inventory):
return sta, net, lon, lat, elv, location


def stats2inv_staxml(stats, respdir: str) -> inventory.Inventory:
def stats2inv_staxml(stats, respdir: str) -> obspy.Inventory:
if not respdir:
raise ValueError("Abort! staxml is selected but no directory is given to access the files")
else:
@@ -239,7 +248,7 @@ def stats2inv_staxml(stats, respdir: str) -> inventory.Inventory:


def stats2inv_sac(stats):
inv = inventory.Inventory(networks=[], source="homegrown")
inv = obspy.Inventory(networks=[], source="homegrown")
net = inventory.Network(
# This is the network code according to the SEED standard.
code=stats.network,
@@ -283,8 +292,8 @@ def stats2inv_sac(stats):
return inv


def stats2inv_mseed(stats, locs: pd.DataFrame) -> inventory.Inventory:
inv = inventory.Inventory(networks=[], source="homegrown")
def stats2inv_mseed(stats, locs: pd.DataFrame) -> obspy.Inventory:
inv = obspy.Inventory(networks=[], source="homegrown")
ista = locs[locs["station"] == stats.station].index.values.astype("int64")[0]

net = inventory.Network(
@@ -370,6 +379,3 @@ def cc_parameters(cc_para, coor, tcorr, ncorr, comp):
"comp": comp,
}
return parameters


# TODO: A channel catalog that uses the files in the SCEDC S3 bucket: s3://scedc-pds/FDSNstationXML/
6 changes: 4 additions & 2 deletions src/noisepy/seis/io/datatypes.py
@@ -152,8 +152,10 @@ class ConfigParameters(BaseModel):
sampling_rate: float = Field(default=20.0)
single_freq: bool = Field(
default=True,
description="Filter to only data sampled at ONE frequency (the closest >= to sampling_rate). "
"If False, it will use all data sample at >=sampling_rate",
description="Filter to only data sampled at ONE frequency (the closest >= to sampling_rate)."
" Mismatched frequencies will be dropped."
" If False, it will use all frequencies. "
" However, each station will still filter to only ONE frequency (the closest >= to sampling_rate)",
)

cc_len: int = Field(default=1800, description="basic unit of data length for fft (sec)")
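
To illustrate the rule this description encodes, here is a rough sketch of the frequency selection under one reading of it; the helper names and the station-to-rates mapping are invented for the example and are not part of the noisepy code.

# Hedged sketch of the single_freq selection rule; names and data layout are illustrative.
from typing import Dict, Iterable, List, Optional

def closest_rate_at_or_above(rates: Iterable[float], target: float) -> Optional[float]:
    """Smallest available rate >= target, or None if no rate qualifies."""
    eligible = [r for r in rates if r >= target]
    return min(eligible) if eligible else None

def pick_station_rates(
    station_rates: Dict[str, List[float]], sampling_rate: float, single_freq: bool
) -> Dict[str, Optional[float]]:
    # Each station keeps at most ONE rate: the closest one >= sampling_rate.
    per_station = {
        sta: closest_rate_at_or_above(rates, sampling_rate)
        for sta, rates in station_rates.items()
    }
    if single_freq:
        # Additionally require a single common rate across stations: the lowest
        # selected rate survives, stations at any other rate are dropped.
        kept = {r for r in per_station.values() if r is not None}
        common = min(kept) if kept else None
        per_station = {sta: (r if r == common else None) for sta, r in per_station.items()}
    return per_station

# pick_station_rates({"UW.A": [40.0, 100.0], "UW.B": [100.0]}, 20.0, single_freq=True)
#   -> {"UW.A": 40.0, "UW.B": None}    (the 100 Hz station is dropped)
# pick_station_rates({"UW.A": [40.0, 100.0], "UW.B": [100.0]}, 20.0, single_freq=False)
#   -> {"UW.A": 40.0, "UW.B": 100.0}   (each station keeps its own closest rate)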
23 changes: 17 additions & 6 deletions src/noisepy/seis/io/pnwstore.py
@@ -2,6 +2,7 @@
import logging
import os
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from typing import Callable, List, Tuple

@@ -64,12 +65,18 @@ def _load_channels(self, full_path: str, chan_filter: Callable[[Channel], bool])
parts = full_path.split(os.path.sep)
assert len(parts) >= 4
net, year, doy = parts[-4:-1]

rst = self._dbquery(
cmd = (
f"SELECT DISTINCT network, station, channel, location, filename "
f"FROM tsindex WHERE filename LIKE '%%/{net}/{year}/{doy}/%%'"
f"FROM tsindex WHERE filename LIKE '%%/{net}/{year}/{doy}/%%' "
"AND (channel LIKE '_H_' OR channel LIKE '_N_') "
)

# if the network is specified, the query will be faster
if net != "__":
cmd += f" AND network = '{net}'"
else:
logging.warning("Data path contains wildcards. Channel query might be slow.")
rst = self._dbquery(cmd)
for i in rst:
timespan = PNWDataStore._parse_timespan(os.path.basename(i[4]))
self.paths[timespan.start_datetime] = full_path
@@ -82,9 +89,13 @@ def _load_channels(self, full_path: str, chan_filter: Callable[[Channel], bool])
else:
self.channels[key].append(channel)

def get_channels(self, timespan: DateTimeRange) -> List[Channel]:
tmp_channels = self.channels.get(str(timespan), [])
return list(map(lambda c: self.chan_catalog.get_full_channel(timespan, c), tmp_channels))
def get_channels(self, date_range: DateTimeRange) -> List[Channel]:
tmp_channels = self.channels.get(str(date_range), [])
executor = ThreadPoolExecutor()
stations = set(map(lambda c: c.station, tmp_channels))
_ = list(executor.map(lambda s: self.chan_catalog.get_inventory(date_range, s), stations))
logger.info(f"Getting {len(tmp_channels)} channels for {date_range}")
return list(executor.map(lambda c: self.chan_catalog.get_full_channel(date_range, c), tmp_channels))

def get_timespans(self) -> List[DateTimeRange]:
return list([DateTimeRange.from_range_text(d) for d in sorted(self.channels.keys())])
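
For context, a standalone sketch of the network-filtered tsindex query added above, written here with sqlite3 parameter binding rather than string interpolation; the database path and the query_channels wrapper are hypothetical, while the column names and LIKE patterns follow the diff.

# Illustrative sketch; "timeseries.sqlite" and query_channels are hypothetical.
import sqlite3
from typing import List, Tuple

def query_channels(db_path: str, net: str, year: str, doy: str) -> List[Tuple]:
    cmd = (
        "SELECT DISTINCT network, station, channel, location, filename "
        "FROM tsindex WHERE filename LIKE ? "
        "AND (channel LIKE '_H_' OR channel LIKE '_N_')"
    )
    params = [f"%/{net}/{year}/{doy}/%"]
    if net != "__":
        # An equality filter on network lets SQLite use an index on that column
        # instead of scanning every row, which is what makes the query faster.
        cmd += " AND network = ?"
        params.append(net)
    with sqlite3.connect(db_path) as conn:
        return conn.execute(cmd, params).fetchall()

# rows = query_channels("timeseries.sqlite", "UW", "2023", "001")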
4 changes: 2 additions & 2 deletions src/noisepy/seis/io/s3store.py
@@ -62,7 +62,7 @@ def __init__(
def _load_channels(self, full_path: str, chan_filter: Callable[[Channel], bool]):
tlog = TimeLogger(logger=logger, level=logging.INFO)
msfiles = [f for f in self.fs.glob(fs_join(full_path, "*")) if self.file_re.match(f) is not None]
tlog.log(f"Loading {len(msfiles)} files from {full_path}")
tlog.log(f"Listing {len(msfiles)} files from {full_path}")
for f in msfiles:
timespan = self._parse_timespan(f)
self.paths[timespan.start_datetime] = full_path
@@ -93,7 +93,7 @@ def get_channels(self, date_range: DateTimeRange) -> List[Channel]:
executor = ThreadPoolExecutor()
stations = set(map(lambda c: c.station, tmp_channels))
_ = list(executor.map(lambda s: self.chan_catalog.get_inventory(date_range, s), stations))
logger.info(f"Getting {len(tmp_channels)} channels for {date_range}: {tmp_channels}")
logger.info(f"Getting {len(tmp_channels)} channels for {date_range}")
return list(executor.map(lambda c: self.chan_catalog.get_full_channel(date_range, c), tmp_channels))

def get_timespans(self) -> List[DateTimeRange]:
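
The s3store change above relies on the same pre-warm pattern as pnwstore: resolve each unique station's inventory once, concurrently, so that the per-channel calls that follow hit the cache. A minimal sketch of that pattern, with stand-in catalog and channel objects rather than the actual noisepy types:

# Sketch only; catalog, date_range, and channels are stand-ins for noisepy objects.
from concurrent.futures import ThreadPoolExecutor

def get_full_channels(catalog, date_range, channels):
    stations = {c.station for c in channels}  # de-duplicate stations
    with ThreadPoolExecutor() as executor:
        # Warm the per-station inventory cache: one request per station, in parallel.
        list(executor.map(lambda s: catalog.get_inventory(date_range, s), stations))
        # Enrich every channel; each call now reuses the cached inventory.
        return list(executor.map(lambda c: catalog.get_full_channel(date_range, c), channels))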
2 changes: 1 addition & 1 deletion src/noisepy/seis/io/utils.py
@@ -115,7 +115,7 @@ def log(self, message: str = None, start: float = -1.0) -> float:

def log_raw(self, message: str, dt: float):
if self.enabled:
self.logger.log(self.level, f"TIMING{self.prefix}: {dt:6.4f} secs. for {message}")
self.logger.log(self.level, f"TIMING{self.prefix}: {dt:6.3f} secs for {message}")
return self.time


