Skip to content

Commit

Permalink
Merge pull request #61 from Sense-Scape/60-bug-convert-circular-buffe…
Browse files Browse the repository at this point in the history
…r-to-queue

60 bug convert circular buffer to queue
  • Loading branch information
Grabt234 authored Aug 18, 2024
2 parents b44a46b + 5891570 commit fb06901
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 129 deletions.
8 changes: 4 additions & 4 deletions include/BaseModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
#include <queue>
#include <vector>
#include <chrono>
#include <queue>

/* Custom Includes */
#include "BaseChunk.h"
#include "ChunkTypesNamingUtility.h"
#include "CircularBuffer.h"
#include <plog/Log.h>
#include "plog/Initializers/RollingFileInitializer.h"
#include "QueueLengthChunk.h"
Expand Down Expand Up @@ -73,7 +73,7 @@ class BaseModule
* @return True if message was sucessfully inserted into the buffer
* @return False if message was unsucessfully inserted into the buffer
*/
bool TakeChunkFromModule(std::shared_ptr<BaseChunk> pBaseChunk);
bool TakeChunkFromModule(const std::shared_ptr<BaseChunk> &pBaseChunk);

/*
* @brief tacks time between consecutive chunk passes
Expand Down Expand Up @@ -118,7 +118,7 @@ class BaseModule

protected:
std::condition_variable m_cvDataInBuffer; ///< Conditional variable to control data in circulat buffer
CircularBuffer<std::shared_ptr<BaseChunk>> m_cbBaseChunkBuffer; ///< Input buffer of module
std::queue<std::shared_ptr<BaseChunk>> m_cbBaseChunkBuffer; ///< Input buffer of module
std::shared_ptr<BaseModule> m_pNextModule; ///< Shared pointer to next module into which messages are passed
std::atomic<bool> m_bShutDown; ///< Whether to try continuously process
std::mutex m_BufferStateMutex; ///< Mutex to facilitate multi module buffer size checking
Expand All @@ -143,7 +143,7 @@ class BaseModule
* @return True if message was sucessfully inserted into queue
* @return False if message was unsucessfully inserted into queue
*/
bool TryPassChunk(std::shared_ptr<BaseChunk> pBaseChunk);
bool TryPassChunk(const std::shared_ptr<BaseChunk> &pBaseChunk);

/**
* @brief Tries to extract a message from the input buffer
Expand Down
118 changes: 0 additions & 118 deletions include/CircularBuffer.h

This file was deleted.

20 changes: 13 additions & 7 deletions source/BaseModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
BaseModule::BaseModule(unsigned uMaxInputBufferSize) :
m_uMaxInputBufferSize(uMaxInputBufferSize),
m_cvDataInBuffer(),
m_cbBaseChunkBuffer(uMaxInputBufferSize),
m_cbBaseChunkBuffer(),
m_pNextModule(nullptr),
m_bShutDown()
{
Expand Down Expand Up @@ -44,8 +44,13 @@ void BaseModule::ContinuouslyTryProcess()
void BaseModule::StartProcessing()
{
if (!m_thread.joinable())
{
m_thread = std::thread([this]()
{ ContinuouslyTryProcess(); });

std::string strInfo = std::string(__FUNCTION__) + " " + GetModuleType() + ": Processing thread started";
PLOG_WARNING << strInfo;
}
else
{
// Log warning
Expand All @@ -66,7 +71,7 @@ void BaseModule::SetNextModule(std::shared_ptr<BaseModule> pNextModule)
m_pNextModule = pNextModule;
}

bool BaseModule::TryPassChunk(std::shared_ptr<BaseChunk> pBaseChunk)
bool BaseModule::TryPassChunk(const std::shared_ptr<BaseChunk> &pBaseChunk)
{
// Lets first check we are doing timing debugging
if (m_bTrackProcessTime)
Expand All @@ -83,14 +88,14 @@ bool BaseModule::TryPassChunk(std::shared_ptr<BaseChunk> pBaseChunk)
bool bReturnSuccessful = false;

if (m_pNextModule != nullptr)
bReturnSuccessful = m_pNextModule->TakeChunkFromModule(pBaseChunk);
bReturnSuccessful = m_pNextModule->TakeChunkFromModule(std::move(pBaseChunk));
else if (m_bTestMode)
m_pTestChunkOutput = pBaseChunk;

return bReturnSuccessful;
}

bool BaseModule::TakeChunkFromModule(std::shared_ptr<BaseChunk> pBaseChunk)
bool BaseModule::TakeChunkFromModule(const std::shared_ptr<BaseChunk> &pBaseChunk)
{
// Maintain lock to prevent another module trying to access buffer
// in the multi input buffer configuration
Expand All @@ -104,7 +109,7 @@ bool BaseModule::TakeChunkFromModule(std::shared_ptr<BaseChunk> pBaseChunk)
// Check if next module queue is full
if (bBufferHasSpace)
{
m_cbBaseChunkBuffer.put(pBaseChunk);
m_cbBaseChunkBuffer.push(std::move(pBaseChunk));

m_bAlreadyLoggedBufferFull = false;
bChunkPassed = true;
Expand All @@ -121,15 +126,16 @@ bool BaseModule::TakeChunkFromModule(std::shared_ptr<BaseChunk> pBaseChunk)
return bChunkPassed;
}

bool BaseModule::TakeFromBuffer(std::shared_ptr<BaseChunk>& pBaseChunk)
bool BaseModule::TakeFromBuffer(std::shared_ptr<BaseChunk> &pBaseChunk)
{
// Maintain lock to prevent another module trying to access buffer
// in the multi input buffer configuration
std::lock_guard<std::mutex> BufferStateLock(m_BufferStateMutex);

if (!m_cbBaseChunkBuffer.empty())
{
pBaseChunk = m_cbBaseChunkBuffer.get();
pBaseChunk = std::move(m_cbBaseChunkBuffer.front());
m_cbBaseChunkBuffer.pop();
return true;
}

Expand Down

0 comments on commit fb06901

Please sign in to comment.