From 3122d30d0ce3fbd6c2169541b656bfb2d8b722bf Mon Sep 17 00:00:00 2001 From: sevenhe Date: Thu, 13 Apr 2023 13:48:42 +0800 Subject: [PATCH] Pick the specify worker --- src/grpcbox_channel.erl | 11 +++++++++++ src/grpcbox_client.erl | 1 + test/grpcbox_channel_SUITE.erl | 12 ++++++++++-- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index ab98bd7..c3cbb99 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -4,6 +4,7 @@ -export([start_link/3, is_ready/1, + get/3, pick/2, pick/3, add_endpoints/2, @@ -60,6 +61,16 @@ start_link(Name, Endpoints, Options) -> is_ready(Name) -> gen_statem:call(?CHANNEL(Name), is_ready). +-spec get(name(), unary | stream, term()) -> + {ok, {pid(), grpcbox_client:interceptor() | undefined}} | + {error, undefined_channel | not_found_endpoint}. +get(Name, CallType, Key) -> + case lists:keyfind(Key, 1, gproc_pool:active_workers(Name)) of + {_, Pid} -> {ok, {Pid, interceptor(Name, CallType)}}; + false -> {error, not_found_endpoint} + end. + + %% @doc Picks a subchannel from a pool using the configured strategy. -spec pick(name(), unary | stream) -> {ok, {pid(), grpcbox_client:interceptor() | undefined}} | diff --git a/src/grpcbox_client.erl b/src/grpcbox_client.erl index 4dd4ffd..e04a78f 100644 --- a/src/grpcbox_client.erl +++ b/src/grpcbox_client.erl @@ -50,6 +50,7 @@ get_channel(Options, Type) -> Key = maps:get(key, Options, undefined), PickStrategy = maps:get(pick_strategy, Options, undefined), case PickStrategy of + specify_worker -> grpcbox_channel:get(Channel, Type, Key); active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key); undefined -> grpcbox_channel:pick(Channel, Type, Key) end. diff --git a/test/grpcbox_channel_SUITE.erl b/test/grpcbox_channel_SUITE.erl index 3bf96d5..05c8246 100644 --- a/test/grpcbox_channel_SUITE.erl +++ b/test/grpcbox_channel_SUITE.erl @@ -6,7 +6,8 @@ add_and_remove_endpoints/1, add_and_remove_endpoints_active_workers/1, pick_worker_strategy/1, - pick_active_worker_strategy/1]). + pick_active_worker_strategy/1, + pick_specify_worker_strategy/1]). -include_lib("eunit/include/eunit.hrl"). @@ -15,7 +16,8 @@ all() -> add_and_remove_endpoints, add_and_remove_endpoints_active_workers, pick_worker_strategy, - pick_active_worker_strategy + pick_active_worker_strategy, + pick_specify_worker_strategy ]. init_per_suite(_Config) -> application:set_env(grpcbox, client, #{channel => []}), @@ -97,6 +99,12 @@ pick_active_worker_strategy(_Config) -> ?assertEqual(error, pick_worker({hash_channel, active})), ok. +pick_specify_worker_strategy(_Config) -> + ?assertMatch({ok, _} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 18080, []})), + ?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 8080, []})), + ?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(channel_xxx, stream, {http, "127.0.0.1", 8080, []})), + ok. + pick_worker(Name, N) -> {R, _} = grpcbox_channel:pick(Name, unary, N), R.