Skip to content

Commit

Permalink
perf(Predictions.PubSub): read routes for stop from global cached data
Browse files Browse the repository at this point in the history
  • Loading branch information
KaylaBrady committed Oct 1, 2024
1 parent c1580be commit 0cff77e
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 45 deletions.
2 changes: 1 addition & 1 deletion lib/mbta_v3_api/stop.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ defmodule MBTAV3API.Stop do
stop
end

@spec stop_id_to_children(Object.stop_map(), [Stop.id()]) :: %{Stop.id() => [Stop.id()]}
@spec stop_id_to_children(Object.stop_map(), [id()]) :: %{id() => [id()]}
@doc """
Build a map containing the given stop_ids to their corresponding child stop ids.
Excludes child stops that don't have `location_type: :stop`
Expand Down
37 changes: 26 additions & 11 deletions lib/mobile_app_backend/global_data_cache.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule MobileAppBackend.GlobalDataCache do
use GenServer
alias MBTAV3API.Route
alias MBTAV3API.Stop
alias MBTAV3API.{JsonApi, Repository}
alias MBTAV3API.JsonApi.Object
Expand All @@ -25,10 +26,14 @@ defmodule MobileAppBackend.GlobalDataCache do
defstruct [:key, :update_ms, :first_update_ms]
end


@callback default_key :: key()
@callback get_data(key()) :: data()

@doc """
Get the id of all routes that serve the given stops
"""
@callback route_ids_for_stops([Stop.id()], key()) :: :error | [Route.id()]

def start_link(opts \\ []) do
Application.get_env(
:mobile_app_backend,
Expand All @@ -37,6 +42,14 @@ defmodule MobileAppBackend.GlobalDataCache do
).start_link(opts)
end

def handle_info(msg, state) do
Application.get_env(
:mobile_app_backend,
MobileAppBackend.GlobalDataCache.Module,
MobileAppBackend.GlobalDataCache.Impl
).handle_info(msg, state)
end

def init(opts \\ []) do
Application.get_env(
:mobile_app_backend,
Expand All @@ -60,11 +73,18 @@ defmodule MobileAppBackend.GlobalDataCache do
MobileAppBackend.GlobalDataCache.Impl
).get_data(key)
end

def route_ids_for_stops(stop_ids, key \\ default_key()) do
Application.get_env(
:mobile_app_backend,
MobileAppBackend.GlobalDataCache.Module,
MobileAppBackend.GlobalDataCache.Impl
).route_ids_for_stops(stop_ids, key)
end
end

defmodule MobileAppBackend.GlobalDataCache.Impl do
use GenServer
alias MBTAV3API.Stop
alias MBTAV3API.{JsonApi, Repository}
alias MobileAppBackend.GlobalDataCache
alias MobileAppBackend.GlobalDataCache.State
Expand All @@ -85,10 +105,7 @@ defmodule MobileAppBackend.GlobalDataCache.Impl do
:persistent_term.get(key, nil) || update_data(key)
end

@spec route_ids_for_stops([Stop.id()]) :: :error | {:ok, [Route.id()]}
@doc """
Get the id of all routes that serve the given stop
"""
@impl true
def route_ids_for_stops(stop_ids, key \\ default_key()) do
data = :persistent_term.get(key, nil) || update_data(key)

Expand All @@ -101,17 +118,16 @@ defmodule MobileAppBackend.GlobalDataCache.Impl do
route_pattern_ids =
pattern_ids_by_stop
|> Map.take(stop_ids)
|> Map.values()
|> Enum.flat_map(fn {_stop_id, pattern_ids} -> pattern_ids end)
|> Enum.uniq()

routes =
route_patterns
|> Map.take(route_pattern_ids)
|> Map.values()
|> Enum.map(& &1.route_id)
|> Enum.map(fn {_route_pattern_id, pattern} -> pattern.route_id end)
|> Enum.uniq()

{:ok, routes}
routes
end
end

Expand All @@ -134,7 +150,6 @@ defmodule MobileAppBackend.GlobalDataCache.Impl do
@impl GenServer
def handle_info(:recalculate, %State{update_ms: update_ms} = state) do
update_data(state.key)

Process.send_after(self(), :recalculate, update_ms)

{:noreply, state}
Expand Down
28 changes: 13 additions & 15 deletions lib/mobile_app_backend/predictions/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,18 @@ defmodule MobileAppBackend.Predictions.PubSub do

@impl true
def subscribe_for_stops(stop_ids) do
with %{stops: all_stops_by_id} <- GlobalDataCache.get_data() do
stop_id_to_children = Stop.stop_id_to_children(all_stops_by_id, stop_ids)

child_stop_ids =
stop_id_to_children
|> Map.values()
|> List.flatten()

{time_micros, _result} =
:timer.tc(MobileAppBackend.Predictions.StreamSubscriber, :subscribe_for_stops, [
stop_ids ++ child_stop_ids
])

with %{stops: all_stops_by_id} <- GlobalDataCache.get_data(),
stop_id_to_children <- Stop.stop_id_to_children(all_stops_by_id, stop_ids),
child_stop_ids <-
stop_id_to_children
|> Map.values()
|> List.flatten(),
{time_micros, :ok} <-
:timer.tc(MobileAppBackend.Predictions.StreamSubscriber, :subscribe_for_stops, [
stop_ids ++ child_stop_ids
]) do
Logger.info(
"#{__MODULE__} subscribe_for_stops stop_id=#{inspect(stop_ids)} duration=#{time_micros / 1000}"
"#{__MODULE__} subscribe_for_stops stop_id=#{inspect(stop_ids)} duration=#{time_micros / 1000} "
)

all_predictions_data =
Expand All @@ -108,11 +105,12 @@ defmodule MobileAppBackend.Predictions.PubSub do
trips: all_predictions_data.trips,
vehicles: all_predictions_data.vehicles
}
else
_ -> :error
end
end

@impl true
@spec subscribe_for_stop(any()) :: none()
def subscribe_for_stop(stop_id) do
subscribe_for_stops([stop_id])
end
Expand Down
36 changes: 22 additions & 14 deletions lib/mobile_app_backend/predictions/stream_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber do
Ensure prediction streams have been started for every route served by the given stops
and the stream of all vehicles has been started.
"""
@callback subscribe_for_stops([Stop.id()]) :: :ok
@callback subscribe_for_stops([Stop.id()]) :: :ok | :error

def subscribe_for_stops(stop_ids) do
Application.get_env(
Expand All @@ -28,21 +28,29 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber.Impl do
@behaviour MobileAppBackend.Predictions.StreamSubscriber

alias MBTAV3API.Stream.StaticInstance
alias MobileAppBackend.GlobalDataCache

require Logger

@impl true
def subscribe_for_stops(stop_ids) do
{:ok, %{data: routes}} = MBTAV3API.Repository.routes(filter: [stop: stop_ids])

Enum.each(routes, fn %MBTAV3API.Route{id: route_id} ->
{:ok, _data} =
StaticInstance.ensure_stream_started("predictions:route:to_store:#{route_id}",
include_current_data: false
)
end)

{:ok, _data} =
StaticInstance.ensure_stream_started("vehicles:to_store", include_current_data: false)

:ok
case GlobalDataCache.route_ids_for_stops(stop_ids) do
:error ->
Logger.error("#{__MODULE__} failed to fetch route_ids_for_stops from global data")
:error

route_ids ->
Enum.each(route_ids, fn route_id ->
{:ok, _data} =
StaticInstance.ensure_stream_started("predictions:route:to_store:#{route_id}",
include_current_data: false
)
end)

{:ok, _data} =
StaticInstance.ensure_stream_started("vehicles:to_store", include_current_data: false)

:ok
end
end
end
38 changes: 38 additions & 0 deletions test/mobile_app_backend/global_data_cache_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule MobileAppBackend.GlobalDataCacheTest do
use HttpStub.Case
import MobileAppBackend.Factory
alias MobileAppBackend.GlobalDataCache

test "gets data" do
Expand Down Expand Up @@ -127,4 +128,41 @@ defmodule MobileAppBackend.GlobalDataCacheTest do
assert_receive :recalculate
end
end

describe "route_ids_for_stops/2" do
test "returns route ids for the given stops only" do
cache_key = make_ref()

start_link_supervised!({GlobalDataCache, key: cache_key})

:persistent_term.put(cache_key, %{
lines: %{},
pattern_ids_by_stop: %{
"stop_1" => ["66_1", "39_1"],
"stop_2" => ["66_2"],
"stop_3" => ["15_1"]
},
routes: %{
"66" => build(:route, id: "66"),
"39" => build(:route, id: "39"),
"15" => build(:route, id: "15")
},
route_patterns: %{
"66_1" => build(:route_pattern, id: "66_1", route_id: "66"),
"66_2" => build(:route_pattern, id: "66_2", route_id: "66"),
"39_1" => build(:route_pattern, id: "39_1", route_id: "39"),
"15_1" => build(:route_pattern, id: "15_1", route_id: "15")
},
stops: %{
"stop_1" => build(:stop, id: "stop_1"),
"stop_2" => build(:stop, id: "stop_2"),
"stop_3" => build(:stop, id: "stop_3")
},
trips: %{}
})

assert ["39", "66"] =
Enum.sort(GlobalDataCache.route_ids_for_stops(["stop_1", "stop_2"], cache_key))
end
end
end
14 changes: 10 additions & 4 deletions test/mobile_app_backend/predictions/stream_subscriber_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@ defmodule MobileAppBackend.Predictions.StreamSubscriberTest do
alias MobileAppBackend.Predictions.StreamSubscriber
import Mox
import Test.Support.Helpers
import MobileAppBackend.Factory

describe "subscribe_for_stops/1" do
setup do
verify_on_exit!()

reassign_env(
:mobile_app_backend,
MobileAppBackend.GlobalDataCache.Module,
GlobalDataCacheMock
)

reassign_env(:mobile_app_backend, MBTAV3API.Stream.StaticInstance, StaticInstanceMock)
reassign_env(:mobile_app_backend, MBTAV3API.Repository, RepositoryMock)
end

test "starts streams for to the routes served at the given stops and vehicles" do
expect(RepositoryMock, :routes, fn _, _ ->
ok_response([build(:route, id: "66"), build(:route, id: "39")])
end)
GlobalDataCacheMock
|> expect(:default_key, fn -> :default_key end)
|> expect(:route_ids_for_stops, fn _, _ -> ["66", "39"] end)

StaticInstanceMock
|> expect(:ensure_stream_started, fn "predictions:route:to_store:66",
Expand Down

0 comments on commit 0cff77e

Please sign in to comment.