Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger PA Messages from RTS #764

Merged
merged 18 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ if config_env() != :test do
message_log_s3_folder: System.get_env("MESSAGE_LOG_S3_FOLDER"),
message_log_report_s3_folder: System.get_env("MESSAGE_LOG_REPORT_S3_FOLDER"),
monitoring_api_key: System.get_env("MONITORING_API_KEY"),
active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH")
active_headend_path: System.get_env("ACTIVE_HEADEND_S3_PATH"),
screenplay_url: System.get_env("SCREENPLAY_URL"),
screenplay_api_key: System.get_env("SCREENPLAY_API_KEY"),
active_pa_messages_path: System.get_env("ACTIVE_PA_MESSAGES_PATH")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the rationale for this being a runtime environment variable? If we just need a testing hook, then just using the application environment should be sufficient.

As a side note, it would be nice at some point to convert the Fake http module to use Mox instead, which would make testing more straightforward, but that will probably take a bit of effort.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yeah that's a good point. I think my rationale was just that the path could change since the screenplay api is relatively new and still being developed, but I guess it would be just as much effort to make the devops change rather than just make a code change in RTS or probably even less. I'll make a change to remove this one

end

if config_env() == :dev do
Expand Down
4 changes: 3 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ config :realtime_signs,
external_config_getter: Fake.ExternalConfig.Local,
sign_config_file: "priv/config.json",
aws_client: Fake.ExAws,
s3_client: Fake.ExAws
s3_client: Fake.ExAws,
screenplay_url: "fake-screenplay.com",
active_pa_messages_path: "active"
191 changes: 191 additions & 0 deletions lib/engine/pa_messages.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
defmodule Engine.PaMessages do
use GenServer
require Logger

alias PaMessages.PaMessage

@type state :: %{
pa_message_timers_table: :ets.tab()
}

@pa_message_timers_table :pa_message_timers
@minute_in_ms 1000 * 60

def start_link([]) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

@impl true
def init(_) do
# Add some delay to wait for sign processes to come up before sending PA messages
Process.send_after(self(), :update, 10000)
state = %{pa_message_timers_table: @pa_message_timers_table}
create_table(state)
{:ok, state}
end

def create_table(state) do
:ets.new(state.pa_message_timers_table, [:named_table, read_concurrency: true])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we gain anything from using ETS here? It looks like the only access is inside this module, so there shouldn't be any contention to worry about.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that is true. Originally I had planned that the sign processes would be making requests to access the table so started with this and left it, but we can simplify things and go without the table

end

@impl true
def handle_info(:update, state) do
schedule_update(self())

case get_active_pa_messages() do
{:ok, pa_messages} ->
schedule_pa_messages(pa_messages, state.pa_message_timers_table)
|> handle_inactive_pa_messages(state.pa_message_timers_table)

{:error, %HTTPoison.Response{status_code: status_code, body: body}} ->
Logger.error("pa_messages_response_error: status_code=#{status_code} body=#{body}")

{:error, %HTTPoison.Error{reason: reason}} ->
Logger.error("pa_messages_response_error: reason=#{reason}")

{:error, error} ->
Logger.error("pa_messages_response_error: error=#{inspect(error)}")
end

{:noreply, state}
end

@impl true
def handle_info({:send_pa_message, pa_message}, state) do
Enum.each(pa_message.sign_ids, fn sign_id ->
send(
String.to_existing_atom("Signs/#{sign_id}"),
{:play_pa_message, pa_message}
)
end)

{:noreply, state}
end

defp schedule_pa_messages(pa_messages, table) do
for %{
"id" => pa_id,
"visual_text" => visual_text,
"audio_text" => audio_text,
"interval_in_minutes" => interval_in_minutes,
"priority" => priority,
"sign_ids" => sign_ids
} <- pa_messages do
active_pa_message = %PaMessage{
id: pa_id,
visual_text: visual_text,
audio_text: audio_text,
priority: priority,
sign_ids: sign_ids,
interval_in_ms: interval_in_minutes * @minute_in_ms
}

case get_pa_message_timer(pa_id, table) do
{timer_ref, existing_pa_message}
when existing_pa_message.interval_in_ms != active_pa_message.interval_in_ms ->
case Process.read_timer(timer_ref) do
false ->
schedule_pa_message(active_pa_message, active_pa_message.interval_in_ms, table)

remaining_ms ->
ms_elapsed = existing_pa_message.interval_in_ms - remaining_ms
temp_interval = (active_pa_message.interval_in_ms - ms_elapsed) |> max(0)
cancel_pa_timer(timer_ref, pa_id)
schedule_pa_message(active_pa_message, temp_interval, table)
Copy link
Collaborator

@panentheos panentheos Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timer-based approach will certainly work, but now that I'm looking at the code, I wonder if it's a little more complex than we need. I know the original motivation was to avoid excessive polling, but now that all the logic is centralized in this engine (which I think is good), we could also just keep a map of id-to-last-playtime in state, and check all the deltas against the current time once per second (especially since we're already doing an :update loop). My hunch is that that would end up being cleaner than wrangling the timers. Thoughts?

Copy link
Collaborator Author

@PaulJKim PaulJKim Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yeah I think that approach works nicely and will make the "edited interval" piece of the logic a good deal simpler. And it removes the need to cancel timers and such as you hinted at. I think we also would get the "handling" of inactive messages for free as well since if they're not included in the response, we won't be checking for the last playtime anyway.

end

{timer, _} ->
if Process.read_timer(timer) == false do
schedule_pa_message(active_pa_message, active_pa_message.interval_in_ms, table)
end

nil ->
schedule_pa_message(active_pa_message, 0, table)
end

pa_id
end
end

defp schedule_pa_message(pa_message, interval_in_ms, table) do
Logger.info(
"pa_message: action=scheduled id=#{pa_message.id} interval_ms=#{interval_in_ms} sign_ids=#{inspect(pa_message.sign_ids)}"
)

timer_ref =
Process.send_after(
self(),
{:send_pa_message, pa_message},
interval_in_ms
)

:ets.insert(
table,
{pa_message.id, {timer_ref, pa_message}}
)
end

defp handle_inactive_pa_messages(active_pa_ids, table) do
:ets.tab2list(table)
|> Enum.each(fn {pa_id, {timer_ref, pa_message}} ->
if pa_id not in active_pa_ids do
cancel_pa_timer(timer_ref, pa_id)
delete_pa_message(pa_message, table)
end
end)
end

defp cancel_pa_timer(timer_ref, pa_id) do
Logger.info("pa_message: action=timer_canceled id=#{pa_id}")
Process.cancel_timer(timer_ref)
end

defp delete_pa_message(pa_message, table) do
Logger.info("pa_message: action=message_deleted id=#{pa_message.id}")

Enum.each(pa_message.sign_ids, fn sign_id ->
send(
String.to_existing_atom("Signs/#{sign_id}"),
{:delete_pa_message, pa_message.id}
)
end)

:ets.delete(table, pa_message.id)
end

defp get_pa_message_timer(pa_id, table) do
case :ets.lookup(table, pa_id) do
[{^pa_id, timer}] -> timer
_ -> nil
end
end

defp get_active_pa_messages() do
active_pa_messages_url =
Application.get_env(:realtime_signs, :screenplay_url) <>
Application.get_env(:realtime_signs, :active_pa_messages_path)

http_client = Application.get_env(:realtime_signs, :http_client)

with {:ok, response} <-
http_client.get(
active_pa_messages_url,
[
{"x-api-key", Application.get_env(:realtime_signs, :screenplay_api_key)}
],
timeout: 2000,
recv_timeout: 2000
),
%{status_code: 200, body: body} <- response,
{:ok, pa_messages} <- Jason.decode(body) do
{:ok, pa_messages}
else
error ->
{:error, error}
end
end

defp schedule_update(pid) do
Process.send_after(pid, :update, 1_000)
end
end
131 changes: 131 additions & 0 deletions lib/fake/httpoison.ex
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,137 @@ defmodule Fake.HTTPoison do
}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/active") do
response = [
%{
"alert_id" => nil,
"audio_text" =>
"This is an example of a PA message that will be played at an MBTA station",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2033-12-05T23:53:23Z",
"id" => 4,
"inserted_at" => "2024-06-03T18:33:31Z",
"interval_in_minutes" => 1,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => ["wellington_northbound", "wellington_mezzanine", "wellington_southbound"],
"start_time" => "2024-06-03T18:33:23Z",
"updated_at" => "2024-06-03T18:33:31Z",
"visual_text" =>
"This is an example of a PA message that will be played at an MBTA station"
},
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [
"wellington_northbound",
"wellington_southbound",
"wellington_mezzanine",
"sullivan_mezzanine",
"sullivan_northbound",
"sullivan_southbound"
],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/no-longer-active") do
response = [
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [
"wellington_northbound",
"wellington_southbound",
"wellington_mezzanine",
"sullivan_mezzanine",
"sullivan_northbound",
"sullivan_southbound"
],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("https://screenplay-fake.mbtace.com/api/pa-messages/changed-interval") do
response = [
%{
"alert_id" => nil,
"audio_text" =>
"This is an example of a PA message that will be played at an MBTA station",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2033-12-05T23:53:23Z",
"id" => 4,
"inserted_at" => "2024-06-03T18:33:31Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => ["wellington_northbound", "wellington_mezzanine", "wellington_southbound"],
"start_time" => "2024-06-03T18:33:23Z",
"updated_at" => "2024-06-03T18:33:31Z",
"visual_text" =>
"This is an example of a PA message that will be played at an MBTA station"
},
%{
"alert_id" => nil,
"audio_text" => "This is another PA message that will play at MBTA stations",
"days_of_week" => [1, 2, 3, 4, 5, 6, 7],
"end_time" => "2027-08-05T05:41:10Z",
"id" => 5,
"inserted_at" => "2024-06-03T19:54:40Z",
"interval_in_minutes" => 2,
"message_type" => nil,
"paused" => nil,
"priority" => 1,
"saved" => nil,
"sign_ids" => [
"wellington_northbound",
"wellington_southbound",
"wellington_mezzanine",
"sullivan_mezzanine",
"sullivan_northbound",
"sullivan_southbound"
],
"start_time" => "2024-06-03T19:54:30Z",
"updated_at" => "2024-06-03T19:54:40Z",
"visual_text" => "This is another PA message that will play at MBTA stations"
}
]

{:ok, %HTTPoison.Response{status_code: 200, body: Jason.encode!(response)}}
end

def mock_response("fake_trip_update2.json") do
feed_message =
%{
Expand Down
27 changes: 27 additions & 0 deletions lib/pa_messages/pa_message.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule PaMessages.PaMessage do
defstruct id: nil,
visual_text: nil,
audio_text: nil,
priority: nil,
sign_ids: [],
interval_in_ms: nil

@type t :: %__MODULE__{
id: integer(),
visual_text: String.t(),
audio_text: String.t(),
priority: integer(),
sign_ids: [String.t()],
interval_in_ms: non_neg_integer()
}

defimpl Content.Audio do
def to_params(%PaMessages.PaMessage{visual_text: visual_text}) do
{:ad_hoc, {visual_text, :audio_visual}}
end

def to_tts(%PaMessages.PaMessage{visual_text: visual_text, audio_text: audio_text}) do
{audio_text, PaEss.Utilities.paginate_text(visual_text)}
end
end
end
1 change: 1 addition & 0 deletions lib/realtime_signs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defmodule RealtimeSigns do
Engine.BusPredictions,
Engine.ChelseaBridge,
Engine.Routes,
Engine.PaMessages,
MessageQueue,
RealtimeSigns.Scheduler,
RealtimeSignsWeb.Endpoint,
Expand Down
Loading
Loading