catalog_to_dd.py: input and output of dt.cc #516

Open · wants to merge 6 commits into develop
Changes from 5 commits
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
@@ -9,3 +9,4 @@
* Iman Kahbasi
* eQ Halauwet
* Glenn Nelson
* Eleanor R. H. Mestel
107 changes: 103 additions & 4 deletions eqcorrscan/utils/catalog_to_dd.py
@@ -464,13 +464,73 @@ def _prep_horiz_picks(catalog, stream_dict, event_id_mapper):
event.picks.append(new_pick)
return catalog

def _read_correlation_file(existing_corr_file, event_id_mapper=None):
"""
Read an existing correlation file into a dictionary of existing event pairs.

:type existing_corr_file: str
:param existing_corr_file:
File path for existing correlations file.

:rtype: dict
:return: Existing pairs in format {id1: [id2, id3 ...], id2: [id3 ...]}

"""
existing_pairs = {}
counter = 0
with open(existing_corr_file, "r") as f:
for line in f:
cols = line.split()
if cols[0] == "#":
in_event_id_1 = int(cols[1])
in_event_id_2 = int(cols[2])
in_otc = cols[3]  # origin-time correction column of the dt.cc header (usually 0.0); unused here


# Map the numeric ids in the file back to resource ids
# (reverse lookup of event_id_mapper).
for key, value in event_id_mapper.items():
if value == in_event_id_1:
in_eventid1 = key
elif value == in_event_id_2:
in_eventid2 = key

existing_pairs.setdefault(in_eventid1, []).append(in_eventid2)

counter += 1

# Tried to set this up with a differential_times dictionary, but a
# full _DTObs cannot be built because the file only stores dt, not
# tt1 and tt2:
#
# diff_time = _EventPair(
#     event_id_1=in_event_id_1,
#     event_id_2=in_event_id_2)
#
# elif len(cols) == 4:
#     in_sta = cols[0]
#     in_dt = float(cols[1])
#     in_weight = float(cols[2])
#     in_phase = cols[3]
#
#     diff_time.obs.append(
#         _DTObs(station=in_sta,
#                tt1=XX,
#                tt2=XX, weight=in_weight,
#                phase=in_phase))

Logger.info(
f"{counter} existing correlation measurements from {existing_corr_file}")
return existing_pairs
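
For context, a hypoDD dt.cc file alternates pair headers of the form "# ID1 ID2 OTC" with observation rows of the form "STA DT WEIGHT PHA"; only the headers are used by this parser. A minimal usage sketch (the ids and file contents here are hypothetical):

# Assume dt.cc contains a header line "# 1234 5678 0.0" followed by
# observation rows such as "STA01 -0.042 0.85 P".
event_id_mapper = {"smi:local/event_a": 1234, "smi:local/event_b": 5678}
existing = _read_correlation_file("dt.cc", event_id_mapper=event_id_mapper)
# existing -> {"smi:local/event_a": ["smi:local/event_b"]}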

def compute_differential_times(catalog, correlation, stream_dict=None,
event_id_mapper=None, max_sep=8., min_link=8,
min_cc=None, extract_len=None, pre_pick=None,
shift_len=None, interpolate=False,
all_horiz=False, max_workers=None,
max_trace_workers=1, *args, **kwargs):
max_trace_workers=1, existing_corr_file=None,
*args, **kwargs):
"""
Generate groups of differential times for a catalog.

@@ -517,6 +577,9 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
Maximum number of workers for parallel correlation of traces instead of
events. If None then all threads will be used (but can only be used
when max_workers = 1).
:type existing_corr_file: str
:param existing_corr_file:
File path for existing correlations file.

:rtype: dict
:return: Dictionary of differential times keyed by event id.
@@ -534,6 +597,11 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
multiple events and may require more memory, but the latter can be
quicker for few events with many or very long traces and requires less
memory.

.. note::
The existing_corr_file is only checked for event pairs, not for the
stations used within each pair. It does not account for the addition
of new data to an existing event pair.
"""
include_master = kwargs.get("include_master", False)
correlation_kwargs = dict(
@@ -563,7 +631,14 @@ def compute_differential_times(catalog, correlation, stream_dict=None,

additional_args = dict(min_link=min_link, event_id_mapper=event_id_mapper)
if correlation:
# Import existing correlation pairs if a file is provided
differential_times = {}
if existing_corr_file is not None:
existing_pairs = _read_correlation_file(
existing_corr_file=existing_corr_file,
event_id_mapper=event_id_mapper)
## TODO: implement checking whether there are stations missing
## from existing correlations
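# One possible shape for that check (a sketch only, assuming the parser
# is extended to keep per-pair stations): compare each pair's stored
# stations against the stations available in stream_dict, e.g.
# {tr.stats.station for st in stream_dict.values() for tr in st},
# and re-correlate pairs whose station coverage has grown.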
additional_args.update(correlation_kwargs)
n = len(sparse_catalog)
if max_workers == 1:
@@ -574,6 +649,11 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
master_id = str(master.resource_id)
sub_catalog = [ev for j, ev in enumerate(sparse_catalog)
if distance_filter[i][j]]
if existing_corr_file is not None and master_id in existing_pairs:
sub_catalog = [ev for ev in sub_catalog
if ev.resource_id.id not in existing_pairs[master_id]]
Logger.info(
f"Existing correlations for {master_id}: "
f"{len(existing_pairs[master_id])}")
if master_id not in additional_args["stream_dict"].keys():
Logger.warning(
f"{master_id} not in waveforms, skipping")
@@ -699,7 +779,8 @@ def write_correlations(catalog, stream_dict, extract_len, pre_pick,
shift_len, event_id_mapper=None, lowcut=1.0,
highcut=10.0, max_sep=8, min_link=8, min_cc=0.0,
interpolate=False, all_horiz=False, max_workers=None,
parallel_process=False, *args, **kwargs):
parallel_process=False, existing_corr_file=None,
output_filename="dt.cc", *args, **kwargs):
"""
Write a dt.cc file for hypoDD input for a given list of events.

@@ -746,6 +827,12 @@ def write_correlations(catalog, stream_dict, extract_len, pre_pick,
:param parallel_process:
Whether to process streams in parallel or not. Experimental, may use
too much memory.
:type existing_corr_file: str
:param existing_corr_file:
File path for existing correlations file.
:type output_filename: str
:param output_filename:
File path for output correlations file.

:rtype: dict
:returns: event_id_mapper
@@ -754,6 +841,10 @@ def write_correlations(catalog, stream_dict, extract_len, pre_pick,
You can provide processed waveforms, or let this function filter your
data for you. Filtering is undertaken by detrending and bandpassing
with an 8th order zero-phase Butterworth filter.

.. note::
Existing correlations will be written out first (without re-checking),
followed by any newly computed correlations.
"""
# Deprecated argument
cc_thresh = kwargs.get("cc_thresh", None)
@@ -787,8 +878,16 @@ def write_correlations(catalog, stream_dict, extract_len, pre_pick,
max_sep=max_sep, min_link=min_link, max_workers=max_workers,
stream_dict=processed_stream_dict, min_cc=min_cc,
extract_len=extract_len, pre_pick=pre_pick, shift_len=shift_len,
interpolate=interpolate, all_horiz=all_horiz, **kwargs)
with open("dt.cc", "w") as f:
interpolate=interpolate, all_horiz=all_horiz,
existing_corr_file=existing_corr_file, **kwargs)
with open(output_filename, "w") as f:
if existing_corr_file is not None:
with open(existing_corr_file, "r") as in_f:
for line in in_f:
f.write(line)
## TODO: speed up saving existing correlations output (do I need to write out every line individually?)
Review comment (Member):
If you want to just dump everything you can do something like:

with open(blah) as f:
    f.write("\n".join(l for l in in_f))

assuming that your lines don't already have a newline character at the end of them. If they do then you can just ''.join(in_f) and write that string.
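
If the lines do end in newlines, another option for the speed TODO above is to stream the open file object across in chunks with the standard library (a sketch, not part of this diff; existing_corr_file and f as in the code above):

import shutil

with open(existing_corr_file, "r") as in_f:
    shutil.copyfileobj(in_f, f)  # chunked copy; avoids one write call per line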

## TODO: sort output by event numbers (interleaving existing with new)
## TODO: check existing correlations for those which wouldn't be calculated otherwise?
for master_id, linked_events in correlation_times.items():
for linked_event in linked_events:
f.write(linked_event.cc_string)
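
As a usage sketch of the new arguments (a sketch only: `catalog`, `stream_dict`, and the file names here are hypothetical; the parameter names follow the signature above):

# Assumes `catalog` (an obspy Catalog) and `stream_dict` are already built.
event_id_mapper = write_correlations(
    catalog, stream_dict, extract_len=2.0, pre_pick=0.5, shift_len=0.2,
    existing_corr_file="dt_old.cc",  # pairs found here are not re-correlated
    output_filename="dt_new.cc")  # existing lines are copied first, then new ones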