Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Oct 3, 2024
1 parent cc35bb4 commit 54860fd
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 77 deletions.
2 changes: 1 addition & 1 deletion lib/realtime/api/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Realtime.Api.Message do

def changeset(message, attrs) do
message
|> cast(attrs, [:topic, :extension, :payload, :event, :private, :inserted_at, :updated_at])
|> cast(attrs, [:topic, :extension, :payload, :event, :private])
|> validate_required([:topic, :extension])
|> put_timestamp(:updated_at)
|> maybe_put_timestamp(:inserted_at)
Expand Down
139 changes: 70 additions & 69 deletions lib/realtime/broadcast_changes/handler.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
defmodule Realtime.BroadcastChanges.Handler do
@moduledoc """
This module is responsible for handling the replication messages sent from Realtime.BroadcastChanges.PostgresReplication.
It will specifically setup the PostgresReplication configuration and handle the messages received from the replication stream on the table "realtime.messages".
## Options
* `:tenant_id` - The tenant's external_id to connect to.
"""
use GenServer
require Logger

Expand All @@ -19,6 +28,59 @@ defmodule Realtime.BroadcastChanges.Handler do

@behaviour PostgresReplication.Handler
@registry Realtime.BroadcastChanges.Handler.Registry
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, opts)

@impl true
def init(opts) do
tenant_id = Keyword.fetch!(opts, :tenant_id)

tenant = Cache.get_tenant_by_external_id(tenant_id)
connection_opts = Database.from_tenant(tenant, "realtime_broadcast_changes", :stop, true)

supervisor =
{:via, PartitionSupervisor,
{Realtime.BroadcastChanges.Listener.DynamicSupervisor, tenant_id}}

name = {:via, Registry, {Realtime.BroadcastChanges.Listener.Registry, tenant_id}}

configuration = %PostgresReplication{
connection_opts: [
hostname: connection_opts.host,
username: connection_opts.user,
password: connection_opts.pass,
database: connection_opts.name,
port: connection_opts.port,
parameters: [
application_name: connection_opts.application_name
]
],
table: "messages",
schema: "realtime",
handler_module: __MODULE__,
opts: [name: name],
metadata: %{tenant_id: tenant_id}
}

children_spec = %{
id: Handler,
start: {PostgresReplication, :start_link, [configuration]},
type: :worker
}

state = %__MODULE__{tenant_id: tenant_id, buffer: [], relations: %{}}

case DynamicSupervisor.start_child(supervisor, children_spec) do
{:ok, pid} ->
{:ok, %{state | postgres_replication_pid: pid}}

{:error, {:already_started, pid}} ->
{:ok, %{state | postgres_replication_pid: pid}}

error ->
log_error("UnableToStartPostgresReplication", error)
{:stop, error}
end
end

@spec name(Tenant.t()) :: term()
def name(%Tenant{external_id: tenant_id}) do
Expand Down Expand Up @@ -60,7 +122,7 @@ defmodule Realtime.BroadcastChanges.Handler do
end

def call(msg, state) do
Logger.info("Unknown message received: #{inspect(%{msg: parse(msg), state: state})}")
Logger.warning("Unknown message received: #{inspect(%{msg: parse(msg), state: state})}")
:noreply
end

Expand All @@ -83,13 +145,12 @@ defmodule Realtime.BroadcastChanges.Handler do
tuple_data
|> Tuple.to_list()
|> Enum.zip(columns)
|> Enum.map(fn
|> Map.new(fn
{nil, %{name: name}} -> {name, nil}
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
{value, %{name: name}} -> {name, value}
end)
|> Map.new()

payload = Map.get(to_broadcast, "payload")

Expand All @@ -98,23 +159,17 @@ defmodule Realtime.BroadcastChanges.Handler do
{:noreply, state}

payload ->
topic = Map.get(to_broadcast, "topic")
private = Map.get(to_broadcast, "private")
event = Map.get(to_broadcast, "event")

id = Map.get(to_broadcast, "id")

payload = Map.put(payload, "id", id)
id = Map.fetch!(to_broadcast, "id")

to_broadcast =
%{
topic: topic,
event: event,
private: private,
payload: payload
topic: Map.fetch!(to_broadcast, "topic"),
event: Map.fetch!(to_broadcast, "event"),
private: Map.fetch!(to_broadcast, "private"),
payload: Map.put(payload, "id", id)
}

buffer = buffer ++ [to_broadcast]
buffer = Enum.reverse([to_broadcast | buffer])
{:noreply, %{state | buffer: buffer}}
end

Expand Down Expand Up @@ -147,58 +202,4 @@ defmodule Realtime.BroadcastChanges.Handler do
log_error("BroadcastChangesHandlerTerminated", reason)
:ok
end

def start_link(opts), do: GenServer.start_link(__MODULE__, opts, opts)

@impl true
def init(opts) do
tenant_id = Keyword.fetch!(opts, :tenant_id)

tenant = Cache.get_tenant_by_external_id(tenant_id)
connection_opts = Database.from_tenant(tenant, "realtime_broadcast_changes", :stop, true)

supervisor =
{:via, PartitionSupervisor,
{Realtime.BroadcastChanges.Listener.DynamicSupervisor, tenant_id}}

name = {:via, Registry, {Realtime.BroadcastChanges.Listener.Registry, tenant_id}}

configuration = %PostgresReplication{
connection_opts: [
hostname: connection_opts.host,
username: connection_opts.user,
password: connection_opts.pass,
database: connection_opts.name,
port: connection_opts.port,
parameters: [
application_name: connection_opts.application_name
]
],
table: "messages",
schema: "realtime",
handler_module: __MODULE__,
opts: [name: name],
metadata: %{tenant_id: tenant_id}
}

children_spec = %{
id: Handler,
start: {PostgresReplication, :start_link, [configuration]},
type: :worker
}

state = %__MODULE__{tenant_id: tenant_id, buffer: [], relations: %{}}

case DynamicSupervisor.start_child(supervisor, children_spec) do
{:ok, pid} ->
{:ok, %{state | postgres_replication_pid: pid}}

{:error, {:already_started, pid}} ->
{:ok, %{state | postgres_replication_pid: pid}}

error ->
log_error("UnableToStartPostgresReplication", error)
{:stop, error}
end
end
end
10 changes: 3 additions & 7 deletions lib/realtime/broadcast_changes/postgres_replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,8 @@ defmodule Realtime.BroadcastChanges.PostgresReplication do
end

@impl true
def init(attrs) do
Logger.info(
"Initializing connection with the status: #{inspect(attrs |> Map.from_struct() |> Map.drop([:connection_opts]))}"
)
def init(%__MODULE__{} = attrs) do
Logger.info("Initializing connection with the status: #{inspect(attrs, pretty: true)}")

publication_name = publication_name(attrs)
replication_slot_name = replication_slot_name(attrs)
Expand Down Expand Up @@ -219,9 +217,7 @@ defmodule Realtime.BroadcastChanges.PostgresReplication do

@impl true
def handle_disconnect(state) do
Logger.error(
"Disconnected from the server: #{inspect(state |> Map.from_struct() |> Map.drop([:connection_opts]))}"
)
Logger.error("Disconnected from the server: #{inspect(state, pretty: true)}")

{:noreply, %{state | step: :disconnected}}
end
Expand Down

0 comments on commit 54860fd

Please sign in to comment.