-
Notifications
You must be signed in to change notification settings - Fork 0
/
SolverManager.hpp
325 lines (266 loc) · 14.2 KB
/
SolverManager.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
/*==============================================================================
Solver Manager
This class handles the Execution Context mssage containing a time stamp and a
set of variable value assignments.It manages a time sorted queue and dispatches
the first application execution context to the solver when the solver is ready.
The solution returned for a given execution context will be published together
with the execution context and the maximal utility value found by the solver.
The solver actor class is given as a template argument to the solver manager,
and at least one solver actor is instantiated at start up. This to allow
multiple solvers to run in parallel should this be necessary to serve properly
the queue of waiting application execution contexts. If there are multiple
objects defined, they have to be optimised individualy, and for this purpose
it would also be useful to have multiple solvers running in parallel working
on the same problem, but for different objective functions. This will reduce
the time to find the Pareto front [1] for the multi-objective optimisation
problem.
The functionality of receiving and maintaining the work queue separately from
the solver is done to avoid blocking the reception of new execution contexts
while the solver searches for a solution in separate threads. This is done
for other entities to use the solver to find the optised configuration, i.e.
feasible value assignments to all propblem variables, maximising the givne
utiliy for a particular set of independent metric variables, i.e. the
application execution context. The idea is that other components may use
the solver in this way to produce training sets for machine learning methods
that aims to estimate the application's performance indicators or even the
change in utility as a function of the varying the metric values of the
application execution context.
References:
[1] https://en.wikipedia.org/wiki/Pareto_front
Author and Copyright: Geir Horn, University of Oslo
Contact: [email protected]
License: MPL2.0 (https://www.mozilla.org/en-US/MPL/2.0/)
==============================================================================*/
#ifndef NEBULOUS_SOLUTION_MANAGER
#define NEBULOUS_SOLUTION_MANAGER
// Standard headers
#include <string_view> // Constant strings
#include <string> // Normal strings
#include <map> // Multimap for the work queue
#include <unordered_set> // Solver ready status
#include <list> // Pool of local solvers
#include <ranges> // Range based views
#include <algorithm> // Standard algorithms
#include <iterator> // For inserters
#include <sstream> // For nice error messages
#include <stdexcept> // Standard exceptions
#include <source_location> // Error location reporting
#include <condition_variable> // Execution stop management
#include <mutex> // Lock the condtion variable
#include <tuple> // For constructing solvers
// Other packages
#include <nlohmann/json.hpp> // JSON object definition
using JSON = nlohmann::json; // Short form name space
#include <boost/core/demangle.hpp> // To print readable types
// Theron++ headers
#include "Actor.hpp" // Actor base class
#include "Utility/StandardFallbackHandler.hpp" // Exception unhanded messages
#include "Communication/NetworkingActor.hpp" // Networking actors
// AMQ communication headers
#include "Communication/AMQ/AMQjson.hpp" // For JSON metric messages
#include "Communication/AMQ/AMQEndpoint.hpp" // For AMQ related things
// NebulOuS headers
#include "ExecutionControl.hpp" // Shut down messages
#include "Solver.hpp" // The basic solver class
namespace NebulOuS
{
/*==============================================================================
Solution Manager
==============================================================================*/
template< SolverAlgorithm SolverType >
class SolverManager
: virtual public Theron::Actor,
virtual public Theron::StandardFallbackHandler,
virtual public Theron::NetworkingActor<
typename Theron::AMQ::Message::PayloadType >,
virtual public ExecutionControl
{
// There is a topic name used to publish solutions found by the solvers. This
// topic is given to the constructor and kept as a constant during the class
// execution. The same goes for the topic on which application execution
// contexts will arrive for processing.
private:
const Theron::AMQ::TopicName SolutionReceiver, ContextTopic;
// --------------------------------------------------------------------------
// Solver management
// --------------------------------------------------------------------------
//
// The solution manager dispatches the application execution contexts as
// requests for solutions to a pool of solvers.
std::list< SolverType > SolverPool;
std::unordered_set< Address > ActiveSolvers, PassiveSolvers;
// --------------------------------------------------------------------------
// Application Execution Context management
// --------------------------------------------------------------------------
//
// The contexts are dispatched in time sorted order. However, the time
// to solve a problem depends on the complexity of the the context and the
// results may therefore become available out-of-order.
std::multimap< Solver::TimePointType,
Solver::ApplicationExecutionContext > ContextQueue;
// When the new applicaton execution context message arrives, it will be
// queued, and its time point recoreded. If there are passive solvers,
// the handler will immediately dispatch the contexts to each of these in
// time order. Essentially, it implements a 'riffle' for the passive solvers
// and the pending contexts.The issue is that there are likely different
// cardinalities of the two sets, and the solvers should be marked as
// active after the dispatch and the contexts should be removed from the
// queue after the dispatch.
void DispatchToSolvers( void )
{
if( !PassiveSolvers.empty() && !ContextQueue.empty() )
{
for( const auto & [ SolverAddress, ContextElement ] :
std::ranges::views::zip( PassiveSolvers, ContextQueue ) )
Send( ContextElement.second, SolverAddress );
// The number of contexts dispatched must equal the minimum of the
// available solvers and the available contexts.
std::size_t DispatchedContexts
= std::min( PassiveSolvers.size(), ContextQueue.size() );
// Then move the passive solver addresses used to active solver addresses
std::ranges::move(
std::ranges::subrange( PassiveSolvers.begin(),
std::ranges::next( PassiveSolvers.begin(),
DispatchedContexts,
PassiveSolvers.end() ) ),
std::inserter( ActiveSolvers, ActiveSolvers.end() ) );
// Then the dispatched context identifiers are removed from queue
ContextQueue.erase( ContextQueue.begin(),
std::ranges::next( ContextQueue.begin(),
DispatchedContexts,
ContextQueue.end() ) );
}
}
// The handler function simply enqueues the received context, records its
// timesamp and dispatch as many contexts as possible to the solvers.
void HandleApplicationExecutionContext(
const Solver:: ApplicationExecutionContext & TheContext,
const Address TheRequester )
{
ContextQueue.emplace(
TheContext.at( Solver::ApplicationExecutionContext::Keys::TimeStamp
).get< Solver::TimePointType >(),
TheContext );
DispatchToSolvers();
}
// --------------------------------------------------------------------------
// Solutions
// --------------------------------------------------------------------------
//
// When a solution is received from a solver, it will be dispatched to all
// entities subscribing to the solution topic, and the solver will be returned
// to the pool of passive solvers. The dispatch function will be called at the
// end to ensure that the solver starts working on queued application execution
// contexts, if any.
void PublishSolution( const Solver::Solution & TheSolution,
const Address TheSolver )
{
Send( TheSolution, Address( SolutionReceiver ) );
PassiveSolvers.insert( ActiveSolvers.extract( TheSolver ) );
DispatchToSolvers();
}
// --------------------------------------------------------------------------
// Constructor and destructor
// --------------------------------------------------------------------------
//
// The constructor takes the name of the Solution Mnager Actor, the name of
// the topic where the solutions should be published, and the topic where the
// application execution contexts will be published. If the latter is empty,
// the manager will not listen to any externally generated requests, only those
// being sent from the Metric Updater supposed to exist on the same Actor
// system node as the manager.The final arguments to the constructor is a
// set of arguments to the solver type in the order expected by the solver
// type and repeated for the number of (local) solvers that should be created.
//
// Currently this manager does not support dispatching configurations to
// remote solvers and collect responses from these. However, this can be
// circumvented by creating a local "solver" transferring the requests to
// a remote solvers and collecting results from the remote solver.
public:
template< typename ...SolverArgTypes >
SolverManager( const std::string & TheActorName,
const Theron::AMQ::TopicName & SolutionTopic,
const Theron::AMQ::TopicName & ContextPublisherTopic,
const unsigned int NumberOfSolvers,
const std::string SolverRootName,
SolverArgTypes && ...SolverArguments )
: Actor( TheActorName ),
StandardFallbackHandler( Actor::GetAddress().AsString() ),
NetworkingActor( Actor::GetAddress().AsString() ),
ExecutionControl( Actor::GetAddress().AsString() ),
SolutionReceiver( SolutionTopic ),
ContextTopic( ContextPublisherTopic ),
SolverPool(), ActiveSolvers(), PassiveSolvers(),
ContextQueue()
{
// The solvers are created by expanding the arguments for the solvers
// one by one creating new elements in the solver pool. The solvers
// will be named with a sequence number from 1 and up added to the
// root solver name, e.g., if the root name is "MySolver" the solvers
// will have names "MySolver_1", "MySolver_2",... and so forth. Since
// all solvers are of the same type they should take the same arguments
// and so the given arguments are just fowarded to each solver constructor.
for( unsigned int i = 1; i <= NumberOfSolvers; i++ )
{
std::ostringstream TheSolverName;
TheSolverName << SolverRootName << "_" << i;
SolverPool.emplace_back( TheSolverName.str(),
std::forward< SolverArgTypes >(SolverArguments)... );
}
// If the solvers were successfully created, their addresses are recorded as
// passive servers, and a publisher is made for the solution channel, and
// optionally, a subscritpion is made for the alternative context publisher
// topic. If the solvers could not be created, then an invalid argument
// exception will be thrown.
if( !SolverPool.empty() )
{
std::ranges::transform( SolverPool,
std::inserter( PassiveSolvers, PassiveSolvers.end() ),
[](const SolverType & TheSolver){ return TheSolver.GetAddress(); } );
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
Theron::AMQ::NetworkLayer::TopicSubscription::Action::Publisher,
SolutionTopic ), GetSessionLayerAddress() );
if( !ContextPublisherTopic.empty() )
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
Theron::AMQ::NetworkLayer::TopicSubscription::Action::Subscription,
ContextPublisherTopic ), GetSessionLayerAddress() );
Send( ExecutionControl::StatusMessage(
ExecutionControl::StatusMessage::State::Started
), Address( ExecutionControl::StatusMessage::AMQTopic ) );
}
else
{
std::source_location Location = std::source_location::current();
std::ostringstream ErrorMessage;
ErrorMessage << "[" << Location.file_name() << " at line "
<< Location.line()
<< "in function " << Location.function_name() <<"] "
<< "It was not possible to construct any solver of type "
<< boost::core::demangle( typeid( SolverType ).name() )
<< " from the given constructor argument types: ";
(( ErrorMessage << boost::core::demangle( typeid( SolverArguments ).name() ) << " " ), ... );
throw std::invalid_argument( ErrorMessage.str() );
}
// Finally, the handlers for the messages are registered
RegisterHandler(this, &SolverManager::HandleApplicationExecutionContext );
RegisterHandler(this, &SolverManager::PublishSolution );
}
// The destructor closes all the open topics if the network is still open
// when the destructor is invoked.
virtual ~SolverManager( void )
{
if( HasNetwork() )
{
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
Theron::AMQ::NetworkLayer::TopicSubscription::Action::ClosePublisher,
SolutionReceiver
), GetSessionLayerAddress() );
Send( Theron::AMQ::NetworkLayer::TopicSubscription(
Theron::AMQ::NetworkLayer::TopicSubscription::Action::CloseSubscription,
ContextTopic
), GetSessionLayerAddress() );
}
}
};
} // namespace NebulOuS
#endif // NEBULOUS_SOLUTION_MANAGER