From c82e6c01d53be72947484cd91e1dcc47cd0098cb Mon Sep 17 00:00:00 2001 From: Ismael Bejarano Date: Wed, 30 Oct 2024 15:21:16 -0300 Subject: [PATCH] Pause channel (#2370) - Add :paused field to channel model - Add endpoints to pause and unpause a channel - Update ChannelStatusServer to skip paused channels ---- * Refactor checking channel status changes * Add unit test for the :paused status * Add paused field to Channel * Make ChannelStatusServer.poll callable from production * Force a polling to ensure the pause/unpause is processed * Paused channels should appear in the down channels list * Fix CI failure * Apply fixes from PR --- lib/ask/channel.ex | 41 +++++-- lib/ask/runtime/channel_status_server.ex | 112 ++++++++++-------- lib/ask/runtime/survey_broker.ex | 6 +- lib/ask/survey.ex | 5 +- lib/ask_web/controllers/channel_controller.ex | 36 +++++- lib/ask_web/router.ex | 6 +- .../20241007192540_pause_channel.exs | 9 ++ priv/repo/structure.sql | 4 +- .../runtime/channel_status_server_test.exs | 30 +++++ .../controllers/channel_controller_test.exs | 36 ++++++ .../controllers/survey_controller_test.exs | 20 +++- test/support/test_channel.ex | 13 +- 12 files changed, 243 insertions(+), 75 deletions(-) create mode 100644 priv/repo/migrations/20241007192540_pause_channel.exs diff --git a/lib/ask/channel.ex b/lib/ask/channel.ex index 1e5f81a39..335f6cc77 100644 --- a/lib/ask/channel.ex +++ b/lib/ask/channel.ex @@ -16,6 +16,8 @@ defmodule Ask.Channel do field :settings, :map field :patterns, Ask.Ecto.Type.JSON, default: [] field :status, Ask.Ecto.Type.JSON, virtual: true + field :paused, :boolean, default: false + belongs_to :user, Ask.User has_many :respondent_group_channels, Ask.RespondentGroupChannel, on_delete: :delete_all many_to_many :projects, Ask.Project, join_through: Ask.ProjectChannel, on_replace: :delete @@ -36,7 +38,7 @@ defmodule Ask.Channel do """ def changeset(struct, params \\ %{}) do struct - |> cast(params, [:name, :type, :provider, :base_url, :settings, :user_id, :patterns]) + |> cast(params, [:name, :type, :provider, :base_url, :settings, :user_id, :patterns, :paused]) |> validate_required([:name, :type, :provider, :settings, :user_id]) |> validate_patterns |> assoc_constraint(:user) @@ -84,18 +86,39 @@ defmodule Ask.Channel do end def with_status(channel) do - status = channel.id |> ChannelStatusServer.get_channel_status() - - status = - case status do - :up -> %{status: "up"} - :unknown -> %{status: "unknown"} - down_or_error -> down_or_error - end + status = channel |> get_status() %{channel | status: status} end + def get_status(%{paused: true}) do + %{status: "paused"} + end + + def get_status(channel) do + channel.id + |> ChannelStatusServer.get_channel_status() + |> case do + :up -> %{status: "up"} + :unknown -> %{status: "unknown"} + down_or_error -> down_or_error + end + end + + def is_paused?(channel) do + channel.paused + end + + def is_down?(channel) do + channel + |> get_status() + |> case do + %{status: "up"} -> false + %{status: "unknown"} -> false + _ -> true + end + end + defp validate_patterns(changeset) do changeset |> validate_patterns_not_empty diff --git a/lib/ask/runtime/channel_status_server.ex b/lib/ask/runtime/channel_status_server.ex index ed39052dd..895c3b4ed 100644 --- a/lib/ask/runtime/channel_status_server.ex +++ b/lib/ask/runtime/channel_status_server.ex @@ -21,7 +21,7 @@ defmodule Ask.Runtime.ChannelStatusServer do end def poll(pid) do - send(pid, :poll) + send(pid, :poll_once) end def get_channel_status(channel_id) do @@ -40,57 +40,23 @@ defmodule Ask.Runtime.ChannelStatusServer do log_info("polling") try do - Survey.running_channels() - |> Repo.preload(:user) - |> Enum.each(fn c -> - previous_status = get_status_from_state(c.id, state) + poll_channels(state) - spawn(fn -> - status = ChannelBroker.check_status(c.id) - timestamp = Timex.now() + {:noreply, state} + after + :timer.send_after(@poll_interval, :poll) + end + end - case status do - {:down, messages} -> - case previous_status do - %{status: :down} -> - nil - - _ -> - AskWeb.Email.channel_down(c.user.email, c, messages) |> Ask.Mailer.deliver() - - update_channel_status(c.id, %{ - status: :down, - messages: messages, - name: c.name, - timestamp: timestamp - }) - end - - {:error, code} -> - case previous_status do - %{status: :error} -> - nil - - _ -> - AskWeb.Email.channel_error(c.user.email, c, code) |> Ask.Mailer.deliver() - - update_channel_status(c.id, %{ - status: :error, - code: code, - name: c.name, - timestamp: timestamp - }) - end - - status -> - update_channel_status(c.id, status) - end - end) - end) + def handle_info(:poll_once, state) do + log_info("poll forced") + + try do + poll_channels(state) {:noreply, state} after - :timer.send_after(@poll_interval, :poll) + nil end end @@ -105,4 +71,56 @@ defmodule Ask.Runtime.ChannelStatusServer do def log_info(message) do Logger.info("ChannelStatusServer: #{message}") end + + defp poll_channels(state) do + Survey.running_channels() + |> Repo.preload(:user) + |> Enum.each(fn c -> + + unless c.paused do + previous_status = get_status_from_state(c.id, state) + + spawn(fn -> + status = ChannelBroker.check_status(c.id) + timestamp = Timex.now() + + process_channel_status_change(status, previous_status, timestamp, c) + end) + end + end) + end + + defp process_channel_status_change({:down, _messages}, %{status: :down}, _timestamp, _channel) do + nil + end + + defp process_channel_status_change({:down, messages}, _previous_status, timestamp, channel) do + AskWeb.Email.channel_down(channel.user.email, channel, messages) |> Ask.Mailer.deliver() + + update_channel_status(channel.id, %{ + status: :down, + messages: messages, + name: channel.name, + timestamp: timestamp + }) + end + + defp process_channel_status_change({:error, _code}, %{status: :error}, _timestamp, _channel) do + nil + end + + defp process_channel_status_change({:error, code}, _previous_status, timestamp, channel) do + AskWeb.Email.channel_error(channel.user.email, channel, code) |> Ask.Mailer.deliver() + + update_channel_status(channel.id, %{ + status: :error, + code: code, + name: channel.name, + timestamp: timestamp + }) + end + + defp process_channel_status_change(status, _previous_status, _timestamp, channel) do + update_channel_status(channel.id, status) + end end diff --git a/lib/ask/runtime/survey_broker.ex b/lib/ask/runtime/survey_broker.ex index 5673786cd..6436bdbd3 100644 --- a/lib/ask/runtime/survey_broker.ex +++ b/lib/ask/runtime/survey_broker.ex @@ -4,6 +4,7 @@ defmodule Ask.Runtime.SurveyBroker do import Ecto alias Ask.{ + Channel, Repo, Logger, Survey, @@ -131,10 +132,7 @@ defmodule Ask.Runtime.SurveyBroker do channel_is_down? = channels - |> Enum.any?(fn c -> - status = c.id |> ChannelStatusServer.get_channel_status() - status != :up && status != :unknown - end) + |> Enum.any?(&(&1 |> Channel.is_paused?() || &1 |> Channel.is_down?())) poll_survey(survey, now, channel_is_down?) end diff --git a/lib/ask/survey.ex b/lib/ask/survey.ex index be234af6b..bdf2101bf 100644 --- a/lib/ask/survey.ex +++ b/lib/ask/survey.ex @@ -23,7 +23,6 @@ defmodule Ask.Survey do PanelSurvey } - alias Ask.Runtime.ChannelStatusServer alias Ask.Ecto.Type.JSON alias Ecto.Multi @@ -525,8 +524,8 @@ defmodule Ask.Survey do down_channels = channels - |> Enum.map(&(&1.id |> ChannelStatusServer.get_channel_status())) - |> Enum.filter(&(&1 != :up && &1 != :unknown)) + |> Enum.map(&(&1 |> Channel.get_status())) + |> Enum.filter(&(&1[:status] != "up" && &1[:status] != "unknown")) %{survey | down_channels: down_channels} end diff --git a/lib/ask_web/controllers/channel_controller.ex b/lib/ask_web/controllers/channel_controller.ex index 490d6c8cd..cff426e87 100644 --- a/lib/ask_web/controllers/channel_controller.ex +++ b/lib/ask_web/controllers/channel_controller.ex @@ -2,7 +2,7 @@ defmodule AskWeb.ChannelController do use AskWeb, :api_controller alias Ask.{Channel, Project, Logger} - alias Ask.Runtime.ChannelBroker + alias Ask.Runtime.{ChannelBroker, ChannelStatusServer} def index(conn, %{"project_id" => project_id}) do channels = @@ -106,4 +106,38 @@ defmodule AskWeb.ChannelController do render(conn, "show.json", channel: channel |> Repo.preload([:projects, :user])) end + + def pause(conn, %{"channel_id" => id}) do + pause_channel(conn, id, true) + end + + def unpause(conn, %{"channel_id" => id}) do + pause_channel(conn, id, false) + end + + defp pause_channel(conn, id, paused) do + channel_params = %{"paused" => paused} + + Channel + |> Repo.get!(id) + |> authorize_channel(conn) + |> Repo.preload([:projects, :user]) + |> Channel.changeset(channel_params) + |> Repo.update() + |> case do + {:ok, channel} -> + ChannelBroker.on_channel_settings_change(channel.id, channel.settings) + ChannelStatusServer.poll(ChannelStatusServer.server_ref()) + + render(conn, "show.json", channel: channel |> Repo.preload(:projects)) + + {:error, changeset} -> + Logger.warn("Error when pausing channel: #{id}") + + conn + |> put_status(:unprocessable_entity) + |> put_view(AskWeb.ChangesetView) + |> render("error.json", changeset: changeset) + end + end end diff --git a/lib/ask_web/router.ex b/lib/ask_web/router.ex index ddf5dd266..c41f3c49a 100644 --- a/lib/ask_web/router.ex +++ b/lib/ask_web/router.ex @@ -171,7 +171,11 @@ defmodule AskWeb.Router do as: :update_archived_status end - resources "/channels", ChannelController, except: [:new, :edit] + resources "/channels", ChannelController, except: [:new, :edit] do + post "/pause", ChannelController, :pause, as: :pause + post "/unpause", ChannelController, :unpause, as: :unpause + end + get "/audios/tts", AudioController, :tts resources "/audios", AudioController, only: [:create, :show] resources "/authorizations", OAuthClientController, only: [:index, :delete] diff --git a/priv/repo/migrations/20241007192540_pause_channel.exs b/priv/repo/migrations/20241007192540_pause_channel.exs new file mode 100644 index 000000000..3d08b1c4a --- /dev/null +++ b/priv/repo/migrations/20241007192540_pause_channel.exs @@ -0,0 +1,9 @@ +defmodule Ask.Repo.Migrations.PauseChannel do + use Ecto.Migration + + def change do + alter table(:channels) do + add :paused, :boolean, default: false + end + end +end diff --git a/priv/repo/structure.sql b/priv/repo/structure.sql index f3172a38f..1aa2e07c7 100644 --- a/priv/repo/structure.sql +++ b/priv/repo/structure.sql @@ -105,6 +105,7 @@ CREATE TABLE `channels` ( `updated_at` datetime NOT NULL, `base_url` varchar(255) DEFAULT NULL, `patterns` text, + `paused` tinyint(1) DEFAULT '0', PRIMARY KEY (`id`), UNIQUE KEY `id` (`id`), KEY `channels_user_id_index` (`user_id`), @@ -1018,7 +1019,7 @@ CREATE TABLE `users` ( /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; --- Dump completed on 2024-04-22 22:06:45 +-- Dump completed on 2024-10-07 19:43:26 INSERT INTO `schema_migrations` (version) VALUES (20160812145257); INSERT INTO `schema_migrations` (version) VALUES (20160816183915); INSERT INTO `schema_migrations` (version) VALUES (20160830200454); @@ -1238,3 +1239,4 @@ INSERT INTO `schema_migrations` (version) VALUES (20230405111657); INSERT INTO `schema_migrations` (version) VALUES (20230413101342); INSERT INTO `schema_migrations` (version) VALUES (20230821100203); INSERT INTO `schema_migrations` (version) VALUES (20240422175453); +INSERT INTO `schema_migrations` (version) VALUES (20241007192540); diff --git a/test/ask/runtime/channel_status_server_test.exs b/test/ask/runtime/channel_status_server_test.exs index a6068dcbf..c7990f549 100644 --- a/test/ask/runtime/channel_status_server_test.exs +++ b/test/ask/runtime/channel_status_server_test.exs @@ -144,4 +144,34 @@ defmodule Ask.Runtime.ChannelStatusServerTest do ChannelStatusServer.poll(pid) refute_receive [:email, ^email] end + + test "doesn't send email when a channel is down but status was previously :paused" do + {:ok, pid} = ChannelStatusServer.start_link() + Process.register(self(), :mail_target) + user = insert(:user) + survey = insert(:survey, state: :running) + + channel = + TestChannel.create_channel(user, "test", TestChannel.settings(TestChannel.new(), 1, :down), %{paused: true}) + + setup_surveys_with_channels([survey], [channel]) + + ChannelStatusServer.poll(pid) + refute_receive [:email, _] + end + + test "doesn't send email when :error is received but status was previously :paused" do + {:ok, pid} = ChannelStatusServer.start_link() + Process.register(self(), :mail_target) + user = insert(:user) + survey = insert(:survey, state: :running) + + channel = + TestChannel.create_channel(user, "test", TestChannel.settings(TestChannel.new(), 1, :error), %{paused: true}) + + setup_surveys_with_channels([survey], [channel]) + + ChannelStatusServer.poll(pid) + refute_receive [:email, _] + end end diff --git a/test/ask_web/controllers/channel_controller_test.exs b/test/ask_web/controllers/channel_controller_test.exs index a7a05323c..7c0dae8ca 100644 --- a/test/ask_web/controllers/channel_controller_test.exs +++ b/test/ask_web/controllers/channel_controller_test.exs @@ -218,4 +218,40 @@ defmodule AskWeb.ChannelControllerTest do end end end + + describe "pause" do + setup %{conn: conn, user: user} do + settings = Ask.TestChannel.settings(Ask.TestChannel.new) + channel = insert(:channel, user: user, type: "sms", settings: settings) + {:ok, conn: conn, user: user, channel: channel} + end + + test "pause channel", %{conn: conn, user: user, channel: channel} do + conn = + post conn, channel_pause_path(conn, :pause, channel.id) + + assert json_response(conn, 200) + + channel = + user + |> assoc(:channels) + |> Repo.one!() + + assert %{paused: true} = channel + end + + test "unpause channel", %{conn: conn, user: user, channel: channel} do + conn = + post conn, channel_unpause_path(conn, :unpause, channel.id) + + assert json_response(conn, 200) + + channel = + user + |> assoc(:channels) + |> Repo.one!() + + assert %{paused: false} = channel + end + end end diff --git a/test/ask_web/controllers/survey_controller_test.exs b/test/ask_web/controllers/survey_controller_test.exs index 4983dd727..7ab5fd4af 100644 --- a/test/ask_web/controllers/survey_controller_test.exs +++ b/test/ask_web/controllers/survey_controller_test.exs @@ -319,6 +319,7 @@ defmodule AskWeb.SurveyControllerTest do survey_1 = insert(:survey, project: project, state: :running) survey_2 = insert(:survey, project: project, state: :running) survey_3 = insert(:survey, project: project, state: :running) + survey_4 = insert(:survey, project: project, state: :running) up_channel = TestChannel.create_channel(user, "test", TestChannel.settings(TestChannel.new(), 1)) @@ -337,10 +338,19 @@ defmodule AskWeb.SurveyControllerTest do TestChannel.settings(TestChannel.new(), 3, :error) ) - setup_surveys_with_channels([survey_1, survey_2, survey_3], [ + paused_channel = + TestChannel.create_channel( + user, + "test", + TestChannel.settings(TestChannel.new(), 3, :error), + %{paused: true} + ) + + setup_surveys_with_channels([survey_1, survey_2, survey_3, survey_4], [ up_channel, down_channel, - error_channel + error_channel, + paused_channel, ]) ChannelStatusServer.poll(pid) @@ -350,7 +360,7 @@ defmodule AskWeb.SurveyControllerTest do conn = get(conn, project_survey_path(conn, :index, project.id)) - [survey_1, survey_2, survey_3] = json_response(conn, 200)["data"] + [survey_1, survey_2, survey_3, survey_4] = json_response(conn, 200)["data"] assert survey_1["down_channels"] == [] [%{"status" => "down", "messages" => [], "timestamp" => t1, "name" => "test"}] = @@ -364,6 +374,8 @@ defmodule AskWeb.SurveyControllerTest do assert t2 assert code + assert [%{"status" => "paused"}] = survey_4["down_channels"] + ChannelBrokerSupervisor.terminate_children() ChannelBrokerAgent.clear() end @@ -2774,7 +2786,7 @@ defmodule AskWeb.SurveyControllerTest do end test "stops respondents only for the stopped survey", %{conn: conn, user: user} do - start_survey_canceller_supervisor() + start_survey_canceller_supervisor() project = create_project_for_user(user) questionnaire = insert(:questionnaire, name: "test", project: project) survey = insert(:survey, project: project, state: :running) diff --git a/test/support/test_channel.ex b/test/support/test_channel.ex index 6b15ad7b8..ab58f13c8 100644 --- a/test/support/test_channel.ex +++ b/test/support/test_channel.ex @@ -74,16 +74,19 @@ defmodule Ask.TestChannel do |> Ask.Repo.insert!() end - def create_channel(user, base_url, api_channel) do - user - |> Ecto.build_assoc(:channels) - |> Ask.Channel.changeset(%{ + def create_channel(user, base_url, api_channel, extra_params \\ %{}) do + changeset_params = %{ name: "test", provider: "test", base_url: base_url, type: "ivr", settings: api_channel - }) + } + changeset_params = Map.merge(changeset_params, extra_params) + + user + |> Ecto.build_assoc(:channels) + |> Ask.Channel.changeset(changeset_params) |> Ask.Repo.insert!() end