Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Change when we track and create partitions #1228

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ defmodule Realtime.Application do

region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())
:ets.new(:active_tenants, [:named_table, :set, :public])

children =
[
Expand Down
26 changes: 0 additions & 26 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -298,32 +298,6 @@ defmodule Realtime.Tenants do
|> tap(fn _ -> Cache.invalidate_tenant_cache(tenant_id) end)
end

@doc """
Tracks the active tenant by external_id and stores the time when it was tracked in the ETS table named `:active_tenants`.
"""
@spec track_active_tenant(String.t()) :: :ok
def track_active_tenant(external_id) do
:ets.insert(:active_tenants, {external_id, NaiveDateTime.utc_now()})
:ok
end

@doc """
Lists all active tenants from the ETS table named `:active_tenants`.
"""
@spec track_active_tenant(String.t()) :: list({String.t(), NaiveDateTime.t()})
def list_active_tenants() do
:ets.tab2list(:active_tenants)
end

@doc """
Untracks the active tenant by external_id from the ETS table named `:active_tenants`.
"""
@spec untrack_active_tenant(String.t()) :: :ok
def untrack_active_tenant(external_id) do
:ets.delete(:active_tenants, external_id)
:ok
end

defp broadcast_operation_event(action, external_id) do
Phoenix.PubSub.broadcast!(
Realtime.PubSub,
Expand Down
1 change: 0 additions & 1 deletion lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ defmodule Realtime.Tenants.Connect do
connected_users_bucket: connected_users_bucket
} = state

Tenants.track_active_tenant(state.tenant_id)
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:invalidate_cache")
send_connected_user_check_message(connected_users_bucket, check_connected_user_interval)

Expand Down
48 changes: 26 additions & 22 deletions lib/realtime/tenants/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ defmodule Realtime.Tenants.Janitor do
alias Realtime.Api.Tenant
alias Realtime.Database
alias Realtime.Messages
alias Realtime.Repo
alias Realtime.Tenants
alias Realtime.Tenants.Migrations

@type t :: %__MODULE__{
timer: pos_integer() | nil,
Expand Down Expand Up @@ -52,32 +52,35 @@ defmodule Realtime.Tenants.Janitor do
def init(%__MODULE__{start_after: start_after} = state) do
timer = timer(state) + start_after
Process.send_after(self(), :delete_old_messages, timer)

Logger.info("Janitor started")
{:ok, state}
end

@table_name :"syn_registry_by_name_Elixir.Realtime.Tenants.Connect"
@impl true
def handle_info(:delete_old_messages, state) do
Logger.info("Janitor started")
%{chunks: chunks, tasks: tasks} = state

{:ok, new_tasks} =
Repo.transaction(fn ->
Tenants.list_active_tenants()
|> Stream.map(&elem(&1, 0))
|> Stream.chunk_every(chunks)
|> Stream.map(fn chunks ->
task =
Task.Supervisor.async_nolink(
__MODULE__.TaskSupervisor,
fn -> run_cleanup_on_tenants(chunks) end,
ordered: false
)

{task.ref, chunks}
end)
|> Map.new()
matchspec = [
{{:"$1", :"$2", :"$3", :"$4", :"$5", Node.self()}, [], [:"$1"]}
]

new_tasks =
:ets.select(@table_name, matchspec)
|> Stream.chunk_every(chunks)
|> Stream.map(fn chunks ->
task =
Task.Supervisor.async_nolink(
__MODULE__.TaskSupervisor,
fn -> perform_mantaince_tasks(chunks) end,
ordered: false
)

{task.ref, chunks}
end)
|> Map.new()

Process.send_after(self(), :delete_old_messages, timer(state))

Expand All @@ -93,7 +96,7 @@ defmodule Realtime.Tenants.Janitor do

def handle_info({:DOWN, ref, _, _, :killed}, state) do
%{tasks: tasks} = state
{tenants, tasks} = Map.pop(tasks, ref)
tenants = Map.get(tasks, ref)

log_error(
"JanitorFailedToDeleteOldMessages",
Expand All @@ -110,18 +113,19 @@ defmodule Realtime.Tenants.Janitor do
defp timer(%{timer: timer, randomize: true}), do: timer + :timer.minutes(Enum.random(1..59))
defp timer(%{timer: timer}), do: timer

defp run_cleanup_on_tenants(tenants), do: Enum.map(tenants, &run_cleanup_on_tenant/1)
defp perform_mantaince_tasks(tenants), do: Enum.map(tenants, &perform_mantaince_task/1)

defp run_cleanup_on_tenant(tenant_external_id) do
defp perform_mantaince_task(tenant_external_id) do
Logger.metadata(project: tenant_external_id, external_id: tenant_external_id)
Logger.info("Janitor starting realtime.messages cleanup")

with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_external_id),
{:ok, conn} <- Database.connect(tenant, "realtime_janitor", 1),
:ok <- Messages.delete_old_messages(conn) do
:ok <- Messages.delete_old_messages(conn),
:ok <- Migrations.create_partitions(conn) do
filipecabaco marked this conversation as resolved.
Show resolved Hide resolved
Logger.info("Janitor finished")

GenServer.stop(conn)
filipecabaco marked this conversation as resolved.
Show resolved Hide resolved
Tenants.untrack_active_tenant(tenant_external_id)
:ok
end
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.48",
version: "2.33.49",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
8 changes: 0 additions & 8 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
defmodule Realtime.Tenants.ConnectTest do
# async: false due to the fact that multiple operations against the database will use the same connection
alias Realtime.Tenants
use Realtime.DataCase, async: false

import Mock
Expand All @@ -22,13 +21,6 @@ defmodule Realtime.Tenants.ConnectTest do
assert is_pid(db_conn)
end

test "on connect, tracks tenant as active", %{tenant: tenant} do
assert {:ok, _} = Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(500)

assert Enum.find(Tenants.list_active_tenants(), &(elem(&1, 0) == tenant.external_id))
end

test "on database disconnect, returns new connection", %{tenant: tenant} do
assert {:ok, old_conn} = Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(500)
Expand Down
53 changes: 31 additions & 22 deletions test/realtime/tenants/janitor_test.exs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
defmodule Realtime.Tenants.JanitorTest do
# async: false due to using database process
alias Realtime.Tenants
use Realtime.DataCase, async: false

import Mock
import ExUnit.CaptureLog

alias Realtime.Api.Message
alias Realtime.Api.Tenant
alias Realtime.Database
alias Realtime.Repo
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.Janitor
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.Connect

setup do
dev_tenant = Tenant |> Repo.all() |> hd()
Expand All @@ -27,13 +28,11 @@ defmodule Realtime.Tenants.JanitorTest do
dev_tenant
],
fn tenant ->
tenant = Repo.preload(tenant, [:extensions])
[%{settings: settings} | _] = tenant.extensions
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Migrations.run_migrations(migrations)
tenant = Repo.preload(tenant, :extensions)
Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(250)
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
clean_table(conn, "realtime", "messages")
Tenants.track_active_tenant(tenant.external_id)
tenant
end
)
Expand All @@ -47,13 +46,15 @@ defmodule Realtime.Tenants.JanitorTest do
)

on_exit(fn ->
Enum.each(tenants, &Connect.shutdown(&1.external_id))
:timer.sleep(10)
Application.put_env(:realtime, :janitor_schedule_timer, timer)
end)

%{tenants: tenants}
end

test "cleans messages older than 72 hours from tenants that were active and untracks the user",
test "cleans messages older than 72 hours and creates partitions from tenants that were active and untracks the user",
%{
tenants: tenants
} do
Expand All @@ -73,20 +74,22 @@ defmodule Realtime.Tenants.JanitorTest do
|> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt))
|> MapSet.new()

start_supervised!(Janitor)
Process.sleep(500)

current =
Enum.map(tenants, fn tenant ->
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, res} = Repo.all(conn, from(m in Message), Message)
res
end)
|> List.flatten()
|> MapSet.new()
with_mock Migrations, create_partitions: fn _ -> :ok end do
start_supervised!(Janitor)
Process.sleep(500)

assert MapSet.difference(current, to_keep) |> MapSet.size() == 0
assert Tenants.list_active_tenants() == []
current =
Enum.map(tenants, fn tenant ->
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
{:ok, res} = Repo.all(conn, from(m in Message), Message)
res
end)
|> List.flatten()
|> MapSet.new()

assert MapSet.difference(current, to_keep) |> MapSet.size() == 0
assert_called(Migrations.create_partitions(:_))
end
end

test "logs error if fails to connect to tenant" do
Expand All @@ -109,7 +112,13 @@ defmodule Realtime.Tenants.JanitorTest do
]

tenant = tenant_fixture(%{extensions: extensions})
Tenants.track_active_tenant(tenant.external_id)
# Force add a bad tenant
:ets.insert(
:"syn_registry_by_name_Elixir.Realtime.Tenants.Connect",
{tenant.external_id, :undefined, :undefined, :undefined, :undefined, Node.self()}
)

:timer.sleep(250)

assert capture_log(fn ->
start_supervised!(Janitor)
Expand Down
2 changes: 0 additions & 2 deletions test/support/data_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ defmodule Realtime.DataCase do
Sandbox.mode(Realtime.Repo, {:shared, self()})
end

:ets.match_delete(:active_tenants, :_)

{:ok, conn: Phoenix.ConnTest.build_conn()}
end

Expand Down
Loading