Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Broker #98

Open
wants to merge 8 commits into
base: Chris/UartDriver-DISCO
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Components/Core/Inc/Command.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ enum GLOBAL_COMMANDS : uint8_t
HEARTBEAT_COMMAND, // Control actions for heartbeat commands
RADIOHB_CHANGE_PERIOD, // Change Radio HB Period to Provided TaskCommand Period in Seconds
PROTOCOL_COMMAND, // Protocol command, used for commands to the Protocol Task
TELEMETRY_CHANGE_PERIOD, // Change Telemetry Period to Provided TaskCommand Period in Milliseconds
TELEMETRY_CHANGE_PERIOD, // Change Telemetry Period to Provided TaskCommand Period in Milliseconds
DATA_BROKER_COMMAND,
};


/* Class -----------------------------------------------------------------*/

/**
Expand Down
173 changes: 173 additions & 0 deletions Components/DataBroker/Inc/DataBroker.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/**
********************************************************************************
* @file DataBroker.hpp
* @author shivam
* @date Nov 23, 2024
* @brief
********************************************************************************
*/

#ifndef DATA_BROKER_HPP_
#define DATA_BROKER_HPP_

/************************************
* INCLUDES
************************************/
#include "Publisher.hpp"
#include "SensorDataTypes.hpp"
#include "Command.hpp"
#include "DataBrokerMessageTypes.hpp"
#include "SystemDefines.hpp"
#include "Mutex.hpp"
#include <type_traits>
#include <cstring>

/************************************
* MACROS AND DEFINES
************************************/

/************************************
* TYPEDEFS
************************************/

/************************************
* CLASS DEFINITIONS
************************************/
class DataBroker {
public:
/**
* @brief Publish data of a certain type
* NOTE: You must ensure that there is a publisher for that type
*/
template <typename T>
static void Publish(T* dataToPublish) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider making a static instance of the class instead of making everything in the class static

if (subscriberListLock.Lock(SUBSCRIBER_LIST_MUTEX_TIMEOUT)) {
Publisher<T>* publisher = getPublisher<T>();
if (publisher != nullptr) {
publisher->Publish(dataToPublish);
}
else {
SOAR_ASSERT("Data Publisher not found \n");
}
subscriberListLock.Unlock();
return;
}
else {
SOAR_PRINT("Could Not Subscribe to Data Broker Publisher \n");
}
return;
}

/**
* @brief Subscribe to a certain type of data in the system
* @param taskToSubscribe Task Handle of the task that will receive
* and handle the data. (i.e. -> Subscribe<T>(this))
*/
template <typename T>
static void Subscribe(Task* taskToSubscribe) {
if (subscriberListLock.Lock(SUBSCRIBER_LIST_MUTEX_TIMEOUT)) {
Publisher<T>* publisher = getPublisher<T>();
if (publisher != nullptr) {
publisher->Subscribe(taskToSubscribe);
}
else {
SOAR_ASSERT("Data Publisher not found \n");
}
subscriberListLock.Unlock();
return;
}
else {
SOAR_PRINT("Could Not Subscribe to Data Broker Publisher \n");
}
return;
}

/**
* @brief Unsubscribe to a certain type of data in the system
* @param taskToUnsubscribe Task Handle of the task that will stop
* receiving the data. (i.e. -> Unsubscribe<T>(this))
*/
template <typename T>
static void Unsubscribe(Task* taskToUnsubscribe) {
if (subscriberListLock.Lock(SUBSCRIBER_LIST_MUTEX_TIMEOUT)) {
Publisher<T>* publisher = getPublisher<T>();
if (publisher != nullptr) {
publisher->Unsubscribe(taskToUnsubscribe);
}
else {
SOAR_ASSERT("Data Publisher not found \n");
}
subscriberListLock.Unlock();
return;
}
else {
SOAR_PRINT("Could Not Unsubscribe to Data Broker Publisher \n");
}
return;
}

template <typename T>
static constexpr T ExtractData(const Command &cm) {
if (cm.GetCommand() != DATA_BROKER_COMMAND) {
SOAR_ASSERT("Not a Data Broker Command!\n");
}

// The data allocated by this command ptr will be freed when cm.Reset()]
// is called. So we do not have to free this memory here
T* dataPtr = reinterpret_cast<T*>(cm.GetDataPointer());

T data{};

std::memcpy(&data, dataPtr, sizeof(T));

return data;
}

static DataBrokerMessageTypes getMessageType(const Command &cm) {
return static_cast<DataBrokerMessageTypes>(cm.GetTaskCommand());
}

private:
// Deleting the default constructor as this class is not
// instanceable
DataBroker() = delete;

// Deleting the copy constructor to prevent copies
DataBroker(const DataBroker& obj) = delete;

// Deleting assignment operator to prevent assignment operations
DataBroker& operator=(DataBroker const&) = delete;

// Mutex to access the Subscriber List
inline static Mutex subscriberListLock{};
// Mutex lock wait time
static constexpr uint16_t SUBSCRIBER_LIST_MUTEX_TIMEOUT = 1000;

// matcher - match template type with publisher type
template <typename T, typename U>
static constexpr bool matchType() {
return std::is_same_v<T, U>;
}

// get data publisher
template <typename T>
static constexpr auto getPublisher(void) {
if constexpr (matchType<T, IMUData>()) {
return &IMU_Data_publisher;
} else if constexpr (matchType<T, ThermocoupleData>()) {
return &Thermocouple_Data_publisher;
} else {
SOAR_ASSERT(false, "This publisher type does not exist, you must create it");
}
}

// list of publishers
inline static Publisher<IMUData> IMU_Data_publisher {DataBrokerMessageTypes::IMU_DATA};
inline static Publisher<ThermocoupleData> Thermocouple_Data_publisher {DataBrokerMessageTypes::THERMOCOUPLE_DATA};

};
/************************************
* FUNCTION DECLARATIONS
************************************/

#endif /* DATA_BROKER_HPP_ */
108 changes: 108 additions & 0 deletions Components/DataBroker/Inc/Publisher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
********************************************************************************
* @file Publisher.hpp
* @author shivam
* @date Nov 23, 2024
* @brief
********************************************************************************
*/

#ifndef PUBLISHER_HPP_
#define PUBLISHER_HPP_

/************************************
* INCLUDES
************************************/
#include <DataBrokerMessageTypes.hpp>
#include <stdint.h>
#include <array>
#include "Task.hpp"
#include "Subscriber.hpp"
#include "SystemDefines.hpp"


/************************************
* MACROS AND DEFINES
************************************/

/************************************
* TYPEDEFS
************************************/

/************************************
* CLASS DEFINITIONS
************************************/
template <typename T, uint8_t MaxSubscribers = 5>
class Publisher {
public:
// Constructor
Publisher(DataBrokerMessageTypes messageType) {
publisherMessageType = messageType;
}

// subscribe
bool Subscribe(Task* taskToSubscribe) {
// Check if subscriber already exists
for (Subscriber& subscriber : subscribersList) {
if (subscriber.getSubscriberTaskHandle() == taskToSubscribe) {
return true;
}
}

// Add the subscriber
for (Subscriber& subscriber : subscribersList) {
if (subscriber.getSubscriberTaskHandle() == nullptr) {
subscriber.Init(taskToSubscribe);
return true;
}
}

SOAR_ASSERT(true, "Failed to add subscriber\n");
return false;
}

// unsubscribe
bool Unsubscribe(Task* taskToUnsubscribe) {
for (Subscriber& subscriber : subscribersList) {
if (subscriber.getSubscriberTaskHandle() == taskToUnsubscribe) {
subscriber.Delete();
return true;
}
}

SOAR_ASSERT(true, "Subscriber not Deleted\n");
return false;
}

// publish
void Publish(T* dataToPublish) {
for (const Subscriber& subscriber : subscribersList) {
if (subscriber.getSubscriberTaskHandle() != nullptr) {
// create command
uint16_t messageType = static_cast<uint16_t>(publisherMessageType);

Command brokerData(DATA_BROKER_COMMAND, messageType);

uint8_t* messsageData = reinterpret_cast<uint8_t*>(dataToPublish);

// copy data to command
brokerData.CopyDataToCommand(messsageData, sizeof(T));

subscriber.getSubscriberQueueHandle()->Send(brokerData);
}
}
}

private:
// list of subscribers
Subscriber subscribersList[MaxSubscribers] = {};

// message type for system routing
DataBrokerMessageTypes publisherMessageType = DataBrokerMessageTypes::INVALID;
};

/************************************
* FUNCTION DECLARATIONS
************************************/

#endif /* PUBLISHER_HPP_ */
59 changes: 59 additions & 0 deletions Components/DataBroker/Inc/Subscriber.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
********************************************************************************
* @file Subscriber.hpp
* @author shiva
* @date Nov 23, 2024
* @brief
********************************************************************************
*/

#ifndef SUBSCRIBER_HPP_
#define SUBSCRIBER_HPP_

/************************************
* INCLUDES
************************************/
#include "Task.hpp"
#include "SystemDefines.hpp"

/************************************
* MACROS AND DEFINES
************************************/

/************************************
* TYPEDEFS
************************************/

/************************************
* CLASS DEFINITIONS
************************************/
class Subscriber {
public:
void Init(Task* subscriberTaskHandle) {
if (taskHandle != nullptr || taskQueue != nullptr) {
SOAR_ASSERT(false, "You cannot overwrite a subscriber");
return;
}
taskHandle = subscriberTaskHandle;
taskQueue = taskHandle->GetEventQueue();
}

void Delete() {
taskHandle = nullptr;
taskQueue = nullptr;
}

inline const Task* getSubscriberTaskHandle() const { return taskHandle; }

inline Queue* getSubscriberQueueHandle() const { return taskQueue; }

private:
Task* taskHandle = nullptr;
Queue* taskQueue = nullptr;
};

/************************************
* FUNCTION DECLARATIONS
************************************/

#endif /* SUBSCRIBER_HPP_ */
1 change: 1 addition & 0 deletions Components/FlightControl/Inc/WatchdogTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class WatchdogTask : public Task
static void HeartbeatFailureCallback(TimerHandle_t rtTimerHandle); // Callback for timer which aborts system in case of data ghosting
void HandleCommand(Command& cm);
void HandleHeartbeat(uint16_t taskCommand); // If it receives a heartbeat then it resets the timer
void HandleDataBrokerCommand(const Command& cm);
Timer* heartbeatTimer;

private:
Expand Down
2 changes: 1 addition & 1 deletion Components/FlightControl/TelemetryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void TelemetryTask::Run(void* pvParams)
HandleCommand(cm);

osDelay(loggingDelayMs);
RunLogSequence();
// RunLogSequence();
}
}

Expand Down
Loading