Skip to content

Commit

Permalink
Trigger PA Messages from RTS
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulJKim committed Jun 11, 2024
1 parent 6974e00 commit 47c2b62
Show file tree
Hide file tree
Showing 12 changed files with 548 additions and 10 deletions.
5 changes: 4 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ if config_env() != :test do
message_log_s3_folder: System.get_env("MESSAGE_LOG_S3_FOLDER"),
message_log_report_s3_folder: System.get_env("MESSAGE_LOG_REPORT_S3_FOLDER"),
monitoring_api_key: System.get_env("MONITORING_API_KEY"),
active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH")
active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH"),
screenplay_url: System.get_env("SCREENPLAY_URL"),
screenplay_api_key: System.get_env("SCREENPLAY_API_KEY"),
active_pa_messages_path: System.get_env("ACTIVE_PA_MESSAGES_PATH")
end

if config_env() == :dev do
Expand Down
4 changes: 3 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ config :realtime_signs,
external_config_getter: Fake.ExternalConfig.Local,
sign_config_file: "priv/config.json",
aws_client: Fake.ExAws,
s3_client: Fake.ExAws
s3_client: Fake.ExAws,
screenplay_url: "fake-screenplay.com",
active_pa_messages_path: "active"
191 changes: 191 additions & 0 deletions lib/engine/pa_messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
defmodule Engine.PaMessages do
use GenServer
require Logger

alias PaMessages.PaMessage

@type state :: %{
pa_message_timers_table: :ets.tab()
}

@pa_message_timers_table :pa_message_timers
@minute_in_ms 1000 * 60

def start_link([]) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

@impl true
def init(_) do
# Add some delay to wait for sign processes to come up before sending PA messages
Process.send_after(self(), :update, 10000)
state = %{pa_message_timers_table: @pa_message_timers_table}
create_table(state)
{:ok, state}
end

def create_table(state) do
:ets.new(state.pa_message_timers_table, [:named_table, read_concurrency: true])
end

@impl true
def handle_info(:update, state) do
schedule_update(self())

case get_active_pa_messages() do
{:ok, pa_messages} ->
schedule_pa_messages(pa_messages, state.pa_message_timers_table)
|> handle_inactive_pa_messages(state.pa_message_timers_table)

{:error, %HTTPoison.Response{status_code: status_code, body: body}} ->
Logger.warn("pa_messages_response_error: status_code=#{status_code} body=#{body}")

{:error, %HTTPoison.Error{reason: reason}} ->
Logger.warn("pa_messages_response_error: reason=#{reason}")

{:error, error} ->
Logger.warn("pa_messages_response_error: error=#{inspect(error)}")
end

{:noreply, state}
end

@impl true
def handle_info({:send_pa_message, pa_message}, state) do
Enum.each(pa_message.sign_ids, fn sign_id ->
send(
String.to_existing_atom("Signs/#{sign_id}"),
{:play_pa_message, pa_message}
)
end)

{:noreply, state}
end

defp schedule_pa_messages(pa_messages, table) do
for %{
"id" => pa_id,
"visual_text" => visual_text,
"audio_text" => audio_text,
"interval_in_minutes" => interval_in_minutes,
"priority" => priority,
"sign_ids" => sign_ids
} <- pa_messages do
active_pa_message = %PaMessage{
id: pa_id,
visual_text: visual_text,
audio_text: audio_text,
priority: priority,
sign_ids: ["Silver_Line.South_Station_EB" | sign_ids],
interval_in_ms: interval_in_minutes * @minute_in_ms
}

case get_pa_message_timer(pa_id, table) do
{timer_ref, existing_pa_message}
when existing_pa_message.interval_in_ms != active_pa_message.interval_in_ms ->
case Process.read_timer(timer_ref) do
false ->
schedule_pa_message(active_pa_message, active_pa_message.interval_in_ms, table)

remaining_ms ->
ms_elapsed = existing_pa_message.interval_in_ms - remaining_ms
temp_interval = (active_pa_message.interval_in_ms - ms_elapsed) |> max(0)
cancel_pa_timer(timer_ref, pa_id)
schedule_pa_message(active_pa_message, temp_interval, table)
end

{timer, _} ->
if Process.read_timer(timer) == false do
schedule_pa_message(active_pa_message, active_pa_message.interval_in_ms, table)
end

nil ->
schedule_pa_message(active_pa_message, 0, table)
end

pa_id
end
end

defp schedule_pa_message(pa_message, interval_in_ms, table) do
Logger.info(
"pa_message: action=scheduled id=#{pa_message.id} interval_ms=#{interval_in_ms} sign_ids=#{inspect(pa_message.sign_ids)}"
)

timer_ref =
Process.send_after(
self(),
{:send_pa_message, pa_message},
interval_in_ms
)

:ets.insert(
table,
{pa_message.id, {timer_ref, pa_message}}
)
end

defp handle_inactive_pa_messages(active_pa_ids, table) do
:ets.tab2list(table)
|> Enum.each(fn {pa_id, {timer_ref, pa_message}} ->
if pa_id not in active_pa_ids do
cancel_pa_timer(timer_ref, pa_id)
delete_pa_message(pa_message, table)
end
end)
end

defp cancel_pa_timer(timer_ref, pa_id) do
Logger.info("pa_message: action=timer_canceled id=#{pa_id}")
Process.cancel_timer(timer_ref)
end

defp delete_pa_message(pa_message, table) do
Logger.info("pa_message: action=message_deleted id=#{pa_message.id}")

Enum.each(pa_message.sign_ids, fn sign_id ->
send(
String.to_existing_atom("Signs/#{sign_id}"),
{:delete_pa_message, pa_message.id}
)
end)

:ets.delete(table, pa_message.id)
end

defp get_pa_message_timer(pa_id, table) do
case :ets.lookup(table, pa_id) do
[{^pa_id, timer}] -> timer
_ -> nil
end
end

defp get_active_pa_messages() do
active_pa_messages_url =
Application.get_env(:realtime_signs, :screenplay_url) <>
Application.get_env(:realtime_signs, :active_pa_messages_path)

http_client = Application.get_env(:realtime_signs, :http_client)

with {:ok, response} <-
http_client.get(
active_pa_messages_url,
[
{"x-api-key", Application.get_env(:realtime_signs, :screenplay_api_key)}
],
timeout: 2000,
recv_timeout: 2000
),
%{status_code: 200, body: body} <- response,
{:ok, pa_messages} <- Jason.decode(body) do
{:ok, pa_messages}
else
error ->
{:error, error}
end
end

defp schedule_update(pid) do
Process.send_after(pid, :update, 1_000)
end
end
131 changes: 131 additions & 0 deletions lib/fake/httpoison.ex
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,137 @@ defmodule Fake.HTTPoison do
}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/active") do
response = [
%{
"alert_id" => nil,
"audio_text" =>
"This is an example of a PA message that will be played at an MBTA station",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2033-12-05T23:53:23Z",
"id" => 4,
"inserted_at" => "2024-06-03T18:33:31Z",
"interval_in_minutes" => 1,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => ["wellington_northbound", "wellington_mezzanine", "wellington_southbound"],
"start_time" => "2024-06-03T18:33:23Z",
"updated_at" => "2024-06-03T18:33:31Z",
"visual_text" =>
"This is an example of a PA message that will be played at an MBTA station"
},
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [
"wellington_northbound",
"wellington_southbound",
"wellington_mezzanine",
"sullivan_mezzanine",
"sullivan_northbound",
"sullivan_southbound"
],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/no-longer-active") do
response = [
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [
"wellington_northbound",
"wellington_southbound",
"wellington_mezzanine",
"sullivan_mezzanine",
"sullivan_northbound",
"sullivan_southbound"
],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/changed-interval") do
response = [
%{
"alert_id" => nil,
"audio_text" =>
"This is an example of a PA message that will be played at an MBTA station",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2033-12-05T23:53:23Z",
"id" => 4,
"inserted_at" => "2024-06-03T18:33:31Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => ["wellington_northbound", "wellington_mezzanine", "wellington_southbound"],
"start_time" => "2024-06-03T18:33:23Z",
"updated_at" => "2024-06-03T18:33:31Z",
"visual_text" =>
"This is an example of a PA message that will be played at an MBTA station"
},
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [
"wellington_northbound",
"wellington_southbound",
"wellington_mezzanine",
"sullivan_mezzanine",
"sullivan_northbound",
"sullivan_southbound"
],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("fake_trip_update2.json") do
feed_message =
%{
Expand Down
27 changes: 27 additions & 0 deletions lib/pa_messages/pa_message.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule PaMessages.PaMessage do
defstruct id: nil,
visual_text: nil,
audio_text: nil,
priority: nil,
sign_ids: [],
interval_in_ms: nil

@type t :: %__MODULE__{
id: integer(),
visual_text: String.t(),
audio_text: String.t(),
priority: integer(),
sign_ids: [String.t()],
interval_in_ms: non_neg_integer()
}

defimpl Content.Audio do
def to_params(%PaMessages.PaMessage{visual_text: visual_text}) do
{:ad_hoc, {visual_text, :audio_visual}}
end

def to_tts(%PaMessages.PaMessage{visual_text: visual_text, audio_text: audio_text}) do
{audio_text, PaEss.Utilities.paginate_text(visual_text)}
end
end
end
1 change: 1 addition & 0 deletions lib/realtime_signs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defmodule RealtimeSigns do
Engine.BusPredictions,
Engine.ChelseaBridge,
Engine.Routes,
Engine.PaMessages,
MessageQueue,
RealtimeSigns.Scheduler,
RealtimeSignsWeb.Endpoint,
Expand Down
Loading

0 comments on commit 47c2b62

Please sign in to comment.