-
Notifications
You must be signed in to change notification settings - Fork 0
/
MetricUpdater.cpp
356 lines (303 loc) · 14.4 KB
/
MetricUpdater.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
/*==============================================================================
Metric Updater
This file implements the methods of the Metric Updater class, which subscribes
to the relevant metric values of the application and publishes a data file
with the metric values when a new solution is requested.
Author and Copyright: Geir Horn, University of Oslo
Contact: [email protected]
License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
==============================================================================*/
#include <ranges> // Better containers
#include <source_location> // Informative error messages
#include <sstream> // To format error messages
#include <stdexcept> // standard exceptions
#include <iterator> // Iterator support
#include <ranges> // Container ranges
#include <algorithm> // Algorithms
#include "Utility/ConsolePrint.hpp" // For logging
#include "Communication/AMQ/AMQEndpoint.hpp" // For Topic subscriptions
#include "MetricUpdater.hpp"
namespace NebulOuS
{
// --------------------------------------------------------------------------
// Subscribing to metric prediction values
// --------------------------------------------------------------------------
//
// The Optimiser Controller defines the metric names used in the optimisation
// model, and the metric subscription will subscribe to these. The metric list
// may change at run-time. The message handler therefore makes subscriptions
// for new metrics and removes the subscriptions for metrics that are currently
// subscribed but no longer included in the list.
// Handler for the metric list message published by the Optimiser Controller.
//
// MetricDefinitions   : the message; its MetricTopic::Keys::MetricList entry
//                       is expected to be a JSON array of metric names given
//                       as strings.
// OptimiserController : address of the sender (not used by this handler).
//
// Metrics that are new get a null value record and a subscription to their
// value topic. Metrics that are no longer listed have their subscription
// closed and their record removed. Finally the number of metrics still
// lacking a value is recomputed.
//
// Throws std::invalid_argument if the metric list is not a JSON array.
void MetricUpdater::AddMetricSubscription(
  const MetricTopic & MetricDefinitions, const Address OptimiserController )
{
  JSON TheMetrics = MetricDefinitions.at( MetricTopic::Keys::MetricList );

  if( TheMetrics.is_array() )
  {
    // The first step is to try inserting the metrics into the metric value
    // map. If this is successful, a subscription is created for the
    // publisher of this metric value. The metric names are recorded since
    // some of them may correspond to known metrics, and some of them may
    // correspond to metrics that are new.

    std::set< std::string > TheMetricNames;

    for( auto & MetricRecord : TheMetrics )
    {
      auto [ MetricRecordPointer, MetricAdded ] = MetricValues.try_emplace(
        MetricRecord.get<std::string>(), JSON() );

      TheMetricNames.insert( MetricRecordPointer->first );

      // A newly added metric starts with a null value and needs a
      // subscription to its value topic. The count of metrics without a
      // value is recomputed once the whole list has been processed.

      if( MetricAdded )
        Send( Theron::AMQ::NetworkLayer::TopicSubscription(
          Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription,
          std::string( MetricValueUpdate::MetricValueRootString )
          + MetricRecordPointer->first ),
          GetSessionLayerAddress() );
    }

    // There could be metric value records that were defined by the previous
    // metric list but are missing from the new one. Their subscriptions are
    // closed and their records removed. The loop must continue from the
    // iterator returned by erase, because erasing an element while ranging
    // over a view of the map invalidates the loop iterator.

    for( auto Record = MetricValues.begin(); Record != MetricValues.end(); )
    {
      if( TheMetricNames.contains( Record->first ) )
        ++Record;
      else
      {
        Send( Theron::AMQ::NetworkLayer::TopicSubscription(
          Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription,
          std::string( MetricValueUpdate::MetricValueRootString )
          + Record->first ),
          GetSessionLayerAddress() );

        Record = MetricValues.erase( Record );
      }
    }

    // Finally the number of metrics that do not yet have a value is counted.
    // All of them must be received before the application context can be
    // forwarded to the solver manager. An empty metric set is counted as one
    // unset metric so that no context is forwarded without metrics.

    if( MetricValues.empty() )
      UnsetMetrics = 1;
    else
      UnsetMetrics = std::ranges::count_if( std::views::values( MetricValues ),
                     [](const auto & MetricValue){ return MetricValue.is_null(); } );
  }
  else
  {
    std::source_location Location = std::source_location::current();
    std::ostringstream ErrorMessage;

    ErrorMessage << "[" << Location.file_name() << " at line " << Location.line()
                 << " in function " << Location.function_name() <<"] "
                 << "The message to define the application's execution context "
                 << "was given as: " << std::endl
                 << std::endl << TheMetrics.dump(2) << std::endl
                 << "this is not as expected!";

    throw std::invalid_argument( ErrorMessage.str() );
  }

  Theron::ConsoleOutput Output;
  Output << "Received metric subscription request: " << std::endl
         << MetricDefinitions.dump(2) << std::endl;
}
// The metric update value is received whenever any of the subscribed
// forecasters has a new value for its metric. The format of the message is
// described in the project wiki page [1], with an example message given as
// {
// "metricValue": 12.34,
// "level": 1,
// "timestamp": 163532341,
// "probability": 0.98,
// "confidence_interval": [8,15],
// "predictionTime": 163532342
// }
//
// Currently only the metric value and the timestamp are used from this
// record. It would be interesting in the future to explore ways to use the
// confidence interval in some Bayesian reasoning about the true value.
//
// The sender address is the metric's topic, which starts with the generic
// metric prediction root string. This prefix must be removed to obtain the
// metric name before the corresponding metric value can be updated.
//
// Note that the map's [] operator cannot be used to look up the topic in the
// current map because it assumes the implicit creation of non-existing keys,
// which means that an empty metric value record should be constructed first
// and then used. To modify the existing record, the 'at' function must be
// used.
// Handler for a metric value published by a forecaster.
//
// TheMetricValue : the value message; its ValueLabel entry becomes the new
//                  value of the metric and its TimePoint entry may advance
//                  the validity time.
// TheMetricTopic : the sender address, i.e. the metric's value topic, which
//                  begins with MetricValueUpdate::MetricValueRootString.
//
// Values for metrics that are not in the metric value map are logged and
// otherwise ignored.
void MetricUpdater::UpdateMetricValue(
  const MetricValueUpdate & TheMetricValue, const Address TheMetricTopic)
{
  Theron::ConsoleOutput Output;

  Output << "Metric value received: " << std::endl
         << " Topic: " << TheMetricTopic.AsString() << std::endl
         << TheMetricValue.dump(2) << std::endl;

  // Drop the generic root prefix from the topic to obtain the metric name
  Theron::AMQ::TopicName TheTopic
    = TheMetricTopic.AsString().erase( 0,
        MetricValueUpdate::MetricValueRootString.size() );

  auto KnownMetric = MetricValues.find( TheTopic );

  if( KnownMetric == MetricValues.end() )
  {
    Output << TheTopic << " is not a known metric and ignored " << std::endl;
    return;
  }

  KnownMetric->second = TheMetricValue.at( MetricValueUpdate::Keys::ValueLabel );

  const auto ReportedTime = TheMetricValue.at(
    MetricValueUpdate::Keys::TimePoint ).get< Solver::TimePointType >();

  ValidityTime = std::max( ValidityTime, ReportedTime );

  // Rescan for missing values only while some metric is still unset
  if( UnsetMetrics != 0 )
    UnsetMetrics = std::ranges::count_if( std::views::values( MetricValues ),
      [](const auto & StoredValue){ return StoredValue.is_null(); } );

  Output << "Metric " << TheTopic << " has new value "
         << KnownMetric->second << std::endl;
}
// --------------------------------------------------------------------------
// Application lifecycle
// --------------------------------------------------------------------------
//
// When the lifecycle message is received, the state is just recorded in the
// state variable.
void MetricUpdater::LifecycleHandler(
const ApplicationLifecycle & TheState,
const Address TheLifecycleTopic )
{
Theron::ConsoleOutput Output;
ApplicationState = TheState;
Output << "Application state updated: " << std::endl
<< TheState.dump(2) << std::endl;
}
// The message handler uses the conversion operator to read out the state
// carried in the message. It is based on having a static map from the textual
// representation of the state to the enumeration.
// Converts the textual "state" field of the lifecycle message to the State
// enumeration.
//
// Throws std::out_of_range if the state string is not one of the known
// states. The message names the offending value, unlike the anonymous
// std::map::at failure it replaces. A missing "state" key is reported by
// the JSON library's own exception from at().
MetricUpdater::ApplicationLifecycle::operator State() const
{
  static const std::map< std::string_view, State > LifecycleStates{
    {"NEW",       State::New},
    {"READY",     State::Ready},
    {"DEPLOYING", State::Deploying},
    {"RUNNING",   State::Running},
    {"FAILED",    State::Failed}
  };

  const std::string TheState = this->at("state").get< std::string >();
  const auto Mapping = LifecycleStates.find( TheState );

  if( Mapping == LifecycleStates.end() )
  {
    std::source_location Location = std::source_location::current();
    std::ostringstream ErrorMessage;

    ErrorMessage << "[" << Location.file_name() << " at line " << Location.line()
                 << " in function " << Location.function_name() << "] "
                 << "The application lifecycle state \"" << TheState
                 << "\" is not a known state";

    throw std::out_of_range( ErrorMessage.str() );
  }

  return Mapping->second;
}
// --------------------------------------------------------------------------
// SLO Violation Events
// --------------------------------------------------------------------------
//
// When an SLO Violation is predicted a message is received from the SLO
// violation detector and this will trigger the definition of a new
// application execution context and a request to the Solution Manager to
// generate a new solution for this context.
//
// Note that the identifier of the application execution context is defined
// based on the time point of the severity message. The Optimiser controller
// must look for this identifier type on the solutions in order to decide
// which solutions to deploy.
//
// The message will be ignored if the application is not running, if not all
// metric values have been received, or if there are no metric values defined.
// In these cases the SLO violation message is only logged. To avoid scanning
// all metrics on every violation, a counter of metrics still lacking a value
// (UnsetMetrics) is maintained when metric values arrive. It is set to one
// when no metrics are defined, so that an empty metric set also blocks
// forwarding.
// Handler for SLO violation messages.
//
// SeverityMessage : the violation message; its TimePoint entry identifies
//                   the application execution context that is sent.
// TheSLOTopic     : the sender address (not used by this handler).
//
// When the application is running and every metric has a value, an
// application execution context is sent to the solver manager and the
// application state is set to Deploying. Otherwise the reason is logged
// together with the current metric values.
void MetricUpdater::SLOViolationHandler(
  const SLOViolation & SeverityMessage, const Address TheSLOTopic )
{
  Theron::ConsoleOutput Output;

  Output << "Metric Updater: SLO violation received " << std::endl
         << SeverityMessage.dump(2) << std::endl;

  const bool CanForwardContext
    = ( ApplicationState == ApplicationLifecycle::State::Running )
      && ( UnsetMetrics == 0 );

  if( !CanForwardContext )
  {
    Output << "... failed to forward the application execution context (size: "
           << MetricValues.size() << "," << " Unset: " << UnsetMetrics
           << " Application state: " << ApplicationState
           << ")" << std::endl;

    if( MetricValues.empty() )
      Output << "The Metric Value map is empty! " << std::endl;
    else
      for( const auto & [ MetricName, MetricValue ] : MetricValues )
        Output << MetricName << " with value "
               << MetricValue.dump(2) << " end " << std::endl;

    return;
  }

  const auto ContextTime = SeverityMessage.at(
    MetricValueUpdate::Keys::TimePoint ).get< Solver::TimePointType >();

  Send( Solver::ApplicationExecutionContext( ContextTime, MetricValues, true ),
        TheSolverManager );

  // Block further contexts until the lifecycle reports Running again
  ApplicationState = ApplicationLifecycle::State::Deploying;
}
// --------------------------------------------------------------------------
// Constructor and destructor
// --------------------------------------------------------------------------
//
// The constructor initialises the base classes and sets the validity time
// to zero so that it will be initialised by the first metric values received.
// The message handlers are registered, and the updater will then subscribe
// to the two topics published by the Optimisation Controller: one for the
// initial message defining the metrics and the associated topics to subscribe
// to for their values, and the second to know when a reconfiguration has been
// enacted based on a previously sent application execution context. One
// subscription is also made to receive the SLO violation message indicating
// that the running configuration is no longer valid and that a reconfiguration
// must be made.
// Constructs the metric updater actor.
//
// UpdaterName      : name of the actor; its address also names the fallback
//                    handler and networking actor bases.
// ManagerOfSolvers : address of the solver manager that receives the
//                    application execution contexts.
//
// Registers the four message handlers, then subscribes to the metric list,
// application lifecycle and SLO violation topics via the session layer.
MetricUpdater::MetricUpdater( const std::string UpdaterName,
                              const Address ManagerOfSolvers )
: Actor( UpdaterName ),
  StandardFallbackHandler( Actor::GetAddress().AsString() ),
  NetworkingActor( Actor::GetAddress().AsString() ),
  MetricValues(), ValidityTime(0), UnsetMetrics(1),
  ApplicationState( ApplicationLifecycle::State::New ),
  TheSolverManager( ManagerOfSolvers )
{
  RegisterHandler( this, &MetricUpdater::AddMetricSubscription );
  RegisterHandler( this, &MetricUpdater::UpdateMetricValue );
  RegisterHandler( this, &MetricUpdater::LifecycleHandler );
  RegisterHandler( this, &MetricUpdater::SLOViolationHandler );

  // Asks the session layer to open a subscription for the given topic
  auto SubscribeTo = [this]( const auto & TheTopic ){
    Send( Theron::AMQ::NetworkLayer::TopicSubscription(
            Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription,
            TheTopic ),
          GetSessionLayerAddress() );
  };

  SubscribeTo( MetricTopic::AMQTopic );
  SubscribeTo( ApplicationLifecycle::AMQTopic );
  SubscribeTo( SLOViolation::AMQTopic );
}
// The destructor closes the established subscriptions if the network is
// still running. If this is called when the application is closing, the
// network connection should already be stopped, and in that case all
// subscriptions will be automatically cancelled.
// Closes the subscriptions opened by this actor, but only while the network
// is still available (HasNetwork()). Without a network nothing is sent.
// The three fixed topics are closed first, followed by the value topic of
// every metric currently in the metric value map.
MetricUpdater::~MetricUpdater()
{
  if( !HasNetwork() )
    return;

  // Asks the session layer to close the subscription for the given topic
  auto Unsubscribe = [this]( const auto & TheTopic ){
    Send( Theron::AMQ::NetworkLayer::TopicSubscription(
            Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription,
            TheTopic ),
          GetSessionLayerAddress() );
  };

  Unsubscribe( MetricTopic::AMQTopic );
  Unsubscribe( ApplicationLifecycle::AMQTopic );
  Unsubscribe( SLOViolation::AMQTopic );

  for( const Theron::AMQ::TopicName & TheMetricTopic
       : std::views::keys( MetricValues ) )
    Unsubscribe( std::string( MetricValueUpdate::MetricValueRootString )
                 + TheMetricTopic );
}
} // End name space NebulOuS