# testcase.py -- forked from ibpsa/project1-boptest
# -*- coding: utf-8 -*-
"""
This module defines the API to the test case used by the REST requests to
perform functions such as advancing the simulation, retrieving test case
information, and calculating and reporting results.
"""
from pyfmi import load_fmu
import numpy as np
import copy
import time
from data.data_manager import Data_Manager
from forecast.forecaster import Forecaster
from kpis.kpi_calculator import KPI_Calculator
class TestCase(object):
    '''Class that implements the test case.

    Wraps the test case FMU together with a data manager, a forecaster and
    a KPI calculator, and exposes the operations used by the REST requests
    (advance, initialize, results, KPIs, forecast, scenario).

    NOTE(review): ``self.config_json`` and ``self.days_json`` are read but
    never assigned in this class; presumably they are attached by
    ``Data_Manager.load_data_and_jsons()`` -- confirm against data_manager.

    '''

    # Number of seconds in one day, used to convert ``days_json`` entries.
    _SECONDS_PER_DAY = 24*3600
    # A time period scenario starts this many days before the day given in
    # ``days_json``, is preceded by this many days of warmup, and lasts
    # this many days in total.
    _SCENARIO_LEAD_DAYS = 7
    _SCENARIO_WARMUP_DAYS = 7
    _SCENARIO_LENGTH_DAYS = 14
    # Scenario electricity price keys mapped to the names passed on to the
    # KPI calculator.
    _PRICE_SCENARIOS = {'constant': 'Constant',
                        'dynamic': 'Dynamic',
                        'highly_dynamic': 'HighlyDynamic'}

    def __init__(self, fmupath='models/wrapped.fmu'):
        '''Constructor.

        Parameters
        ----------
        fmupath : str, optional
            Path to the test case fmu.
            Default is assuming a particular directory structure.

        Raises
        ------
        ValueError
            If the loaded FMU is not version 2.0.

        '''

        # Set BOPTEST version number
        with open('version.txt', 'r') as f:
            self.version = f.read()
        # Set test case fmu path
        self.fmupath = fmupath
        # Instantiate a data manager for this test case
        self.data_manager = Data_Manager(testcase=self)
        # Load data and the kpis_json for the test case
        self.data_manager.load_data_and_jsons()
        # Instantiate a forecaster for this test case
        self.forecaster = Forecaster(testcase=self)
        # Define name
        self.name = self.config_json['name']
        # Load fmu
        self.fmu = load_fmu(self.fmupath)
        self.fmu.set_log_level(7)
        # Get version and check is 2.0
        self.fmu_version = self.fmu.get_version()
        if self.fmu_version != '2.0':
            raise ValueError('FMU must be version 2.0.')
        # Get building area
        self.area = self.config_json['area']
        # Get available control inputs and outputs.
        # Materialize as lists: on Python 3 ``keys()`` returns a view, which
        # cannot be concatenated with ``+`` when building the result filter.
        self.input_names = list(self.fmu.get_model_variables(causality=2).keys())
        self.output_names = list(self.fmu.get_model_variables(causality=3).keys())
        # Set default communication step
        self.set_step(self.config_json['step'])
        # Set default forecast parameters
        self.set_forecast_parameters(self.config_json['horizon'],
                                     self.config_json['interval'])
        # Initialize simulation data arrays
        self.__initilize_data()
        # Set default fmu simulation options
        self.options = self.fmu.simulate_options()
        self.options['CVode_options']['rtol'] = 1e-6
        self.options['CVode_options']['store_event_points'] = False
        self.options['filter'] = self.output_names + self.input_names
        # Instantiate a KPI calculator for the test case
        self.cal = KPI_Calculator(testcase=self)
        # Initialize test case
        self.initialize(self.config_json['start_time'],
                        self.config_json['warmup_period'])
        # Set default scenario
        self.set_scenario(self.config_json['scenario'])

    def __initilize_data(self):
        '''Initializes objects for simulation data storage.

        Uses self.output_names and self.input_names to create
        self.y, self.y_store, self.u, self.u_store,
        self.inputs_metadata, self.outputs_metadata.

        Parameters
        ----------
        None

        Returns
        -------
        None

        '''

        # Get input and output meta-data
        self.inputs_metadata = self._get_var_metadata(self.fmu,
                                                      self.input_names,
                                                      inputs=True)
        self.outputs_metadata = self._get_var_metadata(self.fmu,
                                                       self.output_names)
        # Names of the inputs without their two-character suffix
        input_stems = set(name[:-2] for name in self.input_names)
        # Outputs data
        self.y = {'time': np.array([])}
        for key in self.output_names:
            if key[:-2] in input_stems:
                # Do not store outputs that are current values of control
                # inputs, and remove them from the outputs metadata
                self.outputs_metadata.pop(key)
            else:
                self.y[key] = np.array([])
        self.y_store = copy.deepcopy(self.y)
        # Inputs data
        self.u = {'time': np.array([])}
        for key in self.input_names:
            self.u[key] = np.array([])
        self.u_store = copy.deepcopy(self.u)

    def __simulation(self, start_time, end_time, input_object=None):
        '''Simulates the FMU using the pyfmi fmu.simulate function.

        Parameters
        ----------
        start_time: int
            Start time of simulation in seconds.
        end_time: int
            Final time of simulation in seconds.
        input_object: pyfmi input_object, optional
            Input object for simulation
            Default is None

        Returns
        -------
        res: pyfmi results object or None
            Results of the fmu simulation.
            None if the simulation raised an exception.

        '''

        # Set fmu initialization option
        self.options['initialize'] = self.initialize_fmu
        # Set sample rate: one communication point per 30 seconds
        self.options['ncp'] = int((end_time-start_time)/30)
        # Simulate fmu
        try:
            res = self.fmu.simulate(start_time=start_time,
                                    final_time=end_time,
                                    options=self.options,
                                    input=input_object)
        except Exception as e:
            # Callers treat None as "simulation error"; report the cause
            # instead of discarding it so that failures can be diagnosed.
            print('ERROR: Simulation from {0} to {1} failed: {2}'.format(start_time, end_time, e))
            return None
        # Only the first successful simulation after a reset initializes
        self.initialize_fmu = False

        return res

    def __get_results(self, res, store=True, store_initial=False):
        '''Get results at the end of a simulation and throughout the
        simulation period for storage. This method assigns the final values
        to `self.y` and `self.u` and, if `store=True`, also appends the
        trajectories to `self.y_store` and `self.u_store`.

        This method is used by `initialize()` and `advance()` to retrieve
        results. Both store results; `initialize()` also stores the
        initial point, whereas `advance()` skips it.

        Parameters
        ----------
        res: pyfmi results object
            Results of the fmu simulation.
        store: boolean
            Set to true if desired to store results in `self.y_store` and
            `self.u_store`
        store_initial: boolean
            Set to true if desired to store initial point.

        '''

        # Determine if store initial point
        first = 0 if store_initial else 1
        # Store measurements
        for key in self.y.keys():
            self.y[key] = res[key][-1]
            if store:
                self.y_store[key] = np.append(self.y_store[key],
                                              res[key][first:])
        # Store control signals (will be baseline if not activated,
        # test controller input if activated)
        for key in self.u.keys():
            # Replace '_u' by '_y' for key used to collect data and
            # don't overwrite time
            if key[-2:] == '_u':
                key_data = key[:-2]+'_y'
            else:
                key_data = key
            self.u[key] = res[key_data][-1]
            if store:
                self.u_store[key] = np.append(self.u_store[key],
                                              res[key_data][first:])

    def _create_input_object(self, u):
        '''Build the pyfmi input object for one step from the input dict.

        Parameters
        ----------
        u : dict
            {<input_name> : <input_value>}. Entries with key 'time' and
            entries whose value is None or an empty string are ignored.

        Returns
        -------
        input_object : tuple or None
            (list of input names, array with one row holding the current
            start time followed by the input values), or None if no input
            was written.

        '''

        u_list = []
        u_values = []
        for key in u.keys():
            raw = u[key]
            # A value of 0 is a legitimate input and must be written, so
            # test explicitly for "not provided" rather than for falsiness
            if key == 'time' or raw is None or raw == '':
                continue
            value = float(raw)
            # Check min/max if not activation input
            if '_activate' not in key:
                value = self._check_value_min_max(key, value)
            u_list.append(key)
            u_values.append(value)
        if not u_list:
            return None
        u_trajectory = np.array([[self.start_time] + u_values], dtype=float)

        return (u_list, u_trajectory)

    def advance(self, u):
        '''Advances the test case model simulation forward one step.

        Parameters
        ----------
        u : dict
            Defines the control input data to be used for the step.
            {<input_name> : <input_value>}
            Values that are None (or an empty string) are not written.

        Returns
        -------
        z : dict
            Contains the full state of measurement and input data at the end
            of the step.
            {<point_name> : <point_value>}
            If empty, simulation end time has been reached.
            If None, a simulation error has occurred.

        '''

        # Calculate and store the elapsed time since the previous step ended
        if hasattr(self, 'tic_time'):
            self.tac_time = time.time()
            self.elapsed_control_time_ratio = np.append(
                self.elapsed_control_time_ratio,
                (self.tac_time-self.tic_time)/self.step)
        # Set final time
        self.final_time = self.start_time + self.step
        # Set control inputs if they exist and are written
        input_object = self._create_input_object(u)
        # Simulation at end time
        if not self.start_time < self.end_time:
            return dict()
        # Make sure stop at end of test
        if self.final_time > self.end_time:
            self.final_time = self.end_time
        res = self.__simulation(self.start_time, self.final_time, input_object)
        # Error in simulation
        if res is None:
            return None
        # Get result and store measurement and control inputs
        self.__get_results(res, store=True, store_initial=False)
        # Advance start time
        self.start_time = self.final_time
        # Raise the flag to compute time lapse
        self.tic_time = time.time()
        # Get full current state
        z = self._get_full_current_state()

        return z

    def initialize(self, start_time, warmup_period, end_time=np.inf):
        '''Initialize the test simulation.

        Parameters
        ----------
        start_time: int
            Start time of simulation to initialize to in seconds.
        warmup_period: int
            Length of time before start_time to simulate for warmup in seconds.
        end_time: int, optional
            Specifies a finite end time to allow simulation to continue
            Default value is infinite.

        Returns
        -------
        z : dict
            Contains the full state of measurement and input data at the end
            of the initialization.
            {<point_name> : <point_value>}.
            If None, a simulation error has occurred.

        '''

        # Reset fmu
        self.fmu.reset()
        # Reset simulation data storage
        self.__initilize_data()
        self.elapsed_control_time_ratio = np.array([])
        # Forget the end of the last step of any previous run so that the
        # first step of this run does not record a meaningless elapsed time
        if hasattr(self, 'tic_time'):
            del self.tic_time
        # Record initial testing time
        self.initial_time = start_time
        # Record end testing time
        self.end_time = end_time
        # Set fmu initialization
        self.initialize_fmu = True
        # Simulate fmu for warmup period.
        # Do not allow negative starting time to avoid confusions
        res = self.__simulation(max(start_time-warmup_period, 0), start_time)
        # Process result
        if res is None:
            return None
        # Get result
        self.__get_results(res, store=True, store_initial=True)
        # Set internal start time to start_time
        self.start_time = start_time
        # Initialize KPI Calculator
        self.cal.initialize()
        # Get full current state
        z = self._get_full_current_state()

        return z

    def get_step(self):
        '''Returns the current simulation step in seconds.'''

        return self.step

    def set_step(self, step):
        '''Sets the simulation step in seconds.

        Parameters
        ----------
        step : int
            Simulation step in seconds.

        Returns
        -------
        None

        '''

        self.step = float(step)

        return None

    def get_inputs(self):
        '''Returns a dictionary of control inputs and their meta-data.

        Parameters
        ----------
        None

        Returns
        -------
        inputs : dict
            Dictionary of control inputs and their meta-data.

        '''

        inputs = self.inputs_metadata

        return inputs

    def get_measurements(self):
        '''Returns a dictionary of measurements and their meta-data.

        Parameters
        ----------
        None

        Returns
        -------
        measurements : dict
            Dictionary of measurements and their meta-data.

        '''

        measurements = self.outputs_metadata

        return measurements

    def get_results(self, var, start_time, final_time):
        '''Returns measurement and control input trajectories.

        Parameters
        ----------
        var : str
            Name of variable.
        start_time : float
            Start time of data to return in seconds.
        final_time : float
            End time of data to return in seconds.

        Returns
        -------
        Y : dict or None
            Dictionary of variable trajectories with time as arrays,
            limited to start_time <= time <= final_time.
            {'time':[<time_data>],
             'var':[<var_data>]
            }
            Returns None if no variable can be found

        '''

        # Get correct point; measurements take precedence over inputs
        if var in self.y_store.keys():
            store = self.y_store
        elif var in self.u_store.keys():
            store = self.u_store
        else:
            return None
        Y = {'time': store['time'],
             var: store[var]}
        # Get correct time. The mask is computed once from the unfiltered
        # time vector so that it also applies cleanly when var is 'time'.
        time_all = Y['time']
        in_window = (time_all >= start_time) & (time_all <= final_time)
        for key in list(Y.keys()):
            Y[key] = Y[key][in_window]

        return Y

    def get_kpis(self):
        '''Returns KPI data.

        Requires standard sensor signals.

        Parameters
        ----------
        None

        Returns
        -------
        kpis : dict
            Dictionary containing KPI names and values.
            {<kpi_name>:<kpi_value>}

        Raises
        ------
        ValueError
            If the electricity price of the current scenario is not one of
            'constant', 'dynamic' or 'highly_dynamic'.

        '''

        # Set correct price scenario for cost
        price = self.scenario.get('electricity_price')
        if price not in self._PRICE_SCENARIOS:
            raise ValueError('Unknown electricity price scenario: {0}.'.format(price))
        price_scenario = self._PRICE_SCENARIOS[price]
        # Calculate the core kpis
        kpis = self.cal.get_core_kpis(price_scenario=price_scenario)

        return kpis

    def set_forecast_parameters(self, horizon, interval):
        '''Sets the forecast horizon and interval, both in seconds.

        Parameters
        ----------
        horizon : int
            Forecast horizon in seconds.
        interval : int
            Forecast interval in seconds.

        Returns
        -------
        None

        '''

        self.horizon = float(horizon)
        self.interval = float(interval)

        return None

    def get_forecast_parameters(self):
        '''Returns the current forecast horizon and interval parameters.'''

        forecast_parameters = dict()
        forecast_parameters['horizon'] = self.horizon
        forecast_parameters['interval'] = self.interval

        return forecast_parameters

    def get_forecast(self):
        '''Returns the test case data forecast

        Parameters
        ----------
        None

        Returns
        -------
        forecast : dict
            Dictionary with the requested forecast data
            {<variable_name>:<variable_forecast_trajectory>}
            where <variable_name> is a string with the variable
            key and <variable_forecast_trajectory> is a list with
            the forecasted values. 'time' is included as a variable

        '''

        # Get the forecast
        forecast = self.forecaster.get_forecast(horizon=self.horizon,
                                                interval=self.interval)

        return forecast

    def set_scenario(self, scenario):
        '''Sets the case scenario.

        Parameters
        ----------
        scenario : dict
            {'electricity_price': <'constant' or 'dynamic' or 'highly_dynamic'>,
             'time_period': see available keys for test case
            }
            If any value is None or absent, it will not change existing.

        Returns
        -------
        result : dict
            {'electricity_price': if succeeded in changing then True, else None,
             'time_period': if succeeded then initial measurements, else None
            }

        '''

        result = {'electricity_price': None,
                  'time_period': None}
        if not hasattr(self, 'scenario'):
            self.scenario = {}
        # Handle electricity price
        electricity_price = scenario.get('electricity_price')
        if electricity_price:
            self.scenario['electricity_price'] = electricity_price
            result['electricity_price'] = True
        # Handle timeperiod
        time_period = scenario.get('time_period')
        if time_period:
            self.scenario['time_period'] = time_period
            day = self.days_json[time_period]
            warmup_period = self._SCENARIO_WARMUP_DAYS*self._SECONDS_PER_DAY
            start_time = (day*self._SECONDS_PER_DAY
                          - self._SCENARIO_LEAD_DAYS*self._SECONDS_PER_DAY)
            end_time = start_time + self._SCENARIO_LENGTH_DAYS*self._SECONDS_PER_DAY
            result['time_period'] = self.initialize(start_time,
                                                    warmup_period,
                                                    end_time=end_time)
        # It's needed to reset KPI Calculator when scenario is changed
        self.cal.initialize()

        return result

    def get_scenario(self):
        '''Returns the current case scenario.'''

        scenario = self.scenario

        return scenario

    def get_name(self):
        '''Returns the name of the test case fmu.

        Parameters
        ----------
        None

        Returns
        -------
        name : dict
            Name of test case as {'name': <str>}

        '''

        name = {'name': self.name}

        return name

    def get_elapsed_control_time_ratio(self):
        '''Returns the elapsed control time ratio vector for the case.

        Parameters
        ----------
        None

        Returns
        -------
        elapsed_control_time_ratio : np array of floats
            elapsed_control_time_ratio for each control step.

        '''

        elapsed_control_time_ratio = self.elapsed_control_time_ratio

        return elapsed_control_time_ratio

    def get_version(self):
        '''Returns the version number of BOPTEST.

        Parameters
        ----------
        None

        Returns
        -------
        version : dict
            Version of BOPTEST as {'version': <str>}

        '''

        return {'version': self.version}

    def _get_var_metadata(self, fmu, var_list, inputs=False):
        '''Build a dictionary of variables and their metadata.

        Parameters
        ----------
        fmu : pyfmi fmu object
            FMU from which to get variable metadata
        var_list : list of str
            List of variable names
        inputs : boolean, optional
            If True, the minimum and maximum are read from the fmu for
            every variable that is neither 'time' nor an activation
            signal. Otherwise minimum and maximum are None.
            Default is False.

        Returns
        -------
        var_metadata : dict
            Dictionary of variable names as keys and metadata as fields.
            {<var_name_str> :
                "Unit" : str,
                "Description" : str,
                "Minimum" : float,
                "Maximum" : float
            }

        '''

        # Initialize
        var_metadata = dict()
        # Get metadata
        for var in var_list:
            # Bounds only exist for regular control inputs
            mini = None
            maxi = None
            if var == 'time':
                unit = 's'
                description = 'Time of simulation'
            elif '_activate' in var:
                unit = None
                description = fmu.get_variable_description(var)
            else:
                unit = fmu.get_variable_unit(var)
                description = fmu.get_variable_description(var)
                if inputs:
                    mini = fmu.get_variable_min(var)
                    maxi = fmu.get_variable_max(var)
            var_metadata[var] = {'Unit': unit,
                                 'Description': description,
                                 'Minimum': mini,
                                 'Maximum': maxi}

        return var_metadata

    def _check_value_min_max(self, var, value):
        '''Check that the input value does not violate the min or max.

        Note that if it does, the value is truncated to the minimum or maximum.
        A bound that is None is treated as "no bound".

        Parameters
        ----------
        var : str
            Name of variable
        value : numeric
            Specified value of variable

        Return
        ------
        checked_value : float
            Value of variable truncated by min and max.

        '''

        # Get minimum and maximum for variable
        mini = self.inputs_metadata[var]['Minimum']
        maxi = self.inputs_metadata[var]['Maximum']
        # Check the value and truncate if necessary
        if maxi is not None and value > maxi:
            checked_value = maxi
            print('WARNING: Value of {0} for {1} is above maximum of {2}.  Using {2}.'.format(value, var, maxi))
        elif mini is not None and value < mini:
            checked_value = mini
            print('WARNING: Value of {0} for {1} is below minimum of {2}.  Using {2}.'.format(value, var, mini))
        else:
            checked_value = value

        return checked_value

    def _get_area(self):
        '''Get the building floor area in m^2.

        Returns
        -------
        area : float
            Building floor area in m^2

        '''

        area = self.area

        return area

    def _get_full_current_state(self):
        '''Combines the self.y and self.u dictionaries into one.

        Values from self.u take precedence for keys present in both.

        Returns
        -------
        z : dict
            Combination of self.y and self.u dictionaries.

        '''

        z = self.y.copy()
        z.update(self.u)

        return z