Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in multilevel cache replication #232

Merged
merged 2 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions lib/nebulex/adapters/multilevel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -616,12 +616,18 @@ defmodule Nebulex.Adapters.Multilevel do
end

defp maybe_replicate({value, [level_meta | [_ | _] = levels]}, key, :inclusive) do
ttl = with_dynamic_cache(level_meta, :ttl, [key]) || :infinity

:ok =
Enum.each(levels, fn l_meta ->
_ = with_dynamic_cache(l_meta, :put, [key, value, [ttl: ttl]])
end)
case with_dynamic_cache(level_meta, :ttl, [key]) do
nil ->
# the cache entry expired between the `get` and `ttl` calls
# don't replicate the entry
:ok

ttl ->
Enum.each(levels, fn l_meta ->
_ = with_dynamic_cache(l_meta, :put, [key, value, [ttl: ttl]])
end)
end

value
end
Expand Down
35 changes: 35 additions & 0 deletions test/nebulex/adapters/multilevel_inclusive_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ defmodule Nebulex.Adapters.MultilevelInclusiveTest do

alias Nebulex.Adapters.Local.Generation
alias Nebulex.Cache.Cluster
alias Nebulex.TestCache.DelayedReadAdapter
alias Nebulex.TestCache.Multilevel
alias Nebulex.TestCache.Multilevel.{L1, L2, L3}
alias Nebulex.TestCache.MultilevelWithDelay

@gc_interval :timer.hours(1)

Expand Down Expand Up @@ -155,6 +157,39 @@ defmodule Nebulex.Adapters.MultilevelInclusiveTest do
end
end

describe "delayed multilevel" do
setup_with_dynamic_cache(MultilevelWithDelay, :multilevel_inclusive_with_delay,
model: :inclusive,
levels: [
{MultilevelWithDelay.L1,
name: :multilevel_inclusive_with_delay_l1,
gc_interval: @gc_interval,
backend: :shards,
partitions: 2},
{MultilevelWithDelay.L2,
name: :multilevel_inclusive_with_delay_l2,
gc_interval: @gc_interval,
backend: :shards,
partitions: 2}
]
)

test "does not replicate the data if the cache expires during replication" do
# reading from L2 will take 500ms
DelayedReadAdapter.put_read_delay(500)

# since we call both `get` and `ttl` the total read time will be 1000ms
:ok = MultilevelWithDelay.put(:key, :data, ttl: 700, level: 2)

# the key should expire between the `get` and `tl` calls, so the data
# should be returned but not replicated
assert MultilevelWithDelay.get(:key) == :data
assert MultilevelWithDelay.get(:key, level: 1) == nil

assert MultilevelWithDelay.ttl(:key) == nil
end
end

describe "distributed levels" do
test "return cluster nodes" do
assert Cluster.get_nodes(:multilevel_inclusive_l2) == [node()]
Expand Down
108 changes: 108 additions & 0 deletions test/support/test_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,114 @@ defmodule Nebulex.TestCache do
end
end

defmodule DelayedReadAdapter do
@moduledoc false

require Nebulex.Adapters.Local

@behaviour Nebulex.Adapter
@behaviour Nebulex.Adapter.Entry
@behaviour Nebulex.Adapter.Queryable

@fallback_adapter Nebulex.Adapters.Local

@impl true
defmacro __before_compile__(opts) do
quote do
require unquote(@fallback_adapter)

unquote(@fallback_adapter).__before_compile__(unquote(Macro.escape(opts)))
end
end

@impl true
defdelegate init(opts), to: @fallback_adapter

@impl true
def get(adapter_meta, key, opts) do
delay()
@fallback_adapter.get(adapter_meta, key, opts)
end

@impl true
def get_all(adapter_meta, list, opts) do
delay()
@fallback_adapter.get_all(adapter_meta, list, opts)
end

@impl true
defdelegate put(adapter_meta, key, value, ttl, on_write, opts), to: @fallback_adapter

@impl true
defdelegate put_all(adapter_meta, entries, ttl, on_write, opts), to: @fallback_adapter

@impl true
defdelegate delete(adapter_meta, key, opts), to: @fallback_adapter

@impl true
defdelegate take(adapter_meta, key, opts), to: @fallback_adapter

@impl true
def has_key?(adapter_meta, key) do
delay()
@fallback_adapter.has_key?(adapter_meta, key)
end

@impl true
def ttl(adapter_meta, key) do
delay()
@fallback_adapter.ttl(adapter_meta, key)
end

@impl true
defdelegate expire(adapter_meta, key, ttl), to: @fallback_adapter

@impl true
defdelegate touch(adapter_meta, key), to: @fallback_adapter

@impl true
defdelegate update_counter(adapter_meta, key, amount, ttl, default, opts), to: @fallback_adapter

@impl true
defdelegate execute(adapter_meta, command, args, opts), to: @fallback_adapter

@impl true
defdelegate stream(adapter_meta, query, opts), to: @fallback_adapter

@read_delay_key {__MODULE__, :read_delay}

def put_read_delay(delay) when is_integer(delay) do
Process.put(@read_delay_key, delay)
end

defp delay do
delay = Process.get(@read_delay_key, 1000)

Process.sleep(delay)
end
end

defmodule MultilevelWithDelay do
@moduledoc false
use Nebulex.Cache,
otp_app: :nebulex,
adapter: Nebulex.Adapters.Multilevel

defmodule L1 do
@moduledoc false
use Nebulex.Cache,
otp_app: :nebulex,
adapter: Nebulex.Adapters.Local
end

defmodule L2 do
@moduledoc false
use Nebulex.Cache,
otp_app: :nebulex,
adapter: Nebulex.TestCache.DelayedReadAdapter
end
end

## Mocks

defmodule AdapterMock do
Expand Down
Loading