Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app: use block mode for pipeline sample #696

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 3 additions & 23 deletions app/sample/rx_st20_pipeline_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ struct rx_st20p_sample_ctx {
pthread_t frame_thread;

int fb_recv;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t frame_size;
int dst_fd;
Expand All @@ -24,16 +22,6 @@ struct rx_st20p_sample_ctx {
int fb_cnt;
};

static int rx_st20p_frame_available(void* priv) {
struct rx_st20p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

return 0;
}

static int rx_st20p_close_source(struct rx_st20p_sample_ctx* s) {
if (s->dst_begin) {
munmap(s->dst_begin, s->dst_end - s->dst_begin);
Expand Down Expand Up @@ -102,9 +90,7 @@ static void* rx_st20p_frame_thread(void* arg) {
while (!s->stop) {
frame = st20p_rx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}
dbg("%s(%d), one new frame\n", __func__, s->idx);
Expand Down Expand Up @@ -153,8 +139,6 @@ int main(int argc, char** argv) {
memset(app[i], 0, sizeof(struct rx_st20p_sample_ctx));
app[i]->idx = i;
app[i]->stop = false;
st_pthread_mutex_init(&app[i]->wake_mutex, NULL);
st_pthread_cond_init(&app[i]->wake_cond, NULL);
app[i]->dst_fd = -1;
app[i]->fb_cnt = ctx.framebuff_cnt;

Expand All @@ -177,7 +161,7 @@ int main(int argc, char** argv) {
ops_rx.output_fmt = ctx.output_fmt;
ops_rx.device = ST_PLUGIN_DEVICE_AUTO;
ops_rx.framebuff_cnt = app[i]->fb_cnt;
ops_rx.notify_frame_available = rx_st20p_frame_available;
ops_rx.flags = ST20P_RX_FLAG_BLOCK_GET;

st20p_rx_handle rx_handle = st20p_rx_create(ctx.st, &ops_rx);
if (!rx_handle) {
Expand Down Expand Up @@ -214,9 +198,7 @@ int main(int argc, char** argv) {
// stop app thread
for (int i = 0; i < session_num; i++) {
app[i]->stop = true;
st_pthread_mutex_lock(&app[i]->wake_mutex);
st_pthread_cond_signal(&app[i]->wake_cond);
st_pthread_mutex_unlock(&app[i]->wake_mutex);
if (app[i]->handle) st20p_rx_wake_block(app[i]->handle);
pthread_join(app[i]->frame_thread, NULL);
info("%s(%d), received frames %d\n", __func__, i, app[i]->fb_recv);

Expand All @@ -238,8 +220,6 @@ int main(int argc, char** argv) {
for (int i = 0; i < session_num; i++) {
if (app[i]) {
if (app[i]->handle) st20p_rx_free(app[i]->handle);
st_pthread_mutex_destroy(&app[i]->wake_mutex);
st_pthread_cond_destroy(&app[i]->wake_cond);
free(app[i]);
}
}
Expand Down
24 changes: 3 additions & 21 deletions app/sample/rx_st22_pipeline_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ struct rx_st22p_sample_ctx {
pthread_t frame_thread;

int fb_recv;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t frame_size;
int dst_fd;
Expand All @@ -22,16 +20,6 @@ struct rx_st22p_sample_ctx {
uint8_t* dst_cursor;
};

static int rx_st22p_frame_available(void* priv) {
struct rx_st22p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

return 0;
}

static int rx_st22p_close_source(struct rx_st22p_sample_ctx* s) {
if (s->dst_begin) {
munmap(s->dst_begin, s->dst_end - s->dst_begin);
Expand Down Expand Up @@ -107,9 +95,7 @@ static void* rx_st22p_frame_thread(void* arg) {
while (!s->stop) {
frame = st22p_rx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}
rx_st22p_consume_frame(s, frame);
Expand Down Expand Up @@ -149,8 +135,6 @@ int main(int argc, char** argv) {
memset(app[i], 0, sizeof(struct rx_st22p_sample_ctx));
app[i]->idx = i;
app[i]->stop = false;
st_pthread_mutex_init(&app[i]->wake_mutex, NULL);
st_pthread_cond_init(&app[i]->wake_cond, NULL);
app[i]->dst_fd = -1;

struct st22p_rx_ops ops_rx;
Expand All @@ -175,7 +159,7 @@ int main(int argc, char** argv) {
ops_rx.max_codestream_size = 0; /* let lib to decide */
ops_rx.framebuff_cnt = ctx.framebuff_cnt;
ops_rx.codec_thread_cnt = 2;
ops_rx.notify_frame_available = rx_st22p_frame_available;
ops_rx.flags = ST22P_RX_FLAG_BLOCK_GET;

st22p_rx_handle rx_handle = st22p_rx_create(ctx.st, &ops_rx);
if (!rx_handle) {
Expand Down Expand Up @@ -211,9 +195,7 @@ int main(int argc, char** argv) {
// stop app thread
for (int i = 0; i < session_num; i++) {
app[i]->stop = true;
st_pthread_mutex_lock(&app[i]->wake_mutex);
st_pthread_cond_signal(&app[i]->wake_cond);
st_pthread_mutex_unlock(&app[i]->wake_mutex);
if (app[i]->handle) st22p_rx_wake_block(app[i]->handle);
pthread_join(app[i]->frame_thread, NULL);
info("%s(%d), received frames %d\n", __func__, i, app[i]->fb_recv);

Expand Down
26 changes: 3 additions & 23 deletions app/sample/tx_st20_pipeline_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ struct tx_st20p_sample_ctx {

int fb_send;
int fb_send_done;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t frame_size;
uint8_t* source_begin;
Expand Down Expand Up @@ -94,16 +92,6 @@ static int tx_st20p_open_source(struct tx_st20p_sample_ctx* s, char* file) {
return 0;
}

static int tx_st20p_frame_available(void* priv) {
struct tx_st20p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

return 0;
}

static int tx_st20p_frame_done(void* priv, struct st_frame* frame) {
struct tx_st20p_sample_ctx* s = priv;
MTL_MAY_UNUSED(frame);
Expand All @@ -128,9 +116,7 @@ static void* tx_st20p_frame_thread(void* arg) {
while (!s->stop) {
frame = st20p_tx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}

Expand Down Expand Up @@ -189,8 +175,6 @@ int main(int argc, char** argv) {
snprintf(app[i]->meta.dummy, sizeof(app[i]->meta.dummy), "st20p_tx_%d", i);
app[i]->has_user_meta = true;
}
st_pthread_mutex_init(&app[i]->wake_mutex, NULL);
st_pthread_cond_init(&app[i]->wake_cond, NULL);

struct st20p_tx_ops ops_tx;
memset(&ops_tx, 0, sizeof(ops_tx));
Expand Down Expand Up @@ -218,7 +202,7 @@ int main(int argc, char** argv) {
ops_tx.transport_fmt = ctx.fmt;
ops_tx.device = ST_PLUGIN_DEVICE_AUTO;
ops_tx.framebuff_cnt = ctx.framebuff_cnt;
ops_tx.notify_frame_available = tx_st20p_frame_available;
ops_tx.flags = ST20P_TX_FLAG_BLOCK_GET;
ops_tx.notify_frame_done = tx_st20p_frame_done;

st20p_tx_handle tx_handle = st20p_tx_create(ctx.st, &ops_tx);
Expand Down Expand Up @@ -255,9 +239,7 @@ int main(int argc, char** argv) {
// stop app thread
for (int i = 0; i < session_num; i++) {
app[i]->stop = true;
st_pthread_mutex_lock(&app[i]->wake_mutex);
st_pthread_cond_signal(&app[i]->wake_cond);
st_pthread_mutex_unlock(&app[i]->wake_mutex);
if (app[i]->handle) st20p_tx_wake_block(app[i]->handle);
pthread_join(app[i]->frame_thread, NULL);
info("%s(%d), sent frames %d(done %d)\n", __func__, i, app[i]->fb_send,
app[i]->fb_send_done);
Expand All @@ -279,8 +261,6 @@ int main(int argc, char** argv) {
error:
for (int i = 0; i < session_num; i++) {
if (app[i]) {
st_pthread_mutex_destroy(&app[i]->wake_mutex);
st_pthread_cond_destroy(&app[i]->wake_cond);
if (app[i]->handle) st20p_tx_free(app[i]->handle);
free(app[i]);
}
Expand Down
26 changes: 3 additions & 23 deletions app/sample/tx_st22_pipeline_sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ struct tx_st22p_sample_ctx {
pthread_t frame_thread;

int fb_send;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t frame_size;
uint8_t* source_begin;
Expand Down Expand Up @@ -124,16 +122,6 @@ static int tx_st22p_open_source(struct st_sample_context* ctx,
return 0;
}

static int tx_st22p_frame_available(void* priv) {
struct tx_st22p_sample_ctx* s = priv;

st_pthread_mutex_lock(&s->wake_mutex);
st_pthread_cond_signal(&s->wake_cond);
st_pthread_mutex_unlock(&s->wake_mutex);

return 0;
}

static void tx_st22p_build_frame(struct tx_st22p_sample_ctx* s, struct st_frame* frame) {
if (s->frame_cursor + s->frame_size > s->source_end) {
s->frame_cursor = s->source_begin;
Expand Down Expand Up @@ -167,9 +155,7 @@ static void* tx_st22p_frame_thread(void* arg) {
while (!s->stop) {
frame = st22p_tx_get_frame(handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->wake_mutex);
if (!s->stop) st_pthread_cond_wait(&s->wake_cond, &s->wake_mutex);
st_pthread_mutex_unlock(&s->wake_mutex);
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}
if (s->source_begin) tx_st22p_build_frame(s, frame);
Expand Down Expand Up @@ -212,8 +198,6 @@ int main(int argc, char** argv) {
app[i]->st = ctx.st;
app[i]->idx = i;
app[i]->stop = false;
st_pthread_mutex_init(&app[i]->wake_mutex, NULL);
st_pthread_cond_init(&app[i]->wake_cond, NULL);

struct st22p_tx_ops ops_tx;
memset(&ops_tx, 0, sizeof(ops_tx));
Expand All @@ -239,7 +223,7 @@ int main(int argc, char** argv) {
ops_tx.codestream_size = ops_tx.width * ops_tx.height * bpp / 8;
if (ops_tx.interlaced) ops_tx.codestream_size /= 2;
ops_tx.framebuff_cnt = ctx.framebuff_cnt;
ops_tx.notify_frame_available = tx_st22p_frame_available;
ops_tx.flags = ST22P_TX_FLAG_BLOCK_GET;

st22p_tx_handle tx_handle = st22p_tx_create(ctx.st, &ops_tx);
if (!tx_handle) {
Expand Down Expand Up @@ -270,9 +254,7 @@ int main(int argc, char** argv) {
// stop app thread
for (int i = 0; i < session_num; i++) {
app[i]->stop = true;
st_pthread_mutex_lock(&app[i]->wake_mutex);
st_pthread_cond_signal(&app[i]->wake_cond);
st_pthread_mutex_unlock(&app[i]->wake_mutex);
if (app[i]->handle) st22p_tx_wake_block(app[i]->handle);
pthread_join(app[i]->frame_thread, NULL);
info("%s(%d), sent frames %d\n", __func__, i, app[i]->fb_send);

Expand All @@ -293,8 +275,6 @@ int main(int argc, char** argv) {
error:
for (int i = 0; i < session_num; i++) {
if (app[i]) {
st_pthread_mutex_destroy(&app[i]->wake_mutex);
st_pthread_cond_destroy(&app[i]->wake_cond);
if (app[i]->handle) st22p_tx_free(app[i]->handle);
free(app[i]);
}
Expand Down
8 changes: 0 additions & 8 deletions app/src/app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,6 @@ struct st_app_tx_st22p_session {
double expect_fps;

pthread_t st22p_app_thread;
pthread_cond_t st22p_wake_cond;
pthread_mutex_t st22p_wake_mutex;
bool st22p_app_thread_stop;
};

Expand All @@ -415,8 +413,6 @@ struct st_app_rx_st22p_session {
double expect_fps;

pthread_t st22p_app_thread;
pthread_cond_t st22p_wake_cond;
pthread_mutex_t st22p_wake_mutex;
bool st22p_app_thread_stop;

struct st_display* display;
Expand Down Expand Up @@ -448,8 +444,6 @@ struct st_app_tx_st20p_session {
double expect_fps;

pthread_t st20p_app_thread;
pthread_cond_t st20p_wake_cond;
pthread_mutex_t st20p_wake_mutex;
bool st20p_app_thread_stop;
};

Expand All @@ -473,8 +467,6 @@ struct st_app_rx_st20p_session {
double expect_fps;

pthread_t st20p_app_thread;
pthread_cond_t st20p_wake_cond;
pthread_mutex_t st20p_wake_mutex;
bool st20p_app_thread_stop;

struct st_display* display;
Expand Down
31 changes: 5 additions & 26 deletions app/src/rx_st20p_app.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,6 @@

#include "rx_st20p_app.h"

static int app_rx_st20p_frame_available(void* priv) {
struct st_app_rx_st20p_session* s = priv;

st_pthread_mutex_lock(&s->st20p_wake_mutex);
st_pthread_cond_signal(&s->st20p_wake_cond);
st_pthread_mutex_unlock(&s->st20p_wake_mutex);

return 0;
}

static void app_rx_st20p_consume_frame(struct st_app_rx_st20p_session* s,
struct st_frame* frame) {
struct st_display* d = s->display;
Expand Down Expand Up @@ -63,11 +53,8 @@ static void* app_rx_st20p_frame_thread(void* arg) {
info("%s(%d), start\n", __func__, s->idx);
while (!s->st20p_app_thread_stop) {
frame = st20p_rx_get_frame(s->handle);
if (!frame) { /* no frame */
st_pthread_mutex_lock(&s->st20p_wake_mutex);
if (!s->st20p_app_thread_stop)
st_pthread_cond_wait(&s->st20p_wake_cond, &s->st20p_wake_mutex);
st_pthread_mutex_unlock(&s->st20p_wake_mutex);
if (!frame) { /* no ready frame */
warn("%s(%d), get frame time out\n", __func__, s->idx);
continue;
}

Expand Down Expand Up @@ -140,16 +127,11 @@ static int app_rx_st20p_uinit(struct st_app_rx_st20p_session* s) {
s->st20p_app_thread_stop = true;
if (s->st20p_app_thread_stop) {
/* wake up the thread */
st_pthread_mutex_lock(&s->st20p_wake_mutex);
st_pthread_cond_signal(&s->st20p_wake_cond);
st_pthread_mutex_unlock(&s->st20p_wake_mutex);
info("%s(%d), wait app thread stop\n", __func__, idx);
if (s->handle) st20p_rx_wake_block(s->handle);
pthread_join(s->st20p_app_thread, NULL);
}

st_pthread_mutex_destroy(&s->st20p_wake_mutex);
st_pthread_cond_destroy(&s->st20p_wake_cond);

if (s->handle) {
ret = st20p_rx_free(s->handle);
if (ret < 0) err("%s(%d), st20_rx_free fail %d\n", __func__, idx, ret);
Expand Down Expand Up @@ -235,16 +217,13 @@ static int app_rx_st20p_init(struct st_app_context* ctx,
ops.transport_fmt = st20p ? st20p->info.transport_format : ST20_FMT_YUV_422_10BIT;
ops.port.payload_type = st20p ? st20p->base.payload_type : ST_APP_PAYLOAD_TYPE_VIDEO;
ops.device = st20p ? st20p->info.device : ST_PLUGIN_DEVICE_AUTO;
ops.notify_frame_available = app_rx_st20p_frame_available;
ops.flags |= ST20P_RX_FLAG_BLOCK_GET;
ops.framebuff_cnt = s->framebuff_cnt;
/* always try to enable DMA offload */
ops.flags = ST20P_RX_FLAG_DMA_OFFLOAD;
ops.flags |= ST20P_RX_FLAG_DMA_OFFLOAD;
if (st20p && st20p->enable_rtcp) ops.flags |= ST20P_RX_FLAG_ENABLE_RTCP;
if (ctx->enable_timing_parser) ops.flags |= ST20P_RX_FLAG_TIMING_PARSER_STAT;

st_pthread_mutex_init(&s->st20p_wake_mutex, NULL);
st_pthread_cond_init(&s->st20p_wake_cond, NULL);

s->width = ops.width;
s->height = ops.height;
if (ops.interlaced) {
Expand Down
Loading