From 324cd3e59929475d045035529f0fb48fee224c74 Mon Sep 17 00:00:00 2001
From: Nick Dichev <117689053+nickdichev-firework@users.noreply.github.com>
Date: Fri, 1 Mar 2024 00:17:50 -0800
Subject: [PATCH] Handle socket closed by client before start of stream arrives
 in RTMP Source Bin (#87)

* Handle unexpected socket closed by client before start of stream arrives

* Bump version to 0.23.0
---
 README.md                                       |  2 +-
 lib/membrane_rtmp_plugin/rtmp/source/bin.ex     |  4 ++++
 mix.exs                                         |  2 +-
 .../rtmp_source_bin_test.exs                    | 17 +++++++++++++++++
 4 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 9ddfb17d..2c4d6f0a 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@ The package can be installed by adding `membrane_rtmp_plugin` to your list of de
 ```elixir
 def deps do
   [
-	  {:membrane_rtmp_plugin, "~> 0.22.1"}
+	  {:membrane_rtmp_plugin, "~> 0.23.0"}
   ]
 end
 ```
diff --git a/lib/membrane_rtmp_plugin/rtmp/source/bin.ex b/lib/membrane_rtmp_plugin/rtmp/source/bin.ex
index 353820b9..3acb2aa8 100644
--- a/lib/membrane_rtmp_plugin/rtmp/source/bin.ex
+++ b/lib/membrane_rtmp_plugin/rtmp/source/bin.ex
@@ -105,6 +105,10 @@ defmodule Membrane.RTMP.SourceBin do
     {[notify_parent: notification], state}
   end
 
+  def handle_child_notification(:unexpected_socket_closed, :src, _ctx, state) do
+    {[notify_parent: :unexpected_socket_close], state}
+  end
+
   @doc """
   Passes the control of the socket to the `source`.
 
diff --git a/mix.exs b/mix.exs
index 29051a99..3c92d7cc 100644
--- a/mix.exs
+++ b/mix.exs
@@ -1,7 +1,7 @@
 defmodule Membrane.RTMP.Mixfile do
   use Mix.Project
 
-  @version "0.22.1"
+  @version "0.23.0"
   @github_url "https://github.com/membraneframework/membrane_rtmp_plugin"
 
   def project do
diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
index d67d3b04..c6938098 100644
--- a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
+++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs
@@ -184,6 +184,23 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do
     assert :error = Task.await(ffmpeg_task)
   end
 
+  test "Handles client socket close before start of stream" do
+    {:ok, port} = start_tcp_server()
+
+    Task.async(fn ->
+      host = @local_ip |> String.to_charlist() |> :inet.parse_address() |> elem(1)
+      {:ok, socket} = :gen_tcp.connect(host, port, [], :infinity)
+      :gen_tcp.close(socket)
+    end)
+
+    pipeline = await_pipeline_started()
+
+    assert_end_of_stream(pipeline, :audio_sink, :input)
+    assert_end_of_stream(pipeline, :video_sink, :input)
+
+    assert_pipeline_notified(pipeline, :src, :unexpected_socket_close)
+  end
+
   defp start_tcp_server(validator \\ %Membrane.RTMP.MessageValidator.Default{}) do
     test_process = self()