diff --git a/config/config.exs b/config/config.exs index 6e062a9..d5be86f 100644 --- a/config/config.exs +++ b/config/config.exs @@ -13,7 +13,7 @@ config :mobile_app_backend, config :mobile_app_backend, MobileAppBackend.AppCheck, jwks_url: "https://firebaseappcheck.googleapis.com/v1/jwks" -config :mobile_app_backend, predictions_broadcast_interval_ms: 10_000 +config :mobile_app_backend, predictions_broadcast_interval_ms: 5_000 config :mobile_app_backend, MBTAV3API.ResponseCache, gc_interval: :timer.hours(1), diff --git a/lib/mbta_v3_api/stop.ex b/lib/mbta_v3_api/stop.ex index 1a04b90..f99c69f 100644 --- a/lib/mbta_v3_api/stop.ex +++ b/lib/mbta_v3_api/stop.ex @@ -1,4 +1,5 @@ defmodule MBTAV3API.Stop do + alias MBTAV3API.JsonApi.Object use MBTAV3API.JsonApi.Object require Util @@ -89,6 +90,27 @@ defmodule MBTAV3API.Stop do stop end + @spec stop_id_to_children(Object.stop_map(), [id()]) :: %{id() => [id()]} + @doc """ + Build a map containing the given stop_ids to their corresponding child stop ids. + Excludes child stops that don't have `location_type: :stop` + """ + def stop_id_to_children(all_stops_by_id, target_stop_ids) do + Map.take(all_stops_by_id, target_stop_ids) + |> Map.new(fn {stop_id, stop} -> + {stop_id, + Enum.filter( + List.wrap(stop.child_stop_ids), + fn child_stop_id -> + case Map.get(all_stops_by_id, child_stop_id) do + nil -> false + %{location_type: location_type} -> location_type == :stop + end + end + )} + end) + end + @impl JsonApi.Object def serialize_filter_value(:route_type, route_type) do MBTAV3API.Route.serialize_type(route_type) diff --git a/lib/mobile_app_backend/global_data_cache.ex b/lib/mobile_app_backend/global_data_cache.ex index b9d4db4..bbe03f2 100644 --- a/lib/mobile_app_backend/global_data_cache.ex +++ b/lib/mobile_app_backend/global_data_cache.ex @@ -1,5 +1,7 @@ defmodule MobileAppBackend.GlobalDataCache do use GenServer + alias MBTAV3API.Route + alias MBTAV3API.Stop alias MBTAV3API.{JsonApi, Repository} alias MBTAV3API.JsonApi.Object @@ -27,6 +29,11 @@ defmodule MobileAppBackend.GlobalDataCache do @callback default_key :: key() @callback get_data(key()) :: data() + @doc """ + Get the id of all routes that serve the given stops + """ + @callback route_ids_for_stops([Stop.id()], key()) :: :error | [Route.id()] + def start_link(opts \\ []) do Application.get_env( :mobile_app_backend, @@ -66,6 +73,14 @@ defmodule MobileAppBackend.GlobalDataCache do MobileAppBackend.GlobalDataCache.Impl ).get_data(key) end + + def route_ids_for_stops(stop_ids, key \\ default_key()) do + Application.get_env( + :mobile_app_backend, + MobileAppBackend.GlobalDataCache.Module, + MobileAppBackend.GlobalDataCache.Impl + ).route_ids_for_stops(stop_ids, key) + end end defmodule MobileAppBackend.GlobalDataCache.Impl do @@ -90,9 +105,36 @@ defmodule MobileAppBackend.GlobalDataCache.Impl do :persistent_term.get(key, nil) || update_data(key) end + @impl true + def route_ids_for_stops(stop_ids, key \\ default_key()) do + data = :persistent_term.get(key, nil) || update_data(key) + + if is_nil(data) do + :error + else + %{pattern_ids_by_stop: pattern_ids_by_stop, route_patterns: route_patterns} = + data + + route_pattern_ids = + pattern_ids_by_stop + |> Map.take(stop_ids) + |> Enum.flat_map(fn {_stop_id, pattern_ids} -> pattern_ids end) + |> Enum.uniq() + + routes = + route_patterns + |> Map.take(route_pattern_ids) + |> Enum.map(fn {_route_pattern_id, pattern} -> pattern.route_id end) + |> Enum.uniq() + + routes + end + end + @impl GenServer def init(opts \\ []) do opts = Keyword.merge(Application.get_env(:mobile_app_backend, GlobalDataCache), opts) + first_update_ms = opts[:first_update_ms] || :timer.seconds(1) state = %State{ @@ -108,7 +150,6 @@ defmodule MobileAppBackend.GlobalDataCache.Impl do @impl GenServer def handle_info(:recalculate, %State{update_ms: update_ms} = state) do update_data(state.key) - Process.send_after(self(), :recalculate, update_ms) {:noreply, state} diff --git a/lib/mobile_app_backend/predictions/pub_sub.ex b/lib/mobile_app_backend/predictions/pub_sub.ex index d9e6216..0d9364d 100644 --- a/lib/mobile_app_backend/predictions/pub_sub.ex +++ b/lib/mobile_app_backend/predictions/pub_sub.ex @@ -34,6 +34,7 @@ defmodule MobileAppBackend.Predictions.PubSub do """ use GenServer alias MBTAV3API.{Prediction, Stop, Store, Stream, Trip, Vehicle} + alias MobileAppBackend.GlobalDataCache alias MobileAppBackend.Predictions.PubSub @behaviour PubSub.Behaviour @@ -59,6 +60,7 @@ defmodule MobileAppBackend.Predictions.PubSub do @type state :: %{last_dispatched_table_name: atom()} @spec start_link(Keyword.t()) :: GenServer.on_start() + @spec start_link() :: :ignore | {:error, any()} | {:ok, pid()} def start_link(opts \\ []) do name = Keyword.get(opts, :name, __MODULE__) @@ -71,40 +73,33 @@ defmodule MobileAppBackend.Predictions.PubSub do @impl true def subscribe_for_stops(stop_ids) do - {:ok, %{data: _stop, included: %{stops: child_stops}}} = - MBTAV3API.Repository.stops(filter: [id: stop_ids], include: :child_stops) + %{stops: all_stops_by_id} = GlobalDataCache.get_data() + stop_id_to_children = Stop.stop_id_to_children(all_stops_by_id, stop_ids) - child_stops = - child_stops + child_stop_ids = + stop_id_to_children |> Map.values() - |> Enum.filter(&(&1.location_type == :stop)) + |> List.flatten() - child_stop_ids = Enum.map(child_stops, & &1.id) - - child_ids_by_parent_id = - child_stops - |> Enum.map(&{&1.parent_station_id, &1.id}) - |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) - - {time_micros, _result} = + {time_micros, :ok} = :timer.tc(MobileAppBackend.Predictions.StreamSubscriber, :subscribe_for_stops, [ stop_ids ++ child_stop_ids ]) Logger.info( - "#{__MODULE__} subscribe_for_stops stop_id=#{inspect(stop_ids)} duration=#{time_micros / 1000}" + "#{__MODULE__} subscribe_for_stops stop_id=#{inspect(stop_ids)} duration=#{time_micros / 1000} " ) all_predictions_data = stop_ids - |> register_all_stops(child_ids_by_parent_id) + |> register_all_stops(stop_id_to_children) |> Store.Predictions.fetch_with_associations() predictions_by_stop = group_predictions_for_stop( all_predictions_data.predictions, stop_ids, - child_ids_by_parent_id + stop_id_to_children ) %{ @@ -129,8 +124,8 @@ defmodule MobileAppBackend.Predictions.PubSub do |> Enum.group_by(& &1.stop_id) Map.new(stop_ids, fn stop_id -> - case Map.get(child_ids_by_parent_id, stop_id) do - nil -> + case Map.get(child_ids_by_parent_id, stop_id, []) do + [] -> {stop_id, prediction_list_by_stop |> Map.get(stop_id, []) @@ -151,8 +146,8 @@ defmodule MobileAppBackend.Predictions.PubSub do defp register_all_stops(stop_ids, child_ids_by_parent_id) do stop_ids |> Enum.flat_map(fn stop_id -> - case Map.get(child_ids_by_parent_id, stop_id) do - nil -> + case Map.get(child_ids_by_parent_id, stop_id, []) do + [] -> [register_single_stop(stop_id)] child_ids -> diff --git a/lib/mobile_app_backend/predictions/stream_subscriber.ex b/lib/mobile_app_backend/predictions/stream_subscriber.ex index bd14947..fbd2300 100644 --- a/lib/mobile_app_backend/predictions/stream_subscriber.ex +++ b/lib/mobile_app_backend/predictions/stream_subscriber.ex @@ -13,7 +13,7 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber do Ensure prediction streams have been started for every route served by the given stops and the stream of all vehicles has been started. """ - @callback subscribe_for_stops([Stop.id()]) :: :ok + @callback subscribe_for_stops([Stop.id()]) :: :ok | :error def subscribe_for_stops(stop_ids) do Application.get_env( @@ -28,21 +28,29 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber.Impl do @behaviour MobileAppBackend.Predictions.StreamSubscriber alias MBTAV3API.Stream.StaticInstance + alias MobileAppBackend.GlobalDataCache + + require Logger @impl true def subscribe_for_stops(stop_ids) do - {:ok, %{data: routes}} = MBTAV3API.Repository.routes(filter: [stop: stop_ids]) - - Enum.each(routes, fn %MBTAV3API.Route{id: route_id} -> - {:ok, _data} = - StaticInstance.ensure_stream_started("predictions:route:to_store:#{route_id}", - include_current_data: false - ) - end) - - {:ok, _data} = - StaticInstance.ensure_stream_started("vehicles:to_store", include_current_data: false) - - :ok + case GlobalDataCache.route_ids_for_stops(stop_ids) do + :error -> + Logger.error("#{__MODULE__} failed to fetch route_ids_for_stops from global data") + :error + + route_ids -> + Enum.each(route_ids, fn route_id -> + {:ok, _data} = + StaticInstance.ensure_stream_started("predictions:route:to_store:#{route_id}", + include_current_data: false + ) + end) + + {:ok, _data} = + StaticInstance.ensure_stream_started("vehicles:to_store", include_current_data: false) + + :ok + end end end diff --git a/test/mbta_v3_api/stop_test.exs b/test/mbta_v3_api/stop_test.exs index 0750329..c2454f2 100644 --- a/test/mbta_v3_api/stop_test.exs +++ b/test/mbta_v3_api/stop_test.exs @@ -80,6 +80,34 @@ defmodule MBTAV3API.StopTest do end end + describe "stop_id_to_children/2" do + test "returns only the given stop ids with their :stop children" do + other_stop = build(:stop, id: "other") + standalone = build(:stop, id: "standalone") + child_stop = build(:stop, id: "child_stop", location_type: :stop) + node_stop = build(:stop, id: "child_node", location_type: :generic_node) + + parent_stop = + build(:stop, + id: "parent", + location_type: :station, + child_stop_ids: ["child_stop", "child_node", "child_missing"] + ) + + assert %{"parent" => ["child_stop"], "standalone" => []} == + Stop.stop_id_to_children( + %{ + other_stop.id => other_stop, + standalone.id => standalone, + child_stop.id => child_stop, + node_stop.id => node_stop, + parent_stop.id => parent_stop + }, + ["standalone", "parent"] + ) + end + end + describe "include_missing_siblings/1" do test "sibling stops which aren't included in the stop map are added to the stop map" do stops_with_missing_sibling = %{ diff --git a/test/mobile_app_backend/global_data_cache_test.exs b/test/mobile_app_backend/global_data_cache_test.exs index a2b6c71..43a768b 100644 --- a/test/mobile_app_backend/global_data_cache_test.exs +++ b/test/mobile_app_backend/global_data_cache_test.exs @@ -1,5 +1,6 @@ defmodule MobileAppBackend.GlobalDataCacheTest do use HttpStub.Case + import MobileAppBackend.Factory alias MobileAppBackend.GlobalDataCache test "gets data" do @@ -127,4 +128,41 @@ defmodule MobileAppBackend.GlobalDataCacheTest do assert_receive :recalculate end end + + describe "route_ids_for_stops/2" do + test "returns route ids for the given stops only" do + cache_key = make_ref() + + start_link_supervised!({GlobalDataCache, key: cache_key}) + + :persistent_term.put(cache_key, %{ + lines: %{}, + pattern_ids_by_stop: %{ + "stop_1" => ["66_1", "39_1"], + "stop_2" => ["66_2"], + "stop_3" => ["15_1"] + }, + routes: %{ + "66" => build(:route, id: "66"), + "39" => build(:route, id: "39"), + "15" => build(:route, id: "15") + }, + route_patterns: %{ + "66_1" => build(:route_pattern, id: "66_1", route_id: "66"), + "66_2" => build(:route_pattern, id: "66_2", route_id: "66"), + "39_1" => build(:route_pattern, id: "39_1", route_id: "39"), + "15_1" => build(:route_pattern, id: "15_1", route_id: "15") + }, + stops: %{ + "stop_1" => build(:stop, id: "stop_1"), + "stop_2" => build(:stop, id: "stop_2"), + "stop_3" => build(:stop, id: "stop_3") + }, + trips: %{} + }) + + assert ["39", "66"] = + Enum.sort(GlobalDataCache.route_ids_for_stops(["stop_1", "stop_2"], cache_key)) + end + end end diff --git a/test/mobile_app_backend/predictions/pub_sub_test.exs b/test/mobile_app_backend/predictions/pub_sub_test.exs index 1ea1c82..7f9a4e1 100644 --- a/test/mobile_app_backend/predictions/pub_sub_test.exs +++ b/test/mobile_app_backend/predictions/pub_sub_test.exs @@ -11,6 +11,12 @@ defmodule MobileAppBackend.Predictions.PubSubTests do setup do verify_on_exit!() + reassign_env( + :mobile_app_backend, + MobileAppBackend.GlobalDataCache.Module, + GlobalDataCacheMock + ) + reassign_env(:mobile_app_backend, StreamSubscriber, StreamSubscriberMock) reassign_env(:mobile_app_backend, MBTAV3API.Repository, RepositoryMock) reassign_env(:mobile_app_backend, Store.Predictions, PredictionsStoreMock) @@ -62,8 +68,9 @@ defmodule MobileAppBackend.Predictions.PubSubTests do full_map end) - RepositoryMock - |> expect(:stops, fn _, _ -> ok_response([build(:stop, id: "12345")]) end) + GlobalDataCacheMock + |> expect(:default_key, fn -> :default_key end) + |> expect(:get_data, fn _ -> %{stops: %{"12345" => build(:stop, id: "12345")}} end) expect(StreamSubscriberMock, :subscribe_for_stops, fn _ -> :ok end) @@ -102,12 +109,19 @@ defmodule MobileAppBackend.Predictions.PubSubTests do full_map end) - RepositoryMock - |> expect(:stops, fn _, _ -> - ok_response([build(:stop, id: "parent_stop_id")], [ - build(:stop, id: "12345", location_type: :stop, parent_station_id: "parent_stop_id"), - build(:stop, id: "6789", location_type: :stop, parent_station_id: "parent_stop_id") - ]) + GlobalDataCacheMock + |> expect(:default_key, fn -> :default_key end) + |> expect(:get_data, fn _ -> + %{ + stops: %{ + "parent_stop_id" => + build(:stop, id: "parent_stop_id", child_stop_ids: ["12345", "6789"]), + "12345" => + build(:stop, id: "12345", location_type: :stop, parent_station_id: "parent_stop_id"), + "6789" => + build(:stop, id: "6789", location_type: :stop, parent_station_id: "parent_stop_id") + } + } end) expect(StreamSubscriberMock, :subscribe_for_stops, fn _ -> :ok end) @@ -153,11 +167,16 @@ defmodule MobileAppBackend.Predictions.PubSubTests do full_map end) - RepositoryMock - |> expect(:stops, fn _, _ -> - ok_response([build(:stop, id: "standalone")], [ - build(:stop, id: "child", parent_station_id: "parent") - ]) + GlobalDataCacheMock + |> expect(:default_key, fn -> :default_key end) + |> expect(:get_data, fn _ -> + %{ + stops: %{ + "parent" => build(:stop, id: "parent", child_stop_ids: ["child"]), + "child" => build(:stop, id: "child"), + "standalone" => build(:stop, id: "standalone") + } + } end) expect(StreamSubscriberMock, :subscribe_for_stops, fn _ -> :ok end) @@ -206,9 +225,14 @@ defmodule MobileAppBackend.Predictions.PubSubTests do JsonApi.Object.to_full_map([prediction_3, trip_1, vehicle_1]) end) - RepositoryMock - |> expect(:stops, fn _, _ -> - ok_response([build(:stop, id: "12345")], []) + GlobalDataCacheMock + |> expect(:default_key, fn -> :default_key end) + |> expect(:get_data, fn _ -> + %{ + stops: %{ + "12345" => build(:stop, id: "12345") + } + } end) expect(StreamSubscriberMock, :subscribe_for_stops, fn _ -> :ok end) @@ -283,9 +307,12 @@ defmodule MobileAppBackend.Predictions.PubSubTests do full_map_6789 end) - RepositoryMock - |> expect(:stops, 2, fn _, _ -> - ok_response([], []) + GlobalDataCacheMock + |> expect(:default_key, 2, fn -> :default_key end) + |> expect(:get_data, 2, fn _ -> + %{ + stops: %{} + } end) expect(StreamSubscriberMock, :subscribe_for_stops, 2, fn _ -> :ok end) @@ -323,9 +350,12 @@ defmodule MobileAppBackend.Predictions.PubSubTests do expect(PredictionsStoreMock, :fetch_with_associations, 2, fn _ -> full_map end) - RepositoryMock - |> expect(:stops, fn _, _ -> - ok_response([build(:stop, id: "12345")], []) + GlobalDataCacheMock + |> expect(:default_key, fn -> :default_key end) + |> expect(:get_data, fn _ -> + %{ + stops: %{} + } end) expect(StreamSubscriberMock, :subscribe_for_stops, fn _ -> :ok end) diff --git a/test/mobile_app_backend/predictions/stream_subscriber_test.exs b/test/mobile_app_backend/predictions/stream_subscriber_test.exs index 3ed1c10..9805a94 100644 --- a/test/mobile_app_backend/predictions/stream_subscriber_test.exs +++ b/test/mobile_app_backend/predictions/stream_subscriber_test.exs @@ -4,19 +4,25 @@ defmodule MobileAppBackend.Predictions.StreamSubscriberTest do alias MobileAppBackend.Predictions.StreamSubscriber import Mox import Test.Support.Helpers - import MobileAppBackend.Factory describe "subscribe_for_stops/1" do setup do verify_on_exit!() + + reassign_env( + :mobile_app_backend, + MobileAppBackend.GlobalDataCache.Module, + GlobalDataCacheMock + ) + reassign_env(:mobile_app_backend, MBTAV3API.Stream.StaticInstance, StaticInstanceMock) reassign_env(:mobile_app_backend, MBTAV3API.Repository, RepositoryMock) end test "starts streams for to the routes served at the given stops and vehicles" do - expect(RepositoryMock, :routes, fn _, _ -> - ok_response([build(:route, id: "66"), build(:route, id: "39")]) - end) + GlobalDataCacheMock + |> expect(:default_key, fn -> :default_key end) + |> expect(:route_ids_for_stops, fn _, _ -> ["66", "39"] end) StaticInstanceMock |> expect(:ensure_stream_started, fn "predictions:route:to_store:66",