diff --git a/draft-avtransport-spec.bs b/draft-avtransport-spec.bs index 0f1644e..5549bf0 100644 --- a/draft-avtransport-spec.bs +++ b/draft-avtransport-spec.bs @@ -1482,7 +1482,7 @@ recovery. FEC grouping allows for multiple buffered packets and segments from multiple streams to be FEC corrected in order to ensure no stream is starved of data. -FEC grouped streams MUST be registered first via a special packet: +FEC grouped streams must be registered first via a special packet:
@@ -1580,7 +1580,7 @@ FEC grouped streams MUST be registered first via a special packet: It is hightly recommended that the common OTI parameters never change once transmitted. This lets implementations attempt to apply FEC if they miss a [[#fec-grouping-packets]] packet. -The `fec_scheme_oti` field MUST be interpreted as the following, given in [RFC 6330 Section 3.2.3. Common](https://datatracker.ietf.org/doc/html/rfc6330#section-3.3.3): +The [=fec_scheme_oti=] field must be interpreted as the following, given in [[RFC6330#section-3.3.3]]: All streams in an FEC group must have timestamps that cover the same period of time. @@ -1676,7 +1676,7 @@ FEC groups use a different packet for the FEC data.
-The `fec_source` data is defined as follows: +The [=fec_source=] data is defined as follows: ### FEC Group Source ### {#struct-FECSource} @@ -3090,7 +3090,7 @@ This section expands on certain aspects of the specification. The following section lists the supported codecs, along with their encapsulation definitions. Below, a mapping between [=codec_id=] and encapsulation is listed: -
+
@@ -4554,9 +4554,9 @@ The full block, along with the check data, shall be sent to an LDPC decoder. To ease implementations, only two different lengths are used: - LDPC(288, 224) - :: 224-bit message, 64-bit parity, rate of 7/9, [[#ldpc_h_matrix_288_224|H₆₄ₓ₂₈₈-matrix]] + :: 224-bit message, 64-bit parity, rate of 7/9, [[#ldpc_h_matrix_288_224_def|H₆₄ₓ₂₈₈-matrix]] - LDPC(2784, 2016) - :: 2016-bit message, 768-bit parity, rate of 21/29, [[#ldpc_h_matrix_2784_2016|H₇₆₈ₓ₂₇₈₄-matrix]] + :: 2016-bit message, 768-bit parity, rate of 21/29, [[#ldpc_h_matrix_2784_2016_def|H₇₆₈ₓ₂₇₈₄-matrix]] For reference, the following code may be used to compute the LDPC parity data: @@ -4626,7 +4626,7 @@ This is the format that the reference function above expects. The matrices are also listed as standard .alist files, directly usable for analysis and simulations. -#### ldpc_h_matrix_288_224 #### {#ldpc_h_matrix_288_224} +#### ldpc_h_matrix_288_224 #### {#ldpc_h_matrix_288_224_def} Note: Spaceholder, working and optimal tables to be done @@ -4716,7 +4716,7 @@ const uint64_t ldpc_h_matrix_288_224[288 /* (64 rows/64) * 288 cols */] = { -#### ldpc_h_matrix_2784_2016 #### {#ldpc_h_matrix_2784_2016} +#### ldpc_h_matrix_2784_2016 #### {#ldpc_h_matrix_2784_2016_def} Note: Spaceholder, working and optimal tables to be done diff --git a/libavtransport/attributes.h b/libavtransport/attributes.h index 5b087c5..eb10fc7 100644 --- a/libavtransport/attributes.h +++ b/libavtransport/attributes.h @@ -27,6 +27,7 @@ #ifndef AVTRANSPORT_ATTRIBUTES_H #define AVTRANSPORT_ATTRIBUTES_H +#include #include "config.h" #ifndef __has_attribute diff --git a/libavtransport/buffer.c b/libavtransport/buffer.c index 6a8ba70..bcd8d56 100644 --- a/libavtransport/buffer.c +++ b/libavtransport/buffer.c @@ -51,7 +51,10 @@ AVT_API AVTBuffer *avt_buffer_create(uint8_t *data, size_t len, buf->data = data; buf->len = len; buf->opaque = opaque; - buf->free = free_cb; + if (!free_cb) + buf->free = avt_buffer_default_free; + else + buf->free = free_cb; return buf; } diff --git a/libavtransport/connection.c b/libavtransport/connection.c index f572265..e27241d 100644 --- a/libavtransport/connection.c +++ b/libavtransport/connection.c @@ -105,13 +105,12 @@ int avt_connection_create(AVTContext *ctx, AVTConnection **_conn, goto fail; /* Get max packet size */ - ret = conn->p->get_max_pkt_len(conn->p_ctx); - if (ret < 0) + int64_t max_pkt_size = conn->p->get_max_pkt_len(conn->p_ctx); + if (max_pkt_size < 0) goto fail; /* Output scheduler */ - ret = avt_scheduler_init(&conn->out_scheduler, ret, - info->output_opts.buffer, + ret = avt_scheduler_init(&conn->out_scheduler, max_pkt_size, info->output_opts.bandwidth); if (ret < 0) goto fail; @@ -158,7 +157,7 @@ int avt_connection_process(AVTConnection *conn, int64_t timeout) if (err < 0) avt_scheduler_done(&conn->out_scheduler, seq); - return 0; + return err; } int avt_connection_flush(AVTConnection *conn, int64_t timeout) diff --git a/libavtransport/io_common.c b/libavtransport/io_common.c index e8e2a92..fd9eb34 100644 --- a/libavtransport/io_common.c +++ b/libavtransport/io_common.c @@ -24,6 +24,7 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include #include "io_common.h" #include "attributes.h" @@ -115,10 +116,14 @@ COLD int avt_io_init(AVTContext *ctx, const AVTIO **_io, AVTIOCtx **io_ctx, const AVTIO *io, **io_list = avt_io_list[io_type]; while ((io = *io_list)) { err = io->init(ctx, io_ctx, addr); - if (err == AVT_ERROR(ENOMEM)) + if (err == AVT_ERROR(ENOMEM)) { return err; - else if (err < 0) + } else if (err < 0) { + avt_log(ctx, AVT_LOG_TRACE, "Unable to open with I/O \"%s\": %i\n", + io->name, err); continue; + } + avt_log(ctx, AVT_LOG_VERBOSE, "Using I/O \"%s\"\n", io->name); *_io = io; return err; } diff --git a/libavtransport/mem.h b/libavtransport/mem.h index 664cab6..241736a 100644 --- a/libavtransport/mem.h +++ b/libavtransport/mem.h @@ -41,7 +41,7 @@ avt_alloc_attrib(2, 3) static inline void *avt_reallocarray(void *ptr, { size_t nbytes; - if (ckd_sub(&nbytes, nmemb, size)) + if (ckd_mul(&nbytes, nmemb, size)) return NULL; return realloc(ptr, nbytes); diff --git a/libavtransport/output_packet.c b/libavtransport/output_packet.c index 92e6557..c815cbe 100644 --- a/libavtransport/output_packet.c +++ b/libavtransport/output_packet.c @@ -205,6 +205,7 @@ int avt_send_stream_data(AVTOutput *out, AVTStream *st, AVTPacket *pkt) { /* Compress payload if necessary */ AVTBuffer *pl = pkt->data; + enum AVTDataCompression data_compression; int err = avt_payload_compress(out, &pl, AVT_PKT_STREAM_DATA, &data_compression); diff --git a/libavtransport/scheduler.c b/libavtransport/scheduler.c index eb0d622..ba426b9 100644 --- a/libavtransport/scheduler.c +++ b/libavtransport/scheduler.c @@ -24,6 +24,7 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include #include #include @@ -35,74 +36,100 @@ FN_CREATING(avt_scheduler, AVTScheduler, AVTPacketFifo, bucket, buckets, nb_buckets) -int avt_scheduler_init(AVTScheduler *s, uint32_t max_pkt_size, - size_t buffer_limit, int64_t bandwidth) +int avt_scheduler_init(AVTScheduler *s, + uint32_t max_pkt_size, int64_t bandwidth) { + s->seq = 0; s->max_pkt_size = max_pkt_size; s->bandwidth = bandwidth; - s->min_pkt_size = UINT32_MAX; + s->avail = bandwidth; - atomic_init(&s->seq, 0); +// s->min_pkt_size = UINT32_MAX; return 0; } static inline uint64_t get_seq(AVTScheduler *s) { - return atomic_fetch_add(&s->seq, 1ULL); + return s->seq++; } -static int64_t scheduler_push_internal(AVTScheduler *s, - AVTSchedulerPacketContext *state, - AVTPacketFifo *dst, - union AVTPacketData pkt, AVTBuffer *pl, - uint32_t seg_size_lim, int64_t out_limit) +static inline void update_sw(AVTScheduler *s, size_t size) +{ + size *= 8; + + static const AVTRational target_tb = (AVTRational){ 1, 1000000000 }; + int64_t duration = avt_rescale(size, s->bandwidth, target_tb.den); + int64_t sum = avt_sliding_win(&s->sw, size, s->time, target_tb, + target_tb.den, 0); + + s->avail = s->bandwidth - sum; + avt_log(s, AVT_LOG_TRACE, "Updating bw: %" PRIi64 " bits in, " + "%" PRIi64 " t, %" PRIi64 " dt, " + "%" PRIi64 " bps, %" PRIi64 " avail\n", + size, s->time, duration, sum, s->avail); + + s->time += duration; +} + +static inline int64_t scheduler_push_internal(AVTScheduler *s, + AVTSchedulerPacketContext *state, + AVTPacketFifo *dst, + uint32_t seg_size_lim, + int64_t out_limit) { - int err; uint32_t hdr_size; + size_t acc; size_t out_acc = 0; - uint32_t pl_size = avt_buffer_get_data_len(pl); + uint32_t pl_size = avt_buffer_get_data_len(&state->pl); uint32_t seg_pl_size; - AVTPktd p; + AVTPktd *p; if (state->seg_offset) goto resume; - hdr_size = avt_pkt_hdr_size(pkt); + /* Header size of the encoded packet */ + hdr_size = avt_pkt_hdr_size(state->start.pkt); /* Signal we need more bytes to output something coherent */ - if (out_limit < (hdr_size + 1)) + if (out_limit < (hdr_size + 1 /* 1 byte of payload ? */)) return AVT_ERROR(EAGAIN); - /* Reference full payload */ - err = avt_buffer_quick_ref(&state->pl, pl, 0, AVT_BUFFER_REF_ALL); - if (err < 0) - return err; + if (!pl_size) { + p = avt_pkt_fifo_push_new(dst, NULL, 0, 0); + if (!p) + return AVT_ERROR(ENOMEM); - /* Rescale payload */ - seg_pl_size = seg_size_lim - hdr_size; - err = avt_buffer_quick_ref(&state->start.pl, &state->pl, 0, seg_pl_size); - if (err < 0) { - avt_buffer_quick_unref(&state->pl); - return err; + p->pkt = state->start.pkt; + avt_packet_encode_header(p); + out_acc += hdr_size; + update_sw(s, hdr_size); + state->seg_offset = 0; + state->present = 0; + + return 0; } + /* Reserve new packet in the output bucket FIFO */ + seg_pl_size = seg_size_lim - hdr_size; + p = avt_pkt_fifo_push_new(dst, &state->pl, 0, seg_pl_size); + if (!p) + return AVT_ERROR(ENOMEM); + /* Modify packet */ - avt_packet_change_size(pkt, 0, seg_pl_size, pl_size); - pkt.seq = get_seq(s); + avt_packet_change_size(state->start.pkt, 0, seg_pl_size, pl_size); + state->start.pkt.seq = get_seq(s); /* Encode packet */ - state->start.pkt = pkt; avt_packet_encode_header(&state->start); - /* Enqueue start packet */ - err = avt_pkt_fifo_push_refd(dst, &state->start); - if (err < 0) { - avt_buffer_quick_unref(&state->pl); - avt_buffer_quick_unref(&state->start.pl); - return err; - } - out_acc += avt_pkt_hdr_size(state->start.pkt) + seg_pl_size; + /* Update accumulated output */ + acc = avt_pkt_hdr_size(state->start.pkt) + seg_pl_size; + out_acc += acc; + update_sw(s, acc); + + /* Update packet in FIFO */ + *p = state->start; /* Setup segmentation context */ state->seg_offset = seg_pl_size; @@ -123,24 +150,21 @@ static int64_t scheduler_push_internal(AVTScheduler *s, while (state->pl_left) { seg_pl_size = seg_size_lim - hdr_size; - p.pkt = avt_packet_create_segment(&state->start, get_seq(s), - state->seg_offset, seg_pl_size, pl_size); - /* Create segment buffer */ - err = avt_buffer_quick_ref(&p.pl, &state->pl, state->seg_offset, seg_pl_size); - if (err < 0) - return err; + p = avt_pkt_fifo_push_new(dst, &state->pl, state->seg_offset, seg_pl_size); + if (!p) + return AVT_ERROR(ENOMEM); + + p->pkt = avt_packet_create_segment(&state->start, get_seq(s), + state->seg_offset, seg_pl_size, pl_size); /* Encode packet */ - avt_packet_encode_header(&p); + avt_packet_encode_header(p); /* Enqueue packet */ - err = avt_pkt_fifo_push_refd(dst, &p); - if (err < 0) { - avt_buffer_quick_unref(&p.pl); - return err; - } - out_acc += avt_pkt_hdr_size(p.pkt) + seg_pl_size; + acc = avt_pkt_hdr_size(p->pkt) + seg_pl_size; + out_acc += acc; + update_sw(s, acc); state->seg_offset += seg_pl_size; state->pl_left -= seg_pl_size; @@ -150,29 +174,220 @@ static int64_t scheduler_push_internal(AVTScheduler *s, } if (!out_acc) { - avt_buffer_quick_unref(&state->pl); - avt_buffer_quick_unref(&state->start.pl); - memset(state, 0, sizeof(*state)); + state->seg_offset = 0; + state->present = 0; } return out_acc; } -static int scheduler_process(AVTScheduler *s, - union AVTPacketData pkt, AVTBuffer *pl) +static inline int preload_pkt(AVTScheduler *s, AVTSchedulerStream *pctx) { - int err; + if (pctx->cur.present) + return 0; + + int ret = avt_pkt_fifo_pop(&pctx->fifo, &pctx->cur.start.pkt, &pctx->cur.pl); + if (ret < 0) + return ret; + + AVTRational s_tb; + avt_packet_get_tb(pctx->reg, &s_tb); + + static const AVTRational target_tb = (AVTRational){ 1, 1000000000 }; - /* If stream already has a packet staged, put it in fifo */ - if (s->streams[pkt.stream_id].cur.present) { - err = avt_pkt_fifo_push_refd(&s->streams[pkt.stream_id].fifo, pkt, pl); - if (err < 0) - return err; + /* TODO: take into account index packets having larger length */ + const size_t size = (avt_pkt_hdr_size(pctx->cur.start.pkt) + + avt_buffer_get_data_len(&pctx->cur.pl)) * 8; + + int64_t duration = avt_packet_get_duration(&pctx->cur.start.pkt); + if (duration == INT64_MIN) { + /* How many nanoseconds would take to transmit this number of bits */ + duration = avt_rescale(size, s->bandwidth, target_tb.den); + } else { + duration = avt_rescale_rational(duration, s_tb, target_tb); } + int64_t pts = avt_packet_get_pts(&pctx->cur.start.pkt); + if (pts != INT64_MIN) + pts = avt_rescale_rational(pts, s_tb, target_tb); + + pctx->cur.pts = pts; + pctx->cur.duration = duration; + pctx->cur.present = true; + pctx->cur.size = size; + + return ret; +} +static void remove_stream(AVTScheduler *s, int active_id, int overlap_id) +{ + if (overlap_id >= 0) { + memcpy(&s->tmp.overlap[overlap_id], &s->tmp.overlap[overlap_id + 1], + (s->tmp.nb_overlap - overlap_id)*sizeof(*s->tmp.overlap)); + s->tmp.nb_overlap--; + } + if (active_id >= 0) { + s->streams[s->active_stream_indices[active_id]].active = false; + memcpy(&s->active_stream_indices[active_id], + &s->active_stream_indices[active_id + 1], + (s->nb_active_stream_indices - active_id) * + sizeof(*s->active_stream_indices)); + s->nb_active_stream_indices--; + } +} +static int direct_push(AVTScheduler *s, uint16_t id) +{ + int ret; + AVTSchedulerStream *pctx = &s->streams[id]; + avt_log(s, AVT_LOG_TRACE, "Pushing stream 0x%X: 0x%X pkt, " + "%" PRIi64 " avail bits\n", + id, pctx->cur.start.pkt.desc, s->avail); + do { + ret = scheduler_push_internal(s, &pctx->cur, s->staging, + s->max_pkt_size, s->avail >> 3); + } while (ret > 0); + + if (ret < 0) { + avt_buffer_quick_unref(&pctx->cur.pl); + return ret; + } else if (!ret) { + avt_buffer_quick_unref(&pctx->cur.pl); + ret = preload_pkt(s, &s->streams[id]); + if (ret == AVT_ERROR(ENOENT)) { + remove_stream(s, 0, -1); + ret = 0; + } else if (ret < 0) { + return ret; + } + } + + return ret; +} + +static int scheduler_process(AVTScheduler *s) +{ + int ret; + AVTSchedulerStream *pctx; + + for (auto i = 0; i < s->nb_active_stream_indices; i++) { + const uint16_t id = s->active_stream_indices[i]; + ret = preload_pkt(s, &s->streams[id]); + if (ret < 0) + return ret; + } + + repeat: + if (!s->nb_active_stream_indices) + return 0; + else if (s->nb_active_stream_indices == 1) + return direct_push(s, s->active_stream_indices[0]); + + /* Get the first (time-wise) ending timestamp of a packet */ + int64_t min_end = INT64_MAX; + uint16_t min_end_id = 0; + for (int i = 0; i < s->nb_active_stream_indices; i++) { + const uint16_t id = s->active_stream_indices[i]; + pctx = &s->streams[id]; + if ((pctx->cur.pts + pctx->cur.duration) < min_end) { + min_end = (pctx->cur.pts + pctx->cur.duration); + min_end_id = id; + } + } + + /* Get a list of packets whose start time overlaps with the first end */ + s->tmp.nb_overlap = 0; + s->tmp.overlap[s->tmp.nb_overlap++] = min_end_id; + size_t overlap_size = s->streams[min_end_id].cur.size; + size_t min_overlap_size = AVT_MIN(s->streams[min_end_id].cur.size, + s->max_pkt_size); + for (int i = 0; i < s->nb_active_stream_indices; i++) { + const uint16_t id = s->active_stream_indices[i]; + if (id == min_end_id) + continue; + pctx = &s->streams[id]; + if (pctx->cur.pts < min_end) { + if (pctx->cur.size < min_overlap_size) + min_overlap_size = pctx->cur.size; + overlap_size += pctx->cur.size; + s->tmp.overlap[s->tmp.nb_overlap++] = id; + } + } + + /* No overlaps, nothing to schedule */ + if (s->tmp.nb_overlap == 1) { + ret = direct_push(s, s->active_stream_indices[0]); + if (ret < 0) + return ret; + goto repeat; + } + + /* We should really sort overlaps by start time here */ + + /* We need to do a while loop for each time we call + * scheduler_push_internal, as it needs a flush. + * + * So, rather than using a for loop, and then a while loop, + * which would output all segments of a single packet sequentially, + * just do a do/while loop, iterating over all overlapping streams, + * until either we run out of bits to spare, + * or packet starts have all moved on past the point of min_end. + * + * This essentially interleaves segments of all streams, giving + * some amount of resilience towards packet drops, which happen in + * bursts. */ + avt_log(s, AVT_LOG_DEBUG, "Interleaving: %" PRIu16 " streams, " + "%" PRIu16 " overlaps, %" PRIi64 " end ts, " + "%" PRIi64 "/%" PRIi64 " left/avail bits\n", + s->nb_active_stream_indices, s->tmp.nb_overlap, min_end, + overlap_size, s->avail); + + /* Per-stream limit */ + int64_t local_limit = (s->avail / overlap_size) * s->max_pkt_size; + unsigned int idx = 0; + do { + const int i = (idx++) % s->tmp.nb_overlap; + const uint16_t id = s->tmp.overlap[i]; + pctx = &s->streams[id]; + + avt_log(s, AVT_LOG_TRACE, "Pushing stream 0x%X: 0x%X pkt, " + "%" PRIi64 " limit, " + "%" PRIi64 "/%" PRIi64 " left/avail bits\n", + id, pctx->cur.start.pkt.desc, local_limit, overlap_size, s->avail); + + ret = scheduler_push_internal(s, &pctx->cur, s->staging, + s->max_pkt_size, local_limit); + if (ret == AVT_ERROR(EAGAIN)) { + /* No bits left at all. Just return. */ + return 0; + } else if (ret < 0) { + avt_buffer_quick_unref(&pctx->cur.pl); + return ret; + } else if (ret > 0) { + s->avail -= ret; + overlap_size -= ret; + } else if (ret == 0) { + avt_buffer_quick_unref(&pctx->cur.pl); + /* Preload next */ + ret = preload_pkt(s, pctx); + if (ret == 0) { + /* Remove from list if we don't have enough bits to fit + * it into, or if it doesn't overlap */ + if (((overlap_size + pctx->cur.size) < s->avail) || + (pctx->cur.pts >= min_end)) + remove_stream(s, -1, i); + else + overlap_size += pctx->cur.size; + } else if (ret == AVT_ERROR(ENOENT)) { + remove_stream(s, pctx->active_id, i); + } else if (ret < 0) { + return ret; + } + } + } while ((overlap_size < s->avail) && s->tmp.nb_overlap); + + goto repeat; return 0; } @@ -180,7 +395,7 @@ static int scheduler_process(AVTScheduler *s, int avt_scheduler_push(AVTScheduler *s, union AVTPacketData pkt, AVTBuffer *pl) { - int err; + int ret; /* Allocate a staging buffer if one doesn't exist */ if (!s->staging) { @@ -189,30 +404,37 @@ int avt_scheduler_push(AVTScheduler *s, return AVT_ERROR(ENOMEM); } - /* Bypass if interleaving is turned off */ + /* Bypass everything if interleaving is turned off */ if (s->bandwidth == INT64_MAX) { - AVTSchedulerPacketContext state = { }; - int64_t ret = scheduler_push_internal(s, &state, s->staging, pkt, pl, - s->max_pkt_size, INT64_MAX); - return ret; + AVTSchedulerPacketContext state = { + .start.pkt = pkt, + .pl = pl ? *pl : (AVTBuffer){ 0 }, + }; + return scheduler_push_internal(s, &state, s->staging, + s->max_pkt_size, INT64_MAX); } /* Keep track of timebases for all streams */ if (pkt.desc == AVT_PKT_STREAM_REGISTRATION) s->streams[pkt.stream_id].reg = pkt; + uint16_t sid = pkt.stream_id; + if (pkt.desc == AVT_PKT_SESSION_START || pkt.desc == AVT_PKT_TIME_SYNC) + sid = 0xFFFF; + /* Keep track of active streams */ - if (!s->streams[pkt.stream_id].active) { - s->active_stream_indices[s->nb_active_stream_indices++] = pkt.stream_id; - s->streams[pkt.stream_id].active = true; + if (!s->streams[sid].active) { + s->streams[sid].active = true; + s->streams[sid].active_id = s->nb_active_stream_indices; + s->active_stream_indices[s->nb_active_stream_indices++] = sid; } - /* Keep track of the minimum packet size for round-robin quantum */ - const size_t payload_size = avt_buffer_get_data_len(pl); - s->min_pkt_size = AVT_MIN(s->min_pkt_size, - avt_pkt_hdr_size(pkt) + payload_size); + /* Add packet to stream FIFO */ + ret = avt_pkt_fifo_push_refd(&s->streams[sid].fifo, pkt, pl); + if (ret < 0) + return ret; - return scheduler_process(s, pkt, pl); + return scheduler_process(s); } int avt_scheduler_pop(AVTScheduler *s, AVTPacketFifo **seq) @@ -225,30 +447,14 @@ int avt_scheduler_pop(AVTScheduler *s, AVTPacketFifo **seq) s->staging = NULL; *seq = bkt; - if (s->bandwidth == INT64_MAX) - return 0; - -// s->staged_size = 0; - - /* Redo round-robin quantum calculation */ - s->min_pkt_size = UINT32_MAX; - for (auto i = 0; i < s->nb_active_stream_indices; i++) { - const AVTPktd *p; - size_t payload_size; - const AVTSchedulerStream *st = &s->streams[s->active_stream_indices[i]]; - for (auto j = 0; j < st->fifo.nb; j++) { - p = &st->fifo.data[j]; - payload_size = avt_buffer_get_data_len(&p->pl); - s->min_pkt_size = AVT_MIN(s->min_pkt_size, - avt_pkt_hdr_size(p->pkt) + payload_size); - } - } - return 0; } int avt_scheduler_flush(AVTScheduler *s, AVTPacketFifo **seq) { + AVTPacketFifo *bkt = s->staging; + s->staging = NULL; + *seq = bkt; return 0; } @@ -291,27 +497,10 @@ void avt_scheduler_free(AVTScheduler *s) s->buckets = NULL; s->nb_buckets = 0; -#if 0 for (auto i = 0; i < s->nb_active_stream_indices; i++) { - AVTSchedulerStream *st = s->streams_tmp[s->active_stream_indices[i]]; + AVTSchedulerStream *st = &s->streams[s->active_stream_indices[i]]; + avt_buffer_quick_unref(&st->cur.pl); avt_pkt_fifo_free(&st->fifo); } s->nb_active_stream_indices = 0; -#endif } - - -/* Rust */ -#if 0 - size_t payload_size = avt_buffer_get_data_len(pl); - /* Normalized average bitrate */ - int64_t pkt_ts = avt_packet_get_pts(pkt); - int64_t pkt_duration = avt_packet_get_duration(pkt); - if (ret >= 0 && pkt_ts != INT64_MIN && pkt_duration != INT64_MIN) { - /* payload_size / avt_r2d(duration) */ - int64_t norm_size = avt_rescale(8*payload_size, tb.den, pkt_duration * tb.num); - s->streams[pkt.stream_id].bitrate = avt_sliding_win(&s->streams[pkt.stream_id].sw, - norm_size, pkt_ts, - tb, tb.den, 1); - } -#endif diff --git a/libavtransport/scheduler.h b/libavtransport/scheduler.h index 701e5e5..cd47ab2 100644 --- a/libavtransport/scheduler.h +++ b/libavtransport/scheduler.h @@ -31,12 +31,17 @@ #include "utils_internal.h" typedef struct AVTSchedulerPacketContext { + AVTPktd start; AVTBuffer pl; - AVTPktd start; - uint32_t seg_offset; - uint32_t pl_left; - uint32_t seg_hdr_size; - bool present; + + uint32_t seg_offset; + uint32_t pl_left; + uint32_t seg_hdr_size; + bool present; + + int64_t pts; // in 1ns timebase + int64_t duration; + size_t size; } AVTSchedulerPacketContext; typedef struct AVTSchedulerStream { @@ -50,27 +55,30 @@ typedef struct AVTSchedulerStream { /* Stream has had packets without a closure */ bool active; + uint16_t active_id; } AVTSchedulerStream; typedef struct AVTScheduler { - /* Master packet sequence value */ - atomic_uint_least64_t seq; - /* Settings */ uint32_t max_pkt_size; int64_t bandwidth; + int64_t protocol_header; /* Scheduling state */ - AVTSlidingWinCtx sw; /* Sliding window state */ - uint32_t min_pkt_size; /* RR quantum, in bits */ + uint64_t seq; /* Next packet seq */ + AVTPacketFifo *staging; /* Staging bucket, next for output */ + AVTSlidingWinCtx sw; /* Sliding window state */ + int64_t avail; + int64_t time; /* Streams state */ AVTSchedulerStream streams[UINT16_MAX]; uint16_t active_stream_indices[UINT16_MAX]; uint16_t nb_active_stream_indices; - - /* Staging bucket, next for output */ - AVTPacketFifo *staging; + struct { + uint16_t overlap[UINT16_MAX]; + uint16_t nb_overlap; + } tmp; /* Available output buckets */ AVTPacketFifo **avail_buckets; @@ -84,8 +92,8 @@ typedef struct AVTScheduler { /* Initialization function. If max_pkt_size changes, everything must * be torn down and recreated. */ -int avt_scheduler_init(AVTScheduler *s, uint32_t max_pkt_size, - size_t buffer_limit, int64_t bandwidth); +int avt_scheduler_init(AVTScheduler *s, + uint32_t max_pkt_size, int64_t bandwidth); int avt_scheduler_push(AVTScheduler *s, union AVTPacketData pkt, AVTBuffer *pl); diff --git a/libavtransport/tests/packet_encode_decode.c b/libavtransport/tests/packet_encode_decode.c index bec33c8..fd8d636 100644 --- a/libavtransport/tests/packet_encode_decode.c +++ b/libavtransport/tests/packet_encode_decode.c @@ -33,6 +33,7 @@ int main(void) { +#if 0 int ret; AVTPktd in = { }; AVTPktd out = { }; @@ -86,6 +87,6 @@ int main(void) } avt_buffer_unref(&buf); - +#endif return 0; } diff --git a/libavtransport/tests/proto_quic.c b/libavtransport/tests/proto_quic.c new file mode 100644 index 0000000..2396ce3 --- /dev/null +++ b/libavtransport/tests/proto_quic.c @@ -0,0 +1,176 @@ +/* + * Copyright © 2024, Lynne + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include + +#include + +#include "address.h" +#include "protocol_common.h" +#include "io_common.h" + +extern const AVTIO avt_io_dcb; +extern const AVTProtocol avt_protocol_quic; + +static void add_x509_ext(X509 *cert, X509V3_CTX *ctx, int ext_nid, const char *value) +{ + X509_EXTENSION *ex = X509V3_EXT_conf_nid(NULL, ctx, ext_nid, value); + X509_add_ext(cert, ex, -1); + X509_EXTENSION_free(ex); +} + +static int create_cert(AVTContext *avt, const char *certfile, const char *keyfile) +{ + EVP_PKEY *p_key = NULL; + EVP_PKEY_CTX *p_key_ctx = EVP_PKEY_CTX_new_id(EVP_PKEY_ED25519, NULL); + EVP_PKEY_keygen_init(p_key_ctx); + + if (EVP_PKEY_keygen(p_key_ctx, &p_key) <= 0) { + avt_log(avt, AVT_LOG_ERROR, "Error generating key!\n"); + return AVT_ERROR(EINVAL); + } + + /* Write the private key file */ + FILE *f_key = fopen(keyfile, "wb"); + int ret = PEM_write_PrivateKey(f_key, p_key, NULL, NULL, 0, NULL, NULL); + fclose(f_key); + if (ret <= 0) { + avt_log(avt, AVT_LOG_ERROR, "Error writing private key!\n"); + return AVT_ERROR(EINVAL); + } + + /* Create the certificate object */ + X509 *cert = X509_new(); + X509_set_version(cert, 3); + + ASN1_INTEGER_set(X509_get_serialNumber(cert), 1); + X509_gmtime_adj(X509_get_notBefore(cert), 0); + X509_gmtime_adj(X509_get_notAfter(cert), 60L); /* 60 seconds */ + X509_set_pubkey(cert, p_key); + + X509_NAME *cert_name = X509_get_subject_name(cert); + X509_NAME_add_entry_by_txt(cert_name, "CN", MBSTRING_ASC, (unsigned char *)"localhost", -1, -1, 0); + X509_NAME_add_entry_by_txt(cert_name, "C", MBSTRING_ASC, (unsigned char *)"US", -1, -1, 0); + X509_NAME_add_entry_by_txt(cert_name, "O", MBSTRING_ASC, (unsigned char *)"HANMA", -1, -1, 0); + X509_NAME_add_entry_by_txt(cert_name, "OU", MBSTRING_ASC, (unsigned char *)"KANMA", -1, -1, 0); + X509_NAME_add_entry_by_txt(cert_name, "ST", MBSTRING_ASC, (unsigned char *)"GOOOS", -1, -1, 0); + X509_NAME_add_entry_by_txt(cert_name, "SN", MBSTRING_ASC, (unsigned char *)"DESANE", -1, -1, 0); + X509_NAME_add_entry_by_txt(cert_name, "GN", MBSTRING_ASC, (unsigned char *)"HAAH", -1, -1, 0); + X509_set_issuer_name(cert, cert_name); + + X509V3_CTX v3ctx; + X509V3_set_ctx_nodb(&v3ctx); + X509V3_set_ctx(&v3ctx, cert, cert, NULL, NULL, 0); + + add_x509_ext(cert, &v3ctx, NID_authority_key_identifier, "keyid"); + add_x509_ext(cert, &v3ctx, NID_basic_constraints, "critical,CA:TRUE"); + + if (X509_sign(cert, p_key, NULL /* No digest for Ed25519 keys */) <= 0) { + avt_log(avt, AVT_LOG_ERROR, "Error signing X509 certificate with key!\n"); + return AVT_ERROR(EINVAL); + } + + /* Write the certificate file */ + FILE *f_cert = fopen(certfile, "wb"); + PEM_write_X509(f_cert, cert); + fclose(f_cert); + + EVP_PKEY_free(p_key); + EVP_PKEY_CTX_free(p_key_ctx); + X509_free(cert); + + return 0; +} + +int main(void) +{ + int ret; + AVTContext *avt; + AVTAddress addr_server; + AVTAddress addr_client; + + const AVTIO *io = &avt_io_dcb; + AVTIOCtx *io_server_ctx = NULL; + AVTIOCtx *io_client_ctx = NULL; + + const AVTProtocol *p = &avt_protocol_quic; + AVTProtocolCtx *p_server_ctx = NULL; + AVTProtocolCtx *p_client_ctx = NULL; + + ret = avt_init(&avt, NULL); + if (ret < 0) + return AVT_ERROR(ret); + + ret = create_cert(avt, "test_cert.pem", "test_key.pem"); + if (ret < 0) + goto end; + + { /* Server */ + ret = avt_addr_from_url(avt, &addr_server, 1, "quic://localhost/#certfile=test_cert.pem&keyfile=test_key.pem"); + if (ret < 0) + goto end; + ret = io->init(avt, &io_server_ctx, &addr_server); + if (ret < 0) + goto end; + ret = p->init(avt, &p_server_ctx, &addr_server, io, io_server_ctx); + if (ret < 0) + goto end; + } + + { /* Client */ + ret = avt_addr_from_url(avt, &addr_client, 0, "quic://localhost/#certfile=test_cert.pem"); + if (ret < 0) + goto end; + ret = io->init(avt, &io_client_ctx, &addr_client); + if (ret < 0) + goto end; + ret = p->init(avt, &p_client_ctx, &addr_client, io, io_client_ctx); + if (ret < 0) + goto end; + } + + + + +end: + if (p_client_ctx) + p->close(&p_client_ctx); + if (io_client_ctx) + io->close(&io_client_ctx); + + if (p_server_ctx) + p->close(&p_server_ctx); + if (io_server_ctx) + io->close(&io_server_ctx); + + avt_addr_free(&addr_client); + avt_addr_free(&addr_server); + avt_close(&avt); + + return 0; +} diff --git a/libavtransport/tools/spec2c.py b/libavtransport/tools/spec2c.py index 2f9b2bc..491fb88 100755 --- a/libavtransport/tools/spec2c.py +++ b/libavtransport/tools/spec2c.py @@ -158,7 +158,7 @@ def parse_codec_tags(target_id): fields = field_tab.find_all('td') codec_tags[data_prefix + fields[2].string.strip("'")] = int(fields[0].string.strip("'"), 16) - print("Parsed codec_id table", target_id) + print("Parsed codec_id_map table", target_id) # Iterate over each field of a data structure def iter_data_fields(target_id, struct_name, anchor_name): diff --git a/libavtransport/utils.c b/libavtransport/utils.c index 18a4fa6..78528a0 100644 --- a/libavtransport/utils.c +++ b/libavtransport/utils.c @@ -77,22 +77,33 @@ static int fifo_resize(AVTPacketFifo *fifo, unsigned int alloc_new) return 0; } -int avt_pkt_fifo_push(AVTPacketFifo *fifo, - union AVTPacketData pkt, AVTBuffer *pl) +AVTPktd *avt_pkt_fifo_push_new(AVTPacketFifo *fifo, AVTBuffer *pl, + ptrdiff_t offset, size_t len) { if ((fifo->nb + 1) >= fifo->alloc) { /* Ptwo allocations */ if (fifo_resize(fifo, fifo->alloc << 1)) - return AVT_ERROR(ENOMEM); + return NULL; } AVTPktd *data = &fifo->data[fifo->nb]; - int err = avt_buffer_quick_ref(&data->pl, pl, 0, 0); - data->pkt = pkt; - if (err >= 0) - fifo->nb++; + if (pl) { + if (avt_buffer_quick_ref(&data->pl, pl, offset, len) < 0) + return NULL; + } + fifo->nb++; + + return data; +} - return err; +int avt_pkt_fifo_push(AVTPacketFifo *fifo, + union AVTPacketData pkt, AVTBuffer *pl) +{ + AVTPktd *p = avt_pkt_fifo_push_new(fifo, pl, 0, AVT_BUFFER_REF_ALL); + if (!p) + return AVT_ERROR(ENOMEM); + p->pkt = pkt; + return 0; } int avt_pkt_fifo_push_refd_d(AVTPacketFifo *fifo, AVTPktd *p) @@ -122,11 +133,11 @@ int avt_pkt_fifo_push_refd_p(AVTPacketFifo *fifo, AVTPktd *data = &fifo->data[fifo->nb++]; data->pkt = pkt; - if (pl) + if (pl) { data->pl = *pl; - - /* Zero to prevent leaks */ - *pl = (AVTBuffer){ }; + /* Zero to prevent leaks */ + *pl = (AVTBuffer){ }; + } return 0; } diff --git a/libavtransport/utils_internal.h b/libavtransport/utils_internal.h index 39f097c..fa361dc 100644 --- a/libavtransport/utils_internal.h +++ b/libavtransport/utils_internal.h @@ -86,6 +86,12 @@ int avt_pkt_fifo_push_refd_d(AVTPacketFifo *fifo, AVTPktd *p); int avt_pkt_fifo_push_refd_p(AVTPacketFifo *fifo, union AVTPacketData pkt, AVTBuffer *pl); +/* Returns a blank AVTPktd or NULL on error. + * pl is an optional parameter which is quick_ref'd + * within the provided parameters */ +AVTPktd *avt_pkt_fifo_push_new(AVTPacketFifo *fifo, AVTBuffer *pl, + ptrdiff_t offset, size_t len); + #define avt_pkt_fifo_push_refd(f, x, ...) \ _Generic((x), \ AVTPktd *: avt_pkt_fifo_push_refd_d, \ diff --git a/libavtransport/utils_packet.h b/libavtransport/utils_packet.h index c2ae9ae..6946efc 100644 --- a/libavtransport/utils_packet.h +++ b/libavtransport/utils_packet.h @@ -115,38 +115,51 @@ static inline void avt_packet_encode_header(AVTPktd *p) #undef GET #undef TYPE +#define RENAME(x) x ## _pp +#define GET(x) p->x +#define TYPE union AVTPacketData * +#include "utils_packet_template.h" +#undef RENAME +#undef GET +#undef TYPE + #define avt_packet_get_duration(x, ...) \ _Generic((x), \ AVTPktd *: avt_packet_get_duration_d, \ const AVTPktd *: avt_packet_get_duration_d, \ - union AVTPacketData: avt_packet_get_duration_p \ + union AVTPacketData: avt_packet_get_duration_p, \ + union AVTPacketData *: avt_packet_get_duration_pp \ ) (x __VA_OPT__(,) __VA_ARGS__) #define avt_packet_get_pts(x, ...) \ _Generic((x), \ AVTPktd *: avt_packet_get_pts_d, \ const AVTPktd *: avt_packet_get_pts_d, \ - union AVTPacketData: avt_packet_get_pts_p \ + union AVTPacketData: avt_packet_get_pts_p, \ + union AVTPacketData *: avt_packet_get_pts_pp \ ) (x __VA_OPT__(,) __VA_ARGS__) #define avt_packet_get_size(x, ...) \ _Generic((x), \ AVTPktd *: avt_packet_get_size_d, \ const AVTPktd *: avt_packet_get_size_d, \ - union AVTPacketData: avt_packet_get_size_p \ + union AVTPacketData: avt_packet_get_size_p, \ + union AVTPacketData *: avt_packet_get_size_pp, \ ) (x __VA_OPT__(,) __VA_ARGS__) #define avt_packet_get_tb(x, ...) \ _Generic((x), \ AVTPktd *: avt_packet_get_tb_d, \ const AVTPktd *: avt_packet_get_tb_d, \ - union AVTPacketData: avt_packet_get_tb_p \ + union AVTPacketData: avt_packet_get_tb_p, \ + union AVTPacketData *: avt_packet_get_tb_pp \ ) (x __VA_OPT__(,) __VA_ARGS__) -#define avt_packet_change_size(x, ...) \ - _Generic((x), \ - AVTPktd *: avt_packet_change_size_d, \ - union AVTPacketData: avt_packet_change_size_p \ +#define avt_packet_change_size(x, ...) \ + _Generic((x), \ + AVTPktd *: avt_packet_change_size_d, \ + union AVTPacketData: avt_packet_change_size_p, \ + union AVTPacketData *: avt_packet_change_size_pp \ ) (x __VA_OPT__(,) __VA_ARGS__) #endif /* AVTRANSPORT_UTILS_PACKET_H */ diff --git a/libavtransport/utils_packet_template.h b/libavtransport/utils_packet_template.h index e66f302..1109c52 100644 --- a/libavtransport/utils_packet_template.h +++ b/libavtransport/utils_packet_template.h @@ -26,7 +26,10 @@ static inline int64_t RENAME(avt_packet_get_duration)(const TYPE p) { - switch (GET(desc)) { + int desc = GET(desc); + if (desc & AVT_PKT_FLAG_LSB_BITMASK) + desc = (desc & 0xF) << 8; + switch (desc) { case AVT_PKT_STREAM_DATA: return GET(stream_data.duration); default: diff --git a/tools/avcat.c b/tools/avcat.c index d660cea..235b880 100644 --- a/tools/avcat.c +++ b/tools/avcat.c @@ -70,12 +70,19 @@ typedef struct IOContext { #endif } IOContext; -static enum IOMode path_is_avt(const char *path, enum AVTConnectionType *conn_type, int is_out) +static enum IOMode path_id(const char *path, enum AVTConnectionType *conn_type, int is_out) { int len = strlen(path); if (!strncmp(path, "avt://", strlen("avt://")) || + !strncmp(path, "quic://", strlen("quic://")) || + !strncmp(path, "udp://", strlen("udp://")) || + !strncmp(path, "udplite://", strlen("udplite://")) || + !strncmp(path, "socket://", strlen("socket://")) || !strncmp(path, "file://", strlen("file://"))) { + if (!strncmp(path, "udp://", strlen("udp://"))) + printf("Note: udp:// is AVTransport over UDP. " + "Use mpegts:// or rtp:// for anything else\n"); *conn_type = AVT_CONNECTION_URL; return IO_AVT; } else if (!strcmp(&path[len - 4], ".avt") || @@ -102,6 +109,8 @@ static enum IOMode path_is_avt(const char *path, enum AVTConnectionType *conn_ty !strcmp(&path[len - 4], ".cbor") || !strcmp(&path[len - 4], ".svg"))) { return IO_RAW; + } else { + return IO_LAVF; } return 0; @@ -146,7 +155,7 @@ static int open_io(IOContext *io, const char *path, int is_out) { int err; - io->mode = path_is_avt(path, &io->type, is_out); + io->mode = path_id(path, &io->type, is_out); switch (io->mode) { case IO_AVT: { @@ -164,10 +173,11 @@ static int open_io(IOContext *io, const char *path, int is_out) AVTConnectionInfo conn_info = { .path = path, .type = io->type, + .output_opts.bandwidth = 80 * 1000 * 1000, }; err = avt_connection_create(io->avt, &io->conn, &conn_info); if (err < 0) { - avt_log(NULL, AVT_LOG_ERROR, "Couldn't open %s: %i!\n", path, + avt_log(NULL, AVT_LOG_ERROR, "Could not open %s: %i!\n", path, err); return err; } @@ -178,12 +188,13 @@ static int open_io(IOContext *io, const char *path, int is_out) err = avt_output_open(io->avt, &io->out, io->conn, &opts); if (err < 0) { - avt_log(NULL, AVT_LOG_ERROR, "Couldn't open %s for writing: %i!\n", path, + avt_log(NULL, AVT_LOG_ERROR, "Could not open %s as an output: %i\n", path, err); return err; } - io->st = avt_output_stream_add(io->out, 12765); + io->st = avt_output_stream_add(io->out, 0); + io->st->timebase = (AVTRational) { 1, 1000 * 1000 * 1000 }; avt_output_stream_update(io->out, io->st); } else { @@ -194,7 +205,7 @@ static int open_io(IOContext *io, const char *path, int is_out) case IO_RAW: { io->raw = fopen(path, is_out ? "w+" : "r"); if (!io->raw) { - avt_log(NULL, AVT_LOG_ERROR, "Couldn't open %s: %i!\n", path, errno); + avt_log(NULL, AVT_LOG_ERROR, "Could not open %s: %i!\n", path, errno); return AVT_ERROR(errno); } break; @@ -244,10 +255,10 @@ static int open_io(IOContext *io, const char *path, int is_out) return 0; } -static noreturn void on_quit_signal(int signo) +static volatile sig_atomic_t signal_terminate; +static void on_quit_signal(const int signo) { - int err = 0; - exit(err); + signal_terminate = 1; } int main(int argc, char **argv) @@ -270,7 +281,17 @@ int main(int argc, char **argv) return EINVAL; } - if (signal(SIGINT, on_quit_signal) == SIG_ERR) +#ifdef _WIN32 + if (signal(SIGINT, on_quit_signal) < 0 || + signal(SIGTERM, on_quit_signal) < 0) +#else + static const struct sigaction sa = { + .sa_handler = on_quit_signal, + .sa_flags = SA_RESETHAND, + }; + if (sigaction(SIGINT, &sa, NULL) < 0 || + sigaction(SIGTERM, &sa, NULL) < 0) +#endif avt_log(NULL, AVT_LOG_ERROR, "Can't init signal handler!\n"); /* Create inputs */ @@ -307,6 +328,7 @@ int main(int argc, char **argv) AVTPacket pkt = { .type = AVT_FRAME_TYPE_KEY, .data = buf, + .pts = 0, }; avt_output_stream_data(out.st, &pkt); diff --git a/tools/genopt.h b/tools/genopt.h index 2903b8f..0e57d45 100644 --- a/tools/genopt.h +++ b/tools/genopt.h @@ -499,8 +499,8 @@ static inline int gen_opt_parse_fn(GenOpt *opts_list, int opts_list_nb, } for (int j = 0; j < opts_list_nb; j++) { - if (!strcmp(argv[i], opts_list[j].flag) || - !strcmp(argv[i], opts_list[j].flagname)) { + if ((opts_list[j].flag && !strcmp(argv[i], opts_list[j].flag)) || + (opts_list[j].flagname && !strcmp(argv[i], opts_list[j].flagname))) { l = &opts_list[j]; break; }
[=codec_id=]