Skip to content

Commit

Permalink
Pick a subchannel which connection is established
Browse files Browse the repository at this point in the history
  • Loading branch information
heshaoqiong committed Apr 7, 2023
1 parent 0f4b743 commit 86ad40d
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 24 deletions.
6 changes: 5 additions & 1 deletion src/grpcbox_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ init([Name, Endpoints, Options]) ->

gproc_pool:new(Name, BalancerType, [{size, length(Endpoints)},
{auto_size, true}]),
gproc_pool:new({Name, active}, BalancerType, [{size, length(Endpoints)},
{auto_size, true}]),
Data = #data{
pool = Name,
encoding = Encoding,
Expand Down Expand Up @@ -172,10 +174,12 @@ handle_event(_, _, Data) ->
{keep_state, Data}.

terminate({shutdown, force_delete}, _State, #data{pool=Name}) ->
gproc_pool:force_delete(Name);
gproc_pool:force_delete(Name),
gproc_pool:force_delete({Name, active});
terminate(Reason, _State, #data{pool=Name}) ->
[grpcbox_subchannel:stop(Pid, Reason) || {_Channel, Pid} <- gproc_pool:active_workers(Name)],
gproc_pool:delete(Name),
gproc_pool:delete({Name, active}),
ok.

insert_interceptors(Name, Interceptors) ->
Expand Down
6 changes: 5 additions & 1 deletion src/grpcbox_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@
get_channel(Options, Type) ->
Channel = maps:get(channel, Options, default_channel),
Key = maps:get(key, Options, undefined),
grpcbox_channel:pick(Channel, Type, Key).
PickStrategy = maps:get(pick_strategy, Options, undefined),
case PickStrategy of
active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key);
undefined -> grpcbox_channel:pick(Channel, Type, Key)
end.

unary(Ctx, Service, Method, Input, Def, Options) ->
unary(Ctx, filename:join([<<>>, Service, Method]), Input, Def, Options).
Expand Down
53 changes: 41 additions & 12 deletions src/grpcbox_subchannel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
ready/3,
disconnected/3]).

-define(RECONNECT_INTERVAL, 5000).

-record(data, {name :: any(),
endpoint :: grpcbox_channel:endpoint(),
channel :: grpcbox_channel:t(),
Expand Down Expand Up @@ -42,13 +44,13 @@ stop(Pid, Reason) ->

init([Name, Channel, Endpoint, Encoding, StatsHandler]) ->
process_flag(trap_exit, true),

gproc_pool:connect_worker(Channel, Name),
{ok, disconnected, #data{name=Name,
conn=undefined,
info=info_map(Endpoint, Encoding, StatsHandler),
endpoint=Endpoint,
channel=Channel}}.
Data = #data{name=Name,
conn=undefined,
info=info_map(Endpoint, Encoding, StatsHandler),
endpoint=Endpoint,
channel=Channel},
{ok, disconnected, Data, [{next_event, internal, connect}]}.

info_map({http, Host, 80, _}, Encoding, StatsHandler) ->
#{authority => list_to_binary(Host),
Expand All @@ -72,20 +74,28 @@ callback_mode() ->
ready({call, From}, conn, #data{conn=Conn,
info=Info}) ->
{keep_state_and_data, [{reply, From, {ok, Conn, Info}}]};
ready(info, {'EXIT', Pid, _}, Data=#data{conn=Pid, name=Name, channel=Channel}) ->
gproc_pool:disconnect_worker({Channel, active}, Name),
{next_state, disconnected, Data#data{conn=undefined}, [{next_event, internal, connect}]};
ready(info, {timeout, connect}, _Data) ->
keep_state_and_data;
ready(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, Data).

disconnected(internal, connect, Data) ->
do_connect(Data);
disconnected(info, {timeout, connect}, Data) ->
do_connect(Data);
disconnected({call, From}, conn, Data) ->
connect(Data, From, [postpone]);
disconnected(info, {'EXIT', _, _}, #data{conn=undefined}) ->
erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}),
keep_state_and_data;
disconnected(EventType, EventContent, Data) ->
handle_event(EventType, EventContent, Data).

handle_event({call, From}, info, #data{info=Info}) ->
{keep_state_and_data, [{reply, From, Info}]};
handle_event(info, {'EXIT', Pid, _}, Data=#data{conn=Pid}) ->
{next_state, disconnected, Data#data{conn=undefined}};
handle_event(info, {'EXIT', _, econnrefused}, #data{conn=undefined}) ->
keep_state_and_data;
handle_event({call, From}, shutdown, _) ->
{stop_and_reply, normal, {reply, From, ok}};
handle_event(_, _, _) ->
Expand All @@ -96,28 +106,47 @@ terminate(_Reason, _State, #data{conn=undefined,
channel=Channel}) ->
gproc_pool:disconnect_worker(Channel, Name),
gproc_pool:remove_worker(Channel, Name),
gproc_pool:remove_worker({Channel, active}, Name),
ok;
terminate(normal, _State, #data{conn=Pid,
name=Name,
channel=Channel}) ->
h2_connection:stop(Pid),
gproc_pool:disconnect_worker(Channel, Name),
gproc_pool:remove_worker(Channel, Name),
gproc_pool:disconnect_worker({Channel, active}, Name),
gproc_pool:remove_worker({Channel, active}, Name),
ok;
terminate(Reason, _State, #data{conn=Pid,
name=Name,
channel=Channel}) ->
gproc_pool:disconnect_worker(Channel, Name),
gproc_pool:remove_worker(Channel, Name),
gproc_pool:disconnect_worker({Channel, active}, Name),
gproc_pool:remove_worker({Channel, active}, Name),
exit(Pid, Reason),
ok.

connect(Data=#data{conn=undefined,
endpoint={Transport, Host, Port, SSLOptions}}, From, Actions) ->
do_connect(Data=#data{name=Name, channel=Channel,
conn=undefined, endpoint={Transport, Host, Port, SSLOptions}}) ->
case h2_client:start_link(Transport, Host, Port, options(Transport, SSLOptions),
#{garbage_on_end => true,
stream_callback_mod => grpcbox_client_stream}) of
{ok, Pid} ->
gproc_pool:connect_worker({Channel, active}, Name),
{next_state, ready, Data#data{conn=Pid}};
{error, _} ->
erlang:send_after(?RECONNECT_INTERVAL, self(), {timeout, connect}),
{next_state, disconnected, Data#data{conn=undefined}}
end.

connect(Data=#data{name=Name, channel=Channel,
conn=undefined, endpoint={Transport, Host, Port, SSLOptions}}, From, Actions) ->
case h2_client:start_link(Transport, Host, Port, options(Transport, SSLOptions),
#{garbage_on_end => true,
stream_callback_mod => grpcbox_client_stream}) of
{ok, Pid} ->
gproc_pool:connect_worker({Channel, active}, Name),
{next_state, ready, Data#data{conn=Pid}, Actions};
{error, _}=Error ->
{next_state, disconnected, Data#data{conn=undefined}, [{reply, From, Error}]}
Expand Down
64 changes: 54 additions & 10 deletions test/grpcbox_channel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,42 @@
init_per_suite/1,
end_per_suite/1,
add_and_remove_endpoints/1,
pick_worker_strategy/1]).
add_and_remove_endpoints_active_workers/1,
pick_worker_strategy/1,
pick_active_worker_strategy/1]).

-include_lib("eunit/include/eunit.hrl").

all() ->
[
add_and_remove_endpoints,
pick_worker_strategy
add_and_remove_endpoints_active_workers,
pick_worker_strategy,
pick_active_worker_strategy
].
init_per_suite(_Config) ->
application:set_env(grpcbox, servers, []),
GrpcOptions = #{service_protos => [route_guide_pb], services => #{'routeguide.RouteGuide' => routeguide_route_guide}},
Servers = [#{grpc_opts => GrpcOptions,
listen_opts => #{port => 18080, ip => {127,0,0,1}}},
#{grpc_opts => GrpcOptions,
listen_opts => #{port => 18081, ip => {127,0,0,1}}},
#{grpc_opts => GrpcOptions,
listen_opts => #{port => 18082, ip => {127,0,0,1}}},
#{grpc_opts => GrpcOptions,
listen_opts => #{port => 18083, ip => {127,0,0,1}}}],
application:set_env(grpcbox, servers, Servers),
application:ensure_all_started(grpcbox),
ct:sleep(1000),
grpcbox_channel_sup:start_link(),
grpcbox_channel_sup:start_child(default_channel, [{https, "127.0.0.1", 8080, #{}}], #{}),
grpcbox_channel_sup:start_child(default_channel, [{http, "127.0.0.1", 18080, #{}}], #{}),
grpcbox_channel_sup:start_child(random_channel,
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
[{http, "127.0.0.1", 18080, #{}}, {http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}],
#{balancer => random}),
grpcbox_channel_sup:start_child(hash_channel,
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
[{http, "127.0.0.1", 18080, #{}}, {http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}],
#{balancer => hash}),
grpcbox_channel_sup:start_child(direct_channel,
[{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}],
[{http, "127.0.0.1", 18080, #{}}, {http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.4", 18084, #{}}],
#{ balancer => direct}),

_Config.
Expand All @@ -34,11 +48,30 @@ end_per_suite(_Config) ->
application:stop(grpcbox),
ok.


add_and_remove_endpoints(_Config) ->
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.3", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}]),
grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}]),
?assertEqual(4, length(gproc_pool:active_workers(default_channel))),
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, #{}}, {https, "127.0.0.1", 18082, #{}}, {https, "127.0.0.1", 18083, #{}}]),
?assertEqual(7, length(gproc_pool:active_workers(default_channel))),
grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}], normal),
?assertEqual(4, length(gproc_pool:active_workers(default_channel))),
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 8080, #{}}, {https, "127.0.0.2", 8080, #{}}, {https, "127.0.0.4", 8080, #{}}], normal),
?assertEqual(1, length(gproc_pool:active_workers(default_channel))).
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18080, #{}}, {https, "127.0.0.1", 18081, #{}}, {https, "127.0.0.1", 18082, #{}}], normal),
?assertEqual(2, length(gproc_pool:active_workers(default_channel))).

add_and_remove_endpoints_active_workers(_Config) ->
grpcbox_channel:add_endpoints(default_channel, [{http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}]),
ct:sleep(1000),
?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))),
grpcbox_channel:add_endpoints(default_channel, [{https, "127.0.0.1", 18081, #{}}, {https, "127.0.0.1", 18082, #{}}, {https, "127.0.0.1", 18083, #{}}]),
ct:sleep(1000),
?assertEqual(4, length(gproc_pool:active_workers({default_channel, active}))),
grpcbox_channel:remove_endpoints(default_channel, [{http, "127.0.0.1", 18081, #{}}, {http, "127.0.0.1", 18082, #{}}, {http, "127.0.0.1", 18083, #{}}], normal),
ct:sleep(1000),
?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))),
grpcbox_channel:remove_endpoints(default_channel, [{https, "127.0.0.1", 18081, #{}}, {https, "127.0.0.1", 18082, #{}}, {https, "127.0.0.1", 18083, #{}}], normal),
ct:sleep(1000),
?assertEqual(1, length(gproc_pool:active_workers({default_channel, active}))).

pick_worker_strategy(_Config) ->
?assertEqual(ok, pick_worker(default_channel)),
Expand All @@ -51,6 +84,17 @@ pick_worker_strategy(_Config) ->
?assertEqual(error, pick_worker(hash_channel)),
ok.

pick_active_worker_strategy(_Config) ->
ct:sleep(1000),
?assertEqual(ok, pick_worker({default_channel, active})),
?assertEqual(ok, pick_worker({random_channel, active})),
?assertEqual(ok, pick_worker({direct_channel, active}, 1)),
?assertEqual(ok, pick_worker({hash_channel, active}, 1)),
?assertEqual(error, pick_worker({default_channel, active}, 1)),
?assertEqual(error, pick_worker({random_channel, active}, 1)),
?assertEqual(error, pick_worker({direct_channel, active})),
?assertEqual(error, pick_worker({hash_channel, active})),
ok.
pick_worker(Name, N) ->
{R, _} = grpcbox_channel:pick(Name, unary, N),
R.
Expand Down

0 comments on commit 86ad40d

Please sign in to comment.