diff --git a/apps/transport/lib/jobs/outdated_data_notification_job.ex b/apps/transport/lib/jobs/outdated_data_notification_job.ex index b32918b372..e5321d1cba 100644 --- a/apps/transport/lib/jobs/outdated_data_notification_job.ex +++ b/apps/transport/lib/jobs/outdated_data_notification_job.ex @@ -1,15 +1,77 @@ defmodule Transport.Jobs.OutdatedDataNotificationJob do @moduledoc """ This module is in charge of sending notifications to both admins and users when data is outdated. - It is (currently) using the old DataChecker module, where there is also code for checking active/inactive datasets. - Behaviour of this job is tested in test/transport/data_checker_test.exs. """ + use Oban.Worker, max_attempts: 3, tags: ["notifications"] + import Ecto.Query + + @type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]} + @expiration_reason Transport.NotificationReason.reason(:expiration) + # If delay < 0, the resource is already expired + @default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14] @impl Oban.Worker def perform(%Oban.Job{id: job_id}) do - Transport.DataChecker.outdated_data(job_id) + outdated_data(job_id) :ok end + + def outdated_data(job_id) do + for delay <- possible_delays(), + date = Date.add(Date.utc_today(), delay) do + {delay, gtfs_datasets_expiring_on(date)} + end + |> Enum.reject(fn {_, records} -> Enum.empty?(records) end) + |> send_outdated_data_admin_mail() + |> Enum.map(&send_outdated_data_producer_notifications(&1, job_id)) + end + + @spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}] + def gtfs_datasets_expiring_on(%Date{} = date) do + DB.Dataset.base_query() + |> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name()) + |> where( + [metadata: m, resource: r], + fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS" + ) + |> select([dataset: d, resource: r], {d, r}) + |> distinct(true) + |> DB.Repo.all() + |> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end) + |> Enum.to_list() + end + + def possible_delays do + @default_outdated_data_delays + |> Enum.uniq() + |> Enum.sort() + end + + # A different email is sent to producers for every delay, containing all datasets expiring on this given delay + @spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok + def send_outdated_data_producer_notifications({delay, records}, job_id) do + Enum.each(records, fn {%DB.Dataset{} = dataset, resources} -> + @expiration_reason + |> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer) + |> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription -> + contact + |> Transport.UserNotifier.expiration_producer(dataset, resources, delay) + |> Transport.Mailer.deliver() + + DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id}) + end) + end) + end + + @spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()] + defp send_outdated_data_admin_mail([] = _records), do: [] + + defp send_outdated_data_admin_mail(records) do + Transport.AdminNotifier.expiration(records) + |> Transport.Mailer.deliver() + + records + end end diff --git a/apps/transport/lib/transport/data_checker.ex b/apps/transport/lib/transport/data_checker.ex index 9724c70bf5..6cdd2f3d1d 100644 --- a/apps/transport/lib/transport/data_checker.ex +++ b/apps/transport/lib/transport/data_checker.ex @@ -1,18 +1,12 @@ defmodule Transport.DataChecker do @moduledoc """ - Use to check data for two things: - - Toggle in and of active status of datasets depending on status on data.gouv.fr - - Send notifications to producers and admins when data is outdated + Use to check data for toggling on and off active status of datasets depending on status on data.gouv.fr """ alias DB.{Dataset, Repo} import Ecto.Query require Logger - @type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]} @type dataset_status :: :active | :inactive | :ignore | :no_producer | {:archived, DateTime.t()} - @expiration_reason Transport.NotificationReason.reason(:expiration) - # If delay < 0, the resource is already expired - @default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14] @doc """ This method is a scheduled job which does two things: @@ -107,63 +101,6 @@ defmodule Transport.DataChecker do ) end - def outdated_data(job_id) do - for delay <- possible_delays(), - date = Date.add(Date.utc_today(), delay) do - {delay, gtfs_datasets_expiring_on(date)} - end - |> Enum.reject(fn {_, records} -> Enum.empty?(records) end) - |> send_outdated_data_admin_mail() - |> Enum.map(&send_outdated_data_producer_notifications(&1, job_id)) - end - - @spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}] - def gtfs_datasets_expiring_on(%Date{} = date) do - DB.Dataset.base_query() - |> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name()) - |> where( - [metadata: m, resource: r], - fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS" - ) - |> select([dataset: d, resource: r], {d, r}) - |> distinct(true) - |> DB.Repo.all() - |> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end) - |> Enum.to_list() - end - - def possible_delays do - @default_outdated_data_delays - |> Enum.uniq() - |> Enum.sort() - end - - # A different email is sent to producers for every delay, containing in a single email all datasets expiring on this given delay - @spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok - def send_outdated_data_producer_notifications({delay, records}, job_id) do - Enum.each(records, fn {%DB.Dataset{} = dataset, resources} -> - @expiration_reason - |> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer) - |> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription -> - contact - |> Transport.UserNotifier.expiration_producer(dataset, resources, delay) - |> Transport.Mailer.deliver() - - DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id}) - end) - end) - end - - @spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()] - defp send_outdated_data_admin_mail([] = _records), do: [] - - defp send_outdated_data_admin_mail(records) do - Transport.AdminNotifier.expiration(records) - |> Transport.Mailer.deliver() - - records - end - # Do nothing if all lists are empty defp send_inactive_datasets_mail([] = _reactivated_datasets, [] = _inactive_datasets, [] = _archived_datasets), do: nil diff --git a/apps/transport/test/transport/data_checker_test.exs b/apps/transport/test/transport/data_checker_test.exs index 8f58b73ba5..c4b9372070 100644 --- a/apps/transport/test/transport/data_checker_test.exs +++ b/apps/transport/test/transport/data_checker_test.exs @@ -3,9 +3,6 @@ defmodule Transport.DataCheckerTest do import Mox import DB.Factory import Swoosh.TestAssertions - use Oban.Testing, repo: DB.Repo - - doctest Transport.DataChecker, import: true setup :verify_on_exit! @@ -128,193 +125,6 @@ defmodule Transport.DataCheckerTest do end end - test "gtfs_datasets_expiring_on" do - {today, tomorrow, yesterday} = {Date.utc_today(), Date.add(Date.utc_today(), 1), Date.add(Date.utc_today(), -1)} - assert [] == today |> Transport.DataChecker.gtfs_datasets_expiring_on() - - insert_fn = fn %Date{} = expiration_date, %DB.Dataset{} = dataset -> - multi_validation = - insert(:multi_validation, - validator: Transport.Validators.GTFSTransport.validator_name(), - resource_history: insert(:resource_history, resource: insert(:resource, dataset: dataset, format: "GTFS")) - ) - - insert(:resource_metadata, - multi_validation_id: multi_validation.id, - metadata: %{"end_date" => expiration_date} - ) - end - - # Ignores hidden or inactive datasets - insert_fn.(today, insert(:dataset, is_active: false)) - insert_fn.(today, insert(:dataset, is_active: true, is_hidden: true)) - - assert [] == today |> Transport.DataChecker.gtfs_datasets_expiring_on() - - # 2 GTFS resources expiring on the same day for a dataset - %DB.Dataset{id: dataset_id} = dataset = insert(:dataset, is_active: true) - insert_fn.(today, dataset) - insert_fn.(today, dataset) - - assert [ - {%DB.Dataset{id: ^dataset_id}, - [%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]} - ] = today |> Transport.DataChecker.gtfs_datasets_expiring_on() - - assert [] == tomorrow |> Transport.DataChecker.gtfs_datasets_expiring_on() - assert [] == yesterday |> Transport.DataChecker.gtfs_datasets_expiring_on() - - insert_fn.(tomorrow, dataset) - - assert [ - {%DB.Dataset{id: ^dataset_id}, - [%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]} - ] = today |> Transport.DataChecker.gtfs_datasets_expiring_on() - - assert [ - {%DB.Dataset{id: ^dataset_id}, [%DB.Resource{dataset_id: ^dataset_id}]} - ] = tomorrow |> Transport.DataChecker.gtfs_datasets_expiring_on() - - assert [] == yesterday |> Transport.DataChecker.gtfs_datasets_expiring_on() - - # Multiple datasets - %DB.Dataset{id: d2_id} = d2 = insert(:dataset, is_active: true) - insert_fn.(today, d2) - - assert [ - {%DB.Dataset{id: ^dataset_id}, - [%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]}, - {%DB.Dataset{id: ^d2_id}, [%DB.Resource{dataset_id: ^d2_id}]} - ] = today |> Transport.DataChecker.gtfs_datasets_expiring_on() - end - - describe "outdated_data job" do - test "sends email to our team + relevant contact before expiry" do - %DB.Dataset{id: dataset_id} = - dataset = - insert(:dataset, is_active: true, custom_title: "Dataset custom title", custom_tags: ["loi-climat-resilience"]) - - assert DB.Dataset.climate_resilience_bill?(dataset) - # fake a resource expiring today - %DB.Resource{id: resource_id} = - resource = insert(:resource, dataset: dataset, format: "GTFS", title: resource_title = "Super GTFS") - - multi_validation = - insert(:multi_validation, - validator: Transport.Validators.GTFSTransport.validator_name(), - resource_history: insert(:resource_history, resource: resource) - ) - - insert(:resource_metadata, - multi_validation_id: multi_validation.id, - metadata: %{"end_date" => Date.utc_today()} - ) - - assert [{%DB.Dataset{id: ^dataset_id}, [%DB.Resource{id: ^resource_id}]}] = - Date.utc_today() |> Transport.DataChecker.gtfs_datasets_expiring_on() - - %DB.Contact{id: contact_id, email: email} = contact = insert_contact() - - insert(:notification_subscription, %{ - reason: :expiration, - source: :admin, - role: :producer, - contact_id: contact_id, - dataset_id: dataset.id - }) - - # Should be ignored, this subscription is for a reuser - %DB.Contact{id: reuser_id} = insert_contact() - - insert(:notification_subscription, %{ - reason: :expiration, - source: :user, - role: :reuser, - contact_id: reuser_id, - dataset_id: dataset.id - }) - - assert :ok == perform_job(Transport.Jobs.OutdatedDataNotificationJob, %{}) - - # a first mail to our team - - assert_email_sent(fn %Swoosh.Email{ - from: {"transport.data.gouv.fr", "contact@transport.data.gouv.fr"}, - to: [{"", "contact@transport.data.gouv.fr"}], - subject: "Jeux de données arrivant à expiration", - text_body: nil, - html_body: body - } -> - assert body =~ ~r/Jeux de données périmant demain :/ - - assert body =~ - ~s|
  • #{dataset.custom_title} - ✅ notification automatique ⚖️🗺️ article 122
  • | - end) - - # a second mail to the email address in the notifications config - display_name = DB.Contact.display_name(contact) - - assert_email_sent(fn %Swoosh.Email{ - from: {"transport.data.gouv.fr", "contact@transport.data.gouv.fr"}, - to: [{^display_name, ^email}], - subject: "Jeu de données arrivant à expiration", - html_body: html_body - } -> - refute html_body =~ "notification automatique" - refute html_body =~ "article 122" - - assert html_body =~ - ~s(Les données GTFS #{resource_title} associées au jeu de données #{dataset.custom_title} périment demain.) - - assert html_body =~ - ~s(remplaçant la ressource périmée par la nouvelle) - end) - end - - test "outdated_data job with nothing to send should not send email" do - assert :ok == perform_job(Transport.Jobs.OutdatedDataNotificationJob, %{}) - assert_no_email_sent() - end - end - - test "send_outdated_data_notifications" do - %{id: dataset_id} = dataset = insert(:dataset) - %DB.Contact{id: contact_id, email: email} = contact = insert_contact() - - %DB.NotificationSubscription{id: ns_id} = - insert(:notification_subscription, %{ - reason: :expiration, - source: :admin, - role: :producer, - contact_id: contact_id, - dataset_id: dataset.id - }) - - job_id = 42 - Transport.DataChecker.send_outdated_data_producer_notifications({7, [{dataset, []}]}, job_id) - - assert_email_sent( - from: {"transport.data.gouv.fr", "contact@transport.data.gouv.fr"}, - to: {DB.Contact.display_name(contact), email}, - subject: "Jeu de données arrivant à expiration", - text_body: nil, - html_body: ~r/Bonjour/ - ) - - assert [ - %DB.Notification{ - contact_id: ^contact_id, - email: ^email, - reason: :expiration, - dataset_id: ^dataset_id, - notification_subscription_id: ^ns_id, - role: :producer, - payload: %{"delay" => 7, "job_id" => ^job_id} - } - ] = - DB.Notification |> DB.Repo.all() - end - describe "dataset_status" do test "active" do dataset = %DB.Dataset{datagouv_id: Ecto.UUID.generate()} diff --git a/apps/transport/test/transport/jobs/outdated_data_notification_job_test.exs b/apps/transport/test/transport/jobs/outdated_data_notification_job_test.exs new file mode 100644 index 0000000000..c83275757b --- /dev/null +++ b/apps/transport/test/transport/jobs/outdated_data_notification_job_test.exs @@ -0,0 +1,172 @@ +defmodule Transport.Test.Transport.Jobs.OutdatedDataNotificationJobTest do + use ExUnit.Case, async: true + import DB.Factory + import Swoosh.TestAssertions + use Oban.Testing, repo: DB.Repo + + setup do + Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo) + end + + test "sends email to our team + relevant contact before expiry" do + %DB.Dataset{id: dataset_id} = + dataset = + insert(:dataset, is_active: true, custom_title: "Dataset custom title", custom_tags: ["loi-climat-resilience"]) + + assert DB.Dataset.climate_resilience_bill?(dataset) + # fake a resource expiring today + %DB.Resource{id: resource_id} = + resource = insert(:resource, dataset: dataset, format: "GTFS", title: resource_title = "Super GTFS") + + multi_validation = + insert(:multi_validation, + validator: Transport.Validators.GTFSTransport.validator_name(), + resource_history: insert(:resource_history, resource: resource) + ) + + insert(:resource_metadata, + multi_validation_id: multi_validation.id, + metadata: %{"end_date" => Date.utc_today()} + ) + + assert [{%DB.Dataset{id: ^dataset_id}, [%DB.Resource{id: ^resource_id}]}] = + Date.utc_today() |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + + %DB.Contact{id: contact_id, email: email} = contact = insert_contact() + + %DB.NotificationSubscription{id: ns_id} = + insert(:notification_subscription, %{ + reason: :expiration, + source: :admin, + role: :producer, + contact_id: contact_id, + dataset_id: dataset.id + }) + + # Should be ignored, this subscription is for a reuser + %DB.Contact{id: reuser_id} = insert_contact() + + insert(:notification_subscription, %{ + reason: :expiration, + source: :user, + role: :reuser, + contact_id: reuser_id, + dataset_id: dataset.id + }) + + assert :ok == perform_job(Transport.Jobs.OutdatedDataNotificationJob, %{}) + + # a first mail to our team + + assert_email_sent(fn %Swoosh.Email{ + from: {"transport.data.gouv.fr", "contact@transport.data.gouv.fr"}, + to: [{"", "contact@transport.data.gouv.fr"}], + subject: "Jeux de données arrivant à expiration", + text_body: nil, + html_body: body + } -> + assert body =~ ~r/Jeux de données périmant demain :/ + + assert body =~ + ~s|
  • #{dataset.custom_title} - ✅ notification automatique ⚖️🗺️ article 122
  • | + end) + + # a second mail to the email address in the notifications config + display_name = DB.Contact.display_name(contact) + + assert_email_sent(fn %Swoosh.Email{ + from: {"transport.data.gouv.fr", "contact@transport.data.gouv.fr"}, + to: [{^display_name, ^email}], + subject: "Jeu de données arrivant à expiration", + html_body: html_body + } -> + refute html_body =~ "notification automatique" + refute html_body =~ "article 122" + + assert html_body =~ + ~s(Les données GTFS #{resource_title} associées au jeu de données #{dataset.custom_title} périment demain.) + + assert html_body =~ + ~s(remplaçant la ressource périmée par la nouvelle) + end) + + # Logs are there + assert [ + %DB.Notification{ + contact_id: ^contact_id, + email: ^email, + reason: :expiration, + dataset_id: ^dataset_id, + notification_subscription_id: ^ns_id, + role: :producer, + payload: %{"delay" => 0, "job_id" => _job_id} + } + ] = + DB.Notification |> DB.Repo.all() + end + + test "outdated_data job with nothing to send should not send email" do + assert :ok == perform_job(Transport.Jobs.OutdatedDataNotificationJob, %{}) + assert_no_email_sent() + end + + test "gtfs_datasets_expiring_on" do + {today, tomorrow, yesterday} = {Date.utc_today(), Date.add(Date.utc_today(), 1), Date.add(Date.utc_today(), -1)} + assert [] == today |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + + insert_fn = fn %Date{} = expiration_date, %DB.Dataset{} = dataset -> + multi_validation = + insert(:multi_validation, + validator: Transport.Validators.GTFSTransport.validator_name(), + resource_history: insert(:resource_history, resource: insert(:resource, dataset: dataset, format: "GTFS")) + ) + + insert(:resource_metadata, + multi_validation_id: multi_validation.id, + metadata: %{"end_date" => expiration_date} + ) + end + + # Ignores hidden or inactive datasets + insert_fn.(today, insert(:dataset, is_active: false)) + insert_fn.(today, insert(:dataset, is_active: true, is_hidden: true)) + + assert [] == today |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + + # 2 GTFS resources expiring on the same day for a dataset + %DB.Dataset{id: dataset_id} = dataset = insert(:dataset, is_active: true) + insert_fn.(today, dataset) + insert_fn.(today, dataset) + + assert [ + {%DB.Dataset{id: ^dataset_id}, + [%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]} + ] = today |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + + assert [] == tomorrow |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + assert [] == yesterday |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + + insert_fn.(tomorrow, dataset) + + assert [ + {%DB.Dataset{id: ^dataset_id}, + [%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]} + ] = today |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + + assert [ + {%DB.Dataset{id: ^dataset_id}, [%DB.Resource{dataset_id: ^dataset_id}]} + ] = tomorrow |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + + assert [] == yesterday |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + + # Multiple datasets + %DB.Dataset{id: d2_id} = d2 = insert(:dataset, is_active: true) + insert_fn.(today, d2) + + assert [ + {%DB.Dataset{id: ^dataset_id}, + [%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]}, + {%DB.Dataset{id: ^d2_id}, [%DB.Resource{dataset_id: ^d2_id}]} + ] = today |> Transport.Jobs.OutdatedDataNotificationJob.gtfs_datasets_expiring_on() + end +end