Skip to content

Commit

Permalink
feat: add reason parameter in disconnect events (#34)
Browse files Browse the repository at this point in the history
* feat: add reason parameter in disconnect events

* fix : map required disconnect reason to events

* fix : review revisions

* fix : add empty line at the EOF.

* fix : remove package import

* refactor : move PUBLISH_AUTH_ERROR reason mapping to common function

* add metrics for tracking different reasons

* fix : update metric description

* fix : remove duplicate reason mapping

* fix : update reason mapping according to proto and add combination of reason code and name in metric

* refactor : use macros for reason mapping

* fix : test mqtt5_disconnect_sent - tcp_closed not found

* fix : mqtt5 tcp closed and exit signal reason mapping

---------

Co-authored-by: Vivek Pipaliya <[email protected]>
  • Loading branch information
VivekPipaliya23 and Vivek Pipaliya authored Nov 20, 2023
1 parent fbb8134 commit 91599d3
Show file tree
Hide file tree
Showing 36 changed files with 1,548 additions and 252 deletions.
4 changes: 2 additions & 2 deletions apps/vmq_diversity/src/vmq_diversity.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@

{vmq_diversity_plugin, on_offline_message, 5, []},
{vmq_diversity_plugin, on_client_wakeup, 1, []},
{vmq_diversity_plugin, on_client_offline, 1, []},
{vmq_diversity_plugin, on_client_gone, 1, []},
{vmq_diversity_plugin, on_client_offline, 2, []},
{vmq_diversity_plugin, on_client_gone, 2, []},
{vmq_diversity_plugin, on_session_expired, 1, []}
]},
{db_config, [
Expand Down
8 changes: 4 additions & 4 deletions apps/vmq_diversity/src/vmq_diversity_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
on_deliver/6,
on_offline_message/5,
on_client_wakeup/1,
on_client_offline/1,
on_client_gone/1,
on_client_offline/2,
on_client_gone/2,
on_session_expired/1,
on_delivery_complete/6,
auth_on_register_m5/6,
Expand Down Expand Up @@ -624,15 +624,15 @@ on_client_wakeup(SubscriberId) ->
{client_id, ClientId}
]).

on_client_offline(SubscriberId) ->
on_client_offline(SubscriberId, _) ->
{MP, ClientId} = subscriber_id(SubscriberId),
vmq_diversity_cache:clear_cache(MP, ClientId),
all(on_client_offline, [
{mountpoint, MP},
{client_id, ClientId}
]).

on_client_gone(SubscriberId) ->
on_client_gone(SubscriberId, _) ->
{MP, ClientId} = subscriber_id(SubscriberId),
vmq_diversity_cache:clear_cache(MP, ClientId),
all(on_client_gone, [
Expand Down
3 changes: 2 additions & 1 deletion apps/vmq_diversity/test/vmq_diversity_auth_cache_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ auth_cache_cleanup_test(_) ->
{subscribe, SubscribeAcls}] = vmq_diversity_cache:entries(<<"">>, <<"allowed-subscriber-id">>),
3 = length(PublishAcls),
3 = length(SubscribeAcls),
vmq_plugin:all(on_client_offline, [allowed_subscriber_id()]),
vmq_plugin:all(on_client_offline, [allowed_subscriber_id(), reason()]),
[] = vmq_diversity_cache:entries(<<"">>, <<"allowed-subscriber-id">>).


Expand All @@ -163,3 +163,4 @@ allowed_subscriber_id() ->
username() -> <<"test-user">>.
password() -> <<"test-password">>.
payload() -> <<"hello world">>.
reason() -> normal_disconnect.
5 changes: 3 additions & 2 deletions apps/vmq_diversity/test/vmq_diversity_plugin_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ on_offline_message_test(_) ->
on_client_wakeup_test(_) ->
[next] = vmq_plugin:all(on_client_wakeup, [allowed_subscriber_id()]).
on_client_offline_test(_) ->
[next] = vmq_plugin:all(on_client_offline, [allowed_subscriber_id()]).
[next] = vmq_plugin:all(on_client_offline, [allowed_subscriber_id(), reason()]).
on_client_gone_test(_) ->
[next] = vmq_plugin:all(on_client_gone, [allowed_subscriber_id()]).
[next] = vmq_plugin:all(on_client_gone, [allowed_subscriber_id(), reason()]).
on_session_expired_test(_) ->
[next] = vmq_plugin:all(on_session_expired, [allowed_subscriber_id()]).

Expand Down Expand Up @@ -312,3 +312,4 @@ payload() -> <<"hello world">>.
subopts() ->
#{rap => true,
no_local => false}.
reason() -> normal_disconnect.
1 change: 1 addition & 0 deletions apps/vmq_events_sidecar/include/vmq_events_sidecar.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
%% types
-type event() :: {atom(), integer(), tuple()}.
-type pool_size() :: pos_integer().
-type reason() :: atom().
-endif.
4 changes: 2 additions & 2 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
{vmq_events_sidecar_plugin, on_deliver, 6, []},
{vmq_events_sidecar_plugin, on_offline_message, 5, []},
{vmq_events_sidecar_plugin, on_client_wakeup, 1, []},
{vmq_events_sidecar_plugin, on_client_offline, 1, []},
{vmq_events_sidecar_plugin, on_client_offline, 2, []},
{vmq_events_sidecar_plugin, on_session_expired, 1, []},
{vmq_events_sidecar_plugin, on_delivery_complete, 6, []},
{vmq_events_sidecar_plugin, on_client_gone, 1, []}
{vmq_events_sidecar_plugin, on_client_gone, 2, []}
]}
]},
{modules, []},
Expand Down
10 changes: 6 additions & 4 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar_format.erl
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,24 @@ encode({on_client_wakeup, Timestamp, {MP, ClientId}}) ->
timestamp = convert_timestamp(Timestamp)
})
);
encode({on_client_offline, Timestamp, {MP, ClientId}}) ->
encode({on_client_offline, Timestamp, {MP, ClientId, Reason}}) ->
encode_envelope(
"OnClientOffline",
on_client_offline_pb:encode_msg(#'eventssidecar.v1.OnClientOffline'{
client_id = ClientId,
mountpoint = MP,
timestamp = convert_timestamp(Timestamp)
timestamp = convert_timestamp(Timestamp),
reason = Reason
})
);
encode({on_client_gone, Timestamp, {MP, ClientId}}) ->
encode({on_client_gone, Timestamp, {MP, ClientId, Reason}}) ->
encode_envelope(
"OnClientGone",
on_client_gone_pb:encode_msg(#'eventssidecar.v1.OnClientGone'{
client_id = ClientId,
mountpoint = MP,
timestamp = convert_timestamp(Timestamp)
timestamp = convert_timestamp(Timestamp),
reason = Reason
})
);
encode({on_session_expired, Timestamp, {MP, ClientId}}) ->
Expand Down
16 changes: 8 additions & 8 deletions apps/vmq_events_sidecar/src/vmq_events_sidecar_plugin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
on_deliver/6,
on_offline_message/5,
on_client_wakeup/1,
on_client_offline/1,
on_client_gone/1,
on_client_offline/2,
on_client_gone/2,
on_session_expired/1,
on_delivery_complete/6
]).
Expand Down Expand Up @@ -257,15 +257,15 @@ on_client_wakeup(SubscriberId) ->
{MP, ClientId} = subscriber_id(SubscriberId),
send_event({on_client_wakeup, {MP, ClientId}}).

-spec on_client_offline(subscriber_id()) -> 'next'.
on_client_offline(SubscriberId) ->
-spec on_client_offline(subscriber_id(), reason()) -> 'next'.
on_client_offline(SubscriberId, Reason) ->
{MP, ClientId} = subscriber_id(SubscriberId),
send_event({on_client_offline, {MP, ClientId}}).
send_event({on_client_offline, {MP, ClientId, Reason}}).

-spec on_client_gone(subscriber_id()) -> 'next'.
on_client_gone(SubscriberId) ->
-spec on_client_gone(subscriber_id(), reason()) -> 'next'.
on_client_gone(SubscriberId, Reason) ->
{MP, ClientId} = subscriber_id(SubscriberId),
send_event({on_client_gone, {MP, ClientId}}).
send_event({on_client_gone, {MP, ClientId, Reason}}).

-spec on_session_expired(subscriber_id()) -> 'next'.
on_session_expired(SubscriberId) ->
Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_events_sidecar/test/events_sidecar_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ on_client_wakeup(#'eventssidecar.v1.OnClientWakeUp'{mountpoint = ?MOUNTPOINT_BIN


on_client_offline(#'eventssidecar.v1.OnClientOffline'{mountpoint = ?MOUNTPOINT_BIN,
client_id = BinPid}) ->
client_id = BinPid, reason = ?REASON}) ->
Pid = list_to_pid(binary_to_list(BinPid)),
Pid ! on_client_offline_ok.


on_client_gone(#'eventssidecar.v1.OnClientGone'{mountpoint = ?MOUNTPOINT_BIN,
client_id = BinPid}) ->
client_id = BinPid, reason = ?REASON}) ->
Pid = list_to_pid(binary_to_list(BinPid)),
Pid ! on_client_gone_ok.

Expand Down
4 changes: 2 additions & 2 deletions apps/vmq_events_sidecar/test/vmq_events_sidecar_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ on_client_wakeup_test(_) ->
on_client_offline_test(_) ->
enable_hook(on_client_offline),
Self = pid_to_bin(self()),
[ok] = vmq_plugin:all(on_client_offline, [{?MOUNTPOINT, Self}]),
[ok] = vmq_plugin:all(on_client_offline, [{?MOUNTPOINT, Self}, ?REASON]),
ok = exp_response(on_client_offline_ok),
disable_hook(on_client_offline).

on_client_gone_test(_) ->
enable_hook(on_client_gone),
Self = pid_to_bin(self()),
[ok] = vmq_plugin:all(on_client_gone, [{?MOUNTPOINT, Self}]),
[ok] = vmq_plugin:all(on_client_gone, [{?MOUNTPOINT, Self}, ?REASON]),
ok = exp_response(on_client_gone_ok),
disable_hook(on_client_gone).

Expand Down
1 change: 1 addition & 0 deletions apps/vmq_events_sidecar/test/vmq_events_sidecar_test.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
-define(MOUNTPOINT_BIN, <<"mountpoint">>).
-define(TOPIC, <<"test/topic">>).
-define(PAYLOAD, <<"hello world">>).
-define(REASON, 'REASON_NORMAL_DISCONNECT').

10 changes: 10 additions & 0 deletions apps/vmq_proto/include/disconnect_reason_pb.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
%% -*- coding: utf-8 -*-
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.20.0

-ifndef(disconnect_reason_pb).
-define(disconnect_reason_pb, true).

-define(disconnect_reason_pb_gpb_version, "4.20.0").

-endif.
28 changes: 25 additions & 3 deletions apps/vmq_proto/include/on_client_gone_pb.hrl
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
%% -*- coding: utf-8 -*-
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.1
%% Generated by gpb_compile version 4.20.0

-ifndef(on_client_gone_pb).
-define(on_client_gone_pb, true).

-define(on_client_gone_pb_gpb_version, "4.19.1").
-define(on_client_gone_pb_gpb_version, "4.20.0").

-ifndef('EVENTSSIDECAR.V1.ONCLIENTGONE_PB_H').
-define('EVENTSSIDECAR.V1.ONCLIENTGONE_PB_H', true).
Expand All @@ -16,7 +16,29 @@
% = 2, optional
client_id = <<>> :: unicode:chardata() | undefined,
% = 3, optional
mountpoint = <<>> :: unicode:chardata() | undefined
mountpoint = <<>> :: unicode:chardata() | undefined,
% = 4, optional, enum eventssidecar.v1.Reason
reason = 'REASON_UNSPECIFIED' ::
'REASON_UNSPECIFIED'
| 'REASON_NOT_AUTHORIZED'
| 'REASON_NORMAL_DISCONNECT'
| 'REASON_SESSION_TAKEN_OVER'
| 'REASON_ADMINISTRATIVE_ACTION'
| 'REASON_DISCONNECT_KEEP_ALIVE'
| 'REASON_DISCONNECT_MIGRATION'
| 'REASON_BAD_AUTHENTICATION_METHOD'
| 'REASON_REMOTE_SESSION_TAKEN_OVER'
| 'REASON_MQTT_CLIENT_DISCONNECT'
| 'REASON_RECEIVE_MAX_EXCEEDED'
| 'REASON_PROTOCOL_ERROR'
| 'REASON_PUBLISH_AUTH_ERROR'
| 'REASON_INVALID_PUBREC_ERROR'
| 'REASON_INVALID_PUBCOMP_ERROR'
| 'REASON_UNEXPECTED_FRAME_TYPE'
| 'REASON_EXIT_SIGNAL_RECEIVED'
| 'REASON_TCP_CLOSED'
| integer()
| undefined
}
).
-endif.
Expand Down
28 changes: 25 additions & 3 deletions apps/vmq_proto/include/on_client_offline_pb.hrl
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
%% -*- coding: utf-8 -*-
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.1
%% Generated by gpb_compile version 4.20.0

-ifndef(on_client_offline_pb).
-define(on_client_offline_pb, true).

-define(on_client_offline_pb_gpb_version, "4.19.1").
-define(on_client_offline_pb_gpb_version, "4.20.0").

-ifndef('EVENTSSIDECAR.V1.ONCLIENTOFFLINE_PB_H').
-define('EVENTSSIDECAR.V1.ONCLIENTOFFLINE_PB_H', true).
Expand All @@ -16,7 +16,29 @@
% = 2, optional
client_id = <<>> :: unicode:chardata() | undefined,
% = 3, optional
mountpoint = <<>> :: unicode:chardata() | undefined
mountpoint = <<>> :: unicode:chardata() | undefined,
% = 4, optional, enum eventssidecar.v1.Reason
reason = 'REASON_UNSPECIFIED' ::
'REASON_UNSPECIFIED'
| 'REASON_NOT_AUTHORIZED'
| 'REASON_NORMAL_DISCONNECT'
| 'REASON_SESSION_TAKEN_OVER'
| 'REASON_ADMINISTRATIVE_ACTION'
| 'REASON_DISCONNECT_KEEP_ALIVE'
| 'REASON_DISCONNECT_MIGRATION'
| 'REASON_BAD_AUTHENTICATION_METHOD'
| 'REASON_REMOTE_SESSION_TAKEN_OVER'
| 'REASON_MQTT_CLIENT_DISCONNECT'
| 'REASON_RECEIVE_MAX_EXCEEDED'
| 'REASON_PROTOCOL_ERROR'
| 'REASON_PUBLISH_AUTH_ERROR'
| 'REASON_INVALID_PUBREC_ERROR'
| 'REASON_INVALID_PUBCOMP_ERROR'
| 'REASON_UNEXPECTED_FRAME_TYPE'
| 'REASON_EXIT_SIGNAL_RECEIVED'
| 'REASON_TCP_CLOSED'
| integer()
| undefined
}
).
-endif.
Expand Down
24 changes: 24 additions & 0 deletions apps/vmq_proto/proto/disconnect_reason.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
syntax = "proto3";

package eventssidecar.v1;

enum Reason {
REASON_UNSPECIFIED = 0;
REASON_NOT_AUTHORIZED = 1;
REASON_NORMAL_DISCONNECT = 2;
REASON_SESSION_TAKEN_OVER = 3;
REASON_ADMINISTRATIVE_ACTION = 4;
REASON_DISCONNECT_KEEP_ALIVE = 5;
REASON_DISCONNECT_MIGRATION = 6;
REASON_BAD_AUTHENTICATION_METHOD = 7;
REASON_REMOTE_SESSION_TAKEN_OVER = 8;
REASON_MQTT_CLIENT_DISCONNECT = 9;
REASON_RECEIVE_MAX_EXCEEDED = 10;
REASON_PROTOCOL_ERROR = 11;
REASON_PUBLISH_AUTH_ERROR = 12;
REASON_INVALID_PUBREC_ERROR = 13;
REASON_INVALID_PUBCOMP_ERROR = 14;
REASON_UNEXPECTED_FRAME_TYPE = 15;
REASON_EXIT_SIGNAL_RECEIVED = 16;
REASON_TCP_CLOSED = 17;
}
4 changes: 2 additions & 2 deletions apps/vmq_proto/proto/on_client_gone.proto
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";

option go_package = "source.golabs.io/courier/apis-go/eventssidecar/v1";
import "disconnect_reason.proto";

package eventssidecar.v1;

message OnClientGone {
google.protobuf.Timestamp timestamp = 1;
string client_id = 2;
string mountpoint = 3;
eventssidecar.v1.Reason reason = 4;
}
4 changes: 2 additions & 2 deletions apps/vmq_proto/proto/on_client_offline.proto
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";

option go_package = "source.golabs.io/courier/apis-go/eventssidecar/v1";
import "disconnect_reason.proto";

package eventssidecar.v1;

message OnClientOffline {
google.protobuf.Timestamp timestamp = 1;
string client_id = 2;
string mountpoint = 3;
eventssidecar.v1.Reason reason = 4;
}
Loading

0 comments on commit 91599d3

Please sign in to comment.