Skip to content

Commit

Permalink
refactor: Replace existing channel
Browse files Browse the repository at this point in the history
  • Loading branch information
EmmaSimon committed Nov 6, 2024
1 parent 3a104b4 commit 0cc1e5d
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 385 deletions.
88 changes: 31 additions & 57 deletions lib/mobile_app_backend_web/channels/predictions_for_trip_channel.ex
Original file line number Diff line number Diff line change
@@ -1,74 +1,48 @@
defmodule MobileAppBackendWeb.PredictionsForTripChannel do
use MobileAppBackendWeb, :channel

alias MBTAV3API.JsonApi
alias MBTAV3API.Prediction

@throttle_ms 500
require Logger

@impl true
def join("predictions:trip:" <> trip_id, _payload, socket) do
{:ok, throttler} =
MobileAppBackend.Throttler.start_link(
target: self(),
cast: :send_data,
ms: @throttle_ms
)

{:ok, %{data: [trip]}} = MBTAV3API.Repository.trips(filter: [id: trip_id])

route_id = trip.route_id

{:ok, data} = MBTAV3API.Stream.StaticInstance.subscribe("predictions:route:#{route_id}")
if trip_id == "" do
{:error, %{code: :no_trip_id}}
else
subscribe(trip_id, socket)
end
end

data = filter_data(data, trip_id)
defp subscribe(trip_id, socket) do
pubsub_module =
Application.get_env(
:mobile_app_backend,
MobileAppBackend.Predictions.PubSub,
MobileAppBackend.Predictions.PubSub
)

{:ok, data, assign(socket, data: data, trip_id: trip_id, throttler: throttler)}
end
case :timer.tc(fn -> pubsub_module.subscribe_for_trip(trip_id) end) do
{time_micros, :error} ->
Logger.warning("#{__MODULE__} failed join duration=#{time_micros / 1000}")
{:error, %{code: :subscribe_failed}}

@impl true
def handle_info({:stream_data, "predictions:route:" <> _route_id, data}, socket) do
old_data = socket.assigns.data
new_data = filter_data(data, socket.assigns.trip_id)
{time_micros, initial_data} ->
Logger.info("#{__MODULE__} join duration=#{time_micros / 1000}")

if old_data != new_data do
MobileAppBackend.Throttler.request(socket.assigns.throttler)
{:ok, initial_data, socket}
end

socket = assign(socket, data: new_data)
{:noreply, socket}
end

@impl true
def handle_cast(:send_data, socket) do
:ok = push(socket, "stream_data", socket.assigns.data)
{:noreply, socket}
end
@spec handle_info({:new_predictions, any()}, Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()}
def handle_info({:new_predictions, new_predictions_for_trip}, socket) do
{time_micros, _result} =
:timer.tc(fn ->
:ok = push(socket, "stream_data", new_predictions_for_trip)
end)

@doc """
Filters the given data to predictions that are at one of the listed stops and the associated trips and vehicles.
"""
@spec filter_data(JsonApi.Object.full_map(), String.t()) :: JsonApi.Object.full_map()
def filter_data(route_data, trip_id) do
%{predictions: predictions, vehicle_ids: vehicle_ids} =
for {_, %Prediction{} = prediction} <- route_data.predictions,
reduce: %{predictions: %{}, vehicle_ids: []} do
%{predictions: predictions, vehicle_ids: vehicle_ids} ->
if prediction.trip_id == trip_id do
%{
predictions: Map.put(predictions, prediction.id, prediction),
vehicle_ids: [prediction.vehicle_id | vehicle_ids]
}
else
%{predictions: predictions, vehicle_ids: vehicle_ids}
end
end
Logger.info("#{__MODULE__} push duration=#{time_micros / 1000}")

%{
JsonApi.Object.to_full_map([])
| predictions: predictions,
trips: Map.take(route_data.trips, [trip_id]),
vehicles: Map.take(route_data.vehicles, vehicle_ids)
}
require Logger
{:noreply, socket}
end
end

This file was deleted.

16 changes: 1 addition & 15 deletions test/mobile_app_backend/predictions/pub_sub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -225,22 +225,8 @@ defmodule MobileAppBackend.Predictions.PubSubTests do
} == PubSub.subscribe_for_trip("trip_1")
end

@tag :capture_log
test "returns an error when subscriber fails" do
prediction_1 =
build(:prediction, id: "p_1", stop_id: "12345", trip_id: "trip_1", vehicle_id: "v_1")

prediction_2 =
build(:prediction, id: "p_2", stop_id: "67890", trip_id: "trip_1", vehicle_id: "v_1")

vehicle_1 = build(:vehicle, id: "v_1")

full_map =
JsonApi.Object.to_full_map([
prediction_1,
prediction_2,
vehicle_1
])

expect(StreamSubscriberMock, :subscribe_for_trip, fn _ -> :error end)

assert :error == PubSub.subscribe_for_trip("trip_1")
Expand Down
Loading

0 comments on commit 0cc1e5d

Please sign in to comment.