From 6637ce48d92b40f0d37a286b22b4f0a4cc630461 Mon Sep 17 00:00:00 2001 From: Dhruv Jain <51796498+dhruvjain99@users.noreply.github.com> Date: Tue, 14 May 2024 13:55:03 +0530 Subject: [PATCH] Fix offline store choking by bypassing plugin for redis ops (#63) * chore: instrument plugin call until redis call * spike: bypass plugin for offline store * fix: offline store choking due to blocking redis call through plugin * chore: format changes * fix: msg store read fn dialyzer error * fix: username pass type in vmq_message_store * chore:fmt changes --- .../priv/vmq_generic_msg_store.schema | 13 - apps/vmq_generic_msg_store/rebar.config | 11 - .../src/engines/vmq_storage_engine_ets.erl | 57 -- .../engines/vmq_storage_engine_leveldb.erl | 176 ------ .../src/engines/vmq_storage_engine_no_op.erl | 17 - .../src/vmq_generic_msg_store.app.src | 27 - .../src/vmq_generic_msg_store.erl | 531 ------------------ .../src/vmq_generic_msg_store.hrl | 5 - .../src/vmq_generic_msg_store_app.erl | 34 -- .../src/vmq_generic_msg_store_sup.erl | 114 ---- .../src/vmq_generic_msg_store_utils.erl | 107 ---- .../test/vmq_generic_msg_store_SUITE.erl | 321 ----------- .../priv/vmq_generic_offline_msg_store.schema | 42 -- .../rebar.config | 9 - .../vmq_offline_storage_engine_postgres.erl | 115 ---- .../vmq_offline_storage_engine_redis.erl | 55 -- .../src/vmq_generic_offline_msg_store.app.src | 34 -- .../src/vmq_generic_offline_msg_store.erl | 204 ------- .../src/vmq_generic_offline_msg_store_app.erl | 16 - .../src/vmq_generic_offline_msg_store_sup.erl | 36 -- .../vmq_generic_offline_msg_store_SUITE.erl | 129 ----- apps/vmq_server/priv/vmq_server.schema | 26 +- apps/vmq_server/src/vmq_message_store.erl | 234 +++----- apps/vmq_server/src/vmq_metrics.erl | 49 -- apps/vmq_server/src/vmq_queue.erl | 27 +- apps/vmq_server/src/vmq_server_app.erl | 3 +- apps/vmq_server/test/vmq_test_utils.erl | 3 - rebar.config | 6 - 28 files changed, 109 insertions(+), 2292 deletions(-) delete mode 100644 apps/vmq_generic_msg_store/priv/vmq_generic_msg_store.schema delete mode 100644 apps/vmq_generic_msg_store/rebar.config delete mode 100644 apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_ets.erl delete mode 100644 apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_leveldb.erl delete mode 100644 apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_no_op.erl delete mode 100644 apps/vmq_generic_msg_store/src/vmq_generic_msg_store.app.src delete mode 100644 apps/vmq_generic_msg_store/src/vmq_generic_msg_store.erl delete mode 100644 apps/vmq_generic_msg_store/src/vmq_generic_msg_store.hrl delete mode 100644 apps/vmq_generic_msg_store/src/vmq_generic_msg_store_app.erl delete mode 100644 apps/vmq_generic_msg_store/src/vmq_generic_msg_store_sup.erl delete mode 100644 apps/vmq_generic_msg_store/src/vmq_generic_msg_store_utils.erl delete mode 100644 apps/vmq_generic_msg_store/test/vmq_generic_msg_store_SUITE.erl delete mode 100644 apps/vmq_generic_offline_msg_store/priv/vmq_generic_offline_msg_store.schema delete mode 100644 apps/vmq_generic_offline_msg_store/rebar.config delete mode 100644 apps/vmq_generic_offline_msg_store/src/engines/vmq_offline_storage_engine_postgres.erl delete mode 100644 apps/vmq_generic_offline_msg_store/src/engines/vmq_offline_storage_engine_redis.erl delete mode 100644 apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store.app.src delete mode 100644 apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store.erl delete mode 100644 apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store_app.erl delete mode 100644 apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store_sup.erl delete mode 100644 apps/vmq_generic_offline_msg_store/test/vmq_generic_offline_msg_store_SUITE.erl diff --git a/apps/vmq_generic_msg_store/priv/vmq_generic_msg_store.schema b/apps/vmq_generic_msg_store/priv/vmq_generic_msg_store.schema deleted file mode 100644 index fa1a9b46b..000000000 --- a/apps/vmq_generic_msg_store/priv/vmq_generic_msg_store.schema +++ /dev/null @@ -1,13 +0,0 @@ -%% -*- mode: erlang -*- -%% ex: ft=erlang - -{mapping, "leveldb_message_store.directory", "vmq_generic_msg_store.msg_store_opts.store_dir", [ - {default, "{{platform_data_dir}}/msgstore"}, - {datatype, directory}, - hidden - ]}. - -{mapping, "generic_message_store_engine", "vmq_generic_msg_store.msg_store_engine", - [{default, vmq_storage_engine_leveldb}, - {datatype, atom} - ]}. diff --git a/apps/vmq_generic_msg_store/rebar.config b/apps/vmq_generic_msg_store/rebar.config deleted file mode 100644 index dd38d8508..000000000 --- a/apps/vmq_generic_msg_store/rebar.config +++ /dev/null @@ -1,11 +0,0 @@ -{erl_opts, [debug_info, {parse_transform, lager_transform}]}. -{deps, [ - lager, - {eleveldb, {git, "https://github.com/basho/eleveldb.git", {branch, "develop"}}}, - {sext, "1.5.0"} -]}. - -{overrides, [ - {override, sext, [{src_dirs, ["src"]}]} -]}. -{cover_enabled, true}. diff --git a/apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_ets.erl b/apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_ets.erl deleted file mode 100644 index 9189fe484..000000000 --- a/apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_ets.erl +++ /dev/null @@ -1,57 +0,0 @@ -%% Copyright 2019 Octavo Labs AG Zurich Switzerland (http://octavolabs.com) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(vmq_storage_engine_ets). - --export([open/2, close/1, write/2, read/2, fold/3, fold/4]). - --record(state, {ref}). - -open(_DataRoot, _Opts) -> - Tid = ets:new(?MODULE, [public, ordered_set]), - {ok, #state{ref=Tid}}. - -close(#state{ref=Ref}) -> - ets:delete(Ref). - -write(#state{ref=Ref}, WriteOps) -> - lists:foreach(fun({put, Key, Val}) -> - ets:insert(Ref, {Key, Val}); - ({delete, Key}) -> - ets:delete(Ref, Key) - end, WriteOps). - -read(#state{ref=Ref}, Key) -> - case ets:lookup(Ref, Key) of - [{Key, Val}] -> {ok, Val}; - [] -> not_found - end. - -fold(#state{ref=Ref}, Fun, Acc) -> - fold_iterate(ets:first(Ref), Ref, Fun, Acc). - -fold(#state{ref=Ref}, Fun, Acc, FirstKey) -> - fold_iterate(ets:next(Ref, FirstKey), Ref, Fun, Acc). - -fold_iterate('$end_of_table', _Ref, _Fun, Acc) -> - Acc; -fold_iterate(Key, Tab, Fun, Acc0) -> - [{Key, Value}] = ets:lookup(Tab, Key), - try Fun(Key, Value, Acc0) of - Acc1 -> - fold_iterate(ets:next(Tab, Key), Tab, Fun, Acc1) - catch - throw:_Throw -> - Acc0 - end. diff --git a/apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_leveldb.erl b/apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_leveldb.erl deleted file mode 100644 index 7d7f22f68..000000000 --- a/apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_leveldb.erl +++ /dev/null @@ -1,176 +0,0 @@ -%% Copyright 2019 Octavo Labs AG Zurich Switzerland (http://octavolabs.com) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(vmq_storage_engine_leveldb). - --export([open/2, close/1, write/2, read/2, fold/3, fold/4]). - --record(state, { - ref :: undefined | eleveldb:db_ref(), - data_root :: string(), - open_opts = [], - config :: config(), - read_opts = [], - write_opts = [], - fold_opts = [{fill_cache, false}] - }). - --type state() :: #state{}. --type config() :: [{atom(), term()}]. --type key() :: binary(). --type value() :: binary(). --type write_op() :: {put, key(), value()} | {delete, key()}. - -% API - -open(DataRoot, Opts) -> - RetriesLeft = proplists:get_value(open_retries, Opts, 30), - State = init_state(DataRoot, Opts), - open_db(Opts, State, max(1, RetriesLeft), undefined). - --spec write(state(), [write_op()]) -> ok. -write(#state{ref=EngineRef, write_opts=WriteOpts}, WriteOps) -> - eleveldb:write(EngineRef, WriteOps, WriteOpts). - --spec read(state(), key()) -> {ok, value()} | not_found. -read(#state{ref=EngineRef, read_opts=ReadOpts}, Key) -> - eleveldb:get(EngineRef, Key, ReadOpts). - -fold(#state{ref=EngineRef, fold_opts=FoldOpts}, Fun, Acc) -> - {ok, Itr} = eleveldb:iterator(EngineRef, FoldOpts), - fold_iterate(eleveldb:iterator_move(Itr, first), Itr, Fun, Acc). - -fold(#state{ref=EngineRef, fold_opts=FoldOpts}, Fun, Acc, FirstKey) -> - {ok, Itr} = eleveldb:iterator(EngineRef, FoldOpts), - fold_iterate(eleveldb:iterator_move(Itr, FirstKey), Itr, Fun, Acc). - -fold_iterate({error, _}, _Itr, _Fun, Acc) -> - %% no need to close the iterator - Acc; -fold_iterate({ok, Key, Value}, Itr, Fun, Acc0) -> - try Fun(Key, Value, Acc0) of - Acc1 -> - fold_iterate(eleveldb:iterator_move(Itr, prefetch), Itr, Fun, Acc1) - catch - throw:_Throw -> - eleveldb:iterator_close(Itr), - Acc0 - end. - -close(#state{ref=EngineRef}) -> - eleveldb:close(EngineRef). - - -% Internal -init_state(DataRoot, Config) -> - %% Get the data root directory - filelib:ensure_dir(filename:join(DataRoot, "msg_store_dummy")), - - %% Merge the proplist passed in from Config with any values specified by the - %% eleveldb app level; precedence is given to the Config. - MergedConfig = orddict:merge(fun(_K, VLocal, _VGlobal) -> VLocal end, - orddict:from_list(Config), % Local - orddict:from_list(application:get_all_env(eleveldb))), % Global - - %% Use a variable write buffer size in order to reduce the number - %% of vnodes that try to kick off compaction at the same time - %% under heavy uniform load... - WriteBufferMin = config_value(write_buffer_size_min, MergedConfig, 30 * 1024 * 1024), - WriteBufferMax = config_value(write_buffer_size_max, MergedConfig, 60 * 1024 * 1024), - WriteBufferSize = WriteBufferMin + rand:uniform(1 + WriteBufferMax - WriteBufferMin), - - %% Update the write buffer size in the merged config and make sure create_if_missing is set - %% to true - FinalConfig = orddict:store(write_buffer_size, WriteBufferSize, - orddict:store(create_if_missing, true, MergedConfig)), - - %% Parse out the open/read/write options - {OpenOpts, _BadOpenOpts} = eleveldb:validate_options(open, FinalConfig), - {ReadOpts, _BadReadOpts} = eleveldb:validate_options(read, FinalConfig), - {WriteOpts, _BadWriteOpts} = eleveldb:validate_options(write, FinalConfig), - - %% Use read options for folding, but FORCE fill_cache to false - FoldOpts = lists:keystore(fill_cache, 1, ReadOpts, {fill_cache, false}), - - %% Warn if block_size is set - SSTBS = proplists:get_value(sst_block_size, OpenOpts, false), - BS = proplists:get_value(block_size, OpenOpts, false), - case BS /= false andalso SSTBS == false of - true -> - lager:warning("eleveldb block_size has been renamed sst_block_size " - "and the current setting of ~p is being ignored. " - "Changing sst_block_size is strongly cautioned " - "against unless you know what you are doing. Remove " - "block_size from app.config to get rid of this " - "message.\n", [BS]); - _ -> - ok - end, - - %% Generate a debug message with the options we'll use for each operation - lager:debug("datadir ~s options for LevelDB: ~p\n", - [DataRoot, [{open, OpenOpts}, {read, ReadOpts}, {write, WriteOpts}, {fold, FoldOpts}]]), - #state { data_root = DataRoot, - open_opts = OpenOpts, - read_opts = ReadOpts, - write_opts = WriteOpts, - fold_opts = FoldOpts, - config = FinalConfig }. - -config_value(Key, Config, Default) -> - case orddict:find(Key, Config) of - error -> - Default; - {ok, Value} -> - Value - end. - -open_db(_Opts, _State0, 0, LastError) -> - {error, LastError}; -open_db(Opts, State0, RetriesLeft, _) -> - DataRoot = State0#state.data_root, - case eleveldb:open(DataRoot, State0#state.open_opts) of - {ok, Ref} -> - lager:info("Opening LevelDB database at ~p~n", [DataRoot]), - {ok, State0#state { ref = Ref }}; - %% Check specifically for lock error, this can be caused if - %% a crashed instance takes some time to flush leveldb information - %% out to disk. The process is gone, but the NIF resource cleanup - %% may not have completed. - {error, {db_open, OpenErr}=Reason} -> - case lists:prefix("Corruption: truncated record ", OpenErr) of - true -> - lager:info("VerneMQ LevelDB Message Store backend repair attempt for store ~p, after error ~s. LevelDB will put unusable .log and MANIFEST filest in 'lost' folder.\n", - [DataRoot, OpenErr]), - case eleveldb:repair(DataRoot, []) of - ok -> % LevelDB will put unusable .log and MANIFEST files in 'lost' folder. - open_db(Opts, State0, RetriesLeft - 1, Reason); - {error, Reason} -> {error, Reason} - end; - - false -> - case lists:prefix("IO error: lock ", OpenErr) of - true -> - SleepFor = proplists:get_value(open_retry_delay, Opts, 2000), - lager:info("VerneMQ LevelDB Message Store backend retrying ~p in ~p ms after error ~s\n", - [DataRoot, SleepFor, OpenErr]), - timer:sleep(SleepFor), - open_db(Opts, State0, RetriesLeft - 1, Reason); - false -> - {error, Reason} - end - end; - {error, Reason} -> - {error, Reason} - end. \ No newline at end of file diff --git a/apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_no_op.erl b/apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_no_op.erl deleted file mode 100644 index 9d1f0ba48..000000000 --- a/apps/vmq_generic_msg_store/src/engines/vmq_storage_engine_no_op.erl +++ /dev/null @@ -1,17 +0,0 @@ --module(vmq_storage_engine_no_op). - --export([open/2, close/1, write/2, read/2, fold/3, fold/4]). - --record(state, {ref}). - -open(_, _) -> {ok, #state{}}. - -close(_) -> true. - -write(_, _) -> {error, no_op}. - -read(_, _) -> not_found. - -fold(_, _, Acc) -> Acc. - -fold(_, _, Acc, _) -> Acc. diff --git a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store.app.src b/apps/vmq_generic_msg_store/src/vmq_generic_msg_store.app.src deleted file mode 100644 index adcb34748..000000000 --- a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store.app.src +++ /dev/null @@ -1,27 +0,0 @@ -{application, vmq_generic_msg_store, [ - {description, "A VerneMQ plugin that sets up LevelDB as message storage"}, - {vsn, git}, - {registered, []}, - {mod, {vmq_generic_msg_store_app, []}}, - {applications, [ - kernel, - stdlib, - sasl, - lager - ]}, - {env, [ - {vmq_plugin_hooks, [ - {vmq_generic_msg_store, msg_store_write, 2, [internal]}, - {vmq_generic_msg_store, msg_store_delete, 2, [internal]}, - {vmq_generic_msg_store, msg_store_find, 2, [internal]}, - {vmq_generic_msg_store, msg_store_read, 2, [internal]} - ]}, - {msg_store_engine, vmq_storage_engine_leveldb}, - {msg_store_opts, [ - {store_dir, "./data/msgstore"}, - {open_retries, 30}, - {open_retry_delay, 2000} - ]} - ]}, - {modules, []} -]}. diff --git a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store.erl b/apps/vmq_generic_msg_store/src/vmq_generic_msg_store.erl deleted file mode 100644 index f62bb91c1..000000000 --- a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store.erl +++ /dev/null @@ -1,531 +0,0 @@ -%% Copyright 2018 Erlio GmbH Basel Switzerland (http://erl.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(vmq_generic_msg_store). --include("vmq_generic_msg_store.hrl"). --behaviour(gen_server). - -%% API --export([ - start_link/1, - msg_store_write/2, - msg_store_read/2, - msg_store_delete/2, - msg_store_find/2, - get_engine/1, - refcount/1, - get_state/1 -]). - --export([msg_store_init_queue_collector/4]). - --export([ - parse_p_idx_val_pre/1, - parse_p_msg_val_pre/1, - serialize_p_idx_val_pre/1, - serialize_p_msg_val_pre/1 -]). - -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). - --record(state, { - engine, - engine_module, - refs = ets:new(?MODULE, []) -}). - --define(P_IDX_PRE, 0). --define(P_MSG_PRE, 0). - -%% Subsequent formats should always extend by adding new elements to -%% the end of the record or tuple. --type p_msg_val_pre() :: {routing_key(), payload()}. --record(p_idx_val, { - ts :: erlang:timestamp(), - dup :: flag(), - qos :: qos() -}). --type p_idx_val_pre() :: #p_idx_val{}. - -%%%=================================================================== -%%% API -%%%=================================================================== -start_link(Id) -> - gen_server:start_link(?MODULE, [Id], []). - -msg_store_write(SubscriberId, #vmq_msg{msg_ref = MsgRef} = Msg) -> - call(MsgRef, {write, SubscriberId, Msg}). - -msg_store_delete(SubscriberId, MsgRef) -> - call(MsgRef, {delete, SubscriberId, MsgRef}). - -msg_store_read(SubscriberId, MsgRef) -> - call(MsgRef, {read, SubscriberId, MsgRef}). - -%% We differentiate between queue_init and other as queue_init must -%% not be called concurrently for the same subscriber. -msg_store_find(SubscriberId, Type) when - Type =:= queue_init; - Type =:= other --> - Ref = make_ref(), - {Pid, MRef} = spawn_monitor( - ?MODULE, - msg_store_init_queue_collector, - [self(), SubscriberId, Ref, Type] - ), - receive - {'DOWN', MRef, process, Pid, Reason} -> - {error, Reason}; - {Pid, Ref, MsgRefs} -> - demonitor(MRef, [flush]), - {ok, MsgRefs} - end. - -msg_store_init_queue_collector(ParentPid, SubscriberId, Ref, queue_init) -> - MsgRefs = - case msg_store_init_from_tbl_with_instrumentation(init, SubscriberId) of - [] -> - init_from_disk_with_instrumentation(SubscriberId); - Res -> - Res - end, - ParentPid ! {self(), Ref, MsgRefs}; -msg_store_init_queue_collector(ParentPid, SubscriberId, Ref, other) -> - MsgRefs = init_from_disk_with_instrumentation(SubscriberId), - ParentPid ! {self(), Ref, MsgRefs}. - -init_from_disk_with_instrumentation(SubscriberId) -> - V1 = vmq_util:ts(), - MsgRefs = init_from_disk(SubscriberId), - vmq_metrics:pretimed_measurement( - {vmq_generic_message_store, init_from_disk}, vmq_util:ts() - V1 - ), - MsgRefs. - -init_from_disk(SubscriberId) -> - TblIdxRef = make_ref(), - Pids = vmq_generic_msg_store_sup:get_bucket_pids(), - ok = msg_store_collect(TblIdxRef, SubscriberId, Pids), - msg_store_init_from_tbl_with_instrumentation(TblIdxRef, SubscriberId). - -msg_store_init_from_tbl_with_instrumentation(Prefix, SubscriberId) -> - V1 = vmq_util:ts(), - MsgRefs = msg_store_init_from_tbl(Prefix, SubscriberId), - vmq_metrics:pretimed_measurement( - {vmq_generic_message_store, msg_store_init_from_tbl}, vmq_util:ts() - V1 - ), - MsgRefs. - -msg_store_init_from_tbl(Prefix, SubscriberId) -> - MS = ets:fun2ms( - fun({{P, S, _TS, MsgRef}}) when P =:= Prefix, S =:= SubscriberId -> - MsgRef - end - ), - MSDel = ets:fun2ms( - fun({{P, S, _TS, _MsgRef}}) when P =:= Prefix, S =:= SubscriberId -> - true - end - ), - Table = select_table(SubscriberId), - MsgRefs = ets:select(Table, MS), - _Deleted = ets:select_delete(Table, MSDel), - MsgRefs. - -msg_store_collect(_Ref, _, []) -> - ok; -msg_store_collect(Ref, SubscriberId, [Pid | Rest]) -> - ok = - try - gen_server:call(Pid, {find_for_subscriber_id, Ref, SubscriberId}, infinity) - catch - {'EXIT', {noproc, _}} -> - ok - end, - msg_store_collect(Ref, SubscriberId, Rest). - -get_engine(BucketPid) -> - gen_server:call(BucketPid, get_engine). - -refcount(MsgRef) -> - call(MsgRef, {refcount, MsgRef}). - -get_state(Pid) -> - gen_server:call(Pid, get_state, infinity). - -call(Key, Req) -> - case vmq_generic_msg_store_sup:get_bucket_pid(Key) of - {ok, BucketPid} -> - gen_server:call(BucketPid, Req, infinity); - {error, Reason} -> - {error, Reason} - end. - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% -%% @spec init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% @end -%%-------------------------------------------------------------------- -init([InstanceId]) -> - %% Initialize random seed - rand:seed(exsplus, os:timestamp()), - - {ok, EngineModule} = application:get_env(vmq_generic_msg_store, msg_store_engine), - Opts = application:get_env(vmq_generic_msg_store, msg_store_opts, []), - DataDir1 = proplists:get_value(store_dir, Opts, "data/msgstore"), - DataDir2 = filename:join(DataDir1, integer_to_list(InstanceId)), - - process_flag(trap_exit, true), - case apply(EngineModule, open, [DataDir2, Opts]) of - {ok, EngineState} -> - self() ! {initialize_from_storage, InstanceId}, - {ok, #state{engine = EngineState, engine_module = EngineModule}}; - {error, Reason} -> - {stop, Reason} - end. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% -%% @spec handle_call(Request, From, State) -> -%% {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_call(get_engine, _From, #state{engine = EngineState, engine_module = EngineModule} = State) -> - {reply, {EngineModule, EngineState}, State}; -handle_call({refcount, MsgRef}, _From, State) -> - RefCount = - case ets:lookup(State#state.refs, MsgRef) of - [] -> 0; - [{_, Cnt}] -> Cnt - end, - {reply, RefCount, State}; -handle_call(get_state, _From, State) -> - %% when called externally, the store is always initialized, so - %% just return that here. - {reply, initialized, State}; -handle_call(Request, _From, State) -> - {reply, handle_req(Request, State), State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% -%% @spec handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_cast(_Request, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% -%% @spec handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_info({initialize_from_storage, InstanceId}, State) -> - case setup_index(State) of - 0 -> - ok; - N -> - lager:info( - "indexed ~p offline messages in msg store instance ~p", - [N, InstanceId] - ) - end, - %% Register Bucket Instance with the Bucket Registry - vmq_generic_msg_store_sup:register_bucket_pid(InstanceId, self()), - {noreply, State}; -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% -%% @spec terminate(Reason, State) -> void() -%% @end -%%-------------------------------------------------------------------- -terminate(_Reason, #state{engine = EngineState, engine_module = EngineModule}) -> - apply(EngineModule, close, [EngineState]), - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process state when code is changed -%% -%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} -%% @end -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== -handle_req( - {write, {MP, _} = SubscriberId, #vmq_msg{ - msg_ref = MsgRef, - mountpoint = MP, - dup = Dup, - qos = QoS, - routing_key = RoutingKey, - payload = Payload - }}, - #state{engine = EngineState, engine_module = EngineModule, refs = Refs} -) -> - MsgKey = sext:encode({msg, MsgRef, {MP, ''}}), - IdxKey = sext:encode({idx, SubscriberId, MsgRef}), - IdxVal = serialize_p_idx_val_pre(#p_idx_val{ts = os:timestamp(), dup = Dup, qos = QoS}), - case incr_ref(Refs, MsgRef) of - 1 -> - %% new message - Val = serialize_p_msg_val_pre({RoutingKey, Payload}), - apply(EngineModule, write, [ - EngineState, - [ - {put, MsgKey, Val}, - {put, IdxKey, IdxVal} - ] - ]); - _ -> - %% only write the idx - apply(EngineModule, write, [EngineState, [{put, IdxKey, IdxVal}]]) - end; -handle_req( - {read, {MP, _} = SubscriberId, MsgRef}, - #state{engine = EngineState, engine_module = EngineModule} -) -> - MsgKey = sext:encode({msg, MsgRef, {MP, ''}}), - IdxKey = sext:encode({idx, SubscriberId, MsgRef}), - case apply(EngineModule, read, [EngineState, MsgKey]) of - {ok, Val} -> - {RoutingKey, Payload} = parse_p_msg_val_pre(Val), - case apply(EngineModule, read, [EngineState, IdxKey]) of - {ok, IdxVal} -> - #p_idx_val{dup = Dup, qos = QoS} = parse_p_idx_val_pre(IdxVal), - Msg = #vmq_msg{ - msg_ref = MsgRef, - mountpoint = MP, - dup = Dup, - qos = QoS, - routing_key = RoutingKey, - payload = Payload, - persisted = true - }, - {ok, Msg}; - not_found -> - {error, idx_val_not_found} - end; - not_found -> - {error, not_found} - end; -handle_req( - {delete, {MP, _} = SubscriberId, MsgRef}, - #state{refs = Refs, engine = EngineState, engine_module = EngineModule} -) -> - MsgKey = sext:encode({msg, MsgRef, {MP, ''}}), - IdxKey = sext:encode({idx, SubscriberId, MsgRef}), - case decr_ref(Refs, MsgRef) of - not_found -> - lager:warning("delete failed ~p due to not found", [MsgRef]); - 0 -> - %% last one to be deleted - apply(EngineModule, write, [ - EngineState, - [ - {delete, IdxKey}, - {delete, MsgKey} - ] - ]); - _ -> - %% we have to keep the message, but can delete the idx - apply(EngineModule, write, [EngineState, [{delete, IdxKey}]]) - end; -handle_req( - {find_for_subscriber_id, TblIdxRef, SubscriberId}, - #state{engine = EngineState, engine_module = EngineModule} -) -> - FirstIdxKey = sext:encode({idx, SubscriberId, ''}), - apply( - EngineModule, - fold, - [ - EngineState, - fun(IdxKey, IdxVal, _Acc) -> - case sext:decode(IdxKey) of - {idx, SubscriberId, MsgRef} -> - #p_idx_val{ts = TS} = parse_p_idx_val_pre(IdxVal), - Table = select_table(SubscriberId), - true = ets:insert(Table, {{TblIdxRef, SubscriberId, TS, MsgRef}}); - _ -> - %% all message refs accumulated for this subscriber - throw(finished) - end - end, - ignore, - FirstIdxKey - ] - ), - ok. - -setup_index(#state{engine = EngineState, engine_module = EngineModule, refs = Refs}) -> - FirstIdxKey = sext:encode({idx, '', ''}), - apply( - EngineModule, - fold, - [ - EngineState, - fun(Key, IdxVal, N) -> - case sext:decode(Key) of - {idx, SubscriberId, MsgRef} -> - #p_idx_val{ts = TS} = parse_p_idx_val_pre(IdxVal), - Table = select_table(SubscriberId), - true = ets:insert(Table, {{init, SubscriberId, TS, MsgRef}}), - incr_ref(Refs, MsgRef), - N + 1; - _ -> - throw(finished) - end - end, - 0, - FirstIdxKey - ] - ). - -incr_ref(Refs, MsgRef) -> - case ets:insert_new(Refs, {MsgRef, 1}) of - true -> 1; - false -> ets:update_counter(Refs, MsgRef, 1) - end. - -decr_ref(Refs, MsgRef) -> - try ets:update_counter(Refs, MsgRef, -1) of - 0 -> - ets:delete(Refs, MsgRef), - 0; - V -> - V - catch - _:_ -> - not_found - end. - -select_table(SubscriberId) -> - persistent_term:get({?TBL_MSG_INIT, erlang:phash2(SubscriberId, ?NR_OF_BUCKETS) + 1}). - -%% pre version idx: -%% {p_idx_val, ts, dup, qos} -%% future version: -%% {p_idx_val, vesion, ts, dup, qos, ...} - -%% current version of the index value --spec parse_p_idx_val_pre(binary()) -> p_idx_val_pre(). -parse_p_idx_val_pre(BinTerm) -> - parse_p_idx_val_pre_(binary_to_term(BinTerm)). - -parse_p_idx_val_pre_({TS, Dup, QoS}) -> - #p_idx_val{ts = TS, dup = Dup, qos = QoS}; -%% newer versions of the store -> downgrade -parse_p_idx_val_pre_(T) when - element(1, T) =:= p_idx_val, - is_integer(element(2, T)), - element(2, T) > ?P_IDX_PRE --> - TS = element(3, T), - Dup = element(4, T), - QoS = element(5, T), - #p_idx_val{ts = TS, dup = Dup, qos = QoS}. - -%% current version of the index value --spec serialize_p_idx_val_pre(p_idx_val_pre()) -> binary(). -serialize_p_idx_val_pre(#p_idx_val{ts = TS, dup = Dup, qos = QoS}) -> - term_to_binary({TS, Dup, QoS}); -serialize_p_idx_val_pre(T) when - element(1, T) =:= p_idx_val, - is_integer(element(2, T)), - element(2, T) > ?P_MSG_PRE --> - term_to_binary( - {element(3, T), element(4, T), element(5, T)} - ). - -%% pre msg version: -%% {routing_key, payload} -%% future version: -%% {version, routing_key, payload, ...} - -%% parse messages to message type from before versioning. --spec parse_p_msg_val_pre(binary()) -> p_msg_val_pre(). -parse_p_msg_val_pre(BinTerm) -> - parse_p_msg_val_pre_(binary_to_term(BinTerm)). - -parse_p_msg_val_pre_({RoutingKey, Payload}) -> - {RoutingKey, Payload}; -%% newer version of the msg value -parse_p_msg_val_pre_(T) when - is_integer(element(1, T)), - element(1, T) > ?P_MSG_PRE --> - {element(2, T), element(3, T)}. - -%% current version of the msg value --spec serialize_p_msg_val_pre(p_msg_val_pre()) -> binary(). -serialize_p_msg_val_pre({_RoutingKey, _Payload} = T) -> - term_to_binary(T); -serialize_p_msg_val_pre(T) when - is_integer(element(1, T)), - element(1, T) > ?P_MSG_PRE --> - term_to_binary( - {element(2, T), element(3, T)} - ). diff --git a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store.hrl b/apps/vmq_generic_msg_store/src/vmq_generic_msg_store.hrl deleted file mode 100644 index ce1e9f8a9..000000000 --- a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store.hrl +++ /dev/null @@ -1,5 +0,0 @@ --include_lib("stdlib/include/ms_transform.hrl"). --include_lib("vmq_commons/include/vmq_types.hrl"). - --define(TBL_MSG_INIT, vmq_generic_msg_store_init_msg_idx). --define(NR_OF_BUCKETS, 12). diff --git a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store_app.erl b/apps/vmq_generic_msg_store/src/vmq_generic_msg_store_app.erl deleted file mode 100644 index 76e31c76f..000000000 --- a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store_app.erl +++ /dev/null @@ -1,34 +0,0 @@ -%% Copyright 2019 Octavo Labs AG Zurich Switzerland (https://octavolabs.com) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(vmq_generic_msg_store_app). - --behaviour(application). - -%% Application callbacks --export([start/2, stop/1]). - -%%==================================================================== -%% API -%%==================================================================== -start(_StartType, _StartArgs) -> - vmq_generic_msg_store_sup:start_link(). - -%%-------------------------------------------------------------------- -stop(_State) -> - ok. - -%%==================================================================== -%% Internal functions -%%==================================================================== diff --git a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store_sup.erl b/apps/vmq_generic_msg_store/src/vmq_generic_msg_store_sup.erl deleted file mode 100644 index d2c894348..000000000 --- a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store_sup.erl +++ /dev/null @@ -1,114 +0,0 @@ -%% Copyright 2018 Erlio GmbH Basel Switzerland (http://erl.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(vmq_generic_msg_store_sup). - --behaviour(supervisor). - -%% API --export([ - start_link/0, - get_bucket_pid/1, - get_bucket_pids/0, - register_bucket_pid/2 -]). - -%% Supervisor callbacks --export([init/1]). - --include("vmq_generic_msg_store.hrl"). --define(TABLE, vmq_generic_msg_store_buckets). - -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> - {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []), - ok = init_msg_init_tables(), - Pids = - [ - begin - {ok, ChildPid} = supervisor:start_child(Pid, child_spec(I)), - ChildPid - end - || I <- lists:seq(1, ?NR_OF_BUCKETS) - ], - - ok = wait_until_initialized(Pids), - {ok, Pid}. - -init_msg_init_tables() -> - lists:foreach( - fun(I) -> - %% register them by name to make it easier to inspect - %% them. - Name = - list_to_atom("vmq_generic_msg_store_init_msg_idx_" ++ integer_to_list(I)), - Ref = ets:new(Name, [ - public, - named_table, - ordered_set, - {read_concurrency, true} - ]), - %% use persistent terms to fetch the references when - %% mapping from the subscriberid. - persistent_term:put({?TBL_MSG_INIT, I}, Ref) - end, - lists:seq(1, ?NR_OF_BUCKETS) - ), - ok. - -wait_until_initialized(Pids) -> - lists:foreach( - fun(Pid) -> - initialized = vmq_generic_msg_store:get_state(Pid) - end, - Pids - ). - -get_bucket_pid(Key) when is_binary(Key) -> - Id = (erlang:phash2(Key) rem ?NR_OF_BUCKETS) + 1, - case ets:lookup(?TABLE, Id) of - [] -> - {error, no_bucket_found}; - [{Id, Pid}] -> - {ok, Pid} - end. - -get_bucket_pids() -> - [Pid || [{_, Pid}] <- ets:match(?TABLE, '$1')]. - -register_bucket_pid(BucketId, BucketPid) -> - %% Called from vmq_generic_msg_store:init - ets:insert(?TABLE, {BucketId, BucketPid}), - ok. - -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== - -init([]) -> - _ = ets:new(?TABLE, [public, named_table, {read_concurrency, true}]), - {ok, {{one_for_one, 5, 10}, []}}. - -child_spec(I) -> - { - {vmq_generic_msg_store_bucket, I}, - {vmq_generic_msg_store, start_link, [I]}, - permanent, - 5000, - worker, - [vmq_generic_msg_store] - }. diff --git a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store_utils.erl b/apps/vmq_generic_msg_store/src/vmq_generic_msg_store_utils.erl deleted file mode 100644 index 5d5d0e317..000000000 --- a/apps/vmq_generic_msg_store/src/vmq_generic_msg_store_utils.erl +++ /dev/null @@ -1,107 +0,0 @@ -%% Copyright 2018 Erlio GmbH Basel Switzerland (http://erl.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(vmq_generic_msg_store_utils). - -%% API --export([ - dump/1, - dump/2, - full_table_scan/2 -]). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% dumps the message store content to the file. -dump(FileName) -> - FileWriteOpts = [write], - dump(FileName, FileWriteOpts). - -dump(FileName, FileOpenOpts) -> - {ok, Fd} = file:open(FileName, FileOpenOpts), - full_table_scan(fun file_dump_/2, Fd), - file:close(Fd). - -file_dump_({msg, MsgRef, MP, RoutingKey, Payload}, Fd) -> - file:write( - Fd, - io_lib:format( - "Msg[~s]:\t ref: ~p\t topic: ~s\t data: ~p~n", - [ - MP, - erlang:phash2(MsgRef), - iolist_to_binary(vmq_topic:unword(RoutingKey)), - Payload - ] - ) - ), - Fd; -file_dump_({ref, MsgRef, MP, ClientId}, Fd) -> - file:write( - Fd, - io_lib:format( - "Ref[~s]:\t ref: ~p\t client: ~s~n", - [MP, erlang:phash2(MsgRef), ClientId] - ) - ), - Fd; -file_dump_({idx, MsgRef, MP, ClientId, IdxVal}, Fd) -> - {{MegaS, Sec, MicroS}, Dup, QoS} = IdxVal, - file:write( - Fd, - io_lib:format( - "Idx[~s]:\t ref: ~p\t client: ~s\t ts: ~p.~p.~p dup: ~p qos: ~p~n", - [ - MP, - erlang:phash2(MsgRef), - ClientId, - MegaS, - Sec, - MicroS, - Dup, - QoS - ] - ) - ), - Fd. - -full_table_scan(FoldFun, Acc) -> - full_table_scan_(vmq_generic_msg_store_sup:get_bucket_pids(), {FoldFun, Acc}). - -full_table_scan_([Bucket | Rest], Acc) -> - {Engine, EngineState} = vmq_generic_msg_store:get_engine(Bucket), - NewAcc = Engine:fold(EngineState, fun full_table_scan__/3, Acc), - full_table_scan_(Rest, NewAcc); -full_table_scan_([], {_, Acc}) -> - Acc. - -full_table_scan__(Key, Value, {FoldFun, FoldAcc} = Acc) -> - NewFoldAcc = - case sext:decode(Key) of - {msg, MsgRef, {MP, ''}} -> - {RoutingKey, Payload} = binary_to_term(Value), - FoldFun({msg, MsgRef, MP, RoutingKey, Payload}, FoldAcc); - {msg, MsgRef, {MP, ClientId}} -> - <<>> = Value, - FoldFun({ref, MsgRef, MP, ClientId}, FoldAcc); - {idx, {MP, ClientId}, MsgRef} -> - IdxVal = binary_to_term(Value), - FoldFun({idx, MsgRef, MP, ClientId, IdxVal}, FoldAcc); - E -> - io:format("unknown sext encoded key ~p~n", [E]), - Acc - end, - {FoldFun, NewFoldAcc}. diff --git a/apps/vmq_generic_msg_store/test/vmq_generic_msg_store_SUITE.erl b/apps/vmq_generic_msg_store/test/vmq_generic_msg_store_SUITE.erl deleted file mode 100644 index 763728155..000000000 --- a/apps/vmq_generic_msg_store/test/vmq_generic_msg_store_SUITE.erl +++ /dev/null @@ -1,321 +0,0 @@ --module(vmq_generic_msg_store_SUITE). --include("src/vmq_generic_msg_store.hrl"). --export([ - %% suite/0, - init_per_suite/1, - end_per_suite/1, - init_per_group/2, - end_per_group/2, - init_per_testcase/2, - end_per_testcase/2, - all/0, - groups/0 - ]). - --export([insert_delete_test/1, - ref_delete_test/1, - message_compat_pre_test/1, - idx_compat_pre_test/1]). - - -%% =================================================================== -%% common_test callbacks -%% =================================================================== -init_per_suite(Config) -> - Config. - -end_per_suite(Config) -> - Config. - -init_per_group(StorageEngine, Config) -> - [{engine, StorageEngine}|Config]. - -end_per_group(_Group, _Config) -> - ok. - -init_per_testcase(message_compat_pre_test, Config) -> - Config; -init_per_testcase(idx_compat_pre_test, Config) -> - Config; -init_per_testcase(_Case, Config) -> - StorageEngine = proplists:get_value(engine, Config), - application:load(vmq_generic_msg_store), - application:set_env(vmq_generic_msg_store, msg_store_engine, StorageEngine), - application:ensure_all_started(vmq_generic_msg_store), - Config. - -end_per_testcase(message_compat_pre_test, Config) -> - Config; -end_per_testcase(idx_compat_pre_test, Config) -> - Config; -end_per_testcase(_, Config) -> - application:stop(vmq_generic_msg_store), - Config. - -all() -> - [ - {group, vmq_storage_engine_leveldb}, - %{group, vmq_storage_engine_dets}, - {group, vmq_storage_engine_ets}, - {group, basic} - ]. - -groups() -> - StorageTests = [ - insert_delete_test, - ref_delete_test, - message_compat_pre_test, - idx_compat_pre_test], - BasicTests = [ - message_compat_pre_test, - idx_compat_pre_test - ], - [ - {vmq_storage_engine_leveldb, [shuffle], StorageTests}, - {vmq_storage_engine_dets, [shuffle], StorageTests}, - {vmq_storage_engine_ets, [shuffle], StorageTests}, - {basic, [shuffle], BasicTests} - ]. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Actual Tests -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -insert_delete_test(Config) -> - {0,0} = store_summary(), - Msgs = generate_msgs(1000, []), - Refs = [Ref || #vmq_msg{msg_ref=Ref} <- Msgs], - ok = store_msgs({"", "foo"}, Msgs), - - {1000,1000} = store_summary(), - - 1000 = refcount(Refs, 0), - %% we should get back the exact same list - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo"}, queue_init), - %% delete all - ok = delete_msgs({"", "foo"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo"}, queue_init), - - 0 = refcount(Refs, 0), - {0,0} = store_summary(), - Config. - -ref_delete_test(Config) -> - {0,0} = store_summary(), - Msgs = generate_msgs(1000, []), - Refs = [Ref || #vmq_msg{msg_ref=Ref} <- Msgs], - ok = store_msgs({"", "foo0"}, Msgs), - ok = store_msgs({"", "foo1"}, Msgs), - ok = store_msgs({"", "foo2"}, Msgs), - ok = store_msgs({"", "foo3"}, Msgs), - ok = store_msgs({"", "foo4"}, Msgs), - ok = store_msgs({"", "foo5"}, Msgs), - ok = store_msgs({"", "foo6"}, Msgs), - ok = store_msgs({"", "foo7"}, Msgs), - ok = store_msgs({"", "foo8"}, Msgs), - ok = store_msgs({"", "foo9"}, Msgs), - - 10000 = refcount(Refs, 0), - {1000,10000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo0"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo0"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo0"}, Refs), - ok = delete_msgs({"", "foo0"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo0"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo0"}, queue_init), - - 9000 = refcount(Refs, 0), - {1000,9000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo1"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo1"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo1"}, Refs), - ok = delete_msgs({"", "foo1"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo1"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo1"}, queue_init), - - 8000 = refcount(Refs, 0), - {1000,8000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo2"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo2"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo2"}, Refs), - ok = delete_msgs({"", "foo2"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo2"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo2"}, queue_init), - - 7000 = refcount(Refs, 0), - {1000,7000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo3"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo3"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo3"}, Refs), - ok = delete_msgs({"", "foo3"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo3"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo3"}, queue_init), - - 6000 = refcount(Refs, 0), - {1000,6000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo4"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo4"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo4"}, Refs), - ok = delete_msgs({"", "foo4"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo4"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo4"}, queue_init), - - 5000 = refcount(Refs, 0), - {1000,5000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo5"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo5"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo5"}, Refs), - ok = delete_msgs({"", "foo5"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo5"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo5"}, queue_init), - - 4000 = refcount(Refs, 0), - {1000,4000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo6"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo6"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo6"}, Refs), - ok = delete_msgs({"", "foo6"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo6"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo6"}, queue_init), - - 3000 = refcount(Refs, 0), - {1000,3000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo7"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo7"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo7"}, Refs), - ok = delete_msgs({"", "foo7"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo7"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo7"}, queue_init), - - 2000 = refcount(Refs, 0), - {1000,2000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo8"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo8"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo8"}, Refs), - ok = delete_msgs({"", "foo8"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo8"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo8"}, queue_init), - - 1000 = refcount(Refs, 0), - {1000,1000} = store_summary(), - - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo9"}, other), - {ok, Refs} = vmq_generic_msg_store:msg_store_find({"", "foo9"}, queue_init), - {ok, Msgs} = read_msgs({"", "foo9"}, Refs), - ok = delete_msgs({"", "foo9"}, Msgs), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo9"}, other), - {ok, []} = vmq_generic_msg_store:msg_store_find({"", "foo9"}, queue_init), - - 0 = refcount(Refs, 0), - {0,0} = store_summary(), - - Config. - -%% @doc test that the message store functions can parse and serialize -%% data from the future as well as the pre versioning format. -message_compat_pre_test(_Cfg) -> - %% We can serialize and parse msg vals from before versioning was added - PreVersion = {[<<"routing">>, <<"key">>], %% routing_key = [<<"routing">>, <<"key">>] - <<"payload">>}, - PreVersion = vmq_generic_msg_store:parse_p_msg_val_pre( - vmq_generic_msg_store:serialize_p_msg_val_pre(PreVersion)), - - %% We can also serialize / parse something from the future: - FutureVersion = {1, %% version - [<<"routing">>, <<"key">>], <<"payload">>, <<"something else">>}, - - PreVersion = vmq_generic_msg_store:parse_p_msg_val_pre( - vmq_generic_msg_store:serialize_p_msg_val_pre(FutureVersion)), - ok. - -idx_compat_pre_test(_Cfg) -> - %% We can serialize and parse idx vals from before versioning was added - PreVersion = {p_idx_val, - {1529,586954,257209}, %% ts = {1529,586954,257209} - false, %% dup = false - 2 %% qos = 2 - }, - PreVersion = vmq_generic_msg_store:parse_p_idx_val_pre( - vmq_generic_msg_store:serialize_p_idx_val_pre(PreVersion)), - - %% We can also serialize / parse something from the future: - FutureVersion = {p_idx_val, - 1, %% version = 1 - {1529,586954,257209}, %% ts = {1529,586954,257209} - false, %% dup = false - 2, %% qos = 2 - "something unknown1", - "something unknown2" - }, - - PreVersion = vmq_generic_msg_store:parse_p_idx_val_pre( - vmq_generic_msg_store:serialize_p_idx_val_pre(FutureVersion)), - ok. - -generate_msgs(0, Acc) -> Acc; -generate_msgs(N, Acc) -> - Msg = #vmq_msg{msg_ref= msg_ref(), - routing_key= rand_bytes(10), - payload = rand_bytes(100), - mountpoint = "", - dup = random_flag(), - qos = random_qos(), - properties=#{}, - persisted=true}, - generate_msgs(N - 1, [Msg|Acc]). - -store_msgs(SId, [Msg|Rest]) -> - ok = vmq_generic_msg_store:msg_store_write(SId, Msg), - store_msgs(SId, Rest); -store_msgs(_, []) -> ok. - -delete_msgs(_, []) -> ok; -delete_msgs(SId, [#vmq_msg{msg_ref=Ref}|Rest]) -> - ok = vmq_generic_msg_store:msg_store_delete(SId, Ref), - delete_msgs(SId, Rest). - -read_msgs(SId, Refs) -> - read_msgs(SId, Refs, []). -read_msgs(_, [], Acc) -> {ok, lists:reverse(Acc)}; -read_msgs(SId, [Ref|Refs], Acc) -> - {ok, Msg} = vmq_generic_msg_store:msg_store_read(SId, Ref), - read_msgs(SId, Refs, [Msg|Acc]). - -refcount([Ref|Refs], Cnt) -> - refcount(Refs, Cnt + vmq_generic_msg_store:refcount(Ref)); -refcount([], Cnt) -> Cnt. - - -random_flag() -> - rand:uniform(10) > 5. - -random_qos() -> - rand:uniform(3) - 1. - -store_summary() -> - vmq_generic_msg_store_utils:full_table_scan( - fun - ({msg, _, _, _, _}, {NumMsgs, NumIdxs}) -> - {NumMsgs + 1, NumIdxs}; - ({idx, _, _, _, _}, {NumMsgs, NumIdxs}) -> - {NumMsgs, NumIdxs + 1} - end, {0,0}). - -rand_bytes(N) -> - crypto:strong_rand_bytes(N). - -msg_ref() -> - erlang:md5(term_to_binary({node(), self(), erlang:timestamp(), rand_bytes(10)})). - - diff --git a/apps/vmq_generic_offline_msg_store/priv/vmq_generic_offline_msg_store.schema b/apps/vmq_generic_offline_msg_store/priv/vmq_generic_offline_msg_store.schema deleted file mode 100644 index d83e759e7..000000000 --- a/apps/vmq_generic_offline_msg_store/priv/vmq_generic_offline_msg_store.schema +++ /dev/null @@ -1,42 +0,0 @@ -%% -*- mode: erlang -*- -%% ex: ft=erlang - -{mapping, "offline_message_store_engine", "vmq_generic_offline_msg_store.msg_store_engine", - [{default, vmq_offline_storage_engine_redis}, - {datatype, atom} - ]}. - -{mapping, "offline_message_store_opts.host", "vmq_generic_offline_msg_store.msg_store_opts.host", - [{default, "localhost"}, - {datatype, string} - ]}. - -{mapping, "offline_message_store_opts.port", "vmq_generic_offline_msg_store.msg_store_opts.port", - [{default, 6379}, - {datatype, integer} - ]}. - -{mapping, "offline_message_store_opts.username", "vmq_generic_offline_msg_store.msg_store_opts.username", - [{default, undefined}, - {datatype, atom} - ]}. - -{mapping, "offline_message_store_opts.password", "vmq_generic_offline_msg_store.msg_store_opts.password", - [{default, undefined}, - {datatype, atom} - ]}. - -{mapping, "offline_message_store_opts.database", "vmq_generic_offline_msg_store.msg_store_opts.database", - [{default, "2"}, - {datatype, string} - ]}. - -{mapping, "offline_message_store_opts.connect_timeout", "vmq_generic_offline_msg_store.msg_store_opts.connect_timeout", - [{default, 4000}, - {datatype, integer} - ]}. - -{mapping, "offline_message_store_opts.query_timeout", "vmq_generic_offline_msg_store.msg_store_opts.query_timeout", - [{default, 2000}, - {datatype, integer} - ]}. diff --git a/apps/vmq_generic_offline_msg_store/rebar.config b/apps/vmq_generic_offline_msg_store/rebar.config deleted file mode 100644 index 16c4b0bb9..000000000 --- a/apps/vmq_generic_offline_msg_store/rebar.config +++ /dev/null @@ -1,9 +0,0 @@ -{erl_opts, [debug_info, {parse_transform, lager_transform}]}. -{deps, [ - lager, - {epgsql, "4.6.0"}, - {eredis, - {git, "https://github.com/dhruvjain99/eredis.git", {branch, "fix-sentinel-reconnect-loop"}}}, - {psql_migration, {git, "https://github.com/helium/psql-migration.git", {branch, "master"}}} -]}. -{cover_enabled, true}. diff --git a/apps/vmq_generic_offline_msg_store/src/engines/vmq_offline_storage_engine_postgres.erl b/apps/vmq_generic_offline_msg_store/src/engines/vmq_offline_storage_engine_postgres.erl deleted file mode 100644 index a79233d60..000000000 --- a/apps/vmq_generic_offline_msg_store/src/engines/vmq_offline_storage_engine_postgres.erl +++ /dev/null @@ -1,115 +0,0 @@ --module(vmq_offline_storage_engine_postgres). - --export([open/1, close/1, write/5, delete/3, delete/4, read/4, find/3]). - --include_lib("vmq_commons/src/vmq_types_common.hrl"). - --define(TABLE, "messages"). - --dialyzer([{nowarn_function, [write/5, delete/3, delete/4, read/4, find/3, equery/4]}]). - --record(column, { - name :: binary(), - type :: epgsql:epgsql_type(), - oid :: integer(), - size :: -1 | pos_integer(), - modifier :: -1 | pos_integer(), - format :: integer() -}). - --record(statement, { - name :: string(), - columns :: [#column{}], - types :: [epgsql:epgsql_type()], - parameter_info :: [epgsql_oid_db:oid_entry()] -}). - -% API -open(Opts) -> - Username = case proplists:get_value(username, Opts, undefined) of - undefined -> undefined; - User when is_atom(User) -> atom_to_list(User) - end, - Password = case proplists:get_value(password, Opts, undefined) of - undefined -> undefined; - Pass when is_atom(Pass) -> atom_to_list(Pass) - end, - ConnectOpts = [{host, proplists:get_value(host, Opts, "localhost")}, - {port, proplists:get_value(port, Opts, 5432)}, - {username, Username}, - {password, Password}, - {database, proplists:get_value(database, Opts, "vmq_test_database")}, - {timeout, proplists:get_value(connect_timeout, Opts, 5000)} - ], - open_(ConnectOpts). -open_(Opts) -> - case epgsql:connect(Opts) of - {ok, _} = OkResponse -> OkResponse; - {error, Reason} -> - lager:error("Error connecting db: ~p", [Reason]), - timer:sleep(2000), - open_(Opts) - end. - -write(Client, SIdB, MsgRef, MsgB, Timeout) -> - equery(Client, - "INSERT INTO " ++ ?TABLE ++ " (sid, msgref, payload) VALUES ($1, $2, $3)", - [SIdB, MsgRef, MsgB], - Timeout - ). - -delete(Client, SIdB, Timeout) -> - equery(Client, "DELETE FROM " ++ ?TABLE ++ " WHERE sid=$1", [SIdB], Timeout). - -delete(Client, SIdB, MsgRef, Timeout) -> - equery(Client, "DELETE FROM " ++ ?TABLE ++ " WHERE sid=$1 AND msgref=$2", [SIdB, MsgRef], Timeout). - -read(Client, SIdB, MsgRef, Timeout) -> - case equery(Client, "SELECT payload FROM " ++ ?TABLE ++ " WHERE sid=$1 AND msgref=$2", [SIdB, MsgRef], Timeout) of - {ok, _, [{BinaryMsg}]} -> {ok, binary_to_term(BinaryMsg)}; - E -> E - end. - -find(Client, SIdB, Timeout) -> - case equery(Client, "SELECT payload FROM " ++ ?TABLE ++ " WHERE sid=$1 ORDER BY created_time ASC", [SIdB], Timeout) of - {ok, _, MsgsInB} -> - DMsgs = lists:foldr(fun({MsgB}, Acc) -> - Msg = binary_to_term(MsgB), - D = #deliver{msg = Msg, qos = Msg#vmq_msg.qos}, - [D | Acc] end, [], MsgsInB), - {ok, DMsgs}; - Res -> Res - end. - -close(Client) -> - epgsql:close(Client). - -equery(C, SQL, Parameters, Timeout) -> - Ref0 = epgsqla:parse(C, SQL), - receive - {C, Ref0, {ok, #statement{types = Types} = S}} -> - TypedParameters = lists:zip(Types, Parameters), - Ref1 = epgsqla:equery(C, S, TypedParameters), - receive - {C, Ref1, Result} -> - epgsql:sync(C), - Result - after Timeout -> - ok = epgsql:cancel(C), - receive - {C, Ref1, Result} -> - epgsql:sync(C), - Result - end - end; - {C, Ref0, Result} -> - epgsql:sync(C), - Result - after Timeout -> - ok = epgsql:cancel(C), - receive - {C, Ref0, Result} -> - epgsql:sync(C), - Result - end - end. diff --git a/apps/vmq_generic_offline_msg_store/src/engines/vmq_offline_storage_engine_redis.erl b/apps/vmq_generic_offline_msg_store/src/engines/vmq_offline_storage_engine_redis.erl deleted file mode 100644 index d3b486449..000000000 --- a/apps/vmq_generic_offline_msg_store/src/engines/vmq_offline_storage_engine_redis.erl +++ /dev/null @@ -1,55 +0,0 @@ --module(vmq_offline_storage_engine_redis). - --export([open/1, close/1, write/5, delete/3, delete/4, read/4, find/3]). - --include_lib("vmq_server/src/vmq_server.hrl"). - --dialyzer([{nowarn_function, [write/5, delete/3, delete/4, read/4, find/3]}]). - -% API -open(Opts) -> - Username = case proplists:get_value(username, Opts, undefined) of - undefined -> undefined; - User when is_atom(User) -> atom_to_list(User) - end, - Password = case proplists:get_value(password, Opts, undefined) of - undefined -> undefined; - Pass when is_atom(Pass) -> atom_to_list(Pass) - end, - {Database, _} = string:to_integer(proplists:get_value(database, Opts, "2")), - Port = proplists:get_value(port, Opts, 26379), - SentinelHosts = vmq_schema_util:parse_list(proplists:get_value(host, Opts, "[\"localhost\"]")), - SentinelEndpoints = lists:foldr(fun(Host, Acc) -> [{Host, Port} | Acc]end, [], SentinelHosts), - ConnectOpts = [{sentinel, [{endpoints, SentinelEndpoints}, - {timeout, proplists:get_value(connect_timeout, Opts, 5000)}] - }, - {username, Username}, - {password, Password}, - {database, Database}], - eredis:start_link(ConnectOpts). - -write(Client, SIdB, _MsgRef, MsgB, Timeout) -> - vmq_redis:query(Client, ["RPUSH", SIdB, MsgB], ?RPUSH, ?MSG_STORE_WRITE, Timeout). - -delete(Client, SIdB, Timeout) -> - vmq_redis:query(Client, ["DEL", SIdB], ?DEL, ?MSG_STORE_DELETE, Timeout). - -delete(Client, SIdB, _MsgRef, Timeout) -> - vmq_redis:query(Client, ["LPOP", SIdB, 1], ?LPOP, ?MSG_STORE_DELETE, Timeout). - -read(_Client, _SIdB, _MsgRef, _Timeout) -> - {error, not_supported}. - -find(Client, SIdB, Timeout) -> - case vmq_redis:query(Client, ["LRANGE", SIdB, "0", "-1"], ?FIND, ?MSG_STORE_FIND, Timeout) of - {ok, MsgsInB} -> - DMsgs = lists:foldr(fun(MsgB, Acc) -> - Msg = binary_to_term(MsgB), - D = #deliver{msg = Msg, qos = Msg#vmq_msg.qos}, - [D | Acc] end, [], MsgsInB), - {ok, DMsgs}; - Res -> Res - end. - -close(Client) -> - eredis:stop(Client). diff --git a/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store.app.src b/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store.app.src deleted file mode 100644 index 9cda51714..000000000 --- a/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store.app.src +++ /dev/null @@ -1,34 +0,0 @@ -{application, vmq_generic_offline_msg_store, [ - {description, "A VerneMQ plugin that sets up offline store for message storage"}, - {vsn, git}, - {registered, []}, - {mod, {vmq_generic_offline_msg_store_app, []}}, - {applications, [ - kernel, - stdlib, - sasl, - lager, - epgsql, - eredis - ]}, - {env, [ - {vmq_plugin_hooks, [ - {vmq_generic_offline_msg_store, msg_store_write, 2, [internal]}, - {vmq_generic_offline_msg_store, msg_store_delete, 1, [internal]}, - {vmq_generic_offline_msg_store, msg_store_delete, 2, [internal]}, - {vmq_generic_offline_msg_store, msg_store_read, 2, [internal]}, - {vmq_generic_offline_msg_store, msg_store_find, 1, [internal]} - ]}, - {msg_store_engine, vmq_offline_storage_engine_redis}, - {msg_store_opts, [ - {host, "[\"localhost\"]"}, - {port, 26379}, - {username, undefined}, - {password, undefined}, - {database, "2"}, - {connect_timeout, 4000}, - {query_timeout, 2000} - ]} - ]}, - {modules, []} -]}. diff --git a/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store.erl b/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store.erl deleted file mode 100644 index af99cc909..000000000 --- a/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store.erl +++ /dev/null @@ -1,204 +0,0 @@ --module(vmq_generic_offline_msg_store). - --behaviour(gen_server). - --include_lib("vmq_commons/src/vmq_types_common.hrl"). - -%% API --export([ - start_link/0, - msg_store_write/2, - msg_store_delete/1, - msg_store_delete/2, - msg_store_read/2, - msg_store_find/1 -]). - -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). - --record(state, { - engine, - engine_module, - query_timeout, - options -}). - -%%%=================================================================== -%%% API -%%%=================================================================== -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -msg_store_write(SubscriberId, Msg) -> - safe_call({write, SubscriberId, Msg}). - -msg_store_delete(SubscriberId) -> - safe_call({delete, SubscriberId}). - -msg_store_delete(SubscriberId, MsgRef) -> - safe_call({delete, SubscriberId, MsgRef}). - -msg_store_read(SubscriberId, MsgRef) -> - safe_call({read, SubscriberId, MsgRef}). - -msg_store_find(SubscriberId) -> - safe_call({find, SubscriberId}). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% -%% @spec init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% @end -%%-------------------------------------------------------------------- -init([]) -> - {ok, EngineModule} = application:get_env(vmq_generic_offline_msg_store, msg_store_engine), - {ok, Opts} = application:get_env(vmq_generic_offline_msg_store, msg_store_opts), - Timeout = proplists:get_value(query_timeout, Opts, 2000), - - process_flag(trap_exit, true), - case apply(EngineModule, open, [Opts]) of - {ok, EngineState} -> - {ok, #state{ - engine = EngineState, - engine_module = EngineModule, - query_timeout = Timeout, - options = Opts - }}; - {error, Reason} -> - {stop, Reason} - end. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% -%% @spec handle_call(Request, From, State) -> -%% {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_call(Request, _From, State) -> - {reply, handle_req(Request, State), State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% -%% @spec handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_cast(_Request, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% -%% @spec handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_info({'EXIT', Pid, _}, #state{engine = Engine} = State) when Engine == Pid -> - reconnect(State); -handle_info(Info, State) -> - lager:info("Unknown info: ~p", [Info]), - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% -%% @spec terminate(Reason, State) -> void() -%% @end -%%-------------------------------------------------------------------- -terminate(_Reason, #state{engine = EngineState, engine_module = EngineModule}) -> - apply(EngineModule, close, [EngineState]), - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process state when code is changed -%% -%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} -%% @end -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== -reconnect(#state{engine_module = EngineModule, options = Opts} = State) -> - case apply(EngineModule, open, [Opts]) of - {ok, EngineState} -> - {noreply, State#state{engine = EngineState}}; - {error, Reason} -> - {stop, Reason, State} - end. - -safe_call(Request) -> - try - gen_server:call(?MODULE, Request) - catch - Error:Reason -> {error, {Error, Reason}} - end. - -handle_req( - {write, SId, Msg}, - #state{engine = Engine, engine_module = EngineModule, query_timeout = Timeout} -) -> - apply(EngineModule, write, [ - Engine, term_to_binary(SId), Msg#vmq_msg.msg_ref, term_to_binary(Msg), Timeout - ]); -handle_req( - {delete, SId}, - #state{engine = Engine, engine_module = EngineModule, query_timeout = Timeout} -) -> - apply(EngineModule, delete, [Engine, term_to_binary(SId), Timeout]); -handle_req( - {delete, SId, MsgRef}, - #state{engine = Engine, engine_module = EngineModule, query_timeout = Timeout} -) -> - apply(EngineModule, delete, [Engine, term_to_binary(SId), MsgRef, Timeout]); -handle_req( - {read, SId, MsgRef}, - #state{engine = Engine, engine_module = EngineModule, query_timeout = Timeout} -) -> - apply(EngineModule, read, [Engine, term_to_binary(SId), MsgRef, Timeout]); -handle_req( - {find, SId}, - #state{engine = Engine, engine_module = EngineModule, query_timeout = Timeout} -) -> - apply(EngineModule, find, [Engine, term_to_binary(SId), Timeout]). diff --git a/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store_app.erl b/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store_app.erl deleted file mode 100644 index f71f27702..000000000 --- a/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store_app.erl +++ /dev/null @@ -1,16 +0,0 @@ --module(vmq_generic_offline_msg_store_app). - --behaviour(application). - -%% Application callbacks --export([start/2, stop/1]). - -%%==================================================================== -%% API -%%==================================================================== -start(_StartType, _StartArgs) -> - vmq_generic_offline_msg_store_sup:start_link(). - -%%-------------------------------------------------------------------- -stop(_State) -> - ok. diff --git a/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store_sup.erl b/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store_sup.erl deleted file mode 100644 index 8c8ca4ba8..000000000 --- a/apps/vmq_generic_offline_msg_store/src/vmq_generic_offline_msg_store_sup.erl +++ /dev/null @@ -1,36 +0,0 @@ --module(vmq_generic_offline_msg_store_sup). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== - -init([]) -> - SupFlags = - #{strategy => one_for_one, intensity => 1, period => 5}, - ChildSpecs = - [ - #{ - id => vmq_generic_offline_msg_store, - start => {vmq_generic_offline_msg_store, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [vmq_generic_offline_msg_store] - } - ], - {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/vmq_generic_offline_msg_store/test/vmq_generic_offline_msg_store_SUITE.erl b/apps/vmq_generic_offline_msg_store/test/vmq_generic_offline_msg_store_SUITE.erl deleted file mode 100644 index 243931f65..000000000 --- a/apps/vmq_generic_offline_msg_store/test/vmq_generic_offline_msg_store_SUITE.erl +++ /dev/null @@ -1,129 +0,0 @@ --module(vmq_generic_offline_msg_store_SUITE). --include_lib("vmq_commons/src/vmq_types_common.hrl"). --export([ - %% suite/0, - init_per_suite/1, - end_per_suite/1, - init_per_group/2, - end_per_group/2, - init_per_testcase/2, - end_per_testcase/2, - all/0, - groups/0 - ]). - --export([insert_delete_test/1 ]). - - -%% =================================================================== -%% common_test callbacks -%% =================================================================== -init_per_suite(Config) -> - Config. - -end_per_suite(Config) -> - Config. - -init_per_group(StorageEngine, Config) -> - [{engine, StorageEngine}|Config]. - -end_per_group(_Group, _Config) -> - ok. - -init_per_testcase(_Case, Config) -> - StorageEngine = proplists:get_value(engine, Config), - Opts = case StorageEngine of - vmq_offline_storage_engine_postgres -> - [ - {username, vmq_test_user}, - {password, vmq_test_password}, - {database, "vmq_test_database"}, - {host, "localhost"}, - {port, 5432} - ]; - vmq_offline_storage_engine_redis -> - {ok, _} = vmq_metrics:start_link(), - [ - {database, "2"}, - {host, "[\"localhost\"]"}, - {port, 26379} - ] - end, - application:load(vmq_generic_offline_msg_store), - application:set_env(vmq_generic_offline_msg_store, msg_store_engine, StorageEngine), - application:set_env(vmq_generic_offline_msg_store, msg_store_opts, Opts), - application:ensure_all_started(vmq_generic_offline_msg_store), - Config. - -end_per_testcase(_, Config) -> - application:stop(vmq_generic_offline_msg_store), - Config. - -all() -> - [ - {group, vmq_offline_storage_engine_postgres}, - {group, vmq_offline_storage_engine_redis} - ]. - -groups() -> - [ - {vmq_offline_storage_engine_postgres, [], [insert_delete_test]}, - {vmq_offline_storage_engine_redis, [], [insert_delete_test]} - ]. - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Actual Tests -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -insert_delete_test(Config) -> - SId = {"", "foo"}, - - {0, []} = store_summary(SId), - - Msgs = generate_msgs(1000, []), - ok = store_msgs({"", "foo"}, Msgs), %% write - - {1000, [#deliver{msg = Msg} | _]} = store_summary(SId), %% find - - {ok, _} = vmq_generic_offline_msg_store:msg_store_delete(SId, Msg#vmq_msg.msg_ref), %% delete - {ok, _} = vmq_generic_offline_msg_store:msg_store_delete(SId), %% delete all - - {0, []} = store_summary(SId), %% find - Config. - -generate_msgs(0, Acc) -> Acc; -generate_msgs(N, Acc) -> - Msg = #vmq_msg{msg_ref= msg_ref(), - routing_key= rand_bytes(10), - payload = rand_bytes(100), - mountpoint = "", - dup = random_flag(), - qos = random_qos(), - properties=#{}, - persisted=true}, - generate_msgs(N - 1, [Msg|Acc]). - -store_msgs(SId, [Msg|Rest]) -> - {ok, _} = vmq_generic_offline_msg_store:msg_store_write(SId, Msg), - store_msgs(SId, Rest); -store_msgs(_, []) -> ok. - -random_flag() -> - rand:uniform(10) > 5. - -random_qos() -> - rand:uniform(3) - 1. - -store_summary(SId) -> - {ok, MsgList} = vmq_generic_offline_msg_store:msg_store_find(SId), - Size = size(MsgList, 0), - {Size, MsgList}. - -size([], S) -> S; -size([_ | R], S) -> size(R, S + 1). - -rand_bytes(N) -> - crypto:strong_rand_bytes(N). - -msg_ref() -> - erlang:md5(term_to_binary({node(), self(), erlang:timestamp(), rand_bytes(10)})). diff --git a/apps/vmq_server/priv/vmq_server.schema b/apps/vmq_server/priv/vmq_server.schema index a6d73a19e..12f4ba9d6 100644 --- a/apps/vmq_server/priv/vmq_server.schema +++ b/apps/vmq_server/priv/vmq_server.schema @@ -59,15 +59,6 @@ "redis_queue_sleep_interval", "vmq_server.redis_queue_sleep_interval", [{default, 0}, {datatype, integer}]}. -%% @doc This option specifies the interval in which the message store must retry the failed operations. -{mapping, "message_store_retry_interval", "vmq_server.message_store_retry_interval", [{default, 2000}, - {datatype, integer} - ]}. -%% @doc This option specifies the maximum number of retries in case of failed operations. -{mapping, "message_store_nr_of_retries", "vmq_server.message_store_nr_of_retries", [{default, 2}, - {datatype, integer} - ]}. - %% @doc This option caches the shared subscription locally, default is 'off'. !!NOTE!! %% Local shared subscriptions currently do not honor shared subscription groups properly. %% It is recommended to use shared subscription topics only for shared subscription use-cases. @@ -1741,11 +1732,6 @@ hidden ]}. -{mapping, "message_store_plugin", "vmq_server.message_store_impl", [{default, vmq_generic_msg_store}, - {datatype, atom}, - hidden - ]}. - %% @doc This parameter allows to set the max no. of crashes that will be allowed %% in queue_sup_sup_max_t seconds before restarting the queue_sup processes. {mapping, "queue_sup_sup_max_r", "vmq_server.max_r", [ @@ -1789,3 +1775,15 @@ {default, 10}, {datatype, integer} ]}. + +%% @doc This option specifies the connect options for redis as message store +{mapping, "message_store.redis.connect_options", "vmq_server.message_store.redis.connect_options", + [{datatype, string}]}. + +%% @doc This option specifies the username for redis as message store +{mapping, "message_store.redis.username", "vmq_server.message_store.redis.username", + [{datatype, atom}]}. + +%% @doc This option specifies the password for redis as message store +{mapping, "message_store.redis.password", "vmq_server.message_store.redis.password", + [{datatype, atom}]}. diff --git a/apps/vmq_server/src/vmq_message_store.erl b/apps/vmq_server/src/vmq_message_store.erl index c5c343b25..5f20d7c47 100644 --- a/apps/vmq_server/src/vmq_message_store.erl +++ b/apps/vmq_server/src/vmq_message_store.erl @@ -1,22 +1,15 @@ -%% Copyright 2019 Octavo Labs AG Zurich Switzerland (http://octavolabs.com) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - -module(vmq_message_store). + +-behaviour(supervisor). + -include("vmq_server.hrl"). + +%% Supervisor callbacks +-export([init/1]). + +%% API -export([ start/0, - stop/0, write/2, read/2, delete/1, @@ -24,138 +17,97 @@ find/1 ]). --export([ - write_with_retry/3, - read_with_retry/3, - delete_all_with_retry/2, - delete_with_retry/3, - find_with_retry/2 -]). - --define(RETRY_INTERVAL, application:get_env(vmq_server, message_store_retry_interval, 2000)). --define(NR_OF_RETRIES, application:get_env(vmq_server, message_store_nr_of_retries, 2)). - start() -> - Impl = application:get_env(vmq_server, message_store_impl, vmq_generic_offline_msg_store), - Ret = vmq_plugin_mgr:enable_system_plugin(Impl, [internal]), - lager:info("Try to start ~p: ~p", [Impl, Ret]), - Ret. - -stop() -> - % vmq_message_store:stop is typically called when stopping the vmq_server - % OTP application. As vmq_plugin_mgr:disable_plugin is likely stopping - % another OTP application too we might block the OTP application - % controller. Wrapping the disable_plugin in its own process would - % enable to stop the involved applications. Moreover, because an - % application:stop is actually a gen_server:call to the application - % controller the order of application termination is still provided. - % Nevertheless, this is of course only a workaround and the problem - % needs to be addressed when reworking the plugin system. - Impl = application:get_env(vmq_server, message_store_impl, vmq_generic_offline_msg_store), - _ = spawn(fun() -> - Ret = vmq_plugin_mgr:disable_plugin(Impl), - lager:info("Try to stop ~p: ~p", [Impl, Ret]) - end), - ok. + supervisor:start_link({local, ?MODULE}, ?MODULE, []). write(SubscriberId, Msg) -> - vmq_util:timed_measurement({?MODULE, write}, ?MODULE, write_with_retry, [ - SubscriberId, Msg, ?NR_OF_RETRIES - ]). -write_with_retry(SubscriberId, Msg, N) when N >= 0 -> - case vmq_plugin:only(msg_store_write, [SubscriberId, Msg]) of - {ok, _Count} -> - ok; - {error, no_matching_hook_found} = ErrRes -> - ErrRes; - {error, Err} -> - lager:error("Error: ~p", [Err]), - vmq_metrics:incr_msg_store_ops_error(write), - timer:sleep(?RETRY_INTERVAL), - write_with_retry(SubscriberId, Msg, N - 1) - end; -write_with_retry(_SubscriberId, _Msg, _N) -> - vmq_metrics:incr_msg_store_retry_exhausted(write), - ok. + vmq_redis:query( + vmq_message_store_redis_client, + ["RPUSH", term_to_binary(SubscriberId), term_to_binary(Msg)], + ?RPUSH, + ?MSG_STORE_WRITE + ). -read(SubscriberId, MsgRef) -> - vmq_util:timed_measurement({?MODULE, read}, ?MODULE, read_with_retry, [ - SubscriberId, MsgRef, ?NR_OF_RETRIES - ]). -read_with_retry(SubscriberId, MsgRef, N) when N >= 0 -> - case vmq_plugin:only(msg_store_read, [SubscriberId, MsgRef]) of - {ok, _} = OkRes -> - OkRes; - {error, no_matching_hook_found} = ErrRes -> - ErrRes; - {error, not_supported} = ErrRes -> - ErrRes; - {error, Err} -> - lager:error("Error: ~p", [Err]), - vmq_metrics:incr_msg_store_ops_error(read), - timer:sleep(?RETRY_INTERVAL), - read_with_retry(SubscriberId, MsgRef, N - 1) - end; -read_with_retry(_SubscriberId, _MsgRef, _N) -> - vmq_metrics:incr_msg_store_retry_exhausted(read), - {error, retry_exhausted}. +read(_SubscriberId, _MsgRef) -> + {error, not_supported}. delete(SubscriberId) -> - vmq_util:timed_measurement({?MODULE, delete_all}, ?MODULE, delete_all_with_retry, [ - SubscriberId, ?NR_OF_RETRIES - ]). -delete_all_with_retry(SubscriberId, N) when N >= 0 -> - case vmq_plugin:only(msg_store_delete, [SubscriberId]) of - {ok, _Count} -> - ok; - {error, no_matching_hook_found} = ErrRes -> - ErrRes; - {error, Err} -> - lager:error("Error: ~p", [Err]), - vmq_metrics:incr_msg_store_ops_error(delete_all), - timer:sleep(?RETRY_INTERVAL), - delete_all_with_retry(SubscriberId, N - 1) - end; -delete_all_with_retry(_SubscriberId, _N) -> - vmq_metrics:incr_msg_store_retry_exhausted(delete_all), - ok. + vmq_redis:query( + vmq_message_store_redis_client, + ["DEL", term_to_binary(SubscriberId)], + ?DEL, + ?MSG_STORE_DELETE + ). -delete(SubscriberId, MsgRef) -> - vmq_util:timed_measurement({?MODULE, delete}, ?MODULE, delete_with_retry, [ - SubscriberId, MsgRef, ?NR_OF_RETRIES - ]). -delete_with_retry(SubscriberId, MsgRef, N) when N >= 0 -> - case vmq_plugin:only(msg_store_delete, [SubscriberId, MsgRef]) of - {ok, _Count} -> - ok; - {error, no_matching_hook_found} = ErrRes -> - ErrRes; - {error, Err} -> - lager:error("Error: ~p", [Err]), - vmq_metrics:incr_msg_store_ops_error(delete), - timer:sleep(?RETRY_INTERVAL), - delete_with_retry(SubscriberId, MsgRef, N - 1) - end; -delete_with_retry(_SId, _MsgRef, _N) -> - vmq_metrics:incr_msg_store_retry_exhausted(delete), - ok. +delete(SubscriberId, _MsgRef) -> + vmq_redis:query( + vmq_message_store_redis_client, + ["LPOP", term_to_binary(SubscriberId), 1], + ?LPOP, + ?MSG_STORE_DELETE + ). find(SubscriberId) -> - vmq_util:timed_measurement({?MODULE, find}, ?MODULE, find_with_retry, [ - SubscriberId, ?NR_OF_RETRIES - ]). -find_with_retry(SubscriberId, N) when N >= 0 -> - case vmq_plugin:only(msg_store_find, [SubscriberId]) of - {ok, _} = OkRes -> - OkRes; - {error, no_matching_hook_found} = ErrRes -> - ErrRes; - {error, Err} -> - lager:error("Error: ~p", [Err]), - vmq_metrics:incr_msg_store_ops_error(find), - timer:sleep(?RETRY_INTERVAL), - find_with_retry(SubscriberId, N - 1) - end; -find_with_retry(_SId, _N) -> - vmq_metrics:incr_msg_store_retry_exhausted(find), - {error, retry_exhausted}. + case + vmq_redis:query( + vmq_message_store_redis_client, + ["LRANGE", term_to_binary(SubscriberId), "0", "-1"], + ?FIND, + ?MSG_STORE_FIND + ) + of + {ok, MsgsInB} -> + DMsgs = lists:foldr( + fun(MsgB, Acc) -> + Msg = binary_to_term(MsgB), + D = #deliver{msg = Msg, qos = Msg#vmq_msg.qos}, + [D | Acc] + end, + [], + MsgsInB + ), + {ok, DMsgs}; + Res -> + Res + end. + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +-spec init([]) -> + {'ok', + {{'one_for_one', 5, 10}, [ + {atom(), {atom(), atom(), list()}, permanent, pos_integer(), worker, [atom()]} + ]}}. +init([]) -> + StoreCfgs = application:get_env(vmq_server, message_store, [ + {redis, [ + {connect_options, "[{sentinel, [{endpoints, [{\"localhost\", 26379}]}]},{database,2}]"} + ]} + ]), + Redis = proplists:get_value(redis, StoreCfgs), + Username = + case proplists:get_value(username, Redis, undefined) of + undefined -> undefined; + User when is_atom(User) -> atom_to_list(User) + end, + Password = + case proplists:get_value(password, Redis, undefined) of + undefined -> undefined; + Pass when is_atom(Pass) -> atom_to_list(Pass) + end, + + {ok, + {{one_for_one, 5, 10}, [ + {eredis, + {eredis, start_link, [ + [ + {username, Username}, + {password, Password}, + {name, {local, vmq_message_store_redis_client}} + | vmq_schema_util:parse_list(proplists:get_value(connect_options, Redis)) + ] + ]}, + permanent, 5000, worker, [eredis]} + ]}}. diff --git a/apps/vmq_server/src/vmq_metrics.erl b/apps/vmq_server/src/vmq_metrics.erl index e7376f9cd..54a012562 100644 --- a/apps/vmq_server/src/vmq_metrics.erl +++ b/apps/vmq_server/src/vmq_metrics.erl @@ -84,9 +84,6 @@ incr_router_matches_remote/1, pretimed_measurement/2, - incr_msg_store_ops_error/1, - incr_msg_store_retry_exhausted/1, - incr_redis_cmd/1, incr_redis_cmd_miss/1, incr_redis_cmd_err/1, @@ -303,12 +300,6 @@ incr_router_matches_local(V) -> incr_router_matches_remote(V) -> incr_item(?METRIC_ROUTER_MATCHES_REMOTE, V). -incr_msg_store_ops_error(Op) -> - incr_item({?METRIC_MSG_STORE_OPS_ERRORS, Op}, 1). - -incr_msg_store_retry_exhausted(Op) -> - incr_item({?METRIC_MSG_STORE_RETRY_EXHAUSTED, Op}, 1). - incr_redis_cmd({CMD, OPERATION}) -> incr_item({?REDIS_CMD, CMD, OPERATION}, 1). @@ -992,41 +983,11 @@ internal_defs() -> mqtt5_auth_received_def(), sidecar_events_def(), redis_def(), - msg_store_ops_def(), mqtt_disconnect_def() ], [] ). -msg_store_ops_def() -> - Ops = [ - ?WRITE, - ?DELETE, - ?DELETE_ALL, - ?READ, - ?FIND - ], - [ - m( - counter, - [{operation, rcn_to_str(Op)}], - {?METRIC_MSG_STORE_OPS_ERRORS, Op}, - ?METRIC_MSG_STORE_OPS_ERRORS, - <<"The number of times msg store operation failed.">> - ) - || Op <- Ops - ] ++ - [ - m( - counter, - [{operation, rcn_to_str(Op)}], - {?METRIC_MSG_STORE_RETRY_EXHAUSTED, Op}, - ?METRIC_MSG_STORE_RETRY_EXHAUSTED, - <<"The number of times msg store operation retry exhausted.">> - ) - || Op <- Ops - ]. - redis_def() -> OPERATIONs = [ @@ -2819,16 +2780,6 @@ met2idx({?REDIS_STALE_CMD, ?FCALL, ?ENQUEUE_MSG}) -> 271; met2idx({?REDIS_STALE_CMD, ?FCALL, ?POLL_MAIN_QUEUE}) -> 272; met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?ENQUEUE_MSG}) -> 273; met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?POLL_MAIN_QUEUE}) -> 274; -met2idx({?METRIC_MSG_STORE_OPS_ERRORS, ?WRITE}) -> 275; -met2idx({?METRIC_MSG_STORE_OPS_ERRORS, ?DELETE}) -> 276; -met2idx({?METRIC_MSG_STORE_OPS_ERRORS, ?DELETE_ALL}) -> 277; -met2idx({?METRIC_MSG_STORE_OPS_ERRORS, ?READ}) -> 278; -met2idx({?METRIC_MSG_STORE_OPS_ERRORS, ?FIND}) -> 279; -met2idx({?METRIC_MSG_STORE_RETRY_EXHAUSTED, ?WRITE}) -> 280; -met2idx({?METRIC_MSG_STORE_RETRY_EXHAUSTED, ?DELETE}) -> 281; -met2idx({?METRIC_MSG_STORE_RETRY_EXHAUSTED, ?DELETE_ALL}) -> 282; -met2idx({?METRIC_MSG_STORE_RETRY_EXHAUSTED, ?READ}) -> 283; -met2idx({?METRIC_MSG_STORE_RETRY_EXHAUSTED, ?FIND}) -> 284; met2idx({?REDIS_CMD, ?RPUSH, ?MSG_STORE_WRITE}) -> 285; met2idx({?REDIS_CMD, ?DEL, ?MSG_STORE_DELETE}) -> 286; met2idx({?REDIS_CMD, ?FIND, ?MSG_STORE_FIND}) -> 287; diff --git a/apps/vmq_server/src/vmq_queue.erl b/apps/vmq_server/src/vmq_queue.erl index ececccd39..7bf2a3a1a 100644 --- a/apps/vmq_server/src/vmq_queue.erl +++ b/apps/vmq_server/src/vmq_queue.erl @@ -1202,12 +1202,7 @@ prepare_msgs(SId, OQ, Q, QC, N) -> end. maybe_deref(SId, MsgRef) when is_binary(MsgRef) -> - case vmq_message_store:read(SId, MsgRef) of - {ok, #vmq_msg{qos = QoS} = Msg} -> - {ok, #deliver{qos = QoS, msg = Msg}}; - {error, _} = E -> - E - end; + vmq_message_store:read(SId, MsgRef); maybe_deref(_, Msg) -> {ok, Msg}. @@ -1385,12 +1380,6 @@ decompress_queue(_, [], Acc) -> queue:from_list(lists:reverse(Acc)); decompress_queue(SId, [MsgRef | Rest], Acc) when is_binary(MsgRef) -> case vmq_message_store:read(SId, MsgRef) of - {ok, #vmq_msg{qos = QoS} = Msg} -> - decompress_queue( - SId, - Rest, - [#deliver{qos = QoS, msg = Msg#vmq_msg{persisted = false}} | Acc] - ); {error, Reason} -> lager:warning( "can't decompress queue item with msg_ref ~p for subscriber ~p due to ~p", @@ -1438,18 +1427,8 @@ on_message_drop_hook( ]); on_message_drop_hook(SubscriberId, MsgRef, Reason) when is_binary(MsgRef) -> Promise = fun() -> - case vmq_message_store:read(SubscriberId, MsgRef) of - {ok, #vmq_msg{ - routing_key = RoutingKey, - qos = QoS, - payload = Payload, - properties = Props, - acl_name = Name - }} -> - {RoutingKey, QoS, Payload, Props, #matched_acl{name = Name}}; - _ -> - error - end + {error, _} = vmq_message_store:read(SubscriberId, MsgRef), + error end, vmq_plugin:all(on_message_drop, [SubscriberId, Promise, Reason]). diff --git a/apps/vmq_server/src/vmq_server_app.erl b/apps/vmq_server/src/vmq_server_app.erl index 9552fd75c..31073c675 100644 --- a/apps/vmq_server/src/vmq_server_app.erl +++ b/apps/vmq_server/src/vmq_server_app.erl @@ -25,7 +25,7 @@ -spec start(_, _) -> {'error', _} | {'ok', pid()} | {'ok', pid(), _}. start(_StartType, _StartArgs) -> - ok = vmq_message_store:start(), + {ok, _pid} = vmq_message_store:start(), case vmq_server_sup:start_link() of {error, _} = E -> @@ -68,5 +68,4 @@ start_user_plugin( -spec stop(_) -> 'ok'. stop(_State) -> - _ = vmq_message_store:stop(), ok. diff --git a/apps/vmq_server/test/vmq_test_utils.erl b/apps/vmq_server/test/vmq_test_utils.erl index d5bfbb573..6ff39ee3e 100644 --- a/apps/vmq_server/test/vmq_test_utils.erl +++ b/apps/vmq_server/test/vmq_test_utils.erl @@ -69,9 +69,6 @@ disable_all_plugins() -> lists:foreach(fun ({application, vmq_plumtree, _}) -> % don't disable metadata plugin ignore; - ({application, vmq_generic_offline_msg_store, _}) -> - % don't disable message store plugin - ignore; ({application, App, _Hooks}) -> vmq_plugin_mgr:disable_plugin(App); (_ModPlugins) -> diff --git a/rebar.config b/rebar.config index 5c1a804c8..97cb25b99 100644 --- a/rebar.config +++ b/rebar.config @@ -89,8 +89,6 @@ {cuttlefish, load}, {vmq_plumtree, load}, {plumtree, load}, - {vmq_generic_msg_store, load}, - {vmq_generic_offline_msg_store, load}, {vmq_passwd, load}, {vmq_acl, load}, {vmq_bridge, load}, @@ -143,8 +141,6 @@ %% %% Please only use 0[0-9]-*.schema for development purposes {template, "apps/vmq_server/priv/vmq_server.schema", "share/schema/10-vmq_server.schema"}, - {template, "apps/vmq_generic_msg_store/priv/vmq_generic_msg_store.schema", - "share/schema/11-vmq_generic_msg_store.schema"}, {template, "apps/vmq_plugin/priv/vmq_plugin.schema", "share/schema/12-vmq_plugin.schema"}, {template, "apps/vmq_acl/priv/vmq_acl.schema", "share/schema/13-vmq_acl.schema"}, {template, "apps/vmq_passwd/priv/vmq_passwd.schema", "share/schema/14-vmq_passwd.schema"}, @@ -163,8 +159,6 @@ {template, "apps/vmq_pulse/priv/vmq_pulse.schema", "share/schema/21-vmq_pulse.schema"}, {template, "apps/vmq_enhanced_auth/priv/vmq_enhanced_auth.schema", "share/schema/22-vmq_enhanced_auth.schema"}, - {template, "apps/vmq_generic_offline_msg_store/priv/vmq_generic_offline_msg_store.schema", - "share/schema/23-vmq_generic_offline_msg_store.schema"}, {template, "files/vmq.schema", "share/schema/30-vmq.schema"},