Skip to content

Commit

Permalink
Merge pull request #123 from basho/develop-3.0-292
Browse files Browse the repository at this point in the history
Develop 3.0 292
  • Loading branch information
martinsumner authored May 7, 2020
2 parents cc5e7c4 + 5cc9624 commit e274204
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 44 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ EUnit-SASL.log
.rebar3
rebar.lock
_build
.eqc-info
11 changes: 6 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
language: erlang
notifications:
webhooks: http://basho-engbot.herokuapp.com/travis?key=cf2b9b83873cc2de2a01fad26e1c98cf2882fb02
email: [email protected]
script: "make dialyzer xref test"
otp_release:
- R16B02
- 20.3.8
- 21.3
- 22.3
script:
- chmod u+x rebar3
- ./rebar3 do upgrade, compile, xref, dialyzer, eunit
2 changes: 1 addition & 1 deletion eqc/reduce_fitting_pulse.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
done/1]).

%% console debugging convenience
-compile(export_all).
-compile([export_all, nowarn_export_all]).

-ifdef(PULSE).
-include_lib("pulse/include/pulse.hrl").
Expand Down
8 changes: 7 additions & 1 deletion eqc/reduce_fitting_pulse_sink.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
%% pipe fsm-style sink.
-module(reduce_fitting_pulse_sink).

-compile(export_all).
-compile([export_all, nowarn_export_all]).

-compile([{nowarn_deprecated_function,
[{gen_fsm, start_link, 3},
{gen_fsm, sync_send_event, 3},
{gen_fsm, reply, 2}]}]).

%% compiler gets angry if behavior functions are not exported explicitly
-export([init/1,handle_event/3,handle_sync_event/4,terminate/3,
handle_info/3,code_change/4]).
Expand Down
2 changes: 1 addition & 1 deletion eqc/riak_pipe_fitting_eqc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
%% @doc Exercise riak_pipe_fitting.
-module(riak_pipe_fitting_eqc).

-compile(export_all).
-compile([export_all, nowarn_export_all]).

-ifdef(EQC).

Expand Down
8 changes: 3 additions & 5 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
%% -*- mode: erlang -*-
{erl_opts, [%warnings_as_errors,
{erl_opts, [warnings_as_errors,
debug_info,
{platform_define, "^[0-9]+", namespaced_types},
{parse_transform, lager_transform}]}.
{edoc_opts, [{preprocess, true}]}.
{cover_enabled, true}.

{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used,
deprecated_function_calls, deprecated_functions]}.
{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used]}.

{deps, [
gen_fsm_compat,
{riak_core, ".*", {git, "git://github.com/basho/riak_core.git", {branch, "develop-3.0"}}}
]}.

{plugins, [{rebar3_eqc, {git, "https://github.com/Vagabond/rebar3-eqc-plugin", {branch, "master"}}}]}.
{plugins, [{eqc_rebar, {git, "https://github.com/Quviq/eqc-rebar", {branch, "master"}}}]}.
Binary file modified rebar3
Binary file not shown.
3 changes: 1 addition & 2 deletions src/riak_pipe.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
kernel,
stdlib,
sasl,
riak_core,
gen_fsm_compat
riak_core
]},
{mod, { riak_pipe_app, []}},
{env, []}
Expand Down
24 changes: 15 additions & 9 deletions src/riak_pipe_builder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,21 @@
%% the client asks to find the head fitting.
-module(riak_pipe_builder).

-behaviour(gen_fsm_compat).
-compile({nowarn_deprecated_function,
[{gen_fsm, start_link, 3},
{gen_fsm, sync_send_event, 2},
{gen_fsm, sync_send_event, 3},
{gen_fsm, sync_send_all_state_event, 2}]}).

-behaviour(gen_fsm).

%% API
-export([start_link/2]).
-export([fitting_pids/1,
pipeline/1,
destroy/1]).

%% gen_fsm_compat callbacks
%% gen_fsm callbacks
-export([init/1,
wait_pipeline_shutdown/2,
wait_pipeline_shutdown/3,
Expand All @@ -51,7 +57,7 @@
%% have to transform the 'receive' of the work results
-compile({parse_transform, pulse_instrument}).
%% don't trasnform toplevel test functions
-compile({pulse_replace_module,[{gen_fsm_compat,pulse_gen_fsm}]}).
-compile({pulse_replace_module,[{gen_fsm,pulse_gen_fsm}]}).
-endif.

-record(state, {options :: riak_pipe:exec_opts(),
Expand All @@ -70,7 +76,7 @@
-spec start_link([riak_pipe:fitting_spec()], riak_pipe:exec_opts()) ->
{ok, pid(), reference()} | ignore | {error, term()}.
start_link(Spec, Options) ->
case gen_fsm_compat:start_link(?MODULE, [Spec, Options], []) of
case gen_fsm:start_link(?MODULE, [Spec, Options], []) of
{ok, Pid} ->
{sink, #fitting{ref=Ref}} = lists:keyfind(sink, 1, Options),
{ok, Pid, Ref};
Expand All @@ -84,7 +90,7 @@ start_link(Spec, Options) ->
-spec fitting_pids(pid()) -> {ok, FittingPids::[pid()]} | gone.
fitting_pids(Builder) ->
try
{ok, gen_fsm_compat:sync_send_all_state_event(Builder, fittings)}
{ok, gen_fsm:sync_send_all_state_event(Builder, fittings)}
catch exit:{noproc, _} ->
gone
end.
Expand All @@ -94,24 +100,24 @@ fitting_pids(Builder) ->
%% finished building the pipeline.
-spec pipeline(pid()) -> {ok, #pipe{}} | gone.
pipeline(BuilderPid) ->
gen_fsm_compat:sync_send_event(BuilderPid, pipeline).
gen_fsm:sync_send_event(BuilderPid, pipeline).

%% @doc Shutdown the pipeline built by this builder.
-spec destroy(pid()) -> ok.
destroy(BuilderPid) ->
try
gen_fsm_compat:sync_send_event(BuilderPid, destroy, infinity)
gen_fsm:sync_send_event(BuilderPid, destroy, infinity)
catch exit:_Reason ->
%% the builder exited before the call completed,
%% since we were shutting it down anyway, this is ok
ok
end.

%%%===================================================================
%%% gen_fsm_compat callbacks
%%% gen_fsm callbacks
%%%===================================================================

%% @doc Initialize the builder fsm (gen_fsm_compat callback).
%% @doc Initialize the builder fsm (gen_fsm callback).
-spec init([ [riak_pipe:fitting_spec()] | riak_pipe:exec_opts() ]) ->
{ok, wait_pipeline_shutdown, state()}.
init([Spec, Options]) ->
Expand Down
24 changes: 15 additions & 9 deletions src/riak_pipe_fitting.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
%% for this stage of the pipeline.
-module(riak_pipe_fitting).

-behaviour(gen_fsm_compat).
-compile({nowarn_deprecated_function,
[{gen_fsm, start_link, 3},
{gen_fsm, send_event, 2},
{gen_fsm, sync_send_event, 2},
{gen_fsm, sync_send_all_state_event, 2}]}).

-behaviour(gen_fsm).

%% API
-export([start_link/4]).
Expand All @@ -34,7 +40,7 @@
-export([validate_fitting/1,
format_name/1]).

%% gen_fsm_compat callbacks
%% gen_fsm callbacks
-export([init/1,
wait_upstream_eoi/2, wait_upstream_eoi/3,
wait_workers_done/3,
Expand All @@ -57,7 +63,7 @@
%% have to transform the 'receive' of the work results
-compile({parse_transform, pulse_instrument}).
%% don't trasnform toplevel test functions
-compile({pulse_replace_module,[{gen_fsm_compat,pulse_gen_fsm}]}).
-compile({pulse_replace_module,[{gen_fsm,pulse_gen_fsm}]}).
-endif.

-record(worker, {partition :: riak_pipe_vnode:partition(),
Expand Down Expand Up @@ -86,7 +92,7 @@
riak_pipe:exec_opts()) ->
{ok, pid(), riak_pipe:fitting()} | ignore | {error, term()}.
start_link(Builder, Spec, Output, Options) ->
case gen_fsm_compat:start_link(?MODULE, [Builder, Spec, Output, Options], []) of
case gen_fsm:start_link(?MODULE, [Builder, Spec, Output, Options], []) of
{ok, Pid} ->
{ok, Pid, fitting_record(Pid, Spec, Output)};
Error ->
Expand All @@ -96,7 +102,7 @@ start_link(Builder, Spec, Output, Options) ->
%% @doc Send an end-of-inputs message to the specified coordinator.
-spec eoi(riak_pipe:fitting()) -> ok.
eoi(#fitting{pid=Pid, ref=Ref, chashfun=C}) when C =/= sink ->
gen_fsm_compat:send_event(Pid, {eoi, Ref}).
gen_fsm:send_event(Pid, {eoi, Ref}).

%% @doc Request the details about this fitting. The ring partition
%% index of the vnode requesting the details is included such
Expand All @@ -108,7 +114,7 @@ eoi(#fitting{pid=Pid, ref=Ref, chashfun=C}) when C =/= sink ->
{ok, details()} | gone.
get_details(#fitting{pid=Pid, ref=Ref}, Partition) ->
try
gen_fsm_compat:sync_send_event(Pid, {get_details, Ref, Partition, self()})
gen_fsm:sync_send_event(Pid, {get_details, Ref, Partition, self()})
catch exit:_ ->
%% catching all exit types here , since we don't care
%% whether the coordinator was gone before we asked ('noproc')
Expand All @@ -124,7 +130,7 @@ get_details(#fitting{pid=Pid, ref=Ref}, Partition) ->
-spec worker_done(riak_pipe:fitting()) -> ok | gone.
worker_done(#fitting{pid=Pid, ref=Ref}) ->
try
gen_fsm_compat:sync_send_event(Pid, {done, Ref, self()})
gen_fsm:sync_send_event(Pid, {done, Ref, self()})
catch exit:_ ->
%% catching all exit types here , since we don't care
%% whether the coordinator was gone before we asked ('noproc')
Expand All @@ -138,7 +144,7 @@ worker_done(#fitting{pid=Pid, ref=Ref}) ->
-spec workers(pid()) -> {ok, [riak_pipe_vnode:partition()]} | gone.
workers(Fitting) ->
try
{ok, gen_fsm_compat:sync_send_all_state_event(Fitting, workers)}
{ok, gen_fsm:sync_send_all_state_event(Fitting, workers)}
catch exit:_ ->
%% catching all exit types here , since we don't care
%% whether the coordinator was gone before we asked ('noproc')
Expand All @@ -148,7 +154,7 @@ workers(Fitting) ->
end.

%%%===================================================================
%%% gen_fsm_compat callbacks
%%% gen_fsm callbacks
%%%===================================================================

%% @doc Initialize the coordinator. This function monitors the
Expand Down
12 changes: 8 additions & 4 deletions src/riak_pipe_sink.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
%% `#pipe_result{}', `#pipe_log{}', and `#pipe_eoi{}'.
-module(riak_pipe_sink).

-compile({nowarn_deprecated_function,
[{gen_fsm, send_event, 2},
{gen_fsm, sync_send_event, 3}]}).

-export([
result/4,
log/4,
Expand All @@ -38,7 +42,7 @@
%% have to transform the 'receive' of the work results
-compile({parse_transform, pulse_instrument}).
%% don't trasnform toplevel test functions
-compile({pulse_replace_module,[{gen_fsm_compat,pulse_gen_fsm}]}).
-compile({pulse_replace_module,[{gen_fsm,pulse_gen_fsm}]}).
-endif.

-export_type([sink_type/0]).
Expand Down Expand Up @@ -125,18 +129,18 @@ send_to_sink(Pid, Msg, {fsm, Period, Timeout}) ->
end.

send_to_sink_fsm(Pid, Msg, _Timeout, false, Count) ->
gen_fsm_compat:send_event(Pid, Msg),
gen_fsm:send_event(Pid, Msg),
put(sink_sync, Count+1),
ok;
send_to_sink_fsm(Pid, Msg, Timeout, true, _Count) ->
try
gen_fsm_compat:sync_send_event(Pid, Msg, Timeout),
gen_fsm:sync_send_event(Pid, Msg, Timeout),
put(sink_sync, 0),
ok
catch
exit:{timeout,_} ->
{error, timeout};
exit:{_Reason,{gen_fsm_compat,sync_send_event,_}} ->
exit:{_Reason,{gen_fsm,sync_send_event,_}} ->
%% we don't care why it died, just that it did ('noproc'
%% and 'normal' have been seen; others could be possible)
{error, sink_died};
Expand Down
18 changes: 11 additions & 7 deletions src/riak_pipe_vnode_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@
%%
-module(riak_pipe_vnode_worker).

-behaviour(gen_fsm_compat).
-compile({nowarn_deprecated_function,
[{gen_fsm, start_link, 3},
{gen_fsm, send_event, 2}]}).

-behaviour(gen_fsm).

%% API
-export([start_link/3]).
Expand All @@ -140,7 +144,7 @@
send_output/4,
send_output/5]).

%% gen_fsm_compat callbacks
%% gen_fsm callbacks
-export([
init/1,
initial_input_request/2,
Expand Down Expand Up @@ -184,14 +188,14 @@
riak_pipe_fitting:details()) ->
{ok, pid()} | ignore | {error, term()}.
start_link(Partition, VnodePid, FittingDetails) ->
gen_fsm_compat:start_link(?MODULE, [Partition, VnodePid, FittingDetails], []).
gen_fsm:start_link(?MODULE, [Partition, VnodePid, FittingDetails], []).

%% @doc Send input to the worker. Note: this should only be called
%% by the vnode that owns the worker, as the result of the worker
%% asking for its next input.
-spec send_input(pid(), done | {term(), riak_core_apl:preflist()}) -> ok.
send_input(WorkerPid, Input) ->
gen_fsm_compat:send_event(WorkerPid, {input, Input}).
gen_fsm:send_event(WorkerPid, {input, Input}).

%% @doc Ask the worker to merge handoff data from an archived worker.
%% Note: this should only be called by the vnode that owns the
Expand All @@ -200,14 +204,14 @@ send_input(WorkerPid, Input) ->
%% fitting.
-spec send_handoff(pid(), Archive::term()) -> ok.
send_handoff(WorkerPid, Handoff) ->
gen_fsm_compat:send_event(WorkerPid, {handoff, Handoff}).
gen_fsm:send_event(WorkerPid, {handoff, Handoff}).

%% @doc Ask the worker to archive itself. The worker will send the
%% archive data to the owning vnode when it has done so. Once
%% it has sent the archive, the worker shuts down normally.
-spec send_archive(pid()) -> ok.
send_archive(WorkerPid) ->
gen_fsm_compat:send_event(WorkerPid, archive).
gen_fsm:send_event(WorkerPid, archive).

%% @equiv send_output(Output, FromPartition, Details, infinity)
send_output(Output, FromPartition, Details) ->
Expand Down Expand Up @@ -308,7 +312,7 @@ send_output(Output, FromPartition,
end.

%%%===================================================================
%%% gen_fsm_compat callbacks
%%% gen_fsm callbacks
%%%===================================================================

%% @doc Initialize the worker. This function calls the implementing
Expand Down

0 comments on commit e274204

Please sign in to comment.