Skip to content

Commit

Permalink
Last Trip of the Day phase 1 (#746)
Browse files Browse the repository at this point in the history
* Add new engine that watches prediction for last trips

* include cleanup logic

* update engine api

* Calculate service statuses

* Add textual messaging

* Add audio

* make conditions for getting last trip messages more explicit

* add unit tests

* implement to_tts

* dialyzer

* default to tts

* Address PR feedback

* Don't need to store route_id

* jfk umass platforms should only see one last trip

* Update lib/signs/utilities/last_trip.ex

Co-authored-by: Brett Heath-Wlaz <[email protected]>

* address PR feedback

* remove IO.inspect()

* Use vehicle location as a way to track departures instead of departure time

* remove unused attribute

* fix unit test

* Omit red line and blank if one mz line is empty

---------

Co-authored-by: Brett Heath-Wlaz <[email protected]>
  • Loading branch information
PaulJKim and panentheos authored May 8, 2024
1 parent df87737 commit 27bd645
Show file tree
Hide file tree
Showing 16 changed files with 663 additions and 6 deletions.
80 changes: 80 additions & 0 deletions lib/content/audio/service_ended.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
defmodule Content.Audio.ServiceEnded do
alias PaEss.Utilities
@enforce_keys [:location]
defstruct @enforce_keys ++ [:destination]

@type location :: :platform | :station | :direction
@type t :: %__MODULE__{
destination: PaEss.destination(),
location: location()
}

def from_message(%Content.Message.LastTrip.StationClosed{}) do
[%__MODULE__{location: :station}]
end

def from_message(%Content.Message.LastTrip.PlatformClosed{destination: destination}) do
[%__MODULE__{location: :platform, destination: destination}]
end

def from_message(%Content.Message.LastTrip.NoService{destination: destination}) do
[%__MODULE__{location: :direction, destination: destination}]
end

defimpl Content.Audio do
@service_ended "882"
@station_closed "883"
@platform_closed "884"

def to_params(%Content.Audio.ServiceEnded{location: :station}) do
Utilities.take_message([@station_closed], :audio)
end

def to_params(
%Content.Audio.ServiceEnded{location: :platform, destination: destination} = audio
) do
case Utilities.destination_var(destination) do
{:ok, destination_var} ->
Utilities.take_message([@platform_closed, destination_var, @service_ended], :audio)

{:error, :unknown} ->
to_tts(audio)
end
end

def to_params(
%Content.Audio.ServiceEnded{location: :direction, destination: destination} = audio
) do
case Utilities.destination_var(destination) do
{:ok, destination_var} ->
Utilities.take_message([destination_var, @service_ended], :audio)

{:error, :unknown} ->
to_tts(audio)
end
end

def to_tts(%Content.Audio.ServiceEnded{location: :station}) do
"This station is closed. Service has ended for the night."
end

def to_tts(%Content.Audio.ServiceEnded{location: :platform, destination: destination}) do
{:ok, destination_string} = Utilities.destination_to_ad_hoc_string(destination)

service_ended =
"#{destination_string} service has ended for the night."
|> String.trim_leading()
|> String.capitalize()

"This platform is closed. #{service_ended}"
end

def to_tts(%Content.Audio.ServiceEnded{location: :direction, destination: destination}) do
{:ok, destination_string} = Utilities.destination_to_ad_hoc_string(destination)

"#{destination_string} service has ended for the night."
|> String.trim_leading()
|> String.capitalize()
end
end
end
38 changes: 38 additions & 0 deletions lib/content/message/last_trip/no_service.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Content.Message.LastTrip.NoService do
@enforce_keys [:destination, :page?]
defstruct @enforce_keys

@type t :: %__MODULE__{
destination: PaEss.destination(),
page?: boolean()
}

defimpl Content.Message do
def to_string(%Content.Message.LastTrip.NoService{
destination: destination,
page?: page?
}) do
headsign = PaEss.Utilities.destination_to_sign_string(destination)

if page?,
do: [
{Content.Utilities.width_padded_string(
headsign,
"No trains",
24
), 6},
{Content.Utilities.width_padded_string(
headsign,
"Svc ended",
24
), 6}
],
else:
Content.Utilities.width_padded_string(
headsign,
"No Svc",
18
)
end
end
end
17 changes: 17 additions & 0 deletions lib/content/message/last_trip/platform_closed.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Content.Message.LastTrip.PlatformClosed do
@moduledoc """
A message displayed when a station is closed
"""
@enforce_keys [:destination]
defstruct @enforce_keys

@type t :: %__MODULE__{
destination: PaEss.destination()
}

defimpl Content.Message do
def to_string(%Content.Message.LastTrip.PlatformClosed{}) do
"Platform closed"
end
end
end
15 changes: 15 additions & 0 deletions lib/content/message/last_trip/service_ended.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Content.Message.LastTrip.ServiceEnded do
@moduledoc """
A message displayed when a station is closed
"""
@enforce_keys []
defstruct @enforce_keys

@type t :: %__MODULE__{}

defimpl Content.Message do
def to_string(%Content.Message.LastTrip.ServiceEnded{}) do
"Service ended for night"
end
end
end
15 changes: 15 additions & 0 deletions lib/content/message/last_trip/station_closed.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Content.Message.LastTrip.StationClosed do
@moduledoc """
A message displayed when a station is closed
"""
@enforce_keys []
defstruct @enforce_keys

@type t :: %__MODULE__{}

defimpl Content.Message do
def to_string(%Content.Message.LastTrip.StationClosed{}) do
"Station closed"
end
end
end
127 changes: 127 additions & 0 deletions lib/engine/last_trip.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
defmodule Engine.LastTrip do
@behaviour Engine.LastTripAPI
use GenServer
require Logger

@recent_departures_table :recent_departures
@last_trips_table :last_trips
@hour_in_seconds 3600

@type state :: %{
recent_departures: :ets.tab(),
last_trips: :ets.tab()
}

def start_link([]) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

@impl true
def get_recent_departures(recent_departures_table \\ @recent_departures_table, stop_id) do
case :ets.lookup(recent_departures_table, stop_id) do
[{^stop_id, departures}] -> departures
_ -> []
end
end

@impl true
def is_last_trip?(last_trips_table \\ @last_trips_table, trip_id) do
case :ets.lookup(last_trips_table, trip_id) do
[{^trip_id, _timestamp}] -> true
_ -> false
end
end

def update_last_trips(last_trips) do
GenServer.cast(__MODULE__, {:update_last_trips, last_trips})
end

def update_recent_departures(new_recent_departures) do
GenServer.cast(__MODULE__, {:update_recent_departures, new_recent_departures})
end

@impl true
def init(_) do
schedule_clean(self())

state = %{
recent_departures: @recent_departures_table,
last_trips: @last_trips_table
}

create_tables(state)
{:ok, state}
end

def create_tables(state) do
:ets.new(state.recent_departures, [:named_table, read_concurrency: true])
:ets.new(state.last_trips, [:named_table, read_concurrency: true])
end

@impl true
def handle_cast({:update_last_trips, last_trips}, %{last_trips: last_trips_table} = state) do
current_time = Timex.now()

last_trips = Enum.map(last_trips, fn trip_id -> {trip_id, current_time} end)

:ets.insert(last_trips_table, last_trips)

{:noreply, state}
end

@impl true
def handle_cast(
{:update_recent_departures, new_recent_departures},
%{recent_departures: recent_departures_table} = state
) do
current_recent_departures = :ets.tab2list(recent_departures_table) |> Map.new()

Enum.reduce(new_recent_departures, current_recent_departures, fn {stop_id, trip_id,
departure_time},
acc ->
Map.update(acc, stop_id, %{trip_id => departure_time}, fn recent_departures ->
Map.put(recent_departures, trip_id, departure_time)
end)
end)
|> Map.to_list()
|> then(&:ets.insert(recent_departures_table, &1))

{:noreply, state}
end

@impl true
def handle_info(:clean_old_data, state) do
schedule_clean(self())
clean_last_trips(state)
clean_recent_departures(state)

{:noreply, state}
end

defp clean_last_trips(state) do
:ets.tab2list(state.last_trips)
|> Enum.each(fn {trip_id, timestamp} ->
if Timex.diff(Timex.now(), timestamp, :seconds) > @hour_in_seconds * 2 do
:ets.delete(state.last_trips, trip_id)
end
end)
end

defp clean_recent_departures(state) do
current_time = Timex.now()

:ets.tab2list(state.recent_departures)
|> Enum.each(fn {key, departures} ->
departures_within_last_hour =
Map.filter(departures, fn {_, departed_time} ->
DateTime.to_unix(current_time) - DateTime.to_unix(departed_time) <= @hour_in_seconds
end)

:ets.insert(state.recent_departures, {key, departures_within_last_hour})
end)
end

defp schedule_clean(pid) do
Process.send_after(pid, :clean_old_data, 1_000)
end
end
4 changes: 4 additions & 0 deletions lib/engine/last_trip_api.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
defmodule Engine.LastTripAPI do
@callback get_recent_departures(String.t()) :: map()
@callback is_last_trip?(String.t()) :: boolean()
end
11 changes: 9 additions & 2 deletions lib/engine/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,16 @@ defmodule Engine.Predictions do
recv_timeout: 2000
) do
{:ok, %HTTPoison.Response{body: body, status_code: 200, headers: headers}} ->
parsed_json = Predictions.Predictions.parse_json_response(body)

{new_predictions, vehicles_running_revenue_trips} =
Predictions.Predictions.parse_json_response(body)
|> Predictions.Predictions.get_all(current_time)
Predictions.Predictions.get_all(parsed_json, current_time)

Predictions.LastTrip.get_last_trips(parsed_json)
|> Engine.LastTrip.update_last_trips()

Predictions.LastTrip.get_recent_departures(parsed_json)
|> Engine.LastTrip.update_recent_departures()

:ets.tab2list(state.trip_updates_table)
|> Enum.map(&{elem(&1, 0), []})
Expand Down
31 changes: 31 additions & 0 deletions lib/predictions/last_trip.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Predictions.LastTrip do
defp get_running_trips(predictions_feed) do
predictions_feed["entity"]
|> Stream.map(& &1["trip_update"])
|> Enum.reject(&(&1["trip"]["schedule_relationship"] == "CANCELED"))
end

def get_last_trips(predictions_feed) do
get_running_trips(predictions_feed)
|> Stream.filter(&(&1["trip"]["last_trip"] == true))
|> Enum.map(& &1["trip"]["trip_id"])
end

def get_recent_departures(predictions_feed) do
predictions_by_trip =
get_running_trips(predictions_feed)
|> Enum.map(&{&1["trip"]["trip_id"], &1["stop_time_update"], &1["vehicle"]["id"]})

for {trip_id, predictions, vehicle_id} <- predictions_by_trip,
prediction <- predictions do
vehicle_location = Engine.Locations.for_vehicle(vehicle_id)

if vehicle_location &&
(vehicle_location.stop_id == prediction["stop_id"] and
vehicle_location.status == :stopped_at) do
{prediction["stop_id"], trip_id, Timex.now()}
end
end
|> Enum.reject(&is_nil/1)
end
end
1 change: 1 addition & 0 deletions lib/realtime_signs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmodule RealtimeSigns do
Engine.BusPredictions,
Engine.ChelseaBridge,
Engine.Routes,
Engine.LastTrip,
MessageQueue,
RealtimeSigns.Scheduler,
RealtimeSignsWeb.Endpoint,
Expand Down
Loading

0 comments on commit 27bd645

Please sign in to comment.