Skip to content

Commit

Permalink
Trigger PA Messages from RTS (#764)
Browse files Browse the repository at this point in the history
* Trigger PA Messages from RTS

* Make logs errors and remove something used for testing

* Address PR feedback

* Put active messages path only in application env

* fix config

* Remove extraneous config

* fix tests

* Trigger PA Messages from RTS

* Make logs errors and remove something used for testing

* Address PR feedback

* Put active messages path only in application env

* fix config

* Remove extraneous config

* fix tests

* try adding stubs

* add TTS expects

* Address PR feedback
  • Loading branch information
PaulJKim authored Jul 1, 2024
1 parent e8b4ce7 commit 92ab430
Show file tree
Hide file tree
Showing 13 changed files with 490 additions and 11 deletions.
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ config :realtime_signs,
aws_client: ExAws,
s3_client: ExAws.S3,
number_of_http_updaters: 4,
restart_fn: &Engine.Health.restart_noop/0
restart_fn: &Engine.Health.restart_noop/0,
active_pa_messages_path: "/api/pa-messages/active"

config :realtime_signs, RealtimeSignsWeb.Endpoint,
secret_key_base: "local_secret_key_base_at_least_64_bytes_________________________________"
Expand Down
4 changes: 3 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ if config_env() != :test do
message_log_s3_folder: System.get_env("MESSAGE_LOG_S3_FOLDER"),
message_log_report_s3_folder: System.get_env("MESSAGE_LOG_REPORT_S3_FOLDER"),
monitoring_api_key: System.get_env("MONITORING_API_KEY"),
active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH")
active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH"),
screenplay_base_url: System.get_env("SCREENPLAY_BASE_URL"),
screenplay_api_key: System.get_env("SCREENPLAY_API_KEY")
end

if config_env() == :dev do
Expand Down
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ config :realtime_signs,
external_config_getter: Fake.ExternalConfig.Local,
sign_config_file: "priv/config.json",
aws_client: Fake.ExAws,
s3_client: Fake.ExAws
s3_client: Fake.ExAws,
screenplay_base_url: "fake-screenplay.com"
123 changes: 123 additions & 0 deletions lib/engine/pa_messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
defmodule Engine.PaMessages do
use GenServer
require Logger

alias PaMessages.PaMessage

@type state :: %{
pa_messages_last_sent: %{non_neg_integer() => DateTime.t()}
}

@minute_in_ms 1000 * 60

def start_link([]) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

@impl true
def init(_) do
# Add some delay to wait for sign processes to come up before sending PA messages
Process.send_after(self(), :update, 10000)
{:ok, %{pa_messages_last_sent: %{}}}
end

@impl true
def handle_info(:update, state) do
schedule_update(self())

state =
case get_active_pa_messages() do
{:ok, pa_messages} ->
recent_sends = send_pa_messages(pa_messages, state.pa_messages_last_sent)

%{state | pa_messages_last_sent: recent_sends}

{:error, %HTTPoison.Response{status_code: status_code, body: body}} ->
Logger.error("pa_messages_response_error: status_code=#{status_code} body=#{body}")
state

{:error, %HTTPoison.Error{reason: reason}} ->
Logger.error("pa_messages_response_error: reason=#{reason}")
state

{:error, error} ->
Logger.error("pa_messages_response_error: error=#{inspect(error)}")
state
end

{:noreply, state}
end

defp send_pa_messages(pa_messages, pa_messages_last_sent) do
for %{
"id" => pa_id,
"visual_text" => visual_text,
"audio_text" => audio_text,
"interval_in_minutes" => interval_in_minutes,
"priority" => priority,
"sign_ids" => sign_ids
} <- pa_messages,
into: %{} do
active_pa_message = %PaMessage{
id: pa_id,
visual_text: visual_text,
audio_text: audio_text,
priority: priority,
sign_ids: sign_ids,
interval_in_ms: interval_in_minutes * @minute_in_ms
}

{_, last_sent_time} = Map.get(pa_messages_last_sent, pa_id, {nil, nil})

time_since_last_send =
if last_sent_time,
do: DateTime.diff(DateTime.utc_now(), last_sent_time, :millisecond),
else: active_pa_message.interval_in_ms

if time_since_last_send >= active_pa_message.interval_in_ms do
send_pa_message(active_pa_message)
{pa_id, {active_pa_message, DateTime.utc_now()}}
else
{pa_id, {active_pa_message, last_sent_time}}
end
end
end

defp send_pa_message(pa_message) do
Enum.each(pa_message.sign_ids, fn sign_id ->
send(
String.to_existing_atom("Signs/#{sign_id}"),
{:play_pa_message, pa_message}
)
end)
end

defp get_active_pa_messages() do
active_pa_messages_url =
Application.get_env(:realtime_signs, :screenplay_base_url) <>
Application.get_env(:realtime_signs, :active_pa_messages_path)

http_client = Application.get_env(:realtime_signs, :http_client)

with {:ok, response} <-
http_client.get(
active_pa_messages_url,
[
{"x-api-key", Application.get_env(:realtime_signs, :screenplay_api_key)}
],
timeout: 2000,
recv_timeout: 2000
),
%{status_code: 200, body: body} <- response,
{:ok, pa_messages} <- Jason.decode(body) do
{:ok, pa_messages}
else
error ->
{:error, error}
end
end

defp schedule_update(pid) do
Process.send_after(pid, :update, 1_000)
end
end
110 changes: 110 additions & 0 deletions lib/fake/httpoison.ex
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,116 @@ defmodule Fake.HTTPoison do
}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/active") do
response = [
%{
"alert_id" => nil,
"audio_text" =>
"This is an example of a PA message that will be played at an MBTA station",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2033-12-05T23:53:23Z",
"id" => 4,
"inserted_at" => "2024-06-03T18:33:31Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T18:33:23Z",
"updated_at" => "2024-06-03T18:33:31Z",
"visual_text" =>
"This is an example of a PA message that will be played at an MBTA station"
},
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 3,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/no-longer-active") do
response = [
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/changed-interval") do
response = [
%{
"alert_id" => nil,
"audio_text" =>
"This is an example of a PA message that will be played at an MBTA station",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2033-12-05T23:53:23Z",
"id" => 4,
"inserted_at" => "2024-06-03T18:33:31Z",
"interval_in_minutes" => 1,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T18:33:23Z",
"updated_at" => "2024-06-03T18:33:31Z",
"visual_text" =>
"This is an example of a PA message that will be played at an MBTA station"
},
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 1,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("fake_trip_update2.json") do
feed_message =
%{
Expand Down
27 changes: 27 additions & 0 deletions lib/pa_messages/pa_message.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule PaMessages.PaMessage do
defstruct id: nil,
visual_text: nil,
audio_text: nil,
priority: nil,
sign_ids: [],
interval_in_ms: nil

@type t :: %__MODULE__{
id: integer(),
visual_text: String.t(),
audio_text: String.t(),
priority: integer(),
sign_ids: [String.t()],
interval_in_ms: non_neg_integer()
}

defimpl Content.Audio do
def to_params(%PaMessages.PaMessage{visual_text: visual_text}) do
{:ad_hoc, {visual_text, :audio_visual}}
end

def to_tts(%PaMessages.PaMessage{visual_text: visual_text, audio_text: audio_text}) do
{audio_text, PaEss.Utilities.paginate_text(visual_text)}
end
end
end
1 change: 1 addition & 0 deletions lib/realtime_signs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule RealtimeSigns do
Engine.BusPredictions,
Engine.ChelseaBridge,
Engine.Routes,
Engine.PaMessages,
MessageQueue,
RealtimeSigns.Scheduler,
RealtimeSignsWeb.Endpoint,
Expand Down
19 changes: 16 additions & 3 deletions lib/signs/bus.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ defmodule Signs.Bus do
:prev_bridge_status,
:current_messages,
:last_update,
:last_read_time
:last_read_time,
:pa_message_plays
]
defstruct @enforce_keys

Expand Down Expand Up @@ -63,7 +64,8 @@ defmodule Signs.Bus do
prev_bridge_status: nil | map(),
current_messages: tuple(),
last_update: nil | DateTime.t(),
last_read_time: DateTime.t()
last_read_time: DateTime.t(),
pa_message_plays: %{integer() => DateTime.t()}
}

def start_link(sign) do
Expand Down Expand Up @@ -91,7 +93,8 @@ defmodule Signs.Bus do
prev_bridge_status: nil,
current_messages: {nil, nil},
last_update: nil,
last_read_time: Timex.now()
last_read_time: Timex.now(),
pa_message_plays: %{}
}

GenServer.start_link(__MODULE__, state, name: :"Signs/#{state.id}")
Expand Down Expand Up @@ -126,6 +129,16 @@ defmodule Signs.Bus do
{messages :: [Content.Message.value()], audios :: [Content.Audio.value()],
tts_audios :: [Content.Audio.tts_value()]}

@impl true
def handle_info({:play_pa_message, pa_message}, sign) do
pa_message_plays =
Signs.Utilities.Audio.handle_pa_message_play(pa_message, sign, fn ->
send_audio([Content.Audio.to_params(pa_message)], sign)
end)

{:noreply, %{sign | pa_message_plays: pa_message_plays}}
end

@impl true
def handle_info(:run_loop, state) do
Process.send_after(self(), :run_loop, 1000)
Expand Down
18 changes: 15 additions & 3 deletions lib/signs/realtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ defmodule Signs.Realtime do
announced_alert: false,
prev_prediction_keys: nil,
prev_predictions: [],
uses_shuttles: true
uses_shuttles: true,
pa_message_plays: %{}
]

@type line_content :: Content.Message.t()
Expand Down Expand Up @@ -86,7 +87,8 @@ defmodule Signs.Realtime do
prev_prediction_keys: [{String.t(), 0 | 1}] | nil,
announced_alert: boolean(),
prev_predictions: [Predictions.Prediction.t()],
uses_shuttles: boolean()
uses_shuttles: boolean(),
pa_message_plays: %{integer() => DateTime.t()}
}

def start_link(%{"type" => "realtime"} = config) do
Expand Down Expand Up @@ -116,7 +118,8 @@ defmodule Signs.Realtime do
tick_read: 240 + Map.fetch!(config, "read_loop_offset"),
read_period_seconds: 240,
headway_stop_id: Map.get(config, "headway_stop_id"),
uses_shuttles: Map.get(config, "uses_shuttles", true)
uses_shuttles: Map.get(config, "uses_shuttles", true),
pa_message_plays: %{}
}

GenServer.start_link(__MODULE__, sign, name: :"Signs/#{sign.id}")
Expand All @@ -129,6 +132,15 @@ defmodule Signs.Realtime do
{:ok, sign}
end

def handle_info({:play_pa_message, pa_message}, sign) do
pa_message_plays =
Signs.Utilities.Audio.handle_pa_message_play(pa_message, sign, fn ->
Signs.Utilities.Audio.send_audio(sign, [pa_message])
end)

{:noreply, %{sign | pa_message_plays: pa_message_plays}}
end

def handle_info(:run_loop, sign) do
sign_stop_ids = SourceConfig.sign_stop_ids(sign.source_config)
sign_routes = SourceConfig.sign_routes(sign.source_config)
Expand Down
Loading

0 comments on commit 92ab430

Please sign in to comment.