Skip to content

Commit

Permalink
Merge branch 'main' into kb-pred-read-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
KaylaBrady authored Sep 26, 2024
2 parents 132ae3c + ae7991f commit 246f0db
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 32 deletions.
6 changes: 4 additions & 2 deletions lib/mbta_v3_api/alert.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ defmodule MBTAV3API.Alert do
:drawbridge_being_raised,
:electrical_work,
:fire,
:fire_department_activity,
:flooding,
:fog,
:freight_train_interference,
:hazmat_condition,
Expand Down Expand Up @@ -168,9 +170,9 @@ defmodule MBTAV3API.Alert do
%__MODULE__{
id: item.id,
active_period: Enum.map(item.attributes["active_period"], &ActivePeriod.parse/1),
cause: parse_cause(item.attributes["cause"]),
cause: parse_cause(item.attributes["cause"], :unknown_cause),
description: item.attributes["description"],
effect: parse_effect(item.attributes["effect"]),
effect: parse_effect(item.attributes["effect"], :unknown_effect),
effect_name: item.attributes["effect_name"],
header: item.attributes["header"],
informed_entity: Enum.map(item.attributes["informed_entity"], &InformedEntity.parse/1),
Expand Down
19 changes: 15 additions & 4 deletions lib/mobile_app_backend/predictions/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule MobileAppBackend.Predictions.PubSub do
"""
use GenServer
alias MobileAppBackend.GlobalDataCache
alias MBTAV3API.{JsonApi, Prediction, Stop, Store, Stream}
alias MBTAV3API.{JsonApi, Prediction, Stop, Store, Stream, Trip, Vehicle}
alias MobileAppBackend.Predictions.PubSub

@behaviour PubSub.Behaviour
Expand All @@ -50,7 +50,18 @@ defmodule MobileAppBackend.Predictions.PubSub do
from fetching predictions from the store into the format expected by subscribers.
"""
@type registry_value :: {Store.fetch_keys(), function()}
@type broadcast_message :: {:new_predictions, %{Stop.id() => JsonApi.Object.full_map()}}
@type broadcast_message ::
{:new_predictions,
%{
stop_id: Stop.id(),
predictions: %{Prediction.id() => Prediction.t()},
trips: %{Trip.id() => Trip.t()},
vehicles: %{



.id() => Vehicle.t()}
}}

@type state :: %{last_dispatched_table_name: atom()}

Expand Down Expand Up @@ -158,7 +169,7 @@ defmodule MobileAppBackend.Predictions.PubSub do
Registry.register(
MobileAppBackend.Predictions.Registry,
@fetch_registry_key,
{fetch_keys, fn data -> %{stop_id => data} end}
{fetch_keys, fn data -> Map.put(data, :stop_id, stop_id) end}
)

fetch_keys
Expand All @@ -175,7 +186,7 @@ defmodule MobileAppBackend.Predictions.PubSub do
Registry.register(
MobileAppBackend.Predictions.Registry,
@fetch_registry_key,
{fetch_keys, fn data -> %{parent_stop_id => data} end}
{fetch_keys, fn data -> Map.put(data, :stop_id, parent_stop_id) end}
)

fetch_keys
Expand Down
41 changes: 41 additions & 0 deletions lib/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ defmodule Util do
" end",
"end",
"",
"@spec parse_lifecycle(raw_lifecycle(), lifecycle()) :: lifecycle()",
"def parse_lifecycle(lifecycle, default) do",
" case lifecycle do",
" \\"NEW\\" -> :new",
" \\"ONGOING\\" -> :ongoing",
" \\"ONGOING_UPCOMING\\" -> :ongoing_upcoming",
" \\"UPCOMING\\" -> :upcoming",
" _ -> default",
" end",
"end",
"",
"@spec serialize_lifecycle(lifecycle()) :: raw_lifecycle()",
"def serialize_lifecycle(lifecycle) do",
" case lifecycle do",
Expand Down Expand Up @@ -56,6 +67,15 @@ defmodule Util do
" end",
"end",
"",
"@spec parse_x(raw_x(), x()) :: x()",
"def parse_x(x, default) do",
" case x do",
" 0 -> :a",
" 1 -> :b",
" _ -> default",
" end",
"end",
"",
"@spec serialize_x(x()) :: raw_x()",
"def serialize_x(x) do",
" case x do",
Expand Down Expand Up @@ -83,6 +103,15 @@ defmodule Util do
end
end
#
@spec parse_a(raw_a(), a()) :: a()
def parse_a(a, default) do
case a do
"X" -> :x
nil -> :y
_ -> default
end
end
#
@spec serialize_a(a()) :: raw_a()
def serialize_a(a) do
case a do
Expand Down Expand Up @@ -111,6 +140,7 @@ defmodule Util do
parse_fn = :"parse_#{name}"
serialize_fn = :"serialize_#{name}"
method_arg = Macro.var(name, __MODULE__)
default_arg = Macro.var(:default, __MODULE__)
raw_type = :"raw_#{name}"
raw_type_name = Macro.var(raw_type, nil)

Expand All @@ -124,6 +154,11 @@ defmodule Util do
clause
end)

parse_default_clause =
quote do
_ -> unquote(default_arg)
end

serialize_clauses =
Enum.map(values, fn {value, raw_value} ->
[{:->, _, _} = clause] =
Expand All @@ -135,6 +170,7 @@ defmodule Util do
end)

parse_body = {:case, [], [method_arg, [do: parse_clauses]]}
parse_default_body = {:case, [], [method_arg, [do: parse_clauses ++ parse_default_clause]]}
serialize_body = {:case, [], [method_arg, [do: serialize_clauses]]}

quote do
Expand All @@ -146,6 +182,11 @@ defmodule Util do
unquote(parse_body)
end

@spec unquote(parse_fn)(unquote(raw_type)(), unquote(name)()) :: unquote(name)()
def unquote(parse_fn)(unquote(method_arg), unquote(default_arg)) do
unquote(parse_default_body)
end

@spec unquote(serialize_fn)(unquote(name)()) :: unquote(raw_type)()
def unquote(serialize_fn)(unquote(method_arg)) do
unquote(serialize_body)
Expand Down
38 changes: 38 additions & 0 deletions test/mbta_v3_api/alert_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,42 @@ defmodule MBTAV3API.AlertTest do
updated_at: ~B[2024-02-12 11:49:00]
}
end

test "unexpected enum values fall back" do
assert Alert.parse(%JsonApi.Item{
id: "553407",
attributes: %{
"active_period" => [
%{"start" => "2024-02-12T11:49:00-05:00", "end" => "2024-02-12T14:26:40-05:00"}
],
"cause" => "ALIENS",
"description" => "Description",
"effect" => "TELEPORTATION",
"header" => "Header",
"informed_entity" => [
%{"activities" => ["BOARD", "EXIT", "RIDE"], "route" => "39", "route_type" => 3}
],
"lifecycle" => "NEW",
"updated_at" => "2024-02-12T11:49:00-05:00"
}
}) == %Alert{
id: "553407",
active_period: [
%Alert.ActivePeriod{start: ~B[2024-02-12 11:49:00], end: ~B[2024-02-12 14:26:40]}
],
cause: :unknown_cause,
description: "Description",
effect: :unknown_effect,
header: "Header",
informed_entity: [
%Alert.InformedEntity{
activities: [:board, :exit, :ride],
route: "39",
route_type: :bus
}
],
lifecycle: :new,
updated_at: ~B[2024-02-12 11:49:00]
}
end
end
2 changes: 1 addition & 1 deletion test/mbta_v3_api/mbta_v3_api_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule MBTAV3APITest do
use ExUnit.Case, async: true
use ExUnit.Case, async: false

import Mox
import Test.Support.Helpers
Expand Down
2 changes: 1 addition & 1 deletion test/mbta_v3_api/stream/consumer_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule MBTAV3API.Stream.ConsumerTest do
use ExUnit.Case, async: true
use ExUnit.Case, async: false

alias MBTAV3API.JsonApi
alias MBTAV3API.Route
Expand Down
32 changes: 19 additions & 13 deletions test/mobile_app_backend/predictions/pub_sub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,12 @@ defmodule MobileAppBackend.Predictions.PubSubTests do
PubSub.handle_info(:broadcast, state)

assert_receive {:new_predictions,
%{"12345" => %{predictions: predictions, trips: trips, vehicles: vehicles}}}
%{
stop_id: "12345",
predictions: predictions,
trips: trips,
vehicles: vehicles
}}

assert %{prediction_2.id => prediction_2} == predictions
assert %{trip_1.id => trip_1} == trips
Expand All @@ -256,7 +261,9 @@ defmodule MobileAppBackend.Predictions.PubSubTests do
# Sends new predictions
PubSub.handle_info(:broadcast, state)

assert_receive {:new_predictions, %{"12345" => %{predictions: predictions, trips: trips}}}
assert_receive {:new_predictions,
%{stop_id: "12345", predictions: predictions, trips: trips}}

assert %{prediction_3.id => prediction_3} == predictions
assert %{trip_1.id => trip_1} == trips
end
Expand Down Expand Up @@ -317,21 +324,19 @@ defmodule MobileAppBackend.Predictions.PubSubTests do
assert_receive {:new_predictions, new_predictions}

assert %{
"12345" => %{
predictions: %{"prediction_1" => ^prediction_1},
trips: %{"trip_1" => ^trip_1},
vehicles: %{"v_1" => ^vehicle_1}
}
stop_id: "12345",
predictions: %{"prediction_1" => ^prediction_1},
trips: %{"trip_1" => ^trip_1},
vehicles: %{"v_1" => ^vehicle_1}
} = new_predictions

assert_receive {:new_predictions, new_predictions}

assert %{
"6789" => %{
predictions: %{"prediction_2" => ^prediction_2},
trips: %{"trip_2" => ^trip_2},
vehicles: %{"v_2" => ^vehicle_2}
}
stop_id: "6789",
predictions: %{"prediction_2" => ^prediction_2},
trips: %{"trip_2" => ^trip_2},
vehicles: %{"v_2" => ^vehicle_2}
} = new_predictions
end
end
Expand All @@ -358,7 +363,8 @@ defmodule MobileAppBackend.Predictions.PubSubTests do
PubSub.subscribe_for_stop("12345")

Stream.PubSub.broadcast!("predictions:all:events", :reset_event)
assert_receive {:new_predictions, %{"12345" => ^full_map}}
assert_receive {:new_predictions, response}
assert response == Map.put(full_map, :stop_id, "12345")
end
end
end
13 changes: 8 additions & 5 deletions test/mobile_app_backend/throttler_test.exs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
defmodule MobileAppBackend.ThrottlerTest do
use ExUnit.Case, async: true
# run exclusively for more precise timing
use ExUnit.Case, async: false

alias MobileAppBackend.Throttler

@timeout 10
# since the timing may not be perfect
@window 2

setup ctx do
throttler = start_link_supervised!({Throttler, target: self(), cast: :message, ms: @timeout})
Expand Down Expand Up @@ -37,8 +40,8 @@ defmodule MobileAppBackend.ThrottlerTest do
test "casts later if last cast was recent", %{throttler: throttler} do
Throttler.request(throttler)

refute_receive {:"$gen_cast", :message}, @timeout - 1
assert_receive {:"$gen_cast", :message}, 2
refute_receive {:"$gen_cast", :message}, @timeout - @window
assert_receive {:"$gen_cast", :message}, 2 * @window
end

@tag last_cast_ms_ago: 0
Expand All @@ -47,8 +50,8 @@ defmodule MobileAppBackend.ThrottlerTest do
Throttler.request(throttler)
end

refute_receive {:"$gen_cast", :message}, @timeout - 1
assert_receive {:"$gen_cast", :message}, 2
refute_receive {:"$gen_cast", :message}, @timeout - @window
assert_receive {:"$gen_cast", :message}, 2 * @window
refute_receive {:"$gen_cast", :message}, @timeout
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,15 @@ defmodule MobileAppBackendWeb.PredictionsForStopsV2ChannelTest do
vehicle = build(:vehicle, id: "v_1")

PredictionsForStopsV2Channel.handle_info(
{:new_predictions, %{"12345" => to_full_map([prediction, trip, vehicle])}},
{:new_predictions, Map.put(to_full_map([prediction, trip, vehicle]), :stop_id, "12345")},
socket
)

assert_push "stream_data", %{
"12345" => %{
predictions: %{"prediction_1" => ^prediction},
trips: %{"trip_1" => ^trip},
vehicles: %{"v_1" => ^vehicle}
}
stop_id: "12345",
predictions: %{"prediction_1" => ^prediction},
trips: %{"trip_1" => ^trip},
vehicles: %{"v_1" => ^vehicle}
}
end
end

0 comments on commit 246f0db

Please sign in to comment.