Skip to content

Commit

Permalink
Improve Stream Management
Browse files Browse the repository at this point in the history
* Delay the notification of the library user that the connection was
  successful, until SM is reported by the server as enabled.
* Clear the SM queue in case resumption failed.
* Improve some debug statements & comments.

Signed-off-by: Steffen Jaeckel <[email protected]>
  • Loading branch information
sjaeckel committed Nov 27, 2023
1 parent aa22dd0 commit 5d151fb
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 57 deletions.
75 changes: 40 additions & 35 deletions src/auth.c
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ static void _auth(xmpp_conn_t *conn)
}
}

static void _auth_success(xmpp_conn_t *conn)
static void _stream_negotiation_success(xmpp_conn_t *conn)
{
tls_clear_password_cache(conn);
conn->stream_negotiation_completed = 1;
Expand Down Expand Up @@ -1097,7 +1097,7 @@ static int
_handle_bind(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
{
const char *type;
xmpp_stanza_t *iq, *enable, *session, *binding, *jid_stanza;
xmpp_stanza_t *iq, *session, *binding, *jid_stanza, *enable = NULL;

UNUSED(userdata);

Expand All @@ -1120,6 +1120,23 @@ _handle_bind(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
}
}

/* send enable directly after the bind request */
if (conn->sm_state->sm_support && !conn->sm_disable) {
enable = xmpp_stanza_new(conn->ctx);
if (!enable) {
disconnect_mem_error(conn);
return 0;
}
xmpp_stanza_set_name(enable, "enable");
xmpp_stanza_set_ns(enable, XMPP_NS_SM);
if (!conn->sm_state->dont_request_resume)
xmpp_stanza_set_attribute(enable, "resume", "true");
handler_add(conn, _handle_sm, XMPP_NS_SM, NULL, NULL, NULL);
send_stanza(conn, enable, XMPP_QUEUE_SM_STROPHE);
conn->sm_state->sm_sent_nr = 0;
conn->sm_state->sm_enabled = 1;
}

/* establish a session if required */
if (conn->session_required) {
/* setup response handlers */
Expand Down Expand Up @@ -1150,22 +1167,12 @@ _handle_bind(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
send_stanza(conn, iq, XMPP_QUEUE_STROPHE);
}

if (conn->sm_state->sm_support && !conn->sm_disable) {
enable = xmpp_stanza_new(conn->ctx);
if (!enable) {
disconnect_mem_error(conn);
return 0;
}
xmpp_stanza_set_name(enable, "enable");
xmpp_stanza_set_ns(enable, XMPP_NS_SM);
if (!conn->sm_state->dont_request_resume)
xmpp_stanza_set_attribute(enable, "resume", "true");
handler_add(conn, _handle_sm, XMPP_NS_SM, NULL, NULL, NULL);
send_stanza(conn, enable, XMPP_QUEUE_SM_STROPHE);
}

if (!conn->session_required) {
_auth_success(conn);
/* if there's no xmpp session required and we didn't try to enable
* stream-management, we're done here and the stream-negotiation was
* successful
*/
if (!conn->session_required && !enable) {
_stream_negotiation_success(conn);
}
} else {
strophe_error(conn->ctx, "xmpp", "Server sent malformed bind reply.");
Expand Down Expand Up @@ -1202,7 +1209,7 @@ _handle_session(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
} else if (type && strcmp(type, "result") == 0) {
strophe_debug(conn->ctx, "xmpp", "Session establishment successful.");

_auth_success(conn);
_stream_negotiation_success(conn);
} else {
strophe_error(conn->ctx, "xmpp",
"Server sent malformed session reply.");
Expand Down Expand Up @@ -1237,7 +1244,7 @@ static int _handle_sm(xmpp_conn_t *const conn,
xmpp_stanza_t *const stanza,
void *const userdata)
{
xmpp_stanza_t *failed_cause;
xmpp_stanza_t *failed_cause, *bind;
const char *name, *id, *previd, *resume, *h, *cause;
xmpp_send_queue_t *e;
unsigned long ul_h = 0;
Expand All @@ -1249,7 +1256,6 @@ static int _handle_sm(xmpp_conn_t *const conn,
goto LBL_ERR;

if (strcmp(name, "enabled") == 0) {
conn->sm_state->sm_enabled = 1;
conn->sm_state->sm_handled_nr = 0;
resume = xmpp_stanza_get_attribute(stanza, "resume");
if (resume && (strcasecmp(resume, "true") || strcmp(resume, "1"))) {
Expand All @@ -1264,6 +1270,7 @@ static int _handle_sm(xmpp_conn_t *const conn,
conn->sm_state->can_resume = 1;
conn->sm_state->id = strophe_strdup(conn->ctx, id);
}
_stream_negotiation_success(conn);
} else if (strcmp(name, "resumed") == 0) {
previd = xmpp_stanza_get_attribute(stanza, "previd");
if (!previd || strcmp(previd, conn->sm_state->previd)) {
Expand Down Expand Up @@ -1293,6 +1300,8 @@ static int _handle_sm(xmpp_conn_t *const conn,
conn->sm_state->sm_sent_nr = ul_h;
while ((e = pop_queue_front(&conn->sm_state->sm_queue))) {
if (e->sm_h >= ul_h) {
strophe_debug_verbose(2, conn->ctx, "conn",
"SM_Q_RESEND: %p, h=%lu", e, e->sm_h);
/* Re-send what was already sent out and is still in the
* SM queue (i.e. it hasn't been ACK'ed by the server)
*/
Expand All @@ -1301,9 +1310,10 @@ static int _handle_sm(xmpp_conn_t *const conn,
strophe_free(conn->ctx, queue_element_free(conn->ctx, e));
}
strophe_debug(conn->ctx, "xmpp", "Session resumed successfully.");
_auth_success(conn);
_stream_negotiation_success(conn);
} else if (strcmp(name, "failed") == 0) {
name = NULL;
conn->sm_state->sm_enabled = 0;

failed_cause =
xmpp_stanza_get_child_by_ns(stanza, XMPP_NS_STANZAS_IETF);
Expand All @@ -1317,22 +1327,17 @@ static int _handle_sm(xmpp_conn_t *const conn,
if (!strcmp(cause, "item-not-found") ||
!strcmp(cause, "feature-not-implemented")) {
if (conn->sm_state->resume) {
conn->sm_state->resume = 0;
conn->sm_state->can_resume = 0;
bind = conn->sm_state->bind;
conn->sm_state->bind = NULL;
clear_sm_state(conn->sm_state);
/* remember that the server reports having support
* for resumption, but actually it doesn't ...
*/
conn->sm_state->dont_request_resume =
!strcmp(cause, "feature-not-implemented");
strophe_free(conn->ctx, conn->sm_state->previd);
conn->sm_state->previd = NULL;
strophe_free(conn->ctx, conn->sm_state->bound_jid);
conn->sm_state->bound_jid = NULL;
_do_bind(conn, conn->sm_state->bind);
conn->sm_state->bind = NULL;
_do_bind(conn, bind);
}
}
conn->sm_state->sm_handled_nr = 0;
} else {
/* unknown stanza received */
name = NULL;
Expand Down Expand Up @@ -1390,7 +1395,7 @@ _handle_legacy(xmpp_conn_t *conn, xmpp_stanza_t *stanza, void *userdata)
/* auth succeeded */
strophe_debug(conn->ctx, "xmpp", "Legacy auth succeeded.");

_auth_success(conn);
_stream_negotiation_success(conn);
} else {
strophe_error(conn->ctx, "xmpp",
"Server sent us a legacy authentication "
Expand Down Expand Up @@ -1580,7 +1585,7 @@ int _handle_component_hs_response(xmpp_conn_t *conn,
xmpp_disconnect(conn);
return XMPP_EINT;
} else {
_auth_success(conn);
_stream_negotiation_success(conn);
}

/* We don't need this handler anymore, return 0 so it can be deleted
Expand All @@ -1602,8 +1607,8 @@ int _handle_missing_handshake(xmpp_conn_t *conn, void *userdata)
void auth_handle_open_raw(xmpp_conn_t *conn)
{
handler_reset_timed(conn, 0);
/* user handlers are not called before authentication is completed. */
_auth_success(conn);
/* user handlers are not called before stream negotiation has completed. */
_stream_negotiation_success(conn);
}

void auth_handle_open_stub(xmpp_conn_t *conn)
Expand Down
4 changes: 3 additions & 1 deletion src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ struct _xmpp_conn_t {
/* stream open handler */
xmpp_open_handler open_handler;

/* user handlers only get called after authentication */
/* user handlers only get called after the stream negotiation has completed
*/
int stream_negotiation_completed;

/* connection events handler */
Expand Down Expand Up @@ -341,6 +342,7 @@ void handler_add(xmpp_conn_t *conn,
void handler_system_delete_all(xmpp_conn_t *conn);

/* utility functions */
void clear_sm_state(xmpp_sm_state_t *sm_state);
void disconnect_mem_error(xmpp_conn_t *conn);

/* auth functions */
Expand Down
42 changes: 26 additions & 16 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1285,8 +1285,11 @@ static void _reset_sm_state_for_reconnect(xmpp_conn_t *conn)
int xmpp_conn_set_sm_state(xmpp_conn_t *conn, xmpp_sm_state_t *sm_state)
{
/* We can only set the SM state when we're disconnected */
if (conn->state != XMPP_STATE_DISCONNECTED)
if (conn->state != XMPP_STATE_DISCONNECTED) {
strophe_error(conn->ctx, "conn",
"SM state can only be set the when we're disconnected");
return XMPP_EINVOP;
}

if (conn->sm_state) {
strophe_error(conn->ctx, "conn", "SM state is already set!");
Expand All @@ -1305,22 +1308,10 @@ int xmpp_conn_set_sm_state(xmpp_conn_t *conn, xmpp_sm_state_t *sm_state)
return XMPP_EOK;
}

/** c.f. \ref xmpp_conn_get_sm_state for usage documentation
*
* @param sm_state A Stream Management state returned from a call to
* `xmpp_conn_get_sm_state()`
*
* @ingroup Connections
*/
void xmpp_free_sm_state(xmpp_sm_state_t *sm_state)
void clear_sm_state(xmpp_sm_state_t *sm_state)
{
xmpp_ctx_t *ctx;
xmpp_send_queue_t *smq;

if (!sm_state || !sm_state->ctx)
return;

ctx = sm_state->ctx;
xmpp_ctx_t *ctx = sm_state->ctx;

while ((smq = pop_queue_front(&sm_state->sm_queue))) {
strophe_free(ctx, queue_element_free(ctx, smq));
Expand All @@ -1335,6 +1326,24 @@ void xmpp_free_sm_state(xmpp_sm_state_t *sm_state)
if (sm_state->bound_jid)
strophe_free(ctx, sm_state->bound_jid);
memset(sm_state, 0, sizeof(*sm_state));
}

/** c.f. \ref xmpp_conn_get_sm_state for usage documentation
*
* @param sm_state A Stream Management state returned from a call to
* `xmpp_conn_get_sm_state()`
*
* @ingroup Connections
*/
void xmpp_free_sm_state(xmpp_sm_state_t *sm_state)
{
xmpp_ctx_t *ctx;

if (!sm_state || !sm_state->ctx)
return;

ctx = sm_state->ctx;
clear_sm_state(sm_state);
strophe_free(ctx, sm_state);
}

Expand Down Expand Up @@ -1698,7 +1707,8 @@ static void _conn_sm_handle_stanza(xmpp_conn_t *const conn,
while (conn->sm_state->sm_queue.head &&
conn->sm_state->sm_queue.head->sm_h < ul_h) {
e = pop_queue_front(&conn->sm_state->sm_queue);
strophe_debug_verbose(2, conn->ctx, "conn", "SM_Q_DROP: %p", e);
strophe_debug_verbose(2, conn->ctx, "conn",
"SM_Q_DROP: %p, h=%lu", e, e->sm_h);
c = queue_element_free(conn->ctx, e);
strophe_free(conn->ctx, c);
}
Expand Down
3 changes: 2 additions & 1 deletion src/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ void xmpp_run_once(xmpp_ctx_t *ctx, unsigned long timeout)
if (!(tsq->owner & XMPP_QUEUE_SM) && conn->sm_state->sm_enabled) {
tsq->sm_h = conn->sm_state->sm_sent_nr;
conn->sm_state->sm_sent_nr++;
strophe_debug_verbose(1, ctx, "xmpp", "SM_Q_MOVE: %p", tsq);
strophe_debug_verbose(1, ctx, "xmpp", "SM_Q_MOVE: %p, h=%lu",
tsq, tsq->sm_h);
add_queue_back(&conn->sm_state->sm_queue, tsq);
tsq = NULL;
}
Expand Down
8 changes: 4 additions & 4 deletions src/handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void handler_fire_stanza(xmpp_conn_t *conn, xmpp_stanza_t *stanza)

item = head;
while (item) {
/* don't fire user handlers until authentication succeeds and
/* don't fire user handlers until stream negotiation has completed
and skip newly added handlers */
if ((item->user_handler && !conn->stream_negotiation_completed) ||
!item->enabled) {
Expand Down Expand Up @@ -117,7 +117,7 @@ void handler_fire_stanza(xmpp_conn_t *conn, xmpp_stanza_t *stanza)

item = conn->handlers;
while (item) {
/* don't fire user handlers until authentication succeeds and
/* don't fire user handlers until stream negotiation has completed and
skip newly added handlers */
if ((item->user_handler && !conn->stream_negotiation_completed) ||
!item->enabled) {
Expand Down Expand Up @@ -176,8 +176,8 @@ uint64_t handler_fire_timed(xmpp_ctx_t *ctx)

item = conn->timed_handlers;
while (item) {
/* don't fire user handlers until authentication succeeds and
skip newly added handlers */
/* don't fire user handlers until stream negotiation has completed
and skip newly added handlers */
if ((item->user_handler && !conn->stream_negotiation_completed) ||
!item->enabled) {
item = item->next;
Expand Down

0 comments on commit 5d151fb

Please sign in to comment.