-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Validateur NeTEx : polling des résultats (#4326)
* Revert "Temporaire : désactivation validation NeTEx (#4295)" This reverts commit f24f1e5. * Polling résultats validation enRoute * Commentaires de review * Evitons que le snooze ne retarde les retry légitimes * Lien vers de la doc Oban * Poller NeTEx: contrainte d'unicité
- Loading branch information
Showing
14 changed files
with
788 additions
and
226 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
defmodule Transport.Jobs.NeTExPollerJob do | ||
@moduledoc """ | ||
Companion module to the validator for NeTEx files, used to handle long | ||
standing validations. | ||
""" | ||
|
||
# Max attempts doesn't really matter here as it's useful for workers failing. | ||
# Here we mostly poll and excepted network errors, the worker won't fail. | ||
@max_attempts 3 | ||
|
||
use Oban.Worker, | ||
tags: ["validation"], | ||
max_attempts: @max_attempts, | ||
queue: :resource_validation, | ||
unique: [fields: [:args, :worker]] | ||
|
||
alias Transport.Validators.NeTEx | ||
|
||
# Override the backoff to play nice and avoiding falling in very slow retry | ||
# after an important streak of snoozing (which increments the `attempt` | ||
# counter). | ||
# | ||
# See https://hexdocs.pm/oban/Oban.Worker.html#module-snoozing-jobs. | ||
@impl Worker | ||
def backoff(%Oban.Job{} = job) do | ||
corrected_attempt = @max_attempts - (job.max_attempts - job.attempt) | ||
|
||
Worker.backoff(%{job | attempt: corrected_attempt}) | ||
end | ||
|
||
@impl Worker | ||
def perform(%Oban.Job{ | ||
args: %{ | ||
"validation_id" => validation_id, | ||
"resource_history_id" => resource_history_id | ||
}, | ||
attempt: attempt | ||
}) do | ||
NeTEx.poll_validation_results(validation_id, attempt) | ||
|> NeTEx.handle_validation_results(resource_history_id, fn ^validation_id -> snooze_poller(attempt) end) | ||
end | ||
|
||
def snooze_poller(attempt) do | ||
{:snooze, NeTEx.poll_interval(attempt)} | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
defmodule Transport.Jobs.OnDemandNeTExPollerJob do | ||
@moduledoc """ | ||
Job in charge of polling validation results from enRoute Chouette Valid. | ||
Upon success it stores the result in the database. | ||
""" | ||
|
||
# Max attempts doesn't really matter here as it's useful for workers failing. | ||
# Here we mostly poll and excepted network errors, the worker won't fail. | ||
@max_attempts 3 | ||
|
||
use Oban.Worker, | ||
tags: ["validation"], | ||
max_attempts: @max_attempts, | ||
queue: :on_demand_validation, | ||
unique: [fields: [:args, :worker]] | ||
|
||
alias Transport.Jobs.OnDemandValidationHelpers, as: Helpers | ||
alias Transport.Validators.NeTEx | ||
|
||
# Override the backoff to play nice and avoiding falling in very slow retry | ||
# after an important streak of snoozing (which increments the `attempt` | ||
# counter). | ||
# | ||
# See https://hexdocs.pm/oban/Oban.Worker.html#module-snoozing-jobs. | ||
@impl Worker | ||
def backoff(%Oban.Job{} = job) do | ||
corrected_attempt = @max_attempts - (job.max_attempts - job.attempt) | ||
|
||
Worker.backoff(%{job | attempt: corrected_attempt}) | ||
end | ||
|
||
@impl Oban.Worker | ||
def perform(%Oban.Job{args: %{"id" => multivalidation_id} = args, attempt: attempt}) do | ||
check_result(args, attempt) | ||
|> Helpers.handle_validation_result(multivalidation_id) | ||
end | ||
|
||
def later(validation_id, multivalidation_id, url) do | ||
%{validation_id: validation_id, id: multivalidation_id, permanent_url: url} | ||
|> new(schedule_in: {20, :seconds}) | ||
|> Oban.insert() | ||
|
||
Helpers.delegated_state() | ||
end | ||
|
||
def check_result(%{"permanent_url" => url, "validation_id" => validation_id}, attempt) do | ||
case NeTEx.poll_validation(validation_id, attempt) do | ||
{:error, error_result} -> handle_error(error_result) | ||
{:ok, ok_result} -> handle_success(ok_result, url) | ||
{:pending, _validation_id} -> handle_pending(attempt) | ||
end | ||
end | ||
|
||
def handle_error(error_result) do | ||
error_result | ||
|> build_error_validation_result() | ||
|> Helpers.terminal_state() | ||
end | ||
|
||
def handle_success(ok_result, url) do | ||
ok_result | ||
|> build_successful_validation_result(url) | ||
|> Helpers.terminal_state() | ||
end | ||
|
||
def handle_pending(attempt) do | ||
attempt | ||
|> NeTEx.poll_interval() | ||
|> Helpers.snoozed_state() | ||
end | ||
|
||
defp build_successful_validation_result(%{"validations" => validation, "metadata" => metadata}, url) do | ||
%{ | ||
result: validation, | ||
metadata: metadata, | ||
data_vis: nil, | ||
validator: NeTEx.validator_name(), | ||
validated_data_name: url, | ||
max_error: NeTEx.get_max_severity_error(validation), | ||
oban_args: Helpers.completed() | ||
} | ||
end | ||
|
||
defp build_error_validation_result(%{message: msg}) do | ||
%{ | ||
oban_args: Helpers.error(msg), | ||
validator: NeTEx.validator_name() | ||
} | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
defmodule Transport.Jobs.OnDemandValidationHelpers do | ||
@moduledoc """ | ||
Shared code for jobs implementing the On Demand validation. | ||
""" | ||
import Ecto.Changeset | ||
import Ecto.Query | ||
alias DB.{MultiValidation, Repo} | ||
|
||
def terminal_state(result), do: {:terminal, result} | ||
def delegated_state, do: :delegated | ||
def snoozed_state(duration_in_seconds), do: {:snooze, duration_in_seconds} | ||
|
||
def completed, do: %{"state" => "completed"} | ||
|
||
def error(error_message), do: %{"state" => "error", "error_reason" => error_message} | ||
|
||
def handle_validation_result(result, multivalidation_id) do | ||
case result do | ||
{:terminal, changes} -> update_multivalidation(multivalidation_id, changes) | ||
:delegated -> :ok | ||
{:snooze, _duration_in_seconds} -> result | ||
end | ||
end | ||
|
||
defp update_multivalidation(multivalidation_id, changes) do | ||
validation = %{oban_args: oban_args} = MultiValidation |> preload(:metadata) |> Repo.get!(multivalidation_id) | ||
|
||
# update oban_args with validator output | ||
oban_args = Map.merge(oban_args, Map.get(changes, :oban_args, %{})) | ||
changes = changes |> Map.put(:oban_args, oban_args) | ||
|
||
{metadata, changes} = Map.pop(changes, :metadata) | ||
|
||
validation | ||
|> change(changes) | ||
|> put_assoc(:metadata, %{metadata: metadata}) | ||
|> Repo.update!() | ||
|
||
:ok | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.