Skip to content

Commit

Permalink
clean up dynamic window computation
Browse files Browse the repository at this point in the history
  • Loading branch information
dionhaefner committed Mar 11, 2021
1 parent 63679e2 commit 160476e
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 50 deletions.
33 changes: 26 additions & 7 deletions fowd/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,39 @@
# sea state aggregation periods (in minutes)
SEA_STATE_INTERVALS = (10, 30, 'dynamic')

# settings for dynamic sea state aggregation period
# the shortest and longest permitted dynamic window sizes (in minutes)
DYNAMIC_WINDOW_LENGTH_BOUNDS = (10, 60)
# time in minutes after which the dynamic window size is re-computed
DYNAMIC_WINDOW_UPDATE_FREQUENCY = 30
# dynamic window calculation is based on this many minutes before and after current wave
DYNAMIC_WINDOW_REFERENCE_PERIOD = (12 * 60, 0)
# total number of different window lengths to try within length bounds
NUM_DYNAMIC_WINDOWS = 11
# number of times each window is re-evaluated for better statistics
NUM_DYNAMIC_WINDOW_SAMPLES = 10

# gravitational acceleration in m/s^2
GRAVITY = 9.81

# reference density in kg/m^3
DENSITY = 1024

# QC thresholds
QC_FLAG_A_THRESHOLD = 25 # maximum allowed zero-crossing period (in seconds)
QC_FLAG_B_THRESHOLD = 2 # maximum allowed multiple of limit rate of change
QC_FLAG_C_THRESHOLD = 10 # maximum allowed number of consecutive identical values
QC_FLAG_D_THRESHOLD = 8 # maximum allowed exceedance of surface elevation MADN
QC_FLAG_E_THRESHOLD = 2 # maximum float precision to which sampling rate has to be uniform
QC_FLAG_F_THRESHOLD = 0.05 # maximum allowed ratio of invalid data
QC_FLAG_G_THRESHOLD = 100 # minimum number of zero-crossing periods in wave history
# maximum allowed zero-crossing period (in seconds)
QC_FLAG_A_THRESHOLD = 25
# maximum allowed multiple of limit rate of change
QC_FLAG_B_THRESHOLD = 2
# maximum allowed number of consecutive identical values
QC_FLAG_C_THRESHOLD = 10
# maximum allowed exceedance of surface elevation MADN
QC_FLAG_D_THRESHOLD = 8
# maximum float precision to which sampling rate has to be uniform
QC_FLAG_E_THRESHOLD = 2
# maximum allowed ratio of invalid data
QC_FLAG_F_THRESHOLD = 0.05
# minimum number of zero-crossing periods in wave history
QC_FLAG_G_THRESHOLD = 100

# QC logging options
# wave height (in units of sig. wave height) for which failed QC is logged
Expand Down
18 changes: 11 additions & 7 deletions fowd/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def find_wave_indices(z, start_idx=0):
active = True


def compute_dynamic_window_size(t, elevation, min_length, max_length, num_windows=10):
def compute_dynamic_window_size(t, elevation, min_length, max_length, num_windows, num_samples):
"""Compute best window size from data by minimizing fluctuations around mean energy drift.
Reference:
Expand All @@ -66,7 +66,6 @@ def compute_dynamic_window_size(t, elevation, min_length, max_length, num_window
www.nature.com, doi:10.1038/s41598-019-51706-8.
"""
# TODO: introduce constants for some of these choices
best_window_length = None
best_window_stationarity = float('inf')

Expand All @@ -76,17 +75,23 @@ def compute_dynamic_window_size(t, elevation, min_length, max_length, num_window
if isinstance(max_length, np.timedelta64):
max_length = int(max_length / np.timedelta64(1, 's'))

for window_length in np.linspace(min_length, max_length, num_windows, dtype='int'):
window_lengths = np.linspace(min_length, max_length, num_windows, dtype='int')

for window_length in window_lengths:
window_stationarity = []
for current_time_idx in np.linspace(0, window_length, 10, endpoint=False, dtype='int'):
if current_time_idx >= len(t):
window_offsets = np.linspace(0, window_length, num_samples, endpoint=False, dtype='int')

for window_offset in window_offsets:
if window_offset >= len(t):
continue

current_time_idx = window_offset
window_stds = []
while True:
next_time = t[current_time_idx] + np.timedelta64(window_length, 's')
if next_time > t[-1]:
break

next_time_idx = get_time_index(next_time, t)
window_stds.append(np.nanstd(elevation[current_time_idx:next_time_idx], ddof=1.5))
current_time_idx = next_time_idx
Expand Down Expand Up @@ -260,9 +265,8 @@ def compute_spectral_density(elevation, sample_dt):
nperseg = round(SPECTRUM_WINDOW_SIZE / sample_dt)
nfft = 2 ** (math.ceil(math.log(nperseg, 2))) # round to next higher power of 2
return scipy.signal.welch(
elevation, 1 / sample_dt,
elevation, 1 / sample_dt, window='hann',
nperseg=nperseg, nfft=nfft, noverlap=nperseg // 2,
window='hann', detrend=False # data is already detrended
)


Expand Down
2 changes: 1 addition & 1 deletion fowd/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@
dtype='int64',
attrs=dict(
long_name='Length of dynamically computed sea state window',
units='seconds',
units='minutes',
)
)
})
Expand Down
98 changes: 63 additions & 35 deletions fowd/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

from .constants import (
WAVE_HISTORY_LENGTH, SEA_STATE_INTERVALS,
QC_FAIL_LOG_THRESHOLD, QC_EXTREME_WAVE_LOG_THRESHOLD
QC_FAIL_LOG_THRESHOLD, QC_EXTREME_WAVE_LOG_THRESHOLD,
DYNAMIC_WINDOW_LENGTH_BOUNDS, DYNAMIC_WINDOW_UPDATE_FREQUENCY,
DYNAMIC_WINDOW_REFERENCE_PERIOD, NUM_DYNAMIC_WINDOWS,
NUM_DYNAMIC_WINDOW_SAMPLES
)

from .operators import (
Expand Down Expand Up @@ -112,6 +115,30 @@ def is_same_version(version1, version2):
return split_v1[:2] == split_v2[:2]


@contextlib.contextmanager
def strict_filelock(target_file):
"""File-based lock that throws an exception if lock is already in place."""
lockfile = f'{target_file}.lock'
if os.path.isfile(lockfile):
raise RuntimeError(
f'File {target_file} appears to be locked. '
'Make sure that no other process is accessing it, then remove manually.'
)

with open(lockfile, 'w'):
pass

try:
yield
finally:
try:
os.remove(lockfile)
except OSError:
warnings.warn(f'Could not remove lock file {lockfile}')


# main functions

def initialize_processing(record_file, state_file, input_hash):
"""Parse temporary files to re-start processing if possible."""
default_params = dict(
Expand Down Expand Up @@ -169,26 +196,37 @@ def initialize_processing(record_file, state_file, input_hash):
)


@contextlib.contextmanager
def strict_filelock(target_file):
"""File-based lock that throws an exception if lock is already in place."""
lockfile = f'{target_file}.lock'
if os.path.isfile(lockfile):
raise RuntimeError(
f'File {target_file} appears to be locked. '
'Make sure that no other process is accessing it, then remove manually.'
)
def get_dynamic_window_size(current_window_size, current_time, time, elevation, last_updated=None):
skip_update = (
last_updated is not None
and current_time - last_updated < np.timedelta64(DYNAMIC_WINDOW_UPDATE_FREQUENCY, 'm')
)

with open(lockfile, 'w'):
pass
if skip_update:
return current_window_size, last_updated

try:
yield
finally:
try:
os.remove(lockfile)
except OSError:
warnings.warn(f'Could not remove lock file {lockfile}')
print("not skipping")

ref_period_start, ref_period_end = DYNAMIC_WINDOW_REFERENCE_PERIOD
dynamic_window_start = get_time_index(
current_time - np.timedelta64(ref_period_start, 'm'),
time,
)
dynamic_window_end = get_time_index(
current_time + np.timedelta64(ref_period_end, 'm'),
time,
)
dynamic_window_idx = slice(dynamic_window_start, dynamic_window_end)
dynamic_sea_state_period = compute_dynamic_window_size(
time[dynamic_window_idx],
elevation[dynamic_window_idx],
min_length=np.timedelta64(DYNAMIC_WINDOW_LENGTH_BOUNDS[0], 'm'),
max_length=np.timedelta64(DYNAMIC_WINDOW_LENGTH_BOUNDS[1], 'm'),
num_windows=NUM_DYNAMIC_WINDOWS,
num_samples=NUM_DYNAMIC_WINDOW_SAMPLES,
)

return dynamic_sea_state_period // 60, current_time


def compute_wave_records(time, elevation, elevation_normalized, outfile, statefile,
Expand Down Expand Up @@ -264,7 +302,7 @@ def handle_output(wave_records, wave_params_history, num_flags_fired):
postfix=dict(waves_processed=str(local_wave_id)), initial=start_idx
)

dynamic_sea_state_period = None
dynamic_sea_state_period = dynamic_period_last_update = None

with strict_filelock(outfile), tqdm.tqdm(**pbar_kwargs) as pbar:
# initialize output files
Expand Down Expand Up @@ -347,22 +385,12 @@ def handle_output(wave_records, wave_params_history, num_flags_fired):
# compute sea state parameters
for sea_state_period in SEA_STATE_INTERVALS:
if sea_state_period == 'dynamic':
if local_wave_id % 1000 == 0 or dynamic_sea_state_period is None:
dynamic_window_start = get_time_index(
wave_params['start_time'] - np.timedelta64(12, 'h'),
time
)
dynamic_window_idx = slice(dynamic_window_start, wave_start)
dynamic_sea_state_period = compute_dynamic_window_size(
time[dynamic_window_idx],
elevation[dynamic_window_idx],
min_length=np.timedelta64(10, 'm'),
max_length=np.timedelta64(60, 'm'),
num_windows=11,
)

dynamic_sea_state_period, dynamic_period_last_update = get_dynamic_window_size(
dynamic_sea_state_period, wave_params["start_time"], time, elevation,
last_updated=dynamic_period_last_update
)
this_wave_records['sea_state_dynamic_window_length'] = dynamic_sea_state_period
offset = np.timedelta64(dynamic_sea_state_period, 's')
offset = np.timedelta64(dynamic_sea_state_period, 'm')
else:
offset = np.timedelta64(sea_state_period, 'm')

Expand Down

0 comments on commit 160476e

Please sign in to comment.