From 900cbdc91ccbccf261d3a7aa3d7f4bc0215dc5c0 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Mon, 30 Oct 2023 16:26:24 +0100 Subject: [PATCH] Add thousand islands based source --- README.md | 44 +++++-------------------------- c_src/Makefile | 2 +- c_src/ioq.c | 55 --------------------------------------- c_src/ioq.h | 28 -------------------- c_src/libav.c | 15 +++++------ lib/avx/nif.ex | 4 --- mix.exs | 2 +- test/avx/demuxer_test.exs | 20 +++++++++++++- test/support.ex | 11 ++++++++ 9 files changed, 44 insertions(+), 137 deletions(-) delete mode 100644 c_src/ioq.c delete mode 100644 c_src/ioq.h diff --git a/README.md b/README.md index baec76f..aca9938 100644 --- a/README.md +++ b/README.md @@ -41,17 +41,14 @@ function. Check the tests, but in practice this is the flow for decoding audio from a multi-track file from file to file in a lazy fashion. -If initialized directly from a file, the demuxer -supports all protocols supported by libav itself (such as RTMP, UDP, HLS, local files, ...). -If a custom reader is provided, such the the MailboxReader, users should take -care of fetching and sending the bytes to the demuxer by themselves (very useful when -sending data from browser microphone through liveview and to the demuxer, for example). +Supports all protocols supported by libav itself (such as RTMP, UDP, HLS, local +files, unix sockets, TCP, UDP, ...). ```elixir -demuxer = Demuxer.new_from_file(input_path) +{:ok, demuxer} = Demuxer.new_from_file(input_path) # Detect available stream and select one (or more) -{streams, demuxer} = Demuxer.streams(demuxer) +streams = Demuxer.read_streams(demuxer) audio_stream = Enum.find(streams, fn stream -> stream.codec_type == :audio end) # Initialize the decoder. The sample rate, channel and audio format will match @@ -67,37 +64,8 @@ demuxer |> Enum.into(output) ``` -An experimental feature: the demuxer can be initialized with a custom reader, -which might send data from mailbox, a file and other custom patterns. - -This is an example reading from a file. -```elixir -demuxer = - Demuxer.new_in_memory(%{ - opaque: File.open!(input_path, [:raw, :read]), - read: fn input, size -> - resp = IO.binread(input, size) - {resp, input} - end, - close: fn input -> File.close(input) end - }) - -``` - -This one uses the MailboxReader: you can send messages with the data to -it and will act as a source to the Demuxer. -```elixir -{:ok, pid} = AVx.Demuxer.MailboxReader.start_link() - -demuxer = AVx.Demuxer.new_in_memory(%{ - opaque: pid, - read: &AVx.Demuxer.MailboxReader.read/2, - close: &AVx.Demuxer.MailboxReader.close/1 - }) - -# In another process -:ok = AVx.Demuxer.MailboxReader.add_data(pid, <<>>) -``` +I'm currently working on a ThousandIsland Handler that can be used +to create a Demuxer source suitable for streaming setups. And that's it. Compared to using the `ffmpeg` executable directly, here you have access to every single packet, which you can re-route, manipulate and process at will. diff --git a/c_src/Makefile b/c_src/Makefile index 6358849..cbea621 100644 --- a/c_src/Makefile +++ b/c_src/Makefile @@ -22,7 +22,7 @@ endif all: $(LIB_SO) -$(LIB_SO): libav.c ioq.c demuxer.c decoder.c +$(LIB_SO): libav.c demuxer.c decoder.c @ mkdir -p $(PRIV_DIR) $(CC) $(CFLAGS) $(LDFLAGS) -o $@ $^ $(LIBS) diff --git a/c_src/ioq.c b/c_src/ioq.c deleted file mode 100644 index c9b3025..0000000 --- a/c_src/ioq.c +++ /dev/null @@ -1,55 +0,0 @@ -#include -#include -#include - -int queue_is_filled(Ioq *q) { return q->buf_end == q->size; } -int queue_freespace(Ioq *q) { return q->size - q->buf_end; } - -void queue_grow(Ioq *q, int factor) { - u_long new_size; - - new_size = q->size * factor; - q->ptr = realloc(q->ptr, new_size); - q->size = new_size; -} - -void queue_copy(Ioq *q, void *src, int size) { - // Do we have enough space for the data? If not, reallocate some space. - if (queue_freespace(q) < size) - queue_grow(q, 2); - - memcpy(q->ptr + q->buf_end, src, size); - q->buf_end += size; -} - -void queue_deq(Ioq *q) { - int unread; - - unread = q->buf_end - q->pos; - if (unread == 0) { - q->pos = 0; - q->buf_end = 0; - } else { - memmove(q->ptr, q->ptr + q->pos, unread); - q->pos = 0; - q->buf_end = unread; - } -} - -int queue_read(Ioq *q, void *dst, int buf_size) { - int unread; - int size; - - unread = q->buf_end - q->pos; - if (unread <= 0) - return -1; - - size = buf_size > unread ? unread : buf_size; - memcpy(dst, q->ptr + q->pos, size); - q->pos += size; - - if (q->mode == QUEUE_MODE_SHIFT) - queue_deq(q); - - return size; -} diff --git a/c_src/ioq.h b/c_src/ioq.h deleted file mode 100644 index 2b88369..0000000 --- a/c_src/ioq.h +++ /dev/null @@ -1,28 +0,0 @@ -#include - -typedef enum { QUEUE_MODE_SHIFT, QUEUE_MODE_GROW } QUEUE_MODE; - -typedef struct { - void *ptr; - // The total size of ptr - u_long size; - // Where the next bytes should be written at. - u_long buf_end; - - // position of the last read. - u_long pos; - - // Used to differentiate wether the queue is removing - // the bytes each time they're read or it is growing to - // accomodate more. The latter is used when probing the - // input to find the header. - QUEUE_MODE mode; - int input_eos; -} Ioq; - -int queue_is_filled(Ioq *q); -int queue_freespace(Ioq *q); -void queue_grow(Ioq *q, int factor); -void queue_copy(Ioq *q, void *src, int size); -void queue_deq(Ioq *q); -int queue_read(Ioq *q, void *dst, int buf_size); diff --git a/c_src/libav.c b/c_src/libav.c index 6c504ab..779a36e 100644 --- a/c_src/libav.c +++ b/c_src/libav.c @@ -1,7 +1,3 @@ -#include "libavcodec/codec_par.h" -#include "libavformat/avformat.h" -#include "libavutil/log.h" -#include "libavutil/samplefmt.h" #include #include #include @@ -450,17 +446,18 @@ static ErlNifFunc nif_funcs[] = { // {erl_function_name, erl_function_arity, c_function} // Demuxer - {"demuxer_alloc_from_file", 1, enif_demuxer_alloc_from_file}, + {"demuxer_alloc_from_file", 1, enif_demuxer_alloc_from_file, + ERL_NIF_DIRTY_JOB_IO_BOUND}, {"demuxer_streams", 1, enif_demuxer_streams}, {"demuxer_read_packet", 1, enif_demuxer_read_packet}, - // Decoder + // // Decoder {"decoder_alloc", 1, enif_decoder_alloc}, {"decoder_stream_format", 1, enif_decoder_stream_format}, {"decoder_add_data", 2, enif_decoder_add_data}, - // General + // // General {"packet_stream_index", 1, enif_packet_stream_index}, - // TODO - // Maybe unpack_* would be better function naming. + // // TODO + // // Maybe unpack_* would be better function naming. {"packet_unpack", 1, enif_packet_unpack}, {"audio_frame_unpack", 1, enif_audio_frame_unpack}, }; diff --git a/lib/avx/nif.ex b/lib/avx/nif.ex index 964b956..b3672fd 100644 --- a/lib/avx/nif.ex +++ b/lib/avx/nif.ex @@ -10,10 +10,6 @@ defmodule AVx.NIF do raise "NIF demuxer_alloc_from_file/1 not implemented" end - def demuxer_read_header(_ctx) do - raise "NIF demuxer_read_header/1 not implemented" - end - def demuxer_streams(_ctx) do raise "NIF demuxer_streams/1 not implemented" end diff --git a/mix.exs b/mix.exs index b6799db..b2c7faa 100644 --- a/mix.exs +++ b/mix.exs @@ -27,7 +27,7 @@ defmodule AVx.MixProject do {:nimble_options, "~> 1.0.0"}, {:elixir_make, "~> 0.6", runtime: false}, {:jason, "~> 1.4.1"}, - {:thousand_island, "~> 1.1", only: :test} + {:thousand_island, "~> 1.0", only: :test} ] end diff --git a/test/avx/demuxer_test.exs b/test/avx/demuxer_test.exs index 2ffc9e6..e6567a2 100644 --- a/test/avx/demuxer_test.exs +++ b/test/avx/demuxer_test.exs @@ -7,7 +7,25 @@ defmodule AVx.DemuxerTest do describe "demuxer" do test "from file" do - {:ok, demuxer} = Demuxer.new_from_file(@input) + assert_demuxer(@input) + end + + test "from tcp socket" do + pid = + start_link_supervised!( + {ThousandIsland, + [port: 0, handler_module: Support.TiHandler, handler_options: %{path: @input}]} + ) + + {:ok, {_, port}} = ThousandIsland.listener_info(pid) + addr = "tcp://127.0.0.1:#{port}" + + assert_demuxer(addr) + ThousandIsland.stop(pid) + end + + defp assert_demuxer(input_path) do + {:ok, demuxer} = Demuxer.new_from_file(input_path) streams = Demuxer.read_streams(demuxer) stream = Enum.find(streams, fn stream -> stream.codec_type == :audio end) diff --git a/test/support.ex b/test/support.ex index d9c4423..83b8db1 100644 --- a/test/support.ex +++ b/test/support.ex @@ -18,3 +18,14 @@ defmodule Support do |> Enum.filter(fn %{"type" => type} -> type == "frame" end) end end + +defmodule Support.TiHandler do + use ThousandIsland.Handler + + @impl ThousandIsland.Handler + def handle_connection(socket, state) do + data = File.read!(state.path) + ThousandIsland.Socket.send(socket, data) + {:close, state} + end +end