Skip to content

Commit

Permalink
#8: replace busy polling with pthread_cond_wait
Browse files Browse the repository at this point in the history
  • Loading branch information
ballle98 committed Aug 7, 2023
1 parent 796bb3e commit 22f8c28
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 42 deletions.
102 changes: 62 additions & 40 deletions aq_programmer.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ unsigned char _pgm_command = NUL;

bool _last_sent_was_cmd = false;

static pthread_mutex_t _pgm_command_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t _pgm_command_sent_cond = PTHREAD_COND_INITIALIZER;

// External view of adding to queue
void aq_send_cmd(unsigned char cmd) {
push_aq_cmd(cmd);
Expand Down Expand Up @@ -162,9 +165,12 @@ unsigned char pop_aq_cmd(struct aqualinkdata *aq_data)
if (in_programming_mode(aq_data) && ( in_ot_programming_mode(aq_data) == false && in_iaqt_programming_mode(aq_data) == false )) {
//if (aq_data->active_thread.thread_id != 0) {
if ( _pgm_command != NUL && aq_data->last_packet_type == CMD_STATUS) {
pthread_mutex_lock(&_pgm_command_mutex);
cmd = _pgm_command;
_pgm_command = NUL;
LOG(PROG_LOG, LOG_DEBUG_SERIAL, "RS SEND cmd '0x%02hhx' (programming)\n", cmd);
pthread_cond_signal(&_pgm_command_sent_cond);
pthread_mutex_unlock(&_pgm_command_mutex);
} else if (_pgm_command != NUL) {
LOG(PROG_LOG, LOG_DEBUG_SERIAL, "RS Waiting to send cmd '0x%02hhx' (programming)\n", _pgm_command);
} else {
Expand Down Expand Up @@ -1071,39 +1077,33 @@ void _aq_programmer(program_type r_type, char *args, struct aqualinkdata *aq_dat

void waitForSingleThreadOrTerminate(struct programmingThreadCtrl *threadCtrl, program_type type)
{
//static int tries = 120;
int tries = 120;
static int waitTime = 1;
int i=0;

i = 0;
while (get_aq_cmd_length() > 0 && ( i++ <= tries) ) {
LOG(PROG_LOG, LOG_DEBUG, "Thread %p (%s) sleeping, waiting command queue to empty\n", &threadCtrl->thread_id, ptypeName(type));
sleep(waitTime);
}
if (i >= tries) {
LOG(PROG_LOG, LOG_ERR, "Thread %p (%s) timeout waiting, ending\n",&threadCtrl->thread_id,ptypeName(type));
free(threadCtrl);
pthread_exit(0);
}
int ret = 0;
struct timespec max_wait;
clock_gettime(CLOCK_REALTIME, &max_wait);
max_wait.tv_sec += 30;

while ( (threadCtrl->aq_data->active_thread.thread_id != 0) && ( i++ <= tries) ) {
//LOG(PROG_LOG, LOG_DEBUG, "Thread %d sleeping, waiting for thread %d to finish\n", threadCtrl->thread_id, threadCtrl->aq_data->active_thread.thread_id);
LOG(PROG_LOG, LOG_DEBUG, "Thread %p (%s) sleeping, waiting for thread %p (%s) to finish\n",
&threadCtrl->thread_id, ptypeName(type),
threadCtrl->aq_data->active_thread.thread_id, ptypeName(threadCtrl->aq_data->active_thread.ptype));
sleep(waitTime);
}

if (i >= tries) {
//LOG(PROG_LOG, LOG_ERR, "Thread %d timeout waiting, ending\n",threadCtrl->thread_id);
LOG(PROG_LOG, LOG_ERR, "Thread (%s) %p timeout waiting for thread (%s) %p to finish\n",
ptypeName(type), &threadCtrl->thread_id, ptypeName(threadCtrl->aq_data->active_thread.ptype),
threadCtrl->aq_data->active_thread.thread_id);
free(threadCtrl);
pthread_exit(0);
}

pthread_mutex_lock(&threadCtrl->aq_data->mutex);
while (threadCtrl->aq_data->active_thread.thread_id != 0)
{
LOG(PROG_LOG, LOG_DEBUG, "Thread %d,%p (%s) sleeping, waiting for thread %d,%p (%s) to finish\n",
type, &threadCtrl->thread_id, ptypeName(type),
threadCtrl->aq_data->active_thread.ptype, threadCtrl->aq_data->active_thread.thread_id, ptypeName(threadCtrl->aq_data->active_thread.ptype));
if ((ret = pthread_cond_timedwait(&threadCtrl->aq_data->thread_finished_cond,
&threadCtrl->aq_data->mutex, &max_wait)))
{
LOG(PROG_LOG, LOG_ERR, "Thread %d,%p err %s waiting for thread %d,%p to finish\n",
type, &threadCtrl->thread_id, strerror(ret),
threadCtrl->aq_data->active_thread.ptype,
threadCtrl->aq_data->active_thread.thread_id);

if ((ret = pthread_mutex_unlock(&threadCtrl->aq_data->mutex)))
{
LOG(PROG_LOG, LOG_ERR, "waitForSingleThreadOrTerminate mutex unlock ret %s\n", strerror(ret));
}
free(threadCtrl);
pthread_exit(0);
}
}
// Clear out any messages to the UI.
threadCtrl->aq_data->last_display_message[0] = '\0';
threadCtrl->aq_data->active_thread.thread_id = &threadCtrl->thread_id;
Expand All @@ -1119,11 +1119,12 @@ void waitForSingleThreadOrTerminate(struct programmingThreadCtrl *threadCtrl, pr
threadCtrl->aq_data->active_thread.ptype,
threadCtrl->aq_data->active_thread.thread_id,
ptypeName(threadCtrl->aq_data->active_thread.ptype));
pthread_mutex_unlock(&threadCtrl->aq_data->mutex);
}

void cleanAndTerminateThread(struct programmingThreadCtrl *threadCtrl)
{
waitfor_queue2empty();
pthread_mutex_lock(&threadCtrl->aq_data->mutex);
#ifndef AQ_DEBUG
LOG(PROG_LOG, LOG_DEBUG, "Thread %d,%p (%s) finished\n",threadCtrl->aq_data->active_thread.ptype, threadCtrl->thread_id,ptypeName(threadCtrl->aq_data->active_thread.ptype));
#else
Expand All @@ -1137,10 +1138,11 @@ void cleanAndTerminateThread(struct programmingThreadCtrl *threadCtrl)
elapsed.tv_sec, elapsed.tv_nsec / 1000000L);
#endif

// Quick delay to allow for last message to be sent.
delay(500);
threadCtrl->aq_data->active_thread.thread_id = 0;
threadCtrl->aq_data->active_thread.ptype = AQP_NULL;
pthread_cond_signal(&threadCtrl->aq_data->thread_finished_cond);
pthread_mutex_unlock(&threadCtrl->aq_data->mutex);

threadCtrl->thread_id = 0;
// Force update, change display message
threadCtrl->aq_data->updated = true;
Expand Down Expand Up @@ -2203,14 +2205,34 @@ void longwaitfor_queue2empty()
_waitfor_queue2empty(true);
}

void send_cmd(unsigned char cmd)
bool send_cmd(unsigned char cmd)
{
waitfor_queue2empty();

_pgm_command = cmd;
//delay(200);
bool ret=true;
int pret = 0;
struct timespec max_wait;

clock_gettime(CLOCK_REALTIME, &max_wait);
max_wait.tv_sec += 5;

pthread_mutex_lock(&_pgm_command_mutex);
_pgm_command = cmd;
LOG(PROG_LOG, LOG_INFO, "Queue send '0x%02hhx' to controller (programming)\n", _pgm_command);
while (_pgm_command != NUL)
{
if ((pret = pthread_cond_timedwait(&_pgm_command_sent_cond,
&_pgm_command_mutex, &max_wait)))
{
LOG(PROG_LOG, LOG_ERR, "send_cmd 0x%02hhx err %s\n",
cmd, strerror(pret));
ret = false;
break;
}
}
if (ret) {
LOG(PROG_LOG, LOG_INFO, "sent '0x%02hhx' to controller\n", _pgm_command);
}
pthread_mutex_unlock(&_pgm_command_mutex);
return ret;
}
void force_queue_delete()
{
Expand Down
2 changes: 2 additions & 0 deletions aqualink.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ struct aqualinkdata
struct timespec last_active_time;
struct timespec start_active_time;
#endif
pthread_mutex_t mutex;
pthread_cond_t thread_finished_cond;
};


Expand Down
3 changes: 3 additions & 0 deletions aqualinkd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,9 @@ void main_loop()
signal(SIGQUIT, intHandler);
signal(SIGRESTART, intHandler);

pthread_mutex_init(&_aqualink_data.mutex, NULL);
pthread_cond_init(&_aqualink_data.thread_finished_cond, NULL);

if (!start_net_services(&_aqualink_data))
{
LOG(AQUA_LOG,LOG_ERR, "Can not start webserver on port %s.\n", _aqconfig_.socket_port);
Expand Down
4 changes: 2 additions & 2 deletions pda_aq_programmer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ bool set_PDA_aqualink_time(struct aqualinkdata *aq_data);


// These are from aq_programmer.c , exposed here for PDA AQ PROGRAMMER
void send_cmd(unsigned char cmd);
bool send_cmd(unsigned char cmd);
bool push_aq_cmd(unsigned char cmd);
bool waitForMessage(struct aqualinkdata *aq_data, char* message, int numMessageReceived);
void waitfor_queue2empty();
void longwaitfor_queue2empty();

//void pda_programming_thread_check(struct aqualinkdata *aq_data);

#endif // AQ_PDA_PROGRAMMER_H_
#endif // AQ_PDA_PROGRAMMER_H_

0 comments on commit 22f8c28

Please sign in to comment.