AWS Batch: split CC and stack yaml files (#240)
* AWS Batch: split CC and stack yaml files

* Missed line

* fix test

* Template testing

* fix tests
carlosgjs authored Oct 13, 2023
1 parent 5334d63 commit 1fa1daf
Showing 11 changed files with 85 additions and 62 deletions.
20 changes: 0 additions & 20 deletions .github/workflows/test.yaml
@@ -134,23 +134,3 @@ jobs:
           --xml_path s3://scedc-pds/FDSNstationXML/CI/ \
           --stations "SBC,RIO" --start_date 2022-02-02 --end_date 2022-02-04 \
           --config configs/s3_anon.yaml
-  s3_singlepath:
-    strategy:
-      fail-fast: true
-      matrix:
-        python_version: ['3.9', '3.10']
-    runs-on: ubuntu-22.04
-    steps:
-      - name: Checkout Repo
-        uses: actions/[email protected]
-      - name: Setup NoisePy
-        uses: ./.github/actions/setup
-        with:
-          python-version: ${{matrix.python_version}}
-      - name: Test S3 data with a single path
-        run: |
-          noisepy cross_correlate --raw_data_path s3://scedc-pds/continuous_waveforms/2022/2022_002/ \
-            --ccf_path $RUNNER_TEMP/CCF_S3 --freq_norm rma \
-            --xml_path s3://scedc-pds/FDSNstationXML/CI/ \
-            --stations "SBC,RIO" \
-            --config configs/s3_anon.yaml
1 change: 1 addition & 0 deletions src/noisepy/seis/constants.py
@@ -9,3 +9,4 @@
 AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV"
 NO_DATA_MSG = "Abort! no available seismic files for FFT"
 NO_CCF_DATA_MSG = "Abort! no available CCF data for stacking"
+WILD_CARD = "*"
15 changes: 3 additions & 12 deletions src/noisepy/seis/main.py
@@ -9,14 +9,13 @@
 from typing import Any, Callable, Iterable, List, Optional
 
 import dateutil.parser
-import obspy
 from datetimerange import DateTimeRange
 
 from . import __version__
 from .asdfstore import ASDFCCStore, ASDFRawDataStore, ASDFStackStore
 from .channel_filter_store import LocationChannelFilterStore
 from .channelcatalog import CSVChannelCatalog, XMLStationChannelCatalog
-from .constants import CONFIG_FILE, STATION_FILE
+from .constants import CONFIG_FILE, STATION_FILE, WILD_CARD
 from .correlate import cross_correlate
 from .datatypes import Channel, ConfigParameters
 from .download import download
@@ -36,7 +35,6 @@
 # Utility running the different steps from the command line. Defines the arguments for each step
 
 default_data_path = "noisepy_data"
-WILD_CARD = "*"
 
 
 class Command(Enum):
@@ -141,12 +139,6 @@ def filter(ch: Channel) -> bool:
     return filter
 
 
-def get_date_range(args) -> DateTimeRange:
-    if "start_date" not in args or args.start_date is None or "end_date" not in args or args.end_date is None:
-        return None
-    return DateTimeRange(obspy.UTCDateTime(args.start_date).datetime, obspy.UTCDateTime(args.end_date).datetime)
-
-
 def create_raw_store(args, params: ConfigParameters):
     raw_dir = args.raw_data_path
 
@@ -167,12 +159,11 @@ def count(pat):
     else:
         raise ValueError(f"Either an --xml_path argument or a {STATION_FILE} must be provided")
 
-    date_range = get_date_range(args)
     store = SCEDCS3DataStore(
         raw_dir,
         catalog,
         get_channel_filter(params.net_list, params.stations, params.channels),
-        date_range,
+        DateTimeRange(params.start_date, params.end_date),
         params.storage_options,
     )
     # Some SCEDC channels have duplicates differing only by location, so filter them out
@@ -304,7 +295,7 @@ def make_step_parser(subparsers: Any, cmd: Command, paths: List[str]) -> Any:
         default="info",
         choices=["notset", "debug", "info", "warning", "error", "critical"],
     )
-    parser.add_argument("--logfile", type=str, default=None, help="Log file")
+    parser.add_argument("--logfile", type=str, default="log.txt", help="Log file")
     parser.add_argument(
         "-c", "--config", type=lambda f: _valid_config_file(parser, f), required=False, help="Configuration YAML file"
     )
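The net effect of this change: the removed `get_date_range` helper (and the `obspy` import it needed) is gone, and the S3 raw store's time window now comes straight from the shared `ConfigParameters` rather than being re-parsed from the CLI arguments. A minimal sketch of the new construction, using a hypothetical `Params` stand-in for `ConfigParameters`:

```python
from datetime import datetime

from datetimerange import DateTimeRange


class Params:
    """Hypothetical stand-in for ConfigParameters; only the fields used here."""

    start_date = datetime(2022, 2, 2)
    end_date = datetime(2022, 2, 4)


params = Params()
# The store is now always handed a concrete range built from the config,
# rather than an Optional[DateTimeRange] re-parsed from --start_date/--end_date.
date_range = DateTimeRange(params.start_date, params.end_date)
print(date_range)  # 2022-02-02T00:00:00 - 2022-02-04T00:00:00
```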
15 changes: 11 additions & 4 deletions src/noisepy/seis/stack.py
@@ -8,9 +8,8 @@
 import pandas as pd
 from datetimerange import DateTimeRange
 
-from noisepy.seis.constants import NO_CCF_DATA_MSG
-
 from . import noise_module
+from .constants import NO_CCF_DATA_MSG, WILD_CARD
 from .datatypes import ConfigParameters, Stack, StackMethod, Station
 from .scheduler import Scheduler, SingleNodeScheduler
 from .stores import CrossCorrelationDataStore, StackStore
@@ -55,9 +54,17 @@ def stack_cross_correlations(
     tlog = TimeLogger(logger=logger, level=logging.INFO)
     t_tot = tlog.reset()
 
+    stations = set(fft_params.stations)
+    networks = set(fft_params.net_list)
+
+    def sta_filter(sta: Station) -> bool:
+        return (WILD_CARD in stations or sta.name in stations) and (WILD_CARD in networks or sta.network in networks)
+
     def initializer():
         # Important to have them sorted when we distribute work across nodes
-        pairs_all = sorted(cc_store.get_station_pairs(), key=lambda x: str(x))
+        pairs_all = sorted(
+            filter(lambda p: sta_filter(p[0]) and sta_filter(p[1]), cc_store.get_station_pairs()), key=lambda x: str(x)
+        )
         if len(pairs_all) == 0:
             raise IOError(NO_CCF_DATA_MSG)
 
@@ -78,7 +85,7 @@ def initializer():
     if not all(results):
         failed = [p for p, r in zip(pairs_node, results) if not r]
         failed_str = "\n".join(map(str, failed))
-        raise RuntimeError(
+        logger.error(
             f"Error stacking {len(failed)}/{len(results)} pairs. Check the logs for more information. "
             f"Failed pairs: \n{failed_str}"
         )
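The new `sta_filter` gives stacking the same wildcard semantics as the shared `WILD_CARD` constant: a `"*"` entry in the station or network list matches everything on that axis, and both endpoints of a pair must pass for the pair to be stacked. A self-contained sketch of the logic, with a simplified `Station` stand-in (the real class in `noisepy.seis.datatypes` carries more fields):

```python
from dataclasses import dataclass

WILD_CARD = "*"


@dataclass(frozen=True)
class Station:
    """Simplified stand-in for noisepy.seis.datatypes.Station."""

    network: str
    name: str


def make_station_filter(net_list, sta_list):
    networks, stations = set(net_list), set(sta_list)

    def sta_filter(sta: Station) -> bool:
        # A wildcard in either list matches everything on that axis.
        return (WILD_CARD in stations or sta.name in stations) and (
            WILD_CARD in networks or sta.network in networks
        )

    return sta_filter


accept = make_station_filter(["CI"], [WILD_CARD])
print(accept(Station("CI", "BAK")))  # True: network listed, station wildcarded
print(accept(Station("NC", "KCT")))  # False: network "NC" is not in net_list
```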
12 changes: 12 additions & 0 deletions tests/test_cross_correlation.py
@@ -1,8 +1,10 @@
 import os
+from unittest.mock import Mock
 
 import pytest
 from test_channelcatalog import MockCatalog
 
+from noisepy.seis.constants import NO_DATA_MSG
 from noisepy.seis.correlate import (
     _filter_channel_data,
     _safe_read_data,
@@ -42,6 +44,16 @@ def test_safe_read_channels():
     assert ch_data.data.size == 0
 
 
+def test_correlation_nodata():
+    config = ConfigParameters()
+    raw_store = Mock()
+    raw_store.get_timespans.return_value = []
+    cc_store = Mock()
+    with pytest.raises(IOError) as excinfo:
+        cross_correlate(raw_store, config, cc_store)
+    assert NO_DATA_MSG in str(excinfo.value)
+
+
 def test_correlation():
     config = ConfigParameters()
     config.samp_freq = 1.0
6 changes: 2 additions & 4 deletions tests/test_main.py
@@ -5,7 +5,7 @@
 import obspy
 import pytest
 
-from noisepy.seis.constants import NO_CCF_DATA_MSG, NO_DATA_MSG
+from noisepy.seis.constants import NO_CCF_DATA_MSG
 from noisepy.seis.main import (
     Command,
     _valid_config_file,
@@ -49,9 +49,7 @@ def run_cmd_with_empty_dirs(cmd: Command, args: List[str]):
 
 def test_main_cc(tmp_path):
     tmp = str(tmp_path)
-    with pytest.raises(IOError) as excinfo:
-        run_cmd_with_empty_dirs(Command.CROSS_CORRELATE, [empty("raw_data", tmp), empty("xml", tmp)])
-    assert NO_DATA_MSG in str(excinfo.value)
+    run_cmd_with_empty_dirs(Command.CROSS_CORRELATE, [empty("raw_data", tmp), empty("xml", tmp)])
 
 
 def test_main_stack(tmp_path):
7 changes: 3 additions & 4 deletions tests/test_stack.py
@@ -37,7 +37,7 @@ def __reduce__(self):
         return (MagicMock, ())
 
 
-def test_stack_error():
+def test_stack_error(caplog):
     ts = date_range(1, 1, 2)
     config = ConfigParameters(start_date=ts.start_datetime, end_date=ts.end_datetime)
     sta = Station("CI", "BAK")
@@ -51,9 +51,8 @@ def test_stack_error():
     stack_store.contains.return_value = False
     with patch("noisepy.seis.stack.ProcessPoolExecutor") as mock_executor:
         mock_executor.return_value = ThreadPoolExecutor(1)
-        with pytest.raises(RuntimeError) as e:
-            stack_cross_correlations(cc_store, stack_store, config)
-        assert "CI.BAK" in str(e)
+        stack_cross_correlations(cc_store, stack_store, config)
+        assert any(str(sta) in rec.message for rec in caplog.records if rec.levelname == "ERROR")
 
 
 def test_stack_contains():
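Since `stack_cross_correlations` now reports failures via `logger.error` instead of raising `RuntimeError`, the test switches to pytest's built-in `caplog` fixture, which captures emitted log records for assertions. A minimal sketch of the pattern, independent of NoisePy:

```python
import logging

logger = logging.getLogger(__name__)


def report_failures() -> None:
    # Stand-in for code that logs rather than raises on partial failure.
    logger.error("Error stacking 1/2 pairs. Failed pairs: \nCI.BAK_CI.BAK")


def test_report_failures_logs_error(caplog):
    with caplog.at_level(logging.ERROR):
        report_failures()
    assert any(
        "CI.BAK" in rec.message for rec in caplog.records if rec.levelname == "ERROR"
    )
```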
21 changes: 16 additions & 5 deletions tutorials/cloud/aws.md
@@ -212,20 +212,31 @@ aws batch create-job-queue --no-cli-pager --cli-input-yaml file://job_queue.yaml
 
 ## Create a Job Definition
 
-Update the `jobRoleArn` and `executionRoleArn` fields in the `job_definition.yaml` file with the ARN of the role created in the first step. Add a name for the `jobDefinition` and update the `command` argument as needed (e.g., update the `ccf_path` argument). This command will become the default command but can still be overriden in individual jobs. You can adjust the timeout as appropriate too. Finally, run:
+Update the `jobRoleArn` and `executionRoleArn` fields in the `job_definition.yaml` file with the ARN of the role created in the first step. Add a name for the `jobDefinition`. Finally, run:
 
 ```
 aws batch register-job-definition --no-cli-pager --cli-input-yaml file://job_definition.yaml
 ```
 
-## Submit a job
+## Submit a Cross-Correlation job
 
-Update `job.yaml` with a name and the names of your job queue and job definitions created in the last steps. You can then submit a job with:
+Update `job_cc.yaml` with the names of your `jobQueue` and `jobDefinition` created in the last steps. Then update the S3 bucket paths
+to the locations you want to use for the output and your `config.yaml` file.
 
 ```
-aws batch submit-job --no-cli-pager --cli-input-yaml file://job.yaml --job-name "job_name_override"
+aws batch submit-job --no-cli-pager --cli-input-yaml file://job_cc.yaml --job-name "<your job name>"
 ```
 
+## Submit a Stacking job
+
+Update `job_stack.yaml` with the names of your `jobQueue` and `jobDefinition` created in the last steps. Then update the S3 bucket paths
+to the locations you want to use for your input CCFs (e.g. the output of the previous CC run), and the stack output. By default, NoisePy will look for a config
+file in the `--ccf_path` location to use the same configuration for stacking that was used for cross-correlation.
+
+```
+aws batch submit-job --no-cli-pager --cli-input-yaml file://job_stack.yaml --job-name "<your job name>"
+```
+
 ## Multi-node (array) jobs
 
-See comment above `arrayProperties` in `job.yaml` for instructions on how to process in parallel across multiple nodes.
+See comment above `arrayProperties` in `job_cc.yaml` and `job_stack.yaml` for instructions on how to process in parallel across multiple nodes.
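The same submission can also be scripted rather than issued through the AWS CLI. A sketch using boto3 (the queue and definition names and S3 paths are placeholders; the command override mirrors `job_cc.yaml`):

```python
import boto3

batch = boto3.client("batch")

# Equivalent of `aws batch submit-job --cli-input-yaml file://job_cc.yaml`.
response = batch.submit_job(
    jobName="noisepy-cross-correlate",
    jobQueue="<your job queue>",
    jobDefinition="<your job definition>",
    containerOverrides={
        "command": [
            "cross_correlate",
            "--raw_data_path=s3://scedc-pds/continuous_waveforms/",
            "--xml_path=s3://scedc-pds/FDSNstationXML/CI/",
            "--ccf_path=s3://<YOUR_S3_BUCKET>/<CC_PATH>",
            "--config=s3://<YOUR_S3_BUCKET>/<CONFIG_PATH>/config.yaml",
        ],
    },
)
print(response["jobId"])
```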
18 changes: 18 additions & 0 deletions tutorials/cloud/job_cc.yaml
@@ -0,0 +1,18 @@
+jobName: 'noisepy-cross-correlate'
+jobQueue: ''
+jobDefinition: '' # [REQUIRED] The job definition used by this job.
+# Uncomment to run a job across multiple nodes. The days in the time range will be split across the nodes.
+# arrayProperties:
+#   size: 16 # number of nodes
+containerOverrides: # An object with various properties that override the defaults for the job definition that specify the name of a container in the specified job definition and the overrides it should receive.
+  resourceRequirements:
+    - value: '90112' # CC requires more memory
+      type: MEMORY
+  command: # The command to send to the container that overrides the default command from the Docker image or the job definition.
+    - cross_correlate
+    - --raw_data_path=s3://scedc-pds/continuous_waveforms/
+    - --xml_path=s3://scedc-pds/FDSNstationXML/CI/
+    - --ccf_path=s3://<YOUR_S3_BUCKET>/<CC_PATH>
+    - --config=s3://<YOUR_S3_BUCKET>/<CONFIG_PATH>/config.yaml
+timeout:
+  attemptDurationSeconds: 36000 # 10 hrs
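When `arrayProperties` is uncommented, AWS Batch launches `size` copies of this job and exposes each copy's index through the `AWS_BATCH_JOB_ARRAY_INDEX` environment variable; per the comment above, the days in the time range are split across the nodes. A rough sketch of how a per-node slice can be derived from that index (the helper is illustrative, not NoisePy's actual partitioning code):

```python
import os
from datetime import date, timedelta
from typing import List


def node_days(start: date, end: date, size: int) -> List[date]:
    """Days assigned to this array node, round-robin by array index."""
    index = int(os.environ.get("AWS_BATCH_JOB_ARRAY_INDEX", "0"))
    all_days = [start + timedelta(days=i) for i in range((end - start).days)]
    return all_days[index::size]


# Example: 16 days across 4 nodes; node 0 gets every 4th day.
print(node_days(date(2022, 2, 1), date(2022, 2, 17), 4))
```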
16 changes: 3 additions & 13 deletions tutorials/cloud/job_definition.yaml
@@ -5,23 +5,13 @@ platformCapabilities:
 containerProperties:
   image: 'ghcr.io/noisepy/noisepy'
   command:
-    - 'cross_correlate'
-    - --format=numpy
-    - --raw_data_path=s3://scedc-pds/continuous_waveforms/
-    - --xml_path=s3://scedc-pds/FDSNstationXML/CI/
-    - --ccf_path=s3://<YOUR_S3_BUCKET>/<CC_PATH>
-    - --net_list=CI
-    - --stations=*
-    - --start=2022-02-02
-    - --end=2022-02-03
-    - --loglevel=debug
-    - --logfile=log.txt
+    - '--help'
   jobRoleArn: ''
   executionRoleArn: ''
   resourceRequirements: # The type and amount of resources to assign to a container.
     - value: '16'
       type: VCPU
-    - value: '65536'
+    - value: '32768'
       type: MEMORY
   networkConfiguration: # The network configuration for jobs that are running on Fargate resources.
     assignPublicIp: ENABLED # Indicates whether the job has a public IP address. Valid values are: ENABLED, DISABLED.
@@ -31,4 +21,4 @@ retryStrategy: # The retry strategy to use for failed jobs that are submitted wi
   attempts: 1 # The number of times to move a job to the RUNNABLE status.
 propagateTags: true # Specifies whether to propagate the tags from the job or job definition to the corresponding Amazon ECS task.
 timeout: # The timeout configuration for jobs that are submitted with this job definition, after which Batch terminates your jobs if they have not finished.
-  attemptDurationSeconds: 10000 # The job timeout time (in seconds) that's measured from the job attempt's startedAt timestamp.
+  attemptDurationSeconds: 36000 # The job timeout time (in seconds) that's measured from the job attempt's startedAt timestamp.
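The memory drop from 65536 to 32768 keeps the job-definition default small, while `job_cc.yaml` overrides it upward for the memory-hungry cross-correlation step. On Fargate, `VCPU` and `MEMORY` must be one of the platform's supported pairings; for 16 vCPU, memory should be 32 to 120 GiB in 8 GiB increments (an assumption to verify against the current Fargate documentation). A quick sanity-check sketch under that assumption:

```python
def valid_fargate_memory_16vcpu(memory_mib: int) -> bool:
    # Assumed Fargate constraint for 16 vCPU tasks: 32-120 GiB in 8 GiB steps.
    gib = memory_mib / 1024
    return 32 <= gib <= 120 and gib % 8 == 0


print(valid_fargate_memory_16vcpu(32768))  # True: new default (32 GiB)
print(valid_fargate_memory_16vcpu(90112))  # True: job_cc.yaml override (88 GiB)
print(valid_fargate_memory_16vcpu(65536))  # True: the old default (64 GiB)
```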
16 changes: 16 additions & 0 deletions tutorials/cloud/job_stack.yaml
@@ -0,0 +1,16 @@
+jobName: 'noisepy-stack'
+jobQueue: ''
+jobDefinition: '' # [REQUIRED] The job definition used by this job.
+# Uncomment to run a job across multiple nodes. The station pairs to be stacked will be split across the nodes.
+# arrayProperties:
+#   size: 16 # number of nodes
+containerOverrides: # An object with various properties that override the defaults for the job definition that specify the name of a container in the specified job definition and the overrides it should receive.
+  resourceRequirements:
+    - value: '32768'
+      type: MEMORY
+  command: # The command to send to the container that overrides the default command from the Docker image or the job definition.
+    - stack
+    - --ccf_path=s3://<YOUR_S3_BUCKET>/<CC_PATH>
+    - --stack_path=s3://<YOUR_S3_BUCKET>/<STACK_PATH>
+timeout:
+  attemptDurationSeconds: 7200 # 2 hrs
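Because stacking consumes the CCFs produced by the cross-correlation job, the two submissions can be chained so the stack job only starts after the CC job finishes. A sketch using boto3's `dependsOn` (queue and definition names are placeholders; `containerOverrides` with the commands from `job_cc.yaml` and `job_stack.yaml` are omitted for brevity, see the earlier sketch):

```python
import boto3

batch = boto3.client("batch")

cc = batch.submit_job(
    jobName="noisepy-cross-correlate",
    jobQueue="<your job queue>",
    jobDefinition="<your job definition>",
)
stack = batch.submit_job(
    jobName="noisepy-stack",
    jobQueue="<your job queue>",
    jobDefinition="<your job definition>",
    # Start stacking only after the cross-correlation job completes successfully.
    dependsOn=[{"jobId": cc["jobId"]}],
)
print(cc["jobId"], stack["jobId"])
```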
