Skip to content

Commit

Permalink
add departure logging for headway analysis (#675)
Browse files Browse the repository at this point in the history
  • Loading branch information
panentheos authored Aug 28, 2023
1 parent ed5b056 commit fb69602
Show file tree
Hide file tree
Showing 18 changed files with 264 additions and 851 deletions.
187 changes: 0 additions & 187 deletions lib/engine/departures.ex

This file was deleted.

96 changes: 55 additions & 41 deletions lib/engine/locations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ defmodule Engine.Locations do

@type state :: %{
last_modified_vehicle_positions: String.t() | nil,
vehicle_locations_table: :ets.tab()
vehicle_locations_table: :ets.tab(),
stop_locations_table: :ets.tab()
}

@vehicle_locations_table :vehicle_locations
@stop_locations_table :stop_locations

def start_link([]) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
Expand All @@ -28,51 +30,52 @@ defmodule Engine.Locations do
end
end

@impl true
def for_stop(stop_id) do
case :ets.lookup(@stop_locations_table, stop_id) do
[{^stop_id, locations}] -> locations
_ -> []
end
end

@impl true
def init(_) do
schedule_update(self())

@vehicle_locations_table =
:ets.new(
@vehicle_locations_table,
[:set, :protected, :named_table, read_concurrency: true]
)
state = %{
last_modified_vehicle_positions: nil,
vehicle_locations_table: @vehicle_locations_table,
stop_locations_table: @stop_locations_table
}

{:ok,
%{last_modified_vehicle_positions: nil, vehicle_locations_table: @vehicle_locations_table}}
create_tables(state)
{:ok, state}
end

def create_tables(state) do
:ets.new(state.vehicle_locations_table, [:named_table, read_concurrency: true])
:ets.new(state.stop_locations_table, [:named_table, read_concurrency: true])
end

@impl true
def handle_info(:update, state) do
schedule_update(self())

last_modified_vehicle_locations =
download_and_process_vehicle_locations(
state.last_modified_vehicle_positions,
state.vehicle_locations_table
)

{:noreply, %{state | last_modified_vehicle_positions: last_modified_vehicle_locations}}
end

@spec download_and_process_vehicle_locations(String.t() | nil, :ets.tab()) ::
String.t()
defp download_and_process_vehicle_locations(last_modified, ets_table) do
full_url = Application.get_env(:realtime_signs, :vehicle_positions_url)

case download_data(full_url, last_modified) do
{:ok, body, new_last_modified} ->
existing_vehicles =
:ets.tab2list(ets_table) |> Enum.map(&{elem(&1, 0), :none}) |> Map.new()

all_vehicles = Map.merge(existing_vehicles, map_locations_data(body))
:ets.insert(ets_table, Enum.into(all_vehicles, []))
last_modified_vehicle_locations =
case download_data(full_url, state.last_modified_vehicle_positions) do
{:ok, body, new_last_modified} ->
{locations_by_vehicle, locations_by_stop} = map_locations_data(body)
write_ets(state.vehicle_locations_table, locations_by_vehicle, :none)
write_ets(state.stop_locations_table, locations_by_stop, [])
new_last_modified

new_last_modified
:error ->
state.last_modified_vehicle_positions
end

:error ->
last_modified
end
{:noreply, %{state | last_modified_vehicle_positions: last_modified_vehicle_locations}}
end

@spec download_data(String.t(), String.t() | nil) ::
Expand Down Expand Up @@ -106,24 +109,26 @@ defmodule Engine.Locations do
end
end

@spec map_locations_data(String.t()) :: %{String.t() => String.t()}
@spec map_locations_data(String.t()) ::
{%{String.t() => Locations.Location.t()}, %{String.t() => Locations.Location.t()}}
defp map_locations_data(response) do
try do
response
|> Jason.decode!()
|> Map.get("entity")
|> Enum.reject(&(&1["vehicle"]["trip"]["schedule_relationship"] == "CANCELED"))
|> Enum.map(&location_from_update/1)
|> Map.new(fn location ->
{location.vehicle_id, location}
end)
locations =
response
|> Jason.decode!()
|> Map.get("entity")
|> Enum.reject(&(&1["vehicle"]["trip"]["schedule_relationship"] == "CANCELED"))
|> Enum.map(&location_from_update/1)

{Map.new(locations, fn location -> {location.vehicle_id, location} end),
Enum.group_by(locations, & &1.stop_id)}
rescue
e in Jason.DecodeError ->
Logger.error(
"Engine.Locations json_decode_error: #{inspect(Jason.DecodeError.message(e))}"
)

%{}
{%{}, %{}}
end
end

Expand Down Expand Up @@ -151,4 +156,13 @@ defmodule Engine.Locations do
defp schedule_update(pid) do
Process.send_after(pid, :update, 1_000)
end

defp write_ets(table, values, empty_value) do
:ets.tab2list(table)
|> Enum.map(&{elem(&1, 0), empty_value})
|> Map.new()
|> Map.merge(values)
|> Map.to_list()
|> then(&:ets.insert(table, &1))
end
end
1 change: 1 addition & 0 deletions lib/engine/locations_api.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
defmodule Engine.LocationsAPI do
@callback for_vehicle(String.t()) :: Locations.Location.t()
@callback for_stop(String.t()) :: [Locations.Location.t()]
end
23 changes: 20 additions & 3 deletions lib/engine/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule Engine.Predictions do

@type state :: %{
last_modified_trip_updates: String.t() | nil,
trip_updates_table: :ets.tab()
trip_updates_table: :ets.tab(),
revenue_vehicles: MapSet.t()
}

@trip_updates_table :trip_updates
Expand All @@ -34,6 +35,11 @@ defmodule Engine.Predictions do
end
end

@impl true
def revenue_vehicles() do
GenServer.call(__MODULE__, :revenue_vehicles)
end

@impl true
def init(_) do
schedule_update(self())
Expand All @@ -44,10 +50,16 @@ defmodule Engine.Predictions do
{:ok,
%{
last_modified_trip_updates: nil,
trip_updates_table: @trip_updates_table
trip_updates_table: @trip_updates_table,
revenue_vehicles: MapSet.new()
}}
end

@impl true
def handle_call(:revenue_vehicles, _from, state) do
{:reply, state.revenue_vehicles, state}
end

@impl true
def handle_info(:update, state) do
schedule_update(self())
Expand All @@ -62,7 +74,12 @@ defmodule Engine.Predictions do
)

if vehicles_running_revenue_trips != nil do
{:noreply, %{state | last_modified_trip_updates: last_modified_trip_updates}}
{:noreply,
%{
state
| last_modified_trip_updates: last_modified_trip_updates,
revenue_vehicles: vehicles_running_revenue_trips
}}
else
{:noreply, state}
end
Expand Down
1 change: 1 addition & 0 deletions lib/engine/predictions_api.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
defmodule Engine.PredictionsAPI do
@callback for_stop(String.t(), 0 | 1) :: [Predictions.Prediction.t()]
@callback revenue_vehicles() :: MapSet.t()
end
Loading

0 comments on commit fb69602

Please sign in to comment.