Skip to content

Commit

Permalink
chore(sync-service): Set process labels (#2012)
Browse files Browse the repository at this point in the history
  • Loading branch information
robacourt authored Nov 21, 2024
1 parent cf55ae3 commit 68540c8
Show file tree
Hide file tree
Showing 11 changed files with 21 additions and 5 deletions.
2 changes: 2 additions & 0 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ defmodule Electric.Connection.Manager do
# implement our custom error handling logic.
Process.flag(:trap_exit, true)

Process.set_label({:connection_manager, opts[:stack_id]})

connection_opts =
opts
|> Keyword.fetch!(:connection_opts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Electric.Connection.Supervisor do
end

def init(opts) do
Process.set_label({:connection_supervisor, opts[:stack_id]})
Supervisor.init([{Electric.Connection.Manager, opts}], strategy: :rest_for_one)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
# when the parent process sends an exit signal
Process.flag(:trap_exit, true)

Process.set_label({:ets_inspector, opts.stack_id})

# Name needs to be an atom but we don't want to dynamically create atoms.
# Instead, we will use the reference to the table that is returned by `:ets.new`
pg_info_table = :ets.new(opts.pg_info_table, [:named_table, :public, :set])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ defmodule Electric.Postgres.ReplicationClient do
# TODO(alco): this needs additional info about :noreply and :query return tuples.
@impl true
def init(replication_opts) do
Process.set_label(__MODULE__)
Process.set_label(:replication_client)

{:ok, State.new(replication_opts)}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ defmodule Electric.Replication.ShapeLogCollector do
end

def init(opts) do
Process.set_label({:shape_log_collector, opts.stack_id})
state = Map.merge(opts, %{producer: nil, subscriptions: {0, MapSet.new()}})
# start in demand: :accumulate mode so that the ShapeCache is able to start
# all active consumers before we start sending transactions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Electric.Replication.Supervisor do

@impl Supervisor
def init(opts) do
Process.set_label({:replication_supervisor, opts[:stack_id]})
Logger.info("Starting shape replication pipeline")

# TODO: weird to have these without defaults but `consumer_supervisor` with a default
Expand Down
2 changes: 2 additions & 0 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ defmodule Electric.Shapes.Consumer do
end

def init(config) do
Process.set_label({:consumer, config.shape_handle})

%{log_producer: producer, storage: storage, shape_status: {shape_status, shape_status_state}} =
config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
end

def init(%{stack_id: stack_id} = config) do
Process.set_label({:snapshotter, stack_id})
Process.set_label({:snapshotter, config.shape_handle})
Logger.metadata(stack_id: stack_id)

{:ok, config, {:continue, :start_snapshot}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ defmodule Electric.Shapes.ConsumerSupervisor do
%{shape_handle: shape_handle, storage: {_, _} = storage} =
config

Process.set_label({:consumer_supervisor, shape_handle})

shape_storage = Electric.ShapeCache.Storage.for_shape(shape_handle, storage)

shape_config = %{config | storage: shape_storage}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do
end

def start_link(opts) do
DynamicSupervisor.start_link(__MODULE__, [],
name: Keyword.get(opts, :name, name(Keyword.fetch!(opts, :stack_id)))
stack_id = Keyword.fetch!(opts, :stack_id)

DynamicSupervisor.start_link(__MODULE__, [stack_id: stack_id],
name: Keyword.get(opts, :name, name(stack_id))
)
end

Expand Down Expand Up @@ -49,7 +51,8 @@ defmodule Electric.Shapes.DynamicConsumerSupervisor do
end

@impl true
def init(_opts) do
def init(stack_id: stack_id) do
Process.set_label({:dynamic_consumer_supervisor, stack_id})
Logger.debug(fn -> "Starting #{__MODULE__}" end)
DynamicSupervisor.init(strategy: :one_for_one)
end
Expand Down
2 changes: 2 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ defmodule Electric.StackSupervisor do

@impl true
def init(%{stack_id: stack_id} = config) do
Process.set_label({:stack_supervisor, stack_id})

inspector =
Access.get(
config,
Expand Down

0 comments on commit 68540c8

Please sign in to comment.