From efcb79b4e498dde456b22700459fc9a3bc3f7abd Mon Sep 17 00:00:00 2001 From: Drew Date: Fri, 6 Dec 2013 11:46:43 -0500 Subject: [PATCH 1/4] Initial work to get dialyzer up and running. This does not compile yet because of issues pulling type specs out of an include file (.hrl) --- Makefile | 31 +++++++++++++++++++++++++++++++ include/rhc.hrl | 1 + rebar.config | 7 ++++++- src/rhc.erl | 18 +++++++++++++++++- 4 files changed, 55 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 2fa756d..40e0a38 100644 --- a/Makefile +++ b/Makefile @@ -14,3 +14,34 @@ distclean: clean doc: @./rebar doc skip_deps=true + +COMBO_PLT = $(HOME)/.rhc_dialyzer_plt +APPS = kernel stdlib sasl erts eunit +INCLUDES = -I include -I deps +check_plt: all + dialyzer --check_plt --plt $(COMBO_PLT) --apps $(APPS) ./deps/*/ebin + +build_plt: all + dialyzer --build_plt --output_plt $(COMBO_PLT) --apps $(APPS) ./deps/*/ebin + +dialyzer: all + @echo + @echo Use "'make check_plt'" to check PLT prior to using this target. + @echo Use "'make build_plt'" to build PLT prior to using this target. + @echo + @sleep 1 + dialyzer --verbose -Wno_return --plt $(COMBO_PLT) $(INCLUDES) ebin + +typer: $(DEPSOLVER_PLT) + typer --plt $(COMBO_PLT) -r ./src + +plt_info: + dialyzer --plt $(COMBO_PLT) --plt_info + +cleanplt: + @echo + @echo "Are you sure? It takes time to re-build." + @echo Deleting $(COMBO_PLT) in 5 seconds. + @echo + @sleep 5 + rm $(COMBO_PLT) \ No newline at end of file diff --git a/include/rhc.hrl b/include/rhc.hrl index 36ee005..d7f32d4 100644 --- a/include/rhc.hrl +++ b/include/rhc.hrl @@ -26,3 +26,4 @@ port, prefix, options}). + diff --git a/rebar.config b/rebar.config index 0eef421..2cacd10 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,9 @@ -{erl_opts, []}. +{erl_opts, [debug_info, + warn_obsolete_guard, + warn_unused_import, + warn_shadow_vars, + warn_export_vars, + warn_export_all]}. {deps, [ diff --git a/src/rhc.erl b/src/rhc.erl index 9c6a46d..fe639ec 100644 --- a/src/rhc.erl +++ b/src/rhc.erl @@ -67,12 +67,14 @@ -include("raw_http.hrl"). -include("rhc.hrl"). +-include_file("riakc/include/riakc.hrl"). -export_type([rhc/0]). -opaque rhc() :: #rhc{}. %% @doc Create a client for connecting to the default port on localhost. %% @equiv create("127.0.0.1", 8098, "riak", []) +-spec create() -> rhc(). create() -> create("127.0.0.1", 8098, "riak", []). @@ -85,6 +87,7 @@ create() -> %% the Options list. The client id can also be specified by %% adding `{client_id, ID}' to the Options list. %% @spec create(string(), integer(), string(), Options::list()) -> rhc() +-spec create(string(), integer(), string(), list()) -> rhc(). create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port), is_list(Prefix), is_list(Opts0) -> Opts = case proplists:lookup(client_id, Opts0) of @@ -99,18 +102,22 @@ create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port), %% @doc Get the IP this client will connect to. %% @spec ip(rhc()) -> string() +-spec ip(rhc()) -> string(). ip(#rhc{ip=IP}) -> IP. %% @doc Get the Port this client will connect to. %% @spec port(rhc()) -> integer() +-spec port(rhc()) -> integer(). port(#rhc{port=Port}) -> Port. %% @doc Get the prefix this client will use for object URLs %% @spec prefix(rhc()) -> string() +-spec prefix(rhc()) -> string(). prefix(#rhc{prefix=Prefix}) -> Prefix. %% @doc Ping the server by requesting the "/ping" resource. %% @spec ping(rhc()) -> ok|{error, term()} +-spec ping(rhc()) -> ok | {error, term()}. 
ping(Rhc) -> Url = ping_url(Rhc), case request(get, Url, ["200","204"], [], [], Rhc) of @@ -122,12 +129,14 @@ ping(Rhc) -> %% @doc Get the client ID that this client will use when storing objects. %% @spec get_client_id(rhc()) -> {ok, string()} +-spec get_client_id(rhc()) -> {ok, string()}. get_client_id(Rhc) -> {ok, client_id(Rhc, [])}. %% @doc Get some basic information about the server. The proplist returned %% should include `node' and `server_version' entries. %% @spec get_server_info(rhc()) -> {ok, proplist()}|{error, term()} +-spec get_server_info(rhc()) -> {ok, list()}|{error, term()}. get_server_info(Rhc) -> Url = stats_url(Rhc), case request(get, Url, ["200"], [], [], Rhc) of @@ -140,6 +149,7 @@ get_server_info(Rhc) -> %% @doc Get the list of full stats from a /stats call to the server. %% @spec get_server_stats(rhc()) -> {ok, proplist()}|{error, term()} +-spec get_server_stats(rhc()) -> {ok, list()}|{error, term()}. get_server_stats(Rhc) -> Url = stats_url(Rhc), case request(get, Url, ["200"], [], [], Rhc) of @@ -152,6 +162,8 @@ get_server_stats(Rhc) -> end. %% @equiv get(Rhc, Bucket, Key, []) +%%-spec get(rhc(), bucket(), key()) -> {ok, riakc_obj()}|{error, term()}. +-spec get(rhc(), term(), term()) -> {ok, term()}|{error, term()}. get(Rhc, Bucket, Key) -> get(Rhc, Bucket, Key, []). @@ -169,6 +181,7 @@ get(Rhc, Bucket, Key) -> %% `notfound' if the key was not found. %% @spec get(rhc(), bucket(), key(), proplist()) %% -> {ok, riakc_obj()}|{error, term()} +-spec get(rhc(), bucket(), key(), list()) -> {ok, riakc_obj()}|{error, term()}. get(Rhc, Bucket, Key, Options) -> Qs = get_q_params(Rhc, Options), Url = make_url(Rhc, Bucket, Key, Qs), @@ -326,7 +339,8 @@ delete(Rhc, Bucket, Key) -> %%
<dt>`timeout'</dt>
%%   <dd>The server-side timeout for the write in ms</dd>
%% -%% @spec delete(rhc(), bucket(), key(), proplist()) -> ok|{error, term()} +%% @spec delete(rhc(), bucket(), key(), proplist()) -> ≈ +%%-spec delete(rhc(), bucket(), key(), list()) -> ok|{error, term()}. delete(Rhc, Bucket, Key, Options) -> Qs = delete_q_params(Rhc, Options), Url = make_url(Rhc, Bucket, Key, Qs), @@ -343,12 +357,14 @@ delete(Rhc, Bucket, Key, Options) -> %% @equiv delete_obj(Rhc, Obj, []) +%%-spec delete_obj(rhc(), riakc_obj()) -> ok | {error, term()}. delete_obj(Rhc, Obj) -> delete_obj(Rhc, Obj, []). %% @doc Delete the key of the given object, using the contained vector %% clock if present. %% @equiv delete(Rhc, riakc_obj:bucket(Obj), riakc_obj:key(Obj), [{vclock, riakc_obj:vclock(Obj)}|Options]) +%%-spec delete_obj(rhc(), riakc_obj(), list()) -> ok | {error, term()}. delete_obj(Rhc, Obj, Options) -> Bucket = riakc_obj:bucket(Obj), Key = riakc_obj:key(Obj), From 462f239b1bcb9188ee591f7108defd143251d75b Mon Sep 17 00:00:00 2001 From: Drew Date: Fri, 6 Dec 2013 18:13:04 -0500 Subject: [PATCH 2/4] Adds Dialyzer functionality. This patch does not fix all dialyzer warnings. It uses specs generated by Typer with some hand editing to clean them up. The -spec annotations try to match the original edoc specs where possible. However, some of the edoc annotations were out of date. The specs are underspecified in some places to allow dialyzer to run. --- Makefile | 6 +- include/rhc.hrl | 2 +- src/rhc.erl | 210 ++++++++++++++++++++++++++++++------------- src/rhc_bucket.erl | 9 ++ src/rhc_dt.erl | 8 ++ src/rhc_index.erl | 11 ++- src/rhc_listkeys.erl | 11 ++- src/rhc_mapred.erl | 18 ++++ src/rhc_obj.erl | 22 ++++- 9 files changed, 227 insertions(+), 70 deletions(-) diff --git a/Makefile b/Makefile index 40e0a38..8a474d5 100644 --- a/Makefile +++ b/Makefile @@ -19,10 +19,10 @@ COMBO_PLT = $(HOME)/.rhc_dialyzer_plt APPS = kernel stdlib sasl erts eunit INCLUDES = -I include -I deps check_plt: all - dialyzer --check_plt --plt $(COMBO_PLT) --apps $(APPS) ./deps/*/ebin + dialyzer --check_plt --plt $(COMBO_PLT) --apps $(APPS) deps/*/ebin build_plt: all - dialyzer --build_plt --output_plt $(COMBO_PLT) --apps $(APPS) ./deps/*/ebin + dialyzer --build_plt --output_plt $(COMBO_PLT) --apps $(APPS) deps/*/ebin dialyzer: all @echo @@ -33,7 +33,7 @@ dialyzer: all dialyzer --verbose -Wno_return --plt $(COMBO_PLT) $(INCLUDES) ebin typer: $(DEPSOLVER_PLT) - typer --plt $(COMBO_PLT) -r ./src + typer --plt $(COMBO_PLT) $(INCLUDES) -r src plt_info: dialyzer --plt $(COMBO_PLT) --plt_info diff --git a/include/rhc.hrl b/include/rhc.hrl index d7f32d4..6b80ef6 100644 --- a/include/rhc.hrl +++ b/include/rhc.hrl @@ -20,7 +20,7 @@ %% %% ------------------------------------------------------------------- --define(DEFAULT_TIMEOUT, 60000). +-define(DEFAULT_HTTP_TIMEOUT, 60000). -record(rhc, {ip, port, diff --git a/src/rhc.erl b/src/rhc.erl index fe639ec..8aac128 100644 --- a/src/rhc.erl +++ b/src/rhc.erl @@ -67,7 +67,7 @@ -include("raw_http.hrl"). -include("rhc.hrl"). --include_file("riakc/include/riakc.hrl"). +-include_lib("riakc/include/riakc.hrl"). -export_type([rhc/0]). -opaque rhc() :: #rhc{}. @@ -75,6 +75,7 @@ %% @doc Create a client for connecting to the default port on localhost. %% @equiv create("127.0.0.1", 8098, "riak", []) -spec create() -> rhc(). + create() -> create("127.0.0.1", 8098, "riak", []). @@ -86,7 +87,7 @@ create() -> %% Defaults for r, w, dw, rw, and return_body may be passed in %% the Options list. The client id can also be specified by %% adding `{client_id, ID}' to the Options list. 
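The create/4 hunk that follows is representative of the whole series: the prose edoc `@spec' is removed (or left behind as a plain comment) and a compiler-checked `-spec' attribute is added directly above the function head, which `make dialyzer' then verifies against the PLT built by `make build_plt' (per the commit message, the first drafts of these specs came from `make typer'). A minimal sketch of the pattern on a hypothetical accessor, not part of this module:

    %% Before: edoc only, never machine-checked
    %% @spec node_name(rhc()) -> string()
    %% After: checked by dialyzer against the implementation
    -spec node_name(rhc()) -> string().
    node_name(#rhc{ip=IP, port=Port}) ->
        IP ++ ":" ++ integer_to_list(Port).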
-%% @spec create(string(), integer(), string(), Options::list()) -> rhc() + -spec create(string(), integer(), string(), list()) -> rhc(). create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port), is_list(Prefix), is_list(Opts0) -> @@ -101,23 +102,27 @@ create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port), #rhc{ip=IP, port=Port, prefix=Prefix, options=Opts}. %% @doc Get the IP this client will connect to. -%% @spec ip(rhc()) -> string() + -spec ip(rhc()) -> string(). + ip(#rhc{ip=IP}) -> IP. %% @doc Get the Port this client will connect to. -%% @spec port(rhc()) -> integer() + -spec port(rhc()) -> integer(). + port(#rhc{port=Port}) -> Port. %% @doc Get the prefix this client will use for object URLs -%% @spec prefix(rhc()) -> string() + -spec prefix(rhc()) -> string(). + prefix(#rhc{prefix=Prefix}) -> Prefix. %% @doc Ping the server by requesting the "/ping" resource. -%% @spec ping(rhc()) -> ok|{error, term()} + -spec ping(rhc()) -> ok | {error, term()}. + ping(Rhc) -> Url = ping_url(Rhc), case request(get, Url, ["200","204"], [], [], Rhc) of @@ -128,15 +133,17 @@ ping(Rhc) -> end. %% @doc Get the client ID that this client will use when storing objects. -%% @spec get_client_id(rhc()) -> {ok, string()} + -spec get_client_id(rhc()) -> {ok, string()}. + get_client_id(Rhc) -> {ok, client_id(Rhc, [])}. %% @doc Get some basic information about the server. The proplist returned %% should include `node' and `server_version' entries. -%% @spec get_server_info(rhc()) -> {ok, proplist()}|{error, term()} + -spec get_server_info(rhc()) -> {ok, list()}|{error, term()}. + get_server_info(Rhc) -> Url = stats_url(Rhc), case request(get, Url, ["200"], [], [], Rhc) of @@ -148,8 +155,9 @@ get_server_info(Rhc) -> end. %% @doc Get the list of full stats from a /stats call to the server. -%% @spec get_server_stats(rhc()) -> {ok, proplist()}|{error, term()} + -spec get_server_stats(rhc()) -> {ok, list()}|{error, term()}. + get_server_stats(Rhc) -> Url = stats_url(Rhc), case request(get, Url, ["200"], [], [], Rhc) of @@ -162,8 +170,9 @@ get_server_stats(Rhc) -> end. %% @equiv get(Rhc, Bucket, Key, []) -%%-spec get(rhc(), bucket(), key()) -> {ok, riakc_obj()}|{error, term()}. +%% spec get(rhc(), bucket(), key()) -> {ok, riakc_obj()}|{error, term()}. -spec get(rhc(), term(), term()) -> {ok, term()}|{error, term()}. + get(Rhc, Bucket, Key) -> get(Rhc, Bucket, Key, []). @@ -179,9 +188,9 @@ get(Rhc, Bucket, Key) -> %% %% The term in the second position of the error tuple will be %% `notfound' if the key was not found. -%% @spec get(rhc(), bucket(), key(), proplist()) + %% -> {ok, riakc_obj()}|{error, term()} --spec get(rhc(), bucket(), key(), list()) -> {ok, riakc_obj()}|{error, term()}. +-spec get(rhc(), {binary(), bucket()}, key(), list()) -> {ok, riakc_obj()}|{error, term()}. get(Rhc, Bucket, Key, Options) -> Qs = get_q_params(Rhc, Options), Url = make_url(Rhc, Bucket, Key, Qs), @@ -205,6 +214,7 @@ get(Rhc, Bucket, Key, Options) -> end. %% @equiv put(Rhc, Object, []) +-spec put(rhc(),riakc_obj()) -> ok | {error, term()} | {ok, riakc_obj()}. put(Rhc, Object) -> put(Rhc, Object, []). @@ -223,8 +233,9 @@ put(Rhc, Object) -> %% response. `ok' is returned if return_body is false. %% `{ok, Object}' is returned if return_body is true. %% -%% @spec put(rhc(), riakc_obj(), proplist()) + %% -> ok|{ok, riakc_obj()}|{error, term()} +-spec put(rhc(),riakc_obj(),list()) -> ok | {error, term()} | {ok, riakc_obj()}. 
put(Rhc, Object, Options) -> Qs = put_q_params(Rhc, Options), Bucket = riakc_obj:bucket(Object), @@ -253,6 +264,7 @@ put(Rhc, Object, Options) -> %% @equiv counter_incr(Rhc, Bucket, Key, Amt, []) -spec counter_incr(rhc(), binary(), binary(), integer()) -> ok | {ok, integer()} | {error, term()}. + counter_incr(Rhc, Bucket, Key, Amt) -> counter_incr(Rhc, Bucket, Key, Amt, []). @@ -277,6 +289,7 @@ counter_incr(Rhc, Bucket, Key, Amt) -> %% See the riak docs at http://docs.basho.com/riak/latest/references/apis/http/ for details -spec counter_incr(rhc(), binary(), binary(), integer(), list()) -> ok | {ok, integer()} | {error, term()}. + counter_incr(Rhc, Bucket, Key, Amt, Options) -> Qs = counter_q_params(Rhc, Options), Url = make_counter_url(Rhc, Bucket, Key, Qs), @@ -294,6 +307,7 @@ counter_incr(Rhc, Bucket, Key, Amt, Options) -> %% @doc Get the counter stored at `bucket', `key'. -spec counter_val(rhc(), term(), term()) -> {ok, integer()} | {error, term()}. + counter_val(Rhc, Bucket, Key) -> counter_val(Rhc, Bucket, Key, []). @@ -316,6 +330,7 @@ counter_val(Rhc, Bucket, Key) -> %% %% See the riak docs at http://docs.basho.com/riak/latest/references/apis/http/ fro details -spec counter_val(rhc(), term(), term(), list()) -> {ok, integer()} | {error, term()}. + counter_val(Rhc, Bucket, Key, Options) -> Qs = counter_q_params(Rhc, Options), Url = make_counter_url(Rhc, Bucket, Key, Qs), @@ -327,6 +342,7 @@ counter_val(Rhc, Bucket, Key, Options) -> end. %% @equiv delete(Rhc, Bucket, Key, []) +-spec delete(rhc(),{binary(), bucket()}, binary()) -> ok | {error, term()}. delete(Rhc, Bucket, Key) -> delete(Rhc, Bucket, Key, []). @@ -339,8 +355,8 @@ delete(Rhc, Bucket, Key) -> %%
<dt>`timeout'</dt>
%%   <dd>The server-side timeout for the write in ms</dd>
%% -%% @spec delete(rhc(), bucket(), key(), proplist()) -> ≈ -%%-spec delete(rhc(), bucket(), key(), list()) -> ok|{error, term()}. + +-spec delete(rhc(), {binary(), bucket()}, key(), list()) -> ok | {error, term()}. delete(Rhc, Bucket, Key, Options) -> Qs = delete_q_params(Rhc, Options), Url = make_url(Rhc, Bucket, Key, Qs), @@ -357,40 +373,48 @@ delete(Rhc, Bucket, Key, Options) -> %% @equiv delete_obj(Rhc, Obj, []) -%%-spec delete_obj(rhc(), riakc_obj()) -> ok | {error, term()}. +-spec delete_obj(rhc(), riakc_obj()) -> ok | {error, term()}. + delete_obj(Rhc, Obj) -> delete_obj(Rhc, Obj, []). %% @doc Delete the key of the given object, using the contained vector %% clock if present. %% @equiv delete(Rhc, riakc_obj:bucket(Obj), riakc_obj:key(Obj), [{vclock, riakc_obj:vclock(Obj)}|Options]) -%%-spec delete_obj(rhc(), riakc_obj(), list()) -> ok | {error, term()}. +-spec delete_obj(rhc(), riakc_obj(), list()) -> ok | {error, term()}. + delete_obj(Rhc, Obj, Options) -> Bucket = riakc_obj:bucket(Obj), Key = riakc_obj:key(Obj), VClock = riakc_obj:vclock(Obj), delete(Rhc, Bucket, Key, [{vclock, VClock}|Options]). +-spec list_buckets(_) -> none(). list_buckets(Rhc) -> list_buckets(Rhc, undefined). +-spec list_buckets(rhc(),integer()) -> {error, term()} | {ok,[binary()]}. list_buckets(Rhc, BucketType) when is_binary(BucketType) -> list_buckets(Rhc, BucketType, undefined); list_buckets(Rhc, Timeout) -> list_buckets(Rhc, undefined, Timeout). +-spec list_buckets(rhc(), undefined | binary(),integer()) -> {error, term()} | {ok,[binary()]}. list_buckets(Rhc, BucketType, Timeout) -> {ok, ReqId} = stream_list_buckets(Rhc, BucketType, Timeout), rhc_listkeys:wait_for_list(ReqId, Timeout). +-spec stream_list_buckets(rhc()) -> {error, term()} | {ok,reference()}. stream_list_buckets(Rhc) -> stream_list_buckets(Rhc, undefined). +-spec stream_list_buckets(rhc(),_) -> {error, term()} | {ok,reference()}. stream_list_buckets(Rhc, BucketType) when is_binary(BucketType) -> stream_list_buckets(Rhc, BucketType, undefined); stream_list_buckets(Rhc, Timeout) -> stream_list_buckets(Rhc, undefined, Timeout). +-spec stream_list_buckets(rhc(), term(), integer()) -> {error, term()} | {ok, reference()}. stream_list_buckets(Rhc, BucketType, Timeout) -> ParamList0 = [{?Q_BUCKETS, ?Q_STREAM}, {?Q_PROPS, ?Q_FALSE}], @@ -409,16 +433,19 @@ stream_list_buckets(Rhc, BucketType, Timeout) -> {error, Error} -> {error, Error} end. +-spec list_keys(rhc(),_) -> {error,_} | {ok,[binary()]}. list_keys(Rhc, Bucket) -> list_keys(Rhc, Bucket, undefined). %% @doc List the keys in the given bucket. -%% @spec list_keys(rhc(), bucket(), integer()) -> {ok, [key()]}|{error, term()} + +-spec list_keys(rhc(),_,_) -> {error,_} | {ok,[binary()]}. list_keys(Rhc, Bucket, Timeout) -> {ok, ReqId} = stream_list_keys(Rhc, Bucket, Timeout), - rhc_listkeys:wait_for_list(ReqId, ?DEFAULT_TIMEOUT). + rhc_listkeys:wait_for_list(ReqId, ?DEFAULT_HTTP_TIMEOUT). +-spec stream_list_keys(rhc(),_) -> {error,_} | {ok,reference()}. stream_list_keys(Rhc, Bucket) -> stream_list_keys(Rhc, Bucket, undefined). @@ -433,8 +460,9 @@ stream_list_keys(Rhc, Bucket) -> %%
<dt>`{error, term()}'</dt>
%%   <dd>an error occurred</dd>
%% -%% @spec stream_list_keys(rhc(), bucket(), integer()) -> + %% {ok, reference()}|{error, term()} +-spec stream_list_keys(rhc(),_,_) -> {error, term()} | {ok,reference()}. stream_list_keys(Rhc, Bucket, Timeout) -> ParamList0 = [{?Q_KEYS, ?Q_STREAM}, {?Q_PROPS, ?Q_FALSE}], @@ -454,27 +482,31 @@ stream_list_keys(Rhc, Bucket, Timeout) -> end. %% @doc Query a secondary index. -%% @spec get_index(rhc(), bucket(), index(), index_query()) -> + %% {ok, index_results()} | {error, term()} +-spec get_index(rhc(), bucket(), term(), term()) -> {error, term()} | {ok, index_results()}. get_index(Rhc, Bucket, Index, Query) -> get_index(Rhc, Bucket, Index, Query, []). %% @doc Query a secondary index. -%% @spec get_index(rhc(), bucket(), index(), index_query(), index_options()) -> + %% {ok, index_results()} | {error, term()} +-spec get_index(rhc(), bucket(), term(), term(), list()) -> {ok,index_results()} | {error, term()}. get_index(Rhc, Bucket, Index, Query, Options) -> {ok, ReqId} = stream_index(Rhc, Bucket, Index, Query, Options), rhc_index:wait_for_index(ReqId). %% @doc Query a secondary index, streaming the results back. -%% @spec stream_index(rhc(), bucket(), index(), index_query()) -> + %% {ok, reference()} | {error, term()} +-spec stream_index(rhc(), bucket(), term(), term()) -> {ok,reference()} | {error,term()}. stream_index(Rhc, Bucket, Index, Query) -> stream_index(Rhc, Bucket, Index, Query, []). %% @doc Query a secondary index, streaming the results back. -%% @spec stream_index(rhc(), bucket(), index(), index_query(), index_options()) -> + %% {ok, reference()} | {error, term()} +-spec stream_index(rhc(),_,_,_,[{atom(),_}]) -> {error,_} | {ok,reference()}. stream_index(Rhc, Bucket, Index, Query, Options) -> ParamList = rhc_index:query_options([{stream, true}|Options]), Url = index_url(Rhc, Bucket, Index, Query, ParamList), @@ -489,7 +521,8 @@ stream_index(Rhc, Bucket, Index, Query, Options) -> end. %% @doc Get the properties of the given bucket. -%% @spec get_bucket(rhc(), bucket()) -> {ok, proplist()}|{error, term()} + +-spec get_bucket(rhc(), {binary(), bucket()}) -> {ok, [proplists:property()]} | {error, term()}. get_bucket(Rhc, Bucket) -> Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}, {?Q_KEYS, ?Q_FALSE}]), @@ -512,7 +545,8 @@ get_bucket(Rhc, Bucket) -> %%
<dd>Whether or not this bucket should allow siblings to
%%     be created for its keys</dd>
%% -%% @spec set_bucket(rhc(), bucket(), proplist()) -> ok|{error, term()} + +-spec set_bucket(rhc(),_,list()) -> ok | {error,_}. set_bucket(Rhc, Bucket, Props0) -> Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}]), Headers = [{"Content-Type", "application/json"}], @@ -523,6 +557,7 @@ set_bucket(Rhc, Bucket, Props0) -> {error, Error} -> {error, Error} end. +-spec reset_bucket(rhc(),_) -> ok | {error,_}. reset_bucket(Rhc, Bucket) -> Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}]), case request(delete, Url, ["204"], [], [], Rhc) of @@ -532,7 +567,8 @@ reset_bucket(Rhc, Bucket) -> %% @doc Get the properties of the given bucket. -%% @spec get_bucket_type (rhc(), bucket()) -> {ok, proplist()}|{error, term()} + +-spec get_bucket_type(rhc(),_) -> {error,_} | {ok,[{atom(),_}]}. get_bucket_type(Rhc, Type) -> Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}, {?Q_KEYS, ?Q_FALSE}]), @@ -547,7 +583,8 @@ get_bucket_type(Rhc, Type) -> %% @doc Set the properties of the given bucket type. %% -%% @spec set_bucket_type(rhc(), bucket(), proplist()) -> ok|{error, term()} + +-spec set_bucket_type(rhc(),_,list()) -> ok | {error,_}. set_bucket_type(Rhc, Type, Props0) -> Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}]), Headers = [{"Content-Type", "application/json"}], @@ -558,6 +595,7 @@ set_bucket_type(Rhc, Type, Props0) -> {error, Error} -> {error, Error} end. +-spec reset_bucket_type(rhc(),_) -> ok | {error,_}. reset_bucket_type(Rhc, Type) -> Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}]), case request(delete, Url, ["204"], [], [], Rhc) of @@ -565,23 +603,26 @@ reset_bucket_type(Rhc, Type) -> {error, Error} -> {error, Error} end. -%% @equiv mapred(Rhc, Inputs, Query, DEFAULT_TIMEOUT) +%% @equiv mapred(Rhc, Inputs, Query, DEFAULT_HTTP_TIMEOUT) +-spec mapred(rhc(), binary(), term()) -> {ok, [{term(), term()}]} | {error, term()}. mapred(Rhc, Inputs, Query) -> - mapred(Rhc, Inputs, Query, ?DEFAULT_TIMEOUT). + mapred(Rhc, Inputs, Query, ?DEFAULT_HTTP_TIMEOUT). %% @doc Execute a map/reduce query. See {@link %% rhc_mapred:encode_mapred/2} for details of the allowed formats %% for `Inputs' and `Query'. -%% @spec mapred(rhc(), rhc_mapred:map_input(), + %% [rhc_mapred:query_part()], integer()) %% -> {ok, [rhc_mapred:phase_result()]}|{error, term()} +-spec mapred(rhc(), rhc_mapred:map_input(), [rhc_mapred:query_part()], integer()) -> {ok, [rhc_mapred:phase_result()]}|{error, term()}. mapred(Rhc, Inputs, Query, Timeout) -> {ok, ReqId} = mapred_stream(Rhc, Inputs, Query, self(), Timeout), rhc_mapred:wait_for_mapred(ReqId, Timeout). -%% @equiv mapred_stream(Rhc, Inputs, Query, ClientPid, DEFAULT_TIMEOUT) +%% @equiv mapred_stream(Rhc, Inputs, Query, ClientPid, DEFAULT_HTTP_TIMEOUT) +-spec mapred_stream(rhc(), binary(), term(), term()) -> {ok,reference()} | {error, term()}. mapred_stream(Rhc, Inputs, Query, ClientPid) -> - mapred_stream(Rhc, Inputs, Query, ClientPid, ?DEFAULT_TIMEOUT). + mapred_stream(Rhc, Inputs, Query, ClientPid, ?DEFAULT_HTTP_TIMEOUT). %% @doc Stream map/reduce results to a Pid. Messages sent to the Pid %% will be of the form `{reference(), message()}', @@ -596,9 +637,12 @@ mapred_stream(Rhc, Inputs, Query, ClientPid) -> %%
<dt>`{error, term()}'</dt>
%%   <dd>an error occurred</dd>
%% -%% @spec mapred_stream(rhc(), rhc_mapred:mapred_input(), + %% [rhc_mapred:query_phase()], pid(), integer()) %% -> {ok, reference()}|{error, term()} +-spec mapred_stream(rhc(), rhc_mapred:mapred_input(), + [rhc_mapred:query_phase()], pid(), integer()) -> + {ok, reference()}|{error, term()}. mapred_stream(Rhc, Inputs, Query, ClientPid, Timeout) -> Url = mapred_url(Rhc), StartRef = make_ref(), @@ -614,13 +658,14 @@ mapred_stream(Rhc, Inputs, Query, ClientPid, Timeout) -> %% @doc Execute a search query. This command will return an error %% unless executed against a Riak Search cluster. -%% @spec search(rhc(), bucket(), string()) -> + %% {ok, [rhc_mapred:phase_result()]}|{error, term()} +-spec search(rhc(), bucket(), string()) -> {ok,[rhc_mapred:phase_result()]} | {error,term}. search(Rhc, Bucket, SearchQuery) -> %% Run a Map/Reduce operation using reduce_identity to get a list %% of BKeys. IdentityQuery = [{reduce, {modfun, riak_kv_mapreduce, reduce_identity}, none, true}], - case search(Rhc, Bucket, SearchQuery, IdentityQuery, ?DEFAULT_TIMEOUT) of + case search(Rhc, Bucket, SearchQuery, IdentityQuery, ?DEFAULT_HTTP_TIMEOUT) of {ok, [{_, Results}]} -> %% Unwrap the results. {ok, Results}; @@ -631,42 +676,49 @@ search(Rhc, Bucket, SearchQuery) -> %% query. See {@link rhc_mapred:encode_mapred/2} for details of %% the allowed formats for `MRQuery'. This command will return an error %% unless executed against a Riak Search cluster. -%% @spec search(rhc(), bucket(), string(), + %% [rhc_mapred:query_part()], integer()) -> %% {ok, [rhc_mapred:phase_result()]}|{error, term()} +-spec search(rhc(), bucket(), string(),[rhc_mapred:query_part()], integer()) -> + {ok, [rhc_mapred:phase_result()]} | {error,term()}. search(Rhc, Bucket, SearchQuery, MRQuery, Timeout) -> Inputs = {modfun, riak_search, mapred_search, [Bucket, SearchQuery]}, mapred(Rhc, Inputs, MRQuery, Timeout). -%% @equiv mapred_bucket(Rhc, Bucket, Query, DEFAULT_TIMEOUT) +%% @equiv mapred_bucket(Rhc, Bucket, Query, DEFAULT_HTTP_TIMEOUT) +-spec mapred_bucket(rhc(), bucket(), term()) -> {ok, [rhc_mapred:phase_result()]} | {error,term()}. mapred_bucket(Rhc, Bucket, Query) -> - mapred_bucket(Rhc, Bucket, Query, ?DEFAULT_TIMEOUT). + mapred_bucket(Rhc, Bucket, Query, ?DEFAULT_HTTP_TIMEOUT). %% @doc Execute a map/reduce query over all keys in the given bucket. -%% @spec mapred_bucket(rhc(), bucket(), [rhc_mapred:query_phase()], + %% integer()) %% -> {ok, [rhc_mapred:phase_result()]}|{error, term()} +-spec mapred_bucket(rhc(), bucket(), [rhc_mapred:query_phase()], integer()) -> {ok, [rhc_mapred:phase_result()]}|{error, term()}. mapred_bucket(Rhc, Bucket, Query, Timeout) -> {ok, ReqId} = mapred_bucket_stream(Rhc, Bucket, Query, self(), Timeout), rhc_mapred:wait_for_mapred(ReqId, Timeout). %% @doc Stream map/reduce results over all keys in a bucket to a Pid. %% Similar to {@link mapred_stream/5} -%% @spec mapred_bucket_stream(rhc(), bucket(), + %% [rhc_mapred:query_phase()], pid(), integer()) %% -> {ok, reference()}|{error, term()} +-spec mapred_bucket_stream(rhc(), bucket(), + [rhc_mapred:query_phase()], pid(), integer()) + -> {ok, reference()}|{error, term()}. mapred_bucket_stream(Rhc, Bucket, Query, ClientPid, Timeout) -> mapred_stream(Rhc, Bucket, Query, ClientPid, Timeout). %% @doc Fetches the representation of a convergent datatype from Riak. --spec fetch_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary()) -> +-spec fetch_type(rhc(), {binary(), bucket()}, key()) -> {ok, riakc_datatype:datatype()} | {error, term()}. 
fetch_type(Rhc, BucketAndType, Key) -> fetch_type(Rhc, BucketAndType, Key, []). %% @doc Fetches the representation of a convergent datatype from Riak, %% using the given request options. --spec fetch_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary(), [proplists:property()]) -> +-spec fetch_type(rhc(), {binary(), bucket()}, key(), [proplists:property()]) -> {ok, riakc_datatype:datatype()} | {error, term()}. fetch_type(Rhc, BucketAndType, Key, Options) -> Query = fetch_type_q_params(Rhc, Options), @@ -680,17 +732,18 @@ fetch_type(Rhc, BucketAndType, Key, Options) -> %% @doc Updates the convergent datatype in Riak with local %% modifications stored in the container type. --spec update_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary(), +-spec update_type(rhc(), {binary(), bucket()}, key(), Update::riakc_datatype:update(term())) -> - ok | {ok, Key::binary()} | {ok, riakc_datatype:datatype()} | - {ok, Key::binary(), riakc_datatype:datatype()} | {error, term()}. + ok | {ok, key()} | {ok, riakc_datatype:datatype()} | + {ok, key(), riakc_datatype:datatype()} | {error, term()}. + update_type(Rhc, BucketAndType, Key, Update) -> update_type(Rhc, BucketAndType, Key, Update, []). --spec update_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary(), +-spec update_type(rhc(), {binary(), bucket()}, key(), Update::riakc_datatype:update(term()), [proplists:property()]) -> - ok | {ok, Key::binary()} | {ok, riakc_datatype:datatype()} | - {ok, Key::binary(), riakc_datatype:datatype()} | {error, term()}. + ok | {ok, key()} | {ok, riakc_datatype:datatype()} | + {ok, key(), riakc_datatype:datatype()} | {error, term()}. update_type(_Rhc, _BucketAndType, _Key, undefined, _Options) -> {error, unmodified}; update_type(Rhc, BucketAndType, Key, {Type, Op, Context}, Options) -> @@ -723,8 +776,9 @@ update_type(Rhc, BucketAndType, Key, {Type, Op, Context}, Options) -> %% updates the datatype in Riak. If an existing value is not found, %% but you want the updates to apply anyway, use the 'create' option. -spec modify_type(rhc(), fun((riakc_datatype:datatype()) -> riakc_datatype:datatype()), - {BucketType::binary(), Bucket::binary()}, Key::binary(), [proplists:property()]) -> + {BucketType::binary(), bucket()}, key(), [proplists:property()]) -> ok | {ok, riakc_datatype:datatype()} | {error, term()}. + modify_type(Rhc, Fun, BucketAndType, Key, Options) -> Create = proplists:get_value(create, Options, true), case fetch_type(Rhc, BucketAndType, Key, Options) of @@ -746,7 +800,8 @@ modify_type(Rhc, Fun, BucketAndType, Key, Options) -> %% @doc Get the client ID to use, given the passed options and client. %% Choose the client ID in Options before the one in the client. -%% @spec client_id(rhc(), proplist()) -> client_id() + +-spec client_id(rhc(), list()) -> any(). client_id(#rhc{options=RhcOptions}, Options) -> case proplists:get_value(client_id, Options) of undefined -> @@ -756,7 +811,8 @@ client_id(#rhc{options=RhcOptions}, Options) -> end. %% @doc Generate a random client ID. -%% @spec random_client_id() -> client_id() + +-spec random_client_id() -> string(). random_client_id() -> {{Y,Mo,D},{H,Mi,S}} = erlang:universaltime(), {_,_,NowPart} = now(), @@ -764,7 +820,8 @@ random_client_id() -> base64:encode_to_string(<>). %% @doc Assemble the root URL for the given client -%% @spec root_url(rhc()) -> iolist() + +-spec root_url(rhc()) -> iolist(). 
root_url(#rhc{ip=Ip, port=Port, options=Opts}) -> Proto = case proplists:get_value(is_ssl, Opts) of true -> @@ -775,21 +832,26 @@ root_url(#rhc{ip=Ip, port=Port, options=Opts}) -> [Proto, "://",Ip,":",integer_to_list(Port),"/"]. %% @doc Assemble the URL for the map/reduce resource -%% @spec mapred_url(rhc()) -> iolist() + +-spec mapred_url(rhc()) -> iolist(). mapred_url(Rhc) -> binary_to_list(iolist_to_binary([root_url(Rhc), "mapred/?chunked=true"])). %% @doc Assemble the URL for the ping resource -%% @spec ping_url(rhc()) -> iolist() + +-spec ping_url(rhc()) -> iolist(). ping_url(Rhc) -> binary_to_list(iolist_to_binary([root_url(Rhc), "ping/"])). %% @doc Assemble the URL for the stats resource -%% @spec stats_url(rhc()) -> iolist() + +-spec stats_url(rhc()) -> iolist(). stats_url(Rhc) -> binary_to_list(iolist_to_binary([root_url(Rhc), "stats/"])). %% @doc Assemble the URL for the 2I resource +-spec index_url(rhc(),{binary(), bucket()}, term(), term(), list()) -> + list() | {error, term()} | {incomplete, list(), binary()}. index_url(Rhc, BucketAndType, Index, Query, Params) -> {Type, Bucket} = extract_bucket_type(BucketAndType), QuerySegments = index_query_segments(Query), @@ -802,6 +864,7 @@ index_url(Rhc, BucketAndType, Index, Query, Params) -> [ ["?", mochiweb_util:urlencode(Params)] || Params =/= []]]). +-spec index_query_segments(_) -> [binary() | string()]. index_query_segments(B) when is_binary(B) -> [ B ]; index_query_segments(I) when is_integer(I) -> @@ -814,6 +877,9 @@ index_query_segments({I1, I2}) when is_integer(I1), [ integer_to_list(I1), integer_to_list(I2) ]; index_query_segments(_) -> []. +-spec index_name({binary_index, term()} | + {integer_index, term()} | + term()) -> term(). index_name({binary_index, B}) -> [B, "_bin"]; index_name({integer_index, I}) -> @@ -821,9 +887,9 @@ index_name({integer_index, I}) -> index_name(Idx) -> Idx. - %% @doc Assemble the URL for the given bucket and key -%% @spec make_url(rhc(), bucket(), key(), proplist()) -> iolist() + +-spec make_url(rhc(), {binary(),bucket() | undefined}, key() | undefined, list()) -> iolist(). make_url(Rhc=#rhc{}, BucketAndType, Key, Query) -> {Type, Bucket} = extract_bucket_type(BucketAndType), {IsKeys, IsProps, IsBuckets} = detect_bucket_flags(Query), @@ -848,6 +914,7 @@ make_counter_url(Rhc, Bucket, Key, Query) -> <<"buckets">>, "/", Bucket, "/", <<"counters">>, "/", Key, "?", [ [mochiweb_util:urlencode(Query)] || Query =/= []]])). +-spec make_datatype_url(rhc(),_, key(),list()) -> string() | {error, list(), iolist()} | {incomplete, list(), binary()}. make_datatype_url(Rhc, BucketAndType, Key, Query) -> case extract_bucket_type(BucketAndType) of {undefined, _B} -> @@ -862,6 +929,7 @@ make_datatype_url(Rhc, BucketAndType, Key, Query) -> end. %% @doc send an ibrowse request +-spec request(atom(), term(), list(), list(), term(), rhc()) -> any(). request(Method, Url, Expect, Headers, Body, Rhc) -> AuthHeader = get_auth_header(Rhc#rhc.options), SSLOptions = get_ssl_options(Rhc#rhc.options), @@ -878,6 +946,7 @@ request(Method, Url, Expect, Headers, Body, Rhc) -> end. %% @doc stream an ibrowse request +-spec request_stream(pid(), atom(), string(), list(), term(), rhc()) -> any(). request_stream(Pid, Method, Url, Headers, Body, Rhc) -> AuthHeader = get_auth_header(Rhc#rhc.options), SSLOptions = get_ssl_options(Rhc#rhc.options), @@ -891,17 +960,20 @@ request_stream(Pid, Method, Url, Headers, Body, Rhc) -> end. 
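As the commit message says, several of these internal specs are left deliberately loose (bare `_' and any()) so that dialyzer can complete a run at all; they can be narrowed later without touching the code. For example, options/1 just below always returns the proplist handed to create/4, so a tighter spec along these lines would be a plausible follow-up (illustrative, not part of the patch):

    -spec options(rhc()) -> [proplists:property()].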
%% @doc Get the default options for the given client -%% @spec options(rhc()) -> proplist() + +-spec options(rhc()) -> any(). options(#rhc{options=Options}) -> Options. %% @doc Extract the list of query parameters to use for a GET -%% @spec get_q_params(rhc(), proplist()) -> proplist() + +-spec get_q_params(rhc(),list()) -> list(). get_q_params(Rhc, Options) -> options_list([r,pr,timeout], Options ++ options(Rhc)). %% @doc Extract the list of query parameters to use for a PUT -%% @spec put_q_params(rhc(), proplist()) -> proplist() + +-spec put_q_params(rhc(),list()) -> list(). put_q_params(Rhc, Options) -> options_list([r,w,dw,pr,pw,timeout,asis,{return_body,"returnbody"}], Options ++ options(Rhc)). @@ -913,24 +985,29 @@ counter_q_params(Rhc, Options) -> options_list([r, pr, w, pw, dw, returnvalue, basic_quorum, notfound_ok], Options ++ options(Rhc)). %% @doc Extract the list of query parameters to use for a DELETE -%% @spec delete_q_params(rhc(), proplist()) -> proplist() + +-spec delete_q_params(rhc(),list()) -> list(). delete_q_params(Rhc, Options) -> options_list([r,w,dw,pr,pw,rw,timeout], Options ++ options(Rhc)). +-spec fetch_type_q_params(rhc(),list()) -> list(). fetch_type_q_params(Rhc, Options) -> options_list([r,pr,basic_quorum,notfound_ok,timeout,include_context], Options ++ options(Rhc)). +-spec update_type_q_params(rhc(),list()) -> list(). update_type_q_params(Rhc, Options) -> options_list([r,w,dw,pr,pw,basic_quorum,notfound_ok,timeout,include_context,{return_body, "returnbody"}], Options ++ options(Rhc)). %% @doc Extract the options for the given `Keys' from the possible %% list of `Options'. -%% @spec options_list([Key::atom()|{Key::atom(),Alias::string()}], + %% proplist()) -> proplist() +-spec options_list(list(),_) -> list(). options_list(Keys, Options) -> options_list(Keys, Options, []). +-spec options_list(list(), [proplist:property()], list()) -> list(). options_list([K|Rest], Options, Acc) -> {Key,Alias} = case K of {_, _} -> K; @@ -946,12 +1023,15 @@ options_list([], _, Acc) -> %% @doc Convert a stats-resource response to an erlang-term server %% information proplist. +-spec erlify_server_info(list()) -> [{node,_} | {server_version,_}]. erlify_server_info(Props) -> lists:flatten([ erlify_server_info(K, V) || {K, V} <- Props ]). +-spec erlify_server_info(_,_) -> [] | {node, term()} | {server_version, term()}. erlify_server_info(<<"nodename">>, Name) -> {node, Name}; erlify_server_info(<<"riak_kv_version">>, Vsn) -> {server_version, Vsn}; erlify_server_info(_Ignore, _) -> []. +-spec get_auth_header(list()) -> [{string(), string()}]. get_auth_header(Options) -> case lists:keyfind(credentials, 1, Options) of {credentials, User, Password} -> @@ -962,6 +1042,7 @@ get_auth_header(Options) -> [] end. +-spec get_ssl_options([proplists:property()]) -> [{is_ssl,true} | {ssl_options,list()}]. get_ssl_options(Options) -> case proplists:get_value(is_ssl, Options) of true -> @@ -975,6 +1056,8 @@ get_ssl_options(Options) -> [] end. +-spec extract_bucket_type({binary(), bucket()} | bucket()) -> + {undefined | binary() , bucket()}. extract_bucket_type({<<"default">>, B}) -> {undefined, B}; extract_bucket_type({T,B}) -> @@ -982,6 +1065,7 @@ extract_bucket_type({T,B}) -> extract_bucket_type(B) -> {undefined, B}. +-spec detect_bucket_flags(list()) -> {false,boolean(),boolean()} | {true,boolean(),boolean()}. 
detect_bucket_flags(Query) -> {proplists:get_value(?Q_KEYS, Query, ?Q_FALSE) =/= ?Q_FALSE, proplists:get_value(?Q_PROPS, Query, ?Q_FALSE) =/= ?Q_FALSE, diff --git a/src/rhc_bucket.erl b/src/rhc_bucket.erl index 1b074e6..98ef0ec 100644 --- a/src/rhc_bucket.erl +++ b/src/rhc_bucket.erl @@ -29,8 +29,10 @@ -include("raw_http.hrl"). +-spec erlify_props([any()]) -> [{atom(),_}]. erlify_props(Props) -> lists:flatten([ erlify_prop(K, V) || {K, V} <- Props ]). +-spec erlify_prop(_,_) -> [] | {atom(),_}. erlify_prop(?JSON_ALLOW_MULT, AM) -> {allow_mult, AM}; erlify_prop(?JSON_BACKEND, B) -> {backend, B}; erlify_prop(?JSON_BASIC_Q, B) -> {basic_quorum, B}; @@ -55,12 +57,14 @@ erlify_prop(?JSON_W, W) -> {w, erlify_quorum(W)}; erlify_prop(?JSON_YOUNG_VC, I) -> {young_vclock, I}; erlify_prop(_Ignore, _) -> []. +-spec erlify_quorum(_) -> 'all' | 'one' | 'quorum' | 'undefined' | integer(). erlify_quorum(?JSON_ALL) -> all; erlify_quorum(?JSON_QUORUM) -> quorum; erlify_quorum(?JSON_ONE) -> one; erlify_quorum(I) when is_integer(I) -> I; erlify_quorum(_) -> undefined. +-spec erlify_repl(_) -> 'false' | 'fullsync' | 'realtime' | 'true' | 'undefined'. erlify_repl(?JSON_REALTIME) -> realtime; erlify_repl(?JSON_FULLSYNC) -> fullsync; erlify_repl(?JSON_BOTH) -> true; %% both is equivalent to true, but only works in 1.2+ @@ -68,6 +72,7 @@ erlify_repl(true) -> true; erlify_repl(false) -> false; erlify_repl(_) -> undefined. +-spec erlify_chash({'struct',[{<<_:24>>,binary()},...]}) -> {atom(),atom()}. erlify_chash({struct, [{?JSON_MOD, Mod}, {?JSON_FUN, Fun}]}=Struct) -> try {binary_to_existing_atom(Mod, utf8), binary_to_existing_atom(Fun, utf8)} @@ -77,12 +82,15 @@ erlify_chash({struct, [{?JSON_MOD, Mod}, {?JSON_FUN, Fun}]}=Struct) -> {binary_to_atom(Mod, utf8), binary_to_atom(Fun, utf8)} end. +-spec erlify_linkfun({'struct',[{<<_:24>>,binary()},...]}) -> {'modfun',atom(),atom()}. erlify_linkfun(Struct) -> {Mod, Fun} = erlify_chash(Struct), {modfun, Mod, Fun}. +-spec httpify_props(list()) -> [{<<_:8,_:_*8>>,_}]. httpify_props(Props) -> lists:flatten([ httpify_prop(K, V) || {K, V} <- Props ]). +-spec httpify_prop(_,_) -> [] | {<<_:8,_:_*8>>,_}. httpify_prop(allow_mult, AM) -> {?JSON_ALLOW_MULT, AM}; httpify_prop(backend, B) -> {?JSON_BACKEND, B}; httpify_prop(basic_quorum, B) -> {?JSON_BASIC_Q, B}; @@ -107,6 +115,7 @@ httpify_prop(w, Q) -> {?JSON_W, Q}; httpify_prop(young_vclock, VC) -> {?JSON_YOUNG_VC, VC}; httpify_prop(_Ignore, _) -> []. +-spec httpify_modfun({atom(),atom()} | {'modfun',atom(),atom()}) -> {'struct',[{_,_},...]}. httpify_modfun({modfun, M, F}) -> httpify_modfun({M, F}); httpify_modfun({M, F}) -> diff --git a/src/rhc_dt.erl b/src/rhc_dt.erl index b72bf74..b5b145d 100644 --- a/src/rhc_dt.erl +++ b/src/rhc_dt.erl @@ -31,6 +31,7 @@ -define(FIELD_PATTERN, "^(.*)_(counter|set|register|flag|map)$"). +-spec datatype_from_json({'struct',[any()]}) -> any(). datatype_from_json({struct, Props}) -> Value = proplists:get_value(<<"value">>, Props), Type = binary_to_existing_atom(proplists:get_value(<<"type">>, Props), utf8), @@ -38,6 +39,7 @@ datatype_from_json({struct, Props}) -> Mod = riakc_datatype:module(Type), Mod:new(decode_value(Type, Value), Context). +-spec decode_value('counter' | 'flag' | 'map' | 'register' | 'set',_) -> any(). decode_value(counter, Value) -> Value; decode_value(set, Value) -> Value; decode_value(flag, Value) -> Value; @@ -48,14 +50,17 @@ decode_value(map, {struct, Fields}) -> {{Name,Type}, decode_value(Type, Value)} end || {Field, Value} <- Fields ]. 
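The map decoding above relies on the `Name_type' field-naming convention captured by ?FIELD_PATTERN: field_to_json/1 appends the datatype suffix and field_from_json/1 parses it back off. A quick round-trip sketch, with an example field name:

    %% field_to_json({<<"followers">>, counter})  yields <<"followers_counter">>
    %% field_from_json(<<"followers_counter">>)   yields {<<"followers">>, counter}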
+-spec field_from_json(binary()) -> {binary() | [binary() | string() | char() | {integer(),integer()} | {'error',[any()],binary()} | {'incomplete',[any()],binary()}] | {integer(),integer()} | {'error',string(),binary()} | {'incomplete',string(),binary()},atom()}. field_from_json(Bin) when is_binary(Bin) -> {match, [Name, BinType]} = re:run(Bin, ?FIELD_PATTERN, [anchored, {capture, all_but_first, binary}]), {Name, binary_to_existing_atom(BinType, utf8)}. +-spec field_to_json({binary(),atom()}) -> <<_:8,_:_*8>>. field_to_json({Name, Type}) when is_binary(Name), is_atom(Type) -> BinType = atom_to_binary(Type, utf8), <>. +-spec decode_error(_,{'ok',_,_,_}) -> any(). decode_error(fetch, {ok, "404", Headers, Body}) -> case proplists:get_value("Content-Type", Headers) of "application/json" -> @@ -75,6 +80,7 @@ decode_error(_, {ok, "403", _, Body}) -> decode_error(_, {ok, _, _, Body}) -> Body. +-spec encode_update_request('counter' | 'flag' | 'map' | 'register' | 'set', term(), term()) -> binary() | {'struct',list()}. encode_update_request(register, {assign, Bin}, _Context) -> {struct, [{<<"assign">>, Bin}]}; encode_update_request(flag, Atom, _Context) -> @@ -89,6 +95,7 @@ encode_update_request(map, {update, Ops}, Context) -> {struct, orddict:to_list(lists:foldl(fun encode_map_op/2, orddict:new(), Ops)) ++ include_context(Context)}. +-spec encode_map_op({'add',{binary(),atom()}} | {'remove',{binary(),atom()}} | {'update',{binary(),'counter' | 'flag' | 'map' | 'register' | 'set'},_},[{_,_}]) -> [{_,_},...]. encode_map_op({add, Entry}, Ops) -> orddict:append(add, field_to_json(Entry), Ops); encode_map_op({remove, Entry}, Ops) -> @@ -103,6 +110,7 @@ encode_map_op({update, {_Key,Type}=Field, Op}, Ops) -> orddict:store(update, {struct, [Update]}, Ops) end. +-spec include_context(undefined | binary() | term()) -> [{<<_:56>>,_}]. include_context(undefined) -> []; include_context(<<>>) -> []; include_context(Bin) -> [{<<"context">>, Bin}]. diff --git a/src/rhc_index.erl b/src/rhc_index.erl index 06db9ca..e4497b4 100644 --- a/src/rhc_index.erl +++ b/src/rhc_index.erl @@ -38,6 +38,7 @@ query_options(Options) -> lists:flatmap(fun query_option/1, Options). +-spec query_option(_) -> [{[1..255,...], list()}]. query_option({timeout, N}) when is_integer(N) -> [{?Q_TIMEOUT, integer_to_list(N)}]; query_option({stream, B}) when is_boolean(B) -> @@ -54,10 +55,11 @@ query_option(_) -> []. %% @doc Collects 2i query results on behalf of the caller. --spec wait_for_index(reference()) -> {ok, ?INDEX_RESULTS{}} | {error, term()}. +-spec wait_for_index(reference()) -> {ok, index_results()} | {error, term()}. wait_for_index(ReqId) -> wait_for_index(ReqId, []). +-spec wait_for_index(_,[index_stream_result()]) -> {'error',_} | {'ok', index_results()}. wait_for_index(ReqId, Acc) -> receive {ReqId, {done, Continuation}} -> @@ -70,12 +72,14 @@ wait_for_index(ReqId, Acc) -> wait_for_index(ReqId, [Res|Acc]) end. +-spec collect_results([index_stream_result()],'undefined' | binary()) -> index_results(). collect_results(Acc, Continuation) -> lists:foldl(fun merge_index_results/2, ?INDEX_RESULTS{keys=[], terms=[], continuation=Continuation}, Acc). +-spec merge_index_results(index_stream_result(),index_results()) -> index_results(). merge_index_results(?INDEX_STREAM_RESULT{keys=KL}, ?INDEX_RESULTS{keys=K0}=Acc) when is_list(KL) -> Acc?INDEX_RESULTS{keys=KL++K0}; @@ -92,6 +96,7 @@ index_acceptor(Pid, PidRef) -> index_acceptor(Pid, PidRef, IbrowseRef) end. 
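These acceptors serve the rhc:get_index and rhc:stream_index calls in rhc.erl, which take either an exact-match value or an integer range as the query term. A usage sketch (client, bucket, index names and values are examples only):

    {ok, _Exact} = rhc:get_index(Client, <<"users">>, {binary_index, "group"}, <<"admins">>),
    {ok, _Range} = rhc:get_index(Client, <<"users">>, {integer_index, "age"}, {20, 30}).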
+-spec index_acceptor(atom() | pid() | port() | {atom(),atom()},_,_) -> {_,'done' | {'error','timeout' | {_,_}}}. index_acceptor(Pid, PidRef, IBRef) -> receive {ibrowse_async_headers, IBRef, Status, Headers} -> @@ -113,6 +118,7 @@ index_acceptor(Pid, PidRef, IBRef) -> %% @doc Receives multipart chunks from webmachine_multipart and parses %% them into results that can be sent to Pid. %% @private +-spec stream_parts_acceptor(atom() | pid() | port() | {atom(),atom()}, term(), 'done_parts' | term()) -> {term(),'done'}. stream_parts_acceptor(Pid, PidRef, done_parts) -> Pid ! {PidRef, done}; stream_parts_acceptor(Pid, PidRef, {{_Name, _Param, Part},Next}) -> @@ -127,6 +133,7 @@ stream_parts_acceptor(Pid, PidRef, {{_Name, _Param, Part},Next}) -> %% @doc Sends keys or terms to the Pid if they are present in the %% result, otherwise sends nothing. %% @private +-spec maybe_send_results(term(), term(), term(), term()) -> 'ok' | {_,index_stream_result()}. maybe_send_results(_Pid, _PidRef, undefined, undefined) -> ok; maybe_send_results(Pid, PidRef, Keys, Results) -> Pid ! {PidRef, ?INDEX_STREAM_RESULT{keys=Keys, @@ -135,6 +142,7 @@ maybe_send_results(Pid, PidRef, Keys, Results) -> %% @doc Sends the continuation to Pid if it is present in the result, %% otherwise sends nothing. %% @private +-spec maybe_send_continuation(_,_,_) -> 'ok' | {_,{'done',_}}. maybe_send_continuation(_Pid, _PidRef, undefined) -> ok; maybe_send_continuation(Pid, PidRef, Continuation) -> Pid ! {PidRef, {done, Continuation}}. @@ -142,6 +150,7 @@ maybe_send_continuation(Pid, PidRef, Continuation) -> %% @doc "next" fun for the webmachine_multipart streamer - waits for %% an ibrowse message, and then returns it to the streamer for processing %% @private +-spec stream_parts_helper(_,_,_,boolean()) -> fun(() -> {_,'done' | fun(() -> any())}). stream_parts_helper(Pid, PidRef, IbrowseRef, First) -> fun() -> receive diff --git a/src/rhc_listkeys.erl b/src/rhc_listkeys.erl index f64aeef..7f5418a 100644 --- a/src/rhc_listkeys.erl +++ b/src/rhc_listkeys.erl @@ -30,6 +30,7 @@ -include("raw_http.hrl"). -include("rhc.hrl"). +-include_lib("riakc/include/riakc.hrl"). -record(parse_state, {buffer=[], %% unused characters in reverse order brace=0, %% depth of braces in current partial @@ -40,10 +41,13 @@ %% @doc Collect all keylist results, and provide them as one list %% instead of streaming to a Pid. %% @spec wait_for_list(term(), integer()) -> -%% {ok, [key()]}|{error, term()} +%% {ok, [key()]}|{error, term()} +-spec wait_for_list(term(), integer()) -> {ok, [key()]}|{error, term()}. + wait_for_list(ReqId, Timeout) -> wait_for_list(ReqId,Timeout,[]). %% @private +-spec wait_for_list(_,_,[any()]) -> {'error',_} | {'ok',[any()]}. wait_for_list(ReqId, _Timeout0, Acc) -> receive {ReqId, done} -> @@ -56,6 +60,7 @@ wait_for_list(ReqId, _Timeout0, Acc) -> %% @doc first stage of ibrowse response handling - just waits to be %% told what ibrowse request ID to expect +-spec list_acceptor(atom() | pid() | port() | {atom(),atom()},_,_) -> {_,'done' | {'error',_}}. list_acceptor(Pid, PidRef, Type) -> receive {ibrowse_req_id, PidRef, IbrowseRef} -> @@ -64,6 +69,7 @@ list_acceptor(Pid, PidRef, Type) -> %% @doc main loop for ibrowse response handling - parses response and %% sends messaged to client Pid +-spec list_acceptor(atom() | pid() | port() | {atom(),atom()},_,_,#parse_state{},_) -> {_,'done' | {'error',_}}. 
list_acceptor(Pid,PidRef,IbrowseRef,ParseState,Type) -> receive {ibrowse_async_response_end, IbrowseRef} -> @@ -101,11 +107,13 @@ list_acceptor(Pid,PidRef,IbrowseRef,ParseState,Type) -> end end. +-spec is_empty(#parse_state{}) -> boolean(). is_empty(#parse_state{buffer=[],brace=0,quote=false,escape=false}) -> true; is_empty(#parse_state{}) -> false. +-spec try_parse(binary(),#parse_state{}) -> {[any()],#parse_state{}}. try_parse(Data, #parse_state{buffer=B, brace=D, quote=Q, escape=E}) -> Parse = try_parse(binary_to_list(Data), B, D, Q, E), {KeyLists, NewParseState} = @@ -135,6 +143,7 @@ try_parse(Data, #parse_state{buffer=B, brace=D, quote=Q, escape=E}) -> Parse), {lists:flatten(KeyLists), NewParseState}. +-spec try_parse([byte()],_,_,_,_) -> [[any(),...] | #parse_state{},...]. try_parse([], B, D, Q, E) -> [#parse_state{buffer=B, brace=D, quote=Q, escape=E}]; try_parse([_|Rest],B,D,Q,true) -> diff --git a/src/rhc_mapred.erl b/src/rhc_mapred.erl index df54218..de66e02 100644 --- a/src/rhc_mapred.erl +++ b/src/rhc_mapred.erl @@ -48,10 +48,12 @@ %% {jsanon, {bucket(), key()}}| %% {jsanon, binary()} %% @type linkspec() = binary()|'_' +-spec encode_mapred(binary() | [[any(),...] | {binary() | {_,_},_}] | {binary(),binary()} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_,_},[{'link',_,_,_} | {'map',{_,_} | {_,_,_},_,_} | {'reduce',{_,_} | {_,_,_},_,_}]) -> any(). encode_mapred(Inputs, Query) -> mochijson2:encode( {struct, [{<<"inputs">>, encode_mapred_inputs(Inputs)}, {<<"query">>, encode_mapred_query(Query)}]}). +-spec encode_mapred_inputs(binary() | [[any(),...] | {binary() | {_,_},_}] | {binary(),binary()} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_,_}) -> binary() | [binary() | [any(),...]] | {'struct',[{_,_},...]}. encode_mapred_inputs({BucketType, Bucket}) when is_binary(BucketType), is_binary(Bucket) -> [BucketType, Bucket]; @@ -77,6 +79,7 @@ encode_mapred_inputs({modfun, Module, Function, Options}) -> %% [Bucket, Key] %% or %% [Bucket, Key, KeyData] +-spec normalize_mapred_input([any(),...] | {binary() | {binary() | {binary(),binary()},binary()},_}) -> [any(),...]. normalize_mapred_input({Bucket, Key}) when is_binary(Bucket), is_binary(Key) -> [Bucket, Key]; @@ -96,9 +99,11 @@ normalize_mapred_input([Bucket, Key, KeyData]) when is_binary(Bucket), is_binary(Key) -> [Bucket, Key, KeyData]. 
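For context, these encoders accept the same term form that rhc:mapred/3 takes: an inputs term plus a list of {map | reduce | link, ...} phases. A hedged sketch of a bucket-wide job (the riak_kv_mapreduce functions are illustrative examples, not something this patch adds):

    Query = [{map, {modfun, riak_kv_mapreduce, map_object_value}, none, false},
             {reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, none, true}],
    {ok, Results} = rhc:mapred(Client, <<"bucket">>, Query).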
+-spec encode_mapred_query([{'link',_,_,_} | {'map',{_,_} | {_,_,_},_,_} | {'reduce',{_,_} | {_,_,_},_,_}]) -> [{'struct',[any(),...]}]. encode_mapred_query(Query) when is_list(Query) -> [ encode_mapred_phase(P) || P <- Query ]. +-spec encode_mapred_phase({'link',_,_,_} | {'map',{'jsanon',_} | {'jsfun',_} | {'modfun',atom(),atom()},_,_} | {'reduce',{'jsanon',_} | {'jsfun',_} | {'modfun',atom(),atom()},_,_}) -> {'struct',[{<<_:24,_:_*8>>,{_,_}},...]}. encode_mapred_phase({MR, Fundef, Arg, Keep}) when MR =:= map; MR =:= reduce -> Type = if MR =:= map -> <<"map">>; @@ -151,11 +156,13 @@ encode_mapred_phase({link, Bucket, Tag, Keep}) -> %% @spec wait_for_mapred(term(), integer()) -> %% {ok, [phase_result()]}|{error, term()} %% @type phase_result() = {integer(), [term()]} +-spec wait_for_mapred(reference(),'infinity' | non_neg_integer()) -> {'error',_} | {'ok',[{_,_}]}. wait_for_mapred(ReqId, Timeout) -> wait_for_mapred_first(ReqId, Timeout). %% Wait for the first mapred result, so we know at least one phase %% that will be delivering results. +-spec wait_for_mapred_first(reference(),'infinity' | non_neg_integer()) -> {'error',_} | {'ok',[{_,_}]}. wait_for_mapred_first(ReqId, Timeout) -> case receive_mapred(ReqId, Timeout) of done -> @@ -173,6 +180,7 @@ wait_for_mapred_first(ReqId, Timeout) -> %% of accumulating a single phases's outputs will be more efficient %% than the repeated orddict:append_list/3 used when accumulating %% outputs from multiple phases. +-spec wait_for_mapred_one(reference(),'infinity' | non_neg_integer(),integer(),_) -> {'error',_} | {'ok',[{_,_}]}. wait_for_mapred_one(ReqId, Timeout, Phase, Acc) -> case receive_mapred(ReqId, Timeout) of done -> @@ -192,15 +200,18 @@ wait_for_mapred_one(ReqId, Timeout, Phase, Acc) -> end. %% Single-phase outputs are kept as a reverse list of results. +-spec acc_mapred_one([any()],_) -> any(). acc_mapred_one([R|Rest], Acc) -> acc_mapred_one(Rest, [R|Acc]); acc_mapred_one([], Acc) -> Acc. +-spec finish_mapred_one(integer(),[any()]) -> [{integer(),[any()]},...]. finish_mapred_one(Phase, Acc) -> [{Phase, lists:reverse(Acc)}]. %% Tracking outputs from multiple phases. +-spec wait_for_mapred_many(reference(),'infinity' | non_neg_integer(),[tuple(),...]) -> {'error',_} | {'ok',[{_,_}]}. wait_for_mapred_many(ReqId, Timeout, Acc) -> case receive_mapred(ReqId, Timeout) of done -> @@ -216,6 +227,7 @@ wait_for_mapred_many(ReqId, Timeout, Acc) -> %% Many-phase outputs are kepts as a proplist of reversed lists of %% results. +-spec acc_mapred_many(integer(),[any()],[tuple(),...]) -> [tuple(),...]. acc_mapred_many(Phase, Res, Acc) -> case lists:keytake(Phase, 1, Acc) of {value, {Phase, PAcc}, RAcc} -> @@ -224,12 +236,14 @@ acc_mapred_many(Phase, Res, Acc) -> [{Phase,acc_mapred_one(Res,[])}|Acc] end. +-spec finish_mapred_many([tuple(),...]) -> [{_,[any()]}]. finish_mapred_many(Acc) -> [ {P, lists:reverse(A)} || {P, A} <- lists:keysort(1, Acc) ]. %% Receive one mapred message. -spec receive_mapred(reference(), timeout()) -> done | {mapred, integer(), [term()]} | {error, term()} | timeout. + receive_mapred(ReqId, Timeout) -> receive {ReqId, Msg} -> %% Msg should be `done', `{mapred, Phase, Results}', or @@ -241,6 +255,7 @@ receive_mapred(ReqId, Timeout) -> %% @doc first stage of ibrowse response handling - just waits to be %% told what ibrowse request ID to expect +-spec mapred_acceptor(atom() | pid() | port() | {atom(),atom()},_,'infinity' | non_neg_integer()) -> {_,'done' | {'error','timeout' | {_,_}}}. 
mapred_acceptor(Pid, PidRef, Timeout) -> receive {ibrowse_req_id, PidRef, IbrowseRef} -> @@ -251,6 +266,7 @@ mapred_acceptor(Pid, PidRef, Timeout) -> %% @doc second stage of ibrowse response handling - waits for headers %% and extracts the boundary of the multipart/mixed message +-spec mapred_acceptor(atom() | pid() | port() | {atom(),atom()},_,'infinity' | non_neg_integer(),_) -> {_,'done' | {'error','timeout' | {_,_}}}. mapred_acceptor(Pid,PidRef,Timeout,IbrowseRef) -> receive {ibrowse_async_headers, IbrowseRef, Status, Headers} -> @@ -275,6 +291,7 @@ mapred_acceptor(Pid,PidRef,Timeout,IbrowseRef) -> %% @doc driver of the webmachine_multipart streamer - handles results %% of the parsing process (sends them to the client) and polls for %% the next part +-spec stream_parts_acceptor(atom() | pid() | port() | {atom(),atom()},_,'done_parts' | {{_,_,binary() | maybe_improper_list(binary() | maybe_improper_list(any(),binary() | []) | byte(),binary() | [])},fun(() -> any())}) -> {_,'done'}. stream_parts_acceptor(Pid,PidRef,done_parts) -> Pid ! {PidRef, done}; stream_parts_acceptor(Pid,PidRef,{{_Name, _Param, Part},Next}) -> @@ -286,6 +303,7 @@ stream_parts_acceptor(Pid,PidRef,{{_Name, _Param, Part},Next}) -> %% @doc "next" fun for the webmachine_multipart streamer - waits for %% an ibrowse message, and then returns it to the streamer for processing +-spec stream_parts_helper(_,_,_,_,boolean()) -> fun(() -> {_,'done' | fun(() -> any())}). stream_parts_helper(Pid, PidRef, Timeout, IbrowseRef, First) -> fun() -> receive diff --git a/src/rhc_obj.erl b/src/rhc_obj.erl index 977680c..1335164 100644 --- a/src/rhc_obj.erl +++ b/src/rhc_obj.erl @@ -30,9 +30,10 @@ -include("raw_http.hrl"). -include("rhc.hrl"). - +-include_lib("riakc/include/riakc.hrl"). %% HTTP -> riakc_obj +-spec make_riakc_obj(binary() | {binary(),binary()},'undefined' | binary(),[any()],binary()) -> riakc_obj(). make_riakc_obj(Bucket, Key, Headers, Body) -> Vclock = base64:decode(proplists:get_value(?HEAD_VCLOCK, Headers, "")), case ctype_from_headers(Headers) of @@ -47,10 +48,12 @@ make_riakc_obj(Bucket, Key, Headers, Body) -> [{headers_to_metadata(Headers), Body}]) end. +-spec ctype_from_headers([any()]) -> {[byte()],_}. ctype_from_headers(Headers) -> mochiweb_util:parse_header( proplists:get_value(?HEAD_CTYPE, Headers)). +-spec vtag_from_headers([any()]) -> any(). vtag_from_headers(Headers) -> %% non-sibling uses ETag, sibling uses Etag %% (note different capitalization on 't') @@ -60,6 +63,7 @@ vtag_from_headers(Headers) -> end. +-spec lastmod_from_headers([any()]) -> 'undefined' | {integer(),integer(),0}. lastmod_from_headers(Headers) -> case proplists:get_value("Last-Modified", Headers) of undefined -> @@ -73,6 +77,7 @@ lastmod_from_headers(Headers) -> 0} % Microseconds end. +-spec decode_siblings(maybe_improper_list(),binary()) -> [{dict(),binary()}]. decode_siblings(Boundary, <<"\r\n",SibBody/binary>>) -> decode_siblings(Boundary, SibBody); decode_siblings(Boundary, SibBody) -> @@ -83,6 +88,7 @@ decode_siblings(Boundary, SibBody) -> element(1, split_binary(Body, size(Body)-2))} %% remove trailing \r\n || {_, {_, Headers}, Body} <- Parts ]. +-spec headers_to_metadata([any()]) -> dict(). headers_to_metadata(Headers) -> UserMeta = extract_user_metadata(Headers), @@ -108,13 +114,16 @@ headers_to_metadata(Headers) -> Entries -> dict:store(?MD_INDEX, Entries, LinkMeta) end. +-spec extract_user_metadata([any()]) -> any(). extract_user_metadata(Headers) -> lists:foldl(fun extract_user_metadata/2, dict:new(), Headers). 
+-spec extract_user_metadata(_,_) -> any(). extract_user_metadata({?HEAD_USERMETA_PREFIX++K, V}, Dict) -> riakc_obj:set_user_metadata_entry(Dict, {K, V}); extract_user_metadata(_, D) -> D. +-spec extract_links([any()]) -> any(). extract_links(Headers) -> {ok, Re} = re:compile("</[^/]+/([^/]+)/([^/]+)>; *riaktag=\"(.*)\""), Extractor = fun(L, Acc) -> @@ -128,9 +137,11 @@ extract_links(Headers) -> LinkHeader = proplists:get_value(?HEAD_LINK, Headers, []), lists:foldl(Extractor, [], string:tokens(LinkHeader, ",")). +-spec extract_indexes([any()]) -> [{binary(),binary() | integer()}]. extract_indexes(Headers) -> [ {list_to_binary(K), decode_index_value(K,V)} || {?HEAD_INDEX_PREFIX++K, V} <- Headers]. +-spec decode_index_value([byte()],maybe_improper_list(binary() | maybe_improper_list(any(),binary() | []) | char(),binary() | [])) -> binary() | integer(). decode_index_value(K, V) -> case lists:last(string:tokens(K, "_")) of "bin" -> @@ -141,9 +152,11 @@ decode_index_value(K, V) -> %% riakc_obj -> HTTP +-spec serialize_riakc_obj(_, riakc_obj()) -> {[{list(), list()}], binary()}. serialize_riakc_obj(Rhc, Object) -> {make_headers(Rhc, Object), make_body(Object)}. +-spec make_headers(rhc:rhc(), riakc_obj()) -> [{list(),list()}]. make_headers(Rhc, Object) -> MD = riakc_obj:get_update_metadata(Object), CType = case dict:find(?MD_CTYPE, MD) of @@ -164,6 +177,7 @@ make_headers(Rhc, Object) -> encode_indexes(MD) | encode_user_metadata(MD) ]). +-spec encode_links(rhc:rhc(), list()) -> list(). encode_links(_, []) -> []; encode_links(#rhc{prefix=Prefix}, Links) -> {{FirstBucket, FirstKey}, FirstTag} = hd(Links), @@ -174,10 +188,12 @@ encode_links(#rhc{prefix=Prefix}, Links) -> format_link(Prefix, FirstBucket, FirstKey, FirstTag), tl(Links)). +-spec encode_user_metadata(dict()) -> []. encode_user_metadata(_Metadata) -> %% TODO []. +-spec encode_indexes(dict()) -> [{nonempty_maybe_improper_list(any(),[] | {_,_,_}),maybe_improper_list()}]. encode_indexes(MD) -> case dict:find(?MD_INDEX, MD) of {ok, Entries} -> @@ -186,6 +202,7 @@ encode_indexes(MD) -> [] end. +-spec encode_index({term(), binary() | list() | integer()}) -> {list(), list()}. encode_index({Name, IntValue}) when is_integer(IntValue) -> encode_index({Name, integer_to_list(IntValue)}); encode_index({Name, BinValue}) when is_binary(BinValue) -> @@ -194,10 +211,12 @@ encode_index({Name, String}) when is_list(String) -> {?HEAD_INDEX_PREFIX ++ unicode:characters_to_list(Name, latin1), String}. +-spec format_link(_, _, riakc:key(), _) -> string(). format_link(Prefix, Bucket, Key, Tag) -> io_lib:format("</~s/~s/~s>; riaktag=\"~s\"", [Prefix, Bucket, Key, Tag]). +-spec make_body(riakc_obj()) -> binary(). make_body(Object) -> case riakc_obj:get_update_value(Object) of Val when is_binary(Val) -> Val; @@ -210,6 +229,7 @@ make_body(Object) -> term_to_binary(Val) end. +-spec is_iolist(term()) -> boolean(). is_iolist(Binary) when is_binary(Binary) -> true; is_iolist(List) when is_list(List) -> lists:all(fun is_iolist/1, List); From ce76c874b23248faeda82202ddf4b74d180ae928 Mon Sep 17 00:00:00 2001 From: Drew Date: Fri, 6 Dec 2013 18:22:10 -0500 Subject: [PATCH 3/4] Clean up some leftover lines from where I replaced the old edoc @spec entries by regex search and replace. --- src/rhc.erl | 74 +----------------------------------------------------- 1 file changed, 1 insertion(+), 73 deletions(-) diff --git a/src/rhc.erl b/src/rhc.erl index 8aac128..021f6f3 100644 --- a/src/rhc.erl +++ b/src/rhc.erl @@ -75,7 +75,6 @@ %% @doc Create a client for connecting to the default port on localhost.
%% @equiv create("127.0.0.1", 8098, "riak", []) -spec create() -> rhc(). - create() -> create("127.0.0.1", 8098, "riak", []). @@ -87,7 +86,6 @@ create() -> %% Defaults for r, w, dw, rw, and return_body may be passed in %% the Options list. The client id can also be specified by %% adding `{client_id, ID}' to the Options list. - -spec create(string(), integer(), string(), list()) -> rhc(). create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port), is_list(Prefix), is_list(Opts0) -> @@ -102,27 +100,21 @@ create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port), #rhc{ip=IP, port=Port, prefix=Prefix, options=Opts}. %% @doc Get the IP this client will connect to. - -spec ip(rhc()) -> string(). ip(#rhc{ip=IP}) -> IP. %% @doc Get the Port this client will connect to. - -spec port(rhc()) -> integer(). - port(#rhc{port=Port}) -> Port. %% @doc Get the prefix this client will use for object URLs - -spec prefix(rhc()) -> string(). prefix(#rhc{prefix=Prefix}) -> Prefix. %% @doc Ping the server by requesting the "/ping" resource. - -spec ping(rhc()) -> ok | {error, term()}. - ping(Rhc) -> Url = ping_url(Rhc), case request(get, Url, ["200","204"], [], [], Rhc) of @@ -135,15 +127,12 @@ ping(Rhc) -> %% @doc Get the client ID that this client will use when storing objects. -spec get_client_id(rhc()) -> {ok, string()}. - get_client_id(Rhc) -> {ok, client_id(Rhc, [])}. %% @doc Get some basic information about the server. The proplist returned %% should include `node' and `server_version' entries. - -spec get_server_info(rhc()) -> {ok, list()}|{error, term()}. - get_server_info(Rhc) -> Url = stats_url(Rhc), case request(get, Url, ["200"], [], [], Rhc) of @@ -155,9 +144,7 @@ get_server_info(Rhc) -> end. %% @doc Get the list of full stats from a /stats call to the server. - -spec get_server_stats(rhc()) -> {ok, list()}|{error, term()}. - get_server_stats(Rhc) -> Url = stats_url(Rhc), case request(get, Url, ["200"], [], [], Rhc) of @@ -169,8 +156,7 @@ get_server_stats(Rhc) -> {error, Error} end. -%% @equiv get(Rhc, Bucket, Key, []) -%% spec get(rhc(), bucket(), key()) -> {ok, riakc_obj()}|{error, term()}. + -spec get(rhc(), term(), term()) -> {ok, term()}|{error, term()}. get(Rhc, Bucket, Key) -> @@ -188,8 +174,6 @@ get(Rhc, Bucket, Key) -> %% %% The term in the second position of the error tuple will be %% `notfound' if the key was not found. - -%% -> {ok, riakc_obj()}|{error, term()} -spec get(rhc(), {binary(), bucket()}, key(), list()) -> {ok, riakc_obj()}|{error, term()}. get(Rhc, Bucket, Key, Options) -> Qs = get_q_params(Rhc, Options), @@ -213,7 +197,6 @@ get(Rhc, Bucket, Key, Options) -> {error, Error} end. -%% @equiv put(Rhc, Object, []) -spec put(rhc(),riakc_obj()) -> ok | {error, term()} | {ok, riakc_obj()}. put(Rhc, Object) -> put(Rhc, Object, []). @@ -234,7 +217,6 @@ put(Rhc, Object) -> %% `{ok, Object}' is returned if return_body is true. %% -%% -> ok|{ok, riakc_obj()}|{error, term()} -spec put(rhc(),riakc_obj(),list()) -> ok | {error, term()} | {ok, riakc_obj()}. put(Rhc, Object, Options) -> Qs = put_q_params(Rhc, Options), @@ -261,10 +243,8 @@ put(Rhc, Object, Options) -> %% @doc Increment the counter stored under `bucket', `key' %% by the given `amount'. -%% @equiv counter_incr(Rhc, Bucket, Key, Amt, []) -spec counter_incr(rhc(), binary(), binary(), integer()) -> ok | {ok, integer()} | {error, term()}. - counter_incr(Rhc, Bucket, Key, Amt) -> counter_incr(Rhc, Bucket, Key, Amt, []). 
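For illustration (hypothetical code, not part of this patch): the convention this cleanup converges on is an edoc @doc comment followed immediately by the dialyzer -spec and the clause head, with no leftover blank line in between. A minimal sketch in that style, reusing counter_incr/4 from above:

%% @doc Increment the counter at Bucket/Key by twice the requested amount.
-spec double_incr(rhc(), binary(), binary(), integer()) ->
          ok | {ok, integer()} | {error, term()}.
double_incr(Rhc, Bucket, Key, Amt) ->
    counter_incr(Rhc, Bucket, Key, 2 * Amt).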
@@ -289,7 +269,6 @@ counter_incr(Rhc, Bucket, Key, Amt) -> %% See the riak docs at http://docs.basho.com/riak/latest/references/apis/http/ for details -spec counter_incr(rhc(), binary(), binary(), integer(), list()) -> ok | {ok, integer()} | {error, term()}. - counter_incr(Rhc, Bucket, Key, Amt, Options) -> Qs = counter_q_params(Rhc, Options), Url = make_counter_url(Rhc, Bucket, Key, Qs), @@ -307,7 +286,6 @@ counter_incr(Rhc, Bucket, Key, Amt, Options) -> %% @doc Get the counter stored at `bucket', `key'. -spec counter_val(rhc(), term(), term()) -> {ok, integer()} | {error, term()}. - counter_val(Rhc, Bucket, Key) -> counter_val(Rhc, Bucket, Key, []). @@ -330,7 +308,6 @@ counter_val(Rhc, Bucket, Key) -> %% %% See the riak docs at http://docs.basho.com/riak/latest/references/apis/http/ fro details -spec counter_val(rhc(), term(), term(), list()) -> {ok, integer()} | {error, term()}. - counter_val(Rhc, Bucket, Key, Options) -> Qs = counter_q_params(Rhc, Options), Url = make_counter_url(Rhc, Bucket, Key, Qs), @@ -341,7 +318,6 @@ counter_val(Rhc, Bucket, Key, Options) -> {error, Error} end. -%% @equiv delete(Rhc, Bucket, Key, []) -spec delete(rhc(),{binary(), bucket()}, binary()) -> ok | {error, term()}. delete(Rhc, Bucket, Key) -> delete(Rhc, Bucket, Key, []). @@ -355,7 +331,6 @@ delete(Rhc, Bucket, Key) -> %%
<dt>`timeout'</dt>
%%
<dd>The server-side timeout for the write in ms</dd>
%% - -spec delete(rhc(), {binary(), bucket()}, key(), list()) -> ok | {error, term()}. delete(Rhc, Bucket, Key, Options) -> Qs = delete_q_params(Rhc, Options), @@ -372,17 +347,13 @@ delete(Rhc, Bucket, Key, Options) -> end. -%% @equiv delete_obj(Rhc, Obj, []) -spec delete_obj(rhc(), riakc_obj()) -> ok | {error, term()}. - delete_obj(Rhc, Obj) -> delete_obj(Rhc, Obj, []). %% @doc Delete the key of the given object, using the contained vector %% clock if present. -%% @equiv delete(Rhc, riakc_obj:bucket(Obj), riakc_obj:key(Obj), [{vclock, riakc_obj:vclock(Obj)}|Options]) -spec delete_obj(rhc(), riakc_obj(), list()) -> ok | {error, term()}. - delete_obj(Rhc, Obj, Options) -> Bucket = riakc_obj:bucket(Obj), Key = riakc_obj:key(Obj), @@ -438,8 +409,6 @@ list_keys(Rhc, Bucket) -> list_keys(Rhc, Bucket, undefined). %% @doc List the keys in the given bucket. - - -spec list_keys(rhc(),_,_) -> {error,_} | {ok,[binary()]}. list_keys(Rhc, Bucket, Timeout) -> {ok, ReqId} = stream_list_keys(Rhc, Bucket, Timeout), @@ -460,8 +429,6 @@ stream_list_keys(Rhc, Bucket) -> %%
<dt>`{error, term()}'</dt>
%%
<dd>an error occurred</dd>
%% - -%% {ok, reference()}|{error, term()} -spec stream_list_keys(rhc(),_,_) -> {error, term()} | {ok,reference()}. stream_list_keys(Rhc, Bucket, Timeout) -> ParamList0 = [{?Q_KEYS, ?Q_STREAM}, @@ -482,30 +449,22 @@ stream_list_keys(Rhc, Bucket, Timeout) -> end. %% @doc Query a secondary index. - -%% {ok, index_results()} | {error, term()} -spec get_index(rhc(), bucket(), term(), term()) -> {error, term()} | {ok, index_results()}. get_index(Rhc, Bucket, Index, Query) -> get_index(Rhc, Bucket, Index, Query, []). %% @doc Query a secondary index. - -%% {ok, index_results()} | {error, term()} -spec get_index(rhc(), bucket(), term(), term(), list()) -> {ok,index_results()} | {error, term()}. get_index(Rhc, Bucket, Index, Query, Options) -> {ok, ReqId} = stream_index(Rhc, Bucket, Index, Query, Options), rhc_index:wait_for_index(ReqId). %% @doc Query a secondary index, streaming the results back. - -%% {ok, reference()} | {error, term()} -spec stream_index(rhc(), bucket(), term(), term()) -> {ok,reference()} | {error,term()}. stream_index(Rhc, Bucket, Index, Query) -> stream_index(Rhc, Bucket, Index, Query, []). %% @doc Query a secondary index, streaming the results back. - -%% {ok, reference()} | {error, term()} -spec stream_index(rhc(),_,_,_,[{atom(),_}]) -> {error,_} | {ok,reference()}. stream_index(Rhc, Bucket, Index, Query, Options) -> ParamList = rhc_index:query_options([{stream, true}|Options]), @@ -521,7 +480,6 @@ stream_index(Rhc, Bucket, Index, Query, Options) -> end. %% @doc Get the properties of the given bucket. - -spec get_bucket(rhc(), {binary(), bucket()}) -> {ok, [proplists:property()]} | {error, term()}. get_bucket(Rhc, Bucket) -> Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}, @@ -545,7 +503,6 @@ get_bucket(Rhc, Bucket) -> %%
<dd>Whether or not this bucket should allow siblings to %% be created for its keys</dd>
%% - -spec set_bucket(rhc(),_,list()) -> ok | {error,_}. set_bucket(Rhc, Bucket, Props0) -> Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}]), @@ -567,7 +524,6 @@ reset_bucket(Rhc, Bucket) -> %% @doc Get the properties of the given bucket. - -spec get_bucket_type(rhc(),_) -> {error,_} | {ok,[{atom(),_}]}. get_bucket_type(Rhc, Type) -> Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}, @@ -582,8 +538,6 @@ get_bucket_type(Rhc, Type) -> end. %% @doc Set the properties of the given bucket type. -%% - -spec set_bucket_type(rhc(),_,list()) -> ok | {error,_}. set_bucket_type(Rhc, Type, Props0) -> Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}]), @@ -611,9 +565,6 @@ mapred(Rhc, Inputs, Query) -> %% @doc Execute a map/reduce query. See {@link %% rhc_mapred:encode_mapred/2} for details of the allowed formats %% for `Inputs' and `Query'. - -%% [rhc_mapred:query_part()], integer()) -%% -> {ok, [rhc_mapred:phase_result()]}|{error, term()} -spec mapred(rhc(), rhc_mapred:map_input(), [rhc_mapred:query_part()], integer()) -> {ok, [rhc_mapred:phase_result()]}|{error, term()}. mapred(Rhc, Inputs, Query, Timeout) -> {ok, ReqId} = mapred_stream(Rhc, Inputs, Query, self(), Timeout), @@ -637,9 +588,6 @@ mapred_stream(Rhc, Inputs, Query, ClientPid) -> %%
<dt>`{error, term()}'</dt>
%%
<dd>an error occurred</dd>
%% - -%% [rhc_mapred:query_phase()], pid(), integer()) -%% -> {ok, reference()}|{error, term()} -spec mapred_stream(rhc(), rhc_mapred:mapred_input(), [rhc_mapred:query_phase()], pid(), integer()) -> {ok, reference()}|{error, term()}. @@ -658,8 +606,6 @@ mapred_stream(Rhc, Inputs, Query, ClientPid, Timeout) -> %% @doc Execute a search query. This command will return an error %% unless executed against a Riak Search cluster. - -%% {ok, [rhc_mapred:phase_result()]}|{error, term()} -spec search(rhc(), bucket(), string()) -> {ok,[rhc_mapred:phase_result()]} | {error,term}. search(Rhc, Bucket, SearchQuery) -> %% Run a Map/Reduce operation using reduce_identity to get a list @@ -676,9 +622,6 @@ search(Rhc, Bucket, SearchQuery) -> %% query. See {@link rhc_mapred:encode_mapred/2} for details of %% the allowed formats for `MRQuery'. This command will return an error %% unless executed against a Riak Search cluster. - -%% [rhc_mapred:query_part()], integer()) -> -%% {ok, [rhc_mapred:phase_result()]}|{error, term()} -spec search(rhc(), bucket(), string(),[rhc_mapred:query_part()], integer()) -> {ok, [rhc_mapred:phase_result()]} | {error,term()}. search(Rhc, Bucket, SearchQuery, MRQuery, Timeout) -> @@ -691,9 +634,6 @@ mapred_bucket(Rhc, Bucket, Query) -> mapred_bucket(Rhc, Bucket, Query, ?DEFAULT_HTTP_TIMEOUT). %% @doc Execute a map/reduce query over all keys in the given bucket. - -%% integer()) -%% -> {ok, [rhc_mapred:phase_result()]}|{error, term()} -spec mapred_bucket(rhc(), bucket(), [rhc_mapred:query_phase()], integer()) -> {ok, [rhc_mapred:phase_result()]}|{error, term()}. mapred_bucket(Rhc, Bucket, Query, Timeout) -> {ok, ReqId} = mapred_bucket_stream(Rhc, Bucket, Query, self(), Timeout), @@ -701,9 +641,6 @@ mapred_bucket(Rhc, Bucket, Query, Timeout) -> %% @doc Stream map/reduce results over all keys in a bucket to a Pid. %% Similar to {@link mapred_stream/5} - -%% [rhc_mapred:query_phase()], pid(), integer()) -%% -> {ok, reference()}|{error, term()} -spec mapred_bucket_stream(rhc(), bucket(), [rhc_mapred:query_phase()], pid(), integer()) -> {ok, reference()}|{error, term()}. @@ -778,7 +715,6 @@ update_type(Rhc, BucketAndType, Key, {Type, Op, Context}, Options) -> -spec modify_type(rhc(), fun((riakc_datatype:datatype()) -> riakc_datatype:datatype()), {BucketType::binary(), bucket()}, key(), [proplists:property()]) -> ok | {ok, riakc_datatype:datatype()} | {error, term()}. - modify_type(Rhc, Fun, BucketAndType, Key, Options) -> Create = proplists:get_value(create, Options, true), case fetch_type(Rhc, BucketAndType, Key, Options) of @@ -800,7 +736,6 @@ modify_type(Rhc, Fun, BucketAndType, Key, Options) -> %% @doc Get the client ID to use, given the passed options and client. %% Choose the client ID in Options before the one in the client. - -spec client_id(rhc(), list()) -> any(). client_id(#rhc{options=RhcOptions}, Options) -> case proplists:get_value(client_id, Options) of @@ -811,7 +746,6 @@ client_id(#rhc{options=RhcOptions}, Options) -> end. %% @doc Generate a random client ID. - -spec random_client_id() -> string(). random_client_id() -> {{Y,Mo,D},{H,Mi,S}} = erlang:universaltime(), @@ -820,7 +754,6 @@ random_client_id() -> base64:encode_to_string(<>). %% @doc Assemble the root URL for the given client - -spec root_url(rhc()) -> iolist(). root_url(#rhc{ip=Ip, port=Port, options=Opts}) -> Proto = case proplists:get_value(is_ssl, Opts) of @@ -832,19 +765,16 @@ root_url(#rhc{ip=Ip, port=Port, options=Opts}) -> [Proto, "://",Ip,":",integer_to_list(Port),"/"]. 
%% @doc Assemble the URL for the map/reduce resource - -spec mapred_url(rhc()) -> iolist(). mapred_url(Rhc) -> binary_to_list(iolist_to_binary([root_url(Rhc), "mapred/?chunked=true"])). %% @doc Assemble the URL for the ping resource - -spec ping_url(rhc()) -> iolist(). ping_url(Rhc) -> binary_to_list(iolist_to_binary([root_url(Rhc), "ping/"])). %% @doc Assemble the URL for the stats resource - -spec stats_url(rhc()) -> iolist(). stats_url(Rhc) -> binary_to_list(iolist_to_binary([root_url(Rhc), "stats/"])). @@ -888,7 +818,6 @@ index_name(Idx) -> Idx. %% @doc Assemble the URL for the given bucket and key - -spec make_url(rhc(), {binary(),bucket() | undefined}, key() | undefined, list()) -> iolist(). make_url(Rhc=#rhc{}, BucketAndType, Key, Query) -> {Type, Bucket} = extract_bucket_type(BucketAndType), @@ -1002,7 +931,6 @@ update_type_q_params(Rhc, Options) -> %% @doc Extract the options for the given `Keys' from the possible %% list of `Options'. -%% proplist()) -> proplist() -spec options_list(list(),_) -> list(). options_list(Keys, Options) -> options_list(Keys, Options, []). From fa2d69c40875706109ccc0fa1f7075cb60cd0056 Mon Sep 17 00:00:00 2001 From: Drew Date: Fri, 6 Dec 2013 23:13:13 -0500 Subject: [PATCH 4/4] Fixed unused variable in the typer target of Makefile. Changed eopts in rebar.config to debug_info and warnings_as_errors to align with Basho standard practices. --- Makefile | 5 +++-- rebar.config | 7 +------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index 8a474d5..ecf4e09 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,7 @@ doc: COMBO_PLT = $(HOME)/.rhc_dialyzer_plt APPS = kernel stdlib sasl erts eunit INCLUDES = -I include -I deps + check_plt: all dialyzer --check_plt --plt $(COMBO_PLT) --apps $(APPS) deps/*/ebin @@ -32,13 +33,13 @@ dialyzer: all @sleep 1 dialyzer --verbose -Wno_return --plt $(COMBO_PLT) $(INCLUDES) ebin -typer: $(DEPSOLVER_PLT) +typer: typer --plt $(COMBO_PLT) $(INCLUDES) -r src plt_info: dialyzer --plt $(COMBO_PLT) --plt_info -cleanplt: +clean_plt: @echo @echo "Are you sure? It takes time to re-build." @echo Deleting $(COMBO_PLT) in 5 seconds. diff --git a/rebar.config b/rebar.config index 2cacd10..c362be9 100644 --- a/rebar.config +++ b/rebar.config @@ -1,10 +1,5 @@ {erl_opts, [debug_info, - warn_obsolete_guard, - warn_unused_import, - warn_shadow_vars, - warn_export_vars, - warn_export_all]}. - + warnings_as_errors]}. {deps, [ %% ibrowse for doing HTTP requests
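Taken together, the Makefile targets above form the intended dialyzer workflow for this repo. A minimal usage sketch, assuming all four patches apply cleanly and the deps have already been fetched:

    make all            # compile via rebar; the dialyzer targets depend on this
    make build_plt      # one-time: build $(HOME)/.rhc_dialyzer_plt from kernel, stdlib, sasl, erts, eunit and deps
    make check_plt      # verify an existing PLT is still consistent
    make dialyzer       # analyze ebin against the PLT with -Wno_return
    make typer          # optional: have typer suggest specs for src using the same PLT
    make clean_plt      # delete the PLT when it needs a full rebuild

The PLT targets are kept separate so the slow PLT build is paid only once; the dialyzer target itself analyzes only this application's ebin.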