Skip to content

Commit

Permalink
ThreadX integration
Browse files Browse the repository at this point in the history
  • Loading branch information
QuangHaiNguyen committed Apr 19, 2024
1 parent 1705a0a commit bbfe5bf
Show file tree
Hide file tree
Showing 7 changed files with 1,159 additions and 130 deletions.
6 changes: 5 additions & 1 deletion easy_embedded/service/task_worker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(FRAMEWORK_ROOT_DIR ${CMAKE_SOURCE_DIR}/easy_embedded)
# Source files ---------------------------------------------------------------
target_sources(ez_task_worker_lib
PRIVATE
$<$<BOOL:${ENABLE_THREADX}>:threadx_port/ez_threadx_port.c>
ez_task_worker.c
)

Expand All @@ -26,15 +27,17 @@ target_sources(ez_task_worker_lib
target_compile_definitions(ez_task_worker_lib
PUBLIC
EZ_TASK_WORKER_ENABLE=$<BOOL:${ENABLE_EZ_TASK_WORKER}>
EZ_THREADX_PORT_ENABLE=$<BOOL:${ENABLE_THREADX}>
)


# Include directory -----------------------------------------------------------
target_include_directories(ez_task_worker_lib
PUBLIC
${CMAKE_CURRENT_LIST_DIR}
$<$<BOOL:${ENABLE_THREADX}>:${CMAKE_CURRENT_LIST_DIR}/threadx_port>
PRIVATE
# Please add private folders here
#
INTERFACE
# Please add interface folders here
)
Expand All @@ -46,6 +49,7 @@ target_link_libraries(ez_task_worker_lib
# Please add public libraries
PRIVATE
ez_utilities_lib
$<$<BOOL:${ENABLE_THREADX}>:threadx>
INTERFACE
# Please add interface libraries
)
Expand Down
221 changes: 172 additions & 49 deletions easy_embedded/service/task_worker/ez_task_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "ez_utilities_common.h"
#include "ez_task_worker.h"

/*the rest of include go here*/

/*****************************************************************************
* Component Preprocessor Macros
Expand Down Expand Up @@ -65,33 +64,60 @@ struct ezTaskBlockCommon
* Component Variable Definitions
*****************************************************************************/
static struct Node worker_list = EZ_LINKEDLIST_INIT_NODE(worker_list);

static struct ezTaskWorkerThreadInterfaces *rtos_interfaces = NULL;

/*****************************************************************************
* Function Definitions
*****************************************************************************/
/* None */

/*****************************************************************************
* Function: ezTaskWorker_CheckRtosInterfaceSanity
*//**
* @brief This function check if all of the function pointers are set accordngly
*
* @details
*
* @param[in] rtos_interfaces: pointer to the rtos interface
* @return true if function pointers are set, else false
*
* @pre None
* @post None
*
* \b Example
* @code
* @endcode
*
* @see
*
*****************************************************************************/
static bool ezTaskWorker_CheckRtosInterfaceSanity(struct ezTaskWorkerThreadInterfaces *rtos_interfaces);

/*****************************************************************************
* Public functions
*****************************************************************************/
bool ezTaskWorker_InitializeWorker(struct ezTaskWorker *worker,
uint8_t *queue_buffer,
uint32_t queue_buffer_size)
bool ezTaskWorker_CreateWorker(struct ezTaskWorker *worker,
uint8_t *queue_buffer,
uint32_t queue_buffer_size,
void *thread_func)
{
bool bRet = false;
ezSTATUS status = ezFAIL;
EZTRACE("ezTaskWorker_InitializeWorker()");
EZTRACE("ezTaskWorker_CreateWorker()");

if((worker != NULL)
&& (queue_buffer != NULL)
&& (queue_buffer_size > 0))
&& (rtos_interfaces != NULL))
{
status = ezQueue_CreateQueue(&worker->msg_queue, queue_buffer, queue_buffer_size);
if(status == ezSUCCESS)
{
#if (EZ_THREADX_PORT_ENABLE == 1)
bRet = rtos_interfaces->create_event(worker);
bRet &= rtos_interfaces->create_semaphore(worker);
bRet &= rtos_interfaces->create_thread(worker, thread_func);
#else
ezLinkedList_InitNode(&worker->node);
bRet = EZ_LINKEDLIST_ADD_TAIL(&worker_list, &worker->node);
bRet &= EZ_LINKEDLIST_ADD_TAIL(&worker_list, &worker->node);
#endif
}
}
else
Expand All @@ -102,14 +128,27 @@ bool ezTaskWorker_InitializeWorker(struct ezTaskWorker *worker,
return bRet;
}

bool ezTaskWorker_SetRtosInterface(struct ezTaskWorkerThreadInterfaces *interfaces)
{
bool ret = false;
EZTRACE("ezTaskWorker_SetRtosInterface()");
if(interfaces != NULL)
{
rtos_interfaces = interfaces;
ret = ezTaskWorker_CheckRtosInterfaceSanity(interfaces);
}
return ret;
}


bool ezTaskWorker_EnqueueTask(struct ezTaskWorker *worker,
ezTaskWorkerTaskFunc task,
ezTaskWorkerCallbackFunc callback,
void *context,
uint32_t context_size)
uint32_t context_size,
uint32_t ticks_to_wait)
{
bool bRet = false;
bool ret = false;
void *buff = NULL;
ezTaskBlock_t task_block = NULL;
ezSTATUS status = ezFAIL;
Expand All @@ -118,51 +157,66 @@ bool ezTaskWorker_EnqueueTask(struct ezTaskWorker *worker,

if((worker != NULL) && (task != NULL) && (callback != NULL))
{
/**The idea to store common data and context data is we reserve a buffer
* with the size = common size + context size. Then, the buffer is convert to
* ezTaskBlockCommon to store common data. After that it is offseted to the
* address = common address + sizeof(ezTaskBlockCommon). Then the context
* is copied to that address.
*/
task_block = (ezTaskBlock_t)ezQueue_ReserveElement(&worker->msg_queue,
&buff,
sizeof(struct ezTaskBlockCommon) + context_size);
if(task_block != NULL && buff != NULL)
#if (EZ_THREADX_PORT_ENABLE == 1)
ret = rtos_interfaces->take_semaphore(worker, ticks_to_wait);
#else
ret = true;
#endif
if(ret == true)
{
/* Set common data */
((struct ezTaskBlockCommon*)buff)->callback = callback;
((struct ezTaskBlockCommon*)buff)->task = task;

/* Offset the pointer */
buff += sizeof(struct ezTaskBlockCommon);

/* Copy context data */
memcpy(buff, context, context_size);

status = ezQueue_PushReservedElement(&worker->msg_queue,
(ezReservedElement)task_block);
if(status == ezSUCCESS)
{
bRet = true;
EZINFO("Add new task to queue");
}
else
/**The idea to store common data and context data is we reserve a buffer
* with the size = common size + context size. Then, the buffer is convert to
* ezTaskBlockCommon to store common data. After that it is offseted to the
* address = common address + sizeof(ezTaskBlockCommon). Then the context
* is copied to that address.
*/
task_block = (ezTaskBlock_t)ezQueue_ReserveElement(&worker->msg_queue,
&buff,
sizeof(struct ezTaskBlockCommon) + context_size);
if((task_block != NULL) && (buff != NULL))
{
ezQueue_ReleaseReservedElement(&worker->msg_queue,
(ezReservedElement)task_block);
EZERROR("Cannot push task to queue");
/* Set common data */
((struct ezTaskBlockCommon*)buff)->callback = callback;
((struct ezTaskBlockCommon*)buff)->task = task;

/* Offset the pointer */
buff += sizeof(struct ezTaskBlockCommon);

/* Copy context data */
memcpy(buff, context, context_size);
status = ezQueue_PushReservedElement(&worker->msg_queue,
(ezReservedElement)task_block);
if(status == ezSUCCESS)
{
ret = true;
#if (EZ_THREADX_PORT_ENABLE == 1)
rtos_interfaces->set_events(worker, EZ_EVENT_TASK_AVAIL);
#endif
EZINFO("Add new task to %s",worker->worker_name);
}
else
{
ret = false;
ezQueue_ReleaseReservedElement(&worker->msg_queue,
(ezReservedElement)task_block);
EZERROR("Cannot add task to %s",worker->worker_name);
}
}
#if (EZ_THREADX_PORT_ENABLE == 1)
ret &= rtos_interfaces->give_semaphore(worker);
#endif
}
else
{
EZERROR("Do not have enough memory in the queue");
}
}

if(ret == false)
{
EZERROR("Enqueue task error");
}

return bRet;
return ret;
}

void ezTaskWorker_Run(void)
void ezTaskWorker_ExecuteTaskNoRTOS(void)
{
struct Node *it = NULL;
struct ezTaskBlockCommon *common = NULL;
Expand Down Expand Up @@ -190,9 +244,78 @@ void ezTaskWorker_Run(void)
}
}

#if (EZ_THREADX_PORT_ENABLE == 1)
void ezTaskWorker_ExecuteTask(struct ezTaskWorker *worker, uint32_t ticks_to_wait)
{
bool ret = false;
ezSTATUS status = ezFAIL;
void *context = NULL;
struct ezTaskBlockCommon *common = NULL;
uint32_t data_size = 0;

if(worker != NULL)
{
EZTRACE("ezTaskWorker_ExecuteTask(woker = %s)", worker->worker_name);
ret = rtos_interfaces->get_events(worker, EZ_EVENT_TASK_AVAIL, ticks_to_wait);

if(ret == false)
{
EZERROR("Get event failed");
}
else
{
EZINFO("Receive EZ_EVENT_TASK_AVAIL");
ret = rtos_interfaces->take_semaphore(worker, ticks_to_wait);
}

if(ret == false)
{
EZERROR("Take semaphore failed");
}
else
{
status = ezQueue_GetFront(&worker->msg_queue, (void**)&common, &data_size);

if((status == ezSUCCESS) && (common->task != NULL))
{
context = common;
context += sizeof(struct ezTaskBlockCommon);
common->task(context, common->callback);
}
else
{
ret = false;
EZERROR("Get data from queue failed");
}

status = ezQueue_PopFront(&worker->msg_queue);
rtos_interfaces->give_semaphore(worker);
}
}
}
#endif

/*****************************************************************************
* Local functions
*****************************************************************************/
static bool ezTaskWorker_CheckRtosInterfaceSanity(struct ezTaskWorkerThreadInterfaces *rtos_interfaces)
{
bool ret = false;
EZTRACE("ezTaskWorker_CheckRtosInterfaceSanity()");

if(rtos_interfaces != NULL)
{
ret = (rtos_interfaces->create_thread != NULL);
ret &= (rtos_interfaces->create_semaphore != NULL);
ret &= (rtos_interfaces->give_semaphore != NULL);
ret &= (rtos_interfaces->take_semaphore != NULL);
ret &= (rtos_interfaces->create_event != NULL);
ret &= (rtos_interfaces->set_events != NULL);
ret &= (rtos_interfaces->get_events != NULL);
}

return ret;
}

#endif /* EZ_TASK_WORKER_ENABLE == 1 */
/* End of file*/
Loading

0 comments on commit bbfe5bf

Please sign in to comment.