diff --git a/config/runtime.exs b/config/runtime.exs index 1fdbad3eb..58b7181f1 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -36,7 +36,10 @@ if config_env() != :test do message_log_s3_folder: System.get_env("MESSAGE_LOG_S3_FOLDER"), message_log_report_s3_folder: System.get_env("MESSAGE_LOG_REPORT_S3_FOLDER"), monitoring_api_key: System.get_env("MONITORING_API_KEY"), - active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH") + active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH"), + screenplay_url: System.get_env("SCREENPLAY_URL"), + screenplay_api_key: System.get_env("SCREENPLAY_API_KEY"), + active_pa_messages_path: System.get_env("ACTIVE_PA_MESSAGES_PATH") end if config_env() == :dev do diff --git a/config/test.exs b/config/test.exs index b3c9acacd..9d5efd893 100644 --- a/config/test.exs +++ b/config/test.exs @@ -11,4 +11,6 @@ config :realtime_signs, external_config_getter: Fake.ExternalConfig.Local, sign_config_file: "priv/config.json", aws_client: Fake.ExAws, - s3_client: Fake.ExAws + s3_client: Fake.ExAws, + screenplay_url: "fake-screenplay.com", + active_pa_messages_path: "active" diff --git a/lib/engine/pa_messages.ex b/lib/engine/pa_messages.ex new file mode 100644 index 000000000..62a09dac1 --- /dev/null +++ b/lib/engine/pa_messages.ex @@ -0,0 +1,191 @@ +defmodule Engine.PaMessages do + use GenServer + require Logger + + alias PaMessages.PaMessage + + @type state :: %{ + pa_message_timers_table: :ets.tab() + } + + @pa_message_timers_table :pa_message_timers + @minute_in_ms 1000 * 60 + + def start_link([]) do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end + + @impl true + def init(_) do + # Add some delay to wait for sign processes to come up before sending PA messages + Process.send_after(self(), :update, 10000) + state = %{pa_message_timers_table: @pa_message_timers_table} + create_table(state) + {:ok, state} + end + + def create_table(state) do + :ets.new(state.pa_message_timers_table, [:named_table, read_concurrency: true]) + end + + @impl true + def handle_info(:update, state) do + schedule_update(self()) + + case get_active_pa_messages() do + {:ok, pa_messages} -> + schedule_pa_messages(pa_messages, state.pa_message_timers_table) + |> handle_inactive_pa_messages(state.pa_message_timers_table) + + {:error, %HTTPoison.Response{status_code: status_code, body: body}} -> + Logger.warn("pa_messages_response_error: status_code=#{status_code} body=#{body}") + + {:error, %HTTPoison.Error{reason: reason}} -> + Logger.warn("pa_messages_response_error: reason=#{reason}") + + {:error, error} -> + Logger.warn("pa_messages_response_error: error=#{inspect(error)}") + end + + {:noreply, state} + end + + @impl true + def handle_info({:send_pa_message, pa_message}, state) do + Enum.each(pa_message.sign_ids, fn sign_id -> + send( + String.to_existing_atom("Signs/#{sign_id}"), + {:play_pa_message, pa_message} + ) + end) + + {:noreply, state} + end + + defp schedule_pa_messages(pa_messages, table) do + for %{ + "id" => pa_id, + "visual_text" => visual_text, + "audio_text" => audio_text, + "interval_in_minutes" => interval_in_minutes, + "priority" => priority, + "sign_ids" => sign_ids + } <- pa_messages do + active_pa_message = %PaMessage{ + id: pa_id, + visual_text: visual_text, + audio_text: audio_text, + priority: priority, + sign_ids: ["Silver_Line.South_Station_EB" | sign_ids], + interval_in_ms: interval_in_minutes * @minute_in_ms + } + + case get_pa_message_timer(pa_id, table) do + {timer_ref, existing_pa_message} + when existing_pa_message.interval_in_ms != active_pa_message.interval_in_ms -> + case Process.read_timer(timer_ref) do + false -> + schedule_pa_message(active_pa_message, active_pa_message.interval_in_ms, table) + + remaining_ms -> + ms_elapsed = existing_pa_message.interval_in_ms - remaining_ms + temp_interval = (active_pa_message.interval_in_ms - ms_elapsed) |> max(0) + cancel_pa_timer(timer_ref, pa_id) + schedule_pa_message(active_pa_message, temp_interval, table) + end + + {timer, _} -> + if Process.read_timer(timer) == false do + schedule_pa_message(active_pa_message, active_pa_message.interval_in_ms, table) + end + + nil -> + schedule_pa_message(active_pa_message, 0, table) + end + + pa_id + end + end + + defp schedule_pa_message(pa_message, interval_in_ms, table) do + Logger.info( + "pa_message: action=scheduled id=#{pa_message.id} interval_ms=#{interval_in_ms} sign_ids=#{inspect(pa_message.sign_ids)}" + ) + + timer_ref = + Process.send_after( + self(), + {:send_pa_message, pa_message}, + interval_in_ms + ) + + :ets.insert( + table, + {pa_message.id, {timer_ref, pa_message}} + ) + end + + defp handle_inactive_pa_messages(active_pa_ids, table) do + :ets.tab2list(table) + |> Enum.each(fn {pa_id, {timer_ref, pa_message}} -> + if pa_id not in active_pa_ids do + cancel_pa_timer(timer_ref, pa_id) + delete_pa_message(pa_message, table) + end + end) + end + + defp cancel_pa_timer(timer_ref, pa_id) do + Logger.info("pa_message: action=timer_canceled id=#{pa_id}") + Process.cancel_timer(timer_ref) + end + + defp delete_pa_message(pa_message, table) do + Logger.info("pa_message: action=message_deleted id=#{pa_message.id}") + + Enum.each(pa_message.sign_ids, fn sign_id -> + send( + String.to_existing_atom("Signs/#{sign_id}"), + {:delete_pa_message, pa_message.id} + ) + end) + + :ets.delete(table, pa_message.id) + end + + defp get_pa_message_timer(pa_id, table) do + case :ets.lookup(table, pa_id) do + [{^pa_id, timer}] -> timer + _ -> nil + end + end + + defp get_active_pa_messages() do + active_pa_messages_url = + Application.get_env(:realtime_signs, :screenplay_url) <> + Application.get_env(:realtime_signs, :active_pa_messages_path) + + http_client = Application.get_env(:realtime_signs, :http_client) + + with {:ok, response} <- + http_client.get( + active_pa_messages_url, + [ + {"x-api-key", Application.get_env(:realtime_signs, :screenplay_api_key)} + ], + timeout: 2000, + recv_timeout: 2000 + ), + %{status_code: 200, body: body} <- response, + {:ok, pa_messages} <- Jason.decode(body) do + {:ok, pa_messages} + else + error -> + {:error, error} + end + end + + defp schedule_update(pid) do + Process.send_after(pid, :update, 1_000) + end +end diff --git a/lib/fake/httpoison.ex b/lib/fake/httpoison.ex index 2d9a0a9bf..816701514 100644 --- a/lib/fake/httpoison.ex +++ b/lib/fake/httpoison.ex @@ -128,6 +128,137 @@ defmodule Fake.HTTPoison do }} end + def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/active") do + response = [ + %{ + "alert_id" => nil, + "audio_text" => + "This is an example of a PA message that will be played at an MBTA station", + "days_of_week" => [1, 2, 3, 4, 5, 6, 7], + "end_time" => "2033-12-05T23:53:23Z", + "id" => 4, + "inserted_at" => "2024-06-03T18:33:31Z", + "interval_in_minutes" => 1, + "message_type" => nil, + "paused" => nil, + "priority" => 1, + "saved" => nil, + "sign_ids" => ["wellington_northbound", "wellington_mezzanine", "wellington_southbound"], + "start_time" => "2024-06-03T18:33:23Z", + "updated_at" => "2024-06-03T18:33:31Z", + "visual_text" => + "This is an example of a PA message that will be played at an MBTA station" + }, + %{ + "alert_id" => nil, + "audio_text" => "This is another PA message that will play at MBTA stations", + "days_of_week" => [1, 2, 3, 4, 5, 6, 7], + "end_time" => "2027-08-05T05:41:10Z", + "id" => 5, + "inserted_at" => "2024-06-03T19:54:40Z", + "interval_in_minutes" => 2, + "message_type" => nil, + "paused" => nil, + "priority" => 1, + "saved" => nil, + "sign_ids" => [ + "wellington_northbound", + "wellington_southbound", + "wellington_mezzanine", + "sullivan_mezzanine", + "sullivan_northbound", + "sullivan_southbound" + ], + "start_time" => "2024-06-03T19:54:30Z", + "updated_at" => "2024-06-03T19:54:40Z", + "visual_text" => "This is another PA message that will play at MBTA stations" + } + ] + + {:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}} + end + + def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/no-longer-active") do + response = [ + %{ + "alert_id" => nil, + "audio_text" => "This is another PA message that will play at MBTA stations", + "days_of_week" => [1, 2, 3, 4, 5, 6, 7], + "end_time" => "2027-08-05T05:41:10Z", + "id" => 5, + "inserted_at" => "2024-06-03T19:54:40Z", + "interval_in_minutes" => 2, + "message_type" => nil, + "paused" => nil, + "priority" => 1, + "saved" => nil, + "sign_ids" => [ + "wellington_northbound", + "wellington_southbound", + "wellington_mezzanine", + "sullivan_mezzanine", + "sullivan_northbound", + "sullivan_southbound" + ], + "start_time" => "2024-06-03T19:54:30Z", + "updated_at" => "2024-06-03T19:54:40Z", + "visual_text" => "This is another PA message that will play at MBTA stations" + } + ] + + {:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}} + end + + def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/changed-interval") do + response = [ + %{ + "alert_id" => nil, + "audio_text" => + "This is an example of a PA message that will be played at an MBTA station", + "days_of_week" => [1, 2, 3, 4, 5, 6, 7], + "end_time" => "2033-12-05T23:53:23Z", + "id" => 4, + "inserted_at" => "2024-06-03T18:33:31Z", + "interval_in_minutes" => 2, + "message_type" => nil, + "paused" => nil, + "priority" => 1, + "saved" => nil, + "sign_ids" => ["wellington_northbound", "wellington_mezzanine", "wellington_southbound"], + "start_time" => "2024-06-03T18:33:23Z", + "updated_at" => "2024-06-03T18:33:31Z", + "visual_text" => + "This is an example of a PA message that will be played at an MBTA station" + }, + %{ + "alert_id" => nil, + "audio_text" => "This is another PA message that will play at MBTA stations", + "days_of_week" => [1, 2, 3, 4, 5, 6, 7], + "end_time" => "2027-08-05T05:41:10Z", + "id" => 5, + "inserted_at" => "2024-06-03T19:54:40Z", + "interval_in_minutes" => 2, + "message_type" => nil, + "paused" => nil, + "priority" => 1, + "saved" => nil, + "sign_ids" => [ + "wellington_northbound", + "wellington_southbound", + "wellington_mezzanine", + "sullivan_mezzanine", + "sullivan_northbound", + "sullivan_southbound" + ], + "start_time" => "2024-06-03T19:54:30Z", + "updated_at" => "2024-06-03T19:54:40Z", + "visual_text" => "This is another PA message that will play at MBTA stations" + } + ] + + {:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}} + end + def mock_response("fake_trip_update2.json") do feed_message = %{ diff --git a/lib/pa_messages/pa_message.ex b/lib/pa_messages/pa_message.ex new file mode 100644 index 000000000..b4d2c6b36 --- /dev/null +++ b/lib/pa_messages/pa_message.ex @@ -0,0 +1,27 @@ +defmodule PaMessages.PaMessage do + defstruct id: nil, + visual_text: nil, + audio_text: nil, + priority: nil, + sign_ids: [], + interval_in_ms: nil + + @type t :: %__MODULE__{ + id: integer(), + visual_text: String.t(), + audio_text: String.t(), + priority: integer(), + sign_ids: [String.t()], + interval_in_ms: non_neg_integer() + } + + defimpl Content.Audio do + def to_params(%PaMessages.PaMessage{visual_text: visual_text}) do + {:ad_hoc, {visual_text, :audio_visual}} + end + + def to_tts(%PaMessages.PaMessage{visual_text: visual_text, audio_text: audio_text}) do + {audio_text, PaEss.Utilities.paginate_text(visual_text)} + end + end +end diff --git a/lib/realtime_signs.ex b/lib/realtime_signs.ex index 392e6d5c5..89bf7f67d 100644 --- a/lib/realtime_signs.ex +++ b/lib/realtime_signs.ex @@ -34,6 +34,7 @@ defmodule RealtimeSigns do Engine.BusPredictions, Engine.ChelseaBridge, Engine.Routes, + Engine.PaMessages, MessageQueue, RealtimeSigns.Scheduler, RealtimeSignsWeb.Endpoint, diff --git a/lib/signs/bus.ex b/lib/signs/bus.ex index 31f46dc75..92e455d62 100644 --- a/lib/signs/bus.ex +++ b/lib/signs/bus.ex @@ -34,7 +34,8 @@ defmodule Signs.Bus do :prev_bridge_status, :current_messages, :last_update, - :last_read_time + :last_read_time, + :pa_message_plays ] defstruct @enforce_keys @@ -62,7 +63,8 @@ defmodule Signs.Bus do prev_bridge_status: nil | map(), current_messages: tuple(), last_update: nil | DateTime.t(), - last_read_time: DateTime.t() + last_read_time: DateTime.t(), + pa_message_plays: %{integer() => DateTime.t()} } def start_link(sign) do @@ -90,7 +92,8 @@ defmodule Signs.Bus do prev_bridge_status: nil, current_messages: {nil, nil}, last_update: nil, - last_read_time: Timex.now() + last_read_time: Timex.now(), + pa_message_plays: %{} } GenServer.start_link(__MODULE__, state, name: :"Signs/#{state.id}") @@ -121,6 +124,23 @@ defmodule Signs.Bus do {:ok, state} end + @impl true + def handle_info({:play_pa_message, pa_message}, sign) do + Logger.info("pa_message: action=send id=#{pa_message.id} destination=#{sign.id}") + + pa_message_plays = + Signs.Utilities.Audio.send_pa_message(pa_message, sign.pa_message_plays, fn -> + send_audio([Content.Audio.to_params(pa_message)], sign) + end) + + {:noreply, %{sign | pa_message_plays: pa_message_plays}} + end + + @impl true + def handle_info({:delete_pa_message, pa_id}, sign) do + {:noreply, %{sign | pa_message_plays: Map.delete(sign.pa_message_plays, pa_id)}} + end + @impl true def handle_info(:run_loop, state) do Process.send_after(self(), :run_loop, 1000) diff --git a/lib/signs/realtime.ex b/lib/signs/realtime.ex index 2ade07339..e75b831a5 100644 --- a/lib/signs/realtime.ex +++ b/lib/signs/realtime.ex @@ -47,7 +47,8 @@ defmodule Signs.Realtime do announced_alert: false, prev_prediction_keys: nil, prev_predictions: [], - uses_shuttles: true + uses_shuttles: true, + pa_message_plays: %{} ] @type line_content :: Content.Message.t() @@ -86,7 +87,8 @@ defmodule Signs.Realtime do prev_prediction_keys: [{String.t(), 0 | 1}] | nil, announced_alert: boolean(), prev_predictions: [Predictions.Prediction.t()], - uses_shuttles: boolean() + uses_shuttles: boolean(), + pa_message_plays: %{integer() => DateTime.t()} } def start_link(%{"type" => "realtime"} = config) do @@ -116,7 +118,8 @@ defmodule Signs.Realtime do tick_read: 240 + Map.fetch!(config, "read_loop_offset"), read_period_seconds: 240, headway_stop_id: Map.get(config, "headway_stop_id"), - uses_shuttles: Map.get(config, "uses_shuttles", true) + uses_shuttles: Map.get(config, "uses_shuttles", true), + pa_message_plays: %{} } GenServer.start_link(__MODULE__, sign, name: :"Signs/#{sign.id}") @@ -129,6 +132,21 @@ defmodule Signs.Realtime do {:ok, sign} end + def handle_info({:play_pa_message, pa_message}, sign) do + Logger.info("pa_message: action=send id=#{pa_message.id} destination=#{sign.id}") + + pa_message_plays = + Signs.Utilities.Audio.send_pa_message(pa_message, sign.pa_message_plays, fn -> + Signs.Utilities.Audio.send_audio(sign, [pa_message]) + end) + + {:noreply, %{sign | pa_message_plays: pa_message_plays}} + end + + def handle_info({:delete_pa_message, pa_id}, sign) do + {:noreply, %{sign | pa_message_plays: Map.delete(sign.pa_message_plays, pa_id)}} + end + def handle_info(:run_loop, sign) do sign_stop_ids = SourceConfig.sign_stop_ids(sign.source_config) sign_routes = SourceConfig.sign_routes(sign.source_config) diff --git a/lib/signs/utilities/audio.ex b/lib/signs/utilities/audio.ex index 876485c56..f092b198d 100644 --- a/lib/signs/utilities/audio.ex +++ b/lib/signs/utilities/audio.ex @@ -401,4 +401,25 @@ defmodule Signs.Utilities.Audio do end) ) end + + @spec send_pa_message(PaMessages.PaMessage.t(), %{integer() => DateTime.t()}, function()) :: %{ + integer() => DateTime.t() + } + def send_pa_message(pa_message, pa_message_plays, send_audio_fn) do + case Map.get(pa_message_plays, pa_message.id) do + nil -> + send_audio_fn.() + + last_sent -> + if DateTime.diff(DateTime.utc_now(), last_sent, :millisecond) >= pa_message.interval_in_ms do + send_audio_fn.() + else + Logger.warn("pa_message: action=skipped id=#{pa_message.id}") + end + end + |> then(fn + :ok -> Map.put(pa_message_plays, pa_message.id, DateTime.utc_now()) + _ -> pa_message_plays + end) + end end diff --git a/test/engine/pa_messages_test.exs b/test/engine/pa_messages_test.exs new file mode 100644 index 000000000..a24fe7ac9 --- /dev/null +++ b/test/engine/pa_messages_test.exs @@ -0,0 +1,80 @@ +defmodule Engine.PaMessagesTest do + use ExUnit.Case + + @state %{ + pa_message_timers_table: :test_pa_message_timers + } + + setup_all do + screenplay_url = Application.get_env(:realtime_signs, :screenplay_url) + active_pa_messages_path = Application.get_env(:realtime_signs, :active_pa_messages_path) + + Application.put_env(:realtime_signs, :screenplay_url, "https://screenplay-fake.mbtace.com") + Application.put_env(:realtime_signs, :active_pa_messages_path, "/api/pa-messages/active") + + on_exit(fn -> + Application.put_env(:realtime_signs, :screenplay_url, screenplay_url) + Application.put_env(:realtime_signs, :active_pa_messages_path, active_pa_messages_path) + end) + end + + setup do + Application.put_env(:realtime_signs, :active_pa_messages_path, "/api/pa-messages/active") + end + + describe "handle_info/2 schedule messages" do + test "Schedules PA messages" do + Engine.PaMessages.create_table(@state) + Engine.PaMessages.handle_info(:update, @state) + + pa_ids = Enum.map(:ets.tab2list(:test_pa_message_timers), &elem(&1, 0)) + + assert 4 in pa_ids + assert 5 in pa_ids + end + end + + describe "handle_info/2 changes or deletes messages" do + test "Unschedules inactive PA messages" do + Engine.PaMessages.create_table(@state) + Engine.PaMessages.handle_info(:update, @state) + pa_ids = Enum.map(:ets.tab2list(:test_pa_message_timers), &elem(&1, 0)) + + assert 4 in pa_ids + assert 5 in pa_ids + + Application.put_env( + :realtime_signs, + :active_pa_messages_path, + "/api/pa-messages/no-longer-active" + ) + + Engine.PaMessages.handle_info(:update, @state) + pa_ids = Enum.map(:ets.tab2list(:test_pa_message_timers), &elem(&1, 0)) + + assert 4 not in pa_ids + assert 5 in pa_ids + end + + test "Updates timer when interval changes" do + Engine.PaMessages.create_table(@state) + Engine.PaMessages.handle_info(:update, @state) + + [{4, {timer_ref_before, pa_message_before}}] = :ets.lookup(:test_pa_message_timers, 4) + + Application.put_env( + :realtime_signs, + :active_pa_messages_path, + "/api/pa-messages/changed-interval" + ) + + Engine.PaMessages.handle_info(:update, @state) + + [{4, {timer_ref_after, pa_message_after}}] = :ets.lookup(:test_pa_message_timers, 4) + + assert timer_ref_before != timer_ref_after + assert Process.read_timer(timer_ref_before) == false + assert pa_message_before.interval_in_ms < pa_message_after.interval_in_ms + end + end +end diff --git a/test/signs/bus_test.exs b/test/signs/bus_test.exs index fd9dab67b..db0cdd80a 100644 --- a/test/signs/bus_test.exs +++ b/test/signs/bus_test.exs @@ -118,7 +118,8 @@ defmodule Signs.BusTest do prev_bridge_status: nil, current_messages: {nil, nil}, last_update: nil, - last_read_time: Timex.shift(Timex.now(), minutes: -10) + last_read_time: Timex.shift(Timex.now(), minutes: -10), + pa_message_plays: %{} } setup :verify_on_exit! diff --git a/test/signs/realtime_test.exs b/test/signs/realtime_test.exs index 25bbb2186..3c82d6e7d 100644 --- a/test/signs/realtime_test.exs +++ b/test/signs/realtime_test.exs @@ -49,7 +49,8 @@ defmodule Signs.RealtimeTest do sign_updater: PaEss.Updater.Mock, last_update: @fake_time, tick_read: 1, - read_period_seconds: 100 + read_period_seconds: 100, + pa_message_plays: %{} } @mezzanine_sign %{ @@ -1501,6 +1502,48 @@ defmodule Signs.RealtimeTest do end end + describe "PA messages" do + test "Plays message if no prior plays" do + expect_audios(ad_hoc: {"A PA Message", :audio_visual}) + + pa_message = %PaMessages.PaMessage{ + id: 1, + visual_text: "A PA Message", + audio_text: "A PA Message" + } + + Signs.Realtime.handle_info({:play_pa_message, pa_message}, @sign) + end + + test "Plays message if interval has passed" do + expect_audios(ad_hoc: {"A PA Message", :audio_visual}) + + pa_message = %PaMessages.PaMessage{ + id: 1, + visual_text: "A PA Message", + audio_text: "A PA Message", + interval_in_ms: 120_000 + } + + sign = %{@sign | pa_message_plays: %{1 => ~U[2024-06-10 12:00:00.000Z]}} + + Signs.Realtime.handle_info({:play_pa_message, pa_message}, sign) + end + + test "Does not play if less than interval has passed" do + pa_message = %PaMessages.PaMessage{ + id: 1, + visual_text: "A PA Message", + audio_text: "A PA Message", + interval_in_ms: 120_000 + } + + sign = %{@sign | pa_message_plays: %{1 => DateTime.utc_now()}} + + Signs.Realtime.handle_info({:play_pa_message, pa_message}, sign) + end + end + defp expect_messages(messages) do expect(PaEss.Updater.Mock, :set_background_message, fn _, top, bottom -> assert {top, bottom} == messages