From e502c027114cf06cc6e6aecb70b491b849b957b6 Mon Sep 17 00:00:00 2001 From: Jon Zimbel <63608771+jzimbel-mbta@users.noreply.github.com> Date: Fri, 18 Oct 2024 10:29:56 -0400 Subject: [PATCH] feat: GTFS feed import and validation (#1013) * feat: Create tables for GTFS-static archive import (#1006) * feat: Create tables for GTFS-static archive import * Clean up comments * Fixes and adjustments to the gtfs_* migrations * Add Ecto schemas for all GTFS-imported tables * Add logic to read from GTFS archive saved at local path and import data into gtfs_* tables * Make GTFS import function more communicative * Add /import-GTFS endpoint * tweak: Use `Repo.insert_all`, defer constraint checking * Use Postgres COPY for larger tables; Undo all superfluous deferrable constraints; define and use Importable behaviour * Remove now-unused `create_deferrable` migration helper * Appease credo, except for one TODO that still needs DOing * fix: increase length to 100MB for multipart uploads in Plug.Parsers (#1014) * fix: increase length to 100MB for multipart uploads in Plug.Parsers * ci: upgrade Github Action upload-artifact to fix deprecation error * Tweak service/calendar related tables and schemas * Remove TODO comment that is done * Update test for service / calendar / calendar_dates tables * Appease credo * Keep GTFS timestamps as strings throughout; provide utility fns to convert timestamps to other representations * Use Ecto.Enum for GTFS enum fields; convert corresponding CSV fields to int before casting * Appease dialyzer * Change how int-code and string enum values are defined in the DB * Oops, forgot to commit auto-generated structure.sql in prev commit * structure.sql was out of date somehow, even though migrations were already up??? Fixed now * Update mix.lock * Simplify some `execute/2` calls in GTFS migrations * Install Oban * Add GTFS import Oban workers; Add logic for job-based GTFS import * Update/add API endpoints for async GTFS import * Remove `Expires` header from S3 put_object requests, it doesn't do what I thought it did * Add more comprehensive logging for GTFS import process * Do not defer optionally-deferrable constraints during GTFS import/validation * Set queue_target and queue_interval config for DB connection * Fix duplicate import/validation job handling * Add `/api/gtfs/check_jobs` endpoint * fix: Use correct module for checking validation jobs * Fix route --------- Co-authored-by: meagharty <149533950+meagharty@users.noreply.github.com> --- config/config.exs | 12 +- config/dev.exs | 4 +- config/prod.exs | 3 +- config/runtime.exs | 7 +- config/test.exs | 7 +- lib/arrow/application.ex | 2 + lib/arrow/gtfs.ex | 157 ++++ lib/arrow/gtfs/agency.ex | 40 + lib/arrow/gtfs/archive.ex | 145 ++++ lib/arrow/gtfs/calendar.ex | 65 ++ lib/arrow/gtfs/calendar_date.ex | 41 + lib/arrow/gtfs/checkpoint.ex | 31 + lib/arrow/gtfs/direction.ex | 45 ++ lib/arrow/gtfs/feed_info.ex | 62 ++ lib/arrow/gtfs/import_helper.ex | 183 +++++ lib/arrow/gtfs/import_worker.ex | 45 ++ lib/arrow/gtfs/importable.ex | 135 ++++ lib/arrow/gtfs/job_helper.ex | 51 ++ lib/arrow/gtfs/level.ex | 34 + lib/arrow/gtfs/line.ex | 42 + lib/arrow/gtfs/migration_helper.ex | 51 ++ lib/arrow/gtfs/route.ex | 69 ++ lib/arrow/gtfs/route_pattern.ex | 74 ++ lib/arrow/gtfs/schema.ex | 23 + lib/arrow/gtfs/service.ex | 46 ++ lib/arrow/gtfs/shape.ex | 46 ++ lib/arrow/gtfs/shape_point.ex | 59 ++ lib/arrow/gtfs/stop.ex | 86 ++ lib/arrow/gtfs/stop_time.ex | 86 ++ lib/arrow/gtfs/time_helper.ex | 58 ++ lib/arrow/gtfs/trip.ex | 116 +++ 
lib/arrow/gtfs/validation_worker.ex | 40 + .../controllers/api/gtfs_import_controller.ex | 251 ++++++ lib/arrow_web/endpoint.ex | 2 +- lib/arrow_web/router.ex | 11 + mix.exs | 5 +- mix.lock | 9 +- ...0240826153959_create_gtfs_tables_part1.exs | 56 ++ ...0240826154124_create_gtfs_tables_part2.exs | 68 ++ ...0240826154204_create_gtfs_tables_part3.exs | 48 ++ ...0240826154208_create_gtfs_tables_part4.exs | 58 ++ ...0240826154213_create_gtfs_tables_part5.exs | 45 ++ ...0240826154218_create_gtfs_tables_part6.exs | 29 + .../20241003182524_add_oban_jobs_table.exs | 13 + priv/repo/structure.sql | 747 +++++++++++++++++- test/arrow/gtfs/agency_test.exs | 22 + test/arrow/gtfs/import_helper_test.exs | 4 + test/arrow/gtfs/service_test.exs | 65 ++ test/arrow/gtfs/time_helper_test.exs | 4 + 49 files changed, 3282 insertions(+), 20 deletions(-) create mode 100644 lib/arrow/gtfs.ex create mode 100644 lib/arrow/gtfs/agency.ex create mode 100644 lib/arrow/gtfs/archive.ex create mode 100644 lib/arrow/gtfs/calendar.ex create mode 100644 lib/arrow/gtfs/calendar_date.ex create mode 100644 lib/arrow/gtfs/checkpoint.ex create mode 100644 lib/arrow/gtfs/direction.ex create mode 100644 lib/arrow/gtfs/feed_info.ex create mode 100644 lib/arrow/gtfs/import_helper.ex create mode 100644 lib/arrow/gtfs/import_worker.ex create mode 100644 lib/arrow/gtfs/importable.ex create mode 100644 lib/arrow/gtfs/job_helper.ex create mode 100644 lib/arrow/gtfs/level.ex create mode 100644 lib/arrow/gtfs/line.ex create mode 100644 lib/arrow/gtfs/migration_helper.ex create mode 100644 lib/arrow/gtfs/route.ex create mode 100644 lib/arrow/gtfs/route_pattern.ex create mode 100644 lib/arrow/gtfs/schema.ex create mode 100644 lib/arrow/gtfs/service.ex create mode 100644 lib/arrow/gtfs/shape.ex create mode 100644 lib/arrow/gtfs/shape_point.ex create mode 100644 lib/arrow/gtfs/stop.ex create mode 100644 lib/arrow/gtfs/stop_time.ex create mode 100644 lib/arrow/gtfs/time_helper.ex create mode 100644 lib/arrow/gtfs/trip.ex create mode 100644 lib/arrow/gtfs/validation_worker.ex create mode 100644 lib/arrow_web/controllers/api/gtfs_import_controller.ex create mode 100644 priv/repo/migrations/20240826153959_create_gtfs_tables_part1.exs create mode 100644 priv/repo/migrations/20240826154124_create_gtfs_tables_part2.exs create mode 100644 priv/repo/migrations/20240826154204_create_gtfs_tables_part3.exs create mode 100644 priv/repo/migrations/20240826154208_create_gtfs_tables_part4.exs create mode 100644 priv/repo/migrations/20240826154213_create_gtfs_tables_part5.exs create mode 100644 priv/repo/migrations/20240826154218_create_gtfs_tables_part6.exs create mode 100644 priv/repo/migrations/20241003182524_add_oban_jobs_table.exs create mode 100644 test/arrow/gtfs/agency_test.exs create mode 100644 test/arrow/gtfs/import_helper_test.exs create mode 100644 test/arrow/gtfs/service_test.exs create mode 100644 test/arrow/gtfs/time_helper_test.exs diff --git a/config/config.exs b/config/config.exs index 6d172802..04eb885b 100644 --- a/config/config.exs +++ b/config/config.exs @@ -38,7 +38,11 @@ config :arrow, shape_storage_enabled?: false, shape_storage_bucket: "mbta-arrow", shape_storage_prefix: "shape-uploads/", - shape_storage_request_fn: {ExAws, :request} + shape_storage_request_fn: {ExAws, :request}, + gtfs_archive_storage_enabled?: false, + gtfs_archive_storage_bucket: "mbta-arrow", + gtfs_archive_storage_prefix: "gtfs-archive-uploads/", + gtfs_archive_storage_request_fn: {ExAws, :request} # Configures the endpoint config :arrow, ArrowWeb.Endpoint, @@ 
-47,6 +51,12 @@ config :arrow, ArrowWeb.Endpoint, pubsub_server: Arrow.PubSub, live_view: [signing_salt: "35DDvOCJ"] +# Configures Oban, the job processing library +config :arrow, Oban, + engine: Oban.Engines.Basic, + queues: [default: 10, gtfs_import: 1], + repo: Arrow.Repo + config :esbuild, version: "0.17.11", default: [ diff --git a/config/dev.exs b/config/dev.exs index 8349acdb..a73f47a4 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -77,7 +77,9 @@ config :arrow, dev_routes: true # Set prefix env for s3 uploads config :arrow, shape_storage_enabled?: true, - shape_storage_prefix_env: "dev/local/" + shape_storage_prefix_env: "dev/local/", + gtfs_archive_storage_enabled?: true, + gtfs_archive_storage_prefix_env: "dev/local/" # Do not include metadata nor timestamps in development logs config :logger, :console, format: "[$level] $message\n" diff --git a/config/prod.exs b/config/prod.exs index 5ca437f7..192b959d 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -11,7 +11,8 @@ import Config # before starting your production server. config :arrow, - shape_storage_enabled?: true + shape_storage_enabled?: true, + gtfs_archive_storage_enabled?: true config :arrow, :websocket_check_origin, [ "https://*.mbta.com", diff --git a/config/runtime.exs b/config/runtime.exs index 5858f739..e5f311d1 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -87,8 +87,11 @@ if config_env() == :prod do port: port, pool_size: pool_size, # password set by `configure` callback below - configure: {Arrow.Repo, :before_connect, []} + configure: {Arrow.Repo, :before_connect, []}, + queue_target: 30_000, + queue_interval: 120_000 config :arrow, - shape_storage_prefix_env: System.get_env("S3_PREFIX") + shape_storage_prefix_env: System.get_env("S3_PREFIX"), + gtfs_archive_storage_prefix_env: System.get_env("S3_PREFIX") end diff --git a/config/test.exs b/config/test.exs index 17ce270f..47ec615d 100644 --- a/config/test.exs +++ b/config/test.exs @@ -2,7 +2,9 @@ import Config config :arrow, shape_storage_enabled?: false, - shape_storage_request_fn: {Arrow.Mock.ExAws.Request, :request} + shape_storage_request_fn: {Arrow.Mock.ExAws.Request, :request}, + gtfs_archive_storage_enabled?: false, + gtfs_archive_storage_request_fn: {Arrow.Mock.ExAws.Request, :request} # Configure your database config :arrow, Arrow.Repo, @@ -19,6 +21,9 @@ config :arrow, ArrowWeb.Endpoint, config :arrow, ArrowWeb.AuthManager, secret_key: "test key" +# Prevent Oban from running jobs and plugins during test runs +config :arrow, Oban, testing: :inline + config :ueberauth, Ueberauth, providers: [ cognito: {Arrow.Ueberauth.Strategy.Fake, []}, diff --git a/lib/arrow/application.ex b/lib/arrow/application.ex index 4f4f3f6e..d164ed1b 100644 --- a/lib/arrow/application.ex +++ b/lib/arrow/application.ex @@ -16,6 +16,8 @@ defmodule Arrow.Application do {Phoenix.PubSub, name: Arrow.PubSub}, # Start the Ecto repository Arrow.Repo, + # Start Oban, the job processing library + {Oban, Application.fetch_env!(:arrow, Oban)}, # Start the endpoint when the application starts ArrowWeb.Endpoint ] ++ diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex new file mode 100644 index 00000000..e2bb0dd7 --- /dev/null +++ b/lib/arrow/gtfs.ex @@ -0,0 +1,157 @@ +defmodule Arrow.Gtfs do + @moduledoc """ + GTFS import logic. + """ + + require Logger + alias Arrow.Gtfs.Importable + alias Arrow.Repo + + @import_timeout_ms :timer.minutes(10) + + @doc """ + Loads a GTFS archive into Arrow's gtfs_* DB tables, + replacing the previous archive's data. 
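+
+  An illustrative call (a minimal sketch--in practice the `unzip` and `job` values
+  come from `Arrow.Gtfs.ImportWorker`, and the version strings below are
+  placeholders rather than real feed versions):
+
+      Arrow.Gtfs.import(unzip, "new-feed-version", "current-feed-version", job)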
+ + Setting `dry_run?` true causes the transaction to be rolled back + instead of committed, even if all queries succeed. + + Returns: + + - `:ok` on successful import or dry-run, or skipped import due to unchanged version. + - `{:error, reason}` if the import or dry-run failed. + """ + @spec import(Unzip.t(), String.t(), String.t() | nil, Oban.Job.t(), boolean) :: + :ok | {:error, term} + def import(unzip, new_version, current_version, job, dry_run? \\ false) do + Logger.info("GTFS import or validation job starting #{job_logging_params(job)}") + + with :ok <- validate_required_files(unzip), + :ok <- validate_version_change(new_version, current_version) do + case import_transaction(unzip, dry_run?) do + {:ok, _} -> + Logger.info("GTFS import success #{job_logging_params(job)}") + :ok + + {:error, :dry_run_success} -> + Logger.info("GTFS validation success #{job_logging_params(job)}") + :ok + + {:error, reason} = error -> + Logger.warn( + "GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}" + ) + + error + end + else + :unchanged -> + Logger.info("GTFS import skipped due to unchanged version #{job_logging_params(job)}") + + :ok + + {:error, reason} = error -> + Logger.warn( + "GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}" + ) + + error + end + end + + defp job_logging_params(job) do + s3_object_key = + job.args + |> Map.fetch!("s3_uri") + |> URI.parse() + |> then(& &1.path) + + archive_version = Map.fetch!(job.args, "archive_version") + + "job_id=#{job.id} archive_s3_object_key=#{s3_object_key} archive_version=\"#{archive_version}\" job_worker=#{job.worker}" + end + + defp import_transaction(unzip, dry_run?) do + transaction = fn -> + _ = truncate_all() + import_all(unzip) + + if dry_run? do + # Set any deferred constraints to run now, instead of on transaction commit, + # since we don't actually commit the transaction in this case. + _ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE") + Repo.rollback(:dry_run_success) + end + end + + {elapsed_ms, result} = + fn -> Repo.transaction(transaction, timeout: @import_timeout_ms) end + |> :timer.tc(:millisecond) + + action = if dry_run?, do: "validation", else: "import" + Logger.info("GTFS archive #{action} transaction completed elapsed_ms=#{elapsed_ms}") + + result + end + + defp truncate_all do + tables = Enum.map_join(importable_schemas(), ", ", & &1.__schema__(:source)) + Repo.query!("TRUNCATE #{tables}") + end + + defp import_all(unzip) do + Enum.each(importable_schemas(), &Importable.import(&1, unzip)) + end + + defp validate_required_files(unzip) do + files = + unzip + |> Unzip.list_entries() + |> MapSet.new(& &1.file_name) + + if MapSet.subset?(required_files(), files) do + :ok + else + missing = + MapSet.difference(required_files(), files) + |> Enum.sort() + |> Enum.join(",") + + {:error, "GTFS archive is missing required file(s) missing=#{missing}"} + end + end + + @spec validate_version_change(String.t(), String.t() | nil) :: :ok | :unchanged + defp validate_version_change(new_version, current_version) + + defp validate_version_change(version, version), do: :unchanged + defp validate_version_change(_new_version, _current_version), do: :ok + + defp importable_schemas do + # Listed in the order in which they should be imported. 
+ [ + Arrow.Gtfs.FeedInfo, + Arrow.Gtfs.Agency, + Arrow.Gtfs.Checkpoint, + Arrow.Gtfs.Level, + Arrow.Gtfs.Line, + Arrow.Gtfs.Service, + Arrow.Gtfs.Calendar, + Arrow.Gtfs.CalendarDate, + Arrow.Gtfs.Stop, + Arrow.Gtfs.Shape, + Arrow.Gtfs.ShapePoint, + Arrow.Gtfs.Route, + Arrow.Gtfs.Direction, + Arrow.Gtfs.RoutePattern, + Arrow.Gtfs.Trip, + Arrow.Gtfs.StopTime + ] + end + + defp required_files do + importable_schemas() + |> Enum.flat_map(& &1.filenames()) + |> MapSet.new() + end +end diff --git a/lib/arrow/gtfs/agency.ex b/lib/arrow/gtfs/agency.ex new file mode 100644 index 00000000..2764bb07 --- /dev/null +++ b/lib/arrow/gtfs/agency.ex @@ -0,0 +1,40 @@ +defmodule Arrow.Gtfs.Agency do + @moduledoc """ + Represents a row from agency.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + name: String.t(), + url: String.t(), + timezone: String.t(), + lang: String.t() | nil, + phone: String.t() | nil + } + + schema "gtfs_agencies" do + field :name, :string + field :url, :string + field :timezone, :string + field :lang, :string + field :phone, :string + + has_many :routes, Arrow.Gtfs.Route + end + + def changeset(agency, attrs) do + attrs = remove_table_prefix(attrs, "agency") + + agency + |> cast(attrs, ~w[id name url timezone lang phone]a) + |> validate_required(~w[id name url timezone]a) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["agency.txt"] +end diff --git a/lib/arrow/gtfs/archive.ex b/lib/arrow/gtfs/archive.ex new file mode 100644 index 00000000..3db98e16 --- /dev/null +++ b/lib/arrow/gtfs/archive.ex @@ -0,0 +1,145 @@ +defmodule Arrow.Gtfs.Archive do + @moduledoc """ + Functions for reading, uploading, and downloading a GTFS-static zip archive. + """ + + defmodule Config do + @moduledoc false + + @type t :: %__MODULE__{ + enabled?: boolean, + bucket: String.t(), + prefix: String.t(), + prefix_env: String.t() | nil, + request_fn: {module, atom} + } + + @enforce_keys [:enabled?, :bucket, :prefix, :prefix_env, :request_fn] + defstruct @enforce_keys + + @spec get() :: t() + def get do + %__MODULE__{ + enabled?: Application.fetch_env!(:arrow, :gtfs_archive_storage_enabled?), + bucket: Application.fetch_env!(:arrow, :gtfs_archive_storage_bucket), + prefix: Application.fetch_env!(:arrow, :gtfs_archive_storage_prefix), + prefix_env: Application.get_env(:arrow, :gtfs_archive_storage_prefix_env), + request_fn: Application.fetch_env!(:arrow, :gtfs_archive_storage_request_fn) + } + end + end + + @type t :: %__MODULE__{ + data: iodata, + fd: :file.fd() + } + + @enforce_keys [:data, :fd] + defstruct @enforce_keys + + @spec from_iodata(iodata) :: t() + def from_iodata(iodata) do + {:ok, fd} = :file.open(iodata, [:ram, :read]) + %__MODULE__{data: iodata, fd: fd} + end + + @spec close(t()) :: :ok | {:error, term} + def close(%__MODULE__{} = archive) do + :file.close(archive.fd) + end + + @spec upload_to_s3(iodata) :: {:ok, s3_uri :: String.t()} | {:ok, :disabled} | {:error, term} + def upload_to_s3(zip_iodata) do + config = Config.get() + + if config.enabled? 
do + do_upload(zip_iodata, config) + else + {:ok, :disabled} + end + end + + defp do_upload(zip_iodata, config) do + path = get_upload_path(config) + + upload_op = + zip_iodata + |> List.wrap() + |> Stream.map(&IO.iodata_to_binary/1) + |> ExAws.S3.upload(config.bucket, path, content_type: "application/zip") + + {mod, fun} = config.request_fn + + case apply(mod, fun, [upload_op]) do + {:ok, _} -> {:ok, Path.join(["s3://", config.bucket, path])} + {:error, _} = error -> error + end + end + + def to_unzip_struct(%Unzip{} = unzip), do: {:ok, unzip} + + def to_unzip_struct(path) when is_binary(path) do + case URI.new(path) do + {:ok, %URI{scheme: "s3"} = uri} -> + unzip_s3(uri) + + {:ok, %URI{scheme: nil, path: path}} when is_binary(path) -> + unzip_local(path) + + {:error, _} = error -> + error + end + end + + defp unzip_local(path) do + with :ok <- check_file_exists(path) do + zip_file = Unzip.LocalFile.open(path) + Unzip.new(zip_file) + end + end + + defp unzip_s3(%URI{host: bucket, path: object_key}) do + config = Config.get() + {mod, fun} = config.request_fn + + get_object_op = ExAws.S3.get_object(bucket, object_key) + + case apply(mod, fun, [get_object_op]) do + {:ok, %{body: zip_data}} -> + zip_data + |> List.wrap() + |> from_iodata() + |> Unzip.new() + + {:error, _} = error -> + error + end + end + + defp check_file_exists(path) do + if File.regular?(path), + do: :ok, + else: {:error, "Path does not exist, or is a directory: '#{path}'"} + end + + defp get_upload_path(config) do + filename = "MBTA_GTFS-#{Ecto.UUID.generate()}.zip" + + [config.prefix_env, config.prefix, filename] + |> Enum.reject(&is_nil/1) + |> Path.join() + end + + defimpl Unzip.FileAccess do + def pread(archive, offset, length) do + case :file.pread(archive.fd, offset, length) do + {:ok, data} -> {:ok, IO.iodata_to_binary(data)} + other -> other + end + end + + def size(archive) do + {:ok, IO.iodata_length(archive.data)} + end + end +end diff --git a/lib/arrow/gtfs/calendar.ex b/lib/arrow/gtfs/calendar.ex new file mode 100644 index 00000000..0079a7a9 --- /dev/null +++ b/lib/arrow/gtfs/calendar.ex @@ -0,0 +1,65 @@ +defmodule Arrow.Gtfs.Calendar do + @moduledoc """ + Represents a row from calendar.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. 
+ """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @primary_key false + + @type t :: %__MODULE__{ + service: Arrow.Gtfs.Service.t() | Ecto.Association.NotLoaded.t(), + monday: boolean, + tuesday: boolean, + wednesday: boolean, + thursday: boolean, + friday: boolean, + saturday: boolean, + sunday: boolean, + start_date: Date.t(), + end_date: Date.t() + } + + schema "gtfs_calendars" do + belongs_to :service, Arrow.Gtfs.Service, primary_key: true + + for day <- ~w[monday tuesday wednesday thursday friday saturday sunday]a do + field day, :boolean + end + + field :start_date, :date + field :end_date, :date + end + + def changeset(calendar, attrs) do + attrs = values_to_iso8601_datestamp(attrs, ~w[start_date end_date]) + + calendar + |> cast( + attrs, + ~w[service_id monday tuesday wednesday thursday friday saturday sunday start_date end_date]a + ) + |> validate_required( + ~w[service_id monday tuesday wednesday thursday friday saturday sunday start_date end_date]a + ) + |> assoc_constraint(:service) + |> validate_start_date_not_after_end_date() + end + + defp validate_start_date_not_after_end_date(changeset) do + start_date = fetch_field!(changeset, :start_date) + end_date = fetch_field!(changeset, :end_date) + + if Date.compare(start_date, end_date) in [:lt, :eq] do + changeset + else + add_error(changeset, :dates, "start date should not be after end date") + end + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["calendar.txt"] +end diff --git a/lib/arrow/gtfs/calendar_date.ex b/lib/arrow/gtfs/calendar_date.ex new file mode 100644 index 00000000..cd8aa6f1 --- /dev/null +++ b/lib/arrow/gtfs/calendar_date.ex @@ -0,0 +1,41 @@ +defmodule Arrow.Gtfs.CalendarDate do + @moduledoc """ + Represents a row from calendar_dates.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + service: Arrow.Gtfs.Service.t() | Ecto.Association.NotLoaded.t(), + date: Date.t(), + exception_type: atom, + holiday_name: String.t() | nil + } + + @primary_key false + + schema "gtfs_calendar_dates" do + belongs_to :service, Arrow.Gtfs.Service, primary_key: true + field :date, :date, primary_key: true + field :exception_type, Ecto.Enum, values: [added: 1, removed: 2] + field :holiday_name, :string + end + + def changeset(calendar_date, attrs) do + attrs = + attrs + |> values_to_iso8601_datestamp(~w[date]) + |> values_to_int(~w[exception_type]) + + calendar_date + |> cast(attrs, ~w[service_id date exception_type holiday_name]a) + |> validate_required(~w[service_id date exception_type]a) + |> assoc_constraint(:service) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["calendar_dates.txt"] +end diff --git a/lib/arrow/gtfs/checkpoint.ex b/lib/arrow/gtfs/checkpoint.ex new file mode 100644 index 00000000..76e12651 --- /dev/null +++ b/lib/arrow/gtfs/checkpoint.ex @@ -0,0 +1,31 @@ +defmodule Arrow.Gtfs.Checkpoint do + @moduledoc """ + Represents a row from checkpoints.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. 
+ """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + name: String.t() + } + + schema "gtfs_checkpoints" do + field :name, :string + has_many :stop_times, Arrow.Gtfs.StopTime + end + + def changeset(checkpoint, attrs) do + attrs = remove_table_prefix(attrs, "checkpoint") + + checkpoint + |> cast(attrs, ~w[id name]a) + |> validate_required(~w[id name]a) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["checkpoints.txt"] +end diff --git a/lib/arrow/gtfs/direction.ex b/lib/arrow/gtfs/direction.ex new file mode 100644 index 00000000..d9513619 --- /dev/null +++ b/lib/arrow/gtfs/direction.ex @@ -0,0 +1,45 @@ +defmodule Arrow.Gtfs.Direction do + @moduledoc """ + Represents a row from directions.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + route: Arrow.Gtfs.Route.t() | Ecto.Association.NotLoaded.t(), + direction_id: 0 | 1, + desc: String.t(), + destination: String.t() + } + + @primary_key false + + schema "gtfs_directions" do + belongs_to :route, Arrow.Gtfs.Route, primary_key: true + field :direction_id, :integer, primary_key: true + field :desc, :string + field :destination, :string + end + + def changeset(direction, attrs) do + attrs = + attrs + # Taking liberties: + # `direction` is inconsistently named--the human-readable name is + # "#{table}_desc" in all other tables. + |> rename_key("direction", "desc") + |> remove_table_prefix("direction", except: ["direction_id"]) + + direction + |> cast(attrs, ~w[route_id direction_id desc destination]a) + |> validate_required(~w[route_id direction_id desc destination]a) + |> validate_inclusion(:direction_id, 0..1) + |> assoc_constraint(:route) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["directions.txt"] +end diff --git a/lib/arrow/gtfs/feed_info.ex b/lib/arrow/gtfs/feed_info.ex new file mode 100644 index 00000000..477cbbf9 --- /dev/null +++ b/lib/arrow/gtfs/feed_info.ex @@ -0,0 +1,62 @@ +defmodule Arrow.Gtfs.FeedInfo do + @moduledoc """ + Represents a row from feed_info.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. 
+ """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + publisher_name: String.t(), + publisher_url: String.t(), + lang: String.t(), + start_date: Date.t(), + end_date: Date.t(), + version: String.t(), + contact_email: String.t() + } + + schema "gtfs_feed_info" do + field :publisher_name, :string + field :publisher_url, :string + field :lang, :string + field :start_date, :date + field :end_date, :date + field :version, :string + field :contact_email, :string + end + + def changeset(feed_info, attrs) do + attrs = + attrs + |> remove_table_prefix("feed") + |> values_to_iso8601_datestamp(~w[start_date end_date]) + + feed_info + |> cast( + attrs, + ~w[id publisher_name publisher_url lang start_date end_date version contact_email]a + ) + |> validate_required( + ~w[id publisher_name publisher_url lang start_date end_date version contact_email]a + ) + |> validate_start_date_before_end_date() + end + + defp validate_start_date_before_end_date(changeset) do + start_date = fetch_field!(changeset, :start_date) + end_date = fetch_field!(changeset, :end_date) + + if Date.compare(start_date, end_date) == :lt do + changeset + else + add_error(changeset, :dates, "start date should be before end date") + end + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["feed_info.txt"] +end diff --git a/lib/arrow/gtfs/import_helper.ex b/lib/arrow/gtfs/import_helper.ex new file mode 100644 index 00000000..30dba7b6 --- /dev/null +++ b/lib/arrow/gtfs/import_helper.ex @@ -0,0 +1,183 @@ +defmodule Arrow.Gtfs.ImportHelper do + @moduledoc """ + Helper functions for casting GTFS feed data to Ecto-defined structs. + """ + + @type csv_row :: %{String.t() => String.t()} + + @doc """ + Removes the table name prefix commonly included on GTFS field names. + + iex> attrs = %{"stop_id" => "70027", "platform_name" => "Oak Grove", "stop_url" => "https://www.mbta.com/stops/place-north"} + iex> remove_table_prefix(attrs, "stop") + %{"id" => "70027", "platform_name" => "Oak Grove", "url" => "https://www.mbta.com/stops/place-north"} + + Pass an `:except` opt with a string or list of strings to preserve specific keys. + + iex> attrs = %{"stop_id" => "70027", "platform_name" => "Oak Grove", "stop_url" => "https://www.mbta.com/stops/place-north"} + iex> remove_table_prefix(attrs, "stop", except: ["stop_id"]) + %{"stop_id" => "70027", "platform_name" => "Oak Grove", "url" => "https://www.mbta.com/stops/place-north"} + + Careful: This function does not check if it will produce duplicate keys, + i.e., don't do this: + + remove_table_prefix(%{"x_id" => ..., "id" => ...}, "x") + """ + @spec remove_table_prefix(map, String.t(), Keyword.t()) :: map + def remove_table_prefix(attrs, prefix, opts \\ []) when is_map(attrs) do + except = List.wrap(opts[:except]) + prefix = if String.ends_with?(prefix, "_"), do: prefix, else: "#{prefix}_" + + Map.new(attrs, fn {k, v} -> + {de_prefix_key(k, prefix, except), v} + end) + end + + defp de_prefix_key(k, prefix, except) do + if k in except do + k + else + case k do + <<^prefix::binary-size(byte_size(prefix)), k::binary>> -> k + _ -> k + end + end + end + + @doc """ + Renames the given `old_key` in `map` to `new_key`, if it exists. + + Otherwise, returns the map unchanged. 
+
+      iex> rename_key(%{foo: 5}, :foo, :bar)
+      %{bar: 5}
+
+      iex> rename_key(%{baz: 6}, :foo, :bar)
+      %{baz: 6}
+  """
+  @spec rename_key(map, term, term) :: map
+  def rename_key(map, old_key, new_key) do
+    case Map.pop(map, old_key) do
+      {nil, map} -> map
+      {value, map} -> Map.put(map, new_key, value)
+    end
+  end
+
+  @doc """
+  Calls `String.to_integer/1` on the values of `keys` in `map`.
+
+  This is useful for preprocessing CSV fields corresponding to `Ecto.Enum`-typed schema fields--
+  `Ecto.Enum.cast/2` expects either integer or (textual) string values, but the
+  values for these CSV fields come in as numeric strings.
+
+      iex> values_to_int(%{"route_type" => "1", "other" => "value"}, ["route_type"])
+      %{"route_type" => 1, "other" => "value"}
+
+      iex> values_to_int(%{"route_type" => "1", "other" => "value"}, ["route_type", "exception_type"])
+      %{"route_type" => 1, "other" => "value"}
+
+      iex> values_to_int(%{"maybe_empty" => ""}, ["maybe_empty"])
+      %{"maybe_empty" => ""}
+  """
+  @spec values_to_int(map, Enumerable.t(term)) :: map
+  def values_to_int(map, keys) do
+    Enum.reduce(keys, map, fn k, m ->
+      Map.replace_lazy(m, k, fn
+        k when byte_size(k) > 0 -> String.to_integer(k)
+        "" -> ""
+      end)
+    end)
+  end
+
+  @doc """
+  Edits the GTFS-datestamp values under `keys` in `map` to be ISO8601-compliant.
+
+  This is useful for preprocessing CSV fields corresponding to `:date`-typed schema fields--
+  Ecto's date type expects incoming strings to be in ISO8601 format.
+
+      iex> map = %{"start_date" => "20240925", "end_date" => "20240926", "blind_date" => "", "other" => "value"}
+      iex> values_to_iso8601_datestamp(map, ~w[start_date end_date blind_date double_date])
+      %{"start_date" => "2024-09-25", "end_date" => "2024-09-26", "blind_date" => "", "other" => "value"}
+  """
+  @spec values_to_iso8601_datestamp(map, Enumerable.t(term)) :: map
+  def values_to_iso8601_datestamp(map, keys) do
+    Enum.reduce(keys, map, fn k, m ->
+      Map.replace_lazy(m, k, fn
+        <<year::binary-size(4), month::binary-size(2), day::binary-size(2)>> ->
+          <<year::binary, ?-, month::binary, ?-, day::binary>>
+
+        "" ->
+          ""
+      end)
+    end)
+  end
+
+  @doc """
+  Strips metadata and association fields from an Ecto.Schema-defined struct, so
+  that it contains only the fields corresponding to its source table's columns.
+
+  (Useful for converting a schema struct to a plain map for use with
+  `Repo.insert_all`)
+
+      iex> direction = %Arrow.Gtfs.Direction{
+      ...>   __meta__: :meta_stuff_to_be_removed,
+      ...>   route: :association_stuff_to_be_removed,
+      ...>   route_id: "Orange",
+      ...>   direction_id: 0,
+      ...>   desc: "South",
+      ...>   destination: "Forest Hills"
+      ...> }
+      iex> schema_struct_to_map(direction)
+      %{
+        route_id: "Orange",
+        direction_id: 0,
+        desc: "South",
+        destination: "Forest Hills",
+      }
+  """
+  def schema_struct_to_map(%mod{} = schema_struct) do
+    Map.take(schema_struct, mod.__schema__(:fields))
+  end
+
+  # Maximum number of params supported by Postgres for one statement.
+  @max_query_params 65_535
+  defp max_query_params, do: @max_query_params
+
+  @doc """
+  Chunks a list of maps such that the following holds for each chunk:
+
+      Enum.sum(Enum.map(chunk, &map_size/1)) <= @max_query_params
+
+  Assumes that all maps in the list are the same size as the first one.
+ + Example / doctest: + + # div(@max_query_params, 26) = 2_520 + iex> row = Map.new(?a..?z, &{<<&1>>, &1}) + iex> values = List.duplicate(row, 6_000) + iex> chunked = chunk_values(values) + iex> Enum.count(chunked) + 3 + iex> length(Enum.at(chunked, 0)) + 2520 + """ + @spec chunk_values(Enumerable.t(map)) :: list(list(map)) + def chunk_values(values) do + if Enum.empty?(values) do + [[]] + else + params_per_row = map_size(Enum.at(values, 0)) + rows_per_chunk = div(max_query_params(), params_per_row) + Stream.chunk_every(values, rows_per_chunk) + end + end + + @spec stream_csv_rows(Unzip.t(), String.t()) :: Enumerable.t(csv_row) + def stream_csv_rows(unzip, filename) do + unzip + |> Unzip.file_stream!(filename) + # Flatten iodata for compatibility with CSV.decode + |> Stream.flat_map(&List.flatten/1) + |> CSV.decode!(headers: true) + end +end diff --git a/lib/arrow/gtfs/import_worker.ex b/lib/arrow/gtfs/import_worker.ex new file mode 100644 index 00000000..4c29e925 --- /dev/null +++ b/lib/arrow/gtfs/import_worker.ex @@ -0,0 +1,45 @@ +defmodule Arrow.Gtfs.ImportWorker do + @moduledoc """ + Oban worker for GTFS import jobs. + """ + use Oban.Worker, + queue: :gtfs_import, + # The job is discarded after one failed attempt. + max_attempts: 1, + # In config.exs, the :gtfs_import queue is configured to run only 1 job at a time. + # + # We also enforce uniqueness by archive version--if an existing import job is already + # running or queued on the same archive version, a new job for that same version will fail. + # + # It's ok to queue up an import job for a different version than what's currently + # running/queued, though. + unique: [ + fields: [:worker, :args], + keys: [:archive_version], + states: Oban.Job.states() -- [:completed, :discarded, :cancelled] + ] + + import Ecto.Query + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => new_version}} = job) do + current_version = + Arrow.Repo.one( + from info in Arrow.Gtfs.FeedInfo, where: info.id == "mbta-ma-us", select: info.version + ) + + with {:ok, unzip} <- Arrow.Gtfs.Archive.to_unzip_struct(s3_uri) do + Arrow.Gtfs.import(unzip, current_version, new_version, job) + end + end + + # A sane timeout to avoid buildup of stuck jobs. + # Jobs should take much less than an hour, generally. + @impl Oban.Worker + def timeout(_job), do: :timer.hours(1) + + @spec check_jobs(Arrow.Gtfs.JobHelper.status_filter()) :: list(map) + def check_jobs(status_filter) do + Arrow.Gtfs.JobHelper.check_jobs(__MODULE__, status_filter) + end +end diff --git a/lib/arrow/gtfs/importable.ex b/lib/arrow/gtfs/importable.ex new file mode 100644 index 00000000..ceca8b45 --- /dev/null +++ b/lib/arrow/gtfs/importable.ex @@ -0,0 +1,135 @@ +defmodule Arrow.Gtfs.Importable do + @moduledoc """ + Logic for populating a GTFS table from CSV. + + Callback modules are assumed to also be Ecto schemas. + """ + + alias Arrow.Gtfs.ImportHelper + alias Arrow.Repo + alias Ecto.Changeset + + @doc """ + CSV filename(s) to import from. + + In certain cases, this must be a list of length 1. + """ + @callback filenames :: list(String.t()) + + @doc "How to import this table's data." + @callback import(Unzip.t()) :: term + + @optional_callbacks import: 1 + + ##### + ##### + ##### + + @type csv_row :: %{String.t() => String.t()} + + @doc """ + Imports data for the given schema into the DB from a GTFS archive. 
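+
+  A minimal usage sketch, assuming a locally saved archive (the Oban workers
+  build the `Unzip.t()` from S3 instead; the path below is hypothetical):
+
+      {:ok, unzip} = Arrow.Gtfs.Archive.to_unzip_struct("local/path/to/MBTA_GTFS.zip")
+      Arrow.Gtfs.Importable.import(Arrow.Gtfs.Agency, unzip)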
+ """ + @spec import(module, Unzip.t()) :: term + def import(importable, unzip) do + Code.ensure_loaded!(importable) + + if function_exported?(importable, :import, 1) do + importable.import(unzip) + else + default_import(importable, unzip) + end + end + + @doc """ + Default import implementation, used if the callback module does not define `import/1`. + """ + @spec default_import(module, Unzip.t()) :: term + def default_import(importable, unzip) do + for filename <- importable.filenames() do + unzip + |> ImportHelper.stream_csv_rows(filename) + |> cast_and_insert(importable) + end + end + + @spec cast_and_insert(Enumerable.t(csv_row()), module) :: :ok + def cast_and_insert(csv_maps, schema_mod) do + csv_maps + |> Stream.map(&cast_to_insertable(&1, schema_mod)) + |> ImportHelper.chunk_values() + |> Enum.each(&Repo.insert_all(schema_mod, &1)) + end + + @doc """ + A more efficient import method that bypasses Ecto schemas and has Postgres + parse the CSV directly, using the `COPY` command. + + Useful for large files like stop_times.txt. + + Use `:header_mappings` and `:header_order` opts to convert CSV header names to + DB column names and specify the expected order of the CSV columns, respectively. + + The CSV file's values must all be directly castable to their respective DB + types. + + Options: + + - `:header_mappings` - A `%{string => string}` map used to replace certain CSV + headers before they get streamed to Postgres. Only the headers that have + matching keys in the map will be changed--others will be left alone. + - `:header_order` - A list of strings specifying the order of the CSV's + columns. The strings should match, and include all of, the destination + table's column names. If provided, this will be used to create a column list + argument for the COPY command. If not provided, no column list will be + included and CSV column order must match that of the destination table. + """ + @spec import_using_copy(module, Unzip.t(), Keyword.t()) :: term + def import_using_copy(importable, unzip, opts \\ []) do + # The schema must be sourced from only 1 CSV file. + [filename] = importable.filenames() + csv_stream = Unzip.file_stream!(unzip, filename) + + csv_stream = + case Keyword.fetch(opts, :header_mappings) do + {:ok, mappings} -> replace_headers(csv_stream, mappings) + :error -> csv_stream + end + + column_list = + case Keyword.fetch(opts, :header_order) do + {:ok, list} -> "(#{Enum.join(list, ", ")})" + :error -> "" + end + + table = importable.__schema__(:source) + + copy_query = """ + COPY "#{table}" #{column_list} + FROM STDIN + WITH (FORMAT csv, HEADER MATCH) + """ + + Repo.transaction(fn -> + db_stream = Ecto.Adapters.SQL.stream(Repo, copy_query) + Enum.into(csv_stream, db_stream) + end) + end + + # Passes a CSV-parsed map through given schema's `changeset` function, + # then converts it back to a plain map compatible with `Repo.insert_all`. 
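+  #
+  # For example (hypothetical row), a routes.txt row like
+  # %{"route_id" => "Red", "route_type" => "1", ...} comes back as
+  # %{id: "Red", type: :heavy_rail, ...}, ready for `Repo.insert_all`.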
+ @spec cast_to_insertable(csv_row(), module) :: %{atom => term} + defp cast_to_insertable(row, schema) do + struct(schema) + |> schema.changeset(row) + |> Changeset.apply_action!(:insert) + |> ImportHelper.schema_struct_to_map() + end + + defp replace_headers(csv_stream, mappings) do + blob_with_headers = Enum.at(csv_stream, 0) |> to_string() + adjusted = Regex.replace(~r/[^,\n]+/f, blob_with_headers, &Map.get(mappings, &1, &1)) + + Stream.concat([adjusted], Stream.drop(csv_stream, 1)) + end +end diff --git a/lib/arrow/gtfs/job_helper.ex b/lib/arrow/gtfs/job_helper.ex new file mode 100644 index 00000000..bdbe23bb --- /dev/null +++ b/lib/arrow/gtfs/job_helper.ex @@ -0,0 +1,51 @@ +defmodule Arrow.Gtfs.JobHelper do + @moduledoc """ + Utilities for Oban jobs. + """ + import Ecto.Query + + @typedoc """ + Filters for `check_jobs/2`. + + - `:all` - Do not filter results + - `:queued` - Jobs with status "scheduled" or "available" + - `:executing` - Jobs with status "executing" or "retryable" + - `:succeeded` - Jobs with status "completed" + - `:failed` - Jobs with status "discarded" + - `:cancelled` - Jobs with status "cancelled" + - `:not_done` - Jobs with status "scheduled", "available", "executing", or "retryable" + - `:done` - Jobs with status "completed", "discarded", or "cancelled" + """ + @type status_filter :: + :all | :queued | :executing | :succeeded | :failed | :cancelled | :not_done | :done + + @doc """ + Returns details about GTFS import jobs. + """ + @spec check_jobs(module, status_filter) :: list(map) + def check_jobs(worker_mod, status_filter) do + worker = inspect(worker_mod) + states = Map.fetch!(job_filters(), status_filter) + + Arrow.Repo.all(from j in Oban.Job, where: [worker: ^worker], where: j.state in ^states) + |> Enum.map( + &Map.take( + &1, + ~w[id state queue worker args errors tags attempt attempted_by max_attempts priority inserted_at scheduled_at attempted_at completed_at discarded_at cancelled_at]a + ) + ) + end + + defp job_filters do + %{ + all: Enum.map(Oban.Job.states(), &Atom.to_string/1), + queued: ~w[scheduled available], + executing: ~w[executing retryable], + succeeded: ~w[completed], + failed: ~w[discarded], + cancelled: ~w[cancelled], + not_done: ~w[scheduled available executing retryable], + done: ~w[completed discarded cancelled] + } + end +end diff --git a/lib/arrow/gtfs/level.ex b/lib/arrow/gtfs/level.ex new file mode 100644 index 00000000..b2ac4764 --- /dev/null +++ b/lib/arrow/gtfs/level.ex @@ -0,0 +1,34 @@ +defmodule Arrow.Gtfs.Level do + @moduledoc """ + Represents a row from levels.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + index: String.t(), + name: String.t() | nil + } + + schema "gtfs_levels" do + field :index, :float + field :name, :string + + has_many :stop, Arrow.Gtfs.Stop + end + + def changeset(level, attrs) do + attrs = remove_table_prefix(attrs, "level") + + level + |> cast(attrs, ~w[id index name]a) + |> validate_required(~w[id index]a) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["levels.txt"] +end diff --git a/lib/arrow/gtfs/line.ex b/lib/arrow/gtfs/line.ex new file mode 100644 index 00000000..66dda51b --- /dev/null +++ b/lib/arrow/gtfs/line.ex @@ -0,0 +1,42 @@ +defmodule Arrow.Gtfs.Line do + @moduledoc """ + Represents a row from lines.txt. 
+ + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + short_name: String.t(), + long_name: String.t(), + desc: String.t(), + url: String.t() | nil, + color: String.t(), + text_color: String.t(), + sort_order: integer + } + + schema "gtfs_lines" do + field :short_name, :string + field :long_name, :string + field :desc, :string + field :url, :string + field :color, :string + field :text_color, :string + field :sort_order, :integer + end + + def changeset(line, attrs) do + attrs = remove_table_prefix(attrs, "line") + + line + |> cast(attrs, ~w[id short_name long_name desc url color text_color sort_order]a) + |> validate_required(~w[id long_name color text_color sort_order]a) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["lines.txt"] +end diff --git a/lib/arrow/gtfs/migration_helper.ex b/lib/arrow/gtfs/migration_helper.ex new file mode 100644 index 00000000..21cccc37 --- /dev/null +++ b/lib/arrow/gtfs/migration_helper.ex @@ -0,0 +1,51 @@ +defmodule Arrow.Gtfs.MigrationHelper do + @moduledoc """ + Conveniences for GTFS table migrations. + """ + import Ecto.Migration + + @doc """ + Creates a CHECK constraint asserting that an integer column's value + is within a specified range. + + The following: + + create_int_code_constraint("some_table", :some_column, 1..3//1) + + adds a check that `some_column`'s value is between 1 and 3, inclusive. + + The following: + + create_int_code_constraint("some_table", :some_column, 5) + + adds a check that `some_column`'s value is between 0 and 5, inclusive. + """ + @spec create_int_code_constraint(String.t(), atom, Range.t() | non_neg_integer) :: term + def create_int_code_constraint(table, column, range_or_max) + + def create_int_code_constraint(table, column, first..last//1) do + name = :"#{column}_must_be_in_range" + create(constraint(table, name, check: check_expr(column, first, last))) + end + + def create_int_code_constraint(table, column, max) when is_integer(max) and max >= 0 do + create_int_code_constraint(table, column, 0..max//1) + end + + defp check_expr(column, first, last) do + "#{column} <@ int4range(#{first}, #{last}, '[]')" + end + + @doc """ + Creates a Postgres enum type with the given allowed string values. + """ + @spec create_enum_type(String.t(), list(String.t())) :: term + def create_enum_type(name, strings) do + values_list = Enum.map_join(strings, ",", &"'#{&1}'") + + execute( + "CREATE TYPE #{name} AS ENUM (#{values_list})", + "DROP TYPE #{name}" + ) + end +end diff --git a/lib/arrow/gtfs/route.ex b/lib/arrow/gtfs/route.ex new file mode 100644 index 00000000..22ba7772 --- /dev/null +++ b/lib/arrow/gtfs/route.ex @@ -0,0 +1,69 @@ +defmodule Arrow.Gtfs.Route do + @moduledoc """ + Represents a row from routes.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. 
+ """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + agency: Arrow.Gtfs.Agency.t() | Ecto.Association.NotLoaded.t(), + short_name: String.t() | nil, + long_name: String.t() | nil, + desc: String.t(), + type: atom, + url: String.t() | nil, + color: String.t() | nil, + text_color: String.t() | nil, + sort_order: integer, + fare_class: String.t(), + line: Arrow.Gtfs.Line.t() | Ecto.Association.NotLoaded.t(), + listed_route: atom, + network_id: String.t() + } + + @route_type_values Enum.with_index(~w[light_rail heavy_rail commuter_rail bus ferry]a) + + schema "gtfs_routes" do + belongs_to :agency, Arrow.Gtfs.Agency + field :short_name, :string + field :long_name, :string + field :desc, :string + + field :type, Ecto.Enum, values: @route_type_values + + field :url, :string + field :color, :string + field :text_color, :string + field :sort_order, :integer + field :fare_class, :string + belongs_to :line, Arrow.Gtfs.Line + field :listed_route, Ecto.Enum, values: Enum.with_index(~w[Included Excluded]a) + field :network_id, :string + + has_many :directions, Arrow.Gtfs.Direction + has_many :trips, Arrow.Gtfs.Trip + end + + def changeset(route, attrs) do + attrs = + attrs + |> remove_table_prefix("route") + |> values_to_int(~w[type listed_route]) + + route + |> cast( + attrs, + ~w[id agency_id short_name long_name desc type url color text_color sort_order fare_class line_id listed_route network_id]a + ) + |> validate_required(~w[id agency_id desc type sort_order fare_class network_id]a) + |> assoc_constraint(:agency) + |> assoc_constraint(:line) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["routes.txt"] +end diff --git a/lib/arrow/gtfs/route_pattern.ex b/lib/arrow/gtfs/route_pattern.ex new file mode 100644 index 00000000..c44ae493 --- /dev/null +++ b/lib/arrow/gtfs/route_pattern.ex @@ -0,0 +1,74 @@ +defmodule Arrow.Gtfs.RoutePattern do + @moduledoc """ + Represents a row from route_patterns.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + route: Arrow.Gtfs.Route.t() | Ecto.Association.NotLoaded.t(), + direction_id: 0 | 1, + directions: list(Arrow.Gtfs.Direction.t()) | Ecto.Association.NotLoaded.t(), + name: String.t(), + time_desc: String.t() | nil, + typicality: atom, + sort_order: integer, + # The Trip that exemplifies this RoutePattern. + representative_trip: Arrow.Gtfs.Trip.t() | Ecto.Association.NotLoaded.t(), + # All the Trips that use this RoutePattern. + trips: list(Arrow.Gtfs.Trip.t()) | Ecto.Association.NotLoaded.t(), + canonical: atom + } + + @typicality_values Enum.with_index( + ~w[not_defined typical deviation atypical diversion typical_but_unscheduled]a + ) + + @canonicality_values Enum.with_index( + ~w[no_canonical_patterns_defined_for_route canonical not_canonical]a + ) + + schema "gtfs_route_patterns" do + belongs_to :route, Arrow.Gtfs.Route, type: :string + field :direction_id, :integer + # I couldn't find a way to directly associate the specific Direction + # here--composite FK relations aren't supported. + # + # So as a workaround, we have all the Directions of the associated Route and + # can manually look up the one that has this RoutePattern's direction_id. 
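+    # A rough sketch of that lookup (assuming `directions` has been preloaded):
+    #
+    #     Enum.find(route_pattern.directions, &(&1.direction_id == route_pattern.direction_id))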
+ has_many :directions, through: [:route, :directions] + field :name, :string + field :time_desc, :string + field :typicality, Ecto.Enum, values: @typicality_values + field :sort_order, :integer + belongs_to :representative_trip, Arrow.Gtfs.Trip + has_many :trips, Arrow.Gtfs.Trip + field :canonical, Ecto.Enum, values: @canonicality_values + end + + def changeset(route_pattern, attrs) do + attrs = + attrs + |> remove_table_prefix("route_pattern") + |> rename_key("canonical_route_pattern", "canonical") + |> values_to_int(~w[typicality canonical]) + + route_pattern + |> cast( + attrs, + ~w[id route_id direction_id name time_desc typicality sort_order representative_trip_id canonical]a + ) + |> validate_required( + ~w[id route_id direction_id name typicality sort_order representative_trip_id canonical]a + ) + |> assoc_constraint(:route) + |> assoc_constraint(:representative_trip) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["route_patterns.txt"] +end diff --git a/lib/arrow/gtfs/schema.ex b/lib/arrow/gtfs/schema.ex new file mode 100644 index 00000000..e8b28b95 --- /dev/null +++ b/lib/arrow/gtfs/schema.ex @@ -0,0 +1,23 @@ +defmodule Arrow.Gtfs.Schema do + @moduledoc """ + Wrapper on Ecto.Schema that adds some helpers for the particularities + of GTFS feed import. + + - Imports functions from `Arrow.Gtfs.ImportHelper` + - Sets primary key to have name `:id` and type `:string`, and not autogenerate + - Sets foreign key type to `:string` + - Marks the module as a callback module of `Arrow.Gtfs.Importable` + """ + + defmacro __using__(_) do + quote do + use Ecto.Schema + @behaviour Arrow.Gtfs.Importable + + import Arrow.Gtfs.ImportHelper + + @primary_key {:id, :string, []} + @foreign_key_type :string + end + end +end diff --git a/lib/arrow/gtfs/service.ex b/lib/arrow/gtfs/service.ex new file mode 100644 index 00000000..e50f7c45 --- /dev/null +++ b/lib/arrow/gtfs/service.ex @@ -0,0 +1,46 @@ +defmodule Arrow.Gtfs.Service do + @moduledoc """ + Represents all calendar data related to a service_id, + which may exist in one or both of calendar.txt or calendar_dates.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + calendar: Arrow.Gtfs.Calendar.t() | Ecto.Association.NotLoaded.t(), + calendar_dates: list(Arrow.Gtfs.CalendarDate.t()) | Ecto.Association.NotLoaded.t() + } + + schema "gtfs_services" do + has_one :calendar, Arrow.Gtfs.Calendar + has_many :calendar_dates, Arrow.Gtfs.CalendarDate + + has_many :trips, Arrow.Gtfs.Trip + end + + def changeset(service, attrs) do + service + |> cast(attrs, [:id]) + |> validate_required(~w[id]a) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["calendar.txt", "calendar_dates.txt"] + + @impl Arrow.Gtfs.Importable + def import(unzip) do + # This table's IDs are the union of those found in + # calendar.txt and calendar_dates.txt. 
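+    # For example (hypothetical IDs): if calendar.txt lists service_id "FallWeekday"
+    # and calendar_dates.txt lists "FallWeekday" and "FallHoliday", this inserts
+    # two rows: %{"id" => "FallWeekday"} and %{"id" => "FallHoliday"}.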
+ service_rows = + filenames() + |> Enum.map(&Arrow.Gtfs.ImportHelper.stream_csv_rows(unzip, &1)) + |> Stream.concat() + |> Stream.uniq_by(& &1["service_id"]) + |> Stream.map(&%{"id" => Map.fetch!(&1, "service_id")}) + + Arrow.Gtfs.Importable.cast_and_insert(service_rows, __MODULE__) + end +end diff --git a/lib/arrow/gtfs/shape.ex b/lib/arrow/gtfs/shape.ex new file mode 100644 index 00000000..74ef8795 --- /dev/null +++ b/lib/arrow/gtfs/shape.ex @@ -0,0 +1,46 @@ +defmodule Arrow.Gtfs.Shape do + @moduledoc """ + Represents a group of rows from shapes.txt that all share a shape_id. + + A Shape contains many ShapePoints. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + points: list(Arrow.Gtfs.ShapePoint.t()) | Ecto.Association.NotLoaded.t(), + trips: list(Arrow.Gtfs.Trip.t()) | Ecto.Association.NotLoaded.t() + } + + schema "gtfs_shapes" do + has_many :points, Arrow.Gtfs.ShapePoint + has_many :trips, Arrow.Gtfs.Trip + end + + # This shape's points should be put in a separate list and imported + # after this table is populated, so that FKs can be validated. + def changeset(shape, attrs) do + attrs = remove_table_prefix(attrs, "shape") + + shape + |> cast(attrs, ~w[id]a) + |> validate_required(~w[id]a) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["shapes.txt"] + + @impl Arrow.Gtfs.Importable + def import(unzip) do + [filename] = filenames() + + unzip + |> Arrow.Gtfs.ImportHelper.stream_csv_rows(filename) + |> Stream.uniq_by(& &1["shape_id"]) + |> Arrow.Gtfs.Importable.cast_and_insert(__MODULE__) + end +end diff --git a/lib/arrow/gtfs/shape_point.ex b/lib/arrow/gtfs/shape_point.ex new file mode 100644 index 00000000..d88815cf --- /dev/null +++ b/lib/arrow/gtfs/shape_point.ex @@ -0,0 +1,59 @@ +defmodule Arrow.Gtfs.ShapePoint do + @moduledoc """ + Represents a row from shapes.txt. + + ShapePoints are grouped under Shapes, by their shape_id field. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. 
+ """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + shape: Arrow.Gtfs.Shape.t() | Ecto.Association.NotLoaded.t(), + sequence: integer, + lat: float, + lon: float, + dist_traveled: float | nil + } + + @primary_key false + + schema "gtfs_shape_points" do + belongs_to :shape, Arrow.Gtfs.Shape, primary_key: true + field :lat, :float + field :lon, :float + field :sequence, :integer, primary_key: true + field :dist_traveled, :float + end + + def changeset(shape_point, attrs) do + attrs = + attrs + |> remove_table_prefix("shape_pt") + |> remove_table_prefix("shape", except: "shape_id") + + shape_point + |> cast(attrs, ~w[shape_id sequence lat lon dist_traveled]a) + |> validate_required(~w[shape_id sequence lat lon]a) + |> assoc_constraint(:shape) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["shapes.txt"] + + @impl Arrow.Gtfs.Importable + def import(unzip) do + Arrow.Gtfs.Importable.import_using_copy( + __MODULE__, + unzip, + header_mappings: %{ + "shape_pt_lat" => "lat", + "shape_pt_lon" => "lon", + "shape_pt_sequence" => "sequence", + "shape_dist_traveled" => "dist_traveled" + } + ) + end +end diff --git a/lib/arrow/gtfs/stop.ex b/lib/arrow/gtfs/stop.ex new file mode 100644 index 00000000..5bcf02b5 --- /dev/null +++ b/lib/arrow/gtfs/stop.ex @@ -0,0 +1,86 @@ +defmodule Arrow.Gtfs.Stop do + @moduledoc """ + Represents a row from stops.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + code: String.t() | nil, + name: String.t(), + desc: String.t() | nil, + platform_code: String.t() | nil, + platform_name: String.t() | nil, + lat: float, + lon: float, + zone_id: String.t() | nil, + address: String.t() | nil, + url: String.t() | nil, + level: Arrow.Gtfs.Level.t() | Ecto.Association.NotLoaded.t() | nil, + location_type: atom, + parent_station: t() | Ecto.Association.NotLoaded.t() | nil, + wheelchair_boarding: atom, + municipality: String.t() | nil, + on_street: String.t() | nil, + at_street: String.t() | nil, + vehicle_type: atom, + times: list(Arrow.Gtfs.StopTime.t()) | Ecto.Association.NotLoaded.t() + } + + @location_type_values Enum.with_index( + ~w[stop_platform parent_station entrance_exit generic_node boarding_area]a + ) + + @wheelchair_boarding_values Enum.with_index( + ~w[no_info_inherit_from_parent accessible not_accessible]a + ) + + @vehicle_type_values Enum.with_index(~w[light_rail heavy_rail commuter_rail bus ferry]a) + + schema "gtfs_stops" do + field :code, :string + field :name, :string + field :desc, :string + field :platform_code, :string + field :platform_name, :string + field :lat, :float + field :lon, :float + field :zone_id, :string + field :address, :string + field :url, :string + belongs_to :level, Arrow.Gtfs.Level + field :location_type, Ecto.Enum, values: @location_type_values + belongs_to :parent_station, Arrow.Gtfs.Stop + field :wheelchair_boarding, Ecto.Enum, values: @wheelchair_boarding_values + field :municipality, :string + field :on_street, :string + field :at_street, :string + field :vehicle_type, Ecto.Enum, values: @vehicle_type_values + has_many :times, Arrow.Gtfs.StopTime + end + + def changeset(stop, attrs) do + attrs = + attrs + |> remove_table_prefix("stop") + # `parent_station` is inconsistently named--this changes the key to `parent_station_id`. 
+ |> rename_key("parent_station", "parent_station_id") + |> values_to_int(~w[location_type wheelchair_boarding vehicle_type]) + + stop + |> cast( + attrs, + ~w[id code name desc platform_code platform_name lat lon zone_id address url level_id location_type parent_station_id wheelchair_boarding municipality on_street at_street vehicle_type]a + ) + |> validate_required(~w[id name location_type wheelchair_boarding]a) + |> assoc_constraint(:level) + |> assoc_constraint(:parent_station) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["stops.txt"] +end diff --git a/lib/arrow/gtfs/stop_time.ex b/lib/arrow/gtfs/stop_time.ex new file mode 100644 index 00000000..a994173d --- /dev/null +++ b/lib/arrow/gtfs/stop_time.ex @@ -0,0 +1,86 @@ +defmodule Arrow.Gtfs.StopTime do + @moduledoc """ + Represents a row from stop_times.txt. + + Changeset is intended for use only in CSV imports-- + table contents should be considered read-only otherwise. + """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + trip: Arrow.Gtfs.Trip.t() | Ecto.Association.NotLoaded.t(), + stop_sequence: integer, + arrival_time: String.t(), + departure_time: String.t(), + stop: Arrow.Gtfs.Stop.t() | Ecto.Association.NotLoaded.t(), + stop_headsign: String.t() | nil, + pickup_type: atom, + drop_off_type: atom, + timepoint: atom | nil, + checkpoint: Arrow.Gtfs.Checkpoint.t() | Ecto.Association.NotLoaded.t() | nil, + continuous_pickup: atom | nil, + continuous_drop_off: atom | nil + } + + @pickup_drop_off_types Enum.with_index([ + :regularly_scheduled, + :not_available, + :phone_agency_to_arrange, + :coordinate_with_driver + ]) + + @continuous_pickup_drop_off_types Enum.with_index([ + :continuous, + :not_continuous, + :phone_agency_to_arrange, + :coordinate_with_driver + ]) + + @primary_key false + + schema "gtfs_stop_times" do + belongs_to :trip, Arrow.Gtfs.Trip, primary_key: true + field :stop_sequence, :integer, primary_key: true + + # arrival_time and departure_time are kept as timestamps, to preserve after-midnight times like 24:15:00. + # `Arrow.Gtfs.TimeHelper` provides utility functions for converting timestamps to other representations. 
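+    # For example, `Arrow.Gtfs.TimeHelper.to_annotated_time!("24:15:00")` returns
+    # %{time: ~T[00:15:00], after_midnight?: true}, and
+    # `Arrow.Gtfs.TimeHelper.to_seconds_after_midnight!("24:15:00")` returns 87_300.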
+ field :arrival_time, :string
+ field :departure_time, :string
+
+ belongs_to :stop, Arrow.Gtfs.Stop
+ field :stop_headsign, :string
+ field :pickup_type, Ecto.Enum, values: @pickup_drop_off_types
+ field :drop_off_type, Ecto.Enum, values: @pickup_drop_off_types
+ field :timepoint, Ecto.Enum, values: Enum.with_index(~w[approximate exact]a)
+ belongs_to :checkpoint, Arrow.Gtfs.Checkpoint
+ field :continuous_pickup, Ecto.Enum, values: @continuous_pickup_drop_off_types
+ field :continuous_drop_off, Ecto.Enum, values: @continuous_pickup_drop_off_types
+ end
+
+ def changeset(stop_time, attrs) do
+ attrs =
+ values_to_int(
+ attrs,
+ ~w[pickup_type drop_off_type timepoint continuous_pickup continuous_drop_off]
+ )
+
+ stop_time
+ |> cast(
+ attrs,
+ ~w[trip_id stop_sequence arrival_time departure_time stop_id stop_headsign pickup_type drop_off_type timepoint checkpoint_id continuous_pickup continuous_drop_off]a
+ )
+ |> validate_required(
+ ~w[trip_id stop_sequence arrival_time departure_time stop_id pickup_type drop_off_type]a
+ )
+ |> assoc_constraint(:trip)
+ |> assoc_constraint(:stop)
+ |> assoc_constraint(:checkpoint)
+ end
+
+ @impl Arrow.Gtfs.Importable
+ def filenames, do: ["stop_times.txt"]
+
+ @impl Arrow.Gtfs.Importable
+ def import(unzip), do: Arrow.Gtfs.Importable.import_using_copy(__MODULE__, unzip)
+end
diff --git a/lib/arrow/gtfs/time_helper.ex b/lib/arrow/gtfs/time_helper.ex
new file mode 100644
index 00000000..3d017276
--- /dev/null
+++ b/lib/arrow/gtfs/time_helper.ex
@@ -0,0 +1,58 @@
+defmodule Arrow.Gtfs.TimeHelper do
+ @moduledoc """
+ Utility functions for converting GTFS timestamps, which can go past midnight,
+ to other representations.
+ """
+
+ @type timestamp :: String.t()
+
+ @minute_in_seconds 60
+ @hour_in_seconds 60 * @minute_in_seconds
+
+ @doc """
+ Converts a GTFS timestamp to number of seconds after midnight on the start of the service day.
+
+ Examples:
+
+ iex> to_seconds_after_midnight!("08:47:30")
+ 31_650
+
+ iex> to_seconds_after_midnight!("24:15:00")
+ 87_300
+ """
+ @spec to_seconds_after_midnight!(timestamp) :: non_neg_integer
+ def to_seconds_after_midnight!(timestamp) do
+ {h, m, s} = parse_parts(timestamp)
+
+ h * @hour_in_seconds + m * @minute_in_seconds + s
+ end
+
+ @doc """
+ Converts a GTFS timestamp to a map containing a Time struct and a boolean field indicating if
+ that time is after midnight, i.e., close to the end of the service day.
+
+ Examples:
+
+ iex> to_annotated_time!("08:47:30")
+ %{time: ~T[08:47:30], after_midnight?: false}
+
+ iex> to_annotated_time!("24:15:00")
+ %{time: ~T[00:15:00], after_midnight?: true}
+ """
+ @spec to_annotated_time!(timestamp) :: %{time: Time.t(), after_midnight?: boolean}
+ def to_annotated_time!(timestamp) do
+ {h, m, s} = parse_parts(timestamp)
+ after_midnight? = h >= 24
+ h = if after_midnight?, do: h - 24, else: h
+
+ %{time: Time.new!(h, m, s), after_midnight?: after_midnight?}
+ end
+
+ defp parse_parts(<<h::binary-size(2), ":", m::binary-size(2), ":", s::binary-size(2)>>) do
+ {h, ""} = Integer.parse(h)
+ {m, ""} = Integer.parse(m)
+ {s, ""} = Integer.parse(s)
+
+ {h, m, s}
+ end
+end
diff --git a/lib/arrow/gtfs/trip.ex b/lib/arrow/gtfs/trip.ex
new file mode 100644
index 00000000..2a47d3b7
--- /dev/null
+++ b/lib/arrow/gtfs/trip.ex
@@ -0,0 +1,116 @@
+defmodule Arrow.Gtfs.Trip do
+ @moduledoc """
+ Represents a row from trips.txt.
+
+ Changeset is intended for use only in CSV imports--
+ table contents should be considered read-only otherwise.
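+
+ Note: a Trip's Direction can't be a plain `belongs_to` (the foreign key is composite), so a
+ read-only lookup sketch, using only the association and field names declared below, is:
+
+     trip = Arrow.Repo.preload(trip, :directions)
+     Enum.find(trip.directions, &(&1.direction_id == trip.direction_id))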
+ """ + use Arrow.Gtfs.Schema + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: String.t(), + route: Arrow.Gtfs.Route.t() | Ecto.Association.NotLoaded.t(), + service: Arrow.Gtfs.Service.t() | Ecto.Association.NotLoaded.t(), + headsign: String.t(), + short_name: String.t() | nil, + direction_id: 0 | 1, + directions: list(Arrow.Gtfs.Direction.t()) | Ecto.Association.NotLoaded.t(), + block_id: String.t() | nil, + shape: Arrow.Gtfs.Shape.t() | Ecto.Association.NotLoaded.t() | nil, + shape_points: list(Arrow.Gtfs.ShapePoint.t()) | Ecto.Association.NotLoaded.t() | nil, + wheelchair_accessible: atom, + route_type: atom | nil, + # The RoutePattern that this Trip follows. + route_pattern: Arrow.Gtfs.RoutePattern.t() | Ecto.Association.NotLoaded.t(), + # The RoutePattern, if any, for which this is the *representative* Trip. + representing_route_pattern: + Arrow.Gtfs.RoutePattern.t() | Ecto.Association.NotLoaded.t() | nil, + bikes_allowed: atom, + stop_times: list(Arrow.Gtfs.StopTime.t()) | Ecto.Association.NotLoaded.t() + } + + @wheelchair_accessibility_values Enum.with_index( + ~w[no_information_inherit_from_parent accessible not_accessible]a + ) + @route_type_values Enum.with_index(~w[light_rail heavy_rail commuter_rail bus ferry]a) + @bike_boarding_values Enum.with_index(~w[no_information bikes_allowed bikes_not_allowed]a) + + schema "gtfs_trips" do + belongs_to :route, Arrow.Gtfs.Route + belongs_to :service, Arrow.Gtfs.Service + field :headsign, :string + field :short_name, :string + field :direction_id, :integer + # Like RoutePattern, we're limited here by Ecto.Schema's lack of support for + # composite FKs. + # + # Same workaround: use the Trip's `direction_id` field to + # manually look up the relevant Direction from `directions`. + has_many :directions, through: [:route, :directions] + field :block_id, :string + belongs_to :shape, Arrow.Gtfs.Shape + has_many :shape_points, through: [:shape, :points] + field :wheelchair_accessible, Ecto.Enum, values: @wheelchair_accessibility_values + field :route_type, Ecto.Enum, values: @route_type_values + belongs_to :route_pattern, Arrow.Gtfs.RoutePattern + + has_one :representing_route_pattern, Arrow.Gtfs.RoutePattern, + foreign_key: :representative_trip_id + + field :bikes_allowed, Ecto.Enum, values: @bike_boarding_values + has_many :stop_times, Arrow.Gtfs.StopTime, preload_order: [:stop_sequence] + end + + def changeset(trip, attrs) do + attrs = + attrs + |> remove_table_prefix("trip") + |> values_to_int(~w[wheelchair_accessible route_type bikes_allowed]) + + trip + |> cast( + attrs, + ~w[id route_id service_id headsign short_name direction_id block_id shape_id wheelchair_accessible route_type route_pattern_id bikes_allowed]a + ) + |> validate_required( + ~w[id route_id service_id headsign direction_id wheelchair_accessible route_pattern_id bikes_allowed]a + ) + |> validate_inclusion(:direction_id, 0..1) + |> assoc_constraint(:route) + |> assoc_constraint(:service) + |> assoc_constraint(:shape) + |> assoc_constraint(:route_pattern) + end + + @impl Arrow.Gtfs.Importable + def filenames, do: ["trips.txt"] + + @impl Arrow.Gtfs.Importable + def import(unzip) do + Arrow.Gtfs.Importable.import_using_copy( + __MODULE__, + unzip, + header_mappings: %{ + "trip_id" => "id", + "trip_headsign" => "headsign", + "trip_short_name" => "short_name", + "trip_route_type" => "route_type" + }, + header_order: [ + "route_id", + "service_id", + "id", + "headsign", + "short_name", + "direction_id", + "block_id", + "shape_id", + "wheelchair_accessible", + "route_type", 
+ "route_pattern_id", + "bikes_allowed" + ] + ) + end +end diff --git a/lib/arrow/gtfs/validation_worker.ex b/lib/arrow/gtfs/validation_worker.ex new file mode 100644 index 00000000..55cb5195 --- /dev/null +++ b/lib/arrow/gtfs/validation_worker.ex @@ -0,0 +1,40 @@ +defmodule Arrow.Gtfs.ValidationWorker do + @moduledoc """ + Oban worker for GTFS validation jobs. + """ + use Oban.Worker, + queue: :gtfs_import, + # The job is discarded after one failed attempt. + max_attempts: 1, + # In config.exs, the :gtfs_import queue is configured to run only 1 job at a time. + # + # We also enforce uniqueness by archive version--if an existing validatiopn job is already + # running or queued on the same archive version, a new job for that same version will fail. + # + # It's ok to queue up a validation job for a different version than what's currently + # running/queued, though. + unique: [ + fields: [:worker, :args], + keys: [:archive_version], + states: Oban.Job.states() -- [:completed, :discarded, :cancelled] + ] + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => new_version}} = job) do + current_version = "doesn't matter for validation" + + with {:ok, unzip} <- Arrow.Gtfs.Archive.to_unzip_struct(s3_uri) do + Arrow.Gtfs.import(unzip, current_version, new_version, job, true) + end + end + + # A sane timeout to avoid buildup of stuck jobs. + # Jobs should take much less than an hour, generally. + @impl Oban.Worker + def timeout(_job), do: :timer.hours(1) + + @spec check_jobs(Arrow.Gtfs.JobHelper.status_filter()) :: list(map) + def check_jobs(status_filter) do + Arrow.Gtfs.JobHelper.check_jobs(__MODULE__, status_filter) + end +end diff --git a/lib/arrow_web/controllers/api/gtfs_import_controller.ex b/lib/arrow_web/controllers/api/gtfs_import_controller.ex new file mode 100644 index 00000000..0f8eade5 --- /dev/null +++ b/lib/arrow_web/controllers/api/gtfs_import_controller.ex @@ -0,0 +1,251 @@ +defmodule ArrowWeb.API.GtfsImportController do + use ArrowWeb, :controller + + require Logger + import Ecto.Query + + @type error_tuple :: {:error, term} | {:error, status :: atom, term} + + @doc """ + When successful, responds with 200 status + JSON body `{"id": integer}` containing the ID of the enqueued job. + + When unsuccessful, responds with non-200 status and an error message in plaintext. + """ + def enqueue_import(conn, _) do + enqueue_job(conn, Arrow.Gtfs.ImportWorker) + end + + @doc """ + When the requested job exists, responds with 200 status + JSON body `{"status": st}` where `st` is one of: + - "queued" + - "executing" + - "success" + - "failure" + - "cancelled" + + Responds with 400 status if `id` request param is missing. + + Responds with 404 status if no job exists with the requested `id`. + """ + def import_status(conn, params) do + case Map.fetch(params, "id") do + {:ok, id} -> check_status(conn, id, Arrow.Gtfs.ImportWorker, "import") + :error -> send_resp(conn, :bad_request, "missing `id` query parameter") + end + end + + @doc """ + When successful, responds with 200 status + JSON body `{"id": integer}` containing the ID of the enqueued job. + + When unsuccessful, responds with non-200 status and an error message in plaintext. 
+ """ + def enqueue_validation(conn, _) do + enqueue_job(conn, Arrow.Gtfs.ValidationWorker) + end + + @doc """ + When the requested job exists, responds with 200 status + JSON body `{"status": st}` where `st` is one of: + - "queued" + - "executing" + - "success" + - "failure" + - "cancelled" + + Responds with 400 status if `id` request param is missing. + + Responds with 404 status if no job exists with the requested `id`. + """ + def validation_status(conn, params) do + case Map.fetch(params, "id") do + {:ok, id} -> check_status(conn, id, Arrow.Gtfs.ValidationWorker, "validation") + :error -> send_resp(conn, :bad_request, "missing `id` query parameter") + end + end + + @status_filters ~w[all queued executing succeeded failed cancelled not_done done] + + @doc """ + Responds with info about the :gtfs_import queue and its jobs, in JSON form. + + See Arrow.Gtfs.JobHelper.status_filter for available filters. + """ + def check_jobs(conn, %{"status_filter" => status_filter}) + when status_filter in @status_filters do + status_filter = String.to_existing_atom(status_filter) + + info = %{ + queue_state: Oban.check_queue(queue: :gtfs_import), + import_jobs: Arrow.Gtfs.ImportWorker.check_jobs(status_filter), + validate_jobs: Arrow.Gtfs.ValidationWorker.check_jobs(status_filter) + } + + json(conn, info) + end + + def check_jobs(conn, %{"status_filter" => other}) do + send_resp( + conn, + :bad_request, + "Unrecognized `status_filter` param: \"#{other}\". Recognized filters are: #{Enum.join(@status_filters, ", ")}" + ) + end + + def check_jobs(conn, _) do + check_jobs(conn, %{"status_filter" => "all"}) + end + + @spec to_resp({:ok, term} | error_tuple, Plug.Conn.t()) :: Plug.Conn.t() + defp to_resp(result, conn) do + case result do + {:ok, value} -> + json(conn, value) + + {:error, status, message} -> + Logger.warn("GtfsImportController unsuccessful request message=#{inspect(message)}") + send_resp(conn, status, message) + + {:error, message} -> + to_resp({:error, :bad_request, message}, conn) + end + end + + @spec enqueue_job(Plug.Conn.t(), module) :: Plug.Conn.t() + defp enqueue_job(conn, worker_mod) do + with :ok <- validate_zip_file(conn), + {:ok, zip_iodata, conn} <- read_whole_body(conn), + {:ok, version} <- get_version(zip_iodata), + {:ok, s3_uri} <- upload_zip(zip_iodata) do + changeset = worker_mod.new(%{s3_uri: s3_uri, archive_version: version}) + + case Oban.insert(changeset) do + {:ok, %Oban.Job{conflict?: true} = job} -> + {:error, + "tried to insert a duplicate GTFS import or validation job existing_job_id=#{job.id} archive_version=\"#{version}\" worker=#{inspect(worker_mod)}"} + + {:ok, job} -> + Logger.info( + "job enqueued for GTFS archive job_id=#{job.id} archive_version=\"#{version}\" worker=#{inspect(worker_mod)}" + ) + + {:ok, %{id: job.id}} + + {:error, reason} -> + {:error, :internal_server_error, "failed to enqueue job, reason: #{reason}"} + end + |> to_resp(conn) + else + # Returned when `read_whole_body` fails + {:error, reason, %Plug.Conn{} = conn} -> to_resp({:error, reason}, conn) + error -> to_resp(error, conn) + end + end + + @spec check_status(Plug.Conn.t(), String.t(), module, String.t()) :: Plug.Conn.t() + defp check_status(conn, id, worker_mod, job_description) do + worker_name = inspect(worker_mod) + + with {:ok, id} <- parse_job_id(id) do + job_status = + Arrow.Repo.one( + from job in Oban.Job, + where: job.id == ^id, + where: job.worker == ^worker_name, + select: job.state + ) + + report_job_status(job_status, "could not find #{job_description} job with id #{id}") 
+ end
+ |> to_resp(conn)
+ end
+
+ @spec report_job_status(String.t() | nil, String.t()) :: {:ok, term} | error_tuple
+ defp report_job_status(job_status, not_found_message) do
+ case job_status do
+ nil -> {:error, :not_found, not_found_message}
+ queued when queued in ~w[scheduled available] -> {:ok, %{status: :queued}}
+ executing when executing in ~w[executing retryable] -> {:ok, %{status: :executing}}
+ "completed" -> {:ok, %{status: :success}}
+ "discarded" -> {:ok, %{status: :failure}}
+ "cancelled" -> {:ok, %{status: :cancelled}}
+ end
+ end
+
+ @spec validate_zip_file(Plug.Conn.t()) :: :ok | error_tuple
+ defp validate_zip_file(conn) do
+ case get_req_header(conn, "content-type") do
+ ["application/zip"] ->
+ :ok
+
+ [] ->
+ {:error, "missing content-type header"}
+
+ [other] ->
+ {:error, "expected content-type of application/zip, got: #{other}"}
+
+ others ->
+ {:error, "expected a single content-type header, got multiple: #{inspect(others)}"}
+ end
+ end
+
+ @spec get_version(iodata) :: {:ok, String.t()} | error_tuple
+ defp get_version(zip_iodata) do
+ with {:ok, unzip} <- get_unzip(zip_iodata) do
+ unzip
+ |> Arrow.Gtfs.ImportHelper.stream_csv_rows("feed_info.txt")
+ |> Enum.at(0, %{})
+ |> Map.fetch("feed_version")
+ |> case do
+ {:ok, _version} = success -> success
+ :error -> {:error, "feed_info.txt is missing or empty"}
+ end
+ end
+ end
+
+ @spec get_unzip(iodata) :: {:ok, Unzip.t()} | error_tuple
+ defp get_unzip(zip_iodata) do
+ zip_iodata
+ |> Arrow.Gtfs.Archive.from_iodata()
+ |> Unzip.new()
+ |> case do
+ {:ok, _} = success -> success
+ {:error, reason} -> {:error, "could not read zip file, reason: #{inspect(reason)}"}
+ end
+ end
+
+ @spec read_whole_body(Plug.Conn.t()) ::
+ {:ok, iodata, Plug.Conn.t()} | {:error, String.t(), Plug.Conn.t()}
+ defp read_whole_body(conn, acc \\ []) do
+ case read_body(conn) do
+ {:more, chunk, conn} ->
+ read_whole_body(conn, [acc, chunk])
+
+ {:ok, chunk, conn} ->
+ {:ok, [acc, chunk], conn}
+
+ {:error, reason} ->
+ {:error, "could not read request body, reason: #{inspect(reason)}", conn}
+ end
+ end
+
+ @spec upload_zip(iodata) :: {:ok, String.t()} | error_tuple
+ defp upload_zip(zip_iodata) do
+ case Arrow.Gtfs.Archive.upload_to_s3(zip_iodata) do
+ {:ok, _s3_uri} = success ->
+ success
+
+ {:error, reason} ->
+ {:error, :internal_server_error,
+ "failed to upload archive to S3, reason: #{inspect(reason)}"}
+ end
+ end
+
+ @spec parse_job_id(integer | String.t()) :: {:ok, integer} | error_tuple
+ defp parse_job_id(id) when is_integer(id), do: {:ok, id}
+
+ defp parse_job_id(string) when is_binary(string) do
+ case Integer.parse(string) do
+ {id, ""} -> {:ok, id}
+ _ -> {:error, "id must be an integer or an integer-parsable string, got: \"#{string}\""}
+ end
+ end
+end
diff --git a/lib/arrow_web/endpoint.ex b/lib/arrow_web/endpoint.ex
index ca25ccc4..d1047685 100644
--- a/lib/arrow_web/endpoint.ex
+++ b/lib/arrow_web/endpoint.ex
@@ -45,7 +45,7 @@ defmodule ArrowWeb.Endpoint do
 plug Plug.Telemetry, event_prefix: [:phoenix, :endpoint]
 plug Plug.Parsers,
- parsers: [:urlencoded, :multipart, :json],
+ parsers: [:urlencoded, {:multipart, length: 100_000_000}, :json],
 pass: ["*/*"],
 json_decoder: Phoenix.json_library()
diff --git a/lib/arrow_web/router.ex b/lib/arrow_web/router.ex
index 9a4ac748..cf8183c6 100644
--- a/lib/arrow_web/router.ex
+++ b/lib/arrow_web/router.ex
@@ -1,4 +1,5 @@
 defmodule ArrowWeb.Router do
+ alias ArrowWeb.API.GtfsImportController
 use ArrowWeb, :router
 import Phoenix.LiveDashboard.Router
@@ -85,6 +86,16 @@
defmodule ArrowWeb.Router do post("/publish_notice", NoticeController, :publish) get("/db_dump", DBDumpController, :show) + + scope "/gtfs", alias: false do + post("/import", GtfsImportController, :enqueue_import) + get("/import/:id/status", GtfsImportController, :import_status) + + post("/validate", GtfsImportController, :enqueue_validation) + get("/validate/:id/status", GtfsImportController, :validation_status) + + get("/check_jobs", GtfsImportController, :check_jobs) + end end # Other scopes may use custom stacks. diff --git a/mix.exs b/mix.exs index 330ed43a..9a535845 100644 --- a/mix.exs +++ b/mix.exs @@ -45,6 +45,7 @@ defmodule Arrow.MixProject do def deps do [ {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, + {:csv, "~> 3.2"}, {:dialyxir, "~> 1.1", only: [:dev], runtime: false}, {:ecto_sql, "~> 3.11"}, {:ecto_psql_extras, "~> 0.6"}, @@ -64,6 +65,7 @@ defmodule Arrow.MixProject do {:jason, "~> 1.0"}, {:lcov_ex, "~> 0.2", only: [:dev, :test], runtime: false}, {:mox, "~> 1.0.0", only: :test}, + {:oban, "~> 2.18"}, {:phoenix_ecto, "~> 4.0"}, {:phoenix_live_reload, "~> 1.5", only: :dev}, {:phoenix_html, "~> 3.0"}, @@ -91,7 +93,8 @@ defmodule Arrow.MixProject do app: false, compile: false, depth: 1}, - {:sax_map, "~> 1.2"} + {:sax_map, "~> 1.2"}, + {:unzip, "~> 0.12.0"} ] end diff --git a/mix.lock b/mix.lock index 8022387f..ca615451 100644 --- a/mix.lock +++ b/mix.lock @@ -2,11 +2,11 @@ "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, - "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"}, "credo": {:hex, :credo, "1.7.6", "b8f14011a5443f2839b04def0b252300842ce7388f3af177157c86da18dfbeea", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "146f347fb9f8cbc5f7e39e3f22f70acbef51d441baa6d10169dd604bfbc55296"}, + "csv": {:hex, :csv, 
"3.2.1", "6d401f1ed33acb2627682a9ab6021e96d33ca6c1c6bccc243d8f7e2197d032f5", [:mix], [], "hexpm", "8f55a0524923ae49e97ff2642122a2ce7c61e159e7fe1184670b2ce847aee6c8"}, "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, @@ -25,7 +25,6 @@ "floki": {:hex, :floki, "0.36.2", "a7da0193538c93f937714a6704369711998a51a6164a222d710ebd54020aa7a3", [:mix], [], "hexpm", "a8766c0bc92f074e5cb36c4f9961982eda84c5d2b8e979ca67f5c268ec8ed580"}, "gettext": {:hex, :gettext, "0.21.0", "15bbceb20b317b706a8041061a08e858b5a189654128618b53746bf36c84352b", [:mix], [{:expo, "~> 0.1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "04a66db4103b6d1d18f92240bb2c73167b517229316b7bef84e4eebbfb2f14f6"}, "guardian": {:hex, :guardian, "2.3.1", "2b2d78dc399a7df182d739ddc0e566d88723299bfac20be36255e2d052fd215d", [:mix], [{:jose, "~> 1.8", [hex: :jose, repo: "hexpm", optional: false]}, {:plug, "~> 1.3.3 or ~> 1.4", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "bbe241f9ca1b09fad916ad42d6049d2600bbc688aba5b3c4a6c82592a54274c3"}, - "guardian_phoenix": {:hex, :guardian_phoenix, "2.0.1", "89a817265af09a6ddf7cb1e77f17ffca90cea2db10ff888375ef34502b2731b1", [:mix], [{:guardian, "~> 2.0", [hex: :guardian, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.3", [hex: :phoenix, repo: "hexpm", optional: false]}], "hexpm", "21f439246715192b231f228680465d1ed5fbdf01555a4a3b17165532f5f9a08c"}, "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, "heroicons": {:git, "https://github.com/tailwindlabs/heroicons.git", "88ab3a0d790e6a47404cba02800a6b25d2afae50", [tag: "v2.1.1", sparse: "optimized"]}, "httpoison": {:hex, :httpoison, "1.8.2", "9eb9c63ae289296a544842ef816a85d881d4a31f518a0fec089aaa744beae290", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "2bb350d26972e30c96e2ca74a1aaf8293d61d0742ff17f01e0279fef11599921"}, @@ -39,6 +38,7 @@ "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", 
[:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "mox": {:hex, :mox, "1.0.1", "b651bf0113265cda0ba3a827fcb691f848b683c373b77e7d7439910a8d754d6e", [:mix], [], "hexpm", "35bc0dea5499d18db4ef7fe4360067a59b06c74376eb6ab3bd67e6295b133469"}, + "oban": {:hex, :oban, "2.18.3", "1608c04f8856c108555c379f2f56bc0759149d35fa9d3b825cb8a6769f8ae926", [:mix], [{:ecto_sql, "~> 3.10", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "36ca6ca84ef6518f9c2c759ea88efd438a3c81d667ba23b02b062a0aa785475e"}, "oidcc": {:hex, :oidcc, "3.2.0", "f80a4826a946ce07dc8cbd8212392b4ff436ae3c4b4cd6680fa0d84d0ff2fec1", [:mix, :rebar3], [{:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.3.1", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "38fd9092ab5d5d10c71b8011b019316411afe466bef07ba57f57ec3f919278c3"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "phoenix": {:hex, :phoenix, "1.7.12", "1cc589e0eab99f593a8aa38ec45f15d25297dd6187ee801c8de8947090b5a9d3", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "d646192fbade9f485b01bc9920c139bfdd19d0f8df3d73fd8eaf2dfbe0d2837c"}, @@ -50,7 +50,6 @@ "phoenix_live_view": {:hex, :phoenix_live_view, "0.20.14", "70fa101aa0539e81bed4238777498f6215e9dda3461bdaa067cad6908110c364", [:mix], [{:floki, "~> 0.36", [hex: :floki, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.6.15 or ~> 1.7.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.3 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.15", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "82f6d006c5264f979ed5eb75593d808bbe39020f20df2e78426f4f2d570e2402"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", 
"bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"}, "phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"}, - "phoenix_view": {:hex, :phoenix_view, "2.0.3", "4d32c4817fce933693741deeb99ef1392619f942633dde834a5163124813aad3", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}], "hexpm", "cd34049af41be2c627df99cd4eaa71fc52a328c0c3d8e7d4aa28f880c30e7f64"}, "plug": {:hex, :plug, "1.16.0", "1d07d50cb9bb05097fdf187b31cf087c7297aafc3fed8299aac79c128a707e47", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cbf53aa1f5c4d758a7559c0bd6d59e286c2be0c6a1fac8cc3eee2f638243b93e"}, "plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, @@ -66,8 +65,7 @@ "table_rex": {:hex, :table_rex, "4.0.0", "3c613a68ebdc6d4d1e731bc973c233500974ec3993c99fcdabb210407b90959b", [:mix], [], "hexpm", "c35c4d5612ca49ebb0344ea10387da4d2afe278387d4019e4d8111e815df8f55"}, "tailwind": {:hex, :tailwind, "0.2.2", "9e27288b568ede1d88517e8c61259bc214a12d7eed271e102db4c93fcca9b2cd", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}], "hexpm", "ccfb5025179ea307f7f899d1bb3905cd0ac9f687ed77feebc8f67bdca78565c4"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"}, - "telemetry_poller": {:hex, :telemetry_poller, "0.5.1", "21071cc2e536810bac5628b935521ff3e28f0303e770951158c73eaaa01e962a", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4cab72069210bc6e7a080cec9afffad1b33370149ed5d379b81c7c5f0c663fd4"}, + "telemetry_metrics": {:hex, :telemetry_metrics, "1.0.0", "29f5f84991ca98b8eb02fc208b2e6de7c95f8bb2294ef244a176675adc7775df", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f23713b3847286a534e005126d4c959ebcca68ae9582118ce436b521d1d47d5d"}, "telemetry_registry": {:hex, :telemetry_registry, "0.3.1", 
"14a3319a7d9027bdbff7ebcacf1a438f5f5c903057b93aee484cca26f05bdcba", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6d0ca77b691cf854ed074b459a93b87f4c7f5512f8f7743c635ca83da81f939e"}, "tesla": {:hex, :tesla, "1.8.0", "d511a4f5c5e42538d97eef7c40ec4f3e44effdc5068206f42ed859e09e51d1fd", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "10501f360cd926a309501287470372af1a6e1cbed0f43949203a4c13300bc79f"}, "tzdata": {:hex, :tzdata, "1.1.2", "45e5f1fcf8729525ec27c65e163be5b3d247ab1702581a94674e008413eef50b", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "cec7b286e608371602318c414f344941d5eb0375e14cfdab605cca2fe66cba8b"}, @@ -75,6 +73,7 @@ "ueberauth_cognito": {:hex, :ueberauth_cognito, "0.4.0", "62daa3f675298c2b03002d2e1b7e5a30cbc513400e5732a264864a26847e71ac", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.0", [hex: :jose, repo: "hexpm", optional: false]}, {:ueberauth, "~> 0.7", [hex: :ueberauth, repo: "hexpm", optional: false]}], "hexpm", "62378f4f34c8569cd95cc4e7463c56e9981c8afc83fdc516922065f0e1302a35"}, "ueberauth_oidcc": {:hex, :ueberauth_oidcc, "0.4.0", "3fbfbc38735b4dba54ed8bf3e9b80f5054f73cc1ec9af6ae88b7886d25934768", [:mix], [{:oidcc, "~> 3.2.0", [hex: :oidcc, repo: "hexpm", optional: false]}, {:plug, "~> 1.11", [hex: :plug, repo: "hexpm", optional: false]}, {:ueberauth, "~> 0.10", [hex: :ueberauth, repo: "hexpm", optional: false]}], "hexpm", "cdd8517d773cfe499c0b692f795f213b2eb33119afbec34aefd8be0a85c62b21"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, + "unzip": {:hex, :unzip, "0.12.0", "beed92238724732418b41eba77dcb7f51e235b707406c05b1732a3052d1c0f36", [:mix], [], "hexpm", "95655b72db368e5a84951f0bed586ac053b55ee3815fd96062fce10ce4fc998d"}, "wallaby": {:hex, :wallaby, "0.30.6", "7dc4c1213f3b52c4152581d126632bc7e06892336d3a0f582853efeeabd45a71", [:mix], [{:ecto_sql, ">= 3.0.0", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:httpoison, "~> 0.12 or ~> 1.0 or ~> 2.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:phoenix_ecto, ">= 3.0.0", [hex: :phoenix_ecto, repo: "hexpm", optional: true]}, {:web_driver_client, "~> 0.2.0", [hex: :web_driver_client, repo: "hexpm", optional: false]}], "hexpm", "50950c1d968549b54c20e16175c68c7fc0824138e2bb93feb11ef6add8eb23d4"}, 
"web_driver_client": {:hex, :web_driver_client, "0.2.0", "63b76cd9eb3b0716ec5467a0f8bead73d3d9612e63f7560d21357f03ad86e31a", [:mix], [{:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:tesla, "~> 1.3", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "83cc6092bc3e74926d1c8455f0ce927d5d1d36707b74d9a65e38c084aab0350f"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, diff --git a/priv/repo/migrations/20240826153959_create_gtfs_tables_part1.exs b/priv/repo/migrations/20240826153959_create_gtfs_tables_part1.exs new file mode 100644 index 00000000..4782fc30 --- /dev/null +++ b/priv/repo/migrations/20240826153959_create_gtfs_tables_part1.exs @@ -0,0 +1,56 @@ +defmodule Arrow.Repo.Migrations.CreateGtfsTablesPart1 do + @moduledoc """ + Creates auxiliary tables referenced by fields in the more frequently-used + GTFS-static tables. + + Column names omit the `${table_name}_` prefix of their CSV counterparts. + """ + use Ecto.Migration + + def change do + ################################# + # Tables with zero dependencies # + ################################# + create table("gtfs_feed_info", primary_key: [name: :id, type: :string]) do + add :publisher_name, :string, null: false + add :publisher_url, :string, null: false + add :lang, :string, null: false + add :start_date, :date, null: false + add :end_date, :date, null: false + add :version, :string, null: false + add :contact_email, :string, null: false + end + + create table("gtfs_agencies", primary_key: [name: :id, type: :string]) do + add :name, :string, null: false + add :url, :string, null: false + add :timezone, :string, null: false + add :lang, :string + add :phone, :string + end + + create table("gtfs_checkpoints", primary_key: [name: :id, type: :string]) do + add :name, :string, null: false + end + + create table("gtfs_levels", primary_key: [name: :id, type: :string]) do + add :index, :float, null: false + add :name, :string + # `level_elevation` column is included but empty in the CSV and not + # mentioned in either the official spec or our reference. + # add :elevation, :string + end + + create table("gtfs_lines", primary_key: [name: :id, type: :string]) do + # Spec says always included, but column is partially empty + add :short_name, :string + add :long_name, :string, null: false + # Spec says always included, but column is partially empty + add :desc, :string + add :url, :string + add :color, :string, null: false + add :text_color, :string, null: false + add :sort_order, :integer, null: false + end + end +end diff --git a/priv/repo/migrations/20240826154124_create_gtfs_tables_part2.exs b/priv/repo/migrations/20240826154124_create_gtfs_tables_part2.exs new file mode 100644 index 00000000..a9d1cb46 --- /dev/null +++ b/priv/repo/migrations/20240826154124_create_gtfs_tables_part2.exs @@ -0,0 +1,68 @@ +defmodule Arrow.Repo.Migrations.CreateGtfsTablesPart2 do + use Ecto.Migration + import Arrow.Gtfs.MigrationHelper + + def change do + # A new table that allows us to easily view all + # calendar/calendar_dates entries referencing a given service_id. 
+ create table("gtfs_services", primary_key: [name: :id, type: :string]) + + create table("gtfs_calendars", primary_key: false) do + add :service_id, references("gtfs_services", type: :string), primary_key: true + + for day <- ~w[monday tuesday wednesday thursday friday saturday sunday]a do + add day, :boolean, null: false + end + + add :start_date, :date, null: false + add :end_date, :date, null: false + end + + create table("gtfs_calendar_dates", primary_key: false) do + add :service_id, references("gtfs_services", type: :string), primary_key: true + add :date, :date, primary_key: true + add :exception_type, :integer, null: false + add :holiday_name, :string + end + + create_int_code_constraint("gtfs_calendar_dates", :exception_type, 1..2) + + create table("gtfs_stops", primary_key: [name: :id, type: :string]) do + add :code, :string + add :name, :string, null: false + add :desc, :string + add :platform_code, :string + add :platform_name, :string + add :lat, :float + add :lon, :float + add :zone_id, :string + add :address, :string + add :url, :string + add :level_id, references("gtfs_levels", type: :string) + add :location_type, :integer, null: false + add :parent_station_id, references("gtfs_stops", type: :string) + add :wheelchair_boarding, :integer, null: false + add :municipality, :string + add :on_street, :string + add :at_street, :string + add :vehicle_type, :integer + end + + create_int_code_constraint("gtfs_stops", :location_type, 4) + create_int_code_constraint("gtfs_stops", :wheelchair_boarding, 2) + create_int_code_constraint("gtfs_stops", :vehicle_type, 4) + + create table("gtfs_shapes", primary_key: [name: :id, type: :string]) + + # Individual points are separated into another table to properly + # form the 1:* relationship and allow FK relations to gtfs_shapes. + create table("gtfs_shape_points", primary_key: false) do + add :shape_id, references("gtfs_shapes", type: :string), primary_key: true + add :lat, :float, null: false + add :lon, :float, null: false + add :sequence, :integer, primary_key: true + # Column is empty, maybe should omit it? 
+ add :dist_traveled, :float + end + end +end diff --git a/priv/repo/migrations/20240826154204_create_gtfs_tables_part3.exs b/priv/repo/migrations/20240826154204_create_gtfs_tables_part3.exs new file mode 100644 index 00000000..c051babb --- /dev/null +++ b/priv/repo/migrations/20240826154204_create_gtfs_tables_part3.exs @@ -0,0 +1,48 @@ +defmodule Arrow.Repo.Migrations.CreateGtfsTablesPart3 do + use Ecto.Migration + import Arrow.Gtfs.MigrationHelper + + def change do + create_enum_type("route_desc", [ + "Commuter Rail", + "Rapid Transit", + "Local Bus", + "Key Bus", + "Supplemental Bus", + "Community Bus", + "Commuter Bus", + "Ferry", + "Rail Replacement Bus" + ]) + + create_enum_type("fare_class", [ + "Local Bus", + "Inner Express", + "Outer Express", + "Rapid Transit", + "Commuter Rail", + "Ferry", + "Free", + "Special" + ]) + + create table("gtfs_routes", primary_key: [name: :id, type: :string]) do + add :agency_id, references("gtfs_agencies", type: :string), null: false + add :short_name, :string + add :long_name, :string + add :desc, :route_desc, null: false + add :type, :integer, null: false + add :url, :string + add :color, :string + add :text_color, :string + add :sort_order, :integer, null: false + add :fare_class, :fare_class, null: false + add :line_id, references("gtfs_lines", type: :string) + add :listed_route, :integer + add :network_id, :string, null: false + end + + create_int_code_constraint("gtfs_routes", :type, 4) + create_int_code_constraint("gtfs_routes", :listed_route, 1) + end +end diff --git a/priv/repo/migrations/20240826154208_create_gtfs_tables_part4.exs b/priv/repo/migrations/20240826154208_create_gtfs_tables_part4.exs new file mode 100644 index 00000000..cb196580 --- /dev/null +++ b/priv/repo/migrations/20240826154208_create_gtfs_tables_part4.exs @@ -0,0 +1,58 @@ +defmodule Arrow.Repo.Migrations.CreateGtfsTablesPart4 do + use Ecto.Migration + import Arrow.Gtfs.MigrationHelper + + def change do + create_enum_type("direction_desc", [ + "North", + "South", + "East", + "West", + "Northeast", + "Northwest", + "Southeast", + "Southwest", + "Clockwise", + "Counterclockwise", + "Inbound", + "Outbound", + "Loop A", + "Loop B", + "Loop" + ]) + + create table("gtfs_directions", primary_key: false) do + add :route_id, references("gtfs_routes", type: :string), primary_key: true + add :direction_id, :integer, primary_key: true + add :desc, :direction_desc, null: false + add :destination, :string, null: false + end + + create table("gtfs_route_patterns", primary_key: [name: :id, type: :string]) do + add :route_id, references("gtfs_routes", type: :string), null: false + + add :direction_id, + references("gtfs_directions", + column: :direction_id, + type: :integer, + with: [route_id: :route_id] + ), + null: false + + add :name, :string, null: false + add :time_desc, :string + add :typicality, :integer, null: false + add :sort_order, :integer, null: false + # References gtfs_trips, but we haven't created that yet. (gtfs_trips + # references this table, so we'll need to add a deferred FK constraint to + # this column later.) + add :representative_trip_id, :string, null: false + + # Make an integer-code table for this? 
+ add :canonical, :integer, null: false + end + + create_int_code_constraint("gtfs_route_patterns", :typicality, 5) + create_int_code_constraint("gtfs_route_patterns", :canonical, 2) + end +end diff --git a/priv/repo/migrations/20240826154213_create_gtfs_tables_part5.exs b/priv/repo/migrations/20240826154213_create_gtfs_tables_part5.exs new file mode 100644 index 00000000..3c9ded49 --- /dev/null +++ b/priv/repo/migrations/20240826154213_create_gtfs_tables_part5.exs @@ -0,0 +1,45 @@ +defmodule Arrow.Repo.Migrations.CreateGtfsTablesPart5 do + use Ecto.Migration + import Arrow.Gtfs.MigrationHelper + + def change do + create table("gtfs_trips", primary_key: [name: :id, type: :string]) do + add :route_id, references("gtfs_routes", type: :string), null: false + add :service_id, references("gtfs_services", type: :string), null: false + add :headsign, :string, null: false + add :short_name, :string + + add :direction_id, + references("gtfs_directions", + column: :direction_id, + type: :integer, + with: [route_id: :route_id] + ), + null: false + + add :block_id, :string + add :shape_id, references("gtfs_shapes", type: :string) + add :wheelchair_accessible, :integer, null: false + add :route_type, :integer + add :route_pattern_id, references("gtfs_route_patterns", type: :string), null: false + add :bikes_allowed, :integer, null: false + end + + create_int_code_constraint("gtfs_trips", :wheelchair_accessible, 2) + create_int_code_constraint("gtfs_trips", :route_type, 4) + create_int_code_constraint("gtfs_trips", :bikes_allowed, 2) + + execute( + """ + ALTER TABLE "gtfs_route_patterns" + ADD CONSTRAINT "gtfs_route_patterns_representative_trip_id_fkey" + FOREIGN KEY ("representative_trip_id") REFERENCES "gtfs_trips"("id") + DEFERRABLE INITIALLY DEFERRED + """, + """ + ALTER TABLE "gtfs_route_patterns" + DROP CONSTRAINT "gtfs_route_patterns_representative_trip_id_fkey" + """ + ) + end +end diff --git a/priv/repo/migrations/20240826154218_create_gtfs_tables_part6.exs b/priv/repo/migrations/20240826154218_create_gtfs_tables_part6.exs new file mode 100644 index 00000000..b7f5aecc --- /dev/null +++ b/priv/repo/migrations/20240826154218_create_gtfs_tables_part6.exs @@ -0,0 +1,29 @@ +defmodule Arrow.Repo.Migrations.CreateGtfsTablesPart6 do + use Ecto.Migration + import Arrow.Gtfs.MigrationHelper + + def change do + create table("gtfs_stop_times", primary_key: false) do + add :trip_id, references("gtfs_trips", type: :string), primary_key: true + # Arrival and departure times are preserved as their original timestamps + # to allow for efficient import and to properly represent after-midnight values. 
+ add :arrival_time, :string, null: false + add :departure_time, :string, null: false + add :stop_id, references("gtfs_stops", type: :string), null: false + add :stop_sequence, :integer, primary_key: true + add :stop_headsign, :string + add :pickup_type, :integer, null: false + add :drop_off_type, :integer, null: false + add :timepoint, :integer + add :checkpoint_id, references("gtfs_checkpoints", type: :string) + add :continuous_pickup, :integer + add :continuous_drop_off, :integer + end + + create_int_code_constraint("gtfs_stop_times", :pickup_type, 3) + create_int_code_constraint("gtfs_stop_times", :drop_off_type, 3) + create_int_code_constraint("gtfs_stop_times", :timepoint, 1) + create_int_code_constraint("gtfs_stop_times", :continuous_pickup, 3) + create_int_code_constraint("gtfs_stop_times", :continuous_drop_off, 3) + end +end diff --git a/priv/repo/migrations/20241003182524_add_oban_jobs_table.exs b/priv/repo/migrations/20241003182524_add_oban_jobs_table.exs new file mode 100644 index 00000000..8cc3b649 --- /dev/null +++ b/priv/repo/migrations/20241003182524_add_oban_jobs_table.exs @@ -0,0 +1,13 @@ +defmodule Arrow.Repo.Migrations.AddObanJobsTable do + use Ecto.Migration + + def up do + Oban.Migration.up(version: 12) + end + + # We specify `version: 1` in `down`, ensuring that we'll roll all the way back down if + # necessary, regardless of which version we've migrated `up` to. + def down do + Oban.Migration.down(version: 1) + end +end diff --git a/priv/repo/structure.sql b/priv/repo/structure.sql index 4d640eda..6deb899c 100644 --- a/priv/repo/structure.sql +++ b/priv/repo/structure.sql @@ -16,13 +16,6 @@ SET xmloption = content; SET client_min_messages = warning; SET row_security = off; --- --- Name: public; Type: SCHEMA; Schema: -; Owner: - --- - --- *not* creating schema, since initdb creates it - - -- -- Name: day_name; Type: TYPE; Schema: public; Owner: - -- @@ -38,6 +31,77 @@ CREATE TYPE public.day_name AS ENUM ( ); +-- +-- Name: direction_desc; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.direction_desc AS ENUM ( + 'North', + 'South', + 'East', + 'West', + 'Northeast', + 'Northwest', + 'Southeast', + 'Southwest', + 'Clockwise', + 'Counterclockwise', + 'Inbound', + 'Outbound', + 'Loop A', + 'Loop B', + 'Loop' +); + + +-- +-- Name: fare_class; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.fare_class AS ENUM ( + 'Local Bus', + 'Inner Express', + 'Outer Express', + 'Rapid Transit', + 'Commuter Rail', + 'Ferry', + 'Free', + 'Special' +); + + +-- +-- Name: oban_job_state; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.oban_job_state AS ENUM ( + 'available', + 'scheduled', + 'executing', + 'retryable', + 'completed', + 'discarded', + 'cancelled' +); + + +-- +-- Name: route_desc; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.route_desc AS ENUM ( + 'Commuter Rail', + 'Rapid Transit', + 'Local Bus', + 'Key Bus', + 'Supplemental Bus', + 'Community Bus', + 'Commuter Bus', + 'Ferry', + 'Rail Replacement Bus' +); + + SET default_tablespace = ''; SET default_table_access_method = heap; @@ -337,6 +401,336 @@ CREATE SEQUENCE public.disruptions_id_seq1 ALTER SEQUENCE public.disruptions_id_seq1 OWNED BY public.disruptions.id; +-- +-- Name: gtfs_agencies; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_agencies ( + id character varying(255) NOT NULL, + name character varying(255) NOT NULL, + url character varying(255) NOT NULL, + timezone character varying(255) NOT NULL, + lang character varying(255), + 
phone character varying(255) +); + + +-- +-- Name: gtfs_calendar_dates; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_calendar_dates ( + service_id character varying(255) NOT NULL, + date date NOT NULL, + exception_type integer NOT NULL, + holiday_name character varying(255), + CONSTRAINT exception_type_must_be_in_range CHECK ((exception_type <@ int4range(1, 2, '[]'::text))) +); + + +-- +-- Name: gtfs_calendars; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_calendars ( + service_id character varying(255) NOT NULL, + monday boolean NOT NULL, + tuesday boolean NOT NULL, + wednesday boolean NOT NULL, + thursday boolean NOT NULL, + friday boolean NOT NULL, + saturday boolean NOT NULL, + sunday boolean NOT NULL, + start_date date NOT NULL, + end_date date NOT NULL +); + + +-- +-- Name: gtfs_checkpoints; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_checkpoints ( + id character varying(255) NOT NULL, + name character varying(255) NOT NULL +); + + +-- +-- Name: gtfs_directions; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_directions ( + route_id character varying(255) NOT NULL, + direction_id integer NOT NULL, + "desc" public.direction_desc NOT NULL, + destination character varying(255) NOT NULL +); + + +-- +-- Name: gtfs_feed_info; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_feed_info ( + id character varying(255) NOT NULL, + publisher_name character varying(255) NOT NULL, + publisher_url character varying(255) NOT NULL, + lang character varying(255) NOT NULL, + start_date date NOT NULL, + end_date date NOT NULL, + version character varying(255) NOT NULL, + contact_email character varying(255) NOT NULL +); + + +-- +-- Name: gtfs_levels; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_levels ( + id character varying(255) NOT NULL, + index double precision NOT NULL, + name character varying(255) +); + + +-- +-- Name: gtfs_lines; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_lines ( + id character varying(255) NOT NULL, + short_name character varying(255), + long_name character varying(255) NOT NULL, + "desc" character varying(255), + url character varying(255), + color character varying(255) NOT NULL, + text_color character varying(255) NOT NULL, + sort_order integer NOT NULL +); + + +-- +-- Name: gtfs_route_patterns; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_route_patterns ( + id character varying(255) NOT NULL, + route_id character varying(255) NOT NULL, + direction_id integer NOT NULL, + name character varying(255) NOT NULL, + time_desc character varying(255), + typicality integer NOT NULL, + sort_order integer NOT NULL, + representative_trip_id character varying(255) NOT NULL, + canonical integer NOT NULL, + CONSTRAINT canonical_must_be_in_range CHECK ((canonical <@ int4range(0, 2, '[]'::text))), + CONSTRAINT typicality_must_be_in_range CHECK ((typicality <@ int4range(0, 5, '[]'::text))) +); + + +-- +-- Name: gtfs_routes; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_routes ( + id character varying(255) NOT NULL, + agency_id character varying(255) NOT NULL, + short_name character varying(255), + long_name character varying(255), + "desc" public.route_desc NOT NULL, + type integer NOT NULL, + url character varying(255), + color character varying(255), + text_color character varying(255), + sort_order integer NOT NULL, + fare_class public.fare_class NOT NULL, + line_id character 
varying(255), + listed_route integer, + network_id character varying(255) NOT NULL, + CONSTRAINT listed_route_must_be_in_range CHECK ((listed_route <@ int4range(0, 1, '[]'::text))), + CONSTRAINT type_must_be_in_range CHECK ((type <@ int4range(0, 4, '[]'::text))) +); + + +-- +-- Name: gtfs_services; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_services ( + id character varying(255) NOT NULL +); + + +-- +-- Name: gtfs_shape_points; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_shape_points ( + shape_id character varying(255) NOT NULL, + lat double precision NOT NULL, + lon double precision NOT NULL, + sequence integer NOT NULL, + dist_traveled double precision +); + + +-- +-- Name: gtfs_shapes; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_shapes ( + id character varying(255) NOT NULL +); + + +-- +-- Name: gtfs_stop_times; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_stop_times ( + trip_id character varying(255) NOT NULL, + arrival_time character varying(255) NOT NULL, + departure_time character varying(255) NOT NULL, + stop_id character varying(255) NOT NULL, + stop_sequence integer NOT NULL, + stop_headsign character varying(255), + pickup_type integer NOT NULL, + drop_off_type integer NOT NULL, + timepoint integer, + checkpoint_id character varying(255), + continuous_pickup integer, + continuous_drop_off integer, + CONSTRAINT continuous_drop_off_must_be_in_range CHECK ((continuous_drop_off <@ int4range(0, 3, '[]'::text))), + CONSTRAINT continuous_pickup_must_be_in_range CHECK ((continuous_pickup <@ int4range(0, 3, '[]'::text))), + CONSTRAINT drop_off_type_must_be_in_range CHECK ((drop_off_type <@ int4range(0, 3, '[]'::text))), + CONSTRAINT pickup_type_must_be_in_range CHECK ((pickup_type <@ int4range(0, 3, '[]'::text))), + CONSTRAINT timepoint_must_be_in_range CHECK ((timepoint <@ int4range(0, 1, '[]'::text))) +); + + +-- +-- Name: gtfs_stops; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_stops ( + id character varying(255) NOT NULL, + code character varying(255), + name character varying(255) NOT NULL, + "desc" character varying(255), + platform_code character varying(255), + platform_name character varying(255), + lat double precision, + lon double precision, + zone_id character varying(255), + address character varying(255), + url character varying(255), + level_id character varying(255), + location_type integer NOT NULL, + parent_station_id character varying(255), + wheelchair_boarding integer NOT NULL, + municipality character varying(255), + on_street character varying(255), + at_street character varying(255), + vehicle_type integer, + CONSTRAINT location_type_must_be_in_range CHECK ((location_type <@ int4range(0, 4, '[]'::text))), + CONSTRAINT vehicle_type_must_be_in_range CHECK ((vehicle_type <@ int4range(0, 4, '[]'::text))), + CONSTRAINT wheelchair_boarding_must_be_in_range CHECK ((wheelchair_boarding <@ int4range(0, 2, '[]'::text))) +); + + +-- +-- Name: gtfs_trips; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.gtfs_trips ( + id character varying(255) NOT NULL, + route_id character varying(255) NOT NULL, + service_id character varying(255) NOT NULL, + headsign character varying(255) NOT NULL, + short_name character varying(255), + direction_id integer NOT NULL, + block_id character varying(255), + shape_id character varying(255), + wheelchair_accessible integer NOT NULL, + route_type integer, + route_pattern_id character varying(255) NOT 
NULL, + bikes_allowed integer NOT NULL, + CONSTRAINT bikes_allowed_must_be_in_range CHECK ((bikes_allowed <@ int4range(0, 2, '[]'::text))), + CONSTRAINT route_type_must_be_in_range CHECK ((route_type <@ int4range(0, 4, '[]'::text))), + CONSTRAINT wheelchair_accessible_must_be_in_range CHECK ((wheelchair_accessible <@ int4range(0, 2, '[]'::text))) +); + + +-- +-- Name: oban_jobs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.oban_jobs ( + id bigint NOT NULL, + state public.oban_job_state DEFAULT 'available'::public.oban_job_state NOT NULL, + queue text DEFAULT 'default'::text NOT NULL, + worker text NOT NULL, + args jsonb DEFAULT '{}'::jsonb NOT NULL, + errors jsonb[] DEFAULT ARRAY[]::jsonb[] NOT NULL, + attempt integer DEFAULT 0 NOT NULL, + max_attempts integer DEFAULT 20 NOT NULL, + inserted_at timestamp without time zone DEFAULT timezone('UTC'::text, now()) NOT NULL, + scheduled_at timestamp without time zone DEFAULT timezone('UTC'::text, now()) NOT NULL, + attempted_at timestamp without time zone, + completed_at timestamp without time zone, + attempted_by text[], + discarded_at timestamp without time zone, + priority integer DEFAULT 0 NOT NULL, + tags text[] DEFAULT ARRAY[]::text[], + meta jsonb DEFAULT '{}'::jsonb, + cancelled_at timestamp without time zone, + CONSTRAINT attempt_range CHECK (((attempt >= 0) AND (attempt <= max_attempts))), + CONSTRAINT positive_max_attempts CHECK ((max_attempts > 0)), + CONSTRAINT queue_length CHECK (((char_length(queue) > 0) AND (char_length(queue) < 128))), + CONSTRAINT worker_length CHECK (((char_length(worker) > 0) AND (char_length(worker) < 128))) +); + + +-- +-- Name: TABLE oban_jobs; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.oban_jobs IS '12'; + + +-- +-- Name: oban_jobs_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.oban_jobs_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: oban_jobs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.oban_jobs_id_seq OWNED BY public.oban_jobs.id; + + +-- +-- Name: oban_peers; Type: TABLE; Schema: public; Owner: - +-- + +CREATE UNLOGGED TABLE public.oban_peers ( + name text NOT NULL, + node text NOT NULL, + started_at timestamp without time zone NOT NULL, + expires_at timestamp without time zone NOT NULL +); + + -- -- Name: schema_migrations; Type: TABLE; Schema: public; Owner: - -- @@ -593,6 +987,13 @@ ALTER TABLE ONLY public.disruption_trip_short_names ALTER COLUMN id SET DEFAULT ALTER TABLE ONLY public.disruptions ALTER COLUMN id SET DEFAULT nextval('public.disruptions_id_seq1'::regclass); +-- +-- Name: oban_jobs id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.oban_jobs ALTER COLUMN id SET DEFAULT nextval('public.oban_jobs_id_seq'::regclass); + + -- -- Name: shapes id; Type: DEFAULT; Schema: public; Owner: - -- @@ -700,6 +1101,158 @@ ALTER TABLE ONLY public.disruptions ADD CONSTRAINT disruptions_pkey1 PRIMARY KEY (id); +-- +-- Name: gtfs_agencies gtfs_agencies_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_agencies + ADD CONSTRAINT gtfs_agencies_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_calendar_dates gtfs_calendar_dates_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_calendar_dates + ADD CONSTRAINT gtfs_calendar_dates_pkey PRIMARY KEY (service_id, date); + + +-- +-- Name: gtfs_calendars gtfs_calendars_pkey; Type: CONSTRAINT; Schema: public; 
Owner: - +-- + +ALTER TABLE ONLY public.gtfs_calendars + ADD CONSTRAINT gtfs_calendars_pkey PRIMARY KEY (service_id); + + +-- +-- Name: gtfs_checkpoints gtfs_checkpoints_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_checkpoints + ADD CONSTRAINT gtfs_checkpoints_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_directions gtfs_directions_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_directions + ADD CONSTRAINT gtfs_directions_pkey PRIMARY KEY (route_id, direction_id); + + +-- +-- Name: gtfs_feed_info gtfs_feed_info_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_feed_info + ADD CONSTRAINT gtfs_feed_info_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_levels gtfs_levels_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_levels + ADD CONSTRAINT gtfs_levels_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_lines gtfs_lines_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_lines + ADD CONSTRAINT gtfs_lines_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_route_patterns gtfs_route_patterns_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_route_patterns + ADD CONSTRAINT gtfs_route_patterns_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_routes gtfs_routes_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_routes + ADD CONSTRAINT gtfs_routes_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_services gtfs_services_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_services + ADD CONSTRAINT gtfs_services_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_shape_points gtfs_shape_points_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_shape_points + ADD CONSTRAINT gtfs_shape_points_pkey PRIMARY KEY (shape_id, sequence); + + +-- +-- Name: gtfs_shapes gtfs_shapes_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_shapes + ADD CONSTRAINT gtfs_shapes_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_stop_times gtfs_stop_times_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_stop_times + ADD CONSTRAINT gtfs_stop_times_pkey PRIMARY KEY (trip_id, stop_sequence); + + +-- +-- Name: gtfs_stops gtfs_stops_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_stops + ADD CONSTRAINT gtfs_stops_pkey PRIMARY KEY (id); + + +-- +-- Name: gtfs_trips gtfs_trips_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_trips + ADD CONSTRAINT gtfs_trips_pkey PRIMARY KEY (id); + + +-- +-- Name: oban_jobs non_negative_priority; Type: CHECK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE public.oban_jobs + ADD CONSTRAINT non_negative_priority CHECK ((priority >= 0)) NOT VALID; + + +-- +-- Name: oban_jobs oban_jobs_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.oban_jobs + ADD CONSTRAINT oban_jobs_pkey PRIMARY KEY (id); + + +-- +-- Name: oban_peers oban_peers_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.oban_peers + ADD CONSTRAINT oban_peers_pkey PRIMARY KEY (name); + + -- -- Name: schema_migrations schema_migrations_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -811,6 +1364,27 @@ CREATE INDEX disruption_notes_disruption_id_index ON public.disruption_notes USI CREATE INDEX disruption_trip_short_names_disruption_id_index ON 
public.disruption_trip_short_names USING btree (disruption_revision_id); +-- +-- Name: oban_jobs_args_index; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX oban_jobs_args_index ON public.oban_jobs USING gin (args); + + +-- +-- Name: oban_jobs_meta_index; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX oban_jobs_meta_index ON public.oban_jobs USING gin (meta); + + +-- +-- Name: oban_jobs_state_queue_priority_scheduled_at_id_index; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX oban_jobs_state_queue_priority_scheduled_at_id_index ON public.oban_jobs USING btree (state, queue, priority, scheduled_at, id); + + -- -- Name: shapes_name_index; Type: INDEX; Schema: public; Owner: - -- @@ -924,6 +1498,158 @@ ALTER TABLE ONLY public.disruptions ADD CONSTRAINT disruptions_published_revision_id_fkey FOREIGN KEY (published_revision_id) REFERENCES public.disruption_revisions(id); +-- +-- Name: gtfs_calendar_dates gtfs_calendar_dates_service_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_calendar_dates + ADD CONSTRAINT gtfs_calendar_dates_service_id_fkey FOREIGN KEY (service_id) REFERENCES public.gtfs_services(id); + + +-- +-- Name: gtfs_calendars gtfs_calendars_service_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_calendars + ADD CONSTRAINT gtfs_calendars_service_id_fkey FOREIGN KEY (service_id) REFERENCES public.gtfs_services(id); + + +-- +-- Name: gtfs_directions gtfs_directions_route_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_directions + ADD CONSTRAINT gtfs_directions_route_id_fkey FOREIGN KEY (route_id) REFERENCES public.gtfs_routes(id); + + +-- +-- Name: gtfs_route_patterns gtfs_route_patterns_direction_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_route_patterns + ADD CONSTRAINT gtfs_route_patterns_direction_id_fkey FOREIGN KEY (direction_id, route_id) REFERENCES public.gtfs_directions(direction_id, route_id); + + +-- +-- Name: gtfs_route_patterns gtfs_route_patterns_representative_trip_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_route_patterns + ADD CONSTRAINT gtfs_route_patterns_representative_trip_id_fkey FOREIGN KEY (representative_trip_id) REFERENCES public.gtfs_trips(id) DEFERRABLE INITIALLY DEFERRED; + + +-- +-- Name: gtfs_route_patterns gtfs_route_patterns_route_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_route_patterns + ADD CONSTRAINT gtfs_route_patterns_route_id_fkey FOREIGN KEY (route_id) REFERENCES public.gtfs_routes(id); + + +-- +-- Name: gtfs_routes gtfs_routes_agency_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_routes + ADD CONSTRAINT gtfs_routes_agency_id_fkey FOREIGN KEY (agency_id) REFERENCES public.gtfs_agencies(id); + + +-- +-- Name: gtfs_routes gtfs_routes_line_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_routes + ADD CONSTRAINT gtfs_routes_line_id_fkey FOREIGN KEY (line_id) REFERENCES public.gtfs_lines(id); + + +-- +-- Name: gtfs_shape_points gtfs_shape_points_shape_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_shape_points + ADD CONSTRAINT gtfs_shape_points_shape_id_fkey FOREIGN KEY (shape_id) REFERENCES public.gtfs_shapes(id); + + +-- +-- Name: gtfs_stop_times gtfs_stop_times_checkpoint_id_fkey; Type: FK CONSTRAINT; Schema: public; 
Owner: - +-- + +ALTER TABLE ONLY public.gtfs_stop_times + ADD CONSTRAINT gtfs_stop_times_checkpoint_id_fkey FOREIGN KEY (checkpoint_id) REFERENCES public.gtfs_checkpoints(id); + + +-- +-- Name: gtfs_stop_times gtfs_stop_times_stop_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_stop_times + ADD CONSTRAINT gtfs_stop_times_stop_id_fkey FOREIGN KEY (stop_id) REFERENCES public.gtfs_stops(id); + + +-- +-- Name: gtfs_stop_times gtfs_stop_times_trip_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_stop_times + ADD CONSTRAINT gtfs_stop_times_trip_id_fkey FOREIGN KEY (trip_id) REFERENCES public.gtfs_trips(id); + + +-- +-- Name: gtfs_stops gtfs_stops_level_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_stops + ADD CONSTRAINT gtfs_stops_level_id_fkey FOREIGN KEY (level_id) REFERENCES public.gtfs_levels(id); + + +-- +-- Name: gtfs_stops gtfs_stops_parent_station_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_stops + ADD CONSTRAINT gtfs_stops_parent_station_id_fkey FOREIGN KEY (parent_station_id) REFERENCES public.gtfs_stops(id); + + +-- +-- Name: gtfs_trips gtfs_trips_direction_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_trips + ADD CONSTRAINT gtfs_trips_direction_id_fkey FOREIGN KEY (direction_id, route_id) REFERENCES public.gtfs_directions(direction_id, route_id); + + +-- +-- Name: gtfs_trips gtfs_trips_route_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_trips + ADD CONSTRAINT gtfs_trips_route_id_fkey FOREIGN KEY (route_id) REFERENCES public.gtfs_routes(id); + + +-- +-- Name: gtfs_trips gtfs_trips_route_pattern_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_trips + ADD CONSTRAINT gtfs_trips_route_pattern_id_fkey FOREIGN KEY (route_pattern_id) REFERENCES public.gtfs_route_patterns(id); + + +-- +-- Name: gtfs_trips gtfs_trips_service_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_trips + ADD CONSTRAINT gtfs_trips_service_id_fkey FOREIGN KEY (service_id) REFERENCES public.gtfs_services(id); + + +-- +-- Name: gtfs_trips gtfs_trips_shape_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.gtfs_trips + ADD CONSTRAINT gtfs_trips_shape_id_fkey FOREIGN KEY (shape_id) REFERENCES public.gtfs_shapes(id); + + -- -- Name: shuttle_route_stops shuttle_route_stops_shuttle_route_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -979,6 +1705,13 @@ INSERT INTO public."schema_migrations" (version) VALUES (20240611173539); INSERT INTO public."schema_migrations" (version) VALUES (20240628203237); INSERT INTO public."schema_migrations" (version) VALUES (20240701173124); INSERT INTO public."schema_migrations" (version) VALUES (20240718181932); +INSERT INTO public."schema_migrations" (version) VALUES (20240826153959); +INSERT INTO public."schema_migrations" (version) VALUES (20240826154124); +INSERT INTO public."schema_migrations" (version) VALUES (20240826154204); +INSERT INTO public."schema_migrations" (version) VALUES (20240826154208); +INSERT INTO public."schema_migrations" (version) VALUES (20240826154213); +INSERT INTO public."schema_migrations" (version) VALUES (20240826154218); +INSERT INTO public."schema_migrations" (version) VALUES (20241003182524); INSERT INTO public."schema_migrations" (version) VALUES (20241010164333); 
INSERT INTO public."schema_migrations" (version) VALUES (20241010164455); INSERT INTO public."schema_migrations" (version) VALUES (20241010164555); diff --git a/test/arrow/gtfs/agency_test.exs b/test/arrow/gtfs/agency_test.exs new file mode 100644 index 00000000..8f907e25 --- /dev/null +++ b/test/arrow/gtfs/agency_test.exs @@ -0,0 +1,22 @@ +defmodule Arrow.Gtfs.AgencyTest do + use Arrow.DataCase + alias Arrow.Gtfs.Agency + + describe "database" do + test "can insert an agency using a CSV-parsed map with all string values" do + attrs = %{ + "id" => "mbta", + "name" => "Mass Bay Transpo Auth", + "url" => "mbta.com", + "timezone" => "America/New_York" + } + + cs = Agency.changeset(%Agency{}, attrs) + + assert {:ok, new_agency} = Repo.insert(cs) + assert attrs["id"] == new_agency.id + + assert new_agency in Repo.all(Agency) + end + end +end diff --git a/test/arrow/gtfs/import_helper_test.exs b/test/arrow/gtfs/import_helper_test.exs new file mode 100644 index 00000000..f4b2947c --- /dev/null +++ b/test/arrow/gtfs/import_helper_test.exs @@ -0,0 +1,4 @@ +defmodule Arrow.Gtfs.ImportHelperTest do + use ExUnit.Case, async: true + doctest Arrow.Gtfs.ImportHelper, import: true +end diff --git a/test/arrow/gtfs/service_test.exs b/test/arrow/gtfs/service_test.exs new file mode 100644 index 00000000..7fe3b464 --- /dev/null +++ b/test/arrow/gtfs/service_test.exs @@ -0,0 +1,65 @@ +defmodule Arrow.Gtfs.ServiceTest do + use Arrow.DataCase + alias Arrow.Gtfs.Calendar + alias Arrow.Gtfs.CalendarDate + alias Arrow.Gtfs.Service + + describe "database" do + test "enforces string FK constraints and integer-coded values, parses datestamps" do + service_id = "OL-Weekend-September" + + cs = Service.changeset(%Service{}, %{"id" => service_id}) + assert {:ok, service} = Repo.insert(cs) + + assert %Service{id: ^service_id} = service + + calendar_attrs = %{ + "service_id" => service_id, + "monday" => "0", + "tuesday" => "0", + "wednesday" => "0", + "thursday" => "0", + "friday" => "0", + "saturday" => "1", + "sunday" => "1", + "start_date" => "20240901", + "end_date" => "20240930" + } + + cs = Calendar.changeset(%Calendar{}, calendar_attrs) + assert {:ok, calendar} = Repo.insert(cs) + + assert %Calendar{ + service_id: ^service_id, + monday: false, + tuesday: false, + wednesday: false, + thursday: false, + friday: false, + saturday: true, + sunday: true, + start_date: ~D[2024-09-01], + end_date: ~D[2024-09-30] + } = calendar + + calendar_date_attrs = %{ + "service_id" => service_id, + "date" => "20240905", + "exception_type" => "2", + "holiday_name" => "" + } + + cs = CalendarDate.changeset(%CalendarDate{}, calendar_date_attrs) + assert {:ok, calendar_date} = Repo.insert(cs) + + assert %CalendarDate{ + service_id: ^service_id, + date: ~D[2024-09-05], + exception_type: :removed, + holiday_name: nil + } = calendar_date + + assert [^calendar_date] = Repo.all(Ecto.assoc(service, :calendar_dates)) + end + end +end diff --git a/test/arrow/gtfs/time_helper_test.exs b/test/arrow/gtfs/time_helper_test.exs new file mode 100644 index 00000000..85639f69 --- /dev/null +++ b/test/arrow/gtfs/time_helper_test.exs @@ -0,0 +1,4 @@ +defmodule Arrow.Gtfs.TimeHelperTest do + use ExUnit.Case, async: true + doctest Arrow.Gtfs.TimeHelper, import: true +end
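As a hedged illustration of the new gtfs_calendar_dates_service_id_fkey constraint added in structure.sql above, the minimal test sketch below reuses the CalendarDate changeset convention shown in service_test.exs to demonstrate that a calendar date referencing an unknown service id is rejected at the database level. The module name is hypothetical, and the sketch assumes Arrow.Gtfs.CalendarDate does not declare foreign_key_constraint/3 in its changeset, so the violation surfaces as a raised Ecto.ConstraintError rather than an {:error, changeset} tuple.

defmodule Arrow.Gtfs.CalendarDateFkSketchTest do
  # Sketch only: assumes the gtfs_calendar_dates_service_id_fkey violation raises
  # Ecto.ConstraintError because foreign_key_constraint/3 is not declared on the changeset.
  use Arrow.DataCase
  alias Arrow.Gtfs.CalendarDate

  test "calendar dates must reference an existing service" do
    attrs = %{
      "service_id" => "no-such-service",
      "date" => "20240905",
      "exception_type" => "2",
      "holiday_name" => ""
    }

    cs = CalendarDate.changeset(%CalendarDate{}, attrs)

    # This foreign key is not declared DEFERRABLE, so it is checked immediately on insert.
    assert_raise Ecto.ConstraintError, fn -> Repo.insert(cs) end
  end
end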