Skip to content

Commit

Permalink
Move code from ImportData to OutdatedNotificationJob and merge two similar tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vdegove committed Dec 17, 2024
1 parent a8aa8c4 commit 439bf1d
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 257 deletions.
68 changes: 65 additions & 3 deletions apps/transport/lib/jobs/outdated_data_notification_job.ex
Original file line number Diff line number Diff line change
@@ -1,15 +1,77 @@
defmodule Transport.Jobs.OutdatedDataNotificationJob do
  @moduledoc """
  Oban job in charge of sending notifications to both admins and producers
  when GTFS data is outdated.

  For each configured delay (in days, negative = already expired), it finds
  GTFS resources whose validated metadata `end_date` falls exactly on
  `today + delay`, then:
  - sends a single recap email to the admin team;
  - sends one email per delay to each subscribed producer contact, recording
    a `DB.Notification` row for every email sent.

  This logic previously lived in `Transport.DataChecker`.
  """

  use Oban.Worker, max_attempts: 3, tags: ["notifications"]
  import Ecto.Query

  @type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]}
  @expiration_reason Transport.NotificationReason.reason(:expiration)
  # If delay < 0, the resource is already expired.
  # (Order normalized; `possible_delays/0` sorts and dedups anyway.)
  @default_outdated_data_delays [-90, -60, -45, -30, -15, -7, -3, 0, 7, 14]

  @impl Oban.Worker
  def perform(%Oban.Job{id: job_id}) do
    # NOTE: only the local implementation is called; calling the legacy
    # Transport.DataChecker.outdated_data/1 here as well would send every
    # notification twice.
    outdated_data(job_id)
    :ok
  end

  # Builds {delay, records} pairs for every configured delay, drops the empty
  # ones, then notifies admins (one recap email) and producers (one email per
  # delay and subscription).
  def outdated_data(job_id) do
    for delay <- possible_delays(),
        date = Date.add(Date.utc_today(), delay) do
      {delay, gtfs_datasets_expiring_on(date)}
    end
    |> Enum.reject(fn {_, records} -> Enum.empty?(records) end)
    |> send_outdated_data_admin_mail()
    |> Enum.map(&send_outdated_data_producer_notifications(&1, job_id))
  end

  # Datasets (with their GTFS resources) whose validated `end_date` equals
  # the given date. Returns [{dataset, [resources]}].
  @spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}]
  def gtfs_datasets_expiring_on(%Date{} = date) do
    DB.Dataset.base_query()
    |> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
    |> where(
      [metadata: m, resource: r],
      fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS"
    )
    |> select([dataset: d, resource: r], {d, r})
    |> distinct(true)
    |> DB.Repo.all()
    # Group resources under their parent dataset: [{dataset, [resources]}]
    |> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end)
    |> Enum.to_list()
  end

  # Sorted, deduplicated list of notification delays (in days).
  @spec possible_delays() :: [integer()]
  def possible_delays do
    @default_outdated_data_delays
    |> Enum.uniq()
    |> Enum.sort()
  end

  # A different email is sent to producers for every delay, containing all datasets expiring on this given delay
  @spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok
  def send_outdated_data_producer_notifications({delay, records}, job_id) do
    Enum.each(records, fn {%DB.Dataset{} = dataset, resources} ->
      @expiration_reason
      |> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer)
      |> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
        contact
        |> Transport.UserNotifier.expiration_producer(dataset, resources, delay)
        |> Transport.Mailer.deliver()

        # Record the sent notification (delay + job id) for history/audit.
        DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id})
      end)
    end)
  end

  # Sends one recap email to admins, then returns the records unchanged so
  # the pipeline can keep processing them. No-op on an empty list.
  @spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()]
  defp send_outdated_data_admin_mail([] = _records), do: []

  defp send_outdated_data_admin_mail(records) do
    Transport.AdminNotifier.expiration(records)
    |> Transport.Mailer.deliver()

    records
  end
end
65 changes: 1 addition & 64 deletions apps/transport/lib/transport/data_checker.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
defmodule Transport.DataChecker do
@moduledoc """
Use to check data for two things:
- Toggle in and of active status of datasets depending on status on data.gouv.fr
- Send notifications to producers and admins when data is outdated
Use to check data for toggling on and off active status of datasets depending on status on data.gouv.fr
"""
alias DB.{Dataset, Repo}
import Ecto.Query
require Logger

@type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]}
@type dataset_status :: :active | :inactive | :ignore | :no_producer | {:archived, DateTime.t()}
@expiration_reason Transport.NotificationReason.reason(:expiration)
# If delay < 0, the resource is already expired
@default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14]

@doc """
This method is a scheduled job which does two things:
Expand Down Expand Up @@ -107,63 +101,6 @@ defmodule Transport.DataChecker do
)
end

# For every configured delay, collect the datasets/resources expiring on that
# day, drop delays with no matches, email the admin recap, then notify the
# subscribed producers (one email per delay), tagging records with `job_id`.
def outdated_data(job_id) do
  today = Date.utc_today()

  possible_delays()
  |> Enum.map(fn delay -> {delay, gtfs_datasets_expiring_on(Date.add(today, delay))} end)
  |> Enum.reject(fn {_delay, records} -> records == [] end)
  |> send_outdated_data_admin_mail()
  |> Enum.map(&send_outdated_data_producer_notifications(&1, job_id))
end

# Datasets (with their GTFS resources) whose validated metadata `end_date`
# equals the given date. Hidden/inactive datasets appear to be filtered by
# `DB.Dataset.base_query()` — the tests rely on this; confirm there if needed.
@spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}]
def gtfs_datasets_expiring_on(%Date{} = date) do
DB.Dataset.base_query()
|> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
|> where(
[metadata: m, resource: r],
fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS"
)
|> select([dataset: d, resource: r], {d, r})
|> distinct(true)
|> DB.Repo.all()
# Group resources under their parent dataset: [{dataset, [resources]}]
|> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end)
|> Enum.to_list()
end

# Sorted, deduplicated list of notification delays (in days).
def possible_delays do
  Enum.sort(Enum.uniq(@default_outdated_data_delays))
end

# A different email is sent to producers for every delay, containing in a single email all datasets expiring on this given delay
# For each dataset, every producer subscribed to :expiration notifications
# gets one email; the delivery is recorded in DB.Notification (delay +
# job_id payload) for history/audit purposes.
@spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok
def send_outdated_data_producer_notifications({delay, records}, job_id) do
Enum.each(records, fn {%DB.Dataset{} = dataset, resources} ->
@expiration_reason
|> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.expiration_producer(dataset, resources, delay)
|> Transport.Mailer.deliver()

# Insert AFTER delivery so only actually-sent emails are recorded.
DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id})
end)
end)
end

# Sends a single recap email to the admin team, then passes the records
# through unchanged so the calling pipeline can keep processing them.
# No-op (no email) when there is nothing expiring.
@spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()]
defp send_outdated_data_admin_mail([] = _records), do: []

defp send_outdated_data_admin_mail(records) do
  tap(records, fn recs ->
    recs |> Transport.AdminNotifier.expiration() |> Transport.Mailer.deliver()
  end)
end

# Do nothing if all lists are empty
defp send_inactive_datasets_mail([] = _reactivated_datasets, [] = _inactive_datasets, [] = _archived_datasets),
do: nil
Expand Down
190 changes: 0 additions & 190 deletions apps/transport/test/transport/data_checker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ defmodule Transport.DataCheckerTest do
import Mox
import DB.Factory
import Swoosh.TestAssertions
use Oban.Testing, repo: DB.Repo

doctest Transport.DataChecker, import: true

setup :verify_on_exit!

Expand Down Expand Up @@ -128,193 +125,6 @@ defmodule Transport.DataCheckerTest do
end
end

# Covers date matching, hidden/inactive filtering, grouping of several
# resources under one dataset, and multiple datasets expiring the same day.
test "gtfs_datasets_expiring_on" do
{today, tomorrow, yesterday} = {Date.utc_today(), Date.add(Date.utc_today(), 1), Date.add(Date.utc_today(), -1)}
assert [] == today |> Transport.DataChecker.gtfs_datasets_expiring_on()

# Helper: a GTFS resource on `dataset` whose validated end_date is `expiration_date`
insert_fn = fn %Date{} = expiration_date, %DB.Dataset{} = dataset ->
multi_validation =
insert(:multi_validation,
validator: Transport.Validators.GTFSTransport.validator_name(),
resource_history: insert(:resource_history, resource: insert(:resource, dataset: dataset, format: "GTFS"))
)

insert(:resource_metadata,
multi_validation_id: multi_validation.id,
metadata: %{"end_date" => expiration_date}
)
end

# Ignores hidden or inactive datasets
insert_fn.(today, insert(:dataset, is_active: false))
insert_fn.(today, insert(:dataset, is_active: true, is_hidden: true))

assert [] == today |> Transport.DataChecker.gtfs_datasets_expiring_on()

# 2 GTFS resources expiring on the same day for a dataset
%DB.Dataset{id: dataset_id} = dataset = insert(:dataset, is_active: true)
insert_fn.(today, dataset)
insert_fn.(today, dataset)

# Both resources are grouped under the single dataset tuple
assert [
{%DB.Dataset{id: ^dataset_id},
[%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]}
] = today |> Transport.DataChecker.gtfs_datasets_expiring_on()

assert [] == tomorrow |> Transport.DataChecker.gtfs_datasets_expiring_on()
assert [] == yesterday |> Transport.DataChecker.gtfs_datasets_expiring_on()

insert_fn.(tomorrow, dataset)

# Adding a resource expiring tomorrow must not change today's result
assert [
{%DB.Dataset{id: ^dataset_id},
[%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]}
] = today |> Transport.DataChecker.gtfs_datasets_expiring_on()

assert [
{%DB.Dataset{id: ^dataset_id}, [%DB.Resource{dataset_id: ^dataset_id}]}
] = tomorrow |> Transport.DataChecker.gtfs_datasets_expiring_on()

assert [] == yesterday |> Transport.DataChecker.gtfs_datasets_expiring_on()

# Multiple datasets
%DB.Dataset{id: d2_id} = d2 = insert(:dataset, is_active: true)
insert_fn.(today, d2)

assert [
{%DB.Dataset{id: ^dataset_id},
[%DB.Resource{dataset_id: ^dataset_id}, %DB.Resource{dataset_id: ^dataset_id}]},
{%DB.Dataset{id: ^d2_id}, [%DB.Resource{dataset_id: ^d2_id}]}
] = today |> Transport.DataChecker.gtfs_datasets_expiring_on()
end

# End-to-end checks of the OutdatedDataNotificationJob: admin recap email,
# producer email, reuser subscriptions ignored, and no email when nothing expires.
describe "outdated_data job" do
test "sends email to our team + relevant contact before expiry" do
%DB.Dataset{id: dataset_id} =
dataset =
insert(:dataset, is_active: true, custom_title: "Dataset custom title", custom_tags: ["loi-climat-resilience"])

assert DB.Dataset.climate_resilience_bill?(dataset)
# fake a resource expiring today
%DB.Resource{id: resource_id} =
resource = insert(:resource, dataset: dataset, format: "GTFS", title: resource_title = "Super GTFS")

multi_validation =
insert(:multi_validation,
validator: Transport.Validators.GTFSTransport.validator_name(),
resource_history: insert(:resource_history, resource: resource)
)

insert(:resource_metadata,
multi_validation_id: multi_validation.id,
metadata: %{"end_date" => Date.utc_today()}
)

# Sanity check before running the job
assert [{%DB.Dataset{id: ^dataset_id}, [%DB.Resource{id: ^resource_id}]}] =
Date.utc_today() |> Transport.DataChecker.gtfs_datasets_expiring_on()

%DB.Contact{id: contact_id, email: email} = contact = insert_contact()

insert(:notification_subscription, %{
reason: :expiration,
source: :admin,
role: :producer,
contact_id: contact_id,
dataset_id: dataset.id
})

# Should be ignored, this subscription is for a reuser
%DB.Contact{id: reuser_id} = insert_contact()

insert(:notification_subscription, %{
reason: :expiration,
source: :user,
role: :reuser,
contact_id: reuser_id,
dataset_id: dataset.id
})

assert :ok == perform_job(Transport.Jobs.OutdatedDataNotificationJob, %{})

# a first mail to our team

assert_email_sent(fn %Swoosh.Email{
from: {"transport.data.gouv.fr", "[email protected]"},
to: [{"", "[email protected]"}],
subject: "Jeux de données arrivant à expiration",
text_body: nil,
html_body: body
} ->
assert body =~ ~r/Jeux de données périmant demain :/

assert body =~
~s|<li><a href="http://127.0.0.1:5100/datasets/#{dataset.slug}">#{dataset.custom_title}</a> - ✅ notification automatique ⚖️🗺️ article 122</li>|
end)

# a second mail to the email address in the notifications config
display_name = DB.Contact.display_name(contact)

assert_email_sent(fn %Swoosh.Email{
from: {"transport.data.gouv.fr", "[email protected]"},
to: [{^display_name, ^email}],
subject: "Jeu de données arrivant à expiration",
html_body: html_body
} ->
# Producer emails do not carry the admin-only markers
refute html_body =~ "notification automatique"
refute html_body =~ "article 122"

assert html_body =~
~s(Les données GTFS #{resource_title} associées au jeu de données <a href="http://127.0.0.1:5100/datasets/#{dataset.slug}">#{dataset.custom_title}</a> périment demain.)

assert html_body =~
~s(<a href="https://doc.transport.data.gouv.fr/administration-des-donnees/procedures-de-publication/mettre-a-jour-des-donnees#remplacer-un-jeu-de-donnees-existant-plutot-quen-creer-un-nouveau">remplaçant la ressource périmée par la nouvelle</a>)
end)
end

test "outdated_data job with nothing to send should not send email" do
assert :ok == perform_job(Transport.Jobs.OutdatedDataNotificationJob, %{})
assert_no_email_sent()
end
end

# Checks the producer-notification path in isolation: one subscribed producer
# gets an email and a matching DB.Notification row (delay + job_id payload).
test "send_outdated_data_notifications" do
%{id: dataset_id} = dataset = insert(:dataset)
%DB.Contact{id: contact_id, email: email} = contact = insert_contact()

%DB.NotificationSubscription{id: ns_id} =
insert(:notification_subscription, %{
reason: :expiration,
source: :admin,
role: :producer,
contact_id: contact_id,
dataset_id: dataset.id
})

job_id = 42
# delay = 7 days, no resources attached to the record
Transport.DataChecker.send_outdated_data_producer_notifications({7, [{dataset, []}]}, job_id)

assert_email_sent(
from: {"transport.data.gouv.fr", "[email protected]"},
to: {DB.Contact.display_name(contact), email},
subject: "Jeu de données arrivant à expiration",
text_body: nil,
html_body: ~r/Bonjour/
)

assert [
%DB.Notification{
contact_id: ^contact_id,
email: ^email,
reason: :expiration,
dataset_id: ^dataset_id,
notification_subscription_id: ^ns_id,
role: :producer,
payload: %{"delay" => 7, "job_id" => ^job_id}
}
] =
DB.Notification |> DB.Repo.all()
end

describe "dataset_status" do
test "active" do
dataset = %DB.Dataset{datagouv_id: Ecto.UUID.generate()}
Expand Down
Loading

0 comments on commit 439bf1d

Please sign in to comment.