Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger PA Messages from RTS #764

Merged
merged 18 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ config :realtime_signs,
aws_client: ExAws,
s3_client: ExAws.S3,
number_of_http_updaters: 4,
restart_fn: &Engine.Health.restart_noop/0
restart_fn: &Engine.Health.restart_noop/0,
active_pa_messages_path: "/api/pa-messages/active"

config :realtime_signs, RealtimeSignsWeb.Endpoint,
secret_key_base: "local_secret_key_base_at_least_64_bytes_________________________________"
Expand Down
4 changes: 3 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ if config_env() != :test do
message_log_s3_folder: System.get_env("MESSAGE_LOG_S3_FOLDER"),
message_log_report_s3_folder: System.get_env("MESSAGE_LOG_REPORT_S3_FOLDER"),
monitoring_api_key: System.get_env("MONITORING_API_KEY"),
active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH")
active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH"),
screenplay_base_url: System.get_env("SCREENPLAY_BASE_URL"),
screenplay_api_key: System.get_env("SCREENPLAY_API_KEY")
end

if config_env() == :dev do
Expand Down
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ config :realtime_signs,
external_config_getter: Fake.ExternalConfig.Local,
sign_config_file: "priv/config.json",
aws_client: Fake.ExAws,
s3_client: Fake.ExAws
s3_client: Fake.ExAws,
screenplay_base_url: "fake-screenplay.com"
123 changes: 123 additions & 0 deletions lib/engine/pa_messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
defmodule Engine.PaMessages do
use GenServer
require Logger

alias PaMessages.PaMessage

@type state :: %{
pa_messages_last_sent: %{non_neg_integer() => DateTime.t()}
}

@minute_in_ms 1000 * 60

def start_link([]) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

@impl true
def init(_) do
# Add some delay to wait for sign processes to come up before sending PA messages
Process.send_after(self(), :update, 10000)
{:ok, %{pa_messages_last_sent: %{}}}
end

@impl true
def handle_info(:update, state) do
schedule_update(self())

state =
case get_active_pa_messages() do
{:ok, pa_messages} ->
recent_sends = send_pa_messages(pa_messages, state.pa_messages_last_sent)

%{state | pa_messages_last_sent: recent_sends}

{:error, %HTTPoison.Response{status_code: status_code, body: body}} ->
Logger.error("pa_messages_response_error: status_code=#{status_code} body=#{body}")
state

{:error, %HTTPoison.Error{reason: reason}} ->
Logger.error("pa_messages_response_error: reason=#{reason}")
state

{:error, error} ->
Logger.error("pa_messages_response_error: error=#{inspect(error)}")
state
end

{:noreply, state}
end

defp send_pa_messages(pa_messages, pa_messages_last_sent) do
for %{
"id" => pa_id,
"visual_text" => visual_text,
"audio_text" => audio_text,
"interval_in_minutes" => interval_in_minutes,
"priority" => priority,
"sign_ids" => sign_ids
} <- pa_messages,
into: %{} do
active_pa_message = %PaMessage{
id: pa_id,
visual_text: visual_text,
audio_text: audio_text,
priority: priority,
sign_ids: sign_ids,
interval_in_ms: interval_in_minutes * @minute_in_ms
}

{_, last_sent_time} = Map.get(pa_messages_last_sent, pa_id, {nil, nil})

time_since_last_send =
if last_sent_time,
do: DateTime.diff(DateTime.utc_now(), last_sent_time, :millisecond),
else: active_pa_message.interval_in_ms

if time_since_last_send >= active_pa_message.interval_in_ms do
send_pa_message(active_pa_message)
{pa_id, {active_pa_message, DateTime.utc_now()}}
else
{pa_id, {active_pa_message, last_sent_time}}
end
end
end

defp send_pa_message(pa_message) do
Enum.each(pa_message.sign_ids, fn sign_id ->
send(
String.to_existing_atom("Signs/#{sign_id}"),
{:play_pa_message, pa_message}
)
end)
end

defp get_active_pa_messages() do
active_pa_messages_url =
Application.get_env(:realtime_signs, :screenplay_base_url) <>
Application.get_env(:realtime_signs, :active_pa_messages_path)

http_client = Application.get_env(:realtime_signs, :http_client)

with {:ok, response} <-
http_client.get(
active_pa_messages_url,
[
{"x-api-key", Application.get_env(:realtime_signs, :screenplay_api_key)}
],
timeout: 2000,
recv_timeout: 2000
),
%{status_code: 200, body: body} <- response,
{:ok, pa_messages} <- Jason.decode(body) do
{:ok, pa_messages}
else
error ->
{:error, error}
end
end

defp schedule_update(pid) do
Process.send_after(pid, :update, 1_000)
end
end
110 changes: 110 additions & 0 deletions lib/fake/httpoison.ex
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,116 @@ defmodule Fake.HTTPoison do
}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/active") do
response = [
%{
"alert_id" => nil,
"audio_text" =>
"This is an example of a PA message that will be played at an MBTA station",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2033-12-05T23:53:23Z",
"id" => 4,
"inserted_at" => "2024-06-03T18:33:31Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T18:33:23Z",
"updated_at" => "2024-06-03T18:33:31Z",
"visual_text" =>
"This is an example of a PA message that will be played at an MBTA station"
},
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 3,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/no-longer-active") do
response = [
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/changed-interval") do
response = [
%{
"alert_id" => nil,
"audio_text" =>
"This is an example of a PA message that will be played at an MBTA station",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2033-12-05T23:53:23Z",
"id" => 4,
"inserted_at" => "2024-06-03T18:33:31Z",
"interval_in_minutes" => 1,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T18:33:23Z",
"updated_at" => "2024-06-03T18:33:31Z",
"visual_text" =>
"This is an example of a PA message that will be played at an MBTA station"
},
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 1,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("fake_trip_update2.json") do
feed_message =
%{
Expand Down
27 changes: 27 additions & 0 deletions lib/pa_messages/pa_message.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule PaMessages.PaMessage do
defstruct id: nil,
visual_text: nil,
audio_text: nil,
priority: nil,
sign_ids: [],
interval_in_ms: nil

@type t :: %__MODULE__{
id: integer(),
visual_text: String.t(),
audio_text: String.t(),
priority: integer(),
sign_ids: [String.t()],
interval_in_ms: non_neg_integer()
}

defimpl Content.Audio do
def to_params(%PaMessages.PaMessage{visual_text: visual_text}) do
{:ad_hoc, {visual_text, :audio_visual}}
end

def to_tts(%PaMessages.PaMessage{visual_text: visual_text, audio_text: audio_text}) do
{audio_text, PaEss.Utilities.paginate_text(visual_text)}
end
end
end
1 change: 1 addition & 0 deletions lib/realtime_signs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule RealtimeSigns do
Engine.BusPredictions,
Engine.ChelseaBridge,
Engine.Routes,
Engine.PaMessages,
MessageQueue,
RealtimeSigns.Scheduler,
RealtimeSignsWeb.Endpoint,
Expand Down
19 changes: 16 additions & 3 deletions lib/signs/bus.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ defmodule Signs.Bus do
:prev_bridge_status,
:current_messages,
:last_update,
:last_read_time
:last_read_time,
:pa_message_plays
]
defstruct @enforce_keys

Expand Down Expand Up @@ -62,7 +63,8 @@ defmodule Signs.Bus do
prev_bridge_status: nil | map(),
current_messages: tuple(),
last_update: nil | DateTime.t(),
last_read_time: DateTime.t()
last_read_time: DateTime.t(),
pa_message_plays: %{integer() => DateTime.t()}
}

def start_link(sign) do
Expand Down Expand Up @@ -90,7 +92,8 @@ defmodule Signs.Bus do
prev_bridge_status: nil,
current_messages: {nil, nil},
last_update: nil,
last_read_time: Timex.now()
last_read_time: Timex.now(),
pa_message_plays: %{}
}

GenServer.start_link(__MODULE__, state, name: :"Signs/#{state.id}")
Expand Down Expand Up @@ -121,6 +124,16 @@ defmodule Signs.Bus do
{:ok, state}
end

@impl true
def handle_info({:play_pa_message, pa_message}, sign) do
pa_message_plays =
Signs.Utilities.Audio.handle_pa_message_play(pa_message, sign, fn ->
send_audio([Content.Audio.to_params(pa_message)], sign)
end)

{:noreply, %{sign | pa_message_plays: pa_message_plays}}
end

@impl true
def handle_info(:run_loop, state) do
Process.send_after(self(), :run_loop, 1000)
Expand Down
18 changes: 15 additions & 3 deletions lib/signs/realtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ defmodule Signs.Realtime do
announced_alert: false,
prev_prediction_keys: nil,
prev_predictions: [],
uses_shuttles: true
uses_shuttles: true,
pa_message_plays: %{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the benefit of tracking this here. Is this so in case the engine crashes, we won't spam messages? If that's the concern, and we're willing to accept a slight behavior change, we could schedule new messages with their full duration instead of immediately, which would result in fewer messages in that failure mode, rather than excess ones.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that was the intention here, just to prevent consecutive plays due to an engine crashes/restarts. Ah so you mean that a PA Message wouldn't play immediately upon scheduling and instead wait one interval before it's first play?

I had actually sort of brought that up in the product channel last week but it was only discussed briefly. Kevin said a PA message should play as soon as it can right after it's created, but I'm still not sure if that's also what the ARINC software does currently. Seems like its still certainly up for discussion though. I'll raise the question again to the product folks and see if we would be okay with that behavior change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed a bit further in the thread and I think I lean towards avoiding scheduling a new message with the full duration. I don't think it's been fully confirmed or is explicitly a standard practice, but it seems that OIOs tend to schedule a message and look immediately for a "proof of play" and I'd rather not disrupt their workflow if we can avoid it. I'm open to other ideas for preventing spammed message plays though if you have any thoughts?

]

@type line_content :: Content.Message.t()
Expand Down Expand Up @@ -86,7 +87,8 @@ defmodule Signs.Realtime do
prev_prediction_keys: [{String.t(), 0 | 1}] | nil,
announced_alert: boolean(),
prev_predictions: [Predictions.Prediction.t()],
uses_shuttles: boolean()
uses_shuttles: boolean(),
pa_message_plays: %{integer() => DateTime.t()}
}

def start_link(%{"type" => "realtime"} = config) do
Expand Down Expand Up @@ -116,7 +118,8 @@ defmodule Signs.Realtime do
tick_read: 240 + Map.fetch!(config, "read_loop_offset"),
read_period_seconds: 240,
headway_stop_id: Map.get(config, "headway_stop_id"),
uses_shuttles: Map.get(config, "uses_shuttles", true)
uses_shuttles: Map.get(config, "uses_shuttles", true),
pa_message_plays: %{}
}

GenServer.start_link(__MODULE__, sign, name: :"Signs/#{sign.id}")
Expand All @@ -129,6 +132,15 @@ defmodule Signs.Realtime do
{:ok, sign}
end

def handle_info({:play_pa_message, pa_message}, sign) do
pa_message_plays =
Signs.Utilities.Audio.handle_pa_message_play(pa_message, sign, fn ->
Signs.Utilities.Audio.send_audio(sign, [pa_message])
end)

{:noreply, %{sign | pa_message_plays: pa_message_plays}}
end

def handle_info(:run_loop, sign) do
sign_stop_ids = SourceConfig.sign_stop_ids(sign.source_config)
sign_routes = SourceConfig.sign_routes(sign.source_config)
Expand Down
Loading
Loading