diff --git a/draft-avtransport-spec.bs b/draft-avtransport-spec.bs index 9edd77a..71b19ff 100644 --- a/draft-avtransport-spec.bs +++ b/draft-avtransport-spec.bs @@ -1166,6 +1166,10 @@ be different for segments that finalize the data. The [=header_7=] field can be used to reconstruct the header of the very first packet in order to determine the timestamps and data type. +Note: Segments are forbidden from partially overlapping. Each segment's offset +and size must either completely overlap with another segment (such as when retransmitting +lost packets), or must fit in between two segments perfectly. + ### Generic data parity ### {#generic-data-parity-packets} diff --git a/libavtransport/merger.c b/libavtransport/merger.c index 34ee2fd..1f5147b 100644 --- a/libavtransport/merger.c +++ b/libavtransport/merger.c @@ -36,7 +36,7 @@ #include "utils_packet.h" #include "mem.h" -static inline int fill_ranges(AVTMerger *m, bool is_parity, +static inline int fill_ranges(AVTMerger *m, bool is_parity, uint32_t *seg_data_off, uint32_t seg_off, uint32_t seg_size) { AVTMergerRange *ranges; @@ -58,9 +58,14 @@ static inline int fill_ranges(AVTMerger *m, bool is_parity, int dst_idx = 0; int consolidate_idx = -1; + + /* Attempt to work the range into the existing ranges */ for (auto i = 0; i < nb_ranges; i++) { AVTMergerRange *r = &ranges[i]; + /* Overlaps dealt with in validate_packet */ if ((seg_off + seg_size) == r->offset) { + /* Potential new segment's end reaches another segment exactly */ + /* Prepend to existing range */ r->offset = seg_off; r->size += seg_size; @@ -72,6 +77,8 @@ static inline int fill_ranges(AVTMerger *m, bool is_parity, consolidate_idx = i - 1; break; } else if ((r->offset + r->size) == seg_off) { + /* Potential new segment's start reaches another segment's start exactly */ + /* Extend exisint range */ r->size += seg_size; @@ -81,6 +88,7 @@ static inline int fill_ranges(AVTMerger *m, bool is_parity, consolidate_idx = i; break; } else if (seg_off > (r->offset + r->size)) { + /* New segment. Pick its index here for ordering. */ dst_idx = i + 1; } } @@ -178,33 +186,101 @@ static inline int fill_phantom_header(void *log_ctx, AVTMerger *m, return 0; } +static int validate_packet(void *log_ctx, AVTMerger *m, AVTPktd *p, int srs, + uint32_t seg_off, uint32_t seg_size, + uint32_t tot_size, bool is_parity) +{ + /* Check for length mismatch */ + if (m->target_tot_len && (tot_size != m->target_tot_len)) + return AVT_ERROR(EINVAL); + + /* Check for segmentation issues */ + if (srs == 0 && m->target == p->pkt.seq) { + avt_log(log_ctx, AVT_LOG_DEBUG, "Header packet for %" PRIu64 " indicates " + "no segmentation, but segments received", + p->pkt.seq); + return AVT_ERROR(EINVAL); + } + + /* Check for phantom header mismatch */; + const uint32_t hdr_part = p->pkt.seq % 7; + if (m->hdr_mask & (1 << (6 - hdr_part))) { + const uint8_t *hdr_part_data = &m->p.hdr[4*hdr_part]; + for (auto i = 0; i < 4; i++) + if (hdr_part_data[i] != p->pkt.generic_segment.header_7[i]) + return AVT_ERROR(EINVAL); + } + + /* Check for overlaps */ + AVTMergerRange *ranges; + uint32_t nb_ranges; + if (!is_parity) { + ranges = m->ranges; + nb_ranges = m->nb_ranges; + } else { + ranges = m->parity_ranges; + nb_ranges = m->nb_parity_ranges; + } + + for (auto i = 0; i < nb_ranges; i++) { + AVTMergerRange *r = &ranges[i]; + /* Check for overlaps first */ + if ((r->offset + r->size) < seg_off && + r->offset >= seg_off) { + /* Segment's start overlaps with another segment */ + return AVT_ERROR(EINVAL); + } else if ((seg_off + seg_size) > r->offset && + (seg_off + seg_size) <= (r->offset + r->size)) { + /* Segment's end overlaps with another segment */ + return AVT_ERROR(EINVAL); + } + } + + return 0; +} + int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p) { + int ret; + bool is_parity; - uint32_t seg_off, seg_size, tot_size; - int ret = avt_packet_series(p, &is_parity, &seg_off, &seg_size, &tot_size); + uint32_t seg_off, seg_size, tot_size, seg_data_off; + int srs = avt_packet_series(p, &is_parity, &seg_off, &seg_size, &tot_size); /* Packet needs nothing else */ - if (!ret) + if (!srs) return 0; size_t src_size; uint8_t *src = avt_buffer_get_data(&p->pl, &src_size); if (src_size != seg_size) { avt_log(log_ctx, AVT_LOG_ERROR, "Mismatch between signalled payload size " - "%u and payload %zu in %" PRIu64, + "%u and actual payload %zu in pkt %" PRIu64 "\n", seg_size, src_size, p->pkt.seq); + return AVT_ERROR(EINVAL); } - if (!m->active) { + if (m->active) { + uint32_t target; + if (srs > 0) + target = p->pkt.generic_segment.target_seq; + else if (!is_parity) + target = p->pkt.generic_segment.target_seq; + else + target = p->pkt.generic_parity.target_seq; + + if (target != m->target) + return AVT_ERROR(EBUSY); + } else { m->hdr_mask = 0x0; m->nb_ranges = 0; m->nb_parity_ranges = 0; m->pkt_len_track = 0; m->target_tot_len = tot_size; + m->nb_tgt_packets = !!tot_size; m->p_avail = false; - if (ret > 0) { + if (srs > 0) { /* Starting with a segment, not the actual start */ m->p = *p; m->p_avail = true; @@ -263,24 +339,25 @@ int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p) if (!dst) return AVT_ERROR(ENOMEM); - memcpy(dst + seg_off, src, src_size); + memcpy(dst + seg_off, src, seg_size); avt_buffer_quick_unref(&p->pl); *target = tmp_buf; } else { /* Resize the buffer if possible */ - int err = avt_buffer_resize(&p->pl, tot_size); - if (err < 0) - return err; + ret = avt_buffer_resize(&p->pl, tot_size); + if (ret < 0) + return ret; *target = p->pl; + p->pl = (AVTBuffer){ }; } /* Mark what we have available */ if (!is_parity) { - m->ranges[m->nb_ranges++] = (AVTMergerRange) { seg_off, src_size }; + m->ranges[m->nb_ranges++] = (AVTMergerRange) { seg_off, seg_size }; m->pkt_len_track += seg_size; } else { - m->parity_ranges[m->nb_parity_ranges++] = (AVTMergerRange) { seg_off, src_size }; + m->parity_ranges[m->nb_parity_ranges++] = (AVTMergerRange) { seg_off, seg_size }; m->pkt_parity_len_track += seg_size; } @@ -289,46 +366,54 @@ int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p) return AVT_ERROR(EAGAIN); } - /* Sanity checking */ - if (ret == 0 && m->target == p->pkt.seq) { - avt_log(log_ctx, AVT_LOG_WARN, "Header packet for %" PRIu64 " indicates " - "no segmentation, but segments received", - p->pkt.seq); - } + /* More sanity checking */ + ret = validate_packet(log_ctx, m, p, srs, + seg_off, seg_size, tot_size, is_parity); + if (ret < 0) + return ret; /* Packet header state */ - if (!m->p_avail && (ret < -1)) { + if (!m->p_avail && (srs < -1)) { ret = fill_phantom_header(log_ctx, m, p, is_parity); if (ret < 0) return ret; - } else if (ret == 0) { - /* We got a real header packet */ + } else if (srs == 1) { + /* We got a real header packet which we didn't have */ m->p.pkt = p->pkt; memcpy(m->p.hdr, p->hdr, sizeof(p->hdr)); m->p_avail = true; } + /* If total packet size was unknown, populate it and reallocate. + * Parity packets always have a total length */ + if (!is_parity && !m->target_tot_len) { + avt_assert1(tot_size); + + ret = avt_buffer_resize(&m->p.pl, tot_size); + if (ret < 0) + return ret; + + m->target_tot_len = tot_size; + } + + /* Track ranges */ + ret = fill_ranges(m, is_parity, &seg_data_off, seg_off, seg_size); + if (ret < 0) + return ret; + /* Copy new data */ if (!is_parity) { uint8_t *dst = avt_buffer_get_data(&m->p.pl, NULL); - memcpy(dst + seg_off, src, src_size); - avt_buffer_quick_unref(&p->pl); - m->pkt_len_track += seg_size; + memcpy(dst + seg_off, src + seg_data_off, seg_size); } else { uint8_t *dst = avt_buffer_get_data(&m->parity, NULL); - memcpy(dst + seg_off, src, src_size); - avt_buffer_quick_unref(&p->pl); - m->pkt_parity_len_track += seg_size; + memcpy(dst + seg_off, src + seg_data_off, seg_size); } - /* Track ranges */ - ret = fill_ranges(m, is_parity, seg_off, src_size /* Using the real payload size*/); - if (ret < 0) - return ret; + avt_buffer_quick_unref(&p->pl); if (m->pkt_parity_len_track == m->parity_tot_len) { // TODO: do something with the parity data here - ; } /* We can output something */ @@ -338,9 +423,42 @@ int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p) * contain any payload. */ avt_assert1(m->p_avail); *p = m->p; + m->p = (AVTPktd){ }; m->active = false; - return 0; + return m->pkt_len_track; } return AVT_ERROR(EAGAIN); } + +int avt_pkt_merge_force(void *log_ctx, AVTMerger *m, AVTPktd *p) +{ + /* If inactive, we don't have anything */ + if (!m->active) + return AVT_ERROR(ENOENT); + + /* If we couldn't collect enough header parts, we can't really output + * something worth propagating either. */ + if (!(m->hdr_mask == 0x7F)) + return AVT_ERROR(EAGAIN); + + *p = m->p; + m->p = (AVTPktd){ }; + + return m->pkt_len_track; +} + +void avt_pkt_merge_done(AVTMerger *m) +{ + avt_buffer_quick_unref(&m->p.pl); + m->active = false; +} + +void avt_pkt_merge_free(AVTMerger *m) +{ + avt_pkt_merge_done(m); + free(m->ranges); + free(m->parity_ranges); + avt_buffer_quick_unref(&m->parity); + memset(m, 0, sizeof(*m)); +} diff --git a/libavtransport/merger.h b/libavtransport/merger.h index 9a8c1c7..d973afd 100644 --- a/libavtransport/merger.h +++ b/libavtransport/merger.h @@ -42,27 +42,45 @@ typedef struct AVTMerger { uint8_t hdr_mask; /* 1 bit per 32-bits, 7th bit first, 0th bit last */ uint8_t _padding_; - /* Data */ + /* Total number of packet with known final size accepted */ + uint32_t nb_tgt_packets; + + /* Packet data */ AVTPktd p; uint32_t pkt_len_track; uint32_t target_tot_len; - - /* Parity */ - AVTBuffer parity; - uint32_t pkt_parity_len_track; - uint32_t parity_tot_len; - - /* Data ranges */ + /* Packet data ranges */ AVTMergerRange *ranges; uint32_t nb_ranges; uint32_t ranges_allocated; - /* Parity ranges */ + /* Parity data for the packet */ + AVTBuffer parity; // Preserved + uint32_t pkt_parity_len_track; + uint32_t parity_tot_len; + /* Parity date ranges */ AVTMergerRange *parity_ranges; uint32_t nb_parity_ranges; uint32_t parity_ranges_allocated; } AVTMerger; +/* Basic merger function. Input and output is 'p'. + * Returns the payload size once an output is possible. + * Returns AVT_ERROR(EAGAIN) if more segments are needed. + * Returns AVT_ERROR(EBUSY) if a packet belonging to a different target is + * given. + * Returns an error code in all other circumstances. */ int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p); +/* Force whatever output is possible out. p will be overwritten. */ +int avt_pkt_merge_force(void *log_ctx, AVTMerger *m, AVTPktd *p); + +/* avt_pkt_merge_seg() will reject any packet part of another group. + * If there's a packet which cannot be output, call this to reset the context. */ +void avt_pkt_merge_done(AVTMerger *m); + +/* Free up and reset the context upon uninitialization entirely. + * Frees up the parity data buffer as well. */ +void avt_pkt_merge_free(AVTMerger *m); + #endif /* AVTRANSPORT_MERGER_H */ diff --git a/libavtransport/rational.c b/libavtransport/rational.c index 877487b..a4d19ad 100644 --- a/libavtransport/rational.c +++ b/libavtransport/rational.c @@ -52,7 +52,7 @@ static inline rational_int rescale_rnd(rational_int a, return INT64_MIN; if (a < 0) - return -(uint64_t)rescale_rnd(-AVT_MAX(a, -INT64_MAX), b, c, rnd ^ ((rnd >> 1) & 1)); + return -(rational_int)rescale_rnd(-AVT_MAX(a, -INT64_MAX), b, c, rnd ^ ((rnd >> 1) & 1)); if (rnd == AVT_ROUND_NEAR_INF) r = c / 2; diff --git a/libavtransport/tests/meson.build b/libavtransport/tests/meson.build index f42f650..6868d0f 100644 --- a/libavtransport/tests/meson.build +++ b/libavtransport/tests/meson.build @@ -42,6 +42,16 @@ ldpc_encode_test = executable('ldpc_encode', ) test('LDPC encoding', ldpc_encode_test) +## Misc tests +## ========== +merger_test = executable('merger', + sources : [ 'merger.c' ], + include_directories : [ '../' ], + objects : [ avtransport_lib.extract_objects([ 'merger.c', 'buffer.c' ]) ], + dependencies : [ avtransport_dep ], +) +test('Packet merging', merger_test) + ## Packet encode/decode primitives tests ## ===================================== packet_encode_decode_test = executable('packet_encode_decode', diff --git a/libavtransport/utils_internal.h b/libavtransport/utils_internal.h index 1499578..82680de 100644 --- a/libavtransport/utils_internal.h +++ b/libavtransport/utils_internal.h @@ -31,6 +31,7 @@ #include #include #include +#include #include @@ -68,6 +69,9 @@ static inline int avt_ascii_to_int(int c) return -1; } +#define avt_hamming_dist(a, b) \ + stdc_count_ones((a) ^ (b)) + /* Zero (usually) alloc FIFO. Payload is ref'd, and leaves with a ref. */ typedef struct AVTPacketFifo { AVTPktd *data; @@ -132,7 +136,7 @@ size_t avt_pkt_fifo_size(AVTPacketFifo *fifo); /* Clear all packets in the fifo */ void avt_pkt_fifo_clear(AVTPacketFifo *fifo); -/* Free all resources */ +/* Free all resources in/for a fifo */ void avt_pkt_fifo_free(AVTPacketFifo *fifo); /* Sliding window */