Skip to content

Commit

Permalink
WIP: modify conditional var usage.
Browse files Browse the repository at this point in the history
Signed-off-by: Kasiewicz, Marek <[email protected]>
  • Loading branch information
Sakoram committed Dec 2, 2024
1 parent 474ed40 commit af9fc64
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 66 deletions.
23 changes: 17 additions & 6 deletions app/sample/ext_frame/rx_st20_pipeline_dyn_ext_frame_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ struct rx_st20p_sample_ctx {
int idx;
st20p_rx_handle handle;

bool stop;
volatile bool stop;
pthread_t frame_thread;

int fb_recv;
Expand All @@ -25,12 +25,14 @@ struct rx_st20p_sample_ctx {
struct st20_ext_frame* ext_frames;
int ext_idx;
int fb_cnt;
bool frame_available;
};

static int rx_st20p_frame_available(void* priv) {
struct rx_st20p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
s->frame_available = true;
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

Expand Down Expand Up @@ -119,18 +121,26 @@ static void rx_st20p_consume_frame(struct rx_st20p_sample_ctx* s,
s->fb_recv++;
}

static void wait_frame_available(struct rx_st20p_sample_ctx* s) {
st_pthread_mutex_lock(&s->wake_mutex);
while (!s->frame_available && !s->stop) {
st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
}
s->frame_available = false;
st_pthread_mutex_unlock(&s->wake_mutex);
}

static void* rx_st20p_frame_thread(void* arg) {
struct rx_st20p_sample_ctx* s = arg;
st20p_rx_handle handle = s->handle;
struct st_frame* frame;

info("%s(%d), start\n", __func__, s->idx);
while (!s->stop) {
/* get available frame */
frame = st20p_rx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
if (!frame) {
wait_frame_available(s);
continue;
}
rx_st20p_consume_frame(s, frame);
Expand Down Expand Up @@ -177,6 +187,7 @@ int main(int argc, char** argv) {
st_pthread_cond_init(&app[i]->wake_cond, NULL);
app[i]->dst_fd = -1;
app[i]->fb_cnt = ctx.framebuff_cnt;
app[i]->frame_available = false;

struct st20p_rx_ops ops_rx;
memset(&ops_rx, 0, sizeof(ops_rx));
Expand Down Expand Up @@ -255,8 +266,8 @@ int main(int argc, char** argv) {

// stop app thread
for (int i = 0; i < session_num; i++) {
app[i]->stop = true;
st_pthread_mutex_lock(&app[i]->wake_mutex);
app[i]->stop = true;
st_pthread_cond_signal(&app[i]->wake_cond);
st_pthread_mutex_unlock(&app[i]->wake_mutex);
pthread_join(app[i]->frame_thread, NULL);
Expand Down
20 changes: 15 additions & 5 deletions app/sample/ext_frame/tx_st20_pipeline_ext_frame_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ struct tx_st20p_sample_ctx {
int idx;
st20p_tx_handle handle;

bool stop;
volatile bool stop;
pthread_t frame_thread;

int fb_send;
Expand All @@ -23,6 +23,7 @@ struct tx_st20p_sample_ctx {
uint8_t* frame_cursor;

mtl_dma_mem_handle dma_mem;
bool frame_available;
};

static int tx_st20p_close_source(struct tx_st20p_sample_ctx* s) {
Expand Down Expand Up @@ -111,6 +112,7 @@ static int tx_st20p_frame_available(void* priv) {
struct tx_st20p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
s->frame_available = true;
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

Expand All @@ -127,6 +129,15 @@ static int tx_st20p_frame_done(void* priv, struct st_frame* frame) {
return 0;
}

static void wait_frame_available(struct tx_st20p_sample_ctx* s) {
st_pthread_mutex_lock(&s->wake_mutex);
while (!s->frame_available && !s->stop) {
st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
}
s->frame_available = false;
st_pthread_mutex_unlock(&s->wake_mutex);
}

static void* tx_st20p_frame_thread(void* arg) {
struct tx_st20p_sample_ctx* s = arg;
st20p_tx_handle handle = s->handle;
Expand All @@ -135,10 +146,8 @@ static void* tx_st20p_frame_thread(void* arg) {
info("%s(%d), start\n", __func__, s->idx);
while (!s->stop) {
frame = st20p_tx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
if (!frame) {
wait_frame_available(s);
continue;
}
struct st_ext_frame ext_frame;
Expand Down Expand Up @@ -205,6 +214,7 @@ int main(int argc, char** argv) {
app[i]->stop = false;
st_pthread_mutex_init(&app[i]->wake_mutex, NULL);
st_pthread_cond_init(&app[i]->wake_cond, NULL);
app[i]->frame_available = false;

struct st20p_tx_ops ops_tx;
memset(&ops_tx, 0, sizeof(ops_tx));
Expand Down
25 changes: 17 additions & 8 deletions app/sample/fwd/rx_st20p_tx_st20p_downsample_fwd.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ struct rx_st20p_tx_st20p_sample_ctx {
st20p_rx_handle rx_handle;
st20p_tx_handle tx_handle;

bool stop;
volatile bool stop;
bool ready;
pthread_t fwd_thread;

Expand All @@ -19,6 +19,7 @@ struct rx_st20p_tx_st20p_sample_ctx {
pthread_mutex_t wake_mutex;

struct st20_pgroup st20_pg;
bool frame_available;
};

static int tx_st20p_frame_available(void* priv) {
Expand All @@ -27,6 +28,7 @@ static int tx_st20p_frame_available(void* priv) {
if (!s->ready) return -EIO;

st_pthread_mutex_lock(&s->wake_mutex);
s->frame_available = true;
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

Expand All @@ -39,12 +41,22 @@ static int rx_st20p_frame_available(void* priv) {
if (!s->ready) return -EIO;

st_pthread_mutex_lock(&s->wake_mutex);
s->frame_available = true;
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

return 0;
}

static void wait_frame_available(struct rx_st20p_tx_st20p_sample_ctx* s) {
st_pthread_mutex_lock(&s->wake_mutex);
while (!s->frame_available && !s->stop) {
st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
}
s->frame_available = false;
st_pthread_mutex_unlock(&s->wake_mutex);
}

static void fwd_st20_consume_frame(struct rx_st20p_tx_st20p_sample_ctx* s,
struct st_frame* frame) {
st20p_tx_handle tx_handle = s->tx_handle;
Expand All @@ -53,9 +65,7 @@ static void fwd_st20_consume_frame(struct rx_st20p_tx_st20p_sample_ctx* s,
while (!s->stop) {
tx_frame = st20p_tx_get_frame(tx_handle);
if (!tx_frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
wait_frame_available(s);
continue;
}

Expand All @@ -76,9 +86,7 @@ static void* st20_fwd_st20_thread(void* arg) {
while (!s->stop) {
frame = st20p_rx_get_frame(rx_handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
wait_frame_available(s);
continue;
}

Expand Down Expand Up @@ -129,6 +137,7 @@ int main(int argc, char** argv) {
app.st = ctx.st;
st_pthread_mutex_init(&app.wake_mutex, NULL);
st_pthread_cond_init(&app.wake_cond, NULL);
app.frame_available = false;

st20_get_pgroup(ST20_FMT_YUV_422_10BIT, &app.st20_pg);

Expand Down Expand Up @@ -204,8 +213,8 @@ int main(int argc, char** argv) {
}

// stop app thread
app.stop = true;
st_pthread_mutex_lock(&app.wake_mutex);
app.stop = true;
st_pthread_cond_signal(&app.wake_cond);
st_pthread_mutex_unlock(&app.wake_mutex);
pthread_join(app.fwd_thread, NULL);
Expand Down
37 changes: 29 additions & 8 deletions app/sample/fwd/rx_st20p_tx_st20p_merge_fwd.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct rx_ctx {
pthread_mutex_t rx_wake_mutex;
void* app;
int fb_rcv;
bool frame_available;
};

struct merge_fwd_sample_ctx {
Expand All @@ -21,13 +22,14 @@ struct merge_fwd_sample_ctx {
size_t fb_size;

bool ready;
bool stop;
volatile bool stop;
pthread_t fwd_thread;
pthread_cond_t tx_wake_cond;
pthread_mutex_t tx_wake_mutex;

int fb_fwd;
bool sync_tmstamp;
bool frame_available;
};

static int tx_st20p_frame_available(void* priv) {
Expand All @@ -36,6 +38,7 @@ static int tx_st20p_frame_available(void* priv) {
if (!s->ready) return -EIO;

st_pthread_mutex_lock(&s->tx_wake_mutex);
s->frame_available = true;
st_pthread_cond_signal(&s->tx_wake_cond);
st_pthread_mutex_unlock(&s->tx_wake_mutex);

Expand All @@ -49,12 +52,34 @@ static int rx_st20p_frame_available(void* priv) {
if (!app->ready) return -EIO;

st_pthread_mutex_lock(&s->rx_wake_mutex);
s->frame_available = true;
st_pthread_cond_signal(&s->rx_wake_cond);
st_pthread_mutex_unlock(&s->rx_wake_mutex);

return 0;
}


static void wait_frame_rx_available(struct rx_ctx* s) {
struct merge_fwd_sample_ctx* app = s->app;

st_pthread_mutex_lock(&s->rx_wake_mutex);
while (!s->frame_available && !app->stop) {
st_pthread_cond_wait(&s->rx_wake_cond, &s->rx_wake_mutex);
}
s->frame_available = false;
st_pthread_mutex_unlock(&s->rx_wake_mutex);
}

static void wait_frame_tx_available(struct merge_fwd_sample_ctx* s) {
st_pthread_mutex_lock(&s->tx_wake_mutex);
while (!s->frame_available && !s->stop) {
st_pthread_cond_wait(&s->tx_wake_cond, &s->tx_wake_mutex);
}
s->frame_available = false;
st_pthread_mutex_unlock(&s->tx_wake_mutex);
}

static void* tx_st20p_fwd_thread(void* args) {
struct merge_fwd_sample_ctx* s = args;
st20p_tx_handle tx_handle = s->tx_handle;
Expand All @@ -69,9 +94,7 @@ static void* tx_st20p_fwd_thread(void* args) {
uint64_t tx_tmstamp = 0;
frame = st20p_tx_get_frame(tx_handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->tx_wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->tx_wake_cond, &s->tx_wake_mutex);
st_pthread_mutex_unlock(&s->tx_wake_mutex);
wait_frame_tx_available(s);
continue;
}

Expand All @@ -86,9 +109,7 @@ static void* tx_st20p_fwd_thread(void* args) {
} else {
rx_frame = st20p_rx_get_frame(rx_handle);
if (!rx_frame) { /* no frame */
st_pthread_mutex_lock(&rx->rx_wake_mutex);
if (!s->stop) st_pthread_cond_wait(&rx->rx_wake_cond, &rx->rx_wake_mutex);
st_pthread_mutex_unlock(&rx->rx_wake_mutex);
wait_frame_rx_available(rx);
continue;
}
}
Expand Down Expand Up @@ -276,8 +297,8 @@ int main(int argc, char** argv) {
}

// stop fwd thread
app.stop = true;
st_pthread_mutex_lock(&app.tx_wake_mutex);
app.stop = true;
st_pthread_cond_signal(&app.tx_wake_cond);
st_pthread_mutex_unlock(&app.tx_wake_mutex);
for (int i = 0; i < 4; i++) {
Expand Down
Loading

0 comments on commit af9fc64

Please sign in to comment.