From 6f853e405ff6d1f8f2ec8cce407968baca2a5b7b Mon Sep 17 00:00:00 2001 From: Lynne Date: Sat, 4 May 2024 04:49:38 +0200 Subject: [PATCH] Complete most of parsing, add packet merger Now all that's left to get basic file parsing is to channel the read and completed packets back through connection.c, and into input.c. --- libavtransport/buffer.c | 21 +- libavtransport/buffer.h | 53 +++- libavtransport/connection.c | 5 +- .../include/avtransport/connection.h | 16 +- libavtransport/include/avtransport/send.h | 2 +- libavtransport/include/avtransport/utils.h | 3 + libavtransport/io_dcb.c | 3 +- libavtransport/io_mmap.c | 55 ++-- libavtransport/ldpc_decode.c | 8 + libavtransport/ldpc_decode.h | 14 +- libavtransport/ldpc_encode.h | 6 +- libavtransport/merger.c | 258 ++++++++++++++++++ libavtransport/merger.h | 56 ++++ libavtransport/meson.build | 1 + libavtransport/output_packet.c | 1 + libavtransport/protocol_common.c | 4 +- libavtransport/protocol_common.h | 12 +- libavtransport/protocol_datagram.c | 87 +----- libavtransport/protocol_stream.c | 133 +++++---- libavtransport/utils_packet.h | 53 ++-- libavtransport/utils_packet_template.h | 58 +++- 21 files changed, 605 insertions(+), 244 deletions(-) create mode 100644 libavtransport/merger.c create mode 100644 libavtransport/merger.h diff --git a/libavtransport/buffer.c b/libavtransport/buffer.c index e600e39..37fc8b0 100644 --- a/libavtransport/buffer.c +++ b/libavtransport/buffer.c @@ -38,24 +38,12 @@ AVT_API AVTBuffer *avt_buffer_create(uint8_t *data, size_t len, if (!buf) return NULL; - buf->refcnt = malloc(sizeof(*buf->refcnt)); - if (!buf->refcnt) { + int err = avt_buffer_quick_create(buf, data, len, opaque, free_cb, 0); + if (err < 0) { free(buf); return NULL; } - atomic_init(buf->refcnt, 1); - - buf->base_data = data; - buf->end_data = data + len; - buf->data = data; - buf->len = len; - buf->opaque = opaque; - if (!free_cb) - buf->free = avt_buffer_default_free; - else - buf->free = free_cb; - return buf; } @@ -184,6 +172,11 @@ size_t avt_buffer_get_data_len(const AVTBuffer *buf) return buf->len; } +bool avt_buffer_read_only(AVTBuffer *buffer) +{ + return !!(buffer->flags & AVT_BUFFER_FLAG_READ_ONLY); +} + void avt_buffer_unref(AVTBuffer **_buf) { AVTBuffer *buf = *_buf; diff --git a/libavtransport/buffer.h b/libavtransport/buffer.h index 58d504e..b883878 100644 --- a/libavtransport/buffer.h +++ b/libavtransport/buffer.h @@ -34,6 +34,10 @@ #include #include "attributes.h" +enum AVTBufferFlags { + AVT_BUFFER_FLAG_READ_ONLY = 1 << 0, +}; + struct AVTBuffer { uint8_t *data; /* Current ref's view of the buffer */ size_t len; /* Current ref's size of the view of the buffer */ @@ -43,12 +47,53 @@ struct AVTBuffer { avt_free_cb free; void *opaque; + enum AVTBufferFlags flags; atomic_int *refcnt; }; void avt_buffer_update(AVTBuffer *buf, void *data, size_t len); int avt_buffer_resize(AVTBuffer *buf, size_t len); +static inline int avt_buffer_quick_create(AVTBuffer *buf, uint8_t *data, + size_t len, void *opaque, + avt_free_cb free_cb, + enum AVTBufferFlags flags) +{ + buf->refcnt = malloc(sizeof(*buf->refcnt)); + if (!buf->refcnt) + return AVT_ERROR(ENOMEM); + + atomic_init(buf->refcnt, 1); + + buf->base_data = data; + buf->end_data = data + len; + buf->data = data; + buf->len = len; + buf->opaque = opaque; + if (!free_cb) + buf->free = avt_buffer_default_free; + else + buf->free = free_cb; + + return 0; +} + +static inline uint8_t *avt_buffer_quick_alloc(AVTBuffer *buf, size_t len) +{ + void *alloc = malloc(len); + if (!alloc) + return NULL; + + int err = avt_buffer_quick_create(buf, alloc, len, NULL, + avt_buffer_default_free , 0); + if (err < 0) { + free(alloc); + return NULL; + } + + return alloc; +} + static inline void avt_buffer_quick_unref(AVTBuffer *buf) { if (!buf || !buf->refcnt) @@ -80,14 +125,6 @@ static inline void avt_buffer_quick_ref(AVTBuffer *dst, AVTBuffer *buf, dst->len = (len == AVT_BUFFER_REF_ALL) ? (dst->end_data - dst->data) : len; } -static inline void avt_buffer_move(AVTBuffer *dst, AVTBuffer **src) -{ - /* Move a reference to an already existing ref */ - avt_buffer_quick_unref(dst); - memcpy(dst, *src, sizeof(*dst)); - free(src); -} - int avt_buffer_offset(AVTBuffer *buf, ptrdiff_t offset); #endif diff --git a/libavtransport/connection.c b/libavtransport/connection.c index 60d15f8..83c5d26 100644 --- a/libavtransport/connection.c +++ b/libavtransport/connection.c @@ -123,8 +123,11 @@ int avt_connection_create(AVTContext *ctx, AVTConnection **_conn, goto fail; /* Protocol init */ + AVTProtocolOpts opts = { + .ldpc_iterations = info->input_opts.ldpc_iterations, + }; ret = avt_protocol_init(ctx, &conn->p, &conn->p_ctx, &conn->addr, - conn->io, conn->io_ctx); + conn->io, conn->io_ctx, &opts); if (ret < 0) goto fail; diff --git a/libavtransport/include/avtransport/connection.h b/libavtransport/include/avtransport/connection.h index 6513e2e..57c5697 100644 --- a/libavtransport/include/avtransport/connection.h +++ b/libavtransport/include/avtransport/connection.h @@ -175,11 +175,7 @@ typedef struct AVTConnectionInfo { * data with a particular offset. Returns the offset after * reading, or a negative error. * The buffer `data` is set to a buffer with the data requested. - * - * Note, `data` MAY be non-NULL. In which case, it will contain - * data from the previous read. The buffer must be expanded to - * include `len` more bytes. Only the data matters, the buffer - * may be recreated. */ + * Ownership of the buffer is not transferred. */ int64_t (*read)(void *opaque, AVTBuffer **data, size_t len, int64_t offset); @@ -207,15 +203,13 @@ typedef struct AVTConnectionInfo { /* Input options */ struct { - /* Probe data when opening. - * avt_connection_create() will block until the first packet is read. */ - bool probe; - /* Buffer size limit. Zero means automatic. Approximate/best effort. */ size_t buffer; - /* Always enable decoding and correction with LDPC codes. */ - bool force_ldpc; + /* Controls the number of iterations for LDPC decoding. Zero means + * automatic (very rarely), higher values increase overhead + * and decrease potential errors, negative values disables decoding. */ + int ldpc_iterations; } input_opts; struct { diff --git a/libavtransport/include/avtransport/send.h b/libavtransport/include/avtransport/send.h index ffc7cf6..3fbb2e4 100644 --- a/libavtransport/include/avtransport/send.h +++ b/libavtransport/include/avtransport/send.h @@ -64,7 +64,7 @@ typedef struct AVTSenderOptions { /* Compression level. Zero means automatic (the default for each compressor). */ int compress_level; - /* Set to true to enable sending hash packets. */ + /* Set to true to enable sending hash packets for all packets with a payload. */ bool hash; } AVTSenderOptions; diff --git a/libavtransport/include/avtransport/utils.h b/libavtransport/include/avtransport/utils.h index c8e2e09..a30b20c 100644 --- a/libavtransport/include/avtransport/utils.h +++ b/libavtransport/include/avtransport/utils.h @@ -83,6 +83,9 @@ AVT_API void avt_buffer_default_free(void *opaque, void *base_data, size_t len); AVT_API AVTBuffer *avt_buffer_create(uint8_t *data, size_t len, void *opaque, avt_free_cb free); +/* Check if buffer is writable */ +AVT_API bool avt_buffer_read_only(AVTBuffer *buffer); + /* Create and allocate a reference counted buffer. */ AVT_API AVTBuffer *avt_buffer_alloc(size_t len); diff --git a/libavtransport/io_dcb.c b/libavtransport/io_dcb.c index 5af525e..cd2e8da 100644 --- a/libavtransport/io_dcb.c +++ b/libavtransport/io_dcb.c @@ -81,9 +81,8 @@ static int64_t dcb_input(AVTIOCtx *io, AVTBuffer *buf, size_t len, uint8_t *src_data = avt_buffer_get_data(tmp, &src_len); memcpy(dst_data, src_data, AVT_MIN(len, src_len)); - avt_buffer_unref(&tmp); } else { - avt_buffer_move(buf, &tmp); + avt_buffer_quick_ref(buf, tmp, 0, AVT_BUFFER_REF_ALL); } return (io->rpos = ret); diff --git a/libavtransport/io_mmap.c b/libavtransport/io_mmap.c index e85cc9a..33cbaf5 100644 --- a/libavtransport/io_mmap.c +++ b/libavtransport/io_mmap.c @@ -46,7 +46,7 @@ struct AVTIOCtx { int fd; - AVTBuffer *map; + AVTBuffer map; avt_pos rpos; avt_pos wpos; @@ -63,7 +63,7 @@ static void mmap_buffer_free(void *opaque, void *base_data, size_t size) static COLD int mmap_close(AVTIOCtx **_io) { AVTIOCtx *io = *_io; - avt_buffer_unref(&io->map); + avt_buffer_quick_unref(&io->map); if (io->file_grew) ftruncate(io->fd, io->wpos); close(io->fd); @@ -102,13 +102,14 @@ static COLD int mmap_init_common(AVTContext *ctx, AVTIOCtx *io) return ret; } - io->map = avt_buffer_create(data, len, - (void *)((intptr_t)fd_dup), - mmap_buffer_free); - if (!io->map) { + ret = avt_buffer_quick_create(&io->map, data, len, + (void *)((intptr_t)fd_dup), + mmap_buffer_free, + AVT_BUFFER_FLAG_READ_ONLY); + if (ret < 0) { munmap(data, len); close(fd_dup); - return AVT_ERROR(ENOMEM); + return ret; } return 0; @@ -170,7 +171,7 @@ static int mmap_grow(AVTIOCtx *io, size_t amount) { int ret; size_t old_map_size; - uint8_t *old_map = avt_buffer_get_data(io->map, &old_map_size); + uint8_t *old_map = avt_buffer_get_data(&io->map, &old_map_size); void *new_map = MAP_FAILED; amount = AVT_MAX(amount, MIN_ALLOC); @@ -190,13 +191,13 @@ static int mmap_grow(AVTIOCtx *io, size_t amount) * If there's only a single reference to the map (ours), allow it to be * moved. */ new_map = mremap(old_map, old_map_size, new_map_size, - avt_buffer_get_refcount(io->map) == 1 ? MREMAP_MAYMOVE : - 0); + avt_buffer_get_refcount(&io->map) == 1 ? MREMAP_MAYMOVE : + 0); /* Success */ if (new_map != MAP_FAILED) { /* Update the existing buffer */ - avt_buffer_update(io->map, new_map, new_map_size); + avt_buffer_update(&io->map, new_map, new_map_size); return 0; } else if (new_map == MAP_FAILED && errno != ENOMEM) { ret = avt_handle_errno(io, "Error in mremap(): %i %s\n"); @@ -225,16 +226,18 @@ static int mmap_grow(AVTIOCtx *io, size_t amount) return ret; } - AVTBuffer *new_buf = avt_buffer_create(data, new_map_size, - (void *)((intptr_t)fd_dup), - mmap_buffer_free); - if (!io->map) { + AVTBuffer new_buf; + ret = avt_buffer_quick_create(&new_buf, data, new_map_size, + (void *)((intptr_t)fd_dup), + mmap_buffer_free, + AVT_BUFFER_FLAG_READ_ONLY); + if (ret < 0) { munmap(data, new_map_size); close(fd_dup); - return AVT_ERROR(ENOMEM); + return ret; } - avt_buffer_unref(&io->map); + avt_buffer_quick_unref(&io->map); io->map = new_buf; return 0; @@ -248,7 +251,7 @@ static int mmap_max_pkt_len(AVTIOCtx *io, size_t *mtu) static inline avt_pos mmap_seek(AVTIOCtx *io, avt_pos pos) { - if (pos > avt_buffer_get_data_len(io->map)) + if (pos > avt_buffer_get_data_len(&io->map)) return AVT_ERROR(ERANGE); return (io->rpos = pos); } @@ -259,7 +262,7 @@ static avt_pos mmap_write_pkt(AVTIOCtx *io, AVTPktd *p, int64_t timeout) uint8_t *pl_data = avt_buffer_get_data(&p->pl, &pl_len); size_t map_size; - uint8_t *map_data = avt_buffer_get_data(io->map, &map_size); + uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size); if ((io->wpos + p->hdr_len + pl_len) > map_size) { int ret = mmap_grow(io, p->hdr_len + pl_len); @@ -267,7 +270,7 @@ static avt_pos mmap_write_pkt(AVTIOCtx *io, AVTPktd *p, int64_t timeout) return ret; } - map_data = avt_buffer_get_data(io->map, &map_size); + map_data = avt_buffer_get_data(&io->map, &map_size); memcpy(&map_data[io->wpos], p->hdr, p->hdr_len); memcpy(&map_data[io->wpos + p->hdr_len], pl_data, pl_len); @@ -285,7 +288,7 @@ static avt_pos mmap_write_vec(AVTIOCtx *io, AVTPktd *iov, uint32_t nb_iov, sum += iov[i].hdr_len + avt_buffer_get_data_len(&iov[i].pl); size_t map_size; - uint8_t *map_data = avt_buffer_get_data(io->map, &map_size); + uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size); if ((io->wpos + sum) > map_size) { int ret = mmap_grow(io, sum); @@ -294,7 +297,7 @@ static avt_pos mmap_write_vec(AVTIOCtx *io, AVTPktd *iov, uint32_t nb_iov, } avt_pos offset = io->wpos; - map_data = avt_buffer_get_data(io->map, &map_size); + map_data = avt_buffer_get_data(&io->map, &map_size); for (int i = 0; i < nb_iov; i++) { pl_data = avt_buffer_get_data(&iov[i].pl, &pl_len); memcpy(&map_data[offset], iov[i].hdr, iov[i].hdr_len); @@ -313,7 +316,7 @@ static avt_pos mmap_rewrite(AVTIOCtx *io, AVTPktd *p, avt_pos off, uint8_t *pl_data = avt_buffer_get_data(&p->pl, &pl_len); size_t map_size; - uint8_t *map_data = avt_buffer_get_data(io->map, &map_size); + uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size); if ((off + p->hdr_len + pl_len) > map_size) return AVT_ERROR(ERANGE); @@ -329,7 +332,7 @@ static inline int64_t mmap_read(AVTIOCtx *io, AVTBuffer *dst, enum AVTIOReadFlags flags) { size_t map_size; - uint8_t *map_data = avt_buffer_get_data(io->map, &map_size); + uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size); /* TODO: cap map_size at wpos? */ len = AVT_MIN(map_size - io->rpos, len); @@ -339,7 +342,7 @@ static inline int64_t mmap_read(AVTIOCtx *io, AVTBuffer *dst, memcpy(dst, &map_data[io->rpos], len); } else { avt_buffer_quick_unref(dst); - avt_buffer_quick_ref(dst, io->map, io->rpos, len); + avt_buffer_quick_ref(dst, &io->map, io->rpos, len); } len = io->rpos + len; @@ -350,7 +353,7 @@ static inline int64_t mmap_read(AVTIOCtx *io, AVTBuffer *dst, static int mmap_flush(AVTIOCtx *io, int64_t timeout) { size_t map_size; - uint8_t *map_data = avt_buffer_get_data(io->map, &map_size); + uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size); int ret = msync(map_data, map_size, timeout == 0 ? MS_ASYNC : MS_SYNC); if (ret < 0) diff --git a/libavtransport/ldpc_decode.c b/libavtransport/ldpc_decode.c index 3e5cbc5..0d6e6d3 100644 --- a/libavtransport/ldpc_decode.c +++ b/libavtransport/ldpc_decode.c @@ -25,3 +25,11 @@ */ #include "ldpc_decode.h" + +void avt_ldpc_decode_288_224(uint8_t *dst, int iterations) +{ +} + +void avt_ldpc_decode_2784_2016(uint8_t *dst, int iterations) +{ +} diff --git a/libavtransport/ldpc_decode.h b/libavtransport/ldpc_decode.h index 6b4ef04..05c723d 100644 --- a/libavtransport/ldpc_decode.h +++ b/libavtransport/ldpc_decode.h @@ -24,7 +24,15 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#ifndef LIBAVTRANSPORT_LDPC_DECODE -#define LIBAVTRANSPORT_LDPC_DECODE +#ifndef AVTRANSPORT_LDPC_DECODE +#define AVTRANSPORT_LDPC_DECODE -#endif +#include + +/* Decodes systematic LDPC codes. dst must point to the start of the message. */ + +void avt_ldpc_decode_288_224(uint8_t *dst, int iterations); + +void avt_ldpc_decode_2784_2016(uint8_t *dst, int iterations); + +#endif /* AVTRANSPORT_LDPC_DECODE */ diff --git a/libavtransport/ldpc_encode.h b/libavtransport/ldpc_encode.h index 07d252a..64156b3 100644 --- a/libavtransport/ldpc_encode.h +++ b/libavtransport/ldpc_encode.h @@ -24,8 +24,8 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#ifndef LIBAVTRANSPORT_LDPC_ENCODE -#define LIBAVTRANSPORT_LDPC_ENCODE +#ifndef AVTRANSPORT_LDPC_ENCODE +#define AVTRANSPORT_LDPC_ENCODE #include @@ -36,4 +36,4 @@ void avt_ldpc_encode_288_224(uint8_t *dst); void avt_ldpc_encode_2784_2016(uint8_t *dst); -#endif +#endif /* AVTRANSPORT_LDPC_ENCODE */ diff --git a/libavtransport/merger.c b/libavtransport/merger.c new file mode 100644 index 0000000..df65d5c --- /dev/null +++ b/libavtransport/merger.c @@ -0,0 +1,258 @@ +/* + * Copyright © 2024, Lynne + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include + +#include "merger.h" + +#include "packet_decode.h" +#include "bytestream.h" +#include "buffer.h" +#include "utils_packet.h" +#include "mem.h" + +static inline int fill_ranges(AVTMerger *m, uint32_t seg_off, uint32_t seg_size) +{ + int dst_idx = 0; + int consolidate_idx = -1; + for (auto i = 0; i < m->nb_ranges; i++) { + AVTMergerRange *r = &m->ranges[i]; + if ((seg_off + seg_size) == r->offset) { + /* Prepend to existing range */ + r->offset = seg_off; + r->size += seg_size; + + /* If the previous range is not completed with this one, exit */ + if (!i || ((m->ranges[i - 1].offset + m->ranges[i - 1].size) < r->offset)) + return AVT_ERROR(EAGAIN); + + consolidate_idx = i - 1; + break; + } else if ((r->offset + r->size) == seg_off) { + /* Extend exisint range */ + r->size += seg_size; + + if ((i == (m->nb_ranges - 1)) || ((r->offset + r->size) < m->ranges[i + 1].offset)) + return AVT_ERROR(EAGAIN); + + consolidate_idx = i; + break; + } else if (seg_off > (r->offset + r->size)) { + dst_idx = i + 1; + } + } + + /* Add a range */ + if (consolidate_idx < 0) { + if ((m->nb_ranges + 1) > m->ranges_allocated) { + AVTMergerRange *tmp = avt_reallocarray(m->ranges, + (m->nb_ranges + 1) << 1, + sizeof(*tmp)); + if (!tmp) + return AVT_ERROR(ENOMEM); + + m->ranges = tmp; + m->ranges_allocated = (m->nb_ranges + 1) << 1; + } + + memmove(&m->ranges[dst_idx + 1], &m->ranges[dst_idx], m->nb_ranges - dst_idx - 1); + m->nb_ranges++; + return AVT_ERROR(EAGAIN); + } + + m->ranges[consolidate_idx].size = m->ranges[consolidate_idx + 1].size; + memmove(&m->ranges[consolidate_idx + 1], &m->ranges[consolidate_idx + 2], + m->nb_ranges - consolidate_idx - 2); + m->nb_ranges--; + + return AVT_ERROR(EAGAIN); +} + +static inline int fill_phantom_header(void *log_ctx, AVTMerger *m, AVTPktd *p) +{ + int hdr_part = p->pkt.seq % 7; + memcpy(&m->p.hdr[4*hdr_part], p->pkt.generic_segment.header_7, 4); + m->hdr_mask |= 1 << (6 - hdr_part); + + /* We're not ready to reconstruct the header yet */ + if (!(m->hdr_mask == 0x7F)) + return 0; + + uint16_t tgt_desc = AVT_RB16(&m->p.hdr[0]); + if ((tgt_desc == (AVT_PKT_TIME_SYNC & 0xFF00)) || + (tgt_desc == (AVT_PKT_STREAM_DATA & 0xFF00))) + tgt_desc &= 0xFF00; + + AVTBytestream bs = avt_bs_init(m->p.hdr, sizeof(m->p.hdr)); + + // TODO more sanity checking + + switch (tgt_desc) { + case AVT_PKT_STREAM_DATA & ~(AVT_PKT_FLAG_LSB_BITMASK): + avt_decode_stream_data(&bs, &m->p.pkt.stream_data); + break; + case AVT_PKT_LUT_ICC: + avt_decode_lut_icc(&bs, &m->p.pkt.lut_icc); + /* name will be missing */ + break; + case AVT_PKT_FONT_DATA: + avt_decode_font_data(&bs, &m->p.pkt.font_data); + /* name will be missing */ + break; + case AVT_PKT_USER_DATA: + avt_decode_user_data(&bs, &m->p.pkt.user_data); + break; + case AVT_PKT_STREAM_CONFIG: [[fallthrough]]; + case AVT_PKT_METADATA: + avt_decode_generic_data(&bs, &m->p.pkt.generic_data); + break; + default: + /* Any other packets may not be segmented. Perhaps the top part of the + * reconstructed segment is faulty. Invalidate it. */ + m->hdr_mask &= 0x3F; + + /* If the current packet contributed this part, skip copying the + * payload, as it's likely wrong. Otherwise continue. */ + return hdr_part == 0 ? AVT_ERROR(EAGAIN) : 0; + } + + m->p_avail = true; + + return 0; +} + +int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p) +{ + uint32_t seg_off, seg_size, tot_size; + int ret = avt_packet_series(p, &seg_off, &seg_size, &tot_size); + + /* Packet needs nothing else */ + if (!ret) + return 0; + + size_t src_size; + uint8_t *src = avt_buffer_get_data(&p->pl, &src_size); + if (src_size != seg_size) { + avt_log(log_ctx, AVT_LOG_ERROR, "Mismatch between signalled payload size " + "%u and payload %zu in %" PRIu64, + seg_size, src_size, p->pkt.seq); + } + + if (!m->active) { + m->hdr_mask = 0x0; + m->nb_ranges = 0; + m->pkt_len_track = 0; + m->target_tot_len = tot_size; + + /* Starting with a segment, not the actual start */ + if (ret > 0) { + m->p = *p; + m->p_avail = true; + m->hdr_mask = 0x7F; + m->target = p->pkt.seq; + } else { + int hdr_part = p->pkt.seq % 7; + memcpy(&m->p.hdr[4*hdr_part], p->pkt.generic_segment.header_7, 4); + m->hdr_mask |= 1 << (6 - hdr_part); + m->target = p->pkt.generic_segment.target_seq; + m->p_avail = false; + } + + if ((m->nb_ranges + 1) > m->ranges_allocated) { + AVTMergerRange *tmp = avt_reallocarray(m->ranges, + (m->nb_ranges + 1) << 1, + sizeof(*tmp)); + if (!tmp) + return AVT_ERROR(ENOMEM); + + m->ranges = tmp; + m->ranges_allocated = (m->nb_ranges + 1) << 1; + } + + /* In case we have a read-only buffer, copy the data */ + if (seg_off || avt_buffer_read_only(&p->pl)) { + AVTBuffer tmp; + uint8_t *dst = avt_buffer_quick_alloc(&tmp, tot_size); + if (!dst) + return AVT_ERROR(ENOMEM); + + memcpy(dst + seg_off, src, src_size); + avt_buffer_quick_unref(&p->pl); + m->p.pl = tmp; + } else { + /* Resize the buffer if possible */ + int err = avt_buffer_resize(&p->pl, tot_size); + if (err < 0) + return err; + + /* In case starting with a segment, m->p is bare */ + m->p.pl = p->pl; + } + + m->ranges[m->nb_ranges++] = (AVTMergerRange) { seg_off, src_size }; + m->active = true; + m->pkt_len_track += seg_size; + + return AVT_ERROR(EAGAIN); + } + + /* Sanity checking */ + if (ret == 0 && m->target == p->pkt.seq) { + avt_log(log_ctx, AVT_LOG_WARN, "Header packet for %" PRIu64 " indicates " + "no segmentation, but segments received", + p->pkt.seq); + } + + if (!m->p_avail && (ret < -1)) { + ret = fill_phantom_header(log_ctx, m, p); + if (ret < 0) + return ret; + } else if (ret == 0) { + m->p.pkt = p->pkt; + memcpy(m->p.hdr, p->hdr, sizeof(p->hdr)); + m->p_avail = true; + } + + uint8_t *dst = avt_buffer_get_data(&m->p.pl, NULL); + memcpy(dst + seg_off, src, src_size); + avt_buffer_quick_unref(&p->pl); + m->pkt_len_track += seg_size; + + if (m->pkt_len_track == m->target_tot_len) { + /* As each packet with a payload is required to have a non-zero amount + * of payload, this should be impossible, as reconstructed headers don't + * contain any payload. */ + avt_assert1(m->p_avail); + *p = m->p; + m->active = false; + return 0; + } + + /* Merge ranges */ + return fill_ranges(m, seg_off, src_size /* Use the real segment size here */); +} diff --git a/libavtransport/merger.h b/libavtransport/merger.h new file mode 100644 index 0000000..ffcf671 --- /dev/null +++ b/libavtransport/merger.h @@ -0,0 +1,56 @@ +/* + * Copyright © 2024, Lynne + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef AVTRANSPORT_MERGER_H +#define AVTRANSPORT_MERGER_H + +#include "packet_common.h" + +typedef struct AVTMergerRange { + uint32_t offset; + uint32_t size; +} AVTMergerRange; + +/* One merger per seq ID */ +typedef struct AVTMerger { + bool active; + uint32_t target; + uint32_t target_tot_len; + + AVTPktd p; + bool p_avail; + uint32_t pkt_len_track; + + AVTMergerRange *ranges; + uint32_t nb_ranges; + uint32_t ranges_allocated; + + uint8_t hdr_mask; // 1 bit per 32-bits +} AVTMerger; + +int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p); + +#endif /* AVTRANSPORT_MERGER_H */ diff --git a/libavtransport/meson.build b/libavtransport/meson.build index 6ff830d..87daf2c 100644 --- a/libavtransport/meson.build +++ b/libavtransport/meson.build @@ -66,6 +66,7 @@ sources = [ 'ldpc_encode.c', 'reorder.c', + 'merger.c', 'ldpc_decode.c', avtransport_spec_pkt_headers, diff --git a/libavtransport/output_packet.c b/libavtransport/output_packet.c index 173240c..e3bc771 100644 --- a/libavtransport/output_packet.c +++ b/libavtransport/output_packet.c @@ -109,6 +109,7 @@ static inline enum AVTDataCompression compress_method(AVTPktd *p, return AVT_DATA_COMPRESSION_ZSTD; break; /* Permit user data to be compressed as well */ + case AVT_PKT_STREAM_CONFIG: [[fallthrough]]; case AVT_PKT_USER_DATA: [[fallthrough]]; case AVT_PKT_LUT_ICC: if (opts->compress & AVT_SENDER_COMPRESS_AUX) diff --git a/libavtransport/protocol_common.c b/libavtransport/protocol_common.c index e431e63..05b93bf 100644 --- a/libavtransport/protocol_common.c +++ b/libavtransport/protocol_common.c @@ -52,7 +52,7 @@ static const AVTProtocol *avt_protocol_list[AVT_PROTOCOL_MAX] = { /* For connections to call */ int avt_protocol_init(AVTContext *ctx, const AVTProtocol **_p, AVTProtocolCtx **p_ctx, AVTAddress *addr, - const AVTIO *io, AVTIOCtx *io_ctx) + const AVTIO *io, AVTIOCtx *io_ctx, AVTProtocolOpts *opts) { const AVTProtocol *p = avt_protocol_list[addr->proto]; if (!p) { @@ -61,7 +61,7 @@ int avt_protocol_init(AVTContext *ctx, const AVTProtocol **_p, return AVT_ERROR(ENOTSUP); } - int err = p->init(ctx, p_ctx, addr, io, io_ctx); + int err = p->init(ctx, p_ctx, addr, io, io_ctx, opts); if (err >= 0) *_p = p; diff --git a/libavtransport/protocol_common.h b/libavtransport/protocol_common.h index ae1a2c7..f8d8924 100644 --- a/libavtransport/protocol_common.h +++ b/libavtransport/protocol_common.h @@ -31,6 +31,10 @@ #include "utils_internal.h" #include "io_common.h" +typedef struct AVTProtocolOpts { + int ldpc_iterations; +} AVTProtocolOpts; + /* High level interface */ typedef struct AVTProtocolCtx AVTProtocolCtx; typedef struct AVTProtocol { @@ -39,7 +43,7 @@ typedef struct AVTProtocol { /* Initialize a context */ int (*init)(AVTContext *ctx, AVTProtocolCtx **p, AVTAddress *addr, - const AVTIO *io, AVTIOCtx *io_ctx); + const AVTIO *io, AVTIOCtx *io_ctx, AVTProtocolOpts *opts); /* Attempt to add a secondary destination, NULL if unsupported */ int (*add_dst)(AVTProtocolCtx *p, AVTAddress *addr); @@ -63,8 +67,7 @@ typedef struct AVTProtocol { void **series, int64_t pos); /* Receive a packet. Returns offset after reading. */ - int (*receive_packet)(AVTProtocolCtx *p, - union AVTPacketData *pkt, AVTBuffer **pl, + int (*receive_packet)(AVTProtocolCtx *s, AVTPktd *p, int64_t timeout); /* Seek to a place in the stream */ @@ -80,7 +83,8 @@ typedef struct AVTProtocol { COLD int avt_protocol_init(AVTContext *ctx, const AVTProtocol **_p, AVTProtocolCtx **p_ctx, AVTAddress *addr, - const AVTIO *io, AVTIOCtx *io_ctx); + const AVTIO *io, AVTIOCtx *io_ctx, + AVTProtocolOpts *opts); typedef struct AVTIndexContext { AVTIndexEntry *index; diff --git a/libavtransport/protocol_datagram.c b/libavtransport/protocol_datagram.c index dd7600a..e6e11f7 100644 --- a/libavtransport/protocol_datagram.c +++ b/libavtransport/protocol_datagram.c @@ -35,6 +35,7 @@ struct AVTProtocolCtx { const AVTIO *io; AVTIOCtx *io_ctx; + AVTProtocolOpts opts; }; static COLD int datagram_proto_close(AVTProtocolCtx **p) @@ -46,7 +47,7 @@ static COLD int datagram_proto_close(AVTProtocolCtx **p) } static COLD int datagram_proto_init(AVTContext *ctx, AVTProtocolCtx **_p, AVTAddress *addr, - const AVTIO *io, AVTIOCtx *io_ctx) + const AVTIO *io, AVTIOCtx *io_ctx, AVTProtocolOpts *opts) { AVTProtocolCtx *p = malloc(sizeof(*p)); if (!p) @@ -54,6 +55,8 @@ static COLD int datagram_proto_init(AVTContext *ctx, AVTProtocolCtx **_p, AVTAdd p->io = io; p->io_ctx = io_ctx; + p->opts = *opts; + *_p = p; return 0; @@ -91,90 +94,10 @@ static int datagram_proto_send_seq(AVTProtocolCtx *p, AVTPacketFifo *seq, return 0; } -static int datagram_proto_receive_packet(AVTProtocolCtx *p, - union AVTPacketData *pkt, AVTBuffer **pl, +static int datagram_proto_receive_packet(AVTProtocolCtx *s, AVTPktd *p, int64_t timeout) { -#if 0 - AVTBuffer *buf; - int64_t err = p->io->read_input(p->io_ctx, &buf, 0, timeout); - if (err < 0) - return err; - - size_t size; - uint8_t *data = avt_buffer_get_data(buf, &size); - - - - size_t pl_bytes = 0; - AVTBytestream bs = avt_bs_init(data, size); - - switch (desc) { - case AVT_PKT_SESSION_START: - avt_decode_session_start(&bs, &pkt->session_start); - return 0; - case AVT_PKT_TIME_SYNC & ~(AVT_PKT_FLAG_LSB_BITMASK): - avt_decode_time_sync(&bs, &pkt->time_sync); - return 0; - case AVT_PKT_VIDEO_INFO: - avt_decode_video_info(&bs, &pkt->video_info); - return 0; - case AVT_PKT_VIDEO_ORIENTATION: - avt_decode_video_orientation(&bs, &pkt->video_orientation); - return 0; - case AVT_PKT_STREAM_REGISTRATION: - avt_decode_stream_registration(&bs, &pkt->stream_registration); - return 0; - case AVT_PKT_STREAM_END: - avt_decode_stream_end(&bs, &pkt->stream_end); - return 0; - case AVT_PKT_STREAM_INDEX: - avt_decode_stream_index(&bs, &pkt->stream_index); - - err = avt_index_list_parse(&p->ic, &bs, &pkt->stream_index); - if (err < 0) - return err; - - return AVT_ERROR(EAGAIN); - case AVT_PKT_STREAM_DATA & ~(AVT_PKT_FLAG_LSB_BITMASK): - avt_decode_stream_data(&bs, &pkt->stream_data); - pl_bytes = pkt->stream_data.data_length; - break; - case AVT_PKT_LUT_ICC: [[fallthrough]]; - case AVT_PKT_FONT_DATA: [[fallthrough]]; - case AVT_PKT_METADATA: [[fallthrough]]; - case AVT_PKT_USER_DATA: [[fallthrough]]; - case AVT_PKT_STREAM_CONFIG: - avt_decode_generic_data(&bs, &pkt->generic_data); - pl_bytes = pkt->generic_data.payload_length; - break; - case AVT_PKT_LUT_ICC_SEGMENT: [[fallthrough]]; - case AVT_PKT_FONT_DATA_SEGMENT: [[fallthrough]]; - case AVT_PKT_METADATA_SEGMENT: [[fallthrough]]; - case AVT_PKT_USER_DATA_SEGMENT: [[fallthrough]]; - case AVT_PKT_STREAM_DATA_SEGMENT: [[fallthrough]]; - case AVT_PKT_STREAM_CONFIG_SEGMENT: - avt_decode_generic_segment(&bs, &pkt->generic_segment); - pl_bytes = pkt->generic_segment.seg_length; - break; - case AVT_PKT_LUT_ICC_PARITY: [[fallthrough]]; - case AVT_PKT_FONT_DATA_PARITY: [[fallthrough]]; - case AVT_PKT_METADATA_PARITY: [[fallthrough]]; - case AVT_PKT_USER_DATA_PARITY: [[fallthrough]]; - case AVT_PKT_STREAM_DATA_PARITY: [[fallthrough]]; - case AVT_PKT_STREAM_CONFIG_PARITY: - avt_decode_generic_parity(&bs, &pkt->generic_parity); - pl_bytes = pkt->generic_parity.parity_data_length; - break; - default: - avt_log(p, AVT_LOG_ERROR, "Unknown descriptor 0x%x received\n", desc); - return AVT_ERROR(ENOTSUP); - }; - - - return err; -#endif return 0; } diff --git a/libavtransport/protocol_stream.c b/libavtransport/protocol_stream.c index d43ec55..096e8a8 100644 --- a/libavtransport/protocol_stream.c +++ b/libavtransport/protocol_stream.c @@ -33,11 +33,12 @@ #include "protocol_common.h" #include "io_common.h" #include "bytestream.h" +#include "ldpc_decode.h" struct AVTProtocolCtx { const AVTIO *io; AVTIOCtx *io_ctx; - AVTBuffer *hdr; + AVTProtocolOpts opts; AVTIndexContext ic; }; @@ -45,14 +46,13 @@ struct AVTProtocolCtx { static COLD int stream_proto_close(AVTProtocolCtx **_p) { AVTProtocolCtx *p = *_p; - avt_buffer_unref(&p->hdr); free(p); *_p = NULL; return 0; } static COLD int stream_init(AVTContext *ctx, AVTProtocolCtx **_p, AVTAddress *addr, - const AVTIO *io, AVTIOCtx *io_ctx) + const AVTIO *io, AVTIOCtx *io_ctx, AVTProtocolOpts *opts) { AVTProtocolCtx *p = malloc(sizeof(*p)); if (!p) @@ -60,11 +60,7 @@ static COLD int stream_init(AVTContext *ctx, AVTProtocolCtx **_p, AVTAddress *ad p->io = io; p->io_ctx = io_ctx; - p->hdr = avt_buffer_alloc(AVT_MAX_HEADER_LEN); - if (!p->hdr) { - stream_proto_close(&p); - return AVT_ERROR(ENOMEM); - } + p->opts = *opts; *_p = p; @@ -103,100 +99,125 @@ static int stream_send_seq(AVTProtocolCtx *p, return 0; } -static int stream_receive_packet(AVTProtocolCtx *p, - union AVTPacketData *pkt, AVTBuffer **pl, +static int stream_receive_packet(AVTProtocolCtx *s, AVTPktd *p, int64_t timeout) { int64_t err = 0; - uint8_t hdr_buf[AVT_MAX_HEADER_LEN]; AVTBuffer buf = { - .base_data = hdr_buf, - .data = hdr_buf, - .end_data = &hdr_buf[AVT_MAX_HEADER_LEN], - .len = AVT_MAX_HEADER_LEN, + .base_data = p->hdr, + .data = p->hdr, + .end_data = &p->hdr[sizeof(p->hdr)], + .len = sizeof(p->hdr), }; /* Get the minimum header size */ - err = p->io->read_input(p->io_ctx, &buf, AVT_MIN_HEADER_LEN, + err = s->io->read_input(s->io_ctx, &buf, AVT_MIN_HEADER_LEN, timeout, AVT_IO_READ_MUTABLE); if (err < 0) return err; - /* TODO: part 1 of header FEC would happen here */ + /* Check LDPC codes */ + avt_ldpc_decode_288_224(buf.data, s->opts.ldpc_iterations); + + uint16_t desc = AVT_RB16(&buf.data[0]); - const uint16_t desc = AVT_RB16(&hdr_buf[0]); + /* For identification purposes, zero out any bitmask bits */ + if ((desc == (AVT_PKT_TIME_SYNC & 0xFF00)) || + (desc == (AVT_PKT_STREAM_DATA & 0xFF00))) + desc &= 0xFF00; /* Get the rest of the header */ const int left_size = avt_pkt_hdr_size(desc) - AVT_MIN_HEADER_LEN; - if (left_size) { + if (left_size > 0) { buf.data += AVT_MIN_HEADER_LEN; buf.len -= AVT_MIN_HEADER_LEN; - err = p->io->read_input(p->io_ctx, &buf, left_size, + err = s->io->read_input(s->io_ctx, &buf, left_size, timeout, AVT_IO_READ_MUTABLE); if (err < 0) return err; - /* TODO: part 2 of header FEC would happen here */ + /* Check LDPC codes */ + switch (left_size) { + case 36: avt_ldpc_decode_288_224(buf.data, s->opts.ldpc_iterations); + break; + case 384: avt_ldpc_decode_2784_2016(buf.data, s->opts.ldpc_iterations); + break; + default: + break; + } buf.data += AVT_MIN_HEADER_LEN; buf.len -= AVT_MIN_HEADER_LEN; } + p->hdr_len = buf.data - buf.base_data; + size_t pl_bytes = 0; - AVTBytestream bs = avt_bs_init(buf.base_data, buf.data - buf.base_data); + AVTBytestream bs = avt_bs_init(p->hdr, sizeof(p->hdr)); switch (desc) { case AVT_PKT_SESSION_START: - avt_decode_session_start(&bs, &pkt->session_start); + avt_decode_session_start(&bs, &p->pkt.session_start); return 0; case AVT_PKT_TIME_SYNC & ~(AVT_PKT_FLAG_LSB_BITMASK): - avt_decode_time_sync(&bs, &pkt->time_sync); + avt_decode_time_sync(&bs, &p->pkt.time_sync); return 0; case AVT_PKT_VIDEO_INFO: - avt_decode_video_info(&bs, &pkt->video_info); + avt_decode_video_info(&bs, &p->pkt.video_info); return 0; case AVT_PKT_VIDEO_ORIENTATION: - avt_decode_video_orientation(&bs, &pkt->video_orientation); + avt_decode_video_orientation(&bs, &p->pkt.video_orientation); return 0; case AVT_PKT_STREAM_REGISTRATION: - avt_decode_stream_registration(&bs, &pkt->stream_registration); + avt_decode_stream_registration(&bs, &p->pkt.stream_registration); return 0; case AVT_PKT_STREAM_END: - avt_decode_stream_end(&bs, &pkt->stream_end); + avt_decode_stream_end(&bs, &p->pkt.stream_end); return 0; case AVT_PKT_STREAM_INDEX: - avt_decode_stream_index(&bs, &pkt->stream_index); - pl_bytes = pkt->stream_index.nb_indices * AVT_PKT_INDEX_ENTRY_SIZE; + avt_decode_stream_index(&bs, &p->pkt.stream_index); + pl_bytes = p->pkt.stream_index.nb_indices * AVT_PKT_INDEX_ENTRY_SIZE; + + // TODO: store buffer in state + AVTBuffer *tmp = avt_buffer_alloc(pl_bytes); + if (!tmp) + return AVT_ERROR(ENOMEM); /* Read index entries */ - buf.data = buf.base_data; - buf.len = buf.end_data - buf.base_data; - err = p->io->read_input(p->io_ctx, &buf, pl_bytes, - timeout, AVT_IO_READ_MUTABLE); + err = s->io->read_input(s->io_ctx, tmp, pl_bytes, + timeout, 0x0); if (err < 0) return err; - bs = avt_bs_init(buf.base_data, buf.data - buf.base_data); + size_t index_size; + uint8_t *index_data = avt_buffer_get_data(tmp, &index_size); + + bs = avt_bs_init(index_data, index_size); /* Parse */ - err = avt_index_list_parse(&p->ic, &bs, &pkt->stream_index); + err = avt_index_list_parse(&s->ic, &bs, &p->pkt.stream_index); + avt_buffer_unref(&tmp); if (err < 0) return err; + /* Bypass reordering */ return AVT_ERROR(EAGAIN); case AVT_PKT_STREAM_DATA & ~(AVT_PKT_FLAG_LSB_BITMASK): - avt_decode_stream_data(&bs, &pkt->stream_data); - pl_bytes = pkt->stream_data.data_length; + avt_decode_stream_data(&bs, &p->pkt.stream_data); + pl_bytes = p->pkt.stream_data.data_length; break; - case AVT_PKT_LUT_ICC: [[fallthrough]]; - case AVT_PKT_FONT_DATA: [[fallthrough]]; - case AVT_PKT_METADATA: [[fallthrough]]; - case AVT_PKT_USER_DATA: [[fallthrough]]; - case AVT_PKT_STREAM_CONFIG: - avt_decode_generic_data(&bs, &pkt->generic_data); - pl_bytes = pkt->generic_data.payload_length; + case AVT_PKT_LUT_ICC: + avt_decode_lut_icc(&bs, &p->pkt.lut_icc); + case AVT_PKT_FONT_DATA: + avt_decode_font_data(&bs, &p->pkt.font_data); + case AVT_PKT_USER_DATA: + avt_decode_user_data(&bs, &p->pkt.user_data); + case AVT_PKT_STREAM_CONFIG: [[fallthrough]]; + case AVT_PKT_METADATA: + avt_decode_generic_data(&bs, &p->pkt.generic_data); + pl_bytes = p->pkt.generic_data.payload_length; break; case AVT_PKT_LUT_ICC_SEGMENT: [[fallthrough]]; case AVT_PKT_FONT_DATA_SEGMENT: [[fallthrough]]; @@ -204,8 +225,8 @@ static int stream_receive_packet(AVTProtocolCtx *p, case AVT_PKT_USER_DATA_SEGMENT: [[fallthrough]]; case AVT_PKT_STREAM_DATA_SEGMENT: [[fallthrough]]; case AVT_PKT_STREAM_CONFIG_SEGMENT: - avt_decode_generic_segment(&bs, &pkt->generic_segment); - pl_bytes = pkt->generic_segment.seg_length; + avt_decode_generic_segment(&bs, &p->pkt.generic_segment); + pl_bytes = p->pkt.generic_segment.seg_length; break; case AVT_PKT_LUT_ICC_PARITY: [[fallthrough]]; case AVT_PKT_FONT_DATA_PARITY: [[fallthrough]]; @@ -213,17 +234,25 @@ static int stream_receive_packet(AVTProtocolCtx *p, case AVT_PKT_USER_DATA_PARITY: [[fallthrough]]; case AVT_PKT_STREAM_DATA_PARITY: [[fallthrough]]; case AVT_PKT_STREAM_CONFIG_PARITY: - avt_decode_generic_parity(&bs, &pkt->generic_parity); - pl_bytes = pkt->generic_parity.parity_data_length; + avt_decode_generic_parity(&bs, &p->pkt.generic_parity); + pl_bytes = p->pkt.generic_parity.parity_data_length; break; default: avt_log(p, AVT_LOG_ERROR, "Unknown descriptor 0x%x received\n", desc); return AVT_ERROR(ENOTSUP); }; -// err = p->io->read_input(p->io_ctx, pl, pl_bytes, timeout, 0x0); -// if (err < 0) -// return err; + // TODO: pool buffer + AVTBuffer *tmp = avt_buffer_alloc(pl_bytes); + if (!tmp) + return AVT_ERROR(ENOMEM); + + avt_buffer_quick_ref(&p->pl, tmp, 0, AVT_BUFFER_REF_ALL); + avt_buffer_unref(&tmp); + + err = s->io->read_input(s->io_ctx, &p->pl, pl_bytes, timeout, 0x0); + if (err < 0) + return err; /* If there's not enough data, we'll try to do what we can with what * we get down the road */ diff --git a/libavtransport/utils_packet.h b/libavtransport/utils_packet.h index 39f87cb..9bcfc07 100644 --- a/libavtransport/utils_packet.h +++ b/libavtransport/utils_packet.h @@ -123,42 +123,45 @@ static inline void avt_packet_encode_header(AVTPktd *p) #undef GET #undef TYPE -#define avt_packet_get_duration(x, ...) \ - _Generic((x), \ - AVTPktd *: avt_packet_get_duration_d, \ - const AVTPktd *: avt_packet_get_duration_d, \ - union AVTPacketData: avt_packet_get_duration_p, \ - union AVTPacketData *: avt_packet_get_duration_pp \ +#define avt_packet_get_duration(x, ...) \ + _Generic((x), \ + AVTPktd *: avt_packet_get_duration_d, \ + const AVTPktd *: avt_packet_get_duration_d, \ + union AVTPacketData: avt_packet_get_duration_p, \ + union AVTPacketData *: avt_packet_get_duration_pp, \ + const union AVTPacketData *: avt_packet_get_duration_pp \ ) (x __VA_OPT__(,) __VA_ARGS__) -#define avt_packet_get_pts(x, ...) \ - _Generic((x), \ - AVTPktd *: avt_packet_get_pts_d, \ - const AVTPktd *: avt_packet_get_pts_d, \ - union AVTPacketData: avt_packet_get_pts_p, \ - union AVTPacketData *: avt_packet_get_pts_pp \ +#define avt_packet_get_pts(x, ...) \ + _Generic((x), \ + AVTPktd *: avt_packet_get_pts_d, \ + const AVTPktd *: avt_packet_get_pts_d, \ + union AVTPacketData: avt_packet_get_pts_p, \ + union AVTPacketData *: avt_packet_get_pts_pp, \ + const union AVTPacketData *: avt_packet_get_pts_pp \ ) (x __VA_OPT__(,) __VA_ARGS__) -#define avt_packet_get_size(x, ...) \ - _Generic((x), \ - AVTPktd *: avt_packet_get_size_d, \ - const AVTPktd *: avt_packet_get_size_d, \ - union AVTPacketData: avt_packet_get_size_p, \ - union AVTPacketData *: avt_packet_get_size_pp, \ +#define avt_packet_series(x, ...) \ + _Generic((x), \ + AVTPktd *: avt_packet_series_d, \ + const AVTPktd *: avt_packet_series_d, \ + union AVTPacketData: avt_packet_series_p, \ + union AVTPacketData *: avt_packet_series_pp, \ + const union AVTPacketData *: avt_packet_series_pp \ ) (x __VA_OPT__(,) __VA_ARGS__) -#define avt_packet_get_tb(x, ...) \ - _Generic((x), \ - AVTPktd *: avt_packet_get_tb_d, \ - const AVTPktd *: avt_packet_get_tb_d, \ - union AVTPacketData: avt_packet_get_tb_p, \ - union AVTPacketData *: avt_packet_get_tb_pp \ +#define avt_packet_get_tb(x, ...) \ + _Generic((x), \ + AVTPktd *: avt_packet_get_tb_d, \ + const AVTPktd *: avt_packet_get_tb_d, \ + union AVTPacketData: avt_packet_get_tb_p, \ + union AVTPacketData *: avt_packet_get_tb_pp, \ + const union AVTPacketData *: avt_packet_get_tb_pp \ ) (x __VA_OPT__(,) __VA_ARGS__) #define avt_packet_set_compression(x, ...) \ _Generic((x), \ AVTPktd *: avt_packet_set_compression_d, \ - const AVTPktd *: avt_packet_set_compression_d, \ union AVTPacketData: avt_packet_set_compression_p, \ union AVTPacketData *: avt_packet_set_compression_pp \ ) (x __VA_OPT__(,) __VA_ARGS__) diff --git a/libavtransport/utils_packet_template.h b/libavtransport/utils_packet_template.h index 9735b7e..555a6b0 100644 --- a/libavtransport/utils_packet_template.h +++ b/libavtransport/utils_packet_template.h @@ -47,16 +47,6 @@ static inline int64_t RENAME(avt_packet_get_pts)(const TYPE p) } } -static inline int64_t RENAME(avt_packet_get_size)(const TYPE p) -{ - switch (GET(desc)) { - case AVT_PKT_STREAM_DATA: - return GET(stream_data.data_length); - default: - return INT64_MIN; - } -} - static inline int RENAME(avt_packet_get_tb)(const TYPE p, AVTRational *tb) { switch (GET(desc)) { @@ -75,6 +65,7 @@ static inline void RENAME(avt_packet_set_compression)(TYPE p, case AVT_PKT_STREAM_DATA: GET(stream_data).pkt_compression = compression; return; + case AVT_PKT_STREAM_CONFIG: [[fallthrough]]; case AVT_PKT_METADATA: GET(generic_data).generic_data_compression = compression; return; @@ -93,6 +84,52 @@ static inline void RENAME(avt_packet_set_compression)(TYPE p, } } +static inline int RENAME(avt_packet_series)(const TYPE p, + uint32_t *off, + uint32_t *cur, + uint32_t *tot) +{ + switch (GET(desc)) { + case AVT_PKT_STREAM_DATA: + *cur = GET(stream_data).data_length; + *tot = !GET(stream_data).pkt_segmented ? GET(stream_data).data_length : 0; + *off = 0; + return GET(stream_data).pkt_segmented; + case AVT_PKT_STREAM_CONFIG: [[fallthrough]]; + case AVT_PKT_METADATA: + *cur = GET(generic_data).payload_length; + *tot = GET(generic_data).total_payload_length; + *off = 0; + return GET(generic_data).payload_length != GET(generic_data).total_payload_length; + case AVT_PKT_USER_DATA: + *cur = GET(user_data).userdata_pl_length; + *tot = GET(user_data).userdata_length; + *off = 0; + return GET(user_data).userdata_pl_length != GET(user_data).userdata_length; + case AVT_PKT_LUT_ICC: + *cur = GET(lut_icc).lut_pl_length; + *tot = GET(lut_icc).lut_data_length; + *off = 0; + return GET(lut_icc).lut_pl_length != GET(lut_icc).lut_data_length; + case AVT_PKT_FONT_DATA: + *cur = GET(font_data).font_pl_length; + *tot = GET(font_data).font_data_length; + *off = 0; + return GET(font_data).font_pl_length != GET(font_data).font_data_length; + case AVT_PKT_METADATA_SEGMENT: [[fallthrough]]; + case AVT_PKT_USER_DATA_SEGMENT: [[fallthrough]]; + case AVT_PKT_LUT_ICC_SEGMENT: [[fallthrough]]; + case AVT_PKT_FONT_DATA_SEGMENT: [[fallthrough]]; + case AVT_PKT_STREAM_DATA_SEGMENT: + *cur = GET(generic_segment).seg_length; + *tot = GET(generic_segment).pkt_total_data; + *off = GET(generic_segment).seg_offset; + return -1; + default: + return 0; + } +} + static inline void RENAME(avt_packet_change_size)(TYPE p, uint32_t seg_offset, uint32_t seg_length, @@ -103,6 +140,7 @@ static inline void RENAME(avt_packet_change_size)(TYPE p, GET(stream_data).data_length = seg_length; GET(stream_data).pkt_segmented = seg_length < tot_pl_size; return; + case AVT_PKT_STREAM_CONFIG: [[fallthrough]]; case AVT_PKT_METADATA: GET(generic_data).payload_length = seg_length; GET(generic_data).total_payload_length = tot_pl_size;