Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make trip channel pull from ETS predictions #235

Merged
merged 10 commits into from
Nov 12, 2024
12 changes: 9 additions & 3 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
"version": "2.0.0",
"tasks": [
{
"label": "test",
"label": "credo --strict",
"type": "shell",
"command": "mix test",
"group": "test"
"command": "mix credo --strict",
"group": "build"
},
{
"label": "deps.compile",
Expand All @@ -18,6 +18,12 @@
"type": "shell",
"command": "mix deps.get",
"group": "build"
},
{
"label": "test",
"type": "shell",
"command": "mix test",
"group": "test"
}
]
}
57 changes: 54 additions & 3 deletions lib/mobile_app_backend/predictions/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,30 @@ defmodule MobileAppBackend.Predictions.PubSub.Behaviour do

@type predictions_for_stop :: %{Stop.id() => JsonApi.Object.full_map()}

@type subscribe_response :: %{
@type subscribe_stop_response :: %{
predictions_by_stop: %{Stop.id() => %{Prediction.id() => Prediction.t()}},
trips: %{Trip.id() => Trip.t()},
vehicles: %{Vehicle.id() => Vehicle.t()}
}

@type subscribe_trip_response :: %{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): add the new trip_id field here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't understand what you mean?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a trip_id field added to the map here, I think it would be worth adding to the typespec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh, okay yes, updated

predictions: %{Prediction.id() => Prediction.t()},
trips: %{Trip.id() => Trip.t()},
vehicles: %{Vehicle.id() => Vehicle.t()}
}

@doc """
Subscribe to prediction updates for the given stop. For a parent station, this subscribes to updates for all child stops.
"""
@callback subscribe_for_stop(Stop.id()) :: subscribe_response()
@callback subscribe_for_stop(Stop.id()) :: subscribe_stop_response()
@doc """
Subscribe to prediction updates for multiple stops. For parent stations, this subscribes to updates for all their child stops.
"""
@callback subscribe_for_stops([Stop.id()]) :: subscribe_response()
@callback subscribe_for_stops([Stop.id()]) :: subscribe_stop_response()
@doc """
Subscribe to prediction updates for the given trip.
"""
@callback subscribe_for_trip(Trip.id()) :: subscribe_trip_response()
end

defmodule MobileAppBackend.Predictions.PubSub do
Expand Down Expand Up @@ -114,6 +124,33 @@ defmodule MobileAppBackend.Predictions.PubSub do
subscribe_for_stops([stop_id])
end

@impl true
def subscribe_for_trip(trip_id) do
case :timer.tc(MobileAppBackend.Predictions.StreamSubscriber, :subscribe_for_trip, [trip_id]) do
{time_micros, :ok} ->
Logger.info(
"#{__MODULE__} subscribe_for_trip trip_id=#{trip_id} duration=#{time_micros / 1000} "
)

predictions_data =
register_trip(trip_id)
|> Store.Predictions.fetch_with_associations()

%{
predictions: predictions_data.predictions,
trips: predictions_data.trips,
vehicles: predictions_data.vehicles
}

{time_micros, :error} ->
Logger.warning(
"#{__MODULE__} failed to subscribe_for_trip trip_id=#{trip_id} duration=#{time_micros / 1000} "
)

:error
end
end

@spec group_predictions_for_stop(%{Prediction.id() => Prediction.t()}, [Stop.id()], %{
Stop.id() => [Stop.id()]
}) :: %{Stop.id() => %{Prediction.id() => Prediction.t()}}
Expand Down Expand Up @@ -187,6 +224,20 @@ defmodule MobileAppBackend.Predictions.PubSub do
fetch_keys
end

@spec register_trip(Trip.id()) :: Store.fetch_keys()
defp register_trip(trip_id) do
fetch_keys = [trip_id: trip_id]

{:ok, _owner} =
Registry.register(
MobileAppBackend.Predictions.Registry,
@fetch_registry_key,
{fetch_keys, fn data -> Map.put(data, :trip_id, trip_id) end}
)

fetch_keys
end

@impl GenServer
def init(opts \\ []) do
# Predictions are streamed from the V3 API by route,
Expand Down
33 changes: 32 additions & 1 deletion lib/mobile_app_backend/predictions/stream_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,30 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber do

"""
alias MBTAV3API.Stop
alias MBTAV3API.Trip
alias MobileAppBackend.Predictions.StreamSubscriber

@doc """
Ensure prediction streams have been started for every route served by the given stops
and the stream of all vehicles has been started.
"""
@callback subscribe_for_stops([Stop.id()]) :: :ok | :error

def subscribe_for_stops(stop_ids) do
Application.get_env(
:mobile_app_backend,
MobileAppBackend.Predictions.StreamSubscriber,
StreamSubscriber.Impl
).subscribe_for_stops(stop_ids)
end

@callback subscribe_for_trip(Trip.id()) :: :ok | :error
def subscribe_for_trip(trip_id) do
Application.get_env(
:mobile_app_backend,
MobileAppBackend.Predictions.StreamSubscriber,
StreamSubscriber.Impl
).subscribe_for_trip(trip_id)
end
end

defmodule MobileAppBackend.Predictions.StreamSubscriber.Impl do
Expand Down Expand Up @@ -53,4 +62,26 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber.Impl do
:ok
end
end

@impl true
def subscribe_for_trip(trip_id) do
case MBTAV3API.Repository.trips(filter: [id: trip_id]) do
EmmaSimon marked this conversation as resolved.
Show resolved Hide resolved
{:ok, %{data: [trip]}} ->
route_id = trip.route_id

{:ok, _data} =
StaticInstance.ensure_stream_started("predictions:route:to_store:#{route_id}",
include_current_data: false
)

{:ok, _data} =
StaticInstance.ensure_stream_started("vehicles:to_store", include_current_data: false)

:ok

_ ->
Logger.error("#{__MODULE__} failed to fetch trip from repository for #{trip_id}")
:error
end
end
end
Original file line number Diff line number Diff line change
@@ -1,74 +1,48 @@
defmodule MobileAppBackendWeb.PredictionsForTripChannel do
use MobileAppBackendWeb, :channel

alias MBTAV3API.JsonApi
alias MBTAV3API.Prediction

@throttle_ms 500
require Logger

@impl true
def join("predictions:trip:" <> trip_id, _payload, socket) do
{:ok, throttler} =
MobileAppBackend.Throttler.start_link(
target: self(),
cast: :send_data,
ms: @throttle_ms
)

{:ok, %{data: [trip]}} = MBTAV3API.Repository.trips(filter: [id: trip_id])

route_id = trip.route_id

{:ok, data} = MBTAV3API.Stream.StaticInstance.subscribe("predictions:route:#{route_id}")
if trip_id == "" do
{:error, %{code: :no_trip_id}}
else
subscribe(trip_id, socket)
end
end

data = filter_data(data, trip_id)
defp subscribe(trip_id, socket) do
pubsub_module =
Application.get_env(
:mobile_app_backend,
MobileAppBackend.Predictions.PubSub,
MobileAppBackend.Predictions.PubSub
)

{:ok, data, assign(socket, data: data, trip_id: trip_id, throttler: throttler)}
end
case :timer.tc(fn -> pubsub_module.subscribe_for_trip(trip_id) end) do
{time_micros, :error} ->
Logger.warning("#{__MODULE__} failed join duration=#{time_micros / 1000}")
{:error, %{code: :subscribe_failed}}

@impl true
def handle_info({:stream_data, "predictions:route:" <> _route_id, data}, socket) do
old_data = socket.assigns.data
new_data = filter_data(data, socket.assigns.trip_id)
{time_micros, initial_data} ->
Logger.info("#{__MODULE__} join duration=#{time_micros / 1000}")

if old_data != new_data do
MobileAppBackend.Throttler.request(socket.assigns.throttler)
{:ok, initial_data, socket}
end

socket = assign(socket, data: new_data)
{:noreply, socket}
end

@impl true
def handle_cast(:send_data, socket) do
:ok = push(socket, "stream_data", socket.assigns.data)
{:noreply, socket}
end
@spec handle_info({:new_predictions, any()}, Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()}
def handle_info({:new_predictions, new_predictions_for_trip}, socket) do
{time_micros, _result} =
:timer.tc(fn ->
:ok = push(socket, "stream_data", new_predictions_for_trip)
end)

@doc """
Filters the given data to predictions that are at one of the listed stops and the associated trips and vehicles.
"""
@spec filter_data(JsonApi.Object.full_map(), String.t()) :: JsonApi.Object.full_map()
def filter_data(route_data, trip_id) do
%{predictions: predictions, vehicle_ids: vehicle_ids} =
for {_, %Prediction{} = prediction} <- route_data.predictions,
reduce: %{predictions: %{}, vehicle_ids: []} do
%{predictions: predictions, vehicle_ids: vehicle_ids} ->
if prediction.trip_id == trip_id do
%{
predictions: Map.put(predictions, prediction.id, prediction),
vehicle_ids: [prediction.vehicle_id | vehicle_ids]
}
else
%{predictions: predictions, vehicle_ids: vehicle_ids}
end
end
Logger.info("#{__MODULE__} push duration=#{time_micros / 1000}")

%{
JsonApi.Object.to_full_map([])
| predictions: predictions,
trips: Map.take(route_data.trips, [trip_id]),
vehicles: Map.take(route_data.vehicles, vehicle_ids)
}
require Logger
EmmaSimon marked this conversation as resolved.
Show resolved Hide resolved
{:noreply, socket}
end
end
1 change: 1 addition & 0 deletions lib/mobile_app_backend_web/channels/user_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule MobileAppBackendWeb.UserSocket do
channel "predictions:stops", MobileAppBackendWeb.PredictionsForStopsChannel
channel "predictions:stops:v2:*", MobileAppBackendWeb.PredictionsForStopsV2Channel

channel "predictions:trip:v2:*", MobileAppBackendWeb.PredictionsForTripV2Channel
EmmaSimon marked this conversation as resolved.
Show resolved Hide resolved
channel "predictions:trip:*", MobileAppBackendWeb.PredictionsForTripChannel
channel "alerts", MobileAppBackendWeb.AlertsChannel
channel "vehicles:*", MobileAppBackendWeb.VehiclesForRouteChannel
Expand Down
40 changes: 40 additions & 0 deletions test/mobile_app_backend/predictions/pub_sub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,46 @@ defmodule MobileAppBackend.Predictions.PubSubTests do
end
end

describe "subscribe_for_trip/1" do
test "returns initial data for the given trip" do
prediction_1 =
build(:prediction, id: "p_1", stop_id: "12345", trip_id: "trip_1", vehicle_id: "v_1")

prediction_2 =
build(:prediction, id: "p_2", stop_id: "67890", trip_id: "trip_1", vehicle_id: "v_1")

trip_1 = build(:trip, id: "trip_1")
vehicle_1 = build(:vehicle, id: "v_1")

full_map =
JsonApi.Object.to_full_map([
prediction_1,
prediction_2,
trip_1,
vehicle_1
])

expect(PredictionsStoreMock, :fetch_with_associations, fn [trip_id: "trip_1"] ->
full_map
end)

expect(StreamSubscriberMock, :subscribe_for_trip, fn _ -> :ok end)

assert %{
predictions: %{"p_1" => prediction_1, "p_2" => prediction_2},
trips: %{"trip_1" => trip_1},
vehicles: %{"v_1" => vehicle_1}
} == PubSub.subscribe_for_trip("trip_1")
end

@tag :capture_log
test "returns an error when subscriber fails" do
expect(StreamSubscriberMock, :subscribe_for_trip, fn _ -> :error end)

assert :error == PubSub.subscribe_for_trip("trip_1")
end
end

describe "handle_info" do
setup do
_dispatched_table = :ets.new(:test_last_dispatched, [:set, :named_table])
Expand Down
34 changes: 34 additions & 0 deletions test/mobile_app_backend/predictions/stream_subscriber_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule MobileAppBackend.Predictions.StreamSubscriberTest do
alias MobileAppBackend.Predictions.StreamSubscriber
import Mox
import Test.Support.Helpers
import MobileAppBackend.Factory

describe "subscribe_for_stops/1" do
setup do
Expand Down Expand Up @@ -40,4 +41,37 @@ defmodule MobileAppBackend.Predictions.StreamSubscriberTest do
StreamSubscriber.subscribe_for_stops([1, 2])
end
end

describe "subscribe_for_trip/1" do
setup do
verify_on_exit!()

reassign_env(
:mobile_app_backend,
MobileAppBackend.GlobalDataCache.Module,
GlobalDataCacheMock
)

reassign_env(:mobile_app_backend, MBTAV3API.Stream.StaticInstance, StaticInstanceMock)
reassign_env(:mobile_app_backend, MBTAV3API.Repository, RepositoryMock)
end

test "starts streams for to the routes served at the given stops and vehicles" do
trip = build(:trip, id: "trip", route_id: "66")

RepositoryMock
|> expect(:trips, fn _, _ -> {:ok, %{data: [trip]}} end)

StaticInstanceMock
|> expect(:ensure_stream_started, fn "predictions:route:to_store:66",
include_current_data: false ->
{:ok, :no_data}
end)
|> expect(:ensure_stream_started, fn "vehicles:to_store", include_current_data: false ->
{:ok, :no_data}
end)

StreamSubscriber.subscribe_for_trip("trip")
end
end
end
Loading
Loading