Skip to content

Commit

Permalink
Launch message handler thread further up stack
Browse files Browse the repository at this point in the history
This ensures message handlers for both Bin mon and LS mons are launched in a new thread to overcome NNG stack limits
  • Loading branch information
firthm01 committed May 13, 2024
1 parent 77184d2 commit 942ba7b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "communication/monitoring_metadata_receiver.hpp"
#include "scene_store.pb.h"
#include <functional>
#include <future>

namespace ear {
namespace plugin {
Expand Down Expand Up @@ -62,7 +63,12 @@ void MonitoringMetadataReceiver::handleReceive(std::error_code ec,
if (!sceneStore.ParseFromArray(message.data(), message.size())) {
throw std::runtime_error("Failed to parse Scene Object");
}
handler_(std::move(sceneStore));
// Called by NNG callback on thread with small stack.
// Launch task in another thread to overcome stack limitation.
auto future = std::async(std::launch::async, [this, sceneStore]() {
handler_(sceneStore);
});
future.get();
} catch (const std::runtime_error& e) {
EAR_LOGGER_ERROR(
logger_, "Failed to parse and dispatch scene metadata: {}", e.what());
Expand Down
54 changes: 22 additions & 32 deletions ear-production-suite-plugins/lib/src/scene_gains_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
#include "ear/metadata.hpp"
#include "helper/eps_to_ear_metadata_converter.hpp"
#include "helper/container_helpers.hpp"
#include <future>
#include <algorithm>


namespace {

int inputCount(ear::plugin::ItemGains const& itemGains) {
Expand Down Expand Up @@ -54,40 +52,32 @@ SceneGainsCalculator::SceneGainsCalculator(ear::Layout outputLayout,
totalInputChannels{inputChannelCount} {}

bool SceneGainsCalculator::update(proto::SceneStore store) {
// Called by NNG callback on thread with small stack.
// Launch task in another thread to overcome stack limitation.
auto future = std::async(std::launch::async, [this, store]() {

// First figure out what we need to process updates for
std::vector<communication::ConnectionId> cachedIdsChecklist;
cachedIdsChecklist.reserve(routingCache_.size());
for(auto const&[key, val] : routingCache_) {
cachedIdsChecklist.push_back(key);
}
/// Check-off found items, and also delete changed items from routing cache to be re-evaluated
for(const auto& item : store.monitoring_items()) {
auto itemId = communication::ConnectionId{ item.connection_id() };
cachedIdsChecklist.erase(std::remove(cachedIdsChecklist.begin(), cachedIdsChecklist.end(), itemId), cachedIdsChecklist.end());
if(item.changed()) {
removeItem(itemId);
}
}
/// Delete removed items from routing cache (i.e, those that weren't checked-off and therefore remain in cachedIdsChecklist)
for(const auto& itemId : cachedIdsChecklist) {
// First figure out what we need to process updates for
std::vector<communication::ConnectionId> cachedIdsChecklist;
cachedIdsChecklist.reserve(routingCache_.size());
for(auto const&[key, val] : routingCache_) {
cachedIdsChecklist.push_back(key);
}
/// Check-off found items, and also delete changed items from routing cache to be re-evaluated
for(const auto& item : store.monitoring_items()) {
auto itemId = communication::ConnectionId{ item.connection_id() };
cachedIdsChecklist.erase(std::remove(cachedIdsChecklist.begin(), cachedIdsChecklist.end(), itemId), cachedIdsChecklist.end());
if(item.changed()) {
removeItem(itemId);
}
}
/// Delete removed items from routing cache (i.e, those that weren't checked-off and therefore remain in cachedIdsChecklist)
for(const auto& itemId : cachedIdsChecklist) {
removeItem(itemId);
}

// Now get the gain updates we need
for(const auto& item : store.monitoring_items()) {
/// If it's not in routingCache_, it's new or changed, so needs re-evaluating
if(!mapHasKey(routingCache_, communication::ConnectionId{ item.connection_id() })) {
addOrUpdateItem(item);
}
// Now get the gain updates we need
for(const auto& item : store.monitoring_items()) {
/// If it's not in routingCache_, it's new or changed, so needs re-evaluating
if(!mapHasKey(routingCache_, communication::ConnectionId{ item.connection_id() })) {
addOrUpdateItem(item);
}

});

future.get();
}

return true;
}
Expand Down

0 comments on commit 942ba7b

Please sign in to comment.