Skip to content

Commit

Permalink
Finalize packet merging
Browse files Browse the repository at this point in the history
The logic is that the reoder code is going to just construct more
merger instances when a merger instance rejects a packet.
  • Loading branch information
cyanreg committed May 24, 2024
1 parent 01826c7 commit 3f09ce8
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 45 deletions.
4 changes: 4 additions & 0 deletions draft-avtransport-spec.bs
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,10 @@ be different for segments that finalize the data.
The [=header_7=] field can be used to reconstruct the header of the very first
packet in order to determine the timestamps and data type.

Note: Segments are <b>forbidden</b> from partially overlapping. Each segment's offset
and size must either completely overlap with another segment (such as when retransmitting
lost packets), or must fit in between two segments perfectly.


### Generic data parity ### {#generic-data-parity-packets}

Expand Down
186 changes: 152 additions & 34 deletions libavtransport/merger.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
#include "utils_packet.h"
#include "mem.h"

static inline int fill_ranges(AVTMerger *m, bool is_parity,
static inline int fill_ranges(AVTMerger *m, bool is_parity, uint32_t *seg_data_off,
uint32_t seg_off, uint32_t seg_size)
{
AVTMergerRange *ranges;
Expand All @@ -58,9 +58,14 @@ static inline int fill_ranges(AVTMerger *m, bool is_parity,

int dst_idx = 0;
int consolidate_idx = -1;

/* Attempt to work the range into the existing ranges */
for (auto i = 0; i < nb_ranges; i++) {
AVTMergerRange *r = &ranges[i];
/* Overlaps dealt with in validate_packet */
if ((seg_off + seg_size) == r->offset) {
/* Potential new segment's end reaches another segment exactly */

/* Prepend to existing range */
r->offset = seg_off;
r->size += seg_size;
Expand All @@ -72,6 +77,8 @@ static inline int fill_ranges(AVTMerger *m, bool is_parity,
consolidate_idx = i - 1;
break;
} else if ((r->offset + r->size) == seg_off) {
/* Potential new segment's start reaches another segment's start exactly */

/* Extend exisint range */
r->size += seg_size;

Expand All @@ -81,6 +88,7 @@ static inline int fill_ranges(AVTMerger *m, bool is_parity,
consolidate_idx = i;
break;
} else if (seg_off > (r->offset + r->size)) {
/* New segment. Pick its index here for ordering. */
dst_idx = i + 1;
}
}
Expand Down Expand Up @@ -178,33 +186,101 @@ static inline int fill_phantom_header(void *log_ctx, AVTMerger *m,
return 0;
}

static int validate_packet(void *log_ctx, AVTMerger *m, AVTPktd *p, int srs,
uint32_t seg_off, uint32_t seg_size,
uint32_t tot_size, bool is_parity)
{
/* Check for length mismatch */
if (m->target_tot_len && (tot_size != m->target_tot_len))
return AVT_ERROR(EINVAL);

/* Check for segmentation issues */
if (srs == 0 && m->target == p->pkt.seq) {
avt_log(log_ctx, AVT_LOG_DEBUG, "Header packet for %" PRIu64 " indicates "
"no segmentation, but segments received",
p->pkt.seq);
return AVT_ERROR(EINVAL);
}

/* Check for phantom header mismatch */;
const uint32_t hdr_part = p->pkt.seq % 7;
if (m->hdr_mask & (1 << (6 - hdr_part))) {
const uint8_t *hdr_part_data = &m->p.hdr[4*hdr_part];
for (auto i = 0; i < 4; i++)
if (hdr_part_data[i] != p->pkt.generic_segment.header_7[i])
return AVT_ERROR(EINVAL);
}

/* Check for overlaps */
AVTMergerRange *ranges;
uint32_t nb_ranges;
if (!is_parity) {
ranges = m->ranges;
nb_ranges = m->nb_ranges;
} else {
ranges = m->parity_ranges;
nb_ranges = m->nb_parity_ranges;
}

for (auto i = 0; i < nb_ranges; i++) {
AVTMergerRange *r = &ranges[i];
/* Check for overlaps first */
if ((r->offset + r->size) < seg_off &&
r->offset >= seg_off) {
/* Segment's start overlaps with another segment */
return AVT_ERROR(EINVAL);
} else if ((seg_off + seg_size) > r->offset &&
(seg_off + seg_size) <= (r->offset + r->size)) {
/* Segment's end overlaps with another segment */
return AVT_ERROR(EINVAL);
}
}

return 0;
}

int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p)
{
int ret;

bool is_parity;
uint32_t seg_off, seg_size, tot_size;
int ret = avt_packet_series(p, &is_parity, &seg_off, &seg_size, &tot_size);
uint32_t seg_off, seg_size, tot_size, seg_data_off;
int srs = avt_packet_series(p, &is_parity, &seg_off, &seg_size, &tot_size);

/* Packet needs nothing else */
if (!ret)
if (!srs)
return 0;

size_t src_size;
uint8_t *src = avt_buffer_get_data(&p->pl, &src_size);
if (src_size != seg_size) {
avt_log(log_ctx, AVT_LOG_ERROR, "Mismatch between signalled payload size "
"%u and payload %zu in %" PRIu64,
"%u and actual payload %zu in pkt %" PRIu64 "\n",
seg_size, src_size, p->pkt.seq);
return AVT_ERROR(EINVAL);
}

if (!m->active) {
if (m->active) {
uint32_t target;
if (srs > 0)
target = p->pkt.generic_segment.target_seq;
else if (!is_parity)
target = p->pkt.generic_segment.target_seq;
else
target = p->pkt.generic_parity.target_seq;

if (target != m->target)
return AVT_ERROR(EBUSY);
} else {
m->hdr_mask = 0x0;
m->nb_ranges = 0;
m->nb_parity_ranges = 0;
m->pkt_len_track = 0;
m->target_tot_len = tot_size;
m->nb_tgt_packets = !!tot_size;
m->p_avail = false;

if (ret > 0) {
if (srs > 0) {
/* Starting with a segment, not the actual start */
m->p = *p;
m->p_avail = true;
Expand Down Expand Up @@ -263,24 +339,25 @@ int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p)
if (!dst)
return AVT_ERROR(ENOMEM);

memcpy(dst + seg_off, src, src_size);
memcpy(dst + seg_off, src, seg_size);
avt_buffer_quick_unref(&p->pl);
*target = tmp_buf;
} else {
/* Resize the buffer if possible */
int err = avt_buffer_resize(&p->pl, tot_size);
if (err < 0)
return err;
ret = avt_buffer_resize(&p->pl, tot_size);
if (ret < 0)
return ret;

*target = p->pl;
p->pl = (AVTBuffer){ };
}

/* Mark what we have available */
if (!is_parity) {
m->ranges[m->nb_ranges++] = (AVTMergerRange) { seg_off, src_size };
m->ranges[m->nb_ranges++] = (AVTMergerRange) { seg_off, seg_size };
m->pkt_len_track += seg_size;
} else {
m->parity_ranges[m->nb_parity_ranges++] = (AVTMergerRange) { seg_off, src_size };
m->parity_ranges[m->nb_parity_ranges++] = (AVTMergerRange) { seg_off, seg_size };
m->pkt_parity_len_track += seg_size;
}

Expand All @@ -289,46 +366,54 @@ int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p)
return AVT_ERROR(EAGAIN);
}

/* Sanity checking */
if (ret == 0 && m->target == p->pkt.seq) {
avt_log(log_ctx, AVT_LOG_WARN, "Header packet for %" PRIu64 " indicates "
"no segmentation, but segments received",
p->pkt.seq);
}
/* More sanity checking */
ret = validate_packet(log_ctx, m, p, srs,
seg_off, seg_size, tot_size, is_parity);
if (ret < 0)
return ret;

/* Packet header state */
if (!m->p_avail && (ret < -1)) {
if (!m->p_avail && (srs < -1)) {
ret = fill_phantom_header(log_ctx, m, p, is_parity);
if (ret < 0)
return ret;
} else if (ret == 0) {
/* We got a real header packet */
} else if (srs == 1) {
/* We got a real header packet which we didn't have */
m->p.pkt = p->pkt;
memcpy(m->p.hdr, p->hdr, sizeof(p->hdr));
m->p_avail = true;
}

/* If total packet size was unknown, populate it and reallocate.
* Parity packets always have a total length */
if (!is_parity && !m->target_tot_len) {
avt_assert1(tot_size);

ret = avt_buffer_resize(&m->p.pl, tot_size);
if (ret < 0)
return ret;

m->target_tot_len = tot_size;
}

/* Track ranges */
ret = fill_ranges(m, is_parity, &seg_data_off, seg_off, seg_size);
if (ret < 0)
return ret;

/* Copy new data */
if (!is_parity) {
uint8_t *dst = avt_buffer_get_data(&m->p.pl, NULL);
memcpy(dst + seg_off, src, src_size);
avt_buffer_quick_unref(&p->pl);
m->pkt_len_track += seg_size;
memcpy(dst + seg_off, src + seg_data_off, seg_size);
} else {
uint8_t *dst = avt_buffer_get_data(&m->parity, NULL);
memcpy(dst + seg_off, src, src_size);
avt_buffer_quick_unref(&p->pl);
m->pkt_parity_len_track += seg_size;
memcpy(dst + seg_off, src + seg_data_off, seg_size);
}

/* Track ranges */
ret = fill_ranges(m, is_parity, seg_off, src_size /* Using the real payload size*/);
if (ret < 0)
return ret;
avt_buffer_quick_unref(&p->pl);

if (m->pkt_parity_len_track == m->parity_tot_len) {
// TODO: do something with the parity data here
;
}

/* We can output something */
Expand All @@ -338,9 +423,42 @@ int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p)
* contain any payload. */
avt_assert1(m->p_avail);
*p = m->p;
m->p = (AVTPktd){ };
m->active = false;
return 0;
return m->pkt_len_track;
}

return AVT_ERROR(EAGAIN);
}

int avt_pkt_merge_force(void *log_ctx, AVTMerger *m, AVTPktd *p)
{
/* If inactive, we don't have anything */
if (!m->active)
return AVT_ERROR(ENOENT);

/* If we couldn't collect enough header parts, we can't really output
* something worth propagating either. */
if (!(m->hdr_mask == 0x7F))
return AVT_ERROR(EAGAIN);

*p = m->p;
m->p = (AVTPktd){ };

return m->pkt_len_track;
}

void avt_pkt_merge_done(AVTMerger *m)
{
avt_buffer_quick_unref(&m->p.pl);
m->active = false;
}

void avt_pkt_merge_free(AVTMerger *m)
{
avt_pkt_merge_done(m);
free(m->ranges);
free(m->parity_ranges);
avt_buffer_quick_unref(&m->parity);
memset(m, 0, sizeof(*m));
}
36 changes: 27 additions & 9 deletions libavtransport/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,45 @@ typedef struct AVTMerger {
uint8_t hdr_mask; /* 1 bit per 32-bits, 7th bit first, 0th bit last */
uint8_t _padding_;

/* Data */
/* Total number of packet with known final size accepted */
uint32_t nb_tgt_packets;

/* Packet data */
AVTPktd p;
uint32_t pkt_len_track;
uint32_t target_tot_len;

/* Parity */
AVTBuffer parity;
uint32_t pkt_parity_len_track;
uint32_t parity_tot_len;

/* Data ranges */
/* Packet data ranges */
AVTMergerRange *ranges;
uint32_t nb_ranges;
uint32_t ranges_allocated;

/* Parity ranges */
/* Parity data for the packet */
AVTBuffer parity; // Preserved
uint32_t pkt_parity_len_track;
uint32_t parity_tot_len;
/* Parity date ranges */
AVTMergerRange *parity_ranges;
uint32_t nb_parity_ranges;
uint32_t parity_ranges_allocated;
} AVTMerger;

/* Basic merger function. Input and output is 'p'.
* Returns the payload size once an output is possible.
* Returns AVT_ERROR(EAGAIN) if more segments are needed.
* Returns AVT_ERROR(EBUSY) if a packet belonging to a different target is
* given.
* Returns an error code in all other circumstances. */
int avt_pkt_merge_seg(void *log_ctx, AVTMerger *m, AVTPktd *p);

/* Force whatever output is possible out. p will be overwritten. */
int avt_pkt_merge_force(void *log_ctx, AVTMerger *m, AVTPktd *p);

/* avt_pkt_merge_seg() will reject any packet part of another group.
* If there's a packet which cannot be output, call this to reset the context. */
void avt_pkt_merge_done(AVTMerger *m);

/* Free up and reset the context upon uninitialization entirely.
* Frees up the parity data buffer as well. */
void avt_pkt_merge_free(AVTMerger *m);

#endif /* AVTRANSPORT_MERGER_H */
2 changes: 1 addition & 1 deletion libavtransport/rational.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static inline rational_int rescale_rnd(rational_int a,
return INT64_MIN;

if (a < 0)
return -(uint64_t)rescale_rnd(-AVT_MAX(a, -INT64_MAX), b, c, rnd ^ ((rnd >> 1) & 1));
return -(rational_int)rescale_rnd(-AVT_MAX(a, -INT64_MAX), b, c, rnd ^ ((rnd >> 1) & 1));

if (rnd == AVT_ROUND_NEAR_INF)
r = c / 2;
Expand Down
10 changes: 10 additions & 0 deletions libavtransport/tests/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ ldpc_encode_test = executable('ldpc_encode',
)
test('LDPC encoding', ldpc_encode_test)

## Misc tests
## ==========
merger_test = executable('merger',
sources : [ 'merger.c' ],
include_directories : [ '../' ],
objects : [ avtransport_lib.extract_objects([ 'merger.c', 'buffer.c' ]) ],
dependencies : [ avtransport_dep ],
)
test('Packet merging', merger_test)

## Packet encode/decode primitives tests
## =====================================
packet_encode_decode_test = executable('packet_encode_decode',
Expand Down
Loading

0 comments on commit 3f09ce8

Please sign in to comment.