Allow multiple lock managers at once #1

Open · wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
_build
.viminfo
.vimrc
*.swp
6 changes: 6 additions & 0 deletions rebar.config
@@ -0,0 +1,6 @@
{profiles, [
{test, [
{plugins, [rebar3_proper]},
{deps, [proper]}
]}
]}.
2 changes: 1 addition & 1 deletion src/canal_lock.app.src
@@ -1,5 +1,5 @@
{application, canal_lock,
[{description, "ETS-based lock manager for concurrently variable resource numbers"},
{vsn, "1.0.0"},
{vsn, "1.1.0"},
{registered, []},
{applications, [kernel, stdlib]}]}.
96 changes: 54 additions & 42 deletions src/canal_lock.erl
@@ -7,59 +7,68 @@
-behaviour(gen_server).

-export([start_link/1, acquire/3, release/3, stop/0]).
-export([start_link/2, acquire/4, release/4, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).

-record(state, {locks :: ets:tid(),
refs :: ets:tid(),
mod :: pos_integer()}).

-define(TABLE, ?MODULE).
-define(MAX_DECR_TRIES, 15).

start_link(MaxMultiplier) ->
start_link(?MODULE, MaxMultiplier).

start_link(Name, MaxMultiplier) ->
%% The `MaxMultiplier' value can just be the expected `Max', or an
%% equivalent value in the case where you allow, say, `X' locks per `N'
%% instances. In this case, `Max' would be equal to `N * MaxMultiplier',
%% and `X =:= MaxMultiplier'
gen_server:start_link({local, ?MODULE}, ?MODULE, MaxMultiplier, []).
gen_server:start_link({local, Name}, ?MODULE, {Name, MaxMultiplier}, []).

acquire(Key, MaxPer, NumResources) ->
acquire(Key, MaxPer, NumResources, 1).
acquire(?MODULE, Key, MaxPer, NumResources, 1).

acquire(Name, Key, MaxPer, NumResources) ->
acquire(Name, Key, MaxPer, NumResources, 1).

acquire(Key, MaxPer, NumResources, Bucket) ->
acquire(Name, Key, MaxPer, NumResources, Bucket) ->
%% Update the counter and get the value as one single write-only operation,
%% exploiting the {write_concurrency, true} option set on the table.
%% If we're at `Max+1' value, we failed to acquire the lock.
Cap = MaxPer+1,
try ets:update_counter(?TABLE, {Key,Bucket}, {2,1,Cap,Cap}) of
try ets:update_counter(Name, {Key,Bucket}, {2,1,Cap,Cap}) of
Cap when NumResources =:= Bucket ->
full;
Cap ->
acquire(Key, MaxPer, NumResources, Bucket+1);
acquire(Name, Key, MaxPer, NumResources, Bucket+1);
N ->
notify_lock(Key),
notify_lock(Name, Key),
{acquired, lock_counter(N, Bucket, Cap)}
catch
error:badarg ->
%% table is either dead, or element not in there.
%% We try to insert it -- will error if the table isn't there --
%% and to acquire the lock at the same time to save an operation
case ets:insert_new(?TABLE, {{Key,Bucket},1}) of
case ets:insert_new(Name, {{Key,Bucket},1}) of
true ->
case Bucket of
1 -> ets:insert(?TABLE, {{Key,highest},1});
_ -> ets:update_counter(?TABLE, {Key,highest}, {2,1})
1 -> ets:insert(Name, {{Key,highest},1});
_ -> ets:update_counter(Name, {Key,highest}, {2,1})
end,
notify_lock(Key),
notify_lock(Name, Key),
{acquired, lock_counter(1, Bucket, Cap)};
false ->
%% Someone else beat us to it
acquire(Key, MaxPer, NumResources, Bucket)
acquire(Name, Key, MaxPer, NumResources, Bucket)
end
end.

release(Key, MaxPer, NumResources) ->
release(?MODULE, Key, MaxPer, NumResources).

release(Name, Key, MaxPer, NumResources) ->
%% TODO: See if this logic needs to be moved inside the lock manager or
%% if it can stay out.
%%
@@ -95,27 +104,30 @@ release(Key, MaxPer, NumResources) ->
%% was below 0, it's fine, as one of the reincrements we will eventually
%% account for it.
%%
case release_loop(Key, MaxPer) of
case release_loop(Name, Key, MaxPer) of
ok -> ok;
{error, out_of_buckets} -> warn_buckets(Key, NumResources)
end,
notify_unlock(Key).
notify_unlock(Name, Key).

stop() ->
stop(?MODULE).

stop(Name) ->
%% Dirty termination of the server. It won't clean up anything left behind
%% other than its own state, but is useful for tests and for making sure
%% state is gone.
gen_server:call(?MODULE, stop).
gen_server:call(Name, stop).

notify_lock(Key) ->
gen_server:cast(?MODULE, {lock, self(), Key}).
notify_lock(Name, Key) ->
gen_server:cast(Name, {lock, self(), Key}).

notify_unlock(Key) ->
notify_unlock(Name, Key) ->
%% This call needs to be synchronous to avoid having a process locking
%% itself out of its locks by locking faster than unlocking went!
gen_server:call(?MODULE, {unlock, self(), Key}).
gen_server:call(Name, {unlock, self(), Key}).

init(MaxMultiplier) ->
init({Name, MaxMultiplier}) ->
%% We start the refs table before the locks table to minimize the amount
%% of time we may be without a registered process, which would lead to
%% a sequence of registers, followed by crashes when trying to notify this
@@ -124,7 +136,7 @@ init(MaxMultiplier) ->
%% The Refs table holds its own lookup table: `{Ref, Key}' for crashes, and
%% `{{Pid,Key},Ref}' for proper releases.
Refs = ets:new(lock_refs, [bag, protected]),
Locks = ets:new(?TABLE, [named_table, public, set, {write_concurrency, true}]),
Locks = ets:new(Name, [named_table, public, set, {write_concurrency, true}]),
{ok, #state{locks=Locks,
refs=Refs,
mod=MaxMultiplier}}.
@@ -163,18 +175,18 @@ handle_cast(Cast, S=#state{}) ->
[Cast]),
{noreply, S}.

handle_info({'DOWN', Ref, process, Pid, _Reason}, S=#state{refs=Tab, mod=Mod}) ->
handle_info({'DOWN', Ref, process, Pid, _Reason}, S=#state{refs=Tab, mod=Mod, locks=Locks}) ->
case ets:lookup(Tab, Ref) of
[{Ref,Key}] ->
ets:delete(Tab, Ref),
ets:delete_object(Tab, {{Pid,Key},Ref}),
%% Because we can't know for sure if the `Mod' value is the final one
%% or not, we may have to decrement a bunch of times in a loop, similar
%% to the external acquisition procedure.
case release_loop(Key, Mod) of
case release_loop(Locks, Key, Mod) of
ok -> ok;
{error, out_of_buckets} -> warn_buckets(Key, find_highest_bucket(Key))
end;
{error, out_of_buckets} -> warn_buckets(Key, find_highest_bucket(Locks, Key))
end;
[] ->
%% we already handled an unlock there, and the process must
%% have died before we stopped monitoring. Alternatively, this
@@ -195,47 +207,47 @@ terminate(_Reason, _State) ->
ok.


release_loop(Key, MaxPer) ->
HighestBucket = find_highest_bucket(Key),
release_loop(Key, MaxPer, HighestBucket, HighestBucket).
release_loop(Name, Key, MaxPer) ->
HighestBucket = find_highest_bucket(Name, Key),
release_loop(Name, Key, MaxPer, HighestBucket, HighestBucket).

release_loop(Key, MaxPer, NumResources, Bucket) ->
release_loop(Name, Key, MaxPer, NumResources, Bucket) ->
%% This bucket should exist, otherwise we're leaking connections
%% anyway and will need to think hard about solving it.
N = ets:update_counter(?TABLE, {Key,Bucket}, {2, -1}),
N = ets:update_counter(Name, {Key,Bucket}, {2, -1}),
if N < MaxPer, N >= 0 ->
ok;
N =:= MaxPer ->
release_loop_inner({Key,Bucket}, MaxPer),
release_loop_inner(Name, {Key,Bucket}, MaxPer),
ok;
N < 0, Bucket =:= 1 ->
%% We decremented too much, undo this. But because
%% this is the last bucket, this is all we could do.
%% Somehow, something is wrong and we're leaking a lock.
%% We have to give up though.
ets:update_counter(?TABLE, {Key,Bucket}, {2, 1}),
ets:update_counter(Name, {Key,Bucket}, {2, 1}),
{error, out_of_buckets};
N < 0 ->
%% We decremented too much -- this bucket was empty already.
%% Undo this and try the next one.
ets:update_counter(?TABLE, {Key,Bucket}, {2, 1}),
release_loop(Key, MaxPer, NumResources, Bucket-1)
ets:update_counter(Name, {Key,Bucket}, {2, 1}),
release_loop(Name, Key, MaxPer, NumResources, Bucket-1)
end.

release_loop_inner(Key, Max) -> release_loop_inner(Key, Max, ?MAX_DECR_TRIES).
release_loop_inner(Name, Key, Max) -> release_loop_inner(Name, Key, Max, ?MAX_DECR_TRIES).

release_loop_inner(Key, _Max, 0) ->
ets:update_counter(?TABLE, Key, {2, -2, 0, 0});
release_loop_inner(Key, Max, Tries) ->
N = ets:update_counter(?TABLE, Key, {2, -1, 0, 0}),
release_loop_inner(Name, Key, _Max, 0) ->
ets:update_counter(Name, Key, {2, -2, 0, 0});
release_loop_inner(Name, Key, Max, Tries) ->
N = ets:update_counter(Name, Key, {2, -1, 0, 0}),
if N =:= Max ->
release_loop_inner(Key, Max, Tries-1);
release_loop_inner(Name, Key, Max, Tries-1);
N < Max ->
ok
end.

find_highest_bucket(Key) ->
ets:update_counter(?TABLE, {Key,highest}, {2,0}).
find_highest_bucket(Name, Key) ->
ets:update_counter(Name, {Key,highest}, {2,0}).

warn_buckets(Key, Bucket) ->
error_logger:warning_msg("mod=canal_lock at=release_loop_mod "
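
To make the intent of the change concrete, here is a minimal usage sketch (not part of the diff above): two independently named lock managers running side by side. The names uploads_lock and downloads_lock, the limits, and the demo/0 wrapper are illustrative assumptions, not anything defined by this PR.

demo() ->
    %% Each named manager owns its own named ETS table and its own
    %% gen_server, so limits are tracked independently per manager.
    {ok, _} = canal_lock:start_link(uploads_lock, 3),
    {ok, _} = canal_lock:start_link(downloads_lock, 10),
    %% The new 4-arity calls target a specific manager; the old 3-arity
    %% calls still go to the default manager registered as `canal_lock'.
    {acquired, _} = canal_lock:acquire(uploads_lock, user_42, 3, 1),
    ok = canal_lock:release(uploads_lock, user_42, 3, 1),
    canal_lock:stop(uploads_lock),
    canal_lock:stop(downloads_lock).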
Binary file added test/.rebar3/erlcinfo
Binary file not shown.
69 changes: 31 additions & 38 deletions test/canal_lock_prop.erl → test/prop_canal_lock.erl
@@ -1,32 +1,25 @@
-module(canal_lock_prop).
%-include_lib("proper/include/proper.hrl").
-module(prop_canal_lock).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").

%-define(PROPMOD, proper).
%-define(PROP(A), {timeout, 120, ?_assert(?PROPMOD:quickcheck(A(), [100]))}).
%
%proper_test_() ->
% {"Run all property-based tests",
% [?PROP(prop_lock_unlock)]}.
%
%prop_lock_unlock() ->
% ?FORALL({Mod,Num,Keys}, {max_per(), num_resources(), unique_keys()},
% begin
% start(Mod),
% Max = Mod*Num,
% Runs = lists:seq(1,Max),
% Locks = [canal_lock:acquire(Key, Mod, Num) || Key <- Keys, _ <- Runs],
% ReLocks1 = [canal_lock:acquire(Key, Mod, Num) || Key <- Keys],
% Unlocks = [canal_lock:release(Key, Mod, Num) || Key <- Keys, _ <- Runs],
% ReLocks2 = [canal_lock:acquire(Key, Mod, Num) || Key <- Keys],
% lists:all(fun is_acquired/1, Locks)
% andalso
% lists:all(fun is_full/1, ReLocks1)
% andalso
% lists:all(fun is_ok/1, Unlocks)
% andalso
% lists:all(fun is_acquired/1, ReLocks2)
% end).
prop_lock_unlock() ->
?FORALL({Mod,Num,Keys}, {max_per(), num_resources(), unique_keys()},
begin
start(Mod),
Max = Mod*Num,
Runs = lists:seq(1,Max),
Locks = [canal_lock:acquire(Key, Mod, Num) || Key <- Keys, _ <- Runs],
ReLocks1 = [canal_lock:acquire(Key, Mod, Num) || Key <- Keys],
Unlocks = [canal_lock:release(Key, Mod, Num) || Key <- Keys, _ <- Runs],
ReLocks2 = [canal_lock:acquire(Key, Mod, Num) || Key <- Keys],
lists:all(fun is_acquired/1, Locks)
andalso
lists:all(fun is_full/1, ReLocks1)
andalso
lists:all(fun is_ok/1, Unlocks)
andalso
lists:all(fun is_acquired/1, ReLocks2)
end).

downsize_release_test() ->
start(5),
@@ -186,12 +179,12 @@ show_table() ->
% ?debugVal(Tab2),
?debugVal(Tab).

%key() -> integer().
%keys() -> non_empty(list(key())).
%unique_keys() -> ?LET(K, keys(), sets:to_list(sets:from_list(K))).
%
%num_resources() -> ?SUCHTHAT(X, pos_integer(), X > 1 andalso X < 100).
%max_per() -> ?SUCHTHAT(X, pos_integer(), X > 1 andalso X < 200).
key() -> integer().
keys() -> non_empty(list(key())).
unique_keys() -> ?LET(K, keys(), sets:to_list(sets:from_list(K))).

num_resources() -> ?SUCHTHAT(X, pos_integer(), X > 1 andalso X < 100).
max_per() -> ?SUCHTHAT(X, pos_integer(), X > 1 andalso X < 200).

start(MaxPer) ->
case whereis(canal_lock) of
@@ -255,8 +248,8 @@ until_acquired(Key, Max, Num, Time) ->
is_acquired({acquired,_}) -> true;
is_acquired(_) -> false.

%is_full(full) -> true;
%is_full(_) -> false.
%
%is_ok(ok) -> true;
%is_ok(_) -> false.
is_full(full) -> true;
is_full(_) -> false.

is_ok(ok) -> true;
is_ok(_) -> false.
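
As a follow-up to the restored tests, here is a hedged eunit-style sketch of exercising a second, separately named manager through the new 4-arity API. It is not part of this diff; the name my_lock, the atom key, and the fixed MaxPer/NumResources values are assumptions for illustration.

named_manager_test() ->
    {ok, _} = canal_lock:start_link(my_lock, 2),
    %% MaxPer = 2, NumResources = 1: two acquisitions succeed, the third
    %% reports `full', and releasing frees a slot again.
    {acquired, _} = canal_lock:acquire(my_lock, key, 2, 1),
    {acquired, _} = canal_lock:acquire(my_lock, key, 2, 1),
    full = canal_lock:acquire(my_lock, key, 2, 1),
    ok = canal_lock:release(my_lock, key, 2, 1),
    {acquired, _} = canal_lock:acquire(my_lock, key, 2, 1),
    canal_lock:stop(my_lock).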