Skip to content

Commit

Permalink
Merge pull request #66 from Sense-Scape/64-feature-add-function-callb…
Browse files Browse the repository at this point in the history
…ack-map

64 feature add function callback map

and close #65
  • Loading branch information
Grabt234 authored Aug 27, 2024
2 parents db4fd32 + 828fb72 commit a61a3cc
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 121 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,3 @@ jobs:

- name: Build
run: cmake --build build

- name: Test
run: ctest --test-dir build --no-tests=error -C debug
18 changes: 0 additions & 18 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ add_library(BaseModuleLib
${SOURCES}
)

# Test executable
add_executable(BaseModuleTest
${SOURCE_TESTS_DIR}/BaseModuleTest.cpp
)

add_subdirectory(components/Chunk_Types)
add_subdirectory(components/plog)

Expand All @@ -36,26 +31,13 @@ target_include_directories(BaseModuleLib
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/components/plog/include
)

target_include_directories(BaseModuleTest
PRIVATE ${INCLUDE_DIR}
PRIVATE ${INCLUDE_TESTS_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/components/doctest/doctest
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/components/Chunk_Types/components/Nlohmann_JSON/include
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/components/Chunk_Types/components/Nlohmann_JSON/include/nlohmann
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/components/Chunk_Types/include
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/components/plog/include
)

# Adding threads
target_link_libraries(BaseModuleLib PRIVATE pthread)
target_link_libraries(BaseModuleLib PRIVATE ${THREADS_LIBRARIES})

# Link with required libraries for the main executable
target_link_libraries(BaseModuleLib PRIVATE ChunkTypesLib doctest::doctest)

# Link with required libraries for the test executable
target_link_libraries(BaseModuleTest PRIVATE ChunkTypesLib BaseModuleLib doctest::doctest)

# Enable testing
enable_testing()

Expand Down
49 changes: 21 additions & 28 deletions include/BaseModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <vector>
#include <chrono>
#include <queue>
#include <functional>

/* Custom Includes */
#include "BaseChunk.h"
Expand Down Expand Up @@ -83,59 +84,51 @@ class BaseModule
void TrackProcessTime(bool bTrackTime, std::string sTrackerMessage);

/*
* @brief used to test the processing of a particular module
* @param[in] pBaseChunk Pointer to base chuk
*/
void TestProcess(std::shared_ptr<BaseChunk> pBaseChunk);

/*
* @brief Puts the module into a test mode where a single chunk will be processed and returned when requested
* @param[in] Boolean as to whethere the module should be in test mode
* @brief Function to start the reporting thread
*/
void SetTestMode(bool bTestModeState);
void StartReporting();

/*
* @brief If the mo
* @param[in] Boolean as to whethere the module should be in test mode
/**
* @brief Registers which function should be called when processing a chunk
* @param[in] eChunkType pointer to chunk to be processed
* @param[in] function class function (&,ClassName::FunctionName)
*/
std::shared_ptr<BaseChunk> GetTestOutput();
template <typename T>
void RegisterChunkCallbackFunction(ChunkType eChunkType,T function);

/*
* @brief Function to start the reporting thread
/**
* @brief Calls processing function for particular chunk type
* @param[in] pBaseChunk pointer to chunk to be processed
*/
void StartReporting();
void CallChunkCallbackFunction(std::shared_ptr<BaseChunk> pBaseChunk);

private:
size_t m_uMaxInputBufferSize; ///< Max size of the class input buffer

// Contolling Timing For Debugging
std::atomic<bool> m_bTrackProcessTime = false; ///< Boolean as to whether to track and log the processing time
std::string m_sTrackerMessage = ""; ///< Log message printed when logging chunk processing time
std::chrono::high_resolution_clock::time_point m_CurrentTrackingTime; ///< Initial time used to track time between consecutive chunk passes
std::chrono::high_resolution_clock::time_point m_PreviousTimeTracking; ///< Final time used to track time between consecutive chunk passes

bool m_bTestMode; ///< Boolean as to whether the module is doing doing normal processing or test processing
std::shared_ptr<BaseChunk> m_pTestChunkOutput; ///< Member used to store test outputs

protected:
// Controlling Queues
std::condition_variable m_cvDataInBuffer; ///< Conditional variable to control data in circulat buffer
std::queue<std::shared_ptr<BaseChunk>> m_cbBaseChunkBuffer; ///< Input buffer of module
std::queue<std::shared_ptr<BaseChunk>> m_cbBaseChunkBuffer; ///< Input buffer of module
std::shared_ptr<BaseModule> m_pNextModule; ///< Shared pointer to next module into which messages are passed
std::atomic<bool> m_bShutDown; ///< Whether to try continuously process
std::mutex m_BufferStateMutex; ///< Mutex to facilitate multi module buffer size checking
std::thread m_thread; ///< Thread object for module processing
bool m_bAlreadyLoggedBufferFull; ///< Boolean State machine tracking if we have logged whether queue is full

// Controlling Reporting
std::thread m_QueueSizeReportingThread;
std::chrono::high_resolution_clock::time_point m_CurrentQueueReportTime;
std::chrono::high_resolution_clock::time_point m_PreviousQueueReportTime;


/**
* @brief Returns true if a message pointer had been retrieved an passed on to next module.
* If no pointer in queue then returns false
* @param[in] pBaseChunk pointer to base chunk
*/
virtual void Process(std::shared_ptr<BaseChunk> pBaseChunk);
// Controling Function Calls
std::mutex m_FunctionCallbackMapMutex;
std::map<ChunkType,std::function<void(std::shared_ptr<BaseChunk>)>> m_FunctionCallbackMap;

/**
* @brief Passes base chunk pointer to next module
Expand All @@ -153,7 +146,7 @@ class BaseModule
*/
bool TakeFromBuffer(std::shared_ptr<BaseChunk> &pBaseChunk);

/*
/**
* @brief Calls the infinite loop to report
*/
void StartReportingLoop();
Expand Down
35 changes: 0 additions & 35 deletions include_tests/BaseModuleTests.h

This file was deleted.

60 changes: 23 additions & 37 deletions source/BaseModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ BaseModule::BaseModule(unsigned uMaxInputBufferSize) :
m_pNextModule(nullptr),
m_bShutDown()
{

}

BaseModule::~BaseModule()
Expand All @@ -20,18 +19,13 @@ BaseModule::~BaseModule()
m_thread.join();
}

void BaseModule::Process(std::shared_ptr<BaseChunk> pBaseChunk)
{
TryPassChunk(pBaseChunk);
}

void BaseModule::ContinuouslyTryProcess()
{
while (!m_bShutDown)
{
std::shared_ptr<BaseChunk> pBaseChunk;
if (TakeFromBuffer(pBaseChunk))
Process(pBaseChunk);
CallChunkCallbackFunction(pBaseChunk);
else
{
// Wait to be notified that there is data available
Expand All @@ -56,8 +50,6 @@ void BaseModule::StartProcessing()
// Log warning
std::string strWarning = std::string(__FUNCTION__) + ": Processing thread already started";
PLOG_WARNING << strWarning;
// And force a crash if in debug and not release
assert(true && m_bTestMode);
}
}

Expand Down Expand Up @@ -89,8 +81,6 @@ bool BaseModule::TryPassChunk(const std::shared_ptr<BaseChunk> &pBaseChunk)

if (m_pNextModule != nullptr)
bReturnSuccessful = m_pNextModule->TakeChunkFromModule(std::move(pBaseChunk));
else if (m_bTestMode)
m_pTestChunkOutput = pBaseChunk;

return bReturnSuccessful;
}
Expand Down Expand Up @@ -152,28 +142,6 @@ void BaseModule::TrackProcessTime(bool bTrackTime, std::string sTrackerMessage)
m_CurrentTrackingTime = std::chrono::high_resolution_clock::now();
}

void BaseModule::TestProcess(std::shared_ptr<BaseChunk> pBaseChunk)
{
Process(pBaseChunk);
}

std::shared_ptr<BaseChunk> BaseModule::GetTestOutput()
{
if (m_pTestChunkOutput != nullptr)
return std::move(m_pTestChunkOutput);
else
{
std::string strError = std::string(__FUNCTION__) + " No test chunk output to return";
PLOG_ERROR << strError;
throw;
}
}

void BaseModule::SetTestMode(bool bTestModeState)
{
m_bTestMode = bTestModeState;
}

void BaseModule::StartReporting()
{
m_QueueSizeReportingThread = std::thread([this]()
Expand All @@ -192,12 +160,30 @@ void BaseModule::StartReportingLoop()

// Then transmit
auto pQueueChunk = std::make_shared<QueueLengthChunk>(strModuleName, u16CurrentBufferSize);
auto bSuccessfullyPassed = TryPassChunk(pQueueChunk);

if (!bSuccessfullyPassed)
PLOG_WARNING << GetModuleType() + " Failed to pass chunk" ;
CallChunkCallbackFunction(pQueueChunk);

// And sleep as not to send too many
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}

template <typename T>
void BaseModule::RegisterChunkCallbackFunction(ChunkType eChunkType,T function)
{
std::unique_lock<std::mutex> BufferAccessLock(m_FunctionCallbackMapMutex);
m_FunctionCallbackMap[eChunkType] = std::bind(function, this, std::placeholders::_1);
}

void BaseModule::CallChunkCallbackFunction(std::shared_ptr<BaseChunk> pBaseChunk)
{
std::unique_lock<std::mutex> BufferAccessLock(m_FunctionCallbackMapMutex);
auto eChunkType = pBaseChunk->GetChunkType();

// Check if we have registered a function to process the chunk
if (m_FunctionCallbackMap.find(eChunkType) != m_FunctionCallbackMap.end())
m_FunctionCallbackMap[eChunkType](pBaseChunk);
else
// Otherwise pass on
TryPassChunk(pBaseChunk);

}

0 comments on commit a61a3cc

Please sign in to comment.