Skip to content

Commit

Permalink
app: use block mode for pipeline sample (#696)
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Du <[email protected]>
  • Loading branch information
frankdjx authored Jan 10, 2024
1 parent 78d6096 commit e4f5188
Show file tree
Hide file tree
Showing 20 changed files with 148 additions and 214 deletions.
26 changes: 3 additions & 23 deletions app/sample/rx_st20_pipeline_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ struct rx_st20p_sample_ctx {
pthread_t frame_thread;

int fb_recv;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t frame_size;
int dst_fd;
Expand All @@ -24,16 +22,6 @@ struct rx_st20p_sample_ctx {
int fb_cnt;
};

static int rx_st20p_frame_available(void* priv) {
struct rx_st20p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

return 0;
}

static int rx_st20p_close_source(struct rx_st20p_sample_ctx* s) {
if (s->dst_begin) {
munmap(s->dst_begin, s->dst_end - s->dst_begin);
Expand Down Expand Up @@ -102,9 +90,7 @@ static void* rx_st20p_frame_thread(void* arg) {
while (!s->stop) {
frame = st20p_rx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}
dbg("%s(%d), one new frame\n", __func__, s->idx);
Expand Down Expand Up @@ -153,8 +139,6 @@ int main(int argc, char** argv) {
memset(app[i], 0, sizeof(struct rx_st20p_sample_ctx));
app[i]->idx = i;
app[i]->stop = false;
st_pthread_mutex_init(&app[i]->wake_mutex, NULL);
st_pthread_cond_init(&app[i]->wake_cond, NULL);
app[i]->dst_fd = -1;
app[i]->fb_cnt = ctx.framebuff_cnt;

Expand All @@ -177,7 +161,7 @@ int main(int argc, char** argv) {
ops_rx.output_fmt = ctx.output_fmt;
ops_rx.device = ST_PLUGIN_DEVICE_AUTO;
ops_rx.framebuff_cnt = app[i]->fb_cnt;
ops_rx.notify_frame_available = rx_st20p_frame_available;
ops_rx.flags = ST20P_RX_FLAG_BLOCK_GET;

st20p_rx_handle rx_handle = st20p_rx_create(ctx.st, &ops_rx);
if (!rx_handle) {
Expand Down Expand Up @@ -214,9 +198,7 @@ int main(int argc, char** argv) {
// stop app thread
for (int i = 0; i < session_num; i++) {
app[i]->stop = true;
st_pthread_mutex_lock(&app[i]->wake_mutex);
st_pthread_cond_signal(&app[i]->wake_cond);
st_pthread_mutex_unlock(&app[i]->wake_mutex);
if (app[i]->handle) st20p_rx_wake_block(app[i]->handle);
pthread_join(app[i]->frame_thread, NULL);
info("%s(%d), received frames %d\n", __func__, i, app[i]->fb_recv);

Expand All @@ -238,8 +220,6 @@ int main(int argc, char** argv) {
for (int i = 0; i < session_num; i++) {
if (app[i]) {
if (app[i]->handle) st20p_rx_free(app[i]->handle);
st_pthread_mutex_destroy(&app[i]->wake_mutex);
st_pthread_cond_destroy(&app[i]->wake_cond);
free(app[i]);
}
}
Expand Down
24 changes: 3 additions & 21 deletions app/sample/rx_st22_pipeline_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ struct rx_st22p_sample_ctx {
pthread_t frame_thread;

int fb_recv;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t frame_size;
int dst_fd;
Expand All @@ -22,16 +20,6 @@ struct rx_st22p_sample_ctx {
uint8_t* dst_cursor;
};

static int rx_st22p_frame_available(void* priv) {
struct rx_st22p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

return 0;
}

static int rx_st22p_close_source(struct rx_st22p_sample_ctx* s) {
if (s->dst_begin) {
munmap(s->dst_begin, s->dst_end - s->dst_begin);
Expand Down Expand Up @@ -107,9 +95,7 @@ static void* rx_st22p_frame_thread(void* arg) {
while (!s->stop) {
frame = st22p_rx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}
rx_st22p_consume_frame(s, frame);
Expand Down Expand Up @@ -149,8 +135,6 @@ int main(int argc, char** argv) {
memset(app[i], 0, sizeof(struct rx_st22p_sample_ctx));
app[i]->idx = i;
app[i]->stop = false;
st_pthread_mutex_init(&app[i]->wake_mutex, NULL);
st_pthread_cond_init(&app[i]->wake_cond, NULL);
app[i]->dst_fd = -1;

struct st22p_rx_ops ops_rx;
Expand All @@ -175,7 +159,7 @@ int main(int argc, char** argv) {
ops_rx.max_codestream_size = 0; /* let lib to decide */
ops_rx.framebuff_cnt = ctx.framebuff_cnt;
ops_rx.codec_thread_cnt = 2;
ops_rx.notify_frame_available = rx_st22p_frame_available;
ops_rx.flags = ST22P_RX_FLAG_BLOCK_GET;

st22p_rx_handle rx_handle = st22p_rx_create(ctx.st, &ops_rx);
if (!rx_handle) {
Expand Down Expand Up @@ -211,9 +195,7 @@ int main(int argc, char** argv) {
// stop app thread
for (int i = 0; i < session_num; i++) {
app[i]->stop = true;
st_pthread_mutex_lock(&app[i]->wake_mutex);
st_pthread_cond_signal(&app[i]->wake_cond);
st_pthread_mutex_unlock(&app[i]->wake_mutex);
if (app[i]->handle) st22p_rx_wake_block(app[i]->handle);
pthread_join(app[i]->frame_thread, NULL);
info("%s(%d), received frames %d\n", __func__, i, app[i]->fb_recv);

Expand Down
26 changes: 3 additions & 23 deletions app/sample/tx_st20_pipeline_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ struct tx_st20p_sample_ctx {

int fb_send;
int fb_send_done;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t frame_size;
uint8_t* source_begin;
Expand Down Expand Up @@ -94,16 +92,6 @@ static int tx_st20p_open_source(struct tx_st20p_sample_ctx* s, char* file) {
return 0;
}

static int tx_st20p_frame_available(void* priv) {
struct tx_st20p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

return 0;
}

static int tx_st20p_frame_done(void* priv, struct st_frame* frame) {
struct tx_st20p_sample_ctx* s = priv;
MTL_MAY_UNUSED(frame);
Expand All @@ -128,9 +116,7 @@ static void* tx_st20p_frame_thread(void* arg) {
while (!s->stop) {
frame = st20p_tx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}

Expand Down Expand Up @@ -189,8 +175,6 @@ int main(int argc, char** argv) {
snprintf(app[i]->meta.dummy, sizeof(app[i]->meta.dummy), "st20p_tx_%d", i);
app[i]->has_user_meta = true;
}
st_pthread_mutex_init(&app[i]->wake_mutex, NULL);
st_pthread_cond_init(&app[i]->wake_cond, NULL);

struct st20p_tx_ops ops_tx;
memset(&ops_tx, 0, sizeof(ops_tx));
Expand Down Expand Up @@ -218,7 +202,7 @@ int main(int argc, char** argv) {
ops_tx.transport_fmt = ctx.fmt;
ops_tx.device = ST_PLUGIN_DEVICE_AUTO;
ops_tx.framebuff_cnt = ctx.framebuff_cnt;
ops_tx.notify_frame_available = tx_st20p_frame_available;
ops_tx.flags = ST20P_TX_FLAG_BLOCK_GET;
ops_tx.notify_frame_done = tx_st20p_frame_done;

st20p_tx_handle tx_handle = st20p_tx_create(ctx.st, &ops_tx);
Expand Down Expand Up @@ -255,9 +239,7 @@ int main(int argc, char** argv) {
// stop app thread
for (int i = 0; i < session_num; i++) {
app[i]->stop = true;
st_pthread_mutex_lock(&app[i]->wake_mutex);
st_pthread_cond_signal(&app[i]->wake_cond);
st_pthread_mutex_unlock(&app[i]->wake_mutex);
if (app[i]->handle) st20p_tx_wake_block(app[i]->handle);
pthread_join(app[i]->frame_thread, NULL);
info("%s(%d), sent frames %d(done %d)\n", __func__, i, app[i]->fb_send,
app[i]->fb_send_done);
Expand All @@ -279,8 +261,6 @@ int main(int argc, char** argv) {
error:
for (int i = 0; i < session_num; i++) {
if (app[i]) {
st_pthread_mutex_destroy(&app[i]->wake_mutex);
st_pthread_cond_destroy(&app[i]->wake_cond);
if (app[i]->handle) st20p_tx_free(app[i]->handle);
free(app[i]);
}
Expand Down
26 changes: 3 additions & 23 deletions app/sample/tx_st22_pipeline_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ struct tx_st22p_sample_ctx {
pthread_t frame_thread;

int fb_send;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t frame_size;
uint8_t* source_begin;
Expand Down Expand Up @@ -124,16 +122,6 @@ static int tx_st22p_open_source(struct st_sample_context* ctx,
return 0;
}

static int tx_st22p_frame_available(void* priv) {
struct tx_st22p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

return 0;
}

static void tx_st22p_build_frame(struct tx_st22p_sample_ctx* s, struct st_frame* frame) {
if (s->frame_cursor + s->frame_size > s->source_end) {
s->frame_cursor = s->source_begin;
Expand Down Expand Up @@ -167,9 +155,7 @@ static void* tx_st22p_frame_thread(void* arg) {
while (!s->stop) {
frame = st22p_tx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}
if (s->source_begin) tx_st22p_build_frame(s, frame);
Expand Down Expand Up @@ -212,8 +198,6 @@ int main(int argc, char** argv) {
app[i]->st = ctx.st;
app[i]->idx = i;
app[i]->stop = false;
st_pthread_mutex_init(&app[i]->wake_mutex, NULL);
st_pthread_cond_init(&app[i]->wake_cond, NULL);

struct st22p_tx_ops ops_tx;
memset(&ops_tx, 0, sizeof(ops_tx));
Expand All @@ -239,7 +223,7 @@ int main(int argc, char** argv) {
ops_tx.codestream_size = ops_tx.width * ops_tx.height * bpp / 8;
if (ops_tx.interlaced) ops_tx.codestream_size /= 2;
ops_tx.framebuff_cnt = ctx.framebuff_cnt;
ops_tx.notify_frame_available = tx_st22p_frame_available;
ops_tx.flags = ST22P_TX_FLAG_BLOCK_GET;

st22p_tx_handle tx_handle = st22p_tx_create(ctx.st, &ops_tx);
if (!tx_handle) {
Expand Down Expand Up @@ -270,9 +254,7 @@ int main(int argc, char** argv) {
// stop app thread
for (int i = 0; i < session_num; i++) {
app[i]->stop = true;
st_pthread_mutex_lock(&app[i]->wake_mutex);
st_pthread_cond_signal(&app[i]->wake_cond);
st_pthread_mutex_unlock(&app[i]->wake_mutex);
if (app[i]->handle) st22p_tx_wake_block(app[i]->handle);
pthread_join(app[i]->frame_thread, NULL);
info("%s(%d), sent frames %d\n", __func__, i, app[i]->fb_send);

Expand All @@ -293,8 +275,6 @@ int main(int argc, char** argv) {
error:
for (int i = 0; i < session_num; i++) {
if (app[i]) {
st_pthread_mutex_destroy(&app[i]->wake_mutex);
st_pthread_cond_destroy(&app[i]->wake_cond);
if (app[i]->handle) st22p_tx_free(app[i]->handle);
free(app[i]);
}
Expand Down
8 changes: 0 additions & 8 deletions app/src/app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,6 @@ struct st_app_tx_st22p_session {
double expect_fps;

pthread_t st22p_app_thread;
pthread_cond_t st22p_wake_cond;
pthread_mutex_t st22p_wake_mutex;
bool st22p_app_thread_stop;
};

Expand All @@ -415,8 +413,6 @@ struct st_app_rx_st22p_session {
double expect_fps;

pthread_t st22p_app_thread;
pthread_cond_t st22p_wake_cond;
pthread_mutex_t st22p_wake_mutex;
bool st22p_app_thread_stop;

struct st_display* display;
Expand Down Expand Up @@ -448,8 +444,6 @@ struct st_app_tx_st20p_session {
double expect_fps;

pthread_t st20p_app_thread;
pthread_cond_t st20p_wake_cond;
pthread_mutex_t st20p_wake_mutex;
bool st20p_app_thread_stop;
};

Expand All @@ -473,8 +467,6 @@ struct st_app_rx_st20p_session {
double expect_fps;

pthread_t st20p_app_thread;
pthread_cond_t st20p_wake_cond;
pthread_mutex_t st20p_wake_mutex;
bool st20p_app_thread_stop;

struct st_display* display;
Expand Down
31 changes: 5 additions & 26 deletions app/src/rx_st20p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,6 @@

#include "rx_st20p_app.h"

static int app_rx_st20p_frame_available(void* priv) {
struct st_app_rx_st20p_session* s = priv;

st_pthread_mutex_lock(&s->st20p_wake_mutex);
st_pthread_cond_signal(&s->st20p_wake_cond);
st_pthread_mutex_unlock(&s->st20p_wake_mutex);

return 0;
}

static void app_rx_st20p_consume_frame(struct st_app_rx_st20p_session* s,
struct st_frame* frame) {
struct st_display* d = s->display;
Expand Down Expand Up @@ -63,11 +53,8 @@ static void* app_rx_st20p_frame_thread(void* arg) {
info("%s(%d), start\n", __func__, s->idx);
while (!s->st20p_app_thread_stop) {
frame = st20p_rx_get_frame(s->handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->st20p_wake_mutex);
if (!s->st20p_app_thread_stop)
st_pthread_cond_wait(&s->st20p_wake_cond, &s->st20p_wake_mutex);
st_pthread_mutex_unlock(&s->st20p_wake_mutex);
if (!frame) { /* no ready frame */
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}

Expand Down Expand Up @@ -140,16 +127,11 @@ static int app_rx_st20p_uinit(struct st_app_rx_st20p_session* s) {
s->st20p_app_thread_stop = true;
if (s->st20p_app_thread_stop) {
/* wake up the thread */
st_pthread_mutex_lock(&s->st20p_wake_mutex);
st_pthread_cond_signal(&s->st20p_wake_cond);
st_pthread_mutex_unlock(&s->st20p_wake_mutex);
info("%s(%d), wait app thread stop\n", __func__, idx);
if (s->handle) st20p_rx_wake_block(s->handle);
pthread_join(s->st20p_app_thread, NULL);
}

st_pthread_mutex_destroy(&s->st20p_wake_mutex);
st_pthread_cond_destroy(&s->st20p_wake_cond);

if (s->handle) {
ret = st20p_rx_free(s->handle);
if (ret < 0) err("%s(%d), st20_rx_free fail %d\n", __func__, idx, ret);
Expand Down Expand Up @@ -235,16 +217,13 @@ static int app_rx_st20p_init(struct st_app_context* ctx,
ops.transport_fmt = st20p ? st20p->info.transport_format : ST20_FMT_YUV_422_10BIT;
ops.port.payload_type = st20p ? st20p->base.payload_type : ST_APP_PAYLOAD_TYPE_VIDEO;
ops.device = st20p ? st20p->info.device : ST_PLUGIN_DEVICE_AUTO;
ops.notify_frame_available = app_rx_st20p_frame_available;
ops.flags |= ST20P_RX_FLAG_BLOCK_GET;
ops.framebuff_cnt = s->framebuff_cnt;
/* always try to enable DMA offload */
ops.flags = ST20P_RX_FLAG_DMA_OFFLOAD;
ops.flags |= ST20P_RX_FLAG_DMA_OFFLOAD;
if (st20p && st20p->enable_rtcp) ops.flags |= ST20P_RX_FLAG_ENABLE_RTCP;
if (ctx->enable_timing_parser) ops.flags |= ST20P_RX_FLAG_TIMING_PARSER_STAT;

st_pthread_mutex_init(&s->st20p_wake_mutex, NULL);
st_pthread_cond_init(&s->st20p_wake_cond, NULL);

s->width = ops.width;
s->height = ops.height;
if (ops.interlaced) {
Expand Down
Loading

0 comments on commit e4f5188

Please sign in to comment.