From e4f51886ece4372dd0c583dd53700da40319d37c Mon Sep 17 00:00:00 2001 From: Frank Du Date: Wed, 10 Jan 2024 13:25:03 +0800 Subject: [PATCH] app: use block mode for pipeline sample (#696) Signed-off-by: Frank Du --- app/sample/rx_st20_pipeline_sample.c | 26 ++--------- app/sample/rx_st22_pipeline_sample.c | 24 ++-------- app/sample/tx_st20_pipeline_sample.c | 26 ++--------- app/sample/tx_st22_pipeline_sample.c | 26 ++--------- app/src/app_base.h | 8 ---- app/src/rx_st20p_app.c | 31 ++----------- app/src/rx_st22p_app.c | 30 ++---------- app/src/tx_st20p_app.c | 31 ++----------- app/src/tx_st22p_app.c | 30 ++---------- doc/design.md | 4 +- include/st_pipeline_api.h | 46 ++++++++++++++++++- lib/src/st2110/pipeline/st20_pipeline_rx.c | 14 ++++++ lib/src/st2110/pipeline/st20_pipeline_tx.c | 14 ++++++ lib/src/st2110/pipeline/st22_pipeline_rx.c | 14 ++++++ lib/src/st2110/pipeline/st22_pipeline_tx.c | 14 ++++++ tests/script/loop_json/st20p_1v_1080p59.json | 4 +- ...v_planar10.json => st20p_1v_planar10.json} | 8 ++-- ...{st20p_2v_v210.json => st20p_1v_v210.json} | 8 ++-- tests/src/st20p_test.cpp | 2 + tests/src/st22p_test.cpp | 2 + 20 files changed, 148 insertions(+), 214 deletions(-) rename tests/script/loop_json/{st20p_2v_planar10.json => st20p_1v_planar10.json} (91%) rename tests/script/loop_json/{st20p_2v_v210.json => st20p_1v_v210.json} (91%) diff --git a/app/sample/rx_st20_pipeline_sample.c b/app/sample/rx_st20_pipeline_sample.c index 68b112339..3588304c3 100644 --- a/app/sample/rx_st20_pipeline_sample.c +++ b/app/sample/rx_st20_pipeline_sample.c @@ -12,8 +12,6 @@ struct rx_st20p_sample_ctx { pthread_t frame_thread; int fb_recv; - pthread_cond_t wake_cond; - pthread_mutex_t wake_mutex; size_t frame_size; int dst_fd; @@ -24,16 +22,6 @@ struct rx_st20p_sample_ctx { int fb_cnt; }; -static int rx_st20p_frame_available(void* priv) { - struct rx_st20p_sample_ctx* s = priv; - - st_pthread_mutex_lock(&s->wake_mutex); - st_pthread_cond_signal(&s->wake_cond); - st_pthread_mutex_unlock(&s->wake_mutex); - - return 0; -} - static int rx_st20p_close_source(struct rx_st20p_sample_ctx* s) { if (s->dst_begin) { munmap(s->dst_begin, s->dst_end - s->dst_begin); @@ -102,9 +90,7 @@ static void* rx_st20p_frame_thread(void* arg) { while (!s->stop) { frame = st20p_rx_get_frame(handle); if (!frame) { /* no frame */ - st_pthread_mutex_lock(&s->wake_mutex); - if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex); - st_pthread_mutex_unlock(&s->wake_mutex); + warn("%s(%d), get frame time out\n", __func__, s->idx); continue; } dbg("%s(%d), one new frame\n", __func__, s->idx); @@ -153,8 +139,6 @@ int main(int argc, char** argv) { memset(app[i], 0, sizeof(struct rx_st20p_sample_ctx)); app[i]->idx = i; app[i]->stop = false; - st_pthread_mutex_init(&app[i]->wake_mutex, NULL); - st_pthread_cond_init(&app[i]->wake_cond, NULL); app[i]->dst_fd = -1; app[i]->fb_cnt = ctx.framebuff_cnt; @@ -177,7 +161,7 @@ int main(int argc, char** argv) { ops_rx.output_fmt = ctx.output_fmt; ops_rx.device = ST_PLUGIN_DEVICE_AUTO; ops_rx.framebuff_cnt = app[i]->fb_cnt; - ops_rx.notify_frame_available = rx_st20p_frame_available; + ops_rx.flags = ST20P_RX_FLAG_BLOCK_GET; st20p_rx_handle rx_handle = st20p_rx_create(ctx.st, &ops_rx); if (!rx_handle) { @@ -214,9 +198,7 @@ int main(int argc, char** argv) { // stop app thread for (int i = 0; i < session_num; i++) { app[i]->stop = true; - st_pthread_mutex_lock(&app[i]->wake_mutex); - st_pthread_cond_signal(&app[i]->wake_cond); - st_pthread_mutex_unlock(&app[i]->wake_mutex); + if (app[i]->handle) st20p_rx_wake_block(app[i]->handle); pthread_join(app[i]->frame_thread, NULL); info("%s(%d), received frames %d\n", __func__, i, app[i]->fb_recv); @@ -238,8 +220,6 @@ int main(int argc, char** argv) { for (int i = 0; i < session_num; i++) { if (app[i]) { if (app[i]->handle) st20p_rx_free(app[i]->handle); - st_pthread_mutex_destroy(&app[i]->wake_mutex); - st_pthread_cond_destroy(&app[i]->wake_cond); free(app[i]); } } diff --git a/app/sample/rx_st22_pipeline_sample.c b/app/sample/rx_st22_pipeline_sample.c index b644e5132..eeff3df24 100644 --- a/app/sample/rx_st22_pipeline_sample.c +++ b/app/sample/rx_st22_pipeline_sample.c @@ -12,8 +12,6 @@ struct rx_st22p_sample_ctx { pthread_t frame_thread; int fb_recv; - pthread_cond_t wake_cond; - pthread_mutex_t wake_mutex; size_t frame_size; int dst_fd; @@ -22,16 +20,6 @@ struct rx_st22p_sample_ctx { uint8_t* dst_cursor; }; -static int rx_st22p_frame_available(void* priv) { - struct rx_st22p_sample_ctx* s = priv; - - st_pthread_mutex_lock(&s->wake_mutex); - st_pthread_cond_signal(&s->wake_cond); - st_pthread_mutex_unlock(&s->wake_mutex); - - return 0; -} - static int rx_st22p_close_source(struct rx_st22p_sample_ctx* s) { if (s->dst_begin) { munmap(s->dst_begin, s->dst_end - s->dst_begin); @@ -107,9 +95,7 @@ static void* rx_st22p_frame_thread(void* arg) { while (!s->stop) { frame = st22p_rx_get_frame(handle); if (!frame) { /* no frame */ - st_pthread_mutex_lock(&s->wake_mutex); - if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex); - st_pthread_mutex_unlock(&s->wake_mutex); + warn("%s(%d), get frame time out\n", __func__, s->idx); continue; } rx_st22p_consume_frame(s, frame); @@ -149,8 +135,6 @@ int main(int argc, char** argv) { memset(app[i], 0, sizeof(struct rx_st22p_sample_ctx)); app[i]->idx = i; app[i]->stop = false; - st_pthread_mutex_init(&app[i]->wake_mutex, NULL); - st_pthread_cond_init(&app[i]->wake_cond, NULL); app[i]->dst_fd = -1; struct st22p_rx_ops ops_rx; @@ -175,7 +159,7 @@ int main(int argc, char** argv) { ops_rx.max_codestream_size = 0; /* let lib to decide */ ops_rx.framebuff_cnt = ctx.framebuff_cnt; ops_rx.codec_thread_cnt = 2; - ops_rx.notify_frame_available = rx_st22p_frame_available; + ops_rx.flags = ST22P_RX_FLAG_BLOCK_GET; st22p_rx_handle rx_handle = st22p_rx_create(ctx.st, &ops_rx); if (!rx_handle) { @@ -211,9 +195,7 @@ int main(int argc, char** argv) { // stop app thread for (int i = 0; i < session_num; i++) { app[i]->stop = true; - st_pthread_mutex_lock(&app[i]->wake_mutex); - st_pthread_cond_signal(&app[i]->wake_cond); - st_pthread_mutex_unlock(&app[i]->wake_mutex); + if (app[i]->handle) st22p_rx_wake_block(app[i]->handle); pthread_join(app[i]->frame_thread, NULL); info("%s(%d), received frames %d\n", __func__, i, app[i]->fb_recv); diff --git a/app/sample/tx_st20_pipeline_sample.c b/app/sample/tx_st20_pipeline_sample.c index 2b61ce1a6..f5befa19e 100644 --- a/app/sample/tx_st20_pipeline_sample.c +++ b/app/sample/tx_st20_pipeline_sample.c @@ -14,8 +14,6 @@ struct tx_st20p_sample_ctx { int fb_send; int fb_send_done; - pthread_cond_t wake_cond; - pthread_mutex_t wake_mutex; size_t frame_size; uint8_t* source_begin; @@ -94,16 +92,6 @@ static int tx_st20p_open_source(struct tx_st20p_sample_ctx* s, char* file) { return 0; } -static int tx_st20p_frame_available(void* priv) { - struct tx_st20p_sample_ctx* s = priv; - - st_pthread_mutex_lock(&s->wake_mutex); - st_pthread_cond_signal(&s->wake_cond); - st_pthread_mutex_unlock(&s->wake_mutex); - - return 0; -} - static int tx_st20p_frame_done(void* priv, struct st_frame* frame) { struct tx_st20p_sample_ctx* s = priv; MTL_MAY_UNUSED(frame); @@ -128,9 +116,7 @@ static void* tx_st20p_frame_thread(void* arg) { while (!s->stop) { frame = st20p_tx_get_frame(handle); if (!frame) { /* no frame */ - st_pthread_mutex_lock(&s->wake_mutex); - if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex); - st_pthread_mutex_unlock(&s->wake_mutex); + warn("%s(%d), get frame time out\n", __func__, s->idx); continue; } @@ -189,8 +175,6 @@ int main(int argc, char** argv) { snprintf(app[i]->meta.dummy, sizeof(app[i]->meta.dummy), "st20p_tx_%d", i); app[i]->has_user_meta = true; } - st_pthread_mutex_init(&app[i]->wake_mutex, NULL); - st_pthread_cond_init(&app[i]->wake_cond, NULL); struct st20p_tx_ops ops_tx; memset(&ops_tx, 0, sizeof(ops_tx)); @@ -218,7 +202,7 @@ int main(int argc, char** argv) { ops_tx.transport_fmt = ctx.fmt; ops_tx.device = ST_PLUGIN_DEVICE_AUTO; ops_tx.framebuff_cnt = ctx.framebuff_cnt; - ops_tx.notify_frame_available = tx_st20p_frame_available; + ops_tx.flags = ST20P_TX_FLAG_BLOCK_GET; ops_tx.notify_frame_done = tx_st20p_frame_done; st20p_tx_handle tx_handle = st20p_tx_create(ctx.st, &ops_tx); @@ -255,9 +239,7 @@ int main(int argc, char** argv) { // stop app thread for (int i = 0; i < session_num; i++) { app[i]->stop = true; - st_pthread_mutex_lock(&app[i]->wake_mutex); - st_pthread_cond_signal(&app[i]->wake_cond); - st_pthread_mutex_unlock(&app[i]->wake_mutex); + if (app[i]->handle) st20p_tx_wake_block(app[i]->handle); pthread_join(app[i]->frame_thread, NULL); info("%s(%d), sent frames %d(done %d)\n", __func__, i, app[i]->fb_send, app[i]->fb_send_done); @@ -279,8 +261,6 @@ int main(int argc, char** argv) { error: for (int i = 0; i < session_num; i++) { if (app[i]) { - st_pthread_mutex_destroy(&app[i]->wake_mutex); - st_pthread_cond_destroy(&app[i]->wake_cond); if (app[i]->handle) st20p_tx_free(app[i]->handle); free(app[i]); } diff --git a/app/sample/tx_st22_pipeline_sample.c b/app/sample/tx_st22_pipeline_sample.c index 17a42942c..d42049205 100644 --- a/app/sample/tx_st22_pipeline_sample.c +++ b/app/sample/tx_st22_pipeline_sample.c @@ -13,8 +13,6 @@ struct tx_st22p_sample_ctx { pthread_t frame_thread; int fb_send; - pthread_cond_t wake_cond; - pthread_mutex_t wake_mutex; size_t frame_size; uint8_t* source_begin; @@ -124,16 +122,6 @@ static int tx_st22p_open_source(struct st_sample_context* ctx, return 0; } -static int tx_st22p_frame_available(void* priv) { - struct tx_st22p_sample_ctx* s = priv; - - st_pthread_mutex_lock(&s->wake_mutex); - st_pthread_cond_signal(&s->wake_cond); - st_pthread_mutex_unlock(&s->wake_mutex); - - return 0; -} - static void tx_st22p_build_frame(struct tx_st22p_sample_ctx* s, struct st_frame* frame) { if (s->frame_cursor + s->frame_size > s->source_end) { s->frame_cursor = s->source_begin; @@ -167,9 +155,7 @@ static void* tx_st22p_frame_thread(void* arg) { while (!s->stop) { frame = st22p_tx_get_frame(handle); if (!frame) { /* no frame */ - st_pthread_mutex_lock(&s->wake_mutex); - if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex); - st_pthread_mutex_unlock(&s->wake_mutex); + warn("%s(%d), get frame time out\n", __func__, s->idx); continue; } if (s->source_begin) tx_st22p_build_frame(s, frame); @@ -212,8 +198,6 @@ int main(int argc, char** argv) { app[i]->st = ctx.st; app[i]->idx = i; app[i]->stop = false; - st_pthread_mutex_init(&app[i]->wake_mutex, NULL); - st_pthread_cond_init(&app[i]->wake_cond, NULL); struct st22p_tx_ops ops_tx; memset(&ops_tx, 0, sizeof(ops_tx)); @@ -239,7 +223,7 @@ int main(int argc, char** argv) { ops_tx.codestream_size = ops_tx.width * ops_tx.height * bpp / 8; if (ops_tx.interlaced) ops_tx.codestream_size /= 2; ops_tx.framebuff_cnt = ctx.framebuff_cnt; - ops_tx.notify_frame_available = tx_st22p_frame_available; + ops_tx.flags = ST22P_TX_FLAG_BLOCK_GET; st22p_tx_handle tx_handle = st22p_tx_create(ctx.st, &ops_tx); if (!tx_handle) { @@ -270,9 +254,7 @@ int main(int argc, char** argv) { // stop app thread for (int i = 0; i < session_num; i++) { app[i]->stop = true; - st_pthread_mutex_lock(&app[i]->wake_mutex); - st_pthread_cond_signal(&app[i]->wake_cond); - st_pthread_mutex_unlock(&app[i]->wake_mutex); + if (app[i]->handle) st22p_tx_wake_block(app[i]->handle); pthread_join(app[i]->frame_thread, NULL); info("%s(%d), sent frames %d\n", __func__, i, app[i]->fb_send); @@ -293,8 +275,6 @@ int main(int argc, char** argv) { error: for (int i = 0; i < session_num; i++) { if (app[i]) { - st_pthread_mutex_destroy(&app[i]->wake_mutex); - st_pthread_cond_destroy(&app[i]->wake_cond); if (app[i]->handle) st22p_tx_free(app[i]->handle); free(app[i]); } diff --git a/app/src/app_base.h b/app/src/app_base.h index e38cfe120..10b5a4953 100644 --- a/app/src/app_base.h +++ b/app/src/app_base.h @@ -392,8 +392,6 @@ struct st_app_tx_st22p_session { double expect_fps; pthread_t st22p_app_thread; - pthread_cond_t st22p_wake_cond; - pthread_mutex_t st22p_wake_mutex; bool st22p_app_thread_stop; }; @@ -415,8 +413,6 @@ struct st_app_rx_st22p_session { double expect_fps; pthread_t st22p_app_thread; - pthread_cond_t st22p_wake_cond; - pthread_mutex_t st22p_wake_mutex; bool st22p_app_thread_stop; struct st_display* display; @@ -448,8 +444,6 @@ struct st_app_tx_st20p_session { double expect_fps; pthread_t st20p_app_thread; - pthread_cond_t st20p_wake_cond; - pthread_mutex_t st20p_wake_mutex; bool st20p_app_thread_stop; }; @@ -473,8 +467,6 @@ struct st_app_rx_st20p_session { double expect_fps; pthread_t st20p_app_thread; - pthread_cond_t st20p_wake_cond; - pthread_mutex_t st20p_wake_mutex; bool st20p_app_thread_stop; struct st_display* display; diff --git a/app/src/rx_st20p_app.c b/app/src/rx_st20p_app.c index 1373cef5b..53eb5df86 100644 --- a/app/src/rx_st20p_app.c +++ b/app/src/rx_st20p_app.c @@ -4,16 +4,6 @@ #include "rx_st20p_app.h" -static int app_rx_st20p_frame_available(void* priv) { - struct st_app_rx_st20p_session* s = priv; - - st_pthread_mutex_lock(&s->st20p_wake_mutex); - st_pthread_cond_signal(&s->st20p_wake_cond); - st_pthread_mutex_unlock(&s->st20p_wake_mutex); - - return 0; -} - static void app_rx_st20p_consume_frame(struct st_app_rx_st20p_session* s, struct st_frame* frame) { struct st_display* d = s->display; @@ -63,11 +53,8 @@ static void* app_rx_st20p_frame_thread(void* arg) { info("%s(%d), start\n", __func__, s->idx); while (!s->st20p_app_thread_stop) { frame = st20p_rx_get_frame(s->handle); - if (!frame) { /* no frame */ - st_pthread_mutex_lock(&s->st20p_wake_mutex); - if (!s->st20p_app_thread_stop) - st_pthread_cond_wait(&s->st20p_wake_cond, &s->st20p_wake_mutex); - st_pthread_mutex_unlock(&s->st20p_wake_mutex); + if (!frame) { /* no ready frame */ + warn("%s(%d), get frame time out\n", __func__, s->idx); continue; } @@ -140,16 +127,11 @@ static int app_rx_st20p_uinit(struct st_app_rx_st20p_session* s) { s->st20p_app_thread_stop = true; if (s->st20p_app_thread_stop) { /* wake up the thread */ - st_pthread_mutex_lock(&s->st20p_wake_mutex); - st_pthread_cond_signal(&s->st20p_wake_cond); - st_pthread_mutex_unlock(&s->st20p_wake_mutex); info("%s(%d), wait app thread stop\n", __func__, idx); + if (s->handle) st20p_rx_wake_block(s->handle); pthread_join(s->st20p_app_thread, NULL); } - st_pthread_mutex_destroy(&s->st20p_wake_mutex); - st_pthread_cond_destroy(&s->st20p_wake_cond); - if (s->handle) { ret = st20p_rx_free(s->handle); if (ret < 0) err("%s(%d), st20_rx_free fail %d\n", __func__, idx, ret); @@ -235,16 +217,13 @@ static int app_rx_st20p_init(struct st_app_context* ctx, ops.transport_fmt = st20p ? st20p->info.transport_format : ST20_FMT_YUV_422_10BIT; ops.port.payload_type = st20p ? st20p->base.payload_type : ST_APP_PAYLOAD_TYPE_VIDEO; ops.device = st20p ? st20p->info.device : ST_PLUGIN_DEVICE_AUTO; - ops.notify_frame_available = app_rx_st20p_frame_available; + ops.flags |= ST20P_RX_FLAG_BLOCK_GET; ops.framebuff_cnt = s->framebuff_cnt; /* always try to enable DMA offload */ - ops.flags = ST20P_RX_FLAG_DMA_OFFLOAD; + ops.flags |= ST20P_RX_FLAG_DMA_OFFLOAD; if (st20p && st20p->enable_rtcp) ops.flags |= ST20P_RX_FLAG_ENABLE_RTCP; if (ctx->enable_timing_parser) ops.flags |= ST20P_RX_FLAG_TIMING_PARSER_STAT; - st_pthread_mutex_init(&s->st20p_wake_mutex, NULL); - st_pthread_cond_init(&s->st20p_wake_cond, NULL); - s->width = ops.width; s->height = ops.height; if (ops.interlaced) { diff --git a/app/src/rx_st22p_app.c b/app/src/rx_st22p_app.c index 534ecae57..bb6988b0d 100644 --- a/app/src/rx_st22p_app.c +++ b/app/src/rx_st22p_app.c @@ -4,16 +4,6 @@ #include "rx_st22p_app.h" -static int app_rx_st22p_frame_available(void* priv) { - struct st_app_rx_st22p_session* s = priv; - - st_pthread_mutex_lock(&s->st22p_wake_mutex); - st_pthread_cond_signal(&s->st22p_wake_cond); - st_pthread_mutex_unlock(&s->st22p_wake_mutex); - - return 0; -} - static void app_rx_st22p_consume_frame(struct st_app_rx_st22p_session* s, struct st_frame* frame) { struct st_display* d = s->display; @@ -48,11 +38,8 @@ static void* app_rx_st22p_frame_thread(void* arg) { info("%s(%d), start\n", __func__, s->idx); while (!s->st22p_app_thread_stop) { frame = st22p_rx_get_frame(s->handle); - if (!frame) { /* no frame */ - st_pthread_mutex_lock(&s->st22p_wake_mutex); - if (!s->st22p_app_thread_stop) - st_pthread_cond_wait(&s->st22p_wake_cond, &s->st22p_wake_mutex); - st_pthread_mutex_unlock(&s->st22p_wake_mutex); + if (!frame) { /* no ready frame */ + warn("%s(%d), get frame time out\n", __func__, s->idx); continue; } @@ -111,17 +98,11 @@ static int app_rx_st22p_uinit(struct st_app_rx_st22p_session* s) { s->st22p_app_thread_stop = true; if (s->st22p_app_thread_stop) { - /* wake up the thread */ - st_pthread_mutex_lock(&s->st22p_wake_mutex); - st_pthread_cond_signal(&s->st22p_wake_cond); - st_pthread_mutex_unlock(&s->st22p_wake_mutex); info("%s(%d), wait app thread stop\n", __func__, idx); + if (s->handle) st22p_rx_wake_block(s->handle); pthread_join(s->st22p_app_thread, NULL); } - st_pthread_mutex_destroy(&s->st22p_wake_mutex); - st_pthread_cond_destroy(&s->st22p_wake_cond); - if (s->handle) { ret = st22p_rx_free(s->handle); if (ret < 0) err("%s(%d), st20_rx_free fail %d\n", __func__, idx, ret); @@ -183,13 +164,10 @@ static int app_rx_st22p_init(struct st_app_context* ctx, ops.device = st22p ? st22p->info.device : ST_PLUGIN_DEVICE_AUTO; ops.codec_thread_cnt = st22p ? st22p->info.codec_thread_count : 0; ops.max_codestream_size = 0; - ops.notify_frame_available = app_rx_st22p_frame_available; + ops.flags |= ST22P_RX_FLAG_BLOCK_GET; ops.framebuff_cnt = s->framebuff_cnt; if (st22p && st22p->enable_rtcp) ops.flags |= ST22P_RX_FLAG_ENABLE_RTCP; - st_pthread_mutex_init(&s->st22p_wake_mutex, NULL); - st_pthread_cond_init(&s->st22p_wake_cond, NULL); - s->width = ops.width; s->height = ops.height; if (ops.interlaced) { diff --git a/app/src/tx_st20p_app.c b/app/src/tx_st20p_app.c index e3b98ed55..7a993ea60 100644 --- a/app/src/tx_st20p_app.c +++ b/app/src/tx_st20p_app.c @@ -27,16 +27,6 @@ static void app_tx_st20p_display_frame(struct st_app_tx_st20p_session* s, } } -static int app_tx_st20p_frame_available(void* priv) { - struct st_app_tx_st20p_session* s = priv; - - st_pthread_mutex_lock(&s->st20p_wake_mutex); - st_pthread_cond_signal(&s->st20p_wake_cond); - st_pthread_mutex_unlock(&s->st20p_wake_mutex); - - return 0; -} - static int app_tx_st20p_notify_event(void* priv, enum st_event event, void* args) { struct st_app_tx_st20p_session* s = priv; if (event == ST_EVENT_VSYNC) { @@ -79,11 +69,8 @@ static void* app_tx_st20p_frame_thread(void* arg) { info("%s(%d), start\n", __func__, idx); while (!s->st20p_app_thread_stop) { frame = st20p_tx_get_frame(handle); - if (!frame) { /* no frame */ - st_pthread_mutex_lock(&s->st20p_wake_mutex); - if (!s->st20p_app_thread_stop) - st_pthread_cond_wait(&s->st20p_wake_cond, &s->st20p_wake_mutex); - st_pthread_mutex_unlock(&s->st20p_wake_mutex); + if (!frame) { /* no ready frame */ + warn("%s(%d), get frame time out\n", __func__, s->idx); continue; } app_tx_st20p_build_frame(s, frame); @@ -165,11 +152,9 @@ static int app_tx_st20p_start_source(struct st_app_tx_st20p_session* s) { static void app_tx_st20p_stop_source(struct st_app_tx_st20p_session* s) { s->st20p_app_thread_stop = true; - /* wake up the thread */ - st_pthread_mutex_lock(&s->st20p_wake_mutex); - st_pthread_cond_signal(&s->st20p_wake_cond); - st_pthread_mutex_unlock(&s->st20p_wake_mutex); if (s->st20p_app_thread) { + info("%s(%d), wait app thread stop\n", __func__, s->idx); + if (s->handle) st20p_tx_wake_block(s->handle); pthread_join(s->st20p_app_thread, NULL); s->st20p_app_thread = 0; } @@ -212,9 +197,6 @@ static int app_tx_st20p_uinit(struct st_app_tx_st20p_session* s) { st_app_free(s->display); } - st_pthread_mutex_destroy(&s->st20p_wake_mutex); - st_pthread_cond_destroy(&s->st20p_wake_cond); - return 0; } @@ -299,7 +281,7 @@ static int app_tx_st20p_init(struct st_app_context* ctx, st_json_st20p_session_t ops.transport_fmt = st20p ? st20p->info.transport_format : ST20_FMT_YUV_422_10BIT; ops.device = st20p ? st20p->info.device : ST_PLUGIN_DEVICE_AUTO; ops.framebuff_cnt = 2; - ops.notify_frame_available = app_tx_st20p_frame_available; + ops.flags |= ST20P_TX_FLAG_BLOCK_GET; ops.start_vrx = ctx->tx_start_vrx; ops.pad_interval = ctx->tx_pad_interval; ops.rtp_timestamp_delta_us = ctx->tx_ts_delta_us; @@ -324,9 +306,6 @@ static int app_tx_st20p_init(struct st_app_context* ctx, st_json_st20p_session_t s->framebuff_cnt = ops.framebuff_cnt; s->st20p_source_fd = -1; - st_pthread_mutex_init(&s->st20p_wake_mutex, NULL); - st_pthread_cond_init(&s->st20p_wake_cond, NULL); - handle = st20p_tx_create(ctx->st, &ops); if (!handle) { err("%s(%d), st20p_tx_create fail\n", __func__, idx); diff --git a/app/src/tx_st22p_app.c b/app/src/tx_st22p_app.c index f91783698..85bb13f9a 100644 --- a/app/src/tx_st22p_app.c +++ b/app/src/tx_st22p_app.c @@ -27,16 +27,6 @@ static void app_tx_st22p_display_frame(struct st_app_tx_st22p_session* s, } } -static int app_tx_st22p_frame_available(void* priv) { - struct st_app_tx_st22p_session* s = priv; - - st_pthread_mutex_lock(&s->st22p_wake_mutex); - st_pthread_cond_signal(&s->st22p_wake_cond); - st_pthread_mutex_unlock(&s->st22p_wake_mutex); - - return 0; -} - static void app_tx_st22p_build_frame(struct st_app_tx_st22p_session* s, struct st_frame* frame) { if (s->st22p_frame_cursor + s->st22p_frame_size > s->st22p_source_end) { @@ -64,11 +54,8 @@ static void* app_tx_st22p_frame_thread(void* arg) { info("%s(%d), start\n", __func__, idx); while (!s->st22p_app_thread_stop) { frame = st22p_tx_get_frame(handle); - if (!frame) { /* no frame */ - st_pthread_mutex_lock(&s->st22p_wake_mutex); - if (!s->st22p_app_thread_stop) - st_pthread_cond_wait(&s->st22p_wake_cond, &s->st22p_wake_mutex); - st_pthread_mutex_unlock(&s->st22p_wake_mutex); + if (!frame) { /* no ready frame */ + warn("%s(%d), get frame time out\n", __func__, s->idx); continue; } app_tx_st22p_build_frame(s, frame); @@ -146,10 +133,9 @@ static int app_tx_st22p_start_source(struct st_app_tx_st22p_session* s) { static void app_tx_st22p_stop_source(struct st_app_tx_st22p_session* s) { s->st22p_app_thread_stop = true; /* wake up the thread */ - st_pthread_mutex_lock(&s->st22p_wake_mutex); - st_pthread_cond_signal(&s->st22p_wake_cond); - st_pthread_mutex_unlock(&s->st22p_wake_mutex); if (s->st22p_app_thread) { + info("%s(%d), wait app thread stop\n", __func__, s->idx); + if (s->handle) st22p_tx_wake_block(s->handle); pthread_join(s->st22p_app_thread, NULL); s->st22p_app_thread = 0; } @@ -192,9 +178,6 @@ static int app_tx_st22p_uinit(struct st_app_tx_st22p_session* s) { st_app_free(s->display); } - st_pthread_mutex_destroy(&s->st22p_wake_mutex); - st_pthread_cond_destroy(&s->st22p_wake_cond); - return 0; } @@ -253,7 +236,7 @@ static int app_tx_st22p_init(struct st_app_context* ctx, st_json_st22p_session_t ops.codestream_size = ops.width * ops.height * 3 / 8; if (ops.interlaced) ops.codestream_size /= 2; /* the size is for each field */ ops.framebuff_cnt = 2; - ops.notify_frame_available = app_tx_st22p_frame_available; + ops.flags |= ST22P_TX_FLAG_BLOCK_GET; if (st22p && st22p->enable_rtcp) ops.flags |= ST22P_TX_FLAG_ENABLE_RTCP; if (ctx->tx_no_bulk) ops.flags |= ST22P_TX_FLAG_DISABLE_BULK; @@ -270,9 +253,6 @@ static int app_tx_st22p_init(struct st_app_context* ctx, st_json_st22p_session_t s->framebuff_cnt = ops.framebuff_cnt; s->st22p_source_fd = -1; - st_pthread_mutex_init(&s->st22p_wake_mutex, NULL); - st_pthread_cond_init(&s->st22p_wake_cond, NULL); - handle = st22p_tx_create(ctx->st, &ops); if (!handle) { err("%s(%d), st22p_tx_create fail\n", __func__, idx); diff --git a/doc/design.md b/doc/design.md index 77d15a16e..032b1f032 100644 --- a/doc/design.md +++ b/doc/design.md @@ -336,7 +336,7 @@ The Get/Put API is straightforward to use; consider the following example: Sample application code can be find at [tx_st20_pipeline_sample.c](../app/sample/tx_st20_pipeline_sample.c) and [rx_st20_pipeline_sample.c](../app/sample/rx_st20_pipeline_sample.c) By default, the `st20p_tx_get_frame` and `st20p_rx_get_frame` functions operate in non-blocking mode, which means the function call will immediately return `NULL` if no frame is available. -To switch to blocking mode, where the call will wait until a frame is ready for application use or one second timeout occurs, you must enable the `ST20P_TX_FLAG_BLOCK_GET` or `ST20P_RX_FLAG_BLOCK_GET` flag respectively during the session creation stage. +To switch to blocking mode, where the call will wait until a frame is ready for application use or one second timeout occurs, you must enable the `ST20P_TX_FLAG_BLOCK_GET` or `ST20P_RX_FLAG_BLOCK_GET` flag respectively during the session creation stage, and application can use `st20p_tx_wake_block`/`st20p_rx_wake_block` to wake up the waiting directly. ### 6.4 ST22 support @@ -362,7 +362,7 @@ The plugin guide can be find at [plugin](./plugin.md), the pipeline detail imple Sample application code can be find at [tx_st22_pipeline_sample.c](../app/sample/tx_st22_pipeline_sample.c) and [rx_st22_pipeline_sample.c](../app/sample/rx_st22_pipeline_sample.c) By default, the `st22p_tx_get_frame` and `st22p_rx_get_frame` functions operate in non-blocking mode, which means the function call will immediately return `NULL` if no frame is available. -To switch to blocking mode, where the call will wait until a frame is ready for application use or one second timeout occurs, you must enable the `ST22P_TX_FLAG_BLOCK_GET` or `ST22P_RX_FLAG_BLOCK_GET` flag respectively during the session creation stage. +To switch to blocking mode, where the call will wait until a frame is ready for application use or one second timeout occurs, you must enable the `ST22P_TX_FLAG_BLOCK_GET` or `ST22P_RX_FLAG_BLOCK_GET` flag respectively during the session creation stage, and application can use `st22p_tx_wake_block`/`st22p_rx_wake_block` to wake up the waiting directly. #### 6.4.2 ST22 codestream mode diff --git a/include/st_pipeline_api.h b/include/st_pipeline_api.h index 5fe9366a7..cd7a5029e 100644 --- a/include/st_pipeline_api.h +++ b/include/st_pipeline_api.h @@ -1333,6 +1333,17 @@ size_t st22p_tx_frame_size(st22p_tx_handle handle); */ int st22p_tx_update_destination(st22p_tx_handle handle, struct st_tx_dest_info* dst); +/** + * Wake up the block wait on st22p_tx_get_frame if ST22P_TX_FLAG_BLOCK_GET is enabled. + * + * @param handle + * The handle to the tx st2110-22(pipeline) session. + * @return + * - 0: Success. + * - <0: Error code. + */ +int st22p_tx_wake_block(st22p_tx_handle handle); + /** * Create one rx st2110-22 pipeline session. * @@ -1452,6 +1463,17 @@ int st22p_rx_get_queue_meta(st22p_rx_handle handle, struct st_queue_meta* meta); */ int st22p_rx_update_source(st22p_rx_handle handle, struct st_rx_source_info* src); +/** + * Wake up the block wait on st22p_rx_get_frame if ST22P_RX_FLAG_BLOCK_GET is enabled. + * + * @param handle + * The handle to the rx st2110-22(pipeline) session. + * @return + * - 0: Success. + * - <0: Error code. + */ +int st22p_rx_wake_block(st22p_rx_handle handle); + /** * Create one tx st2110-20 pipeline session. * @@ -1597,6 +1619,17 @@ int st20p_tx_reset_port_stats(st20p_tx_handle handle, enum mtl_session_port port */ int st20p_tx_update_destination(st20p_tx_handle handle, struct st_tx_dest_info* dst); +/** + * Wake up the block wait on st20p_tx_get_frame if ST20P_TX_FLAG_BLOCK_GET is enabled. + * + * @param handle + * The handle to the tx st2110-20(pipeline) session. + * @return + * - 0: Success. + * - <0: Error code. + */ +int st20p_tx_wake_block(st20p_tx_handle handle); + /** * Create one rx st2110-20 pipeline session. * @@ -1758,7 +1791,7 @@ int st20p_rx_update_source(st20p_rx_handle handle, struct st_rx_source_info* src /** * Get the timing parser pass critical to rx st2110-20(pipeline) session. - * Only avaiable if ST20P_RX_FLAG_TIMING_PARSER_META is enabled. + * Only available if ST20P_RX_FLAG_TIMING_PARSER_META is enabled. * * @param handle * The handle to the rx st2110-20(pipeline) session. @@ -1770,6 +1803,17 @@ int st20p_rx_update_source(st20p_rx_handle handle, struct st_rx_source_info* src */ int st20p_rx_timing_parser_critical(st20p_rx_handle handle, struct st20_rx_tp_pass* pass); +/** + * Wake up the block wait on st20p_rx_get_frame if ST20P_RX_FLAG_BLOCK_GET is enabled. + * + * @param handle + * The handle to the rx st2110-20(pipeline) session. + * @return + * - 0: Success. + * - <0: Error code. + */ +int st20p_rx_wake_block(st20p_rx_handle handle); + /** * Convert color format from source frame to destination frame. * diff --git a/lib/src/st2110/pipeline/st20_pipeline_rx.c b/lib/src/st2110/pipeline/st20_pipeline_rx.c index 05fb11bc6..b942c9dc2 100644 --- a/lib/src/st2110/pipeline/st20_pipeline_rx.c +++ b/lib/src/st2110/pipeline/st20_pipeline_rx.c @@ -1012,3 +1012,17 @@ int st20p_rx_timing_parser_critical(st20p_rx_handle handle, return st20_rx_timing_parser_critical(ctx->transport, pass); } + +int st20p_rx_wake_block(st20p_rx_handle handle) { + struct st20p_rx_ctx* ctx = handle; + int cidx = ctx->idx; + + if (ctx->type != MT_ST20_HANDLE_PIPELINE_RX) { + err("%s(%d), invalid type %d\n", __func__, cidx, ctx->type); + return 0; + } + + if (ctx->block_get) rx_st20p_block_wake(ctx); + + return 0; +} diff --git a/lib/src/st2110/pipeline/st20_pipeline_tx.c b/lib/src/st2110/pipeline/st20_pipeline_tx.c index ef6a6c839..09c34fe19 100644 --- a/lib/src/st2110/pipeline/st20_pipeline_tx.c +++ b/lib/src/st2110/pipeline/st20_pipeline_tx.c @@ -865,3 +865,17 @@ int st20p_tx_update_destination(st20p_tx_handle handle, struct st_tx_dest_info* return st20_tx_update_destination(ctx->transport, dst); } + +int st20p_tx_wake_block(st20p_tx_handle handle) { + struct st20p_tx_ctx* ctx = handle; + int cidx = ctx->idx; + + if (ctx->type != MT_ST20_HANDLE_PIPELINE_TX) { + err("%s(%d), invalid type %d\n", __func__, cidx, ctx->type); + return 0; + } + + if (ctx->block_get) tx_st20p_block_wake(ctx); + + return 0; +} diff --git a/lib/src/st2110/pipeline/st22_pipeline_rx.c b/lib/src/st2110/pipeline/st22_pipeline_rx.c index ef1498b4b..5c2e3c89c 100644 --- a/lib/src/st2110/pipeline/st22_pipeline_rx.c +++ b/lib/src/st2110/pipeline/st22_pipeline_rx.c @@ -690,3 +690,17 @@ int st22p_rx_update_source(st22p_rx_handle handle, struct st_rx_source_info* src return st22_rx_update_source(ctx->transport, src); } + +int st22p_rx_wake_block(st22p_rx_handle handle) { + struct st22p_rx_ctx* ctx = handle; + int cidx = ctx->idx; + + if (ctx->type != MT_ST22_HANDLE_PIPELINE_RX) { + err("%s(%d), invalid type %d\n", __func__, cidx, ctx->type); + return 0; + } + + if (ctx->block_get) rx_st22p_block_wake(ctx); + + return 0; +} diff --git a/lib/src/st2110/pipeline/st22_pipeline_tx.c b/lib/src/st2110/pipeline/st22_pipeline_tx.c index 55895d4a5..91bf63cc1 100644 --- a/lib/src/st2110/pipeline/st22_pipeline_tx.c +++ b/lib/src/st2110/pipeline/st22_pipeline_tx.c @@ -733,3 +733,17 @@ int st22p_tx_update_destination(st22p_tx_handle handle, struct st_tx_dest_info* return st22_tx_update_destination(ctx->transport, dst); } + +int st22p_tx_wake_block(st22p_tx_handle handle) { + struct st22p_tx_ctx* ctx = handle; + int cidx = ctx->idx; + + if (ctx->type != MT_ST22_HANDLE_PIPELINE_TX) { + err("%s(%d), invalid type %d\n", __func__, cidx, ctx->type); + return 0; + } + + if (ctx->block_get) tx_st22p_block_wake(ctx); + + return 0; +} diff --git a/tests/script/loop_json/st20p_1v_1080p59.json b/tests/script/loop_json/st20p_1v_1080p59.json index 0234c8a20..d6ff01b5d 100644 --- a/tests/script/loop_json/st20p_1v_1080p59.json +++ b/tests/script/loop_json/st20p_1v_1080p59.json @@ -19,7 +19,7 @@ ], "st20p": [ { - "replicas": 2, + "replicas": 1, "start_port": 20000, "payload_type": 112, "width": 1920, @@ -43,7 +43,7 @@ ], "st20p": [ { - "replicas": 2, + "replicas": 1, "start_port": 20000, "payload_type": 112, "width": 1920, diff --git a/tests/script/loop_json/st20p_2v_planar10.json b/tests/script/loop_json/st20p_1v_planar10.json similarity index 91% rename from tests/script/loop_json/st20p_2v_planar10.json rename to tests/script/loop_json/st20p_1v_planar10.json index eea0cd33c..1937e24c4 100644 --- a/tests/script/loop_json/st20p_2v_planar10.json +++ b/tests/script/loop_json/st20p_1v_planar10.json @@ -12,14 +12,14 @@ "tx_sessions": [ { "dip": [ - "192.168.17.102" + "local:1" ], "interface": [ 0 ], "st20p": [ { - "replicas": 2, + "replicas": 1, "start_port": 20000, "payload_type": 112, "width": 1920, @@ -36,14 +36,14 @@ "rx_sessions": [ { "ip": [ - "192.168.17.101" + "local:0" ], "interface": [ 1 ], "st20p": [ { - "replicas": 2, + "replicas": 1, "start_port": 20000, "payload_type": 112, "width": 1920, diff --git a/tests/script/loop_json/st20p_2v_v210.json b/tests/script/loop_json/st20p_1v_v210.json similarity index 91% rename from tests/script/loop_json/st20p_2v_v210.json rename to tests/script/loop_json/st20p_1v_v210.json index ab42a7418..79c73ddb2 100644 --- a/tests/script/loop_json/st20p_2v_v210.json +++ b/tests/script/loop_json/st20p_1v_v210.json @@ -12,14 +12,14 @@ "tx_sessions": [ { "dip": [ - "192.168.17.102" + "local:1" ], "interface": [ 0 ], "st20p": [ { - "replicas": 2, + "replicas": 1, "start_port": 20000, "payload_type": 112, "width": 1920, @@ -36,14 +36,14 @@ "rx_sessions": [ { "ip": [ - "192.168.17.101" + "local:0" ], "interface": [ 1 ], "st20p": [ { - "replicas": 2, + "replicas": 1, "start_port": 20000, "payload_type": 112, "width": 1920, diff --git a/tests/src/st20p_test.cpp b/tests/src/st20p_test.cpp index 126e8d27a..37014e2d0 100644 --- a/tests/src/st20p_test.cpp +++ b/tests/src/st20p_test.cpp @@ -1046,6 +1046,7 @@ static void st20p_rx_digest_test(enum st_fps fps[], int width[], int height[], EXPECT_NEAR(vsyncrate_tx[i], st_frame_rate(fps[i]), st_frame_rate(fps[i]) * 0.1); test_ctx_tx[i]->stop = true; + if (para->block_get) st20p_tx_wake_block(tx_handle[i]); test_ctx_tx[i]->cv.notify_all(); tx_thread[i].join(); if (para->send_done_check) { @@ -1071,6 +1072,7 @@ static void st20p_rx_digest_test(enum st_fps fps[], int width[], int height[], EXPECT_NEAR(vsyncrate_rx[i], st_frame_rate(fps[i]), st_frame_rate(fps[i]) * 0.1); test_ctx_rx[i]->stop = true; + if (para->block_get) st20p_rx_wake_block(rx_handle[i]); test_ctx_rx[i]->cv.notify_all(); rx_thread[i].join(); } diff --git a/tests/src/st22p_test.cpp b/tests/src/st22p_test.cpp index 17e4ba59c..a28559eb9 100644 --- a/tests/src/st22p_test.cpp +++ b/tests/src/st22p_test.cpp @@ -1078,6 +1078,7 @@ static void st22p_rx_digest_test(enum st_fps fps[], int width[], int height[], EXPECT_NEAR(vsyncrate_tx[i], st_frame_rate(fps[i]), st_frame_rate(fps[i]) * 0.1); test_ctx_tx[i]->stop = true; + if (para->block_get) st22p_tx_wake_block(tx_handle[i]); test_ctx_tx[i]->cv.notify_all(); tx_thread[i].join(); } @@ -1095,6 +1096,7 @@ static void st22p_rx_digest_test(enum st_fps fps[], int width[], int height[], EXPECT_NEAR(vsyncrate_rx[i], st_frame_rate(fps[i]), st_frame_rate(fps[i]) * 0.1); test_ctx_rx[i]->stop = true; + if (para->block_get) st22p_rx_wake_block(rx_handle[i]); test_ctx_rx[i]->cv.notify_all(); rx_thread[i].join(); }