diff --git a/ecosystem/gstreamer_plugin/gst_mtl_common.c b/ecosystem/gstreamer_plugin/gst_mtl_common.c index a00b0a944..d7cf2ebcd 100644 --- a/ecosystem/gstreamer_plugin/gst_mtl_common.c +++ b/ecosystem/gstreamer_plugin/gst_mtl_common.c @@ -8,6 +8,8 @@ #include "gst_mtl_common.h" +guint gst_mtl_port_idx = MTL_PORT_P; + gboolean gst_mtl_common_parse_input_finfo(const GstVideoFormatInfo* finfo, enum st_frame_fmt* fmt) { if (finfo->format == GST_VIDEO_FORMAT_v210) { @@ -203,3 +205,189 @@ gboolean gst_mtl_common_parse_sampling(gint sampling, enum st30_sampling* st_sam return FALSE; } } + +void gst_mtl_common_init_general_argumetns(GObjectClass* gobject_class) { + g_object_class_install_property( + gobject_class, PROP_GENERAL_LOG_LEVEL, + g_param_spec_boolean("silent", "Silent", "Turn on silent mode.", FALSE, + G_PARAM_READWRITE)); + + g_object_class_install_property( + gobject_class, PROP_GENERAL_DEV_ARGS_PORT, + g_param_spec_string("dev-port", "DPDK device port", + "DPDK port for synchronous ST 2110-20 uncompressed" + "video transmission, bound to the VFIO DPDK driver. ", + NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property( + gobject_class, PROP_GENERAL_DEV_ARGS_SIP, + g_param_spec_string("dev-ip", "Local device IP", + "Local IP address that the port will be " + "identified by. This is the address from which ARP " + "responses will be sent.", + NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property( + gobject_class, PROP_GENERAL_DEV_ARGS_DMA_DEV, + g_param_spec_string("dma-dev", "DPDK DMA port", + "DPDK port for the MTL direct memory functionality.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property( + gobject_class, PROP_GENERAL_PORT_PORT, + g_param_spec_string("port", "Transmission Device Port", + "DPDK device port initialized for the transmission.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property( + gobject_class, PROP_GENERAL_PORT_IP, + g_param_spec_string("ip", "Sender node's IP", "Receiving MTL node IP address.", + NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property( + gobject_class, PROP_GENERAL_PORT_UDP_PORT, + g_param_spec_uint("udp-port", "Sender UDP port", "Receiving MTL node UDP port.", 0, + G_MAXUINT, 20000, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property( + gobject_class, PROP_GENERAL_PORT_TX_QUEUES, + g_param_spec_uint("tx-queues", "Number of TX queues", + "Number of TX queues to initialize in DPDK backend.", 0, + G_MAXUINT, 16, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property( + gobject_class, PROP_GENERAL_PORT_RX_QUEUES, + g_param_spec_uint("rx-queues", "Number of RX queues", + "Number of RX queues to initialize in DPDK backend.", 0, + G_MAXUINT, 16, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property( + gobject_class, PROP_GENERAL_PORT_PAYLOAD_TYPE, + g_param_spec_uint("payload-type", "ST 2110 payload type", + "SMPTE ST 2110 payload type.", 0, G_MAXUINT, 112, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +} + +void gst_mtl_common_set_general_argumetns(GObject* object, guint prop_id, + const GValue* value, GParamSpec* pspec, + StDevArgs* devArgs, SessionPortArgs* portArgs, + guint* log_level) { + switch (prop_id) { + case PROP_GENERAL_LOG_LEVEL: + *log_level = g_value_get_uint(value); + break; + case PROP_GENERAL_DEV_ARGS_PORT: + strncpy(devArgs->port, g_value_get_string(value), MTL_PORT_MAX_LEN); + break; + case PROP_GENERAL_DEV_ARGS_SIP: + strncpy(devArgs->local_ip_string, g_value_get_string(value), MTL_PORT_MAX_LEN); + break; + case PROP_GENERAL_DEV_ARGS_DMA_DEV: + strncpy(devArgs->dma_dev, g_value_get_string(value), MTL_PORT_MAX_LEN); + break; + case PROP_GENERAL_PORT_PORT: + strncpy(portArgs->port, g_value_get_string(value), MTL_PORT_MAX_LEN); + break; + case PROP_GENERAL_PORT_IP: + strncpy(portArgs->session_ip_string, g_value_get_string(value), MTL_PORT_MAX_LEN); + break; + case PROP_GENERAL_PORT_UDP_PORT: + portArgs->udp_port = g_value_get_uint(value); + break; + case PROP_GENERAL_PORT_PAYLOAD_TYPE: + portArgs->payload_type = g_value_get_uint(value); + break; + case PROP_GENERAL_PORT_RX_QUEUES: + devArgs->rx_queues_cnt[MTL_PORT_P] = g_value_get_uint(value); + break; + case PROP_GENERAL_PORT_TX_QUEUES: + devArgs->tx_queues_cnt[MTL_PORT_P] = g_value_get_uint(value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +void gst_mtl_common_get_general_argumetns(GObject* object, guint prop_id, + const GValue* value, GParamSpec* pspec, + StDevArgs* devArgs, SessionPortArgs* portArgs, + guint* log_level) { + switch (prop_id) { + case PROP_GENERAL_LOG_LEVEL: + g_value_set_uint(value, *log_level); + break; + case PROP_GENERAL_DEV_ARGS_PORT: + g_value_set_string(value, devArgs->port); + break; + case PROP_GENERAL_DEV_ARGS_SIP: + g_value_set_string(value, devArgs->local_ip_string); + break; + case PROP_GENERAL_DEV_ARGS_DMA_DEV: + g_value_set_string(value, devArgs->dma_dev); + break; + case PROP_GENERAL_PORT_PORT: + g_value_set_string(value, portArgs->port); + break; + case PROP_GENERAL_PORT_IP: + g_value_set_string(value, portArgs->session_ip_string); + break; + case PROP_GENERAL_PORT_UDP_PORT: + g_value_set_uint(value, portArgs->udp_port); + break; + case PROP_GENERAL_PORT_PAYLOAD_TYPE: + g_value_set_uint(value, portArgs->payload_type); + break; + case PROP_GENERAL_PORT_RX_QUEUES: + g_value_set_uint(value, devArgs->rx_queues_cnt[MTL_PORT_P]); + break; + case PROP_GENERAL_PORT_TX_QUEUES: + g_value_set_uint(value, devArgs->tx_queues_cnt[MTL_PORT_P]); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +gboolean gst_mtl_common_parse_dev_arguments(struct mtl_init_params* mtl_init_params, + StDevArgs* devArgs) { + gint ret; + + if (gst_mtl_port_idx > MTL_PORT_R) { + GST_ERROR("%s, invalid port number %d\n", __func__, gst_mtl_port_idx); + return FALSE; + } + + strncpy(mtl_init_params->port[gst_mtl_port_idx], devArgs->port, MTL_PORT_MAX_LEN); + + ret = inet_pton(AF_INET, devArgs->local_ip_string, + mtl_init_params->sip_addr[gst_mtl_port_idx]); + if (ret != 1) { + GST_ERROR("%s, sip %s is not valid ip address\n", __func__, devArgs->local_ip_string); + return FALSE; + } + + if (devArgs->rx_queues_cnt[gst_mtl_port_idx]) { + mtl_init_params->rx_queues_cnt[gst_mtl_port_idx] = + devArgs->rx_queues_cnt[gst_mtl_port_idx]; + } else { + mtl_init_params->rx_queues_cnt[gst_mtl_port_idx] = 16; + } + + if (devArgs->tx_queues_cnt[gst_mtl_port_idx]) { + mtl_init_params->tx_queues_cnt[gst_mtl_port_idx] = + devArgs->tx_queues_cnt[gst_mtl_port_idx]; + } else { + mtl_init_params->tx_queues_cnt[gst_mtl_port_idx] = 16; + } + + mtl_init_params->num_ports++; + + if (devArgs->dma_dev && strlen(devArgs->dma_dev)) { + strncpy(mtl_init_params->dma_dev_port[0], devArgs->dma_dev, MTL_PORT_MAX_LEN); + } + + gst_mtl_port_idx++; + return ret; +} diff --git a/ecosystem/gstreamer_plugin/gst_mtl_common.h b/ecosystem/gstreamer_plugin/gst_mtl_common.h index 088a978ea..50f3dfcfe 100644 --- a/ecosystem/gstreamer_plugin/gst_mtl_common.h +++ b/ecosystem/gstreamer_plugin/gst_mtl_common.h @@ -12,10 +12,30 @@ #include #include #include +#include #include +#define PAYLOAD_TYPE_AUDIO (111) +#define PAYLOAD_TYPE_VIDEO (112) +#define PAYLOAD_TYPE_ANCILLARY (113) + #define NS_PER_MS (1000 * 1000) +enum { + PROP_GENERAL_0, + PROP_GENERAL_LOG_LEVEL, + PROP_GENERAL_DEV_ARGS_PORT, + PROP_GENERAL_DEV_ARGS_SIP, + PROP_GENERAL_DEV_ARGS_DMA_DEV, + PROP_GENERAL_PORT_PORT, + PROP_GENERAL_PORT_IP, + PROP_GENERAL_PORT_UDP_PORT, + PROP_GENERAL_PORT_PAYLOAD_TYPE, + PROP_GENERAL_PORT_RX_QUEUES, + PROP_GENERAL_PORT_TX_QUEUES, + PROP_GENERAL_MAX +}; + enum gst_mtl_supported_fps { GST_MTL_SUPPORTED_FPS_23_98 = 2398, GST_MTL_SUPPORTED_FPS_24 = 24, @@ -60,4 +80,19 @@ gboolean gst_mtl_common_parse_pixel_format(const char* format, enum st_frame_fmt gboolean gst_mtl_common_parse_audio_format(const char* format, enum st30_fmt* audio); gboolean gst_mtl_common_parse_sampling(gint sampling, enum st30_sampling* st_sampling); +gboolean gst_mtl_common_parse_dev_arguments(struct mtl_init_params* mtl_init_params, + StDevArgs* devArgs); + +void gst_mtl_common_init_general_argumetns(GObjectClass* gobject_class); + +void gst_mtl_common_set_general_argumetns(GObject* object, guint prop_id, + const GValue* value, GParamSpec* pspec, + StDevArgs* devArgs, SessionPortArgs* portArgs, + guint* log_level); + +void gst_mtl_common_get_general_argumetns(GObject* object, guint prop_id, + const GValue* value, GParamSpec* pspec, + StDevArgs* devArgs, SessionPortArgs* portArgs, + guint* log_level); + #endif /* __GST_MTL_COMMON_H__ */ \ No newline at end of file diff --git a/ecosystem/gstreamer_plugin/gst_mtl_st20p_rx.h b/ecosystem/gstreamer_plugin/gst_mtl_st20p_rx.h index 94137e5d2..579e88294 100644 --- a/ecosystem/gstreamer_plugin/gst_mtl_st20p_rx.h +++ b/ecosystem/gstreamer_plugin/gst_mtl_st20p_rx.h @@ -48,6 +48,9 @@ #define __GST_MTL_ST20P_RX_H__ #include +#include +#include +#include #include "gst_mtl_common.h" diff --git a/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.c b/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.c new file mode 100644 index 000000000..c5c504f11 --- /dev/null +++ b/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.c @@ -0,0 +1,511 @@ +/* + * GStreamer + * Copyright (C) 2005 Thomas Vander Stichele + * Copyright (C) 2005 Ronald S. Bultje + * Copyright (C) 2024 Intel Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Alternatively, the contents of this file may be used under the + * GNU Lesser General Public License Version 2.1 (the "LGPL"), in + * which case the following provisions apply instead of the ones + * mentioned above: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/** + * SECTION:element-mtl_rx_src + * + * The mtl_rx_src element is a GStreamer src plugin designed to interface with + * the Media Transport Library (MTL). + * MTL is a software-based solution optimized for high-throughput, low-latency + * transmission and reception of media data. + * + * It features an efficient user-space LibOS UDP stack specifically crafted for + * media transport and includes a built-in SMPTE ST 2110-compliant + * implementation for Professional Media over Managed IP Networks. + * + * This element allows GStreamer pipelines to recive media data using the MTL + * mbufwork, ensuring efficient and reliable media transport over IP networks. + * + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include "gst_mtl_st40_rx.h" + +GST_DEBUG_CATEGORY_STATIC(gst_mtl_st40_rx_debug); +#define GST_CAT_DEFAULT gst_mtl_st40_rx_debug + +#ifndef GST_LICENSE +#define GST_LICENSE "LGPL" +#endif + +#ifndef GST_API_VERSION +#define GST_API_VERSION "1.0" +#endif + +#ifndef GST_PACKAGE_NAME +#define GST_PACKAGE_NAME "Media Transport Library st2110 st40 rx plugin" +#endif + +#ifndef GST_PACKAGE_ORIGIN +#define GST_PACKAGE_ORIGIN "https://github.com/OpenVisualCloud/Media-Transport-Library" +#endif + +#ifndef PACKAGE +#define PACKAGE "gst-mtl-st40-rx" +#endif + +#ifndef PACKAGE_VERSION +#define PACKAGE_VERSION "1.0" +#endif + +enum { + PROP_ST40_RX_BUFFER_SIZE = PROP_GENERAL_MAX, + PROP_ST40_RX_TIMEOUT_MBUF_GET, + PROP_MAX +}; + +/* pad template */ +static GstStaticPadTemplate gst_mtl_st40_rx_src_pad_template = + GST_STATIC_PAD_TEMPLATE("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); + +#define gst_mtl_st40_rx_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE(Gst_Mtl_St40_Rx, gst_mtl_st40_rx, GST_TYPE_BASE_SRC, + GST_DEBUG_CATEGORY_INIT(gst_mtl_st40_rx_debug, "mtl_st40_rx", 0, + "MTL St2110 st40 transmission src")); + +#define IS_POWER_OF_2(x) (((x) & ((x)-1)) == 0) + +GST_ELEMENT_REGISTER_DEFINE(mtl_st40_rx, "mtl_st40_rx", GST_RANK_NONE, + GST_TYPE_MTL_ST40_RX); + +static void gst_mtl_st40_rx_set_property(GObject* object, guint prop_id, + const GValue* value, GParamSpec* pspec); +static void gst_mtl_st40_rx_get_property(GObject* object, guint prop_id, GValue* value, + GParamSpec* pspec); +static void gst_mtl_st40_rx_finalize(GObject* object); + +static gboolean gst_mtl_st40_rx_start(GstBaseSrc* basesrc); +static gboolean gst_mtl_st40_rx_stop(GstBaseSrc* basesrc); +static GstFlowReturn gst_mtl_st40_rx_create(GstBaseSrc* basesrc, guint64 offset, + guint length, GstBuffer** buffer); + +static gint gst_mtl_st40_rx_mbuff_avalible(void* priv); +static void* gst_mtl_st40_rx_get_mbuf_with_timeout(Gst_Mtl_St40_Rx* src, + st40_rx_handle handle, void** usrptr, + uint16_t* size); +static GstFlowReturn gst_mtl_st40_rx_fill_buffer(Gst_Mtl_St40_Rx* src, GstBuffer** buf, + void* usrptr); + +static gint gst_mtl_st40_rx_mbuff_avalible(void* priv) { + Gst_Mtl_St40_Rx* src = (Gst_Mtl_St40_Rx*)priv; + + pthread_mutex_lock(&(src->get_mbuff_mutex)); + pthread_cond_signal(&(src->get_mbuff_cond)); + pthread_mutex_unlock(&(src->get_mbuff_mutex)); + + return 0; +} + +static void gst_mtl_st40_rx_class_init(Gst_Mtl_St40_RxClass* klass) { + GObjectClass* gobject_class; + GstElementClass* gstelement_class; + GstBaseSrcClass* gstbasesrc_class; + + gobject_class = G_OBJECT_CLASS(klass); + gstelement_class = GST_ELEMENT_CLASS(klass); + gstbasesrc_class = GST_BASE_SRC_CLASS(klass); + + gst_element_class_set_metadata( + gstelement_class, "MtlRxSt40Src", "Src/Audio", + "MTL transmission plugin for SMPTE ST 2110-20 standard (uncompressed video)", + "Dawid Wesierski "); + + gst_element_class_add_static_pad_template(gstelement_class, + &gst_mtl_st40_rx_src_pad_template); + + gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_mtl_st40_rx_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_mtl_st40_rx_get_property); + gobject_class->finalize = GST_DEBUG_FUNCPTR(gst_mtl_st40_rx_finalize); + + gstbasesrc_class->start = GST_DEBUG_FUNCPTR(gst_mtl_st40_rx_start); + gstbasesrc_class->stop = GST_DEBUG_FUNCPTR(gst_mtl_st40_rx_stop); + gstbasesrc_class->create = GST_DEBUG_FUNCPTR(gst_mtl_st40_rx_create); + + gst_mtl_common_init_general_argumetns(gobject_class); + + g_object_class_install_property( + gobject_class, PROP_ST40_RX_BUFFER_SIZE, + g_param_spec_uint("buffer-size", "Buffer Size", + "Size of the buffer used for receiving data", 0, G_MAXUINT, 1024, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +} + +static gboolean gst_mtl_st40_rx_start(GstBaseSrc* basesrc) { + struct mtl_init_params mtl_init_params = {0}; + struct st40_rx_ops* ops_rx; + gint ret; + + Gst_Mtl_St40_Rx* src = GST_MTL_ST40_RX(basesrc); + ops_rx = &src->ops_rx; + + GST_DEBUG_OBJECT(src, "start"); + GST_DEBUG("Media Transport Initialization start"); + + /* mtl is already initialzied */ + if (src->mtl_lib_handle) { + GST_INFO("Mtl already initialized"); + if (mtl_start(src->mtl_lib_handle)) { + GST_ERROR("Failed to start MTL"); + return FALSE; + } + return TRUE; + } else { + if (src->mtl_lib_handle) { + GST_ERROR("MTL already initialized"); + return FALSE; + } + + if (!gst_mtl_common_parse_dev_arguments(&mtl_init_params, &(src->devArgs))) { + GST_ERROR("Failed to parse dev arguments"); + return FALSE; + } + + if (src->log_level && src->log_level < MTL_LOG_LEVEL_MAX) { + mtl_init_params.log_level = src->log_level; + } else { + mtl_init_params.log_level = MTL_LOG_LEVEL_INFO; + } + + src->mtl_lib_handle = mtl_init(&mtl_init_params); + if (!src->mtl_lib_handle) { + GST_ERROR("Could not initialize MTL"); + return FALSE; + } + } + + if (src->timeout_mbuf_get_seconds <= 0) { + src->timeout_mbuf_get_seconds = 10; + } else if (src->timeout_mbuf_get_seconds <= 3) { + GST_WARNING("Timeout for getting mbuf is too low, setting to 3 seconds"); + src->timeout_mbuf_get_seconds = 3; + } + ops_rx->name = "st40src"; + ops_rx->priv = basesrc; + ops_rx->notify_rtp_ready = gst_mtl_st40_rx_mbuff_avalible; + + if (src->mbuff_size) { + if (!IS_POWER_OF_2(src->mbuff_size)) { + GST_WARNING("Buffer size is not power of 2, setting to 1024"); + ops_rx->rtp_ring_size = 1024; + src->mbuff_size = 1024; + } else { + ops_rx->rtp_ring_size = src->mbuff_size; + } + } else { + ops_rx->rtp_ring_size = 1024; + src->mbuff_size = 1024; + } + + if (inet_pton(AF_INET, src->portArgs.session_ip_string, ops_rx->ip_addr[MTL_PORT_P]) != + 1) { + GST_ERROR("Invalid destination IP address: %s", src->portArgs.session_ip_string); + return FALSE; + } + + if (strlen(src->portArgs.port) == 0) { + strncpy(ops_rx->port[MTL_PORT_P], src->devArgs.port, MTL_PORT_MAX_LEN); + } else { + strncpy(ops_rx->port[MTL_PORT_P], src->portArgs.port, MTL_PORT_MAX_LEN); + } + ops_rx->num_port = 1; + + if ((src->portArgs.udp_port < 0) || (src->portArgs.udp_port > 0xFFFF)) { + GST_ERROR("%s, invalid UDP port: %d\n", __func__, src->portArgs.udp_port); + } else { + ops_rx->udp_port[0] = src->portArgs.udp_port; + } + + if (src->portArgs.payload_type == 0) { + ops_rx->payload_type = PAYLOAD_TYPE_ANCILLARY; + } else if ((src->portArgs.payload_type < 0) || (src->portArgs.payload_type > 0x7F)) { + GST_ERROR("%s, invalid payload_type: %d\n", __func__, src->portArgs.payload_type); + } else { + ops_rx->payload_type = src->portArgs.payload_type; + } + + ret = mtl_start(src->mtl_lib_handle); + if (ret < 0) { + GST_ERROR("Failed to start MTL"); + return FALSE; + } + + if (pthread_mutex_init(&(src->get_mbuff_mutex), NULL) || + pthread_cond_init(&(src->get_mbuff_cond), NULL)) { + GST_ERROR("Failed to initialize mutex or condition variable"); + return FALSE; + } + + src->rx_handle = st40_rx_create(src->mtl_lib_handle, ops_rx); + if (!src->rx_handle) { + GST_ERROR("Failed to create st40 rx handle"); + return FALSE; + } + + return TRUE; +} + +static void gst_mtl_st40_rx_init(Gst_Mtl_St40_Rx* src) { + GstElement* element = GST_ELEMENT(src); + GstPad* srcpad; + + srcpad = gst_element_get_static_pad(element, "src"); + if (!srcpad) { + GST_ERROR_OBJECT(src, "Failed to get src pad from child element"); + return; + } +} + +static void gst_mtl_st40_rx_set_property(GObject* object, guint prop_id, + const GValue* value, GParamSpec* pspec) { + Gst_Mtl_St40_Rx* self = GST_MTL_ST40_RX(object); + + if (prop_id < PROP_GENERAL_MAX) { + gst_mtl_common_set_general_argumetns(object, prop_id, value, pspec, &(self->devArgs), + &(self->portArgs), &self->log_level); + return; + } + + switch (prop_id) { + case PROP_ST40_RX_BUFFER_SIZE: + self->mbuff_size = g_value_get_uint(value); + break; + case PROP_ST40_RX_TIMEOUT_MBUF_GET: + self->timeout_mbuf_get_seconds = g_value_get_uint(value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void gst_mtl_st40_rx_get_property(GObject* object, guint prop_id, GValue* value, + GParamSpec* pspec) { + Gst_Mtl_St40_Rx* src = GST_MTL_ST40_RX(object); + + if (prop_id < PROP_GENERAL_MAX) { + gst_mtl_common_get_general_argumetns(object, prop_id, value, pspec, &(src->devArgs), + &(src->portArgs), &src->log_level); + return; + } + + switch (prop_id) { + case PROP_ST40_RX_BUFFER_SIZE: + g_value_set_uint(value, src->mbuff_size); + break; + case PROP_ST40_RX_TIMEOUT_MBUF_GET: + g_value_set_uint(value, src->timeout_mbuf_get_seconds); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void* gst_mtl_st40_rx_get_mbuf_with_timeout(Gst_Mtl_St40_Rx* src, + st40_rx_handle handle, void** usrptr, + uint16_t* size) { + struct timespec ts; + gint ret; + void* mbuf = NULL; + + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += src->timeout_mbuf_get_seconds; + ret = pthread_mutex_timedlock(&(src->get_mbuff_mutex), &ts); + + if (!ret) + ret = pthread_cond_timedwait(&(src->get_mbuff_cond), &(src->get_mbuff_mutex), &ts); + + if (ret == ETIMEDOUT) + return NULL; + + pthread_mutex_unlock(&(src->get_mbuff_mutex)); + + mbuf = st40_rx_get_mbuf(src->rx_handle, usrptr, size); + if (!mbuf) + GST_ERROR("Failed to get ancillary mbuf\n"); + + return mbuf; +} + +static GstFlowReturn gst_mtl_st40_rx_fill_buffer(Gst_Mtl_St40_Rx* src, GstBuffer** buffer, + void* usrptr) { + struct st40_rfc8331_rtp_hdr* hdr; + struct st40_rfc8331_payload_hdr* payload_hdr; + GstMapInfo dest_info; + guint16 data, fill_size; + gint udw_size; + + hdr = (struct st40_rfc8331_rtp_hdr*)usrptr; + payload_hdr = (struct st40_rfc8331_payload_hdr*)(&hdr[1]); + udw_size = payload_hdr->second_hdr_chunk.data_count & 0xff; + + if (src->udw_size == 0) { + src->udw_size = udw_size; + src->anc_data = (char*)malloc(udw_size); + } else if (src->udw_size != udw_size) { + GST_WARNING("Size of recieved ancillary data has changed"); + if (src->anc_data) { + free(src->anc_data); + src->anc_data = NULL; + } + src->udw_size = udw_size; + src->anc_data = (char*)malloc(udw_size); + } + + if (udw_size == 0) { + GST_ERROR("Ancillary data size is 0"); + return GST_FLOW_ERROR; + } + + *buffer = gst_buffer_new_allocate(NULL, src->udw_size, NULL); + if (!*buffer) { + GST_ERROR("Failed to allocate space for the buffer"); + return GST_FLOW_ERROR; + } + + if(!gst_buffer_map(*buffer, &dest_info, GST_MAP_WRITE)) { + GST_ERROR("Failed to map the buffer"); + return GST_FLOW_ERROR; + } + + for (int i = 0; i < udw_size; i++) { + data = st40_get_udw(i + 3, (uint8_t*)&payload_hdr->second_hdr_chunk); + if (!st40_check_parity_bits(data)) { + GST_ERROR("Ancillary data parity bits check failed"); + return GST_FLOW_ERROR; + } + src->anc_data[i] = data & 0xff; + } + + fill_size = gst_buffer_fill(*buffer, 0, src->anc_data, udw_size); + gst_buffer_unmap(*buffer, &dest_info); + + if (fill_size != src->udw_size) { + GST_WARNING("Failed to fill buffer"); + } + + return GST_FLOW_OK; +} + +static GstFlowReturn gst_mtl_st40_rx_create(GstBaseSrc* basesrc, guint64 offset, + guint length, GstBuffer** buffer) { + Gst_Mtl_St40_Rx* src = GST_MTL_ST40_RX(basesrc); + void *mbuf, *usrptr; + guint16 size; + gint ret; + + GST_OBJECT_LOCK(src); + + /* get the mbuff */ + mbuf = st40_rx_get_mbuf(src->rx_handle, &usrptr, &size); + if (!mbuf) { + mbuf = gst_mtl_st40_rx_get_mbuf_with_timeout(src, src->rx_handle, &usrptr, &size); + } + + if (size == 0) { + GST_ERROR("No anciallry data recived"); + GST_OBJECT_UNLOCK(src); + return GST_FLOW_ERROR; + } + + if (!mbuf) { + GST_OBJECT_UNLOCK(src); + return GST_FLOW_EOS; + } + + ret = gst_mtl_st40_rx_fill_buffer(src, buffer, usrptr); + + if (ret != GST_FLOW_OK) { + GST_ERROR("Failed to fill buffer"); + } + + st40_rx_put_mbuf(src->rx_handle, mbuf); + GST_OBJECT_UNLOCK(src); + return GST_FLOW_OK; +} + +static void gst_mtl_st40_rx_finalize(GObject* object) { + Gst_Mtl_St40_Rx* src = GST_MTL_ST40_RX(object); + + if (src->anc_data) free(src->anc_data); + + if (src->rx_handle) { + if (st40_rx_free(src->rx_handle)) { + GST_ERROR("Failed to free rx handle"); + } + } + + pthread_mutex_destroy(&src->get_mbuff_mutex); + pthread_cond_destroy(&src->get_mbuff_cond); + + if (src->mtl_lib_handle) { + if (mtl_stop(src->mtl_lib_handle) || mtl_uninit(src->mtl_lib_handle)) { + GST_ERROR("Failed to uninitialize MTL library"); + } + } +} + +static gboolean gst_mtl_st40_rx_stop(GstBaseSrc* basesrc) { + Gst_Mtl_St40_Rx* src = GST_MTL_ST40_RX(basesrc); + + if (src->mtl_lib_handle) { + mtl_stop(src->mtl_lib_handle); + } + + return TRUE; +} + +static gboolean plugin_init(GstPlugin* mtl_st40_rx) { + return gst_element_register(mtl_st40_rx, "mtl_st40_rx", GST_RANK_SECONDARY, + GST_TYPE_MTL_ST40_RX); +} + +GST_PLUGIN_DEFINE(GST_VERSION_MAJOR, GST_VERSION_MINOR, mtl_st40_rx, + "software-based solution designed for high-throughput transmission", + plugin_init, PACKAGE_VERSION, GST_LICENSE, GST_PACKAGE_NAME, + GST_PACKAGE_ORIGIN); diff --git a/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.h b/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.h new file mode 100644 index 000000000..a82837c55 --- /dev/null +++ b/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.h @@ -0,0 +1,87 @@ +/* + * GStreamer + * Copyright (C) 2005 Thomas Vander Stichele + * Copyright (C) 2005 Ronald S. Bultje + * Copyright (C) 2020 Niels De Graef + * Copyright (C) 2024 Intel Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Alternatively, the contents of this file may be used under the + * GNU Lesser General Public License Version 2.1 (the "LGPL"), in + * which case the following provisions apply instead of the ones + * mentioned above: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_MTL_ST40_RX_H__ +#define __GST_MTL_ST40_RX_H__ + +#include + +#include "gst_mtl_common.h" + +G_BEGIN_DECLS + +#define GST_TYPE_MTL_ST40_RX (gst_mtl_st40_rx_get_type()) +G_DECLARE_FINAL_TYPE(Gst_Mtl_St40_Rx, gst_mtl_st40_rx, GST, MTL_ST40_RX, GstBaseSrc) + +struct _Gst_Mtl_St40_Rx { + GstBaseSrc element; + GstBuffer* buffer; + + pthread_mutex_t get_mbuff_mutex; + pthread_cond_t get_mbuff_cond; + /*< private >*/ + struct st40_rx_ops ops_rx; + guint log_level; + mtl_handle mtl_lib_handle; + st40_rx_handle rx_handle; + + /* arguments for mtl mbuf buffers */ + guint timeout_mbuf_get_seconds; + /* final size of the ring would be mbuff_size * mbuff_ring_amount */ + guint mbuff_ring_amount; + guint16 mbuff_size; + guint16 udw_size; + char* anc_data; + + /* arguments for imtl initialization device */ + StDevArgs devArgs; + /* arguments for imtl rx session */ + SessionPortArgs portArgs; +}; + +G_END_DECLS + +#endif /* __GST_MTL_ST40_RX_H__ */ diff --git a/ecosystem/gstreamer_plugin/meson.build b/ecosystem/gstreamer_plugin/meson.build index 5d679ea3b..34ed5aef5 100644 --- a/ecosystem/gstreamer_plugin/meson.build +++ b/ecosystem/gstreamer_plugin/meson.build @@ -57,7 +57,6 @@ gst_mtl_st20p_tx = library('gstmtl_st20p_tx', c_args: plugin_c_args ) - # mtl_st20p_rx Plugin gst_mtl_st20p_rx_sources = [ 'gst_mtl_st20p_rx.c' @@ -72,7 +71,6 @@ gst_mtl_st20p_rx = library('gstmtl_st20p_rx', c_args: plugin_c_args ) - # mtl_st30p_rx Plugin gst_mtl_st30p_rx_sources = [ 'gst_mtl_st30p_rx.c' @@ -87,7 +85,21 @@ gst_mtl_st30p_rx = library('gstmtl_st30p_rx', c_args: plugin_c_args ) -# The mtl_st30p_tx Plugin +# mtl_st30p_tx Plugin +gstmtl_st30p_tx_sources = [ + 'gst_mtl_st30p_tx.c', +] + +gstmtl_st30p_tx = library('gstmtl_st30p_tx', + gstmtl_st30p_tx_sources, + dependencies : [gst_dep, gstbase_dep, gstreamer_audio_dep, mtl_dep], + install : true, + install_dir : plugins_install_dir, + include_directories: inc_dirs, + c_args: plugin_c_args +) + +# mtl_st30p_tx Plugin gstmtl_st30p_tx_sources = [ 'gst_mtl_st30p_tx.c', ] @@ -100,3 +112,17 @@ gstmtl_st30p_tx = library('gstmtl_st30p_tx', include_directories: inc_dirs, c_args: plugin_c_args ) + +# mtl_st40_rx Plugin +gst_mtl_st40_rx_sources = [ + 'gst_mtl_st40_rx.c' +] + +gst_mtl_st40_rx = library('gstmtl_st40_rx', + gst_mtl_common_sources + gst_mtl_st40_rx_sources, + dependencies : [gst_dep, gstbase_dep, gstreamer_audio_dep, mtl_dep], + install : true, + install_dir : plugins_install_dir, + include_directories: inc_dirs, + c_args: plugin_c_args +) diff --git a/tests/tools/RxTxApp/src/parse_json.c b/tests/tools/RxTxApp/src/parse_json.c index e4b2e4a5c..5f4c403b5 100644 --- a/tests/tools/RxTxApp/src/parse_json.c +++ b/tests/tools/RxTxApp/src/parse_json.c @@ -1128,6 +1128,7 @@ static int st_json_parse_tx_anc(int idx, json_object* anc_obj, err("%s, invalid anc type %s\n", __func__, type); return -ST_JSON_NOT_VALID; } + /* parse anc format */ const char* anc_format = json_object_get_string(st_json_object_object_get(anc_obj, "ancillary_format"));