Skip to content

Commit

Permalink
perf(Predictions.PubSub): Read stops + routes from global cache inste…
Browse files Browse the repository at this point in the history
…ad of API (#206)

* perf(Predictions.PubSub): get relevant stops from GlobalDataCache

* perf(Predictions.PubSub): read routes for stop from global cached data

* perf: increase broadcast interval to 1s

* refactor(Predictions.PubSub): remove unnecessary error case

* Revert "perf: increase broadcast interval to 1s"

This reverts commit f42ef4d.

* feat: increase broadcast interval to 5s
  • Loading branch information
KaylaBrady authored Oct 2, 2024
1 parent 8be0b7a commit 02d94a0
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 62 deletions.
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ config :mobile_app_backend,
config :mobile_app_backend, MobileAppBackend.AppCheck,
jwks_url: "https://firebaseappcheck.googleapis.com/v1/jwks"

config :mobile_app_backend, predictions_broadcast_interval_ms: 10_000
config :mobile_app_backend, predictions_broadcast_interval_ms: 5_000

config :mobile_app_backend, MBTAV3API.ResponseCache,
gc_interval: :timer.hours(1),
Expand Down
22 changes: 22 additions & 0 deletions lib/mbta_v3_api/stop.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule MBTAV3API.Stop do
alias MBTAV3API.JsonApi.Object
use MBTAV3API.JsonApi.Object
require Util

Expand Down Expand Up @@ -89,6 +90,27 @@ defmodule MBTAV3API.Stop do
stop
end

@spec stop_id_to_children(Object.stop_map(), [id()]) :: %{id() => [id()]}
@doc """
Build a map containing the given stop_ids to their corresponding child stop ids.
Excludes child stops that don't have `location_type: :stop`
"""
def stop_id_to_children(all_stops_by_id, target_stop_ids) do
Map.take(all_stops_by_id, target_stop_ids)
|> Map.new(fn {stop_id, stop} ->
{stop_id,
Enum.filter(
List.wrap(stop.child_stop_ids),
fn child_stop_id ->
case Map.get(all_stops_by_id, child_stop_id) do
nil -> false
%{location_type: location_type} -> location_type == :stop
end
end
)}
end)
end

@impl JsonApi.Object
def serialize_filter_value(:route_type, route_type) do
MBTAV3API.Route.serialize_type(route_type)
Expand Down
43 changes: 42 additions & 1 deletion lib/mobile_app_backend/global_data_cache.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
defmodule MobileAppBackend.GlobalDataCache do
use GenServer
alias MBTAV3API.Route
alias MBTAV3API.Stop
alias MBTAV3API.{JsonApi, Repository}
alias MBTAV3API.JsonApi.Object

Expand Down Expand Up @@ -27,6 +29,11 @@ defmodule MobileAppBackend.GlobalDataCache do
@callback default_key :: key()
@callback get_data(key()) :: data()

@doc """
Get the id of all routes that serve the given stops
"""
@callback route_ids_for_stops([Stop.id()], key()) :: :error | [Route.id()]

def start_link(opts \\ []) do
Application.get_env(
:mobile_app_backend,
Expand Down Expand Up @@ -66,6 +73,14 @@ defmodule MobileAppBackend.GlobalDataCache do
MobileAppBackend.GlobalDataCache.Impl
).get_data(key)
end

def route_ids_for_stops(stop_ids, key \\ default_key()) do
Application.get_env(
:mobile_app_backend,
MobileAppBackend.GlobalDataCache.Module,
MobileAppBackend.GlobalDataCache.Impl
).route_ids_for_stops(stop_ids, key)
end
end

defmodule MobileAppBackend.GlobalDataCache.Impl do
Expand All @@ -90,9 +105,36 @@ defmodule MobileAppBackend.GlobalDataCache.Impl do
:persistent_term.get(key, nil) || update_data(key)
end

@impl true
def route_ids_for_stops(stop_ids, key \\ default_key()) do
data = :persistent_term.get(key, nil) || update_data(key)

if is_nil(data) do
:error
else
%{pattern_ids_by_stop: pattern_ids_by_stop, route_patterns: route_patterns} =
data

route_pattern_ids =
pattern_ids_by_stop
|> Map.take(stop_ids)
|> Enum.flat_map(fn {_stop_id, pattern_ids} -> pattern_ids end)
|> Enum.uniq()

routes =
route_patterns
|> Map.take(route_pattern_ids)
|> Enum.map(fn {_route_pattern_id, pattern} -> pattern.route_id end)
|> Enum.uniq()

routes
end
end

@impl GenServer
def init(opts \\ []) do
opts = Keyword.merge(Application.get_env(:mobile_app_backend, GlobalDataCache), opts)

first_update_ms = opts[:first_update_ms] || :timer.seconds(1)

state = %State{
Expand All @@ -108,7 +150,6 @@ defmodule MobileAppBackend.GlobalDataCache.Impl do
@impl GenServer
def handle_info(:recalculate, %State{update_ms: update_ms} = state) do
update_data(state.key)

Process.send_after(self(), :recalculate, update_ms)

{:noreply, state}
Expand Down
35 changes: 15 additions & 20 deletions lib/mobile_app_backend/predictions/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defmodule MobileAppBackend.Predictions.PubSub do
"""
use GenServer
alias MBTAV3API.{Prediction, Stop, Store, Stream, Trip, Vehicle}
alias MobileAppBackend.GlobalDataCache
alias MobileAppBackend.Predictions.PubSub

@behaviour PubSub.Behaviour
Expand All @@ -59,6 +60,7 @@ defmodule MobileAppBackend.Predictions.PubSub do
@type state :: %{last_dispatched_table_name: atom()}

@spec start_link(Keyword.t()) :: GenServer.on_start()
@spec start_link() :: :ignore | {:error, any()} | {:ok, pid()}
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)

Expand All @@ -71,40 +73,33 @@ defmodule MobileAppBackend.Predictions.PubSub do

@impl true
def subscribe_for_stops(stop_ids) do
{:ok, %{data: _stop, included: %{stops: child_stops}}} =
MBTAV3API.Repository.stops(filter: [id: stop_ids], include: :child_stops)
%{stops: all_stops_by_id} = GlobalDataCache.get_data()
stop_id_to_children = Stop.stop_id_to_children(all_stops_by_id, stop_ids)

child_stops =
child_stops
child_stop_ids =
stop_id_to_children
|> Map.values()
|> Enum.filter(&(&1.location_type == :stop))
|> List.flatten()

child_stop_ids = Enum.map(child_stops, & &1.id)

child_ids_by_parent_id =
child_stops
|> Enum.map(&{&1.parent_station_id, &1.id})
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))

{time_micros, _result} =
{time_micros, :ok} =
:timer.tc(MobileAppBackend.Predictions.StreamSubscriber, :subscribe_for_stops, [
stop_ids ++ child_stop_ids
])

Logger.info(
"#{__MODULE__} subscribe_for_stops stop_id=#{inspect(stop_ids)} duration=#{time_micros / 1000}"
"#{__MODULE__} subscribe_for_stops stop_id=#{inspect(stop_ids)} duration=#{time_micros / 1000} "
)

all_predictions_data =
stop_ids
|> register_all_stops(child_ids_by_parent_id)
|> register_all_stops(stop_id_to_children)
|> Store.Predictions.fetch_with_associations()

predictions_by_stop =
group_predictions_for_stop(
all_predictions_data.predictions,
stop_ids,
child_ids_by_parent_id
stop_id_to_children
)

%{
Expand All @@ -129,8 +124,8 @@ defmodule MobileAppBackend.Predictions.PubSub do
|> Enum.group_by(& &1.stop_id)

Map.new(stop_ids, fn stop_id ->
case Map.get(child_ids_by_parent_id, stop_id) do
nil ->
case Map.get(child_ids_by_parent_id, stop_id, []) do
[] ->
{stop_id,
prediction_list_by_stop
|> Map.get(stop_id, [])
Expand All @@ -151,8 +146,8 @@ defmodule MobileAppBackend.Predictions.PubSub do
defp register_all_stops(stop_ids, child_ids_by_parent_id) do
stop_ids
|> Enum.flat_map(fn stop_id ->
case Map.get(child_ids_by_parent_id, stop_id) do
nil ->
case Map.get(child_ids_by_parent_id, stop_id, []) do
[] ->
[register_single_stop(stop_id)]

child_ids ->
Expand Down
36 changes: 22 additions & 14 deletions lib/mobile_app_backend/predictions/stream_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber do
Ensure prediction streams have been started for every route served by the given stops
and the stream of all vehicles has been started.
"""
@callback subscribe_for_stops([Stop.id()]) :: :ok
@callback subscribe_for_stops([Stop.id()]) :: :ok | :error

def subscribe_for_stops(stop_ids) do
Application.get_env(
Expand All @@ -28,21 +28,29 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber.Impl do
@behaviour MobileAppBackend.Predictions.StreamSubscriber

alias MBTAV3API.Stream.StaticInstance
alias MobileAppBackend.GlobalDataCache

require Logger

@impl true
def subscribe_for_stops(stop_ids) do
{:ok, %{data: routes}} = MBTAV3API.Repository.routes(filter: [stop: stop_ids])

Enum.each(routes, fn %MBTAV3API.Route{id: route_id} ->
{:ok, _data} =
StaticInstance.ensure_stream_started("predictions:route:to_store:#{route_id}",
include_current_data: false
)
end)

{:ok, _data} =
StaticInstance.ensure_stream_started("vehicles:to_store", include_current_data: false)

:ok
case GlobalDataCache.route_ids_for_stops(stop_ids) do
:error ->
Logger.error("#{__MODULE__} failed to fetch route_ids_for_stops from global data")
:error

route_ids ->
Enum.each(route_ids, fn route_id ->
{:ok, _data} =
StaticInstance.ensure_stream_started("predictions:route:to_store:#{route_id}",
include_current_data: false
)
end)

{:ok, _data} =
StaticInstance.ensure_stream_started("vehicles:to_store", include_current_data: false)

:ok
end
end
end
28 changes: 28 additions & 0 deletions test/mbta_v3_api/stop_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,34 @@ defmodule MBTAV3API.StopTest do
end
end

describe "stop_id_to_children/2" do
test "returns only the given stop ids with their :stop children" do
other_stop = build(:stop, id: "other")
standalone = build(:stop, id: "standalone")
child_stop = build(:stop, id: "child_stop", location_type: :stop)
node_stop = build(:stop, id: "child_node", location_type: :generic_node)

parent_stop =
build(:stop,
id: "parent",
location_type: :station,
child_stop_ids: ["child_stop", "child_node", "child_missing"]
)

assert %{"parent" => ["child_stop"], "standalone" => []} ==
Stop.stop_id_to_children(
%{
other_stop.id => other_stop,
standalone.id => standalone,
child_stop.id => child_stop,
node_stop.id => node_stop,
parent_stop.id => parent_stop
},
["standalone", "parent"]
)
end
end

describe "include_missing_siblings/1" do
test "sibling stops which aren't included in the stop map are added to the stop map" do
stops_with_missing_sibling = %{
Expand Down
38 changes: 38 additions & 0 deletions test/mobile_app_backend/global_data_cache_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule MobileAppBackend.GlobalDataCacheTest do
use HttpStub.Case
import MobileAppBackend.Factory
alias MobileAppBackend.GlobalDataCache

test "gets data" do
Expand Down Expand Up @@ -127,4 +128,41 @@ defmodule MobileAppBackend.GlobalDataCacheTest do
assert_receive :recalculate
end
end

describe "route_ids_for_stops/2" do
test "returns route ids for the given stops only" do
cache_key = make_ref()

start_link_supervised!({GlobalDataCache, key: cache_key})

:persistent_term.put(cache_key, %{
lines: %{},
pattern_ids_by_stop: %{
"stop_1" => ["66_1", "39_1"],
"stop_2" => ["66_2"],
"stop_3" => ["15_1"]
},
routes: %{
"66" => build(:route, id: "66"),
"39" => build(:route, id: "39"),
"15" => build(:route, id: "15")
},
route_patterns: %{
"66_1" => build(:route_pattern, id: "66_1", route_id: "66"),
"66_2" => build(:route_pattern, id: "66_2", route_id: "66"),
"39_1" => build(:route_pattern, id: "39_1", route_id: "39"),
"15_1" => build(:route_pattern, id: "15_1", route_id: "15")
},
stops: %{
"stop_1" => build(:stop, id: "stop_1"),
"stop_2" => build(:stop, id: "stop_2"),
"stop_3" => build(:stop, id: "stop_3")
},
trips: %{}
})

assert ["39", "66"] =
Enum.sort(GlobalDataCache.route_ids_for_stops(["stop_1", "stop_2"], cache_key))
end
end
end
Loading

0 comments on commit 02d94a0

Please sign in to comment.