# This code QA/QCs the PC_Raw_Pipe data for all wx stations across all
# water years
# Written and modified by Julien Bodart (VIU) - 14.07.2024
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
from sqlalchemy import MetaData, Table
#%% import support functions
import qaqc_functions
from push_sql_function import get_metadata, update_records
from qaqc_stations_list import *
#%% establish a connection with MySQL database 'viuhydro_wx_data_v2'
import config
engine = config.main_sql()
metadata = get_metadata(engine)
#%% create list of stations to qaqc for this variable
var = 'PC_Raw_Pipe'
var_flags = var + '_flags'
wx_stations = {name: globals()[name] for name in globals() if name.startswith('clean_')}
wx_stations = [station for station, variables in wx_stations.items() if var in variables]
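# (qaqc_stations_list is assumed to define one list of variable names per
# station, e.g. clean_somestation = ['Air_Temp', 'PC_Raw_Pipe', ...]; that
# example name is hypothetical)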
wx_stations_name = list(map(lambda st: str.replace(st, 'clean_', ''), wx_stations)) # remove 'clean_' for csv export
wx_stations_name_cap = [wx_name.capitalize() for wx_name in wx_stations_name] # capitalise station name
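# For reference, a minimal sketch of what one of the checks used below can
# look like. This is illustrative only - the real implementations live in
# qaqc_functions, whose internals are not shown here - and the function is
# hypothetical and never called in this script.
def _example_static_range_test(series, flag, step_size):
    """Sketch of a static range test: NaN-out values whose hour-to-hour
    jump exceeds step_size, and record the flag where that happens."""
    jumps = series.diff().abs() > step_size # boolean mask of large jumps
    flags = pd.Series(np.where(jumps, flag, np.nan), index=series.index)
    return series.mask(jumps), flags # masked series + per-row flags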
#%% Loop over each station at a time and clean up the precip pipe variable
for l in range(len(wx_stations_name)):
    sql_database = wx_stations_name[l]
    sql_name = wx_stations_name_cap[l]
    print('###### Cleaning Precip pipe data for station: %s ######' %(sql_name))
    #%% import current data from "clean"
    sql_file = pd.read_sql(sql="SELECT * FROM clean_" + sql_database, con=engine)
    #%% time at rennellpass and datlamen is not rounded to the nearest hour:
    # round time in the clean db data so it matches the qaqc db
    if wx_stations_name[l] == 'rennellpass' or wx_stations_name[l] == 'datlamen':
        sql_file = sql_file.copy()
        sql_file['DateTime'] = pd.to_datetime(sql_file['DateTime'])
        sql_file['DateTime'] = sql_file['DateTime'].dt.floor('h')
        deltas = sql_file['DateTime'].diff()[1:]
        same_vals = deltas[deltas < timedelta(hours=1)]
        sql_file = sql_file.drop(same_vals.index)
        sql_file = sql_file.set_index('DateTime').asfreq('h').reset_index() # make sure records are continuous every hour
    # for all other stations, make sure time is consecutively increasing by
    # one hour; if not, add records and place nans
    else:
        sql_file = sql_file.set_index('DateTime').asfreq('h').reset_index() # make sure records are continuous every hour
    #%% make sure you only go as far as a specific date for all wx stations for the current water year
    qaqc_upToDate = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H") + ':00:00' # today's date minus 7 days, floored to the hour
    sql_file_idx_latest = int(np.flatnonzero(sql_file['DateTime'] == qaqc_upToDate)[0]) if np.flatnonzero(sql_file['DateTime'] == qaqc_upToDate).size > 0 else 0 # index of today's date - 7 days
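    # e.g. if the script runs on 2024-07-14 10:23, qaqc_upToDate is
    # '2024-07-07 10:00:00'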
    # sql_file_idx_latest = int(np.flatnonzero(sql_file['DateTime'] == '2024-02-19 06:00:00')[0]) if np.flatnonzero(sql_file['DateTime'] == '2024-02-19 06:00:00').size > 0 else 0 # arbitrary date
    # if sql_file_idx_latest is 0, the wx station transmission has stopped
    # between the last time this code ran and the qaqc_upToDate date
    # (i.e. over the last week)
    if sql_file_idx_latest != 0:
        sql_file = sql_file[:sql_file_idx_latest]
        # sql_file = sql_file[sql_file_idx_latest:]
        #%% Make sure there is no gap in datetime (all dates are consecutive) and place
        # nans in all other values if any gaps are identified
        sql_file = sql_file.set_index('DateTime').asfreq('h').reset_index()
        dt_sql = pd.to_datetime(sql_file['DateTime'])
        # get the indices for each water year
        if 10 <= datetime.now().month <= 12:
            yr_range = np.arange(dt_sql[0].year, datetime.now().year+1) # find min and max years
        else:
            yr_range = np.arange(dt_sql[0].year, datetime.now().year) # find min and max years
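        # e.g. for a station whose record starts in 2015, running in November
        # 2024 gives yr_range = [2015, ..., 2024]; running in July 2024 gives
        # yr_range = [2015, ..., 2023] (the water year starting in Oct 2023)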
        if wx_stations_name[l] == 'claytonfalls':
            delete = [np.flatnonzero(yr_range == 2014), np.flatnonzero(yr_range == 2016), np.flatnonzero(yr_range == 2018)]
            yr_range = np.delete(yr_range, delete)
        if wx_stations_name[l] == 'mountarrowsmith':
            yr_range = np.delete(yr_range, np.flatnonzero(yr_range == 2016))
        if wx_stations_name[l] == 'tetrahedron':
            yr_range = np.delete(yr_range, np.flatnonzero(yr_range == 2016))
        if wx_stations_name[l] == 'lowercain':
            yr_range = np.delete(yr_range, np.flatnonzero(yr_range == 2018))
        if wx_stations_name[l] == 'mountmaya':
            yr_range = np.delete(yr_range, np.flatnonzero(yr_range == 2013))
        if wx_stations_name[l] == 'eastbuxton':
            yr_range = np.delete(yr_range, np.flatnonzero(yr_range == 2014))
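        # (the years above are presumably excluded because their records are
        # incomplete or unusable at those stations)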
        qaqc_arr_final = [] # set up the variable
        # start the qaqc process for each water year at this weather station;
        # only run for the last water year to save memory on the server
        for k in range(len(yr_range)-1, len(yr_range)):
            print('## Cleaning data for year: %d-%d ##' %(yr_range[k],yr_range[k]+1))
            # find indices of water years
            start_yr_sql = qaqc_functions.nearest(dt_sql, datetime(yr_range[k], 10, 1))
            end_yr_sql = qaqc_functions.nearest(dt_sql, datetime(yr_range[k]+1, 9, 30, 23, 0, 0))
            # select data for the whole water year based on the datetime object
            dt_yr = np.concatenate([np.where(dt_sql == start_yr_sql), np.where(dt_sql == end_yr_sql)])
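            # e.g. the water year labelled 2023-2024 spans 2023-10-01 00:00 to
            # 2024-09-30 23:00; dt_yr holds the start and end row indices of
            # that span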
            # store for plotting (if needed)
            raw = sql_file[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            qaqc_arr = sql_file.copy() # array to QAQC
            #%% Fix jumps in precipitation data from sudden drainage events during site visits
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 5
            qaqc_5, flags_5 = qaqc_functions.precip_drainage_fix(qaqc_arr[var], data, flag, dt_yr, qaqc_arr['DateTime'], wx_stations_name[l], yr_range[k]+1)
            qaqc_arr[var] = qaqc_5
            #%% Apply static range test (remove values where the difference is > step_size)
            # Maximum allowed change between consecutive records: 5 mm
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 1
            step_size = 5 # in mm
            qaqc_1, flags_1 = qaqc_functions.static_range_test(qaqc_arr[var], data, flag, step_size)
            qaqc_arr[var] = qaqc_1
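            # (static_range_test presumably NaNs-out and flags with 1 any
            # value whose jump from the previous record exceeds step_size;
            # see the hypothetical sketch near the top of this file)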
            #%% Bring timeseries back to 0 at the start of the water year
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 3
            qaqc_3, flags_3 = qaqc_functions.reset_zero_watyr(qaqc_arr[var], data, flag)
            qaqc_arr[var] = qaqc_3
            #%% Remove outliers based on mean and std using a rolling window for each month
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 4
            st_dev = 3 # specify how many times you want to multiply st_dev (a good starting point is 3; 1 is too harsh)
            qaqc_4, flags_4 = qaqc_functions.mean_rolling_month_window(qaqc_arr[var], flag, dt_sql, st_dev)
            qaqc_arr[var] = qaqc_4
            #%% Remove non-sensical zero values if they are not bounded by a
            # specific threshold for i-1 and i+1 (e.g. -3 to 3). This removes
            # false zeros in the data
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 6
            false_zero_threshold = 15 # in mm
            qaqc_6, flags_6 = qaqc_functions.false_zero_removal(qaqc_arr[var], data, flag, false_zero_threshold)
            qaqc_arr[var] = qaqc_6
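            # e.g. in a cumulative record reading ~120 mm, an isolated 0
            # between two ~120 mm neighbours is a sensor glitch rather than a
            # real drop, so it is removed here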
            #%% Remove all negative values (non-sensical)
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 2
            qaqc_2, flags_2 = qaqc_functions.negtozero(qaqc_arr[var], data, flag)
            qaqc_arr[var] = qaqc_2
            #%% one more pass to correct remaining outliers using the step size
            # at different levels until it is all 'shaved off'
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 7
            step_sizes = [7, 6] # in mm
            qaqc_7, flags_7 = qaqc_functions.static_range_multiple(qaqc_arr[var], data, flag, step_sizes)
            qaqc_arr[var] = qaqc_7
            #%% Fix decreasing trends in PC_Raw_Pipe data which can
            # be linked to evaporation
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 9
            qaqc_9, flags_9 = qaqc_functions.fix_pc_pipe_evaporation(qaqc_arr[var], data, flag)
            qaqc_arr[var] = qaqc_9
            #%% Interpolate nans with method='linear' using pandas.DataFrame.interpolate
            # First, identify gaps larger than 3 hours (which should not be interpolated)
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 8
            max_hours = 3
            qaqc_8, flags_8 = qaqc_functions.interpolate_qaqc(qaqc_arr[var], data, flag, max_hours)
            qaqc_arr[var] = qaqc_8
            #%% merge flags together into one large array, with commas
            # separating multiple flags on a row where more than one exists
            flags = pd.concat([flags_1, flags_2, flags_3, flags_4, flags_5, flags_6, flags_7, flags_8, flags_9], axis=1)
            qaqc_arr[var_flags] = flags.apply(qaqc_functions.merge_row, axis=1)
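            # e.g. a value caught by both the static range test (1) and the
            # rolling-window outlier test (4) presumably ends up with a merged
            # flag string like '1,4'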
            #%% append to qaqc_arr_final after every k iteration
            qaqc_arr_final.append(qaqc_arr.iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)])
        #%% push the qaqced variable to the SQL database
        # skip this station if all PC_Raw_Pipe values are null
        if sql_file[var].isnull().all() or dt_yr.size == 0:
            continue
        # otherwise, if there is data (most stations), keep running
        else:
            print('# Writing newly qaqced data to SQL database #')
            qaqc_arr_final = pd.concat(qaqc_arr_final) # concatenate lists
            sql_qaqc_name = 'qaqc_' + wx_stations_name[l]
            qaqced_array = pd.concat([qaqc_arr_final['DateTime'], qaqc_arr_final[var], qaqc_arr_final[var_flags]], axis=1)
            qaqced_array = qaqced_array.replace(np.nan, None) # replace nans with None for the sql database
            # import the current qaqc sql db and find columns matching the qaqc variable
            existing_qaqc_sql = pd.read_sql('SELECT * FROM %s' %sql_qaqc_name, engine)
            #%% write data to the sql database using the brute approach (re-write the whole db - quicker on a laptop but gets instantly killed on the remote desktop)
            # colnames = existing_qaqc_sql.columns
            # col_positions = [i for i, s in enumerate(colnames) if var in s]
            # existing_qaqc_sql[colnames[col_positions]] = pd.concat([qaqced_array[var],qaqced_array[var_flags]],axis=1)
            # # make sure you keep the same variable dtypes when pushing the new df to sql
            # table_map = Table(sql_qaqc_name, MetaData(), autoload_with=engine)
            # # map SQLAlchemy types to pandas dtypes
            # type_mapping = {
            #     'DATETIME': 'datetime64[ns]',
            #     'DOUBLE': 'float64',
            #     'FLOAT': 'float64',
            #     'TEXT': 'object',
            # }
            # # map the correct dtype in the df to sql and push to the sql db
            # existing_qaqc_sql = existing_qaqc_sql.astype({col.name: type_mapping.get(str(col.type).upper(), 'object') for col in table_map.columns if col.name in existing_qaqc_sql.columns})
            # existing_qaqc_sql[var] = existing_qaqc_sql[var].astype('float64')
            # existing_qaqc_sql[var_flags] = existing_qaqc_sql[var_flags].astype('object')
            # existing_qaqc_sql.to_sql(name=sql_qaqc_name, con=engine, if_exists='replace', index=False)
            # # make sure you assign the 'DateTime' column as the primary key
            # with engine.connect() as con:
            #     con.execute('ALTER TABLE `qaqc_%s`' %wx_stations_name[l] + ' ADD PRIMARY KEY (`DateTime`);')
            #%% write data to the sql database using the soft approach (re-write only the idx and vars needed - very slow on a laptop but fast on the remote desktop)
            qaqc_idx_sql = existing_qaqc_sql[var_flags].notna()[::-1].idxmax()+1 # find the latest valid value in the sql database and fill after that
            dt_qaqc_idx_sql = existing_qaqc_sql['DateTime'].iloc[qaqc_idx_sql] # find the matching datetime object in the qaqc db
            qaqc_idx_sql = np.flatnonzero(qaqced_array['DateTime'] == dt_qaqc_idx_sql)[0] if np.flatnonzero(qaqced_array['DateTime'] == dt_qaqc_idx_sql).size > 0 else 0
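            # (if no match is found, qaqc_idx_sql falls back to 0 and the
            # whole water year is pushed)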
            print('Number of days to push to the qaqc database: %d' %(int((qaqced_array.index[-1] - qaqced_array.index[qaqc_idx_sql])/24)))
            column_mapping = {
                'DateTime': 'DateTime',
                var: var,
                var_flags: var_flags
            }
            update_records(engine, metadata, 'qaqc_' + wx_stations_name[l], qaqced_array[qaqc_idx_sql:], column_mapping)
    # skip this station if the weather station has stopped transmitting for some reason
    else:
        # if transmission has stopped since last week, skip this station
        print('Careful: %s has stopped transmitting and will not be qaqced until it is back online' %(sql_name))
        continue
# Close the sql connection after the loop has completed
print('## Finished Precip_pipe qaqc for all stations ##')
engine.dispose()