Skip to content

Commit

Permalink
Fixes swallowed exceptions in computation thread
Browse files Browse the repository at this point in the history
Exceptions are now transferred from the computation thread
to the main one using std::future mechanisms.
Only the first encountered exception is thrown in the main
thread.

Signed-off-by: Sylvain Leclerc <[email protected]>
  • Loading branch information
sylvlecl committed Oct 6, 2023
1 parent 8c362df commit 5067673
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/libs/antares/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ add_subdirectory(object)

add_subdirectory(array)
add_subdirectory(correlation)

add_subdirectory(concurrency)

add_subdirectory(logs)
add_subdirectory(jit)
Expand Down
9 changes: 9 additions & 0 deletions src/libs/antares/concurrency/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@


add_library(concurrency)
add_library(Antares::concurrency ALIAS concurrency)

target_sources(concurrency PRIVATE concurrency.cpp)
target_include_directories(concurrency PUBLIC include)

target_link_libraries(concurrency yuni-static-core)
53 changes: 53 additions & 0 deletions src/libs/antares/concurrency/concurrency.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// Created by leclercsyl on 05/10/23.
//
#include <memory>
#include "yuni/job/job.h"
#include "antares/concurrency/concurrency.h"

namespace Antares::Concurrency
{

class JobImpl : public Yuni::Job::IJob {
public:
JobImpl(const Task& task) : task_(task) {}

TaskFuture getFuture() {
return task_.get_future();
}

protected:
void onExecute() override {
task_();
}

private:
std::packaged_task<void()> task_;
};

std::unique_ptr<Yuni::Job::IJob> MakeJob(const Task& task) {
return std::make_unique<JobImpl>(task);
}

std::unique_ptr<JobImpl> MakePackagedJob(const Task& task) {
return std::make_unique<JobImpl>(task);
}

std::future<void> AddTask(Yuni::Job::QueueService& threadPool, const Task& task) {
auto job = MakePackagedJob(task);
auto future = job->getFuture();
threadPool.add(Yuni::Job::IJob::Ptr(job.release()));
return future;
}

void FutureSet::add(TaskFuture&& f) {
futures_.push_back(std::move(f));
}

void FutureSet::join() {
for (auto& f: futures_) {
f.get();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//
// Created by leclercsyl on 05/10/23.
//

#ifndef ANTARES_CONCURRENCY_H
#define ANTARES_CONCURRENCY_H

#include <future>
#include "yuni/job/queue/service.h"

namespace Antares::Concurrency
{

typedef std::function<void()> Task;
typedef std::future<void> TaskFuture;

/*!
* \brief Queues the provided function and returns the corresponding std::future.
*/
TaskFuture AddTask(Yuni::Job::QueueService& threadPool, const Task& task);

/*!
* \brief Utility class to gather futures to wait for.
*/
class FutureSet
{
public:
/*!
* \brief Adds one future to be monitored by this set.
*
* Note: the provided future will be left in "moved from" state.
*/
void add(TaskFuture&& f);

/*!
* \brief Waits for completion of all added futures.
*
* If one of the future ends on exception, re-throws the first encountered exception.
*/
void join();

private:
std::vector<TaskFuture> futures_;
};

}


#endif //ANTARES_CONCURRENCY_H
2 changes: 2 additions & 0 deletions src/solver/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ add_library(antares-solver-main-adequacy ${SRC_SOLVER_MAIN_ADEQUACY})
target_link_libraries(antares-solver-main-adequacy
PRIVATE
Antares::infoCollection
antares-solver-simulation
)

add_library(antares-solver-main-economy ${SRC_SOLVER_MAIN_ECONOMY})
target_link_libraries(antares-solver-main-economy
PRIVATE
Antares::infoCollection
antares-solver-simulation
)
1 change: 1 addition & 0 deletions src/solver/simulation/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,5 @@ target_link_libraries(antares-solver-simulation PRIVATE
yuni-static-core
Antares::study
Antares::result_writer
Antares::concurrency
)
40 changes: 23 additions & 17 deletions src/solver/simulation/solver.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@
#include <yuni/core/system/suspend.h>
#include <yuni/job/job.h>

#include "antares/concurrency/concurrency.h"

namespace Antares::Solver::Simulation
{

using namespace Antares::Concurrency;

template<class Impl>
class yearJob final : public Yuni::Job::IJob
class yearJob
{
public:
yearJob(ISimulation<Impl>* simulation,
Expand Down Expand Up @@ -133,7 +137,8 @@ private:
}
}

virtual void onExecute() override
public:
void operator()()
{
Progression::Task progression(study, y, Solver::Progression::sectYear);

Expand Down Expand Up @@ -964,6 +969,7 @@ void ISimulation<Impl>::loopThroughYears(uint firstYear,
std::vector<unsigned int>::iterator year_it;

bool yearPerformed = false;
FutureSet results;
for (year_it = set_it->yearsIndices.begin(); year_it != set_it->yearsIndices.end();
++year_it)
{
Expand All @@ -983,21 +989,20 @@ void ISimulation<Impl>::loopThroughYears(uint firstYear,
// have to be rerun (meaning : they must be run once). if(!set_it->yearFailed[y])
// continue;

pQueueService->add(
new yearJob<ImplementationType>(this,
y,
set_it->yearFailed,
set_it->isFirstPerformedYearOfASet,
pFirstSetParallelWithAPerformedYearWasRun,
numSpace,
randomForParallelYears,
performCalculations,
study,
state,
pYearByYear,
pDurationCollector,
pResultWriter));

std::function<void()> task = yearJob<ImplementationType>(this,
y,
set_it->yearFailed,
set_it->isFirstPerformedYearOfASet,
pFirstSetParallelWithAPerformedYearWasRun,
numSpace,
randomForParallelYears,
performCalculations,
study,
state,
pYearByYear,
pDurationCollector,
pResultWriter);
results.add(AddTask(*pQueueService, task));
} // End loop over years of the current set of parallel years

logPerformedYearsInAset(*set_it);
Expand All @@ -1006,6 +1011,7 @@ void ISimulation<Impl>::loopThroughYears(uint firstYear,

pQueueService->wait(Yuni::qseIdle);
pQueueService->stop();
results.join();

// At this point, the first set of parallel year(s) was run with at least one year performed
if (!pFirstSetParallelWithAPerformedYearWasRun && yearPerformed)
Expand Down
1 change: 1 addition & 0 deletions src/tests/src/libs/antares/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
add_subdirectory(concurrency)
add_subdirectory(study)

set(src_libs_antares "${CMAKE_SOURCE_DIR}/libs/antares")
Expand Down
12 changes: 12 additions & 0 deletions src/tests/src/libs/antares/concurrency/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

add_executable(test-concurrency ${SRC_LINK_PROPERTIES})

target_sources(test-concurrency PRIVATE test_concurrency.cpp)

target_link_libraries(test-concurrency
PRIVATE
Boost::unit_test_framework
Antares::concurrency
)

add_test(NAME concurrency COMMAND test-concurrency)
76 changes: 76 additions & 0 deletions src/tests/src/libs/antares/concurrency/test_concurrency.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//
// Created by leclercsyl on 06/10/23.
//
#define BOOST_TEST_MODULE test-concurrency tests
#define BOOST_TEST_DYN_LINK
#include <boost/test/unit_test.hpp>
#include <boost/test/data/test_case.hpp>

#include "antares/concurrency/concurrency.h"

using namespace Yuni::Job;
using namespace Antares::Concurrency;

std::unique_ptr<QueueService> createThreadPool(int size)
{
auto threadPool = std::make_unique<QueueService>();
threadPool->maximumThreadCount(size);
threadPool->start();
return threadPool;
}

BOOST_AUTO_TEST_CASE(test_no_error)
{
auto threadPool = createThreadPool(1);
int counter = 0;
Task incrementCounter = [&counter]() {
counter++;
};
TaskFuture future = AddTask(*threadPool, incrementCounter);
future.get();
BOOST_CHECK(counter == 1);
}


template <class Exc>
Task failingTask() {
return []() {
throw Exc();
};
}

class TestException {};

BOOST_AUTO_TEST_CASE(test_throw)
{
auto threadPool = createThreadPool(1);
TaskFuture future = AddTask(*threadPool, failingTask<TestException>());
BOOST_CHECK_THROW(future.get(), TestException);
}

BOOST_AUTO_TEST_CASE(test_future_set)
{
auto threadPool = createThreadPool(4);
int counter = 0;
Task incrementCounter = [&counter]() {
counter++;
};
FutureSet futures;
for (int i = 0; i < 10; i++) {
futures.add(AddTask(*threadPool, incrementCounter));
}
futures.join();
BOOST_CHECK(counter == 10);
}

class TestException1 {};
class TestException2 {};

BOOST_AUTO_TEST_CASE(test_future_set_rethrows_first)
{
auto threadPool = createThreadPool(2);
FutureSet futures;
futures.add(AddTask(*threadPool, failingTask<TestException1>()));
futures.add(AddTask(*threadPool, failingTask<TestException2>()));
BOOST_CHECK_THROW(futures.join(), TestException1);
}

0 comments on commit 5067673

Please sign in to comment.