Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster device initialization by multi-threading. #519

Merged
merged 8 commits into from
Nov 6, 2024
6 changes: 6 additions & 0 deletions MMCore/CoreFeatures.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ const auto& featureMap() {
// enabled perhaps a few years later.
}
},
{
"ParallelDeviceInitialization", {
[] { return g_flags.ParallelDeviceInitialization; },
[](bool e) { g_flags.ParallelDeviceInitialization = e; }
}
},
// How to add a new Core feature: see the comment at the top of this file.
// Features (the string names) must never be removed once added!
};
Expand Down
1 change: 1 addition & 0 deletions MMCore/CoreFeatures.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace features {

struct Flags {
bool strictInitializationChecks = false;
bool ParallelDeviceInitialization = true;
// How to add a new Core feature: see the comment in the .cpp file.
};

Expand Down
159 changes: 156 additions & 3 deletions MMCore/MMCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@
#include <cassert>
#include <chrono>
#include <cstring>
#include <deque>
#include <fstream>
#include <future>
#include <map>
#include <set>
#include <sstream>
#include <thread>
#include <vector>

#ifdef _MSC_VER
Expand Down Expand Up @@ -108,7 +112,7 @@
* (Keep the 3 numbers on one line to make it easier to look at diffs when
* merging/rebasing.)
*/
const int MMCore_versionMajor = 11, MMCore_versionMinor = 1, MMCore_versionPatch = 1;
const int MMCore_versionMajor = 11, MMCore_versionMinor = 2, MMCore_versionPatch = 1;


///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -196,6 +200,11 @@ CMMCore::~CMMCore()
* attempted on a device that is not successfully initialized. When disabled,
* no exception is thrown and a warning is logged (and the operation may
* potentially cause incorrect behavior or a crash).
* - "ParallelDeviceInitialization" (default: enabled) When enabled, serial ports
* are initialized in serial order, and all other devices are in parallel, using
* multiple threads, one per device module. Early testing shows this to be
* reliable, but switch this off when issues are encountered during
* device initialization.
*
* Permanently enabled features:
* - None so far.
Expand Down Expand Up @@ -840,17 +849,34 @@ void CMMCore::reset() throw (CMMError)
}


/**
* Calls Initialize() method for each loaded device.
* Parallel implemnetation should be faster
*/
void CMMCore::initializeAllDevices() throw (CMMError)
{
if (this->isFeatureEnabled("ParallelDeviceInitialization"))
{
initializeAllDevicesParallel();
}
else
{
initializeAllDevicesSerial();
}
}


/**
* Calls Initialize() method for each loaded device.
* This method also initialized allowed values for core properties, based
* on the collection of loaded devices.
*/
void CMMCore::initializeAllDevices() throw (CMMError)
void CMMCore::initializeAllDevicesSerial() throw (CMMError)
{
std::vector<std::string> devices = deviceManager_->GetDeviceList();
LOG_INFO(coreLogger_) << "Will initialize " << devices.size() << " devices";

for (size_t i=0; i<devices.size(); i++)
for (size_t i = 0; i < devices.size(); i++)
{
std::shared_ptr<DeviceInstance> pDevice;
try {
Expand All @@ -873,6 +899,133 @@ void CMMCore::initializeAllDevices() throw (CMMError)
updateCoreProperties();
}


/**
* Calls Initialize() method for each loaded device.
* This implementation initializes devices on separate threads, one per device module (adapter).
* This method also initializes allowed values for core properties, based
* on the collection of loaded devices.
*/
void CMMCore::initializeAllDevicesParallel() throw (CMMError)
{
std::vector<std::string> devices = deviceManager_->GetDeviceList();
LOG_INFO(coreLogger_) << "Will initialize " << devices.size() << " devices";

std::map<std::shared_ptr<LoadedDeviceAdapter>, std::vector<std::pair<std::shared_ptr<DeviceInstance>, std::string>>> moduleMap;
std::vector<std::shared_ptr<DeviceInstance>> ports;

// first round, collect all DeviceAdapters
for (size_t i = 0; i < devices.size(); i++)
{
std::shared_ptr<DeviceInstance> pDevice;
try {
pDevice = deviceManager_->GetDevice(devices[i]);
}
catch (CMMError& err) {
logError(devices[i].c_str(), err.getMsg().c_str());
throw;
}
if (pDevice->GetType() == MM::SerialDevice)
{
ports.push_back(pDevice);
}
else {
std::shared_ptr<LoadedDeviceAdapter> pAdapter;
pAdapter = pDevice->GetAdapterModule();

if (moduleMap.find(pAdapter) == moduleMap.end())
{
std::vector<std::pair<std::shared_ptr<DeviceInstance>, std::string>> pDevices;
pDevices.push_back(make_pair(pDevice, devices[i]));
moduleMap.insert({ pAdapter, pDevices });
}
else
{
moduleMap.find(pAdapter)->second.push_back(make_pair(pDevice, devices[i]));
}
}
}

// Initialize ports first. This should be fast, so no need to go parallel (also could not hurt really)
for (std::shared_ptr<DeviceInstance> pPort : ports)
{
mm::DeviceModuleLockGuard guard(pPort);
LOG_INFO(coreLogger_) << "Will initialize device " << pPort->GetLabel();
pPort->Initialize();
LOG_INFO(coreLogger_) << "Did initialize device " << pPort->GetLabel();
}

// second round, spin up threads to initialize non-port devices, one thread per module
std::vector<std::future<int>> futures;
std::map<std::shared_ptr<LoadedDeviceAdapter>, std::vector<std::pair<std::shared_ptr<DeviceInstance>, std::string>>>::iterator it;
for (it = moduleMap.begin(); it != moduleMap.end(); it++)
{
auto f = std::async(std::launch::async, &CMMCore::initializeVectorOfDevices, this, it->second);
futures.push_back(std::move(f));
}
for (int i = 0; i < futures.size(); i++) {
// Note: we could do a 'f.wait_for(std::chrono::seconds(20)' to wait up to 20 seconds before giving up
// which would avoid hanging with devices that hang in their initialize function
try
{
futures[i].get();
}
catch (...)
{
std::exception_ptr pex = std::current_exception();
// The std::future returned by std::async is special and its destructor blocks until the future completes.
// This is okay if there are 0 or 1 errors total(the successful initializations run to completion and the exception is propagated).
// When there are 2 or more errors, however, the second exception would be thrown in the destructor of the future,
// and throwing anything in a destructor is very bad(might terminate by default).
for (int j = i + 1; j < futures.size(); j++)
{
try
{
futures[j].get();
}
catch (std::exception exj) {
// ignore these exceptions;
}
}
// Rethrow the first exception
std::rethrow_exception(pex);
}
}

// assign default roles syncronously
for (it = moduleMap.begin(); it != moduleMap.end(); it++)
{
std::vector<std::pair<std::shared_ptr<DeviceInstance>, std::string>> pDevices = it->second;
for (int i = 0; i < pDevices.size(); i++)
{
assignDefaultRole(pDevices[i].first);
}
}
LOG_INFO(coreLogger_) << "Finished initializing " << devices.size() << " devices";

updateCoreProperties();
// not sure if this cleanup is needed, but should not hurt:
moduleMap.clear();
ports.clear();
}


/**
* This helper function is executed by a single thread, allowing initializeAllDevices to operate multi-threaded.
* All devices are supposed to originate from the same device adapter
*/
int CMMCore::initializeVectorOfDevices(std::vector<std::pair<std::shared_ptr<DeviceInstance>, std::string>> pDevices) {
for (int i = 0; i < pDevices.size(); i++) {
std::shared_ptr<DeviceInstance> pDevice = pDevices[i].first;

mm::DeviceModuleLockGuard guard(pDevice);
LOG_INFO(coreLogger_) << "Will initialize device " << pDevices[i].second;
pDevice->Initialize();
LOG_INFO(coreLogger_) << "Did initialize device " << pDevices[i].second;
}
return DEVICE_OK;
}

/**
* Updates CoreProperties (currently all Core properties are
* devices types) with the loaded hardware.
Expand Down
3 changes: 3 additions & 0 deletions MMCore/MMCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,9 @@ class CMMCore
void assignDefaultRole(std::shared_ptr<DeviceInstance> pDev);
void updateCoreProperty(const char* propName, MM::DeviceType devType) throw (CMMError);
void loadSystemConfigurationImpl(const char* fileName) throw (CMMError);
void initializeAllDevicesSerial() throw (CMMError);
void initializeAllDevicesParallel() throw (CMMError);
int initializeVectorOfDevices(std::vector<std::pair<std::shared_ptr<DeviceInstance>, std::string>> pDevices);
};

#if defined(__GNUC__) && !defined(__clang__)
Expand Down
Loading