This repository has been archived by the owner on Jun 2, 2023. It is now read-only.

Adding State Updating and # of Tasks to LSTM and RGCN models #104

Merged on Jun 7, 2021
Changes from all commits (34 commits):
94d0454 adding DA capabilities to LSTM (May 25, 2021)
b9f5393 updating loss functions and adding tasks to RGCN (May 26, 2021)
a1ff04f updating parameter definitions (May 26, 2021)
b7e9046 updating dropout argument names (Jun 2, 2021)
494b931 cleaning LSTM (jzwart, Jun 2, 2021)
b3f8df9 Being explicit about the number of tasks (jzwart, Jun 2, 2021)
34e9983 cleaning up init states call (jzwart, Jun 2, 2021)
10ffdbd Updating rgcn for clarity (Jun 2, 2021)
c97f179 Merge branch 'adding_da_to_models' of https://github.com/USGS-R/river… (Jun 2, 2021)
cf95324 updating lambda_aux argument (Jun 2, 2021)
088d4a7 making stateful based on return_state argument (Jun 2, 2021)
4101879 separating output, rgcn layers (jsadler2, Jun 2, 2021)
6fc687d making a more generic multitask loss function (jsadler2, Jun 2, 2021)
7c36a4f adding SingletaskLSTMModel (jsadler2, Jun 2, 2021)
c582d3d Convert to MultitaskLSTM; update GRU classes (jsadler2, Jun 2, 2021)
ab4cff1 explicit about number of tasks for y_data_components (jzwart, Jun 3, 2021)
3129bd5 convenience fxn weighted_masked_rmse (jsadler2, Jun 4, 2021)
c8c140a lambda_aux -> lambdas in rnns (jsadler2, Jun 4, 2021)
3e54465 num_tasks, lambdas in train functions (jsadler2, Jun 4, 2021)
e939460 Merge branch 'adding_da_to_models' of github.com:USGS-R/river-dl into… (jsadler2, Jun 4, 2021)
17bfab8 [#106] taking train_step out in rnns (jsadler2, Jun 4, 2021)
2c7580b [#106] provide loss_func to train func; compiles rnns (jsadler2, Jun 4, 2021)
3799509 [#98] multitask nse, kge functions; rm weights (jsadler2, Jun 4, 2021)
a4d54e2 [#106] match train cli with train.py fxn (jsadler2, Jun 4, 2021)
0f568ae take out unneeded check on h_/c_init in RGCN (jsadler2, Jun 4, 2021)
6cd7b52 [#98] don't pass weights to `fit` call (jsadler2, Jun 4, 2021)
9bc45ca add `num_tasks` to predict fxns (jsadler2, Jun 4, 2021)
63bb515 Snakefile updates for lambdas, num_tasks, loss_func (jsadler2, Jun 4, 2021)
8c80003 RGCN `states` attribute; just final states (jsadler2, Jun 4, 2021)
3eca6f2 typo in predict (jsadler2, Jun 4, 2021)
4527d67 Black formatting and docstring corrections (jsadler2, Jun 4, 2021)
8d061f8 attr for rnns (jsadler2, Jun 4, 2021)
1c1641a "outputs" -> "variables" in `num_task` docstring (jsadler2, Jun 7, 2021)
8362981 can provide h_/c_init to initalize rnn (jsadler2, Jun 7, 2021)
13 changes: 5 additions & 8 deletions Snakefile
@@ -5,21 +5,18 @@ from river_dl.evaluate import combined_metrics
from river_dl.postproc_utils import plot_obs
from river_dl.predict import predict_from_io_data
from river_dl.train import train_model
from river_dl import loss_functions as lf

out_dir = config['out_dir']
code_dir = config['code_dir']
loss_function = lf.multitask_rmse(config['lambdas'])

rule all:
input:
expand("{outdir}/{metric_type}_metrics.csv",
outdir=out_dir,
metric_type=['overall', 'month', 'reach', 'month_reach'],
),
expand( "{outdir}/{plt_variable}_{partition}.png",
outdir=out_dir,
plt_variable=['temp', 'flow'],
partition=['trn', 'val'],
),

rule prep_io_data:
input:
@@ -62,7 +59,7 @@ rule prep_io_data:
# shell:
# """
# module load analytics cuda10.1/toolkit/10.1.105
# run_training -e /home/jsadler/.conda/envs/rgcn --no-node-list "python {code_dir}/train_model.py -o {params.run_dir} -i {input[0]} -p {params.pt_epochs} -f {params.ft_epochs} --lamb {params.lamb} --model rgcn -s 135"
# run_training -e /home/jsadler/.conda/envs/rgcn --no-node-list "python {code_dir}/train_model.py -o {params.run_dir} -i {input[0]} -p {params.pt_epochs} -f {params.ft_epochs} --lambdas {params.lamb} --loss_func multitask_rmse --model rgcn -s 135"
# """


@@ -79,7 +76,7 @@ rule train_model_local_or_cpu:
run_dir=lambda wildcards, output: os.path.split(output[0][:-1])[0],
run:
train_model(input[0], config['pt_epochs'], config['ft_epochs'], config['hidden_size'],
params.run_dir, model_type='rgcn', lamb=config['lamb'])
loss_func=loss_function, out_dir=params.run_dir, model_type='rgcn', num_tasks=2)

rule make_predictions:
input:
@@ -93,7 +90,7 @@ rule make_predictions:
predict_from_io_data(model_type='rgcn', model_weights_dir=model_dir,
hidden_size=config['hidden_size'], io_data=input[1],
partition=wildcards.partition, outfile=output[0],
logged_q=False)
logged_q=False, num_tasks=2)


def get_grp_arg(wildcards):
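For context, a minimal sketch of what the updated rules do when called directly from Python rather than through Snakemake. The file names, epoch counts, and hidden size below are placeholders, not values from the PR; the positional and keyword arguments mirror the rule bodies above.

```python
from river_dl.train import train_model
from river_dl.predict import predict_from_io_data
from river_dl import loss_functions as lf

# Placeholder standing in for config['lambdas'].
loss_function = lf.multitask_rmse([100, 100])

# Mirrors rule train_model_local_or_cpu: prepped data, pretrain epochs,
# finetune epochs, hidden size, plus the new loss_func and num_tasks keywords.
train_model("prepped_io_data.npz", 5, 10, 20,
            loss_func=loss_function, out_dir="output",
            model_type="rgcn", num_tasks=2)

# Mirrors rule make_predictions: num_tasks is now passed through as well.
predict_from_io_data(model_type="rgcn", model_weights_dir="output/trained_weights/",
                     hidden_size=20, io_data="prepped_io_data.npz",
                     partition="val", outfile="output/preds_val.feather",
                     logged_q=False, num_tasks=2)
```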
11 changes: 5 additions & 6 deletions config.yml
@@ -1,16 +1,15 @@
# Input files
obs_flow: "/home/jsadler/drb_data/obs_flow_full_raw"
obs_temp: "/home/jsadler/drb_data/obs_temp_full"
sntemp_file: "/home/jsadler/drb_data/uncal_sntemp_input_output"
catchment_attr: "/home/jsadler/drb_data/seg_attr_drb.feather"
dist_matrix: "/home/jsadler/drb_data/distance_matrix.npz"
obs_flow: "../drb-dl-model/data/in/obs_flow_subset"
obs_temp: "../drb-dl-model/data/in/obs_temp_subset"
sntemp_file: "../drb-dl-model/data/in/uncal_sntemp_input_output_subset"
dist_matrix: "../drb-dl-model/data/in/distance_matrix_subset.npz"

out_dir: "test_val_functionality"
code_dir: "/home/jsadler/river-dl/river_dl"

x_vars: ["seg_rain", "seg_tave_air", "seginc_swrad", "seg_length", "seginc_potet", "seg_slope", "seg_humid", "seg_elev"]
primary_variable: "flow"
lamb: 1
lambdas: [100, 100]

train_start_date:
- '1985-10-01'
136 changes: 71 additions & 65 deletions river_dl/RGCN.py
@@ -11,22 +11,27 @@


class RGCN(layers.Layer):
def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None):
def __init__(
self, hidden_size, A, recurrent_dropout=0, dropout=0, rand_seed=None,
):
"""

:param hidden_size: [int] the number of hidden units
:param A: [numpy array] adjacency matrix
:param flow_in_temp: [bool] whether the flow predictions should feed
into the temp predictions
:param recurrent_dropout: [float] value between 0 and 1 for the
probability of a recurrent element to be zero
:param dropout: [float] value between 0 and 1 for the probability of an
input element to be zero
:param rand_seed: [int] the random seed for initialization
"""
super().__init__()
self.hidden_size = hidden_size
self.A = A.astype("float32")
self.flow_in_temp = flow_in_temp

# set up the layer
self.lstm = tf.keras.layers.LSTMCell(hidden_size)
self.lstm = tf.keras.layers.LSTMCell(
hidden_size, recurrent_dropout=recurrent_dropout, dropout=dropout
)

### set up the weights ###
w_initializer = tf.random_normal_initializer(
@@ -88,44 +93,15 @@ def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None):
shape=[hidden_size], initializer="zeros", name="b_c"
)

if self.flow_in_temp:
# was W2
self.W_out_flow = self.add_weight(
shape=[hidden_size, 1], initializer=w_initializer, name="W_out"
)
# was b2
self.b_out_flow = self.add_weight(
shape=[1], initializer="zeros", name="b_out"
)

self.W_out_temp = self.add_weight(
shape=[hidden_size + 1, 1],
initializer=w_initializer,
name="W_out",
)

self.b_out_temp = self.add_weight(
shape=[1], initializer="zeros", name="b_out"
)
else:
# was W2
self.W_out = self.add_weight(
shape=[hidden_size, 2], initializer=w_initializer, name="W_out"
)
# was b2
self.b_out = self.add_weight(
shape=[2], initializer="zeros", name="b_out"
)

@tf.function
def call(self, inputs, **kwargs):
[Review comment from a collaborator: "We probably need to add a docstring to this function so that we are clear on what the arguments are."]

graph_size = self.A.shape[0]
hidden_state_prev, cell_state_prev = (
tf.zeros([graph_size, self.hidden_size]),
tf.zeros([graph_size, self.hidden_size]),
)
out = []
h_list = []
c_list = []
n_steps = inputs.shape[1]
# set the initial h & c states to the supplied h and c states if using
# DA, or 0's otherwise
hidden_state_prev = tf.cast(kwargs["h_init"], tf.float32)
cell_state_prev = tf.cast(kwargs["c_init"], tf.float32)
for t in range(n_steps):
h_graph = tf.nn.tanh(
tf.matmul(
@@ -157,42 +133,72 @@ def call(self, inputs, **kwargs):
+ self.b_c
)

if self.flow_in_temp:
out_pred_q = (
tf.matmul(h_update, self.W_out_flow) + self.b_out_flow
)
out_pred_t = (
tf.matmul(
tf.concat([h_update, out_pred_q], axis=1),
self.W_out_temp,
)
+ self.b_out_temp
)
out_pred = tf.concat([out_pred_t, out_pred_q], axis=1)
else:
out_pred = tf.matmul(h_update, self.W_out) + self.b_out

out.append(out_pred)

hidden_state_prev = h_update
cell_state_prev = c_update
out = tf.stack(out)
out = tf.transpose(out, [1, 0, 2])
return out

h_list.append(h_update)
c_list.append(c_update)

h_list = tf.stack(h_list)
c_list = tf.stack(c_list)
h_list = tf.transpose(h_list, [1, 0, 2])
c_list = tf.transpose(c_list, [1, 0, 2])
return h_list, c_list


class RGCNModel(tf.keras.Model):
def __init__(self, hidden_size, A, flow_in_temp=False, rand_seed=None):
def __init__(
self,
hidden_size,
A,
num_tasks=1,
recurrent_dropout=0,
dropout=0,
rand_seed=None,
):
"""
:param hidden_size: [int] the number of hidden units
:param A: [numpy array] adjacency matrix
:param flow_in_temp: [bool] whether the flow predictions should feed
into the temp predictions
:param num_tasks: [int] number of prediction tasks to perform -
currently supports either 1 or 2 prediction tasks
:param recurrent_dropout: [float] value between 0 and 1 for the
probability of a recurrent element to be zero
:param dropout: [float] value between 0 and 1 for the probability of an
input element to be zero
:param rand_seed: [int] the random seed for initialization
"""
super().__init__()
self.rgcn_layer = RGCN(hidden_size, A, flow_in_temp, rand_seed)
self.hidden_size = hidden_size
self.num_tasks = num_tasks
self.recurrent_dropout = recurrent_dropout
self.dropout = dropout

self.rgcn_layer = RGCN(
hidden_size, A, recurrent_dropout, dropout, rand_seed
)

self.states = None

self.dense_main = layers.Dense(1, name="dense_main")
if self.num_tasks == 2:
self.dense_aux = layers.Dense(1, name="dense_aux")

def call(self, inputs, **kwargs):
output = self.rgcn_layer(inputs)
return output
batch_size = inputs.shape[0]
h_init = kwargs.get("h_init", tf.zeros([batch_size, self.hidden_size]))
c_init = kwargs.get("c_init", tf.zeros([batch_size, self.hidden_size]))
h_gr, c_gr = self.rgcn_layer(inputs, h_init=h_init, c_init=c_init)
self.states = h_gr[:, -1, :], c_gr[:, -1, :]

if self.num_tasks == 1:
main_prediction = self.dense_main(h_gr)
return main_prediction
elif self.num_tasks == 2:
main_prediction = self.dense_main(h_gr)
aux_prediction = self.dense_aux(h_gr)
return tf.concat([main_prediction, aux_prediction], axis=2)
else:
raise ValueError(
f"This model only supports 1 or 2 tasks (not {self.num_tasks})"
)
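To illustrate the new state-updating behavior, here is a minimal sketch (not from the PR; the toy adjacency matrix, random inputs, and dimension values are stand-ins) of calling RGCNModel with num_tasks=2, reading the stored final states from the `states` attribute, and warm-starting a subsequent window from them, as a data-assimilation workflow would:

```python
import numpy as np
from river_dl.RGCN import RGCNModel

n_seg, n_steps, n_feat, hidden_size = 4, 10, 8, 20
A = np.eye(n_seg, dtype="float32")  # toy adjacency matrix
x = np.random.rand(n_seg, n_steps, n_feat).astype("float32")

model = RGCNModel(hidden_size, A, num_tasks=2)

# h_init and c_init default to zeros when not supplied.
preds = model(x)     # shape [n_seg, n_steps, 2]: main and aux predictions
h, c = model.states  # final hidden/cell states, each [n_seg, hidden_size]

# Data assimilation can adjust h and c here, then warm-start the next window.
next_preds = model(x, h_init=h, c_init=c)
```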
68 changes: 25 additions & 43 deletions river_dl/loss_functions.py
@@ -1,4 +1,3 @@
import numpy as np
import tensorflow as tf


@@ -69,55 +68,43 @@ def samplewise_nnse_loss(y_true, y_pred):
return 1 - nnse_val


def nnse_masked_one_var(data, y_pred, var_idx):
y_true, y_pred, weights = y_data_components(data, y_pred, var_idx)
return nnse_loss(y_true, y_pred)
def multitask_nse(lambdas):
return multitask_loss(lambdas, nnse_loss)


def nnse_one_var_samplewise(data, y_pred, var_idx):
y_true, y_pred, weights = y_data_components(data, y_pred, var_idx)
return samplewise_nnse_loss(y_true, y_pred)
def multitask_samplewise_nse(lambdas):
return multitask_loss(lambdas, samplewise_nnse_loss)


def y_data_components(data, y_pred, var_idx):
weights = data[:, :, -2:]
y_true = data[:, :, :-2]
def multitask_rmse(lambdas):
return multitask_loss(lambdas, rmse)

# ensure y_pred, weights, and y_true are all tensors the same data type
y_true = tf.convert_to_tensor(y_true)
weights = tf.convert_to_tensor(weights)
y_true = tf.cast(y_true, y_pred.dtype)
weights = tf.cast(weights, y_pred.dtype)

# make all zero-weighted observations 'nan' so they don't get counted
# at all in the loss calculation
y_true = tf.where(weights == 0, np.nan, y_true)
def multitask_kge(lambdas):
return multitask_loss(lambdas, kge_loss)

weights = weights[:, :, var_idx]
y_true = y_true[:, :, var_idx]
y_pred = y_pred[:, :, var_idx]
return y_true, y_pred, weights


def rmse_masked_one_var(data, y_pred, var_idx):
y_true, y_pred, weights = y_data_components(data, y_pred, var_idx)
return rmse(y_true, y_pred)


def weighted_masked_rmse(lamb=0.5):
def multitask_loss(lambdas, loss_func):
"""
calculate a weighted, masked rmse.
:param lamb: [float] (short for lambda). The factor that the auxiliary loss
will be multiplied by before added to the main loss.
calculate a weighted multi-task loss for a given number of variables with a
given loss function
:param lambdas: [array-like float] The factor that losses will be
multiplied by before being added together.
:param loss_func: [function] Loss function that will be used to calculate
the loss of each variable. Must take as input parameters [y_true, y_pred]
"""

def rmse_masked_combined(data, y_pred):
rmse_main = rmse_masked_one_var(data, y_pred, 0)
rmse_aux = rmse_masked_one_var(data, y_pred, 1)
rmse_loss = rmse_main + lamb * rmse_aux
return rmse_loss
def combine_loss(y_true, y_pred):
losses = []
n_vars = y_pred.shape[-1]
for var_id in range(n_vars):
ind_var_loss = loss_func(y_true[:, :, var_id], y_pred[:, :, var_id])
weighted_ind_var_loss = lambdas[var_id] * ind_var_loss
losses.append(weighted_ind_var_loss)
total_loss = sum(losses)
return total_loss

return rmse_masked_combined
return combine_loss


def mean_masked(y):
@@ -181,10 +168,5 @@ def kge_norm_loss(y_true, y_pred):
return 1 - norm_kge(y_true, y_pred)


def kge_loss_one_var(data, y_pred, var_idx):
y_true, y_pred, weights = y_data_components(data, y_pred, var_idx)
return kge_loss(y_true, y_pred)


def kge_loss(y_true, y_pred):
return -1 * kge(y_true, y_pred)
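As a usage sketch (the weights are assumed, echoing the `lambdas: [100, 100]` entry in config.yml above): each factory returns a Keras-compatible loss that computes the base loss per variable, scales it by the matching lambda, and sums the results. This is the object the Snakefile now builds once and passes to train_model as loss_func.

```python
import numpy as np
import tensorflow as tf
from river_dl.loss_functions import multitask_rmse

loss_func = multitask_rmse([100, 100])  # one weight per predicted variable

y_true = tf.constant(np.random.rand(2, 5, 2), tf.float32)  # [batch, time, n_vars]
y_pred = tf.constant(np.random.rand(2, 5, 2), tf.float32)

# Equivalent to 100 * rmse(var 0) + 100 * rmse(var 1).
total_loss = loss_func(y_true, y_pred)
```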