# -*- coding: utf-8 -*-
# OneArgo.py
#------------------------------------------------------------------------------
# Created By: Savannah Stephenson and Hartmut Frenzel
# Creation Date: 07/26/2024
# Version: 0.1 (alpha)
#------------------------------------------------------------------------------
""" The Argo class contains the primary functions for downloading and handling
data gathered from the Argo Global Data Assebly Centers.
"""
#------------------------------------------------------------------------------
#
#
## Standard Imports
from datetime import datetime, timedelta, timezone
from pathlib import Path
import shutil
import gzip
## Third Party Imports
import requests
import numpy as np
import matplotlib.path as mpltPath
from matplotlib.ticker import FixedLocator
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import pandas as pd
import cartopy.crs as ccrs
import cartopy.feature as cf
from cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTER
import netCDF4
# Local Imports
from OneArgoSettings import DownloadSettings, SourceSettings
class Argo:
""" The Argo class contains the primary functions for downloading and handling
data gathered from GDAC including a constructor, select_profiels(),
trajectories(), load_float_data(), and sections().
"""
#######################################################################
# Constructor
#######################################################################
def __init__(self, user_settings: str = None) -> None:
""" The Argo constructor downloads the index files form GDAC and
stores them in the proper directories defined in the
DownloadSettings class. It then constructs thee dataframes
from the argo_synthetic-profile_index.txt file and the
ar_index_global_prof.txt file for use in class function
calls. Two of the dataframes are a reflection of the index
files, the third dataframe is a two column frame with
float ids and if they are a bgc float or not.
:param: user_settings : str - An optional parameter that will be used
to initialize the *Settings classes if passed. This should be the
full filepath.
            NOTE: If the user has their own settings configuration with
            keep_index_in_memory set to false, then the dataframes will be
            removed from memory at the end of construction and will be
            reloaded on subsequent Argo function calls, meaning that
            functions will take longer but occupy less memory when this
            option is set to false.
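
            Example: a minimal usage sketch; 'my_settings.json' is a
            hypothetical configuration file name:
                argo = Argo()                             # default settings
                argo = Argo('/path/to/my_settings.json')  # custom settings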
"""
self.download_settings = DownloadSettings(user_settings)
self.source_settings = SourceSettings(user_settings)
        if self.download_settings.verbose:
            print('Starting initialization process...')
            print(f'Your current download settings are: {self.download_settings}')
            print(f'Your current source settings are: {self.source_settings}')
# Check for and create subdirectories if needed
if self.download_settings.verbose:
print('Checking for subdirectories...')
self.__initialize_subdirectories()
# Download files from GDAC to Index directory
if self.download_settings.verbose:
print('\nDownloading index files...')
for file in self.download_settings.index_files:
self.__download_file(file)
# Load the index files into dataframes
if self.download_settings.verbose:
print('\nTransferring index files into dataframes...')
self.sprof_index = self.__load_sprof_dataframe()
self.prof_index = self.__load_prof_dataframe()
# Add column noting if a profile is also in the sprof_index, which is true for bgc floats
if self.download_settings.verbose:
print('Marking bgc floats in prof_index dataframe...')
self.__mark_bgcs_in_prof()
        # Create float_stats reference index for use in select_profiles()
if self.download_settings.verbose:
print('Creating float_stats dataframe...')
self.float_stats = self.__load_float_stats()
# Print number of floats
if self.download_settings.verbose:
self.__display_floats()
print('Initialization is finished\n\n')
if not self.download_settings.keep_index_in_memory:
if self.download_settings.verbose:
print('Removing dataframes from memory...')
del self.sprof_index
del self.prof_index
#######################################################################
# Public Functions
#######################################################################
def select_profiles(self, lon_lim: list = [-180, 180], lat_lim: list = [-90, 90],
start_date: str = '1995-01-01', end_date: str = None, **kwargs)-> dict:
""" select_profiles is a public function of the Argo class that returns a
dictionary if float IDs and profile lists that match the passed criteria.
:param: lon_lim : list - Longitude limits
:param: lat_lim : list - Latitude limits
:param: start_date : str - A UTC date in YYYY-MM-DD format.
:param: end_date : str - An optional UTC date in YYYY-MM-DD format.
            :param: kwargs : key/value arguments - Optional keyword arguments for
                further filtering of the float profiles returned by the function.
:return: narrowed_profiles : dict - A dictionary with float ID
keys corresponding to a list of profiles that match criteria.
            NOTE:
            The longitude and latitude limits can be entered as either
            two-element lists, in which case the limits will be interpreted
            as minimum and maximum limits that form a rectangle, or they
            can be entered as longer lists, in which case each pair of longitude
            and latitude values corresponds to a vertex of a polygon.
            The longitude and latitude limits can be input in any 360-degree
            range that encloses all the desired longitude values.
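            For example (with hypothetical coordinates), a triangular region
            could be specified as:
                lon_lim=[-150, -140, -145], lat_lim=[20, 20, 30]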
Key/argument value options in progress:
            floats=floats: Select profiles only from these floats; they must
                also match all other criteria.
ocean=ocean: Valid choices are 'A' (Atlantic), 'P' (Pacific), and
'I' (Indian). This selection is in addition to the specified
longitude and latitude limits. (To select all floats and
profiles from one ocean basin, leave lon_lim and lat_lim
empty.)
            outside='none' or 'time' or 'space' or 'both': By default, only float profiles
                that are within both the temporal and spatial constraints are
                returned ('none'); specify this option to also include profiles outside
                the temporal constraints ('time'), the spatial constraints
                ('space'), or both constraints ('both').
            type=type: Valid choices are 'bgc' (select BGC floats only),
                'phys' (select core and deep floats only),
                and 'all' (select all floats that match other criteria).
                If type is not specified but sensors are, then the type will
                be set to 'bgc' if sensors other than PRES, PSAL, TEMP, or CNDC
                are specified.
                In all other cases the default type is DownloadSettings.float_type,
                which is set in the Argo constructor; float_type can also be set
                to a different value by passing a configuration file to the Argo constructor.
            Options planned for implementation:
sensor='sensor' or [sensors], SENSOR_TYPE: This option allows the selection by
sensor type. Available as of 2024: PRES, PSAL, TEMP, DOXY, BBP,
BBP470, BBP532, BBP700, TURBIDITY, CP, CP660, CHLA, CDOM,
NITRATE, BISULFIDE, PH_IN_SITU_TOTAL, DOWN_IRRADIANCE,
DOWN_IRRADIANCE380, DOWN_IRRADIANCE412, DOWN_IRRADIANCE443,
DOWN_IRRADIANCE490, DOWN_IRRADIANCE555, DOWN_IRRADIANCE670,
UP_RADIANCE, UP_RADIANCE412, UP_RADIANCE443, UP_RADIANCE490,
UP_RADIANCE555, DOWNWELLING_PAR, CNDC, DOXY2, DOXY3, BBP700_2
                Multiple sensors can be entered as a list, e.g.: ['DOXY', 'NITRATE'].
            dac=dac: Select by the Data Assembly Center responsible for the floats.
                A single DAC can be entered as a string (e.g.: 'aoml');
                multiple DACs can be entered as a list of strings (e.g.:
                ['meds', 'incois']).
                Valid values as of 2024 are:
                {'aoml', 'bodc', 'coriolis', 'csio', 'csiro', 'incois',
                'jma', 'kma', 'kordi', 'meds'}
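
            Example: a minimal usage sketch; the region and dates are
            hypothetical values:
                argo = Argo()
                profiles = argo.select_profiles(lon_lim=[-160, -120],
                                                lat_lim=[20, 50],
                                                start_date='2020-01-01',
                                                type='bgc')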
"""
if self.download_settings.verbose:
print('Starting select_profiles...')
self.epsilon = 1e-3
self.lon_lim = lon_lim
self.lat_lim = lat_lim
self.start_date = start_date
self.end_date = end_date
self.outside = kwargs.get('outside')
self.float_type = kwargs.get('type') if kwargs.get('type') is not None \
else self.download_settings.float_type
self.float_ids = kwargs.get('floats')
self.ocean = kwargs.get('ocean')
self.sensor = kwargs.get('sensor')
if self.download_settings.verbose:
print('Validating parameters...')
self.__validate_lon_lat_limits()
self.__validate_start_end_dates()
if self.outside:
self.__validate_outside_kwarg()
if self.float_type:
self.__validate_type_kwarg()
if self.ocean:
self.__validate_ocean_kwarg()
# if self.sensor : self.__validate_sensor_kwarg()
# Load correct dataframes according to self.float_type and self.float_ids
# we set self.selected_from_sprof_index and self.selected_from_prof_index
# in this function which will be used in __narrow_profiles_by_criteria
self.__prepare_selection()
# Narrow down float profiles and save in dictionary
narrowed_profiles = self.__narrow_profiles_by_criteria()
if not self.download_settings.keep_index_in_memory:
if self.download_settings.verbose:
print('Removing dataframes from memory...')
del self.sprof_index
del self.prof_index
del self.selection_frame
if self.download_settings.verbose:
            print(f'Floats Selected: {list(narrowed_profiles.keys())}\n')
return narrowed_profiles
def trajectories(self, floats: int | list | dict, visible: bool = True,
save_to: str = None)-> None:
""" This function plots the trajectories of one or more specified float(s)
:param: floats : int | list | dict - Floats to plot.
:param: visible : bool - A boolean value determining if the trajectories
plot is shown to the user through a popup window.
:param: save_to : str - A path to a folder where the user would like
to save the trajectories plot(s). The path must exist.
The file name is automatically generated.
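
            Example: a minimal usage sketch; the WMO ID and folder are
            hypothetical values:
                argo.trajectories(5904859, save_to='/tmp/argo_plots')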
"""
# Validate save_to file path
        if save_to is not None:
            save_to = Path(save_to)
            self.__validate_plot_save_path(save_to)
# Check that dataframes are loaded into memory
if not self.download_settings.keep_index_in_memory:
self.sprof_index = self.__load_sprof_dataframe()
self.prof_index = self.__load_prof_dataframe()
# Validate passed floats
self.float_ids = floats
self.__validate_floats_kwarg()
# Pull rows/profiles for passed floats
floats_profiles = self.__filter_by_floats()
# If keep index in memory is false remove other dataframes
if not self.download_settings.keep_index_in_memory:
if self.download_settings.verbose:
print('Removing dataframes from memory...')
del self.sprof_index
del self.prof_index
# Set up basic graph size
fig = plt.figure(figsize=(10, 10))
# Define the median longitude for the graph to be centered on
lons = floats_profiles['longitude'].dropna().values.tolist()
sorted_lons = np.sort(lons)
median_lon = np.nanmedian(sorted_lons)
ax = fig.add_subplot(1, 1, 1, projection=ccrs.PlateCarree(central_longitude=median_lon))
# Add landmasses and coastlines
ax.add_feature(cf.COASTLINE, linewidth=1.5)
ax.add_feature(cf.LAND, zorder=2, edgecolor='k', facecolor='lightgray')
        # Plot trajectories of passed floats with a colorblind-friendly palette
colors = ("#56B4E9", "#009E73", "#F0E442", "#0072B2",
"#CC79A7", "#D55E00", "#E69F00", "#000000")
for i, float_id in enumerate(self.float_ids):
specific_float_profiles = floats_profiles[floats_profiles['wmoid'] == float_id]
ax.plot(specific_float_profiles['longitude'].values,
specific_float_profiles['latitude'].values,
marker='.', alpha=0.7, linestyle='-', linewidth=2, transform=ccrs.Geodetic(),
label=f'Float {float_id}', color=colors[i % len(colors)])
# Set graph limits based on passed points
self.__set_graph_limits(ax, 'x')
self.__set_graph_limits(ax, 'y')
# Add grid lines
self.__add_grid_lines(ax)
# Add Legend outside of the main plot
if len(self.float_ids) > 1:
plt.legend(bbox_to_anchor=(1.05, 0.5), loc='center left')
# Setting Title
if len(self.float_ids) == 1:
ax.set_title(f'Trajectory of {self.float_ids[0]}', fontsize=18, fontweight='bold')
elif len(self.float_ids) < 4:
ax.set_title(f'Trajectories of {self.float_ids}', fontsize=18, fontweight='bold')
else:
ax.set_title('Trajectories of Selected Floats', fontsize=18, fontweight='bold')
        plt.tight_layout()
# Saving Graph
if save_to is not None:
if len(self.float_ids) == 1:
                save_path = save_to.joinpath(f'trajectories_{self.float_ids[0]}')
else:
save_path = save_to.joinpath(f'trajectories_plot_{len(self.float_ids)}_floats')
plt.savefig(f'{save_path}')
# Displaying graph
if visible:
plt.show()
    def load_float_data(self, floats: int | list | dict,
                        variables: str | list = None)-> pd.DataFrame:
        """ A function to load float data into memory.
            :param: floats : int | list | dict - A float or list of floats to
                load data from, or a dictionary specifying floats and profiles
                to read from the .nc files.
            :param: variables : str | list - An optional parameter listing variables
                that the user would like included in the dataframe. If a variable is
                not available for a passed float, then only the surface level of the
                profile will be included.
            :return: float_data : pd.DataFrame - A dataframe with the requested float data.
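
            Example: a minimal usage sketch; the WMO ID is a hypothetical
            value and DOXY is one of the BGC variables listed above:
                float_data = argo.load_float_data(5904859, variables='DOXY')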
"""
# Check that index files are in memory
if not self.download_settings.keep_index_in_memory:
self.sprof_index = self.__load_sprof_dataframe()
self.prof_index = self.__load_prof_dataframe()
# Check that passed float is inside of the dataframes
self.float_ids = floats
self.__validate_floats_kwarg()
# Validate passed variables
self.float_variables = variables
if self.float_variables:
self.__validate_float_variables_arg()
# Check if the user has passed only phys float variables
if self.float_variables is not None:
phys_variables = ['TEMP', 'PSAL', 'PRES', 'CNDC']
only_phys = all(x in phys_variables for x in self.float_variables)
else:
only_phys = False
# Download .nc files for passed floats
files = []
for wmoid in self.float_ids:
            # If the float is a phys float, or if the user has provided no variables
            # or only phys variables, then use the corresponding prof file,
            # unless floats and profiles were specified with a dictionary
            # (the profile indices refer to the sprof index file, so Sprof.nc files
            # must be loaded for consistency)
            if ((not self.float_stats.loc[self.float_stats['wmoid'] == wmoid, 'is_bgc'].values[0])
                    or (((self.float_variables is None) or only_phys)
                        and not isinstance(floats, dict))):
file_name = f'{wmoid}_prof.nc'
files.append(file_name)
# If the float is a bgc float it will have a corresponding Sprof file
else:
file_name = f'{wmoid}_Sprof.nc'
files.append(file_name)
# Download file
self.__download_file(file_name)
# Read from nc files into dataframe
float_data_frame = self.__fill_float_data_dataframe(files)
return float_data_frame
    def sections(self, float_data: pd.DataFrame, variables: str | list, visible: bool = True,
                 save_to: str = None)-> None:
        """ A function to graph section plots for the passed variables using data
            from the passed float_data dataframe.
            :param: float_data : pd.DataFrame - A dataframe created from load_float_data
                that contains data pulled from .nc files.
:param: variables : str or list - The variable(s) the user would
like section plots of.
:param: visible : bool - A boolean value determining if the section
plot is shown to the user through a popup window.
:param: save_to : str - A path to a folder where the
user would like to save the section plot(s). The folder must exist.
The filename is automatically generated.
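
            Example: a minimal usage sketch, continuing from the
            load_float_data example above ('/tmp/argo_plots' is a
            hypothetical folder):
                argo.sections(float_data, 'DOXY', save_to='/tmp/argo_plots')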
"""
# Validate passed variables
self.float_variables = variables
self.__validate_float_variables_and_permutations_arg()
# Validate passed dataframe
self.float_data = float_data
self.__validate_float_data_dataframe()
# Validate save_to file path
if save_to is not None:
save_to = Path(save_to)
self.__validate_plot_save_path(save_to)
# Determine Unique WMOID
unique_float_ids = self.float_data['WMOID'].unique()
# Make one plot for each float/variable combination
for float_id in unique_float_ids:
filtered_df = self.float_data[self.float_data['WMOID'] == float_id]
# Getting unique profile values for the current float
unique_values = filtered_df['CYCLE_NUMBER'].unique()
# Check that the float has more than one profile (more than one cycle number)
if len(unique_values) == len(filtered_df):
if self.download_settings.verbose:
print(f'Float {float_id} has only one profile, skipping this float...')
continue
if self.download_settings.verbose:
print(f'Generating section plots for float {float_id}...')
for variable in self.float_variables:
# Pulling column for current float and variable
float_variable_data = filtered_df[variable]
# Check that the float actually has data for the passed variable
if float_variable_data.isna().all():
if self.download_settings.verbose:
print(f'Float {float_id} has no data for variable {variable}, ' +
'skipping plot...')
continue
# Otherwise plot the section
if self.download_settings.verbose:
print(f'Generating {variable} section plot for float {float_id}...')
self.__plot_section(self.float_data, float_id, variable, visible, save_to)
#######################################################################
# Private Functions
#######################################################################
def __initialize_subdirectories(self) -> None:
""" A function that checks for and creates the necessary folders as
listed in the download settings sub_dir list.
"""
for directory in self.download_settings.sub_dirs:
directory_path = self.download_settings.base_dir.joinpath(directory)
if directory_path.exists():
if self.download_settings.verbose:
print(f'The {directory_path} directory already exists')
else:
try:
if self.download_settings.verbose:
print(f'Creating the {directory} directory')
directory_path.mkdir()
except OSError as e:
if self.download_settings.verbose:
print(f'Failed to create the {directory} directory: {e}')
def __download_file(self, file_name: str) -> None:
""" A function to download and save an index file from GDAC sources.
            :param: file_name : str - The name of the file we are downloading.
"""
        if file_name.endswith('.txt'):
            directory = Path(self.download_settings.base_dir.joinpath("Index"))
        elif file_name.endswith('.nc'):
            directory = Path(self.download_settings.base_dir.joinpath("Profiles"))
        else:
            raise ValueError(f'Unexpected file type: {file_name} (expected .txt or .nc)')
# Get the expected filepath for the file
file_path = directory.joinpath(file_name)
# Check if the filepath exists
if file_path.exists():
# Check if .txt file needs to be updated
            if file_name.endswith('.txt'):
# Check if the settings allow for updates of index files
if self.download_settings.update == 0:
if self.download_settings.verbose:
print('The download settings have update set to 0, ' +
'indicating index files will not be updated.')
else:
last_modified_time = Path(file_path).stat().st_mtime
current_time = datetime.now().timestamp()
seconds_since_modified = current_time - last_modified_time
# Check if the file should be updated
if seconds_since_modified > self.download_settings.update:
if self.download_settings.verbose:
print(f'Updating {file_name}...')
                        self.__try_download(file_name, True)
else:
if self.download_settings.verbose:
print(f'{file_name} does not need to be updated yet.')
# Check if .nc file needs to be updated
elif file_name.endswith('.nc'):
# Check if the file should be updated using function
if self.__check_nc_update(file_path, file_name):
if self.download_settings.verbose:
print(f'Updating {file_name}...')
                    self.__try_download(file_name, True)
else:
if self.download_settings.verbose:
print(f'{file_name} does not need to be updated yet.')
# if the file doesn't exist then download it
else:
if self.download_settings.verbose:
print(f'{file_name} needs to be downloaded.')
self.__try_download(file_name, False)
def __check_nc_update(self, file_path: Path, file_name: str)-> bool:
""" A function to check if an .nc file needs to be updated.
:param: file_path : Path - The file_path for the .nc file we
are checking for update.
:param: file_name : str - The name of the .nc file.
:return: update_status : bool - A boolean value indicating
that the passed file should be updated.
"""
# Pull float id from file_name
float_id = file_name.split('_')[0]
# Get float's latest update date
if (self.prof_index.loc[self.prof_index['wmoid'] == int(float_id), 'is_bgc'].any()
and file_name.endswith('_prof.nc')):
# Use the prof update date for the bgc float because user didn't pass any bgc sensors
dates_for_float = self.prof_index[self.prof_index['wmoid'] == int(float_id)]
index_update_date = pd.to_datetime( \
dates_for_float['date_update'].drop_duplicates().max())
else:
index_update_date = pd.to_datetime( \
self.float_stats.loc[self.float_stats['wmoid'] == int(float_id),
'date_update'].iloc[0])
# Read DATE_UPDATE from .nc file
nc_file = netCDF4.Dataset(file_path, mode='r')
netcdf_update_date = nc_file.variables['DATE_UPDATE'][:]
nc_file.close()
# Convert the byte strings of file_update_date into a regular string
julian_date_str = b''.join(netcdf_update_date).decode('utf-8')
netcdf_update_date = datetime.strptime(julian_date_str,
'%Y%m%d%H%M%S').replace(tzinfo=timezone.utc)
netcdf_update_date = np.datetime64(netcdf_update_date)
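        # (e.g., an illustrative DATE_UPDATE of b'20240115103000' decodes and parses
        # to np.datetime64('2024-01-15T10:30:00'))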
# If the .nc file's update date is less than
# the date in the index file return true
# indicating that the .nc file must be updated
# otherwise return false
return bool(netcdf_update_date < index_update_date)
def __try_download(self, file_name: str, update_status: bool)-> None:
""" A function that attempts to download a file from both GDAC sources.
:param: file_name : str - The name of the file to download
:param: update_status: bool - True if the file exists and we
are trying to update it. False if the file hasn't been
downloaded yet.
"""
if file_name.endswith('.txt'):
directory = Path(self.download_settings.base_dir.joinpath("Index"))
first_save_path = directory.joinpath("".join([file_name, ".gz"]))
second_save_path = directory.joinpath(file_name)
elif file_name.endswith('.nc'):
directory = Path(self.download_settings.base_dir.joinpath("Profiles"))
first_save_path = directory.joinpath(file_name)
second_save_path = None
success = False
iterations = 0
# Determining float id if file is an .nc file
if file_name.endswith('.nc'):
# Extract float id from filename
float_id = file_name.split('_')[0]
            # Extract dac for that float id from the dataframe
filtered_df = self.prof_index[self.prof_index['wmoid'] == int(float_id)]
dac = filtered_df['dacs'].iloc[0]
            # Add trailing forward slashes for formatting
dac = f'{dac}/'
float_id = f'{float_id}/'
while (not success) and (iterations < self.download_settings.max_attempts):
# Try both hosts (preferred one is listed first in SourceSettings)
for host in self.source_settings.hosts:
if file_name.endswith('.txt'):
url = "".join([host, file_name, ".gz"])
elif file_name.endswith('.nc'):
url = "".join([host,'dac/', dac, float_id, file_name])
if self.download_settings.verbose:
print(f'Downloading {file_name} from {url}...')
try:
with requests.get(url, stream=True,
timeout=self.download_settings.timeout) as r:
r.raise_for_status()
with open(first_save_path, 'wb') as f:
r.raw.decode_content = True
shutil.copyfileobj(r.raw, f)
if second_save_path is not None:
# If the file has a second save path it was first downloaded as a .gz file
# so it must be unzipped.
if self.download_settings.verbose:
print(f'Unzipping {file_name}.gz...')
with gzip.open(first_save_path, 'rb') as gz_file:
with open(second_save_path, 'wb') as txt_file:
shutil.copyfileobj(gz_file, txt_file)
# Remove extraneous .gz file
first_save_path.unlink()
success = True
elif file_name.endswith('.nc'):
# Check that the file can be read, only keep download if file can be read
try:
nc_file = netCDF4.Dataset(first_save_path, mode='r')
nc_file.close()
success = True
except OSError:
# The file could not be read
if self.download_settings.verbose:
print(f'{first_save_path} cannot be read; trying again...')
if success:
if self.download_settings.verbose:
print('Success!')
# Exit the loop if download is successful so we don't try additional
# sources for no reason
break
except requests.RequestException as e:
print(f'Error encountered: {e}. Trying next host...')
# Increment Iterations
iterations += 1
# If ultimately nothing could be downloaded
if not success:
if update_status:
                print(f'WARNING: Update of {file_name} failed; you are working with outdated data.')
else:
                raise OSError('Download failed! ' +
                              f'{file_name} could not be downloaded at this time.')
    def __load_sprof_dataframe(self) -> pd.DataFrame:
""" A function to load the sprof index file into a dataframe for easier reference.
"""
file_name = "argo_synthetic-profile_index.txt"
file_path = Path.joinpath(self.download_settings.base_dir, 'Index', file_name)
# There are 8 header lines in both index files
sprof_index = pd.read_csv(file_path, delimiter=',', header=8,
parse_dates=['date','date_update'], date_format='%Y%m%d%H%M%S')
# Parsing out variables in first column: file
dacs = sprof_index['file'].str.split('/').str[0]
sprof_index.insert(1, "dacs", dacs)
wmoid = sprof_index['file'].str.split('/').str[1].astype('int')
sprof_index.insert(0, "wmoid", wmoid)
profile = sprof_index['file'].str.split('_').str[1].str.replace('.nc', '')
sprof_index.insert(2, "profile", profile)
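        # (e.g., an illustrative 'file' value of 'aoml/5904859/profiles/SD5904859_001.nc'
        # yields dacs='aoml', wmoid=5904859, profile='001')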
# Splitting the parameters into their own columns
parameters_split = sprof_index['parameters'].str.split()
data_types_split = sprof_index['parameter_data_mode'].apply(list)
# R: raw data, A: adjusted mode (real-time adjusted),
# D: delayed mode quality controlled
        data_type_mapping = {np.nan: 0, 'R': 1, 'A': 2, 'D': 3}
mapped_data_types_split = data_types_split.apply(lambda lst: [data_type_mapping.get(x, 0)
if pd.notna(x) else 0
for x in lst])
# Create a new DataFrame from the split parameters
expanded_df = pd.DataFrame({
'index': sprof_index.index.repeat(parameters_split.str.len()),
'parameter': parameters_split.explode(),
'data_type': mapped_data_types_split.explode()
})
# Pivot the expanded DataFrame to get parameters as columns
        # The following line could be used to suppress the warning about fillna()
        # being deprecated in future versions of pandas:
        # with pd.option_context('future.no_silent_downcasting', True):
result_df = expanded_df.pivot(index='index', columns='parameter', \
values='data_type').fillna(0).infer_objects(copy=False).astype('int8')
        # Fill in source_settings information based on the sprof index file before removing rows
if self.download_settings.verbose:
print('Filling in source settings information...')
self.source_settings.set_avail_vars(sprof_index)
# Merge the pivoted DataFrame back with the original DataFrame and drop split rows
if self.download_settings.verbose:
print('Marking Parameters with their data mode...')
sprof_index = sprof_index.drop(columns=['parameters', 'parameter_data_mode'])
sprof_index = sprof_index.join(result_df)
# Add profile_index column
sprof_index.sort_values(by=['wmoid', 'date'], inplace=True)
sprof_index.insert(0, "profile_index", 0)
sprof_index['profile_index'] = sprof_index.groupby('wmoid')['date'].cumcount() + 1
return sprof_index
    def __load_prof_dataframe(self) -> pd.DataFrame:
""" A function to load the prof index file into a dataframe for easier reference.
"""
file_name = "ar_index_global_prof.txt"
file_path = Path.joinpath(self.download_settings.base_dir, 'Index', file_name)
# There are 8 header lines in this index file
prof_index = pd.read_csv(file_path, delimiter=',', header=8,
parse_dates=['date','date_update'], date_format='%Y%m%d%H%M%S')
# Splitting up parts of the first column
dacs = prof_index['file'].str.split('/').str[0]
prof_index.insert(0, "dacs", dacs)
wmoid = prof_index['file'].str.split('/').str[1].astype('int')
prof_index.insert(1, "wmoid", wmoid)
d_file = prof_index['file'].str.split('/').str[3].str.startswith('D')
prof_index.insert(2, "D_file", d_file)
# Add profile_index column
prof_index.sort_values(by=['wmoid', 'date'], inplace=True)
prof_index.insert(0, "profile_index", 0)
prof_index['profile_index'] = prof_index.groupby('wmoid')['date'].cumcount() + 1
        # Fill in source_settings information based on the prof index file before removing rows
if self.download_settings.verbose:
print('Filling in source settings information...')
self.source_settings.set_dacs(prof_index)
return prof_index
def __mark_bgcs_in_prof(self):
""" A function to mark whether the floats listed in prof_index are
biogeochemical floats or not.
"""
bgc_floats = self.sprof_index['wmoid'].unique()
is_bgc = self.prof_index['wmoid'].isin(bgc_floats)
self.prof_index.insert(1, "is_bgc", is_bgc)
    def __load_float_stats(self)-> pd.DataFrame:
""" Function to create a dataframe with float IDs,
their is_bgc status, and their most recent update
date for use in select_profiles().
Data for physical floats are taken from the prof index
file and data for BGC floats are taken from the Sprof index file.
"""
# Dataframe with wmoid and date updated for both prof and sprof
float_bgc_status_prof = self.prof_index.loc[~self.prof_index['is_bgc'], ['wmoid',
'date_update']]
float_bgc_status_sprof = self.sprof_index[['wmoid', 'date_update']]
# Only keeping rows with most recent date updated
floats_stats_prof = float_bgc_status_prof.groupby('wmoid',
as_index=False)['date_update'].max()
floats_stats_sprof = float_bgc_status_sprof.groupby('wmoid',
as_index=False)['date_update'].max()
# Adding the is_bgc column
floats_stats_sprof['is_bgc'] = True
floats_stats_prof['is_bgc'] = False
        # Combining the two dataframes into one reference frame for all floats
floats_stats = pd.concat([floats_stats_sprof, floats_stats_prof]).sort_values(by='wmoid')
return floats_stats
def __display_floats(self) -> None:
""" A function to display information about the number of floats initially
observed in the unfiltered dataframes.
"""
floats = self.prof_index['wmoid'].unique()
profiles = self.prof_index['file'].unique()
print(f"\n{len(floats)} floats with {len(profiles)} profiles found.")
bgc_floats = self.sprof_index['wmoid'].unique()
profiles = self.sprof_index['file'].unique()
print(f"{len(bgc_floats)} BGC floats with {len(profiles)} profiles found.")
def __validate_lon_lat_limits(self)-> None:
""" Function to validate the length, order, and contents of
longitude and latitude limits passed to select_profiles.
"""
if self.download_settings.verbose:
print('Validating longitude and latitude limits...')
# Validating Lists
if len(self.lon_lim) != len(self.lat_lim):
raise KeyError('The length of the longitude and latitude lists must be equal.')
if len(self.lon_lim) == 2:
if (self.lon_lim[1] <= self.lon_lim[0]) or (self.lat_lim[1] <= self.lat_lim[0]):
if self.download_settings.verbose:
print(f'Longitude Limits: min={self.lon_lim[0]} max={self.lon_lim[1]}')
print(f'Latitude Limits: min={self.lat_lim[0]} max={self.lat_lim[1]}')
raise KeyError('When passing longitude and latitude lists using the [min, max] ' +
'format, the max value must be greater than the min value.')
if ((abs(self.lon_lim[1] - self.lon_lim[0] - 360.0) < self.epsilon) and
(abs(self.lat_lim[1] - self.lat_lim[0] - 180.0) < self.epsilon)):
self.keep_full_geographic = True
else:
self.keep_full_geographic = False
# Validating latitudes
if not all(-90 <= lat <= 90 for lat in self.lat_lim):
print(f'Latitudes: {self.lat_lim}')
raise KeyError('Latitude values should be between -90 and 90.')
# Validate Longitudes
# Checking range of longitude values
lon_range = max(self.lon_lim) - min(self.lon_lim)
if lon_range > 360 or lon_range <= 0:
if self.download_settings.verbose:
print(f'Current longitude range: {lon_range}')
raise KeyError('The range between the maximum and minimum longitude values must be ' +
'between 0 and 360.')
# Adjusting values to fit between -180 and 360
if min(self.lon_lim) < -180:
if self.download_settings.verbose:
                print('Adjusting longitudes by +360 degrees to fall within -180 and 360...')
self.lon_lim = [lon + 360.00 for lon in self.lon_lim]
def __validate_start_end_dates(self):
""" A function to validate the start and end date strings passed to select_profiles and
converts them to datetimes for easier comparison to dataframe values later on.
"""
if self.download_settings.verbose:
print('Validating start and end dates...')
# Parse Strings to Datetime Objects
try:
# Check if the string matches the expected format
self.start_date = datetime.fromisoformat(self.start_date).replace(tzinfo=timezone.utc)
# end_date is optional and should be set to tomorrow if not provided
if self.end_date is not None:
self.end_date = datetime.fromisoformat(self.end_date).replace(tzinfo=timezone.utc)
else:
self.end_date = datetime.now(timezone.utc) + timedelta(days=1)
        except ValueError as exc:
            raise ValueError(f"Start date: {self.start_date} or end date: {self.end_date} " +
                             "is not in the expected format 'yyyy-mm-dd'.") from exc
# Validate datetimes
if self.start_date > self.end_date:
if self.download_settings.verbose:
print(f'Current start date: {self.start_date}')
print(f'Current end date: {self.end_date}')
raise ValueError('The start date must be before the end date.')
if self.start_date < datetime(1995, 1, 1, tzinfo=timezone.utc):
if self.download_settings.verbose:
print(f'Current start date: {self.start_date}')
            raise ValueError('Start date must not be earlier than ' +
                             f'{datetime(1995, 1, 1, tzinfo=timezone.utc)}.')
# Set to datetime64 for dataframe comparisons
self.start_date = np.datetime64(self.start_date)
self.end_date = np.datetime64(self.end_date)
def __validate_outside_kwarg(self):
""" A function to validate the value of the
optional 'outside' keyword argument.
"""
if self.download_settings.verbose:
print("Validating 'outside' keyword argument...")
if self.outside is not None:
if self.outside not in ('time', 'space', 'both'):
raise KeyError("The only acceptable values for the 'outside' keyword argument " +
"are 'time', 'space', and 'both'.")
def __validate_type_kwarg(self):
""" A function to validate the value of the
optional 'type' keyword argument.
"""
if self.download_settings.verbose:
print("Validating 'type' keyword argument...")
if self.float_type not in ('all', 'phys', 'bgc'):
raise KeyError("The only acceptable values for the 'type' keyword argument are 'all'," +
" 'phys', and 'bgc'.")
def __validate_floats_kwarg(self):
""" A function to validate the 'floats' keyword argument.
The 'floats' must be a list even if it is a single float.
If the floats passed are in a dictionary we separate the keys
from the dictionary for flexibility.
"""
if self.download_settings.verbose:
print("Validating passed floats...")
# If user has passed a dictionary
if isinstance(self.float_ids, dict):
self.float_profiles_dict = self.float_ids
self.float_ids = list(self.float_ids.keys())
# If user has passed a single float
elif not isinstance(self.float_ids, list):
self.float_profiles_dict = None
self.float_ids = [self.float_ids]
# If user has passed a list
else:
self.float_profiles_dict = None
# Finding float IDs that are not present in the index dataframes
missing_floats = [float_id for float_id in self.float_ids if float_id not in
self.prof_index['wmoid'].values]
if missing_floats:
raise KeyError("The following float IDs do not exist in the dataframes: " +
f"{missing_floats}")
def __validate_ocean_kwarg(self):
""" A function to validate the value of the
optional 'ocean' keyword argument.
"""
if self.download_settings.verbose:
print("Validating 'ocean' keyword argument...")
if self.ocean not in ('A', 'P', 'I'):
raise KeyError("The only acceptable values for the 'ocean' keyword argument are 'A' " +
"(Atlantic), 'P' (Pacific), and 'I' (Indian).")
def __validate_float_variables_arg(self):
""" A function to validate the value of the
optional 'variables' passed to
load_float_data.
"""
if self.download_settings.verbose:
print("Validating passed 'variables'...")
# If user has passed a single variable convert to list
if not isinstance(self.float_variables, list):
self.float_variables = [self.float_variables]
        # Finding variables that are not present in the available variables list
nonexistent_vars = [x for x in self.float_variables if x not in
self.source_settings.avail_vars]
if nonexistent_vars:
raise KeyError("The following variables do not exist in the dataframes: " +
f"{nonexistent_vars}")
def __validate_float_variables_and_permutations_arg(self):
""" A function to validate the value of the
optional 'variables' passed to
load_float_data.
"""
if self.download_settings.verbose:
print("Validating passed 'variables'...")
# If user has passed a single variable convert to list
if not isinstance(self.float_variables, list):
self.float_variables = [self.float_variables]
        # Constructing list of variables available for plotting
adjusted_variables = []
for variable in self.source_settings.avail_vars:
adjusted_variables.append(variable + '_ADJUSTED')
adjusted_variables.append(variable + '_ADJUSTED_ERROR')
available_variables = self.source_settings.avail_vars + adjusted_variables
# Finding variables that are not present in the available variables list
nonexistent_vars = [x for x in self.float_variables if x not in available_variables]
if nonexistent_vars:
raise KeyError("The following variables do not exist in the dataframes: " +
f"{nonexistent_vars}")
def __validate_float_data_dataframe(self):
""" A function to validate a dataframe passed
to sections() so ensure that it has the
expected columns for graphing section
plots.
"""
if self.download_settings.verbose:
print("Validating passed float_data_dataframe...")
# Check that the dataframe at the very least has wmoid and variable columns
required_columns = ['WMOID'] + self.float_variables
# Identify missing columns
missing_columns = set(required_columns) - set(self.float_data.columns)
if missing_columns:
raise KeyError("The following columns are missing from the dataframe: " +
f"{missing_columns}")
def __validate_plot_save_path(self, save_path: Path):
""" A function to validate that the save path passed
actually exists.
"""
        if not save_path.exists():
            raise FileNotFoundError(f'{save_path} not found')
def __prepare_selection(self):
""" A function that determines what dataframes will be loaded/used
when selecting floats. We determine what dataframes to load
based on two factors: type and passed floats.
If type is 'phys', the dataframe based on
ar_index_global_prof.txt will be used.
If type is 'bgc', the dataframe based on
argo_synthetic-profile_index.txt will be used.
If type is 'all', both dataframes are used.
BGC floats are taken from argo_synthetic-profile_index.txt,
non-BGC floats from ar_index_global_prof.txt.
If the user passed floats, we only load the passed floats
into the selection frames.
If keep_index_in_memory is set to false, the dataframes created
during Argo's constructor are deleted. In this function we only
reload the necessary dataframes into memory.
"""
if self.download_settings.verbose:
print('Preparing float data for filtering...')
selected_floats_phys = None
selected_floats_bgc = None
# Load dataframes into memory if they are not there
if not self.download_settings.keep_index_in_memory:
self.sprof_index = self.__load_sprof_dataframe()
self.prof_index = self.__load_prof_dataframe()
# We can only validate floats after the dataframes are loaded into memory
if self.float_ids:
self.__validate_floats_kwarg()
# If we aren't filtering from specific floats assign selected frames
# to the whole index frames
if self.float_ids is None:
self.selected_from_prof_index = self.prof_index[~self.prof_index['is_bgc']]
self.selected_from_sprof_index = self.sprof_index
# If we do have specific floats to filter from, assign
# selected floats by pulling those floats from the
# larger dataframes, only adding floats that match the
# type to the frames.
else:
# Empty default dataframes are needed for the len function below
self.selected_from_prof_index = pd.DataFrame({'wmoid': []})
self.selected_from_sprof_index = pd.DataFrame({'wmoid': []})
if self.float_type != 'phys':
# Make a list of bgc floats that the user wants
            bgc_filter = ((self.float_stats['wmoid'].isin(self.float_ids)) &
                          (self.float_stats['is_bgc']))
selected_floats_bgc = self.float_stats[bgc_filter]['wmoid'].tolist()
# Gather bgc profiles for these floats from sprof index frame
self.selected_from_sprof_index = \
self.sprof_index[self.sprof_index['wmoid'].isin(selected_floats_bgc)]
if self.float_type != 'bgc':
# Make a list of phys floats that the user wants
            phys_filter = ((self.float_stats['wmoid'].isin(self.float_ids)) &
                           (~self.float_stats['is_bgc']))
selected_floats_phys = self.float_stats[phys_filter]['wmoid'].tolist()
# Gather phys profiles for these floats from prof index frame
self.selected_from_prof_index = \
self.prof_index[self.prof_index['wmoid'].isin(selected_floats_phys)]
if self.download_settings.verbose:
num_unique_floats = len(self.selected_from_sprof_index['wmoid'].unique()) + \
len(self.selected_from_prof_index['wmoid'].unique())
print(f"Filtering through {num_unique_floats} floats")
num_profiles = len(self.selected_from_sprof_index) + len(self.selected_from_prof_index)
print(f'There are {num_profiles} profiles associated with these floats\n')
def __narrow_profiles_by_criteria(self)-> dict:
""" A function to narrow down the available profiles to only those
that meet the criteria passed to select_profiles.
:return: narrowed_profiles : dict - A dictionary with float ID
keys corresponding to a list of profiles that match criteria.
"""
# Filter by time, space, and type constraints first.
if self.float_type == 'bgc' or self.selected_from_prof_index.empty:
# Empty df for concat
self.selection_frame_phys = pd.DataFrame()
else:
self.selection_frame_phys = \
self.__get_in_time_and_space_constraints(self.selected_from_prof_index)
if self.float_type == 'phys' or self.selected_from_sprof_index.empty:
# Empty df for concat
self.selection_frame_bgc = pd.DataFrame()
else:
self.selection_frame_bgc = \
self.__get_in_time_and_space_constraints(self.selected_from_sprof_index)
# Set the selection frame
self.selection_frame = pd.concat([self.selection_frame_bgc, self.selection_frame_phys])
# Remove extraneous frames
if not self.download_settings.keep_index_in_memory:
del self.sprof_index
del self.prof_index
del self.selection_frame_bgc
del self.selection_frame_phys
if self.selection_frame.empty:
if self.download_settings.verbose:
print('No matching floats found')
return {}
if self.download_settings.verbose:
print(f"{len(self.selection_frame['wmoid'].unique())} floats selected")
print(f'{len(self.selection_frame)} profiles selected according to time and space ' +
'constraints')