From 6eb7ec149d0419fcb4942aa808dedde740037138 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Per=C5=BCy=C5=82o?= Date: Fri, 29 Dec 2023 14:55:40 +0100 Subject: [PATCH] Add onMetaData command support --- .../rtmp/source/message.ex | 6 +- .../rtmp/source/message_handler.ex | 11 +++ .../rtmp/source/message_validator.ex | 6 ++ .../rtmp/source/message_validator/default.ex | 3 + .../rtmp/source/messages/on_meta_data.ex | 74 +++++++++++++++++++ mix.exs | 2 +- test/support/test_validator.ex | 3 + 7 files changed, 102 insertions(+), 3 deletions(-) create mode 100644 lib/membrane_rtmp_plugin/rtmp/source/messages/on_meta_data.ex diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message.ex b/lib/membrane_rtmp_plugin/rtmp/source/message.ex index bc984322..696d06c2 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/message.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/message.ex @@ -28,11 +28,13 @@ defmodule Membrane.RTMP.Message do "FCPublish" => Messages.FCPublish, "createStream" => Messages.CreateStream, "publish" => Messages.Publish, - "@setDataFrame" => Messages.SetDataFrame + "@setDataFrame" => Messages.SetDataFrame, + "onMetaData" => Messages.OnMetaData } @amf_data_to_module %{ - "@setDataFrame" => Messages.SetDataFrame + "@setDataFrame" => Messages.SetDataFrame, + "onMetaData" => Messages.OnMetaData } @spec deserialize_message(type_id :: integer(), binary()) :: struct() diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex b/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex index 97f98731..e6a029ef 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex @@ -154,6 +154,17 @@ defmodule Membrane.RTMP.MessageHandler do end end + @validation_stage :on_meta_data + defp do_handle_client_message(%Messages.OnMetaData{} = on_meta_data, _header, state) do + case MessageValidator.validate_on_meta_data(state.validator, on_meta_data) do + {:ok, _msg} = result -> + {:cont, validation_action(state, @validation_stage, result)} + + {:error, _reason} = error -> + {:halt, {:error, :stream_validation, validation_action(state, @validation_stage, error)}} + end + end + # According to ffmpeg's documentation, this command should prepare the server to receive media streams # We are simply acknowledging the message defp do_handle_client_message(%Messages.FCPublish{}, _header, state) do diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_validator.ex b/lib/membrane_rtmp_plugin/rtmp/source/message_validator.ex index f0f3edb0..c3443f23 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/message_validator.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/message_validator.ex @@ -31,4 +31,10 @@ defprotocol Membrane.RTMP.MessageValidator do """ @spec validate_set_data_frame(t(), Messages.SetDataFrame.t()) :: validation_result_t() def validate_set_data_frame(impl, message) + + @doc """ + Validates the `t:Membrane.RTMP.Messages.OnMetaData.t/0` message. + """ + @spec validate_on_meta_data(t(), Messages.OnMetaData.t()) :: validation_result_t() + def validate_on_meta_data(impl, message) end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_validator/default.ex b/lib/membrane_rtmp_plugin/rtmp/source/message_validator/default.ex index 90e028a9..c8560ed6 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/message_validator/default.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/message_validator/default.ex @@ -18,4 +18,7 @@ defimpl Membrane.RTMP.MessageValidator, for: Membrane.RTMP.MessageValidator.Defa @impl true def validate_set_data_frame(_impl, _message), do: {:ok, "set data frame success"} + + @impl true + def validate_on_meta_data(_impl, _message), do: {:ok, "on meta data success"} end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/on_meta_data.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/on_meta_data.ex new file mode 100644 index 00000000..88d9ea0c --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/on_meta_data.ex @@ -0,0 +1,74 @@ +defmodule Membrane.RTMP.Messages.OnMetaData do + @moduledoc """ + Defines the RTMP `onMetaData` command (sent by nginx client). + """ + + @behaviour Membrane.RTMP.Message + + alias Membrane.RTMP.AMF.Encoder + + @attributes_to_keys %{ + "duration" => :duration, + "width" => :width, + "height" => :height, + "videocodecid" => :video_codec_id, + "videodatarate" => :video_data_rate, + "framerate" => :framerate, + "audiocodecid" => :audio_codec_id, + "audiodatarate" => :audio_data_rate + } + + @keys_to_attributes Map.new(@attributes_to_keys, fn {key, value} -> {value, key} end) + + defstruct Map.keys(@keys_to_attributes) + + @type t :: %__MODULE__{ + duration: number(), + # video related + width: number(), + height: number(), + video_codec_id: number(), + video_data_rate: number(), + framerate: number(), + # audio related + audio_codec_id: number(), + audio_data_rate: number() + } + + @impl true + def from_data(["onMetaData", properties]) do + new(properties) + end + + @spec new([{String.t(), any()}]) :: t() + def new(options) do + params = + options + |> Map.new() + |> Map.take(Map.keys(@attributes_to_keys)) + |> Enum.map(fn {key, value} -> + {Map.fetch!(@attributes_to_keys, key), value} + end) + + struct!(__MODULE__, params) + end + + # helper for message serialization + @doc false + @spec to_map(t()) :: map() + def to_map(%__MODULE__{} = message) do + Map.new(message, fn {key, value} -> {Map.fetch!(@keys_to_attributes, key), value} end) + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{} = message) do + Encoder.encode([@for.to_map(message)]) + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:amf_data) + end +end diff --git a/mix.exs b/mix.exs index 7beefcc9..d62e9a33 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTMP.Mixfile do use Mix.Project - @version "0.20.2" + @version "0.21.0" @github_url "https://github.com/membraneframework/membrane_rtmp_plugin" def project do diff --git a/test/support/test_validator.ex b/test/support/test_validator.ex index e20ec048..669a2aa6 100644 --- a/test/support/test_validator.ex +++ b/test/support/test_validator.ex @@ -32,4 +32,7 @@ defimpl Membrane.RTMP.MessageValidator, for: Support.TestValidator do @impl true def validate_set_data_frame(_impl, _message), do: {:ok, "set data frame success"} + + @impl true + def validate_on_meta_data(_impl, _message), do: {:ok, "on meta data success"} end