diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 096ce7f4..6b0ec829 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -2,10 +2,10 @@ "version": "2.0.0", "tasks": [ { - "label": "test", + "label": "credo --strict", "type": "shell", - "command": "mix test", - "group": "test" + "command": "mix credo --strict", + "group": "build" }, { "label": "deps.compile", @@ -18,6 +18,12 @@ "type": "shell", "command": "mix deps.get", "group": "build" + }, + { + "label": "test", + "type": "shell", + "command": "mix test", + "group": "test" } ] } diff --git a/lib/mobile_app_backend/predictions/pub_sub.ex b/lib/mobile_app_backend/predictions/pub_sub.ex index 0d9364de..a3130906 100644 --- a/lib/mobile_app_backend/predictions/pub_sub.ex +++ b/lib/mobile_app_backend/predictions/pub_sub.ex @@ -4,20 +4,33 @@ defmodule MobileAppBackend.Predictions.PubSub.Behaviour do @type predictions_for_stop :: %{Stop.id() => JsonApi.Object.full_map()} - @type subscribe_response :: %{ + @type subscribe_stop_response :: %{ predictions_by_stop: %{Stop.id() => %{Prediction.id() => Prediction.t()}}, trips: %{Trip.id() => Trip.t()}, vehicles: %{Vehicle.id() => Vehicle.t()} } + @type subscribe_trip_response :: + %{ + trip_id: Trip.id(), + predictions: %{Prediction.id() => Prediction.t()}, + trips: %{Trip.id() => Trip.t()}, + vehicles: %{Vehicle.id() => Vehicle.t()} + } + | :error + @doc """ Subscribe to prediction updates for the given stop. For a parent station, this subscribes to updates for all child stops. """ - @callback subscribe_for_stop(Stop.id()) :: subscribe_response() + @callback subscribe_for_stop(Stop.id()) :: subscribe_stop_response() @doc """ Subscribe to prediction updates for multiple stops. For parent stations, this subscribes to updates for all their child stops. """ - @callback subscribe_for_stops([Stop.id()]) :: subscribe_response() + @callback subscribe_for_stops([Stop.id()]) :: subscribe_stop_response() + @doc """ + Subscribe to prediction updates for the given trip. + """ + @callback subscribe_for_trip(Trip.id()) :: subscribe_trip_response() end defmodule MobileAppBackend.Predictions.PubSub do @@ -114,6 +127,34 @@ defmodule MobileAppBackend.Predictions.PubSub do subscribe_for_stops([stop_id]) end + @impl true + def subscribe_for_trip(trip_id) do + case :timer.tc(MobileAppBackend.Predictions.StreamSubscriber, :subscribe_for_trip, [trip_id]) do + {time_micros, :ok} -> + Logger.info( + "#{__MODULE__} subscribe_for_trip trip_id=#{trip_id} duration=#{time_micros / 1000} " + ) + + predictions_data = + register_trip(trip_id) + |> Store.Predictions.fetch_with_associations() + + %{ + trip_id: trip_id, + predictions: predictions_data.predictions, + trips: predictions_data.trips, + vehicles: predictions_data.vehicles + } + + {time_micros, :error} -> + Logger.warning( + "#{__MODULE__} failed to subscribe_for_trip trip_id=#{trip_id} duration=#{time_micros / 1000} " + ) + + :error + end + end + @spec group_predictions_for_stop(%{Prediction.id() => Prediction.t()}, [Stop.id()], %{ Stop.id() => [Stop.id()] }) :: %{Stop.id() => %{Prediction.id() => Prediction.t()}} @@ -187,6 +228,20 @@ defmodule MobileAppBackend.Predictions.PubSub do fetch_keys end + @spec register_trip(Trip.id()) :: Store.fetch_keys() + defp register_trip(trip_id) do + fetch_keys = [trip_id: trip_id] + + {:ok, _owner} = + Registry.register( + MobileAppBackend.Predictions.Registry, + @fetch_registry_key, + {fetch_keys, fn data -> Map.put(data, :trip_id, trip_id) end} + ) + + fetch_keys + end + @impl GenServer def init(opts \\ []) do # Predictions are streamed from the V3 API by route, diff --git a/lib/mobile_app_backend/predictions/stream_subscriber.ex b/lib/mobile_app_backend/predictions/stream_subscriber.ex index fbd23002..402f93bf 100644 --- a/lib/mobile_app_backend/predictions/stream_subscriber.ex +++ b/lib/mobile_app_backend/predictions/stream_subscriber.ex @@ -7,6 +7,7 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber do """ alias MBTAV3API.Stop + alias MBTAV3API.Trip alias MobileAppBackend.Predictions.StreamSubscriber @doc """ @@ -14,7 +15,6 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber do and the stream of all vehicles has been started. """ @callback subscribe_for_stops([Stop.id()]) :: :ok | :error - def subscribe_for_stops(stop_ids) do Application.get_env( :mobile_app_backend, @@ -22,6 +22,15 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber do StreamSubscriber.Impl ).subscribe_for_stops(stop_ids) end + + @callback subscribe_for_trip(Trip.id()) :: :ok | :error + def subscribe_for_trip(trip_id) do + Application.get_env( + :mobile_app_backend, + MobileAppBackend.Predictions.StreamSubscriber, + StreamSubscriber.Impl + ).subscribe_for_trip(trip_id) + end end defmodule MobileAppBackend.Predictions.StreamSubscriber.Impl do @@ -53,4 +62,22 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber.Impl do :ok end end + + @impl true + def subscribe_for_trip(trip_id) do + with {:ok, %{data: [trip]}} <- MBTAV3API.Repository.trips(filter: [id: trip_id]), + route_id <- trip.route_id, + {:ok, _data} <- + StaticInstance.ensure_stream_started("predictions:route:to_store:#{route_id}", + include_current_data: false + ), + {:ok, _data} <- + StaticInstance.ensure_stream_started("vehicles:to_store", include_current_data: false) do + :ok + else + _ -> + Logger.warning("#{__MODULE__} failed to fetch trip from repository for #{trip_id}") + :error + end + end end diff --git a/lib/mobile_app_backend_web/channels/predictions_for_trip_channel.ex b/lib/mobile_app_backend_web/channels/predictions_for_trip_channel.ex index 13a2a6a5..734bccf5 100644 --- a/lib/mobile_app_backend_web/channels/predictions_for_trip_channel.ex +++ b/lib/mobile_app_backend_web/channels/predictions_for_trip_channel.ex @@ -1,74 +1,47 @@ defmodule MobileAppBackendWeb.PredictionsForTripChannel do use MobileAppBackendWeb, :channel - - alias MBTAV3API.JsonApi - alias MBTAV3API.Prediction - - @throttle_ms 500 + require Logger @impl true def join("predictions:trip:" <> trip_id, _payload, socket) do - {:ok, throttler} = - MobileAppBackend.Throttler.start_link( - target: self(), - cast: :send_data, - ms: @throttle_ms - ) - - {:ok, %{data: [trip]}} = MBTAV3API.Repository.trips(filter: [id: trip_id]) - - route_id = trip.route_id - - {:ok, data} = MBTAV3API.Stream.StaticInstance.subscribe("predictions:route:#{route_id}") + if trip_id == "" do + {:error, %{code: :no_trip_id}} + else + subscribe(trip_id, socket) + end + end - data = filter_data(data, trip_id) + defp subscribe(trip_id, socket) do + pubsub_module = + Application.get_env( + :mobile_app_backend, + MobileAppBackend.Predictions.PubSub, + MobileAppBackend.Predictions.PubSub + ) - {:ok, data, assign(socket, data: data, trip_id: trip_id, throttler: throttler)} - end + case :timer.tc(fn -> pubsub_module.subscribe_for_trip(trip_id) end) do + {time_micros, :error} -> + Logger.warning("#{__MODULE__} failed join duration=#{time_micros / 1000}") + {:error, %{code: :subscribe_failed}} - @impl true - def handle_info({:stream_data, "predictions:route:" <> _route_id, data}, socket) do - old_data = socket.assigns.data - new_data = filter_data(data, socket.assigns.trip_id) + {time_micros, initial_data} -> + Logger.info("#{__MODULE__} join duration=#{time_micros / 1000}") - if old_data != new_data do - MobileAppBackend.Throttler.request(socket.assigns.throttler) + {:ok, initial_data, socket} end - - socket = assign(socket, data: new_data) - {:noreply, socket} end @impl true - def handle_cast(:send_data, socket) do - :ok = push(socket, "stream_data", socket.assigns.data) - {:noreply, socket} - end + @spec handle_info({:new_predictions, any()}, Phoenix.Socket.t()) :: + {:noreply, Phoenix.Socket.t()} + def handle_info({:new_predictions, new_predictions_for_trip}, socket) do + {time_micros, _result} = + :timer.tc(fn -> + :ok = push(socket, "stream_data", new_predictions_for_trip) + end) - @doc """ - Filters the given data to predictions that are at one of the listed stops and the associated trips and vehicles. - """ - @spec filter_data(JsonApi.Object.full_map(), String.t()) :: JsonApi.Object.full_map() - def filter_data(route_data, trip_id) do - %{predictions: predictions, vehicle_ids: vehicle_ids} = - for {_, %Prediction{} = prediction} <- route_data.predictions, - reduce: %{predictions: %{}, vehicle_ids: []} do - %{predictions: predictions, vehicle_ids: vehicle_ids} -> - if prediction.trip_id == trip_id do - %{ - predictions: Map.put(predictions, prediction.id, prediction), - vehicle_ids: [prediction.vehicle_id | vehicle_ids] - } - else - %{predictions: predictions, vehicle_ids: vehicle_ids} - end - end + Logger.info("#{__MODULE__} push duration=#{time_micros / 1000}") - %{ - JsonApi.Object.to_full_map([]) - | predictions: predictions, - trips: Map.take(route_data.trips, [trip_id]), - vehicles: Map.take(route_data.vehicles, vehicle_ids) - } + {:noreply, socket} end end diff --git a/test/mobile_app_backend/predictions/pub_sub_test.exs b/test/mobile_app_backend/predictions/pub_sub_test.exs index 7f9a4e1c..7a2156c7 100644 --- a/test/mobile_app_backend/predictions/pub_sub_test.exs +++ b/test/mobile_app_backend/predictions/pub_sub_test.exs @@ -193,6 +193,47 @@ defmodule MobileAppBackend.Predictions.PubSubTests do end end + describe "subscribe_for_trip/1" do + test "returns initial data for the given trip" do + prediction_1 = + build(:prediction, id: "p_1", stop_id: "12345", trip_id: "trip_1", vehicle_id: "v_1") + + prediction_2 = + build(:prediction, id: "p_2", stop_id: "67890", trip_id: "trip_1", vehicle_id: "v_1") + + trip_1 = build(:trip, id: "trip_1") + vehicle_1 = build(:vehicle, id: "v_1") + + full_map = + JsonApi.Object.to_full_map([ + prediction_1, + prediction_2, + trip_1, + vehicle_1 + ]) + + expect(PredictionsStoreMock, :fetch_with_associations, fn [trip_id: "trip_1"] -> + full_map + end) + + expect(StreamSubscriberMock, :subscribe_for_trip, fn _ -> :ok end) + + assert %{ + trip_id: trip_1.id, + predictions: %{"p_1" => prediction_1, "p_2" => prediction_2}, + trips: %{"trip_1" => trip_1}, + vehicles: %{"v_1" => vehicle_1} + } == PubSub.subscribe_for_trip("trip_1") + end + + @tag :capture_log + test "returns an error when subscriber fails" do + expect(StreamSubscriberMock, :subscribe_for_trip, fn _ -> :error end) + + assert :error == PubSub.subscribe_for_trip("trip_1") + end + end + describe "handle_info" do setup do _dispatched_table = :ets.new(:test_last_dispatched, [:set, :named_table]) diff --git a/test/mobile_app_backend/predictions/stream_subscriber_test.exs b/test/mobile_app_backend/predictions/stream_subscriber_test.exs index 9805a944..81845697 100644 --- a/test/mobile_app_backend/predictions/stream_subscriber_test.exs +++ b/test/mobile_app_backend/predictions/stream_subscriber_test.exs @@ -4,6 +4,7 @@ defmodule MobileAppBackend.Predictions.StreamSubscriberTest do alias MobileAppBackend.Predictions.StreamSubscriber import Mox import Test.Support.Helpers + import MobileAppBackend.Factory describe "subscribe_for_stops/1" do setup do @@ -40,4 +41,37 @@ defmodule MobileAppBackend.Predictions.StreamSubscriberTest do StreamSubscriber.subscribe_for_stops([1, 2]) end end + + describe "subscribe_for_trip/1" do + setup do + verify_on_exit!() + + reassign_env( + :mobile_app_backend, + MobileAppBackend.GlobalDataCache.Module, + GlobalDataCacheMock + ) + + reassign_env(:mobile_app_backend, MBTAV3API.Stream.StaticInstance, StaticInstanceMock) + reassign_env(:mobile_app_backend, MBTAV3API.Repository, RepositoryMock) + end + + test "starts streams for to the routes served at the given stops and vehicles" do + trip = build(:trip, id: "trip", route_id: "66") + + RepositoryMock + |> expect(:trips, fn _, _ -> {:ok, %{data: [trip]}} end) + + StaticInstanceMock + |> expect(:ensure_stream_started, fn "predictions:route:to_store:66", + include_current_data: false -> + {:ok, :no_data} + end) + |> expect(:ensure_stream_started, fn "vehicles:to_store", include_current_data: false -> + {:ok, :no_data} + end) + + StreamSubscriber.subscribe_for_trip("trip") + end + end end diff --git a/test/mobile_app_backend_web/channels/predictions_for_trip_channel_test.exs b/test/mobile_app_backend_web/channels/predictions_for_trip_channel_test.exs index b5ec59c0..51726a1b 100644 --- a/test/mobile_app_backend_web/channels/predictions_for_trip_channel_test.exs +++ b/test/mobile_app_backend_web/channels/predictions_for_trip_channel_test.exs @@ -5,19 +5,12 @@ defmodule MobileAppBackendWeb.PredictionsForTripChannelTest do import MobileAppBackend.Factory import Mox import Test.Support.Helpers - import Test.Support.Sigils - alias MBTAV3API.JsonApi - alias MBTAV3API.Prediction - alias MBTAV3API.Stream - alias MBTAV3API.Trip - alias MBTAV3API.Vehicle alias MobileAppBackendWeb.PredictionsForTripChannel - alias Test.Support.FakeStaticInstance setup do reassign_env(:mobile_app_backend, :base_url, "https://api.example.net") reassign_env(:mobile_app_backend, :api_key, "abcdef") - reassign_env(:mobile_app_backend, MBTAV3API.Repository, RepositoryMock) + reassign_env(:mobile_app_backend, MobileAppBackend.Predictions.PubSub, PredictionsPubSubMock) {:ok, socket} = connect(MobileAppBackendWeb.UserSocket, %{}) @@ -25,194 +18,66 @@ defmodule MobileAppBackendWeb.PredictionsForTripChannelTest do end test "joins and leaves ok", %{socket: socket} do - trip_id = "81" - route_id = "92" + prediction_1 = + build(:prediction, id: "p_1", trip_id: "trip_1", stop_id: "12345", vehicle_id: "v_1") - RepositoryMock - |> expect(:trips, fn [filter: [id: ^trip_id]], _ -> - ok_response([build(:trip, id: trip_id, route_id: route_id)]) - end) + build(:prediction, id: "p_2", trip_id: "trip_2", stop_id: "67890", vehicle_id: "v_2") - prediction = build(:prediction, trip_id: trip_id, stop_id: "12345") + trip_1 = build(:trip, id: "trip_1") + build(:trip, id: "trip_2") - start_link_supervised!( - {FakeStaticInstance, - topic: "predictions:route:#{route_id}", data: to_full_map([prediction])} - ) + vehicle_1 = build(:vehicle, id: "v_1") + build(:vehicle, id: "v_2") + + response = %{ + predictions: %{ + "12345" => %{"p_1" => prediction_1} + }, + trips: %{"trip_1" => trip_1}, + vehicles: %{"v_1" => vehicle_1} + } + + expect(PredictionsPubSubMock, :subscribe_for_trip, 1, fn _ -> + response + end) {:ok, reply, _socket} = - subscribe_and_join(socket, "predictions:trip:#{trip_id}") + subscribe_and_join(socket, "predictions:trip:trip_1") - assert reply == to_full_map([prediction]) + assert reply == response end - describe "message handling" do - setup %{socket: socket} do - RepositoryMock - |> expect(:trips, fn _, _ -> - ok_response([build(:trip, id: "60392455", route_id: "Red")]) - end) - - start_link_supervised!( - {FakeStaticInstance, topic: "predictions:route:Red", data: to_full_map([])} - ) + test "error if missing trip id in topic", %{socket: socket} do + assert subscribe_and_join(socket, "predictions:trip:") == {:error, %{code: :no_trip_id}} + end - {:ok, reply, socket} = subscribe_and_join(socket, "predictions:trip:60392455") - - assert reply == to_full_map([]) - - Stream.PubSub.broadcast!( - "predictions:route:Red", - {:stream_data, "predictions:route:Red", - to_full_map([ - trip_60392455(), - trip_60392515(), - vehicle_r_547a83f7(), - vehicle_r_547a83f8(), - prediction_60392455(), - prediction_60392515() - ])} - ) + test "handles new predictions", %{socket: socket} do + expect(PredictionsPubSubMock, :subscribe_for_trip, fn _ -> %{} end) - assert_push "stream_data", initial_data - - socket.assigns.throttler |> :sys.replace_state(&put_in(&1.last_cast, nil)) - - {:ok, %{initial_data: initial_data}} - end - - defp trip_60392455 do - %Trip{id: "60392455", direction_id: 1, route_pattern_id: "Red-1-1", shape_id: "931_0010"} - end - - defp trip_60392515 do - %Trip{id: "60392515", direction_id: 0, route_pattern_id: "Red-1-0", shape_id: "931_0009"} - end - - defp vehicle_r_547a83f7 do - %Vehicle{ - id: "R-547A83F7", - current_status: :in_transit_to, - stop_id: "70072", - trip_id: trip_60392455().id - } - end - - defp vehicle_r_547a83f8 do - %Vehicle{ - id: "R-547A83F8", - current_status: :stopped_at, - stop_id: "70085", - trip_id: trip_60392515().id - } - end - - defp prediction_60392455 do - %Prediction{ - id: "prediction-60392455-70086-90", - arrival_time: ~B[2024-01-30 15:44:09], - departure_time: ~B[2024-01-30 15:45:10], - direction_id: 1, - revenue: true, - schedule_relationship: :scheduled, - stop_sequence: 90, - route_id: "Red", - stop_id: "70086", - trip_id: trip_60392455().id, - vehicle_id: vehicle_r_547a83f7().id - } - end - - defp prediction_60392515 do - %Prediction{ - id: "prediction-60392515-70085-130", - arrival_time: ~B[2024-01-30 15:46:26], - departure_time: ~B[2024-01-30 15:47:48], - direction_id: 0, - revenue: true, - schedule_relationship: :scheduled, - stop_sequence: 130, - route_id: "Red", - stop_id: "70085", - trip_id: trip_60392515().id, - vehicle_id: vehicle_r_547a83f8().id - } - end - - test "correctly handles reset", %{initial_data: data} do - assert data == - JsonApi.Object.to_full_map([ - trip_60392455(), - vehicle_r_547a83f7(), - prediction_60392455() - ]) - end - - test "replaces old data" do - updated_prediction = %Prediction{ - prediction_60392455() - | arrival_time: ~B[2024-05-08 16:18:21] - } - - Stream.PubSub.broadcast!( - "predictions:route:Red", - {:stream_data, "predictions:route:Red", - to_full_map([trip_60392455(), vehicle_r_547a83f7(), updated_prediction])} - ) + {:ok, _reply, socket} = + subscribe_and_join(socket, "predictions:trip:trip_1") - assert_push "stream_data", data - - assert data == - JsonApi.Object.to_full_map([ - trip_60392455(), - vehicle_r_547a83f7(), - updated_prediction - ]) - end - - test "ignores irrelevant data" do - fake_trip = build(:trip) - fake_vehicle = build(:vehicle) - - fake_prediction = %MBTAV3API.Prediction{ - prediction_60392515() - | stop_id: "somewhere-else", - trip_id: fake_trip.id, - vehicle_id: fake_vehicle.id - } - - Stream.PubSub.broadcast!( - "predictions:route:Red", - {:stream_data, "predictions:route:Red", - to_full_map([ - trip_60392455(), - vehicle_r_547a83f7(), - prediction_60392455(), - fake_trip, - fake_vehicle, - fake_prediction - ])} + prediction = + build(:prediction, + id: "prediction_1", + stop_id: "12345", + trip_id: "trip_1", + vehicle_id: "v_1" ) - refute_push "stream_data", _ - end - end + trip = build(:trip, id: "trip_1") + vehicle = build(:vehicle, id: "v_1") - describe "filter_data/2" do - test "properly divides predictions and associated objects" do - [trip1, trip2] = build_list(2, :trip) - [vehicle1, vehicle2] = build_list(2, :vehicle) - - prediction1 = - build(:prediction, stop_id: "12345", trip_id: trip1.id, vehicle_id: vehicle1.id) - - prediction2 = - build(:prediction, stop_id: "67890", trip_id: trip2.id, vehicle_id: vehicle2.id) + PredictionsForTripChannel.handle_info( + {:new_predictions, Map.put(to_full_map([prediction, trip, vehicle]), :trip_id, "trip_1")}, + socket + ) - assert PredictionsForTripChannel.filter_data( - to_full_map([prediction1, prediction2, trip1, trip2, vehicle1, vehicle2]), - trip1.id - ) == to_full_map([prediction1, trip1, vehicle1]) - end + assert_push "stream_data", %{ + trip_id: "trip_1", + predictions: %{"prediction_1" => ^prediction}, + trips: %{"trip_1" => ^trip}, + vehicles: %{"v_1" => ^vehicle} + } end end