From 5cb18b623c4342e1b81cbd18b210ceabd7e7ccf2 Mon Sep 17 00:00:00 2001 From: vegris <1cheaterok1@gmail.com> Date: Thu, 27 Oct 2022 22:18:31 +0300 Subject: [PATCH] fix: Elixir, use persistent_term x2 longer stop_times.txt load time, but x2 more requests/sec and x2 decrease in RAM usage --- trexit/lib/trexit.ex | 90 +++++++++++++++++++++++++++++--- trexit/lib/trexit/application.ex | 8 ++- trexit/lib/trexit/gtfs.ex | 14 ----- trexit/lib/trexit/gtfs/loader.ex | 75 -------------------------- trexit/lib/trexit/router.ex | 2 +- 5 files changed, 92 insertions(+), 97 deletions(-) delete mode 100644 trexit/lib/trexit/gtfs.ex delete mode 100644 trexit/lib/trexit/gtfs/loader.ex diff --git a/trexit/lib/trexit.ex b/trexit/lib/trexit.ex index 336c058..0fbd1af 100644 --- a/trexit/lib/trexit.ex +++ b/trexit/lib/trexit.ex @@ -1,9 +1,87 @@ defmodule Trexit do - @moduledoc """ - Trexit keeps the contexts that define your domain - and business logic. + alias :persistent_term, as: PersistentTerm - Contexts are also responsible for managing your data, regardless - if it comes from the database, an external API or others. - """ + require Logger + + @stop_times_key {__MODULE__, :stop_times} + @trips_key {__MODULE__, :trips} + + def schedules_for_route(route_id) do + stop_times = PersistentTerm.get(@stop_times_key) + + @trips_key + |> PersistentTerm.get() + |> Map.get(route_id, []) + |> Enum.map(fn %{trip_id: trip_id} = route -> + schedules = Map.get(stop_times, trip_id, []) + + Map.merge(route, %{route_id: route_id, schedules: schedules}) + end) + end + + def load() do + {time, _} = + :timer.tc(fn -> + get_stop_times() + end) + + Logger.info("Parsed stop_times.txt in #{time / 1000} ms") + + {time, _} = + :timer.tc(fn -> + get_trips() + end) + + Logger.info("Parsed trips.txt in #{time / 1000} ms") + end + + def unload() do + PersistentTerm.erase(@stop_times_key) + PersistentTerm.erase(@trips_key) + end + + defp get_stop_times() do + stream = + "../MBTA_GTFS/stop_times.txt" + |> File.stream!() + |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) + + # assert column order + ["trip_id", "arrival_time", "departure_time", "stop_id"] ++ _ = Enum.fetch!(stream, 0) + + stream + |> Stream.drop(1) + |> Stream.map(fn [trip_id, arrival_time, departure_time, stop_id] ++ _ -> + {trip_id, + %{ + arrival_time: arrival_time, + departure_time: departure_time, + stop_id: stop_id + }} + end) + |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) + |> then(&PersistentTerm.put(@stop_times_key, &1)) + end + + defp get_trips() do + stream = + "../MBTA_GTFS/trips.txt" + |> File.stream!() + |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) + + # assert column order + ["route_id", "service_id", "trip_id"] ++ _ = Enum.fetch!(stream, 0) + + stream + |> Stream.drop(1) + |> Stream.map(fn [route_id, service_id, trip_id] ++ _ -> + {route_id, + %{ + service_id: service_id, + trip_id: trip_id + }} + end) + |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) + |> then(&PersistentTerm.put(@trips_key, &1)) + end end diff --git a/trexit/lib/trexit/application.ex b/trexit/lib/trexit/application.ex index b9ff173..7748ad5 100644 --- a/trexit/lib/trexit/application.ex +++ b/trexit/lib/trexit/application.ex @@ -7,8 +7,9 @@ defmodule Trexit.Application do @impl true def start(_type, _args) do + Trexit.load() + children = [ - Trexit.GTFS.Loader, {Plug.Cowboy, scheme: :http, plug: Trexit.Router, options: [port: 4000]} ] @@ -17,4 +18,9 @@ defmodule Trexit.Application do opts = [strategy: :one_for_one, name: Trexit.Supervisor] Supervisor.start_link(children, opts) end + + @impl true + def stop(_state) do + Trexit.unload() + end end diff --git a/trexit/lib/trexit/gtfs.ex b/trexit/lib/trexit/gtfs.ex deleted file mode 100644 index 7abde05..0000000 --- a/trexit/lib/trexit/gtfs.ex +++ /dev/null @@ -1,14 +0,0 @@ -defmodule Trexit.GTFS do - def schedules_for_route(route_id) do - :trips - |> :ets.lookup(route_id) - |> Enum.map(fn {_key, %{trip_id: trip_id} = route} -> - schedules = - :stop_times - |> :ets.lookup(trip_id) - |> Enum.map(fn {_key, schedule} -> schedule end) - - Map.merge(route, %{route_id: route_id, schedules: schedules}) - end) - end -end diff --git a/trexit/lib/trexit/gtfs/loader.ex b/trexit/lib/trexit/gtfs/loader.ex deleted file mode 100644 index dedb1c0..0000000 --- a/trexit/lib/trexit/gtfs/loader.ex +++ /dev/null @@ -1,75 +0,0 @@ -defmodule Trexit.GTFS.Loader do - use GenServer - - require Logger - - def start_link(_) do - GenServer.start_link(__MODULE__, []) - end - - def init(_) do - Logger.info("starting Trexit.GTFS") - {:ok, [], {:continue, :load_gtfs}} - end - - def handle_continue(:load_gtfs, state) do - Logger.info("loading GTFS") - load() - Logger.info("finished loading GTFS") - {:noreply, state} - end - - def load() do - :ets.new(:stop_times, [:named_table, :duplicate_bag, read_concurrency: true]) - :ets.new(:trips, [:named_table, :duplicate_bag, read_concurrency: true]) - - {time, _} = - :timer.tc(fn -> - get_stop_times() - end) - - Logger.info("Parsed stop_times.txt in #{time / 1000} ms") - - {time, _} = - :timer.tc(fn -> - get_trips() - end) - - Logger.info("Parsed trips.txt in #{time / 1000} ms") - end - - defp get_stop_times() do - stream = - "../MBTA_GTFS/stop_times.txt" - |> File.stream!() - |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) - - # assert column order - ["trip_id", "arrival_time", "departure_time", "stop_id"] ++ _ = Enum.fetch!(stream, 0) - - stream - |> Stream.drop(1) - |> Enum.each(fn [trip_id, arrival_time, departure_time, stop_id] ++ _ -> - :ets.insert( - :stop_times, - {trip_id, %{arrival_time: arrival_time, departure_time: departure_time, stop_id: stop_id}} - ) - end) - end - - defp get_trips() do - stream = - "../MBTA_GTFS/trips.txt" - |> File.stream!() - |> NimbleCSV.RFC4180.parse_stream(skip_headers: false) - - # assert column order - ["route_id", "service_id", "trip_id"] ++ _ = Enum.fetch!(stream, 0) - - stream - |> Stream.drop(1) - |> Enum.each(fn [route_id, service_id, trip_id] ++ _ -> - :ets.insert(:trips, {route_id, %{service_id: service_id, trip_id: trip_id}}) - end) - end -end diff --git a/trexit/lib/trexit/router.ex b/trexit/lib/trexit/router.ex index 055000a..ffe1ce4 100644 --- a/trexit/lib/trexit/router.ex +++ b/trexit/lib/trexit/router.ex @@ -19,7 +19,7 @@ defmodule Trexit.Router do get "/schedules/:route" do payload = route - |> Trexit.GTFS.schedules_for_route() + |> Trexit.schedules_for_route() |> Jsonrs.encode!(lean: true) conn