chore (sync service): report replication lag (#2043)
Fixes #2031.

- Exports the replication lag in bytes as a metric to Prometheus.
- Creates a span that includes the replication lag in milliseconds for every transaction.

### Note on clock drift

The replication lag in milliseconds may be affected by clock drift
between Electric and Postgres. This can happen because Electric and
Postgres may run on different machines, and we compare the transaction's
commit timestamp (generated by PG) with Electric's timestamp at the time
the transaction is written to the shape log.
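
To make the clamping concrete, here is a minimal sketch of the millisecond computation. It mirrors the consumer change further down in this diff; the `LagSketch` module name is hypothetical.

```elixir
defmodule LagSketch do
  # Elapsed milliseconds since a Postgres commit timestamp, clamped at zero so
  # that small clock skew between PG and Electric never reports a negative lag.
  def replication_lag_ms(%DateTime{} = commit_timestamp) do
    max(0, DateTime.diff(DateTime.utc_now(), commit_timestamp, :millisecond))
  end
end

# Example: a commit from ~250 ms ago reports roughly 250 ms of lag.
LagSketch.replication_lag_ms(DateTime.add(DateTime.utc_now(), -250, :millisecond))
```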
kevin-dp authored Nov 26, 2024
1 parent af0c0bf commit 4e50204
Showing 7 changed files with 119 additions and 17 deletions.
5 changes: 5 additions & 0 deletions .changeset/khaki-meals-accept.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Telemetry for reporting replication lag.
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/application.ex
@@ -49,7 +49,6 @@ defmodule Electric.Application do
children =
Enum.concat([
[
Electric.Telemetry,
{Registry, name: Registry.StackEvents, keys: :duplicate},
{Electric.StackSupervisor,
stack_id: stack_id,
@@ -64,6 +63,7 @@
pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)],
storage: Application.fetch_env!(:electric, :storage),
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)},
{Electric.Telemetry, stack_id: stack_id},
{Bandit,
plug: {Electric.Plug.Router, router_opts},
port: Application.fetch_env!(:electric, :service_port),
38 changes: 36 additions & 2 deletions packages/sync-service/lib/electric/connection/manager.ex
@@ -63,7 +63,8 @@ defmodule Electric.Connection.Manager do
:stack_events_registry,
:tweaks,
awaiting_active: [],
drop_slot_requested: false
drop_slot_requested: false,
monitoring_started?: false
]
end

@@ -145,6 +146,10 @@
GenServer.cast(server, {:pg_info_looked_up, pg_info})
end

def query_replication_lag(server) do
GenServer.call(server, :query_replication_lag)
end

@impl true
def init(opts) do
# Because child processes are started via `start_link()` functions and due to how Postgrex
@@ -226,6 +231,11 @@
{:reply, :ok, %{state | drop_slot_requested: true}}
end

def handle_call(:query_replication_lag, _from, state) do
report_replication_lag(state)
{:reply, :ok, state}
end

@impl true
def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do
case Electric.Postgres.LockConnection.start_link(
@@ -311,7 +321,12 @@
log_collector_pid = lookup_log_collector_pid(shapes_sup_pid)
Process.monitor(log_collector_pid)

state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid}
state = %{
state
| pool_pid: pool_pid,
shape_log_collector_pid: log_collector_pid,
monitoring_started?: true
}

for awaiting <- state.awaiting_active do
GenServer.reply(awaiting, :ok)
@@ -634,4 +649,23 @@
Logger.error("Failed to execute query: #{query}\nError: #{inspect(error)}")
end
end

defp report_replication_lag(%{monitoring_started?: false}), do: :ok

defp report_replication_lag(%{pool_pid: pool} = state) do
slot_name = Keyword.fetch!(state.replication_opts, :slot_name)

query =
"SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS replication_lag_size FROM pg_replication_slots WHERE slot_name = $1;"

case Postgrex.query(pool, query, [slot_name]) do
{:ok, %Postgrex.Result{rows: [[lag]]}} ->
:telemetry.execute([:electric, :postgres, :replication], %{lag: lag})

{:error, error} ->
Logger.warning("Failed to query replication lag\nError: #{inspect(error)}")
end

:ok
end
end
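
For local verification, one way to observe the `[:electric, :postgres, :replication]` event emitted by `report_replication_lag/1` above is to attach a plain `:telemetry` handler. This is only an illustrative sketch, not part of the commit; the handler module name is hypothetical.

```elixir
defmodule ReplicationLagLogger do
  require Logger

  # Attach a handler for the event emitted by report_replication_lag/1.
  def attach do
    :telemetry.attach(
      "log-replication-lag",
      [:electric, :postgres, :replication],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # `lag` is the slot's lag in bytes, as returned by pg_wal_lsn_diff/2.
  def handle_event(_event, %{lag: lag}, _metadata, _config) do
    Logger.info("replication lag: #{inspect(lag)} bytes")
  end
end
```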
19 changes: 19 additions & 0 deletions packages/sync-service/lib/electric/shapes/consumer.ex
@@ -304,6 +304,8 @@ defmodule Electric.Shapes.Consumer do

notify_listeners(registry, :new_changes, shape_handle, last_log_offset)

report_replication_lag(txn)

{:cont, notify(txn, %{state | log_state: new_log_state})}

true ->
@@ -423,4 +425,21 @@
"shape.where": shape.where
]
end

defp report_replication_lag(%Transaction{commit_timestamp: commit_timestamp}) do
# Compute the time elapsed since the commit.
# Since we compare PG's clock with our own, there may be a slight skew,
# so we make sure not to report a negative lag.
# The lag is only useful once it becomes significant, so a slight skew doesn't matter.
now = DateTime.utc_now()
lag = Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond))

OpenTelemetry.with_span(
"shape_write.consumer.do_handle_txn.report_replication_lag",
[lag: lag],
fn ->
lag
end
)
end
end
13 changes: 8 additions & 5 deletions packages/sync-service/lib/electric/telemetry.ex
@@ -6,9 +6,9 @@ defmodule Electric.Telemetry do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def init(_) do
def init(stack_id: stack_id) do
children = [
{:telemetry_poller, measurements: periodic_measurements(), period: 2_000}
{:telemetry_poller, measurements: periodic_measurements(stack_id), period: 2_000}
]

children
@@ -72,7 +72,8 @@
last_value("vm.memory.ets", unit: :byte),
last_value("vm.total_run_queue_lengths.total"),
last_value("vm.total_run_queue_lengths.cpu"),
last_value("vm.total_run_queue_lengths.io")
last_value("vm.total_run_queue_lengths.io"),
last_value("electric.postgres.replication.lag", unit: :byte)
# distribution("plug.router_dispatch.stop.duration",
# tags: [:route],
# unit: {:native, :millisecond}
@@ -88,10 +89,12 @@
]
end

defp periodic_measurements do
defp periodic_measurements(stack_id) do
[
# A module, function and arguments to be invoked periodically.
{__MODULE__, :uptime_event, []}
{__MODULE__, :uptime_event, []},
{Electric.Connection.Manager, :query_replication_lag,
[Electric.Connection.Manager.name(stack_id)]}
]
end

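
If the metrics list above is fed to a Prometheus reporter, the new `last_value` gauge surfaces there automatically. The reporter wiring is not part of this diff; the snippet below is a hedged sketch that assumes the telemetry_metrics_prometheus library is used.

```elixir
import Telemetry.Metrics

# Hypothetical supervision-tree entry (not part of this commit): expose the
# replication lag gauge over a /metrics endpoint via telemetry_metrics_prometheus.
children = [
  {TelemetryMetricsPrometheus,
   metrics: [
     last_value("electric.postgres.replication.lag", unit: :byte)
   ],
   port: 9568}
]
```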
3 changes: 2 additions & 1 deletion packages/sync-service/test/electric/shape_cache_test.exs
@@ -857,7 +857,8 @@ defmodule Electric.ShapeCacheTest do
xid: @xid,
last_log_offset: @change_offset,
lsn: @lsn,
affected_relations: MapSet.new([{"public", "items"}])
affected_relations: MapSet.new([{"public", "items"}]),
commit_timestamp: DateTime.utc_now()
},
context.shape_log_collector
)
56 changes: 48 additions & 8 deletions packages/sync-service/test/electric/shapes/consumer_test.exs
@@ -181,7 +181,12 @@ defmodule Electric.Shapes.ConsumerTest do
Registry.register(ctx.registry, @shape_handle1, ref)

txn =
%Transaction{xid: xmin, lsn: lsn, last_log_offset: last_log_offset}
%Transaction{
xid: xmin,
lsn: lsn,
last_log_offset: last_log_offset,
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "1"},
@@ -222,7 +227,12 @@
Registry.register(ctx.registry, @shape_handle2, ref2)

txn =
%Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset}
%Transaction{
xid: xid,
lsn: lsn,
last_log_offset: last_log_offset,
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "1"},
@@ -277,7 +287,12 @@
)

txn =
%Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset}
%Transaction{
xid: xid,
lsn: lsn,
last_log_offset: last_log_offset,
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "1"},
@@ -359,7 +374,12 @@
)

txn =
%Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset}
%Transaction{
xid: xid,
lsn: lsn,
last_log_offset: last_log_offset,
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.TruncatedRelation{
relation: {"public", "test_table"}
})
@@ -509,7 +529,12 @@
lsn = Lsn.from_string("0/10")

txn =
%Transaction{xid: 150, lsn: lsn, last_log_offset: LogOffset.new(lsn, 0)}
%Transaction{
xid: 150,
lsn: lsn,
last_log_offset: LogOffset.new(lsn, 0),
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "1"},
@@ -632,7 +657,12 @@
ref = Shapes.Consumer.monitor(ctx.stack_id, shape_handle)

txn =
%Transaction{xid: 11, lsn: lsn, last_log_offset: LogOffset.new(lsn, 2)}
%Transaction{
xid: 11,
lsn: lsn,
last_log_offset: LogOffset.new(lsn, 2),
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "2"},
@@ -679,7 +709,12 @@
ref = Shapes.Consumer.monitor(ctx.stack_id, shape_handle)

txn1 =
%Transaction{xid: 9, lsn: lsn1, last_log_offset: LogOffset.new(lsn1, 2)}
%Transaction{
xid: 9,
lsn: lsn1,
last_log_offset: LogOffset.new(lsn1, 2),
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "2"},
@@ -692,7 +727,12 @@
})

txn2 =
%Transaction{xid: 10, lsn: lsn2, last_log_offset: LogOffset.new(lsn2, 2)}
%Transaction{
xid: 10,
lsn: lsn2,
last_log_offset: LogOffset.new(lsn2, 2),
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "2"},
