Skip to content

Commit

Permalink
Rewrite predictions pubsub tests (#2129)
Browse files Browse the repository at this point in the history
* rewrite mostly working

* mocks and first test

* all tests passing

* tests

* cleanup and start channel tests

* start of final test file

* add other test stubs

* all tests passing

* last two tests

* add comments

* try to sleep for supervisor children check

* alphabetize type keys

* add more sleep for pubsub test

* remove process sleep hackiness

* cleanup

* move stream stopping code into the only test using it

* remove flaky test
  • Loading branch information
anthonyshull authored Jul 25, 2024
1 parent a9e240e commit 9b1b4ad
Show file tree
Hide file tree
Showing 16 changed files with 372 additions and 426 deletions.
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ config :dotcom, :repo_modules,
routes: Routes.Repo,
stops: Stops.Repo

config :dotcom, :predictions_phoenix_pub_sub, Predictions.Phoenix.PubSub
config :dotcom, :predictions_pub_sub, Predictions.PubSub
config :dotcom, :predictions_store, Predictions.Store

config :dotcom, :redis, Dotcom.Cache.Multilevel.Redis
config :dotcom, :redix, Redix
config :dotcom, :redix_pub_sub, Redix.PubSub
Expand Down
4 changes: 4 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ config :dotcom, :repo_modules,
routes: Routes.Repo.Mock,
stops: Stops.Repo.Mock

config :dotcom, :predictions_phoenix_pub_sub, Predictions.Phoenix.PubSub.Mock
config :dotcom, :predictions_pub_sub, Predictions.PubSub.Mock
config :dotcom, :predictions_store, Predictions.Store.Mock

config :dotcom, :redis, Dotcom.Redis.Mock
config :dotcom, :redix, Dotcom.Redix.Mock
config :dotcom, :redix_pub_sub, Dotcom.Redix.PubSub.Mock
Expand Down
98 changes: 22 additions & 76 deletions lib/dotcom_web/channels/predictions_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,23 @@ defmodule DotcomWeb.PredictionsChannel do

alias Routes.Route
alias Phoenix.{Channel, Socket}
alias Predictions.{Prediction, PredictionsPubSub}
alias Predictions.Prediction

@predictions_pub_sub Application.compile_env!(:dotcom, :predictions_pub_sub)

@impl Channel
@spec handle_info({:new_predictions, [Prediction.t()]}, Socket.t()) :: {:noreply, Socket.t()}
def handle_info({:new_predictions, predictions}, socket) do
:ok = push(socket, "data", %{predictions: filter_new(predictions)})

{:noreply, socket}
end

@impl Channel
@spec join(topic :: binary(), payload :: Channel.payload(), socket :: Socket.t()) ::
{:ok, %{predictions: [Prediction.t()]}, Socket.t()} | {:error, map()}
def join("predictions:" <> topic, _message, socket) do
predictions_subscribe_fn =
Application.get_env(
:dotcom,
:predictions_subscribe_fn,
&PredictionsPubSub.subscribe/1
)

case predictions_subscribe_fn.(topic) do
case @predictions_pub_sub.subscribe(topic) do
{:error, _reason} ->
{:error,
%{
Expand All @@ -36,83 +39,26 @@ defmodule DotcomWeb.PredictionsChannel do

@impl Channel
def terminate(_, socket) do
GenServer.cast(Predictions.PredictionsPubSub, {:closed_channel, socket.channel_pid})
end

@impl Channel
@spec handle_info({:new_predictions, [Prediction.t()]}, Socket.t()) :: {:noreply, Socket.t()}
def handle_info({:new_predictions, predictions}, socket) do
:ok = push(socket, "data", %{predictions: filter_new(predictions)})
{:noreply, socket}
GenServer.cast(@predictions_pub_sub, {:closed_channel, socket.channel_pid})
end

defp filter_new(predictions) do
predictions
|> Enum.reject(fn prediction ->
is_nil(prediction.trip) ||
is_skipped_or_cancelled?(prediction) ||
no_trip?(prediction) ||
no_departure_time?(prediction) ||
is_in_past?(prediction) ||
terminal_stop?(prediction)
skipped_or_cancelled?(prediction)
end)
end

# Used to filter out predictions that have an arrival time but no departure time.
# This is common when shuttles are being used at non-terminal stops.
defp no_departure_time?(prediction) do
prediction.arrival_time != nil &&
prediction.departure_time == nil
end

# Keeping this style until we change all of these.
# credo:disable-for-next-line Credo.Check.Readability.PredicateFunctionNames
defp is_skipped_or_cancelled?(prediction) do
Route.subway?(prediction.route.type, prediction.route.id) &&
prediction.schedule_relationship in [:skipped, :cancelled]
end

defp future?(%Prediction{time: %DateTime{} = dt}),
do: Util.time_is_greater_or_equal?(dt, Util.now())
defp no_trip?(prediction), do: is_nil(prediction.trip)

defp future?(%Prediction{
schedule_relationship: sr,
stop_sequence: seq,
time: nil,
trip: %Schedules.Trip{id: trip_id}
})
when sr in [:skipped, :cancelled] do
case Schedules.Repo.schedule_for_trip(trip_id, stop_sequence: seq) do
[%Schedules.Schedule{time: %DateTime{} = dt}] ->
Util.time_is_greater_or_equal?(dt, Util.now())
# Used to filter out predictions that have no departure time.
# This is common when shuttles are being used at non-terminal stops and at terminal stops.
defp no_departure_time?(prediction), do: is_nil(prediction.departure_time)

_ ->
false
end
end

defp future?(_), do: false

# Keeping this style until we change all of these.
# credo:disable-for-next-line Credo.Check.Readability.PredicateFunctionNames
defp is_in_past?(prediction), do: !future?(prediction)

defp terminal_stop?(%Prediction{
arrival_time: arrival,
departure_time: departure,
stop_sequence: seq,
trip: %Schedules.Trip{id: trip_id}
}) do
case Schedules.Repo.schedule_for_trip(trip_id, stop_sequence: seq) do
[%Schedules.Schedule{last_stop?: is_last_stop}] ->
is_last_stop

[] ->
# This is a prediction without a schedule. Predictions for terminal
# stops likely have an arrival time but no departure
arrival && is_nil(departure)

_ ->
false
end
defp skipped_or_cancelled?(prediction) do
Route.subway?(prediction.route.type, prediction.route.id) &&
prediction.schedule_relationship in [:cancelled, :skipped]
end
end
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Predictions.PredictionsPubSub do
defmodule Predictions.PubSub do
@moduledoc """
Allow channels to subscribe to prediction streams, which are collected into an
ETS table keyed by prediction ID, route ID, stop ID, direction ID, trip ID,
Expand All @@ -10,16 +10,19 @@ defmodule Predictions.PredictionsPubSub do
use GenServer

alias Predictions.{Prediction, Store, StreamSupervisor, StreamTopic}
alias Predictions.PubSub.Behaviour

@broadcast_interval_ms Application.compile_env!(:dotcom, [:predictions_broadcast_interval_ms])
@predictions_phoenix_pub_sub Application.compile_env!(:dotcom, [:predictions_phoenix_pub_sub])
@predictions_store Application.compile_env!(:dotcom, [:predictions_store])
@subscribers :prediction_subscriptions_registry

@type registry_value :: {Store.fetch_keys(), binary()}
@type broadcast_message :: {:new_predictions, [Prediction.t()]}

# Client
@behaviour Behaviour

@spec start_link() :: GenServer.on_start()
# Client
@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)
Expand All @@ -31,14 +34,13 @@ defmodule Predictions.PredictionsPubSub do
)
end

@spec subscribe(String.t()) :: [Prediction.t()]
@spec subscribe(String.t(), GenServer.server()) :: [Prediction.t()] | {:error, term()}
def subscribe(topic, server \\ __MODULE__) do
@impl Behaviour
def subscribe(topic) do
case StreamTopic.new(topic) do
%StreamTopic{} = stream_topic ->
:ok = StreamTopic.start_streams(stream_topic)

{registry_key, predictions} = GenServer.call(server, {:subscribe, stream_topic})
{registry_key, predictions} = GenServer.call(__MODULE__, {:subscribe, stream_topic})

for key <- StreamTopic.registration_keys(stream_topic) do
Registry.register(@subscribers, registry_key, key)
Expand All @@ -54,9 +56,9 @@ defmodule Predictions.PredictionsPubSub do
# Server

@impl GenServer
def init(opts) do
subscribe_fn = Keyword.get(opts, :subscribe_fn, &Phoenix.PubSub.subscribe/2)
subscribe_fn.(Predictions.PubSub, "predictions")
def init(_) do
Phoenix.PubSub.subscribe(@predictions_phoenix_pub_sub, "predictions")

broadcast_timer(50)

callers = :ets.new(:callers_by_pid, [:bag])
Expand All @@ -82,7 +84,7 @@ defmodule Predictions.PredictionsPubSub do
filter_names = Enum.map(streams, &elem(&1, 1))
:ets.insert(state.callers_by_pid, Enum.map(filter_names, &{from_pid, &1}))
registry_key = self()
{:reply, {registry_key, Store.fetch(fetch_keys)}, state, :hibernate}
{:reply, {registry_key, @predictions_store.fetch(fetch_keys)}, state, :hibernate}
end

@impl GenServer
Expand Down Expand Up @@ -117,7 +119,7 @@ defmodule Predictions.PredictionsPubSub do
fn {pid, {_, _}} -> pid end
)
|> Enum.each(fn {fetch_keys, pids} ->
new_predictions = Store.fetch(fetch_keys)
new_predictions = @predictions_store.fetch(fetch_keys)
send(self(), {:dispatch, Enum.uniq(pids), fetch_keys, new_predictions})
end)
end)
Expand Down
7 changes: 7 additions & 0 deletions lib/predictions/pub_sub/behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Predictions.PubSub.Behaviour do
@moduledoc """
Defines the behaviour for the Predictions PubSub.
"""

@callback subscribe(String.t()) :: [Prediction.t()] | {:error, term()}
end
49 changes: 20 additions & 29 deletions lib/predictions/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,46 @@ defmodule Predictions.Store do
be retrieved using `fetch/1` for any combination of values specified of
`fetch_keys`.
"""

use GenServer

require Logger

alias Predictions.Prediction
alias Routes.Route
alias Schedules.Trip
alias Stops.Stop
alias Vehicles.Vehicle

@type fetch_keys :: [
prediction_id: Prediction.id_t(),
route: Route.id_t(),
stop: Stop.id_t(),
direction: 0 | 1,
trip: Trip.id_t(),
vehicle_id: Vehicle.id_t()
]
alias Predictions.Store.Behaviour

@behaviour Behaviour

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

@doc "Deletes predictions associated with the input fetch keys, e.g. clear([route: 'Red', direction: 1])"
@impl Behaviour
def clear(keys) do
GenServer.cast(__MODULE__, {:remove, Enum.map(fetch(keys), & &1.id)})
end

@spec fetch(fetch_keys) :: [Prediction.t()]
@impl Behaviour
def fetch(keys) do
GenServer.call(__MODULE__, {:fetch, keys})
end

@spec update({atom, [Prediction.t()]}) :: :ok
@impl Behaviour
def update({event, predictions}) do
GenServer.cast(__MODULE__, {event, predictions})
end

@doc "Deletes predictions associated with the input fetch keys, e.g. clear([route: 'Red', direction: 1])"
@spec clear(fetch_keys) :: :ok
def clear(keys) do
GenServer.cast(__MODULE__, {:remove, Enum.map(fetch(keys), & &1.id)})
end

# Server
@impl GenServer
def init(opts) do
table = :ets.new(Keyword.get(opts, :name, __MODULE__), [:public])
def init(_) do
table = :ets.new(__MODULE__, [:public])
periodic_delete()
{:ok, table}
end

@impl true
@impl GenServer
def handle_cast({_, []}, table), do: {:noreply, table}

def handle_cast({event, predictions}, table) when event in [:add, :update] do
Expand All @@ -74,13 +65,13 @@ defmodule Predictions.Store do

def handle_cast(_, table), do: {:noreply, table}

@impl true
@impl GenServer
def handle_call({:fetch, keys}, _from, table) do
predictions = predictions_for_keys(table, keys)
{:reply, predictions, table}
end

@impl true
@impl GenServer
def handle_info(:periodic_delete, table) do
now = Util.now() |> DateTime.to_unix()

Expand All @@ -102,7 +93,7 @@ defmodule Predictions.Store do
{:noreply, table}
end

@spec predictions_for_keys(:ets.table(), fetch_keys) :: [Prediction.t()]
@spec predictions_for_keys(:ets.table(), Behaviour.fetch_keys()) :: [Prediction.t()]
defp predictions_for_keys(table, opts) do
match_pattern = {
Keyword.get(opts, :prediction_id, :_) || :_,
Expand Down
24 changes: 24 additions & 0 deletions lib/predictions/store/behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Predictions.Store.Behaviour do
@moduledoc """
Defines the behaviour for the Predictions Store.
"""

alias Predictions.Prediction
alias Routes.Route
alias Schedules.Trip
alias Stops.Stop
alias Vehicles.Vehicle

@type fetch_keys :: [
direction: 0 | 1,
prediction_id: Prediction.id_t(),
route: Route.id_t(),
stop: Stop.id_t(),
trip: Trip.id_t(),
vehicle_id: Vehicle.id_t()
]

@callback clear(fetch_keys()) :: :ok
@callback fetch(fetch_keys()) :: [Prediction.t()]
@callback update({atom, [Prediction.t()]}) :: :ok
end
4 changes: 3 additions & 1 deletion lib/predictions/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule Predictions.Stream do
alias Phoenix.PubSub
alias Predictions.{Repo, Store, StreamParser, StreamTopic}

@predictions_phoenix_pub_sub Application.compile_env!(:dotcom, :predictions_phoenix_pub_sub)

@type event_type :: :reset | :add | :update | :remove

def start_link(opts) do
Expand Down Expand Up @@ -94,7 +96,7 @@ defmodule Predictions.Stream do
@typep broadcast_fn :: (atom, String.t(), any -> :ok | {:error, any})
@spec broadcast(broadcast_fn) :: :ok
defp broadcast(broadcast_fn) do
Predictions.PubSub
@predictions_phoenix_pub_sub
|> broadcast_fn.("predictions", :broadcast)
|> log_errors()
end
Expand Down
2 changes: 2 additions & 0 deletions lib/predictions/stream_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ defmodule Predictions.StreamSupervisor do
@moduledoc """
DynamicSupervisor managing streams of predictions from the API.
"""

use DynamicSupervisor

alias Predictions.Store
alias Predictions.StreamSupervisor.Worker
alias Predictions.StreamTopic
Expand Down
Loading

0 comments on commit 9b1b4ad

Please sign in to comment.