diff --git a/apps/transport/lib/jobs/outdated_data_notification_job.ex b/apps/transport/lib/jobs/outdated_data_notification_job.ex index b32918b372..9821255b9c 100644 --- a/apps/transport/lib/jobs/outdated_data_notification_job.ex +++ b/apps/transport/lib/jobs/outdated_data_notification_job.ex @@ -1,15 +1,77 @@ defmodule Transport.Jobs.OutdatedDataNotificationJob do @moduledoc """ This module is in charge of sending notifications to both admins and users when data is outdated. - It is (currently) using the old DataChecker module, where there is also code for checking active/inactive datasets. - Behaviour of this job is tested in test/transport/data_checker_test.exs. """ + use Oban.Worker, max_attempts: 3, tags: ["notifications"] + import Ecto.Query + + @type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]} + @expiration_reason Transport.NotificationReason.reason(:expiration) + # If delay < 0, the resource is already expired + @default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14] @impl Oban.Worker def perform(%Oban.Job{id: job_id}) do - Transport.DataChecker.outdated_data(job_id) + outdated_data(job_id) :ok end + + def outdated_data(job_id) do + for delay <- possible_delays(), + date = Date.add(Date.utc_today(), delay) do + {delay, gtfs_datasets_expiring_on(date)} + end + |> Enum.reject(fn {_, records} -> Enum.empty?(records) end) + |> send_outdated_data_admin_mail() + |> Enum.map(&send_outdated_data_producer_notifications(&1, job_id)) + end + + @spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}] + def gtfs_datasets_expiring_on(%Date{} = date) do + DB.Dataset.base_query() + |> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name()) + |> where( + [metadata: m, resource: r], + fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS" + ) + |> select([dataset: d, resource: r], {d, r}) + |> distinct(true) + |> DB.Repo.all() + |> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end) + |> Enum.to_list() + end + + def possible_delays do + @default_outdated_data_delays + |> Enum.uniq() + |> Enum.sort() + end + + # A different email is sent to producers for every delay, containing in a single email all datasets expiring on this given delay + @spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok + def send_outdated_data_producer_notifications({delay, records}, job_id) do + Enum.each(records, fn {%DB.Dataset{} = dataset, resources} -> + @expiration_reason + |> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer) + |> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription -> + contact + |> Transport.UserNotifier.expiration_producer(dataset, resources, delay) + |> Transport.Mailer.deliver() + + DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id}) + end) + end) + end + + @spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()] + defp send_outdated_data_admin_mail([] = _records), do: [] + + defp send_outdated_data_admin_mail(records) do + Transport.AdminNotifier.expiration(records) + |> Transport.Mailer.deliver() + + records + end end diff --git a/apps/transport/lib/transport/data_checker.ex b/apps/transport/lib/transport/data_checker.ex index 9724c70bf5..6cdd2f3d1d 100644 --- a/apps/transport/lib/transport/data_checker.ex +++ b/apps/transport/lib/transport/data_checker.ex @@ -1,18 +1,12 @@ defmodule Transport.DataChecker do @moduledoc """ - Use to check data for two things: - - Toggle in and of active status of datasets depending on status on data.gouv.fr - - Send notifications to producers and admins when data is outdated + Use to check data for toggling on and off active status of datasets depending on status on data.gouv.fr """ alias DB.{Dataset, Repo} import Ecto.Query require Logger - @type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]} @type dataset_status :: :active | :inactive | :ignore | :no_producer | {:archived, DateTime.t()} - @expiration_reason Transport.NotificationReason.reason(:expiration) - # If delay < 0, the resource is already expired - @default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14] @doc """ This method is a scheduled job which does two things: @@ -107,63 +101,6 @@ defmodule Transport.DataChecker do ) end - def outdated_data(job_id) do - for delay <- possible_delays(), - date = Date.add(Date.utc_today(), delay) do - {delay, gtfs_datasets_expiring_on(date)} - end - |> Enum.reject(fn {_, records} -> Enum.empty?(records) end) - |> send_outdated_data_admin_mail() - |> Enum.map(&send_outdated_data_producer_notifications(&1, job_id)) - end - - @spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}] - def gtfs_datasets_expiring_on(%Date{} = date) do - DB.Dataset.base_query() - |> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name()) - |> where( - [metadata: m, resource: r], - fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS" - ) - |> select([dataset: d, resource: r], {d, r}) - |> distinct(true) - |> DB.Repo.all() - |> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end) - |> Enum.to_list() - end - - def possible_delays do - @default_outdated_data_delays - |> Enum.uniq() - |> Enum.sort() - end - - # A different email is sent to producers for every delay, containing in a single email all datasets expiring on this given delay - @spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok - def send_outdated_data_producer_notifications({delay, records}, job_id) do - Enum.each(records, fn {%DB.Dataset{} = dataset, resources} -> - @expiration_reason - |> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer) - |> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription -> - contact - |> Transport.UserNotifier.expiration_producer(dataset, resources, delay) - |> Transport.Mailer.deliver() - - DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id}) - end) - end) - end - - @spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()] - defp send_outdated_data_admin_mail([] = _records), do: [] - - defp send_outdated_data_admin_mail(records) do - Transport.AdminNotifier.expiration(records) - |> Transport.Mailer.deliver() - - records - end - # Do nothing if all lists are empty defp send_inactive_datasets_mail([] = _reactivated_datasets, [] = _inactive_datasets, [] = _archived_datasets), do: nil diff --git a/apps/transport/test/transport/data_checker_test.exs b/apps/transport/test/transport/data_checker_test.exs index 8f58b73ba5..c4b9372070 100644 --- a/apps/transport/test/transport/data_checker_test.exs +++ b/apps/transport/test/transport/data_checker_test.exs @@ -3,9 +3,6 @@ defmodule Transport.DataCheckerTest do import Mox import DB.Factory import Swoosh.TestAssertions - use Oban.Testing, repo: DB.Repo - - doctest Transport.DataChecker, import: true setup :verify_on_exit! @@ -128,193 +125,6 @@ defmodule Transport.DataCheckerTest do end end - test "gtfs_datasets_expiring_on" do - {today, tomorrow, yesterday} = {Date.utc_today(), Date.add(Date.utc_today(), 1), Date.add(Date.utc_today(), -1)} - assert [] == today |> Transport.DataChecker.gtfs_datasets_expiring_on() - - insert_fn = fn %Date{} = expiration_date, %DB.Dataset{} = dataset -> - multi_validation = - insert(:multi_validation, - validator: Transport.Validators.GTFSTransport.validator_name(), - resource_history: insert(:resource_history, resource: insert(:resource, dataset: dataset, format: "GTFS")) - ) - - insert(:resource_metadata, - multi_validation_id: multi_validation.id, - metadata: %{"end_date" => expiration_date} - ) - end - - # Ignores hidden or inactive datasets - insert_fn.(today, insert(:dataset, is_active: false)) - insert_fn.(today, insert(:dataset, is_active: true, is_hidden: true)) - - assert [] == today |> Transport.DataChecker.gtfs_datasets_expiring_on() - - # 2 GTFS resources expiring on the same day for a dataset - %DB.Dataset{id: dataset_id} = dataset = insert(:dataset, is_active: true) - insert_fn.(today, dataset) - insert_fn.(today, dataset) - - assert [ - {%DB.Dataset{id: ^dataset_id}, - [%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]} - ] = today |> Transport.DataChecker.gtfs_datasets_expiring_on() - - assert [] == tomorrow |> Transport.DataChecker.gtfs_datasets_expiring_on() - assert [] == yesterday |> Transport.DataChecker.gtfs_datasets_expiring_on() - - insert_fn.(tomorrow, dataset) - - assert [ - {%DB.Dataset{id: ^dataset_id}, - [%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]} - ] = today |> Transport.DataChecker.gtfs_datasets_expiring_on() - - assert [ - {%DB.Dataset{id: ^dataset_id}, [%DB.Resource{dataset_id: ^dataset_id}]} - ] = tomorrow |> Transport.DataChecker.gtfs_datasets_expiring_on() - - assert [] == yesterday |> Transport.DataChecker.gtfs_datasets_expiring_on() - - # Multiple datasets - %DB.Dataset{id: d2_id} = d2 = insert(:dataset, is_active: true) - insert_fn.(today, d2) - - assert [ - {%DB.Dataset{id: ^dataset_id}, - [%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]}, - {%DB.Dataset{id: ^d2_id}, [%DB.Resource{dataset_id: ^d2_id}]} - ] = today |> Transport.DataChecker.gtfs_datasets_expiring_on() - end - - describe "outdated_data job" do - test "sends email to our team + relevant contact before expiry" do - %DB.Dataset{id: dataset_id} = - dataset = - insert(:dataset, is_active: true, custom_title: "Dataset custom title", custom_tags: ["loi-climat-resilience"]) - - assert DB.Dataset.climate_resilience_bill?(dataset) - # fake a resource expiring today - %DB.Resource{id: resource_id} = - resource = insert(:resource, dataset: dataset, format: "GTFS", title: resource_title = "Super GTFS") - - multi_validation = - insert(:multi_validation, - validator: Transport.Validators.GTFSTransport.validator_name(), - resource_history: insert(:resource_history, resource: resource) - ) - - insert(:resource_metadata, - multi_validation_id: multi_validation.id, - metadata: %{"end_date" => Date.utc_today()} - ) - - assert [{%DB.Dataset{id: ^dataset_id}, [%DB.Resource{id: ^resource_id}]}] = - Date.utc_today() |> Transport.DataChecker.gtfs_datasets_expiring_on() - - %DB.Contact{id: contact_id, email: email} = contact = insert_contact() - - insert(:notification_subscription, %{ - reason: :expiration, - source: :admin, - role: :producer, - contact_id: contact_id, - dataset_id: dataset.id - }) - - # Should be ignored, this subscription is for a reuser - %DB.Contact{id: reuser_id} = insert_contact() - - insert(:notification_subscription, %{ - reason: :expiration, - source: :user, - role: :reuser, - contact_id: reuser_id, - dataset_id: dataset.id - }) - - assert :ok == perform_job(Transport.Jobs.OutdatedDataNotificationJob, %{}) - - # a first mail to our team - - assert_email_sent(fn %Swoosh.Email{ - from: {"transport.data.gouv.fr", "contact@transport.data.gouv.fr"}, - to: [{"", "contact@transport.data.gouv.fr"}], - subject: "Jeux de données arrivant à expiration", - text_body: nil, - html_body: body - } -> - assert body =~ ~r/Jeux de données périmant demain :/ - - assert body =~ - ~s|