From c5b79a54f428d8a2887503268f4a679023d84e05 Mon Sep 17 00:00:00 2001 From: Stefanos Mousafeiris Date: Thu, 21 Nov 2024 16:20:09 +0200 Subject: [PATCH] feat: Add global stack event registry and block requests before ready (#2019) PR by @icehaunter and me - makes the `StackSupervisor` accept a stack event registry that it uses to dispatch status events about the state of the stack. This was preliminary work for multitenancy, and also fixes https://github.com/electric-sql/electric/issues/1922 since now we hold connections when the stack is not ready, and release them when we receive a "ready event" or time them out with a 503 - avoids crashing the ETS inspector which was trying to use a DB connection from an uninitialised pool. Integration test is broken from https://github.com/electric-sql/electric/pull/2009 --- .changeset/olive-moons-breathe.md | 5 ++ .../sync-service/lib/electric/application.ex | 7 +-- .../lib/electric/connection/manager.ex | 10 ++++ .../lib/electric/connection/supervisor.ex | 7 ++- .../lib/electric/plug/delete_shape_plug.ex | 3 + .../lib/electric/plug/serve_shape_plug.ex | 3 + .../sync-service/lib/electric/plug/utils.ex | 23 ++++++++ .../lib/electric/process_registry.ex | 7 +++ .../lib/electric/stack_supervisor.ex | 19 ++++++- .../electric/plug/delete_shape_plug_test.exs | 36 ++++++++++-- .../electric/plug/serve_shape_plug_test.exs | 47 ++++++++++++++- .../test/electric/plug/utils_test.exs | 57 +++++++++++++++++++ .../test/support/component_setup.ex | 24 ++++++-- packages/sync-service/test/test_helper.exs | 2 + 14 files changed, 230 insertions(+), 20 deletions(-) create mode 100644 .changeset/olive-moons-breathe.md diff --git a/.changeset/olive-moons-breathe.md b/.changeset/olive-moons-breathe.md new file mode 100644 index 0000000000..2bdc8ca2ff --- /dev/null +++ b/.changeset/olive-moons-breathe.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Add global stack events registry for receiving updates on the stack status diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 1d10d0c355..2539c5b043 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -26,6 +26,7 @@ defmodule Electric.Application do ] ++ Electric.StackSupervisor.build_shared_opts( stack_id: stack_id, + stack_events_registry: Registry.StackEvents, storage: Application.fetch_env!(:electric, :storage) ) @@ -49,12 +50,10 @@ defmodule Electric.Application do Enum.concat([ [ Electric.Telemetry, - # {Registry, - # name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()}, - # {Registry, - # name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()}, + {Registry, name: Registry.StackEvents, keys: :duplicate}, {Electric.StackSupervisor, stack_id: stack_id, + stack_events_registry: Registry.StackEvents, connection_opts: Application.fetch_env!(:electric, :connection_opts), persistent_kv: persistent_kv, replication_opts: [ diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 613e81f322..4a4b57b3aa 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -59,6 +59,8 @@ defmodule Electric.Connection.Manager do :pg_timeline_id, # ID used for process labeling and sibling discovery :stack_id, + # Registry used for stack events + :stack_events_registry, :tweaks, awaiting_active: [], drop_slot_requested: false @@ -178,6 +180,7 @@ defmodule Electric.Connection.Manager do pg_lock_acquired: false, backoff: {:backoff.init(1000, 10_000), nil}, stack_id: Keyword.fetch!(opts, :stack_id), + stack_events_registry: Keyword.fetch!(opts, :stack_events_registry), tweaks: Keyword.fetch!(opts, :tweaks) } @@ -231,6 +234,12 @@ defmodule Electric.Connection.Manager do lock_name: Keyword.fetch!(state.replication_opts, :slot_name) ) do {:ok, lock_connection_pid} -> + Electric.StackSupervisor.dispatch_stack_event( + state.stack_events_registry, + state.stack_id, + :waiting_for_connection_lock + ) + Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval) {:noreply, %{state | lock_connection_pid: lock_connection_pid}} @@ -290,6 +299,7 @@ defmodule Electric.Connection.Manager do Electric.Connection.Supervisor.start_shapes_supervisor( stack_id: state.stack_id, shape_cache_opts: shape_cache_opts, + stack_events_registry: state.stack_events_registry, tweaks: state.tweaks ) diff --git a/packages/sync-service/lib/electric/connection/supervisor.ex b/packages/sync-service/lib/electric/connection/supervisor.ex index 080e6adeeb..7daab7419a 100644 --- a/packages/sync-service/lib/electric/connection/supervisor.ex +++ b/packages/sync-service/lib/electric/connection/supervisor.ex @@ -55,8 +55,11 @@ defmodule Electric.Connection.Supervisor do ) with {:ok, pid} <- Supervisor.start_child(name(opts), child_spec) do - if notify_pid = get_in(opts, [:tweaks, :notify_pid]), - do: send(notify_pid, {:startup_progress, stack_id, :shape_supervisor_ready}) + Electric.StackSupervisor.dispatch_stack_event( + opts[:stack_events_registry], + stack_id, + :ready + ) {:ok, pid} end diff --git a/packages/sync-service/lib/electric/plug/delete_shape_plug.ex b/packages/sync-service/lib/electric/plug/delete_shape_plug.ex index 82192b554e..00b7f4f0b6 100644 --- a/packages/sync-service/lib/electric/plug/delete_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/delete_shape_plug.ex @@ -4,10 +4,13 @@ defmodule Electric.Plug.DeleteShapePlug do alias Electric.Shapes alias Electric.Plug.ServeShapePlug.Params + import Electric.Plug.Utils, only: [hold_conn_until_stack_ready: 2] plug :fetch_query_params plug :put_resp_content_type, "application/json" + plug :hold_conn_until_stack_ready + plug :allow_shape_deletion plug :validate_query_params diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index e99a80dad7..2a1d97cde7 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -6,6 +6,7 @@ defmodule Electric.Plug.ServeShapePlug do import Plug.Conn, except: [halt: 1] alias Electric.Plug.Utils + import Electric.Plug.Utils, only: [hold_conn_until_stack_ready: 2] alias Electric.Shapes alias Electric.Schema alias Electric.Replication.LogOffset @@ -155,6 +156,8 @@ defmodule Electric.Plug.ServeShapePlug do # start_telemetry_span needs to always be the first plug after fetching query params. plug :start_telemetry_span plug :put_resp_content_type, "application/json" + plug :hold_conn_until_stack_ready + plug :validate_query_params plug :load_shape_info plug :put_schema_header diff --git a/packages/sync-service/lib/electric/plug/utils.ex b/packages/sync-service/lib/electric/plug/utils.ex index 2f76c21a9b..810d944805 100644 --- a/packages/sync-service/lib/electric/plug/utils.ex +++ b/packages/sync-service/lib/electric/plug/utils.ex @@ -134,6 +134,29 @@ defmodule Electric.Plug.Utils do end end + def hold_conn_until_stack_ready(conn, _opts) do + stack_id = conn.assigns.config[:stack_id] + stack_ready_timeout = Access.get(conn.assigns.config, :stack_ready_timeout, 5_000) + stack_events_registry = conn.assigns.config[:stack_events_registry] + + ref = make_ref() + Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id, ref) + + if Electric.ProcessRegistry.alive?(stack_id, Electric.Replication.Supervisor) do + conn + else + receive do + {:stack_status, ^ref, :ready} -> + conn + after + stack_ready_timeout -> + conn + |> Plug.Conn.send_resp(503, Jason.encode!(%{message: "Stack not ready"})) + |> Plug.Conn.halt() + end + end + end + defmodule CORSHeaderPlug do @behaviour Plug import Plug.Conn diff --git a/packages/sync-service/lib/electric/process_registry.ex b/packages/sync-service/lib/electric/process_registry.ex index c57780193a..c8533fb98d 100644 --- a/packages/sync-service/lib/electric/process_registry.ex +++ b/packages/sync-service/lib/electric/process_registry.ex @@ -21,4 +21,11 @@ defmodule Electric.ProcessRegistry do def name(stack_id, key, sub_key \\ nil) when not is_nil(stack_id) do {:via, Registry, {registry_name(stack_id), {key, sub_key}}} end + + def alive?(stack_id, key, sub_key \\ nil) do + case GenServer.whereis(name(stack_id, key, sub_key)) do + nil -> false + _ -> true + end + end end diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index 1510b344b9..b8c669e29e 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -33,6 +33,7 @@ defmodule Electric.StackSupervisor do name: [type: :any, required: false], stack_id: [type: :string, required: true], persistent_kv: [type: :any, required: true], + stack_events_registry: [type: :atom, required: true], connection_opts: [ type: :keyword_list, required: true, @@ -75,8 +76,7 @@ defmodule Electric.StackSupervisor do "tweaks to the behaviour of parts of the supervision tree, used mostly for tests", default: [], keys: [ - registry_partitions: [type: :non_neg_integer, required: false], - notify_pid: [type: :pid, required: false] + registry_partitions: [type: :non_neg_integer, required: false] ] ] ) @@ -87,6 +87,18 @@ defmodule Electric.StackSupervisor do end end + def subscribe_to_stack_events(registry, stack_id, value) do + Registry.register(registry, {:stack_status, stack_id}, value) + end + + def dispatch_stack_event(registry, stack_id, event) do + Registry.dispatch(registry, {:stack_status, stack_id}, fn entries -> + for {pid, ref} <- entries do + send(pid, {:stack_status, ref, event}) + end + end) + end + def build_shared_opts(opts) do # needs validation opts = Map.new(opts) @@ -113,8 +125,10 @@ defmodule Electric.StackSupervisor do [ shape_cache: shape_cache, registry: shape_changes_registry_name, + stack_events_registry: opts[:stack_events_registry], storage: storage_mod_arg(opts), inspector: inspector, + stack_id: stack_id, get_service_status: fn -> Electric.ServiceStatus.check(stack_id) end ] end @@ -174,6 +188,7 @@ defmodule Electric.StackSupervisor do stack_id: stack_id, # Coming from the outside, need validation connection_opts: config.connection_opts, + stack_events_registry: config.stack_events_registry, replication_opts: [ transaction_received: diff --git a/packages/sync-service/test/electric/plug/delete_shape_plug_test.exs b/packages/sync-service/test/electric/plug/delete_shape_plug_test.exs index b4888c6901..828903a220 100644 --- a/packages/sync-service/test/electric/plug/delete_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/delete_shape_plug_test.exs @@ -9,7 +9,6 @@ defmodule Electric.Plug.DeleteShapePlugTest do import Mox - setup :with_stack_id_from_test setup :verify_on_exit! @moduletag :capture_log @@ -34,11 +33,6 @@ defmodule Electric.Plug.DeleteShapePlugTest do def load_relation(tbl, _), do: Support.StubInspector.load_relation(tbl, nil) - setup do - start_link_supervised!({Registry, keys: :duplicate, name: @registry}) - :ok - end - def conn(_ctx, method, "?" <> _ = query_string) do Plug.Test.conn(method, "/" <> query_string) end @@ -46,6 +40,8 @@ defmodule Electric.Plug.DeleteShapePlugTest do def call_delete_shape_plug(conn, ctx, allow \\ true) do config = [ stack_id: ctx.stack_id, + stack_events_registry: Registry.StackEvents, + stack_ready_timeout: 100, pg_id: @test_pg_id, shape_cache: {Mock.ShapeCache, []}, storage: {Mock.Storage, []}, @@ -61,6 +57,19 @@ defmodule Electric.Plug.DeleteShapePlugTest do end describe "DeleteShapePlug" do + setup :with_stack_id_from_test + + setup ctx do + start_link_supervised!({Registry, keys: :duplicate, name: @registry}) + + {:via, _, {registry_name, registry_key}} = + Electric.Replication.Supervisor.name(ctx) + + {:ok, _} = Registry.register(registry_name, registry_key, nil) + + :ok + end + test "returns 404 if shape deletion is not allowed", ctx do conn = ctx @@ -114,4 +123,19 @@ defmodule Electric.Plug.DeleteShapePlugTest do assert conn.status == 202 end end + + describe "stack not ready" do + setup :with_stack_id_from_test + + test "returns 503", ctx do + conn = + ctx + |> conn(:delete, "?table=public.users") + |> call_delete_shape_plug(ctx) + + assert conn.status == 503 + + assert Jason.decode!(conn.resp_body) == %{"message" => "Stack not ready"} + end + end end diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index 527bfd0252..a208e54831 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -62,6 +62,8 @@ defmodule Electric.Plug.ServeShapePlugTest do config = [ stack_id: ctx.stack_id, pg_id: @test_pg_id, + stack_events_registry: Registry.StackEvents, + stack_ready_timeout: Access.get(ctx, :stack_ready_timeout, 100), shape_cache: {Mock.ShapeCache, []}, storage: {Mock.Storage, []}, inspector: {__MODULE__, []}, @@ -77,6 +79,14 @@ defmodule Electric.Plug.ServeShapePlugTest do describe "serving shape" do setup :with_stack_id_from_test + setup ctx do + {:via, _, {registry_name, registry_key}} = + Electric.Replication.Supervisor.name(ctx) + + {:ok, _} = Registry.register(registry_name, registry_key, nil) + :ok + end + test "returns 400 for invalid table", ctx do conn = ctx @@ -96,7 +106,7 @@ defmodule Electric.Plug.ServeShapePlugTest do conn = ctx |> conn(:get, %{"table" => "foo"}, "?offset=invalid") - |> ServeShapePlug.call([]) + |> call_serve_shape_plug(ctx) assert conn.status == 400 @@ -107,7 +117,7 @@ defmodule Electric.Plug.ServeShapePlugTest do conn = ctx |> conn(:get, %{}, "?offset=-1") - |> ServeShapePlug.call([]) + |> call_serve_shape_plug(ctx) assert conn.status == 400 @@ -688,4 +698,37 @@ defmodule Electric.Plug.ServeShapePlugTest do assert Plug.Conn.get_resp_header(conn, "electric-handle") == [test_shape_handle] end end + + describe "stack not ready" do + setup :with_stack_id_from_test + + test "returns 503", ctx do + conn = + ctx + |> conn(:get, %{"table" => "public.users"}, "?offset=-1&replica=full") + |> call_serve_shape_plug(ctx) + + assert conn.status == 503 + + assert Jason.decode!(conn.resp_body) == %{"message" => "Stack not ready"} + end + + @tag stack_ready_timeout: 1000 + test "waits until stack ready and proceeds", ctx do + conn_task = + Task.async(fn -> + ctx + |> conn(:get, %{"table" => "public.users", "columns" => "id,invalid"}, "?offset=-1") + |> call_serve_shape_plug(ctx) + end) + + Process.sleep(50) + + Electric.StackSupervisor.dispatch_stack_event(Registry.StackEvents, ctx.stack_id, :ready) + + conn = Task.await(conn_task) + + assert conn.status == 400 + end + end end diff --git a/packages/sync-service/test/electric/plug/utils_test.exs b/packages/sync-service/test/electric/plug/utils_test.exs index e1bc2186da..204375e514 100644 --- a/packages/sync-service/test/electric/plug/utils_test.exs +++ b/packages/sync-service/test/electric/plug/utils_test.exs @@ -1,5 +1,6 @@ defmodule Electric.Plug.UtilsTest do alias Electric.Plug.Utils + import Support.ComponentSetup use ExUnit.Case, async: true doctest Utils, import: true @@ -50,4 +51,60 @@ defmodule Electric.Plug.UtilsTest do ) != expected_interval end end + + describe "hold_conn_until_stack_ready/2" do + setup :with_stack_id_from_test + + test "immediately releases connection if stack ready", ctx do + {:via, _, {registry_name, registry_key}} = + Electric.Replication.Supervisor.name(ctx) + + {:ok, _} = Registry.register(registry_name, registry_key, nil) + + conn = + Plug.Test.conn(:get, "/") + |> Plug.Conn.assign(:config, + stack_id: ctx.stack_id, + stack_events_registry: Registry.StackEvents, + stack_ready_timeout: 100 + ) + + conn = Electric.Plug.Utils.hold_conn_until_stack_ready(conn, []) + refute conn.halted + end + + test "returns 503 when stack is not ready", ctx do + conn = + Plug.Test.conn(:get, "/") + |> Plug.Conn.assign(:config, + stack_id: ctx.stack_id, + stack_events_registry: Registry.StackEvents, + stack_ready_timeout: 100 + ) + + conn = Electric.Plug.Utils.hold_conn_until_stack_ready(conn, []) + assert conn.status == 503 + assert conn.halted + end + + test "should release connection after stack ready", ctx do + conn_task = + Task.async(fn -> + Plug.Test.conn(:get, "/") + |> Plug.Conn.assign(:config, + stack_id: ctx.stack_id, + stack_events_registry: Registry.StackEvents, + stack_ready_timeout: 1000 + ) + |> Electric.Plug.Utils.hold_conn_until_stack_ready([]) + end) + + Process.sleep(50) + + Electric.StackSupervisor.dispatch_stack_event(Registry.StackEvents, ctx.stack_id, :ready) + + conn = Task.await(conn_task) + refute conn.halted + end + end end diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 37ae3c2962..f394af0527 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -167,10 +167,16 @@ defmodule Support.ComponentSetup do storage = {FileStorage, stack_id: stack_id, storage_dir: ctx.tmp_dir} + stack_events_registry = Registry.StackEvents + + ref = make_ref() + Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id, ref) + stack_supervisor = start_supervised!( {Electric.StackSupervisor, stack_id: stack_id, + stack_events_registry: stack_events_registry, persistent_kv: kv, storage: storage, connection_opts: ctx.db_config, @@ -185,15 +191,21 @@ defmodule Support.ComponentSetup do max_restarts: 0, pool_size: 2 ], - tweaks: [notify_pid: self(), registry_partitions: 1]}, + tweaks: [registry_partitions: 1]}, restart: :temporary ) - assert_receive {:startup_progress, ^stack_id, :shape_supervisor_ready} + assert_receive {:stack_status, ^ref, :ready} # Process.sleep(100) - %{stack_id: stack_id, persistent_kv: kv, stack_supervisor: stack_supervisor, storage: storage} + %{ + stack_id: stack_id, + stack_events_registry: stack_events_registry, + persistent_kv: kv, + stack_supervisor: stack_supervisor, + storage: storage + } end def build_router_opts(ctx, overrides \\ []) do @@ -204,7 +216,11 @@ defmodule Support.ComponentSetup do allow_shape_deletion: true ] |> Keyword.merge( - Electric.StackSupervisor.build_shared_opts(stack_id: ctx.stack_id, storage: ctx.storage) + Electric.StackSupervisor.build_shared_opts( + stack_id: ctx.stack_id, + stack_events_registry: ctx.stack_events_registry, + storage: ctx.storage + ) ) |> Keyword.merge(overrides) end diff --git a/packages/sync-service/test/test_helper.exs b/packages/sync-service/test/test_helper.exs index d3902d59a4..90f4bffe52 100644 --- a/packages/sync-service/test/test_helper.exs +++ b/packages/sync-service/test/test_helper.exs @@ -4,4 +4,6 @@ # supervision tree in the test environment. # Registry.start_link(name: Electric.Application.process_registry(), keys: :unique) +Registry.start_link(name: Registry.StackEvents, keys: :duplicate) + ExUnit.start(assert_receive_timeout: 400)