Skip to content

Commit

Permalink
Complete most of parsing, add packet merger
Browse files Browse the repository at this point in the history
Now all that's left to get basic file parsing is
to channel the read and completed packets back through
connection.c, and into input.c.
  • Loading branch information
cyanreg committed May 4, 2024
1 parent 9e43788 commit 6f853e4
Show file tree
Hide file tree
Showing 21 changed files with 605 additions and 244 deletions.
21 changes: 7 additions & 14 deletions libavtransport/buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,12 @@ AVT_API AVTBuffer *avt_buffer_create(uint8_t *data, size_t len,
if (!buf)
return NULL;

buf->refcnt = malloc(sizeof(*buf->refcnt));
if (!buf->refcnt) {
int err = avt_buffer_quick_create(buf, data, len, opaque, free_cb, 0);
if (err < 0) {
free(buf);
return NULL;
}

atomic_init(buf->refcnt, 1);

buf->base_data = data;
buf->end_data = data + len;
buf->data = data;
buf->len = len;
buf->opaque = opaque;
if (!free_cb)
buf->free = avt_buffer_default_free;
else
buf->free = free_cb;

return buf;
}

Expand Down Expand Up @@ -184,6 +172,11 @@ size_t avt_buffer_get_data_len(const AVTBuffer *buf)
return buf->len;
}

bool avt_buffer_read_only(AVTBuffer *buffer)
{
return !!(buffer->flags & AVT_BUFFER_FLAG_READ_ONLY);
}

void avt_buffer_unref(AVTBuffer **_buf)
{
AVTBuffer *buf = *_buf;
Expand Down
53 changes: 45 additions & 8 deletions libavtransport/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#include <avtransport/utils.h>
#include "attributes.h"

enum AVTBufferFlags {
AVT_BUFFER_FLAG_READ_ONLY = 1 << 0,
};

struct AVTBuffer {
uint8_t *data; /* Current ref's view of the buffer */
size_t len; /* Current ref's size of the view of the buffer */
Expand All @@ -43,12 +47,53 @@ struct AVTBuffer {

avt_free_cb free;
void *opaque;
enum AVTBufferFlags flags;
atomic_int *refcnt;
};

void avt_buffer_update(AVTBuffer *buf, void *data, size_t len);
int avt_buffer_resize(AVTBuffer *buf, size_t len);

static inline int avt_buffer_quick_create(AVTBuffer *buf, uint8_t *data,
size_t len, void *opaque,
avt_free_cb free_cb,
enum AVTBufferFlags flags)
{
buf->refcnt = malloc(sizeof(*buf->refcnt));
if (!buf->refcnt)
return AVT_ERROR(ENOMEM);

atomic_init(buf->refcnt, 1);

buf->base_data = data;
buf->end_data = data + len;
buf->data = data;
buf->len = len;
buf->opaque = opaque;
if (!free_cb)
buf->free = avt_buffer_default_free;
else
buf->free = free_cb;

return 0;
}

static inline uint8_t *avt_buffer_quick_alloc(AVTBuffer *buf, size_t len)
{
void *alloc = malloc(len);
if (!alloc)
return NULL;

int err = avt_buffer_quick_create(buf, alloc, len, NULL,
avt_buffer_default_free , 0);
if (err < 0) {
free(alloc);
return NULL;
}

return alloc;
}

static inline void avt_buffer_quick_unref(AVTBuffer *buf)
{
if (!buf || !buf->refcnt)
Expand Down Expand Up @@ -80,14 +125,6 @@ static inline void avt_buffer_quick_ref(AVTBuffer *dst, AVTBuffer *buf,
dst->len = (len == AVT_BUFFER_REF_ALL) ? (dst->end_data - dst->data) : len;
}

static inline void avt_buffer_move(AVTBuffer *dst, AVTBuffer **src)
{
/* Move a reference to an already existing ref */
avt_buffer_quick_unref(dst);
memcpy(dst, *src, sizeof(*dst));
free(src);
}

int avt_buffer_offset(AVTBuffer *buf, ptrdiff_t offset);

#endif
5 changes: 4 additions & 1 deletion libavtransport/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ int avt_connection_create(AVTContext *ctx, AVTConnection **_conn,
goto fail;

/* Protocol init */
AVTProtocolOpts opts = {
.ldpc_iterations = info->input_opts.ldpc_iterations,
};
ret = avt_protocol_init(ctx, &conn->p, &conn->p_ctx, &conn->addr,
conn->io, conn->io_ctx);
conn->io, conn->io_ctx, &opts);
if (ret < 0)
goto fail;

Expand Down
16 changes: 5 additions & 11 deletions libavtransport/include/avtransport/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,7 @@ typedef struct AVTConnectionInfo {
* data with a particular offset. Returns the offset after
* reading, or a negative error.
* The buffer `data` is set to a buffer with the data requested.
*
* Note, `data` MAY be non-NULL. In which case, it will contain
* data from the previous read. The buffer must be expanded to
* include `len` more bytes. Only the data matters, the buffer
* may be recreated. */
* Ownership of the buffer is not transferred. */
int64_t (*read)(void *opaque, AVTBuffer **data,
size_t len, int64_t offset);

Expand Down Expand Up @@ -207,15 +203,13 @@ typedef struct AVTConnectionInfo {

/* Input options */
struct {
/* Probe data when opening.
* avt_connection_create() will block until the first packet is read. */
bool probe;

/* Buffer size limit. Zero means automatic. Approximate/best effort. */
size_t buffer;

/* Always enable decoding and correction with LDPC codes. */
bool force_ldpc;
/* Controls the number of iterations for LDPC decoding. Zero means
* automatic (very rarely), higher values increase overhead
* and decrease potential errors, negative values disables decoding. */
int ldpc_iterations;
} input_opts;

struct {
Expand Down
2 changes: 1 addition & 1 deletion libavtransport/include/avtransport/send.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ typedef struct AVTSenderOptions {
/* Compression level. Zero means automatic (the default for each compressor). */
int compress_level;

/* Set to true to enable sending hash packets. */
/* Set to true to enable sending hash packets for all packets with a payload. */
bool hash;
} AVTSenderOptions;

Expand Down
3 changes: 3 additions & 0 deletions libavtransport/include/avtransport/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ AVT_API void avt_buffer_default_free(void *opaque, void *base_data, size_t len);
AVT_API AVTBuffer *avt_buffer_create(uint8_t *data, size_t len,
void *opaque, avt_free_cb free);

/* Check if buffer is writable */
AVT_API bool avt_buffer_read_only(AVTBuffer *buffer);

/* Create and allocate a reference counted buffer. */
AVT_API AVTBuffer *avt_buffer_alloc(size_t len);

Expand Down
3 changes: 1 addition & 2 deletions libavtransport/io_dcb.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ static int64_t dcb_input(AVTIOCtx *io, AVTBuffer *buf, size_t len,
uint8_t *src_data = avt_buffer_get_data(tmp, &src_len);

memcpy(dst_data, src_data, AVT_MIN(len, src_len));
avt_buffer_unref(&tmp);
} else {
avt_buffer_move(buf, &tmp);
avt_buffer_quick_ref(buf, tmp, 0, AVT_BUFFER_REF_ALL);
}

return (io->rpos = ret);
Expand Down
55 changes: 29 additions & 26 deletions libavtransport/io_mmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

struct AVTIOCtx {
int fd;
AVTBuffer *map;
AVTBuffer map;

avt_pos rpos;
avt_pos wpos;
Expand All @@ -63,7 +63,7 @@ static void mmap_buffer_free(void *opaque, void *base_data, size_t size)
static COLD int mmap_close(AVTIOCtx **_io)
{
AVTIOCtx *io = *_io;
avt_buffer_unref(&io->map);
avt_buffer_quick_unref(&io->map);
if (io->file_grew)
ftruncate(io->fd, io->wpos);
close(io->fd);
Expand Down Expand Up @@ -102,13 +102,14 @@ static COLD int mmap_init_common(AVTContext *ctx, AVTIOCtx *io)
return ret;
}

io->map = avt_buffer_create(data, len,
(void *)((intptr_t)fd_dup),
mmap_buffer_free);
if (!io->map) {
ret = avt_buffer_quick_create(&io->map, data, len,
(void *)((intptr_t)fd_dup),
mmap_buffer_free,
AVT_BUFFER_FLAG_READ_ONLY);
if (ret < 0) {
munmap(data, len);
close(fd_dup);
return AVT_ERROR(ENOMEM);
return ret;
}

return 0;
Expand Down Expand Up @@ -170,7 +171,7 @@ static int mmap_grow(AVTIOCtx *io, size_t amount)
{
int ret;
size_t old_map_size;
uint8_t *old_map = avt_buffer_get_data(io->map, &old_map_size);
uint8_t *old_map = avt_buffer_get_data(&io->map, &old_map_size);
void *new_map = MAP_FAILED;

amount = AVT_MAX(amount, MIN_ALLOC);
Expand All @@ -190,13 +191,13 @@ static int mmap_grow(AVTIOCtx *io, size_t amount)
* If there's only a single reference to the map (ours), allow it to be
* moved. */
new_map = mremap(old_map, old_map_size, new_map_size,
avt_buffer_get_refcount(io->map) == 1 ? MREMAP_MAYMOVE :
0);
avt_buffer_get_refcount(&io->map) == 1 ? MREMAP_MAYMOVE :
0);

/* Success */
if (new_map != MAP_FAILED) {
/* Update the existing buffer */
avt_buffer_update(io->map, new_map, new_map_size);
avt_buffer_update(&io->map, new_map, new_map_size);
return 0;
} else if (new_map == MAP_FAILED && errno != ENOMEM) {
ret = avt_handle_errno(io, "Error in mremap(): %i %s\n");
Expand Down Expand Up @@ -225,16 +226,18 @@ static int mmap_grow(AVTIOCtx *io, size_t amount)
return ret;
}

AVTBuffer *new_buf = avt_buffer_create(data, new_map_size,
(void *)((intptr_t)fd_dup),
mmap_buffer_free);
if (!io->map) {
AVTBuffer new_buf;
ret = avt_buffer_quick_create(&new_buf, data, new_map_size,
(void *)((intptr_t)fd_dup),
mmap_buffer_free,
AVT_BUFFER_FLAG_READ_ONLY);
if (ret < 0) {
munmap(data, new_map_size);
close(fd_dup);
return AVT_ERROR(ENOMEM);
return ret;
}

avt_buffer_unref(&io->map);
avt_buffer_quick_unref(&io->map);
io->map = new_buf;

return 0;
Expand All @@ -248,7 +251,7 @@ static int mmap_max_pkt_len(AVTIOCtx *io, size_t *mtu)

static inline avt_pos mmap_seek(AVTIOCtx *io, avt_pos pos)
{
if (pos > avt_buffer_get_data_len(io->map))
if (pos > avt_buffer_get_data_len(&io->map))
return AVT_ERROR(ERANGE);
return (io->rpos = pos);
}
Expand All @@ -259,15 +262,15 @@ static avt_pos mmap_write_pkt(AVTIOCtx *io, AVTPktd *p, int64_t timeout)
uint8_t *pl_data = avt_buffer_get_data(&p->pl, &pl_len);

size_t map_size;
uint8_t *map_data = avt_buffer_get_data(io->map, &map_size);
uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size);

if ((io->wpos + p->hdr_len + pl_len) > map_size) {
int ret = mmap_grow(io, p->hdr_len + pl_len);
if (ret < 0)
return ret;
}

map_data = avt_buffer_get_data(io->map, &map_size);
map_data = avt_buffer_get_data(&io->map, &map_size);
memcpy(&map_data[io->wpos], p->hdr, p->hdr_len);
memcpy(&map_data[io->wpos + p->hdr_len], pl_data, pl_len);

Expand All @@ -285,7 +288,7 @@ static avt_pos mmap_write_vec(AVTIOCtx *io, AVTPktd *iov, uint32_t nb_iov,
sum += iov[i].hdr_len + avt_buffer_get_data_len(&iov[i].pl);

size_t map_size;
uint8_t *map_data = avt_buffer_get_data(io->map, &map_size);
uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size);

if ((io->wpos + sum) > map_size) {
int ret = mmap_grow(io, sum);
Expand All @@ -294,7 +297,7 @@ static avt_pos mmap_write_vec(AVTIOCtx *io, AVTPktd *iov, uint32_t nb_iov,
}

avt_pos offset = io->wpos;
map_data = avt_buffer_get_data(io->map, &map_size);
map_data = avt_buffer_get_data(&io->map, &map_size);
for (int i = 0; i < nb_iov; i++) {
pl_data = avt_buffer_get_data(&iov[i].pl, &pl_len);
memcpy(&map_data[offset], iov[i].hdr, iov[i].hdr_len);
Expand All @@ -313,7 +316,7 @@ static avt_pos mmap_rewrite(AVTIOCtx *io, AVTPktd *p, avt_pos off,
uint8_t *pl_data = avt_buffer_get_data(&p->pl, &pl_len);

size_t map_size;
uint8_t *map_data = avt_buffer_get_data(io->map, &map_size);
uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size);

if ((off + p->hdr_len + pl_len) > map_size)
return AVT_ERROR(ERANGE);
Expand All @@ -329,7 +332,7 @@ static inline int64_t mmap_read(AVTIOCtx *io, AVTBuffer *dst,
enum AVTIOReadFlags flags)
{
size_t map_size;
uint8_t *map_data = avt_buffer_get_data(io->map, &map_size);
uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size);

/* TODO: cap map_size at wpos? */
len = AVT_MIN(map_size - io->rpos, len);
Expand All @@ -339,7 +342,7 @@ static inline int64_t mmap_read(AVTIOCtx *io, AVTBuffer *dst,
memcpy(dst, &map_data[io->rpos], len);
} else {
avt_buffer_quick_unref(dst);
avt_buffer_quick_ref(dst, io->map, io->rpos, len);
avt_buffer_quick_ref(dst, &io->map, io->rpos, len);
}

len = io->rpos + len;
Expand All @@ -350,7 +353,7 @@ static inline int64_t mmap_read(AVTIOCtx *io, AVTBuffer *dst,
static int mmap_flush(AVTIOCtx *io, int64_t timeout)
{
size_t map_size;
uint8_t *map_data = avt_buffer_get_data(io->map, &map_size);
uint8_t *map_data = avt_buffer_get_data(&io->map, &map_size);

int ret = msync(map_data, map_size, timeout == 0 ? MS_ASYNC : MS_SYNC);
if (ret < 0)
Expand Down
8 changes: 8 additions & 0 deletions libavtransport/ldpc_decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,11 @@
*/

#include "ldpc_decode.h"

void avt_ldpc_decode_288_224(uint8_t *dst, int iterations)
{
}

void avt_ldpc_decode_2784_2016(uint8_t *dst, int iterations)
{
}
14 changes: 11 additions & 3 deletions libavtransport/ldpc_decode.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef LIBAVTRANSPORT_LDPC_DECODE
#define LIBAVTRANSPORT_LDPC_DECODE
#ifndef AVTRANSPORT_LDPC_DECODE
#define AVTRANSPORT_LDPC_DECODE

#endif
#include <stdint.h>

/* Decodes systematic LDPC codes. dst must point to the start of the message. */

void avt_ldpc_decode_288_224(uint8_t *dst, int iterations);

void avt_ldpc_decode_2784_2016(uint8_t *dst, int iterations);

#endif /* AVTRANSPORT_LDPC_DECODE */
Loading

0 comments on commit 6f853e4

Please sign in to comment.