# This script QA/QCs the Peak Wind Speed data over a full water year for all
# wx stations and all years
# Written and modified by Julien Bodart (VIU) - 15.07.2024
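# Workflow: read each station's clean_<station> table, flag out-of-range and
# stuck values for the most recent water year, then push the flagged series
# and its Pk_Wind_Speed_flags column to the matching qaqc_<station> table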
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import datetime as dtime
from sqlalchemy import create_engine, MetaData, Table
#%% import support functions
import qaqc_functions
from push_sql_function import get_engine, get_metadata, update_records
from qaqc_stations_list import *
#%% establish a connection with MySQL database 'viuhydro_wx_data_v2'
import config
engine = config.main_sql()
metadata = get_metadata(engine)
#%% create list of stations to qaqc for this variable
var = 'Pk_Wind_Speed'
var_flags = var + '_flags'
wx_stations = {name: globals()[name] for name in globals() if name.startswith('clean_')}
wx_stations = [station for station, variables in wx_stations.items() if var in variables]
wx_stations_name = [st.replace('clean_', '') for st in wx_stations] # remove the 'clean_' prefix
wx_stations_name_cap = [wx_name.capitalize() for wx_name in wx_stations_name] # capitalise station name for printing
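# (qaqc_stations_list is star-imported above, so every clean_<station> list it
# defines lands in globals(); the dict comprehension picks those up and keeps
# only stations whose variable list includes Pk_Wind_Speed. A hypothetical
# entry might look like: clean_steephill = ['Air_Temp', 'Pk_Wind_Speed', ...])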
#%% Loop over one station at a time and clean up the Pk Wind Speed variable
for l in range(len(wx_stations_name)):
    sql_database = wx_stations_name[l]
    sql_name = wx_stations_name_cap[l]
    print('###### Cleaning Peak Wind Speed data for station: %s ######' % sql_name)
    #%% import current data from "clean"
    sql_file = pd.read_sql(sql="SELECT * FROM clean_" + sql_database, con=engine)
    #%% time in rennell and datlamen is not rounded to the nearest hour,
    # so round time in the clean db data so it matches the qaqc db
    if wx_stations_name[l] in ('rennellpass', 'datlamen'):
        sql_file = sql_file.copy()
        sql_file['DateTime'] = pd.to_datetime(sql_file['DateTime'])
        sql_file['DateTime'] = sql_file['DateTime'].dt.floor('h')
        deltas = sql_file['DateTime'].diff()[1:]
        same_vals = deltas[deltas < timedelta(hours=1)]
        sql_file = sql_file.drop(same_vals.index)
        sql_file = sql_file.set_index('DateTime').asfreq('h').reset_index() # make sure records are continuous every hour
    # for all other stations, make sure time increases consecutively by one
    # hour; if not, add records and fill the new rows with nans
    else:
        sql_file = sql_file.set_index('DateTime').asfreq('h').reset_index() # make sure records are continuous every hour
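    # a minimal sketch of the rounding logic above (hypothetical timestamps):
    #   01:02, 01:58, 03:01 --floor('h')--> 01:00, 01:00, 03:00; the second
    #   01:00 (delta < 1 h) is dropped, then asfreq('h') re-inserts an 02:00
    #   row filled with NaNs so the record stays strictly hourly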
    #%% make sure you only go as far as a specific date for all wx stations for the current water year
    # Machmell went offline in Feb 2023
    if wx_stations_name[l] == 'machmell':
        idx_match = np.flatnonzero(sql_file['DateTime'] == '2023-02-12 11:00:00') # arbitrary date
        sql_file_idx_latest = int(idx_match[0]) if idx_match.size > 0 else 0
        sql_file = sql_file[:sql_file_idx_latest+1]
    # for all other stations, qaqc data up to last week
    else:
        qaqc_upToDate = (datetime.now() - dtime.timedelta(days=7)).strftime("%Y-%m-%d %H") + ':00:00' # date of 7 days ago, truncated to the hour
        idx_match = np.flatnonzero(sql_file['DateTime'] == qaqc_upToDate) # today's date - 7 days
        sql_file_idx_latest = int(idx_match[0]) if idx_match.size > 0 else 0
        # idx_match = np.flatnonzero(sql_file['DateTime'] == '2024-02-19 06:00:00') # arbitrary date
    # if sql_file_idx_latest is 0, the wx station transmission has stopped
    # between the last time this code ran and the qaqc_upToDate date
    # (i.e. over the last week)
    if sql_file_idx_latest != 0:
        sql_file = sql_file[:sql_file_idx_latest]
        # sql_file = sql_file[sql_file_idx_latest:]
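        # (we only get here when the exact hourly timestamp was found in the
        # clean table; a miss leaves sql_file_idx_latest at 0, which the else
        # branch at the bottom treats as a station that has stopped transmitting)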
        #%% Make sure there is no gap in datetime (all dates are consecutive) and place
        # nans in all other values if any gaps are identified
        df_dt = sql_file['DateTime'].to_frame()
        sql_file = sql_file.set_index('DateTime').asfreq('h').reset_index()
        dt_sql = pd.to_datetime(sql_file['DateTime'])
        # get your indices for each water year
        if 10 <= datetime.now().month <= 12:
            yr_range = np.arange(dt_sql[0].year, datetime.now().year+1) # find min and max years
        elif wx_stations_name[l] == 'machmell':
            yr_range = np.arange(dt_sql[0].year, datetime.now().year-1) # find min and max years
        elif wx_stations_name[l] == 'placeglacier' and dt_sql[0].year == datetime.now().year:
            yr_range = np.arange(2023, datetime.now().year) # specify this for placeglacier's first year
        else:
            yr_range = np.arange(dt_sql[0].year, datetime.now().year) # find min and max years
        if wx_stations_name[l] == 'machmell':
            yr_range = np.delete(yr_range, np.flatnonzero(yr_range == 2022))
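        # note: a "water year" runs 1 October to 30 September of the following
        # calendar year (hence the October test above: from October onwards the
        # current calendar year already starts a new water year), e.g. water
        # year 2023 spans 2023-10-01 00:00 to 2024-09-30 23:00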
        qaqc_arr_final = [] # set up the variable
        # start the qaqc process for each water year at a specific weather station;
        # only run for the last water year to save memory on the server
        for k in range(len(yr_range)-1, len(yr_range)):
            print('## Cleaning data for year: %d-%d ##' % (yr_range[k], yr_range[k]+1))
            # find indices of water years
            start_yr_sql = qaqc_functions.nearest(dt_sql, datetime(yr_range[k], 10, 1))
            end_yr_sql = qaqc_functions.nearest(dt_sql, datetime(yr_range[k]+1, 9, 30, 23, 0, 0))
            # select data for the whole water year based on datetime object
            dt_yr = np.concatenate([np.where(dt_sql == start_yr_sql), np.where(dt_sql == end_yr_sql)])
            # store for plotting (if needed)
            raw = sql_file[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            qaqc_arr = sql_file.copy() # array to QAQC
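            # (qaqc_functions.nearest is assumed to return the element of
            # dt_sql closest to the requested datetime, so dt_yr ends up holding
            # the positional indices of the first and last hour of this water year)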
            #%% Remove values above 140 km/h or below 0 km/h
            # above 140 km/h
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 2
            threshold = 140 # in km/h
            qaqc_2, flags_2a = qaqc_functions.reset_max_threshold(qaqc_arr[var], data, flag, threshold)
            qaqc_arr[var] = qaqc_2
            # below 0 km/h
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 2
            threshold = 0 # in km/h
            qaqc_2, flags_2b = qaqc_functions.reset_min_threshold(qaqc_arr[var], data, flag, threshold)
            qaqc_arr[var] = qaqc_2
            # merge the two sets of flag 2s so neither pass overwrites the other
            flags2 = pd.concat([flags_2a, flags_2b], axis=1)
            flags_2 = flags2.apply(qaqc_functions.merge_row, axis=1).astype(int)
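            # (reset_max_threshold / reset_min_threshold are assumed to reset
            # out-of-range values and return the corrected series plus a flag
            # series carrying flag=2 at the affected rows, e.g. a hypothetical
            # 212 km/h spike or a -3 km/h reading would both be caught here)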
            #%% Remove duplicate consecutive values over a specific window
            data = qaqc_arr[var].iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)]
            flag = 3
            window = 50 # hours
            qaqc_3, flags_3 = qaqc_functions.duplicates_window_WindDir(qaqc_arr[var], data, flag, window) # reuses the wind-direction duplicate check
            qaqc_arr[var] = qaqc_3
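            # (duplicates_window_WindDir is assumed to flag runs of identical
            # readings inside the 50-hour window - e.g. a stuck anemometer
            # reporting the same value, hypothetically 12.4 km/h, for days)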
            #%% merge flags together into one large array, with a comma separating
            # multiple flags on each row where several apply
            flags = pd.concat([flags_2, flags_3], axis=1)
            qaqc_arr[var_flags] = flags.apply(qaqc_functions.merge_row, axis=1)
            #%% append to qaqc_arr_final after every k iteration
            qaqc_arr_final.append(qaqc_arr.iloc[np.arange(dt_yr[0].item(), dt_yr[1].item()+1)])
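            # a minimal sketch of the merge, assuming merge_row joins the
            # non-null flags on a row with commas (hypothetical values):
            #   flags_2 = 2, flags_3 = NaN  ->  '2'
            #   flags_2 = 2, flags_3 = 3    ->  '2,3'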
        #%% push qaqced variable to SQL database
        # skip this station if all pk_wind_speed values are null or the water
        # year could not be located
        if sql_file[var].isnull().all() or dt_yr.size == 0:
            continue
        # otherwise, if there is data (most stations), keep running
        else:
            print('# Writing newly qaqced data to SQL database #')
            qaqc_arr_final = pd.concat(qaqc_arr_final) # concatenate lists
            sql_qaqc_name = 'qaqc_' + wx_stations_name[l]
            qaqced_array = pd.concat([qaqc_arr_final['DateTime'], qaqc_arr_final[var], qaqc_arr_final[var_flags]], axis=1)
            qaqced_array = qaqced_array.replace(np.nan, None) # replace nans with None so they are stored as SQL NULLs
            # import the current qaqc sql db and find columns matching the qaqc variable here
            existing_qaqc_sql = pd.read_sql('SELECT * FROM %s' % sql_qaqc_name, engine)
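            # (the existing qaqc table is read back so the code can work out
            # where the previous run stopped and only push the rows added since)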
            #%% write data to sql database using the brute-force approach (re-write the
            # whole db - quicker on a laptop but gets instantly killed on the remote desktop)
            # colnames = existing_qaqc_sql.columns
            # col_positions = [i for i, s in enumerate(colnames) if var in s]
            # existing_qaqc_sql[colnames[col_positions]] = pd.concat([qaqced_array[var], qaqced_array[var_flags]], axis=1)
            # # make sure you keep the same variable dtypes when pushing the new df to sql
            # table_map = Table(sql_qaqc_name, metadata, autoload_with=engine)
            # # map SQLAlchemy types to pandas dtypes
            # type_mapping = {
            #     'DATETIME': 'datetime64[ns]',
            #     'DOUBLE': 'float64',
            #     'FLOAT': 'float64',
            #     'TEXT': 'object',
            # }
            # # map the correct dtype in the df to sql and push to the sql db
            # existing_qaqc_sql = existing_qaqc_sql.astype({col.name: type_mapping.get(str(col.type).upper(), 'object') for col in table_map.columns if col.name in existing_qaqc_sql.columns})
            # existing_qaqc_sql[var] = existing_qaqc_sql[var].astype('float64')
            # existing_qaqc_sql[var_flags] = existing_qaqc_sql[var_flags].astype('object')
            # existing_qaqc_sql.to_sql(name=sql_qaqc_name, con=engine, if_exists='replace', index=False)
            # # make sure you assign the 'DateTime' column as the primary key
            # with engine.connect() as con:
            #     con.execute('ALTER TABLE `qaqc_%s`' % wx_stations_name[l] + ' ADD PRIMARY KEY (`DateTime`);')
            #%% write data to sql database using the soft approach (re-write only the
            # indices and variables needed - very slow on a laptop but fast on the remote desktop)
            qaqc_idx_sql = existing_qaqc_sql[var_flags].notna()[::-1].idxmax()+1 # find the latest valid value in the sql database and fill after that
            dt_qaqc_idx_sql = existing_qaqc_sql['DateTime'].iloc[qaqc_idx_sql] # find the matching datetime object in the qaqc db
            idx_match = np.flatnonzero(qaqced_array['DateTime'] == dt_qaqc_idx_sql)
            qaqc_idx_sql = int(idx_match[0]) if idx_match.size > 0 else 0
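            # a minimal sketch of the idxmax trick above (hypothetical column):
            #   flags = [2, 2, 3, NaN, NaN] -> notna() reversed has its first
            #   True at the row holding 3, so idxmax() returns that row's label
            #   and +1 points at the first row still missing qaqc'd values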
            print('Amount of days to push to qaqc database: %d' % int((qaqced_array.index[-1] - qaqced_array.index[qaqc_idx_sql])/24))
            column_mapping = {
                'DateTime': 'DateTime',
                var: var,
                var_flags: var_flags
            }
            update_records(engine, metadata, 'qaqc_' + wx_stations_name[l], qaqced_array[qaqc_idx_sql:], column_mapping)
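            # (update_records, imported from push_sql_function, is assumed to
            # write the supplied rows into the named qaqc table, with
            # column_mapping pairing each dataframe column with its SQL column)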
    # skip this iteration if the weather station has stopped transmitting for some reason
    else:
        # if transmission has stopped since last week, skip this station
        print('Careful: %s has stopped transmitting and will not be qaqced until back online' % sql_name)
        continue
# Close the sql connection after the loop has completed
print('## Finished PkWindSpeed qaqc for all stations ##')
engine.dispose()