Skip to content

Commit

Permalink
Fixes #1698: prevent use-after-free of qd_link_t on session close
Browse files Browse the repository at this point in the history
Previously the detach handler might free the qd_link_t but it was not
consistent. This patch moves the qd_link_t cleanup to the container.
  • Loading branch information
kgiusti committed Dec 13, 2024
1 parent f6f40bd commit ba7af25
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 99 deletions.
33 changes: 13 additions & 20 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1262,19 +1262,15 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det

if (rlink) {
//
// This is the last event for this link that we will send into the core. Remove the
// core linkage. Note that the core->qd linkage is still in place.
//
qd_link_set_context(link, 0);
// If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent
// connection/session loss then this is the last proton event that will be generated for this link. The qd_link
// will be freed on return from this call so remove the cross linkage between it and the qdr_link peer.

//
// If the link was lost (due to connection drop), or the linkage from the core
// object is already gone, finish disconnecting the linkage and free the qd_link
// because the core will silently free its own resources.
//
if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) {
// note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is
// the second detach!
qd_link_set_context(link, 0);
qdr_link_set_context(rlink, 0);
qd_link_free(link);
}

qdr_error_t *error = qdr_error_from_pn(cond);
Expand Down Expand Up @@ -1949,21 +1945,18 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
}
}

qd_link_close(qlink);

//
// This is the last event for this link that we are going to send into Proton.
// Remove the core->proton linkage. Note that the proton->core linkage may still
// be intact and needed.
// This is the last event for this link that the core is going to send into Proton so remove the core => adaptor
// linkage. If this is the response attach then there will be no further proton link events to send to the core so
// remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so
// we can notify the core when the second (response) detach arrives
//
qdr_link_set_context(link, 0);

//
// If this is the second detach, free the qd_link
//
if (!first) {
qd_link_free(qlink);
qd_link_set_context(qlink, 0);
}

qd_link_close(qlink);
}


Expand Down
141 changes: 62 additions & 79 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ struct qd_link_t {
qd_alloc_safe_ptr_t incoming_msg; // DISPATCH-1690: for cleanup
pn_snd_settle_mode_t remote_snd_settle_mode;
qd_link_ref_list_t ref_list;
bool q2_limit_unbounded;
bool q3_blocked;
DEQ_LINKS_N(Q3, qd_link_t); ///< Q3 blocked links
uint64_t link_id;
bool q2_limit_unbounded;
bool q3_blocked;
bool policy_counted; // has this been counted by policy?
};

ALLOC_DEFINE_SAFE(qd_link_t);
Expand All @@ -83,6 +84,9 @@ struct qd_session_t {
// See qd_session_get_outgoing_capacity() for details.
size_t outgoing_bytes_high_threshold;
size_t outgoing_bytes_low_threshold;

// Has this session been counted by policy? Only remotely initiated sessions can be counted by policy
bool policy_counted;
};


Expand Down Expand Up @@ -148,15 +152,15 @@ static inline qd_session_t *qd_session_from_pn(pn_session_t *pn_ssn)
return (qd_session_t *)pn_session_get_context(pn_ssn);
}

static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
static qd_link_t *setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
{
qd_link_t *link = new_qd_link_t();
if (!link) {
pn_condition_t *cond = pn_link_condition(pn_link);
pn_condition_set_name(cond, QD_AMQP_COND_INTERNAL_ERROR);
pn_condition_set_description(cond, "Insufficient memory");
pn_link_close(pn_link);
return;
return 0;
}

ZERO(link);
Expand All @@ -174,18 +178,19 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)

pn_link_set_context(pn_link, link);
container->ntype->outgoing_handler(container->qd_router, link);
return link;
}


static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, uint64_t max_size)
static qd_link_t *setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, uint64_t max_size)
{
qd_link_t *link = new_qd_link_t();
if (!link) {
pn_condition_t *cond = pn_link_condition(pn_link);
pn_condition_set_name(cond, QD_AMQP_COND_INTERNAL_ERROR);
pn_condition_set_description(cond, "Insufficient memory");
pn_link_close(pn_link);
return;
return 0;
}

ZERO(link);
Expand All @@ -205,6 +210,7 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, u
}
pn_link_set_context(pn_link, link);
container->ntype->incoming_handler(container->qd_router, link);
return link;
}


Expand Down Expand Up @@ -269,26 +275,25 @@ static void notify_closed(qd_container_t *container, qd_connection_t *conn, void
container->ntype->conn_closed_handler(container->qd_router, conn, context);
}


// The given connection has dropped. There will be no further link events for this connection so manually clean up all
// links
static void close_links(qd_container_t *container, pn_connection_t *conn, bool print_log)
{
pn_link_t *pn_link = pn_link_head(conn, 0);
while (pn_link) {
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);

if (qd_link && qd_link_get_context(qd_link) == 0) {
pn_link_set_context(pn_link, 0);
pn_link = pn_link_next(pn_link, 0);
qd_link_free(qd_link);
continue;
}
pn_link_t *next_link = pn_link_next(pn_link, 0);

if (qd_link) {
if (print_log)
qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent connection end",
pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
qd_link_free(qd_link);
}
pn_link = pn_link_next(pn_link, 0);

pn_link = next_link;
}
}

Expand Down Expand Up @@ -472,6 +477,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
qd_session_decref(qd_ssn);
break;
}
qd_ssn->policy_counted = true;
qd_conn->n_sessions++;
}
DEQ_INSERT_TAIL(qd_conn->child_sessions, qd_ssn);
Expand All @@ -490,35 +496,8 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
break;

case PN_SESSION_REMOTE_CLOSE : // fallthrough
case PN_SESSION_LOCAL_CLOSE :
ssn = pn_event_session(event);
for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) {
if (qd_conn->qd_sessions[i] && ssn == qd_conn->qd_sessions[i]->pn_session) {
qd_session_decref(qd_conn->qd_sessions[i]);
qd_conn->qd_sessions[i] = 0;
break;
}
}
pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
while (pn_link) {
if (pn_link_session(pn_link) == ssn) {
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link)
qd_link->pn_link = 0;
}
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}

if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
qd_session_t *qd_ssn = qd_session_from_pn(ssn);
if (qd_ssn) {
DEQ_REMOVE(qd_conn->child_sessions, qd_ssn);
qd_session_decref(qd_ssn);
}
}
break;

case PN_SESSION_REMOTE_CLOSE :
ssn = pn_event_session(event);
for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) {
if (qd_conn->qd_sessions[i] && ssn == qd_conn->qd_sessions[i]->pn_session) {
Expand All @@ -529,52 +508,43 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {


// remote has nuked our session. Check for any links that were
// Remote has nuked our session. Check for any links that were
// left open and forcibly detach them, since no detaches will
// arrive on this session.
pn_connection_t *conn = pn_session_connection(ssn);

//Sweep thru every pn_link in this connection and a matching session and zero out the
// qd_link->pn_link reference. We do this in order to not miss any pn_links
pn_link = pn_link_head(conn, 0);
while (pn_link) {
pn_link_t *next_link = pn_link_next(pn_link, 0);
if (pn_link_session(pn_link) == ssn) {
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);

if ((pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE))) {
if (qd_link) {
if (qd_conn->policy_settings) {
if (qd_link->direction == QD_OUTGOING) {
qd_conn->n_receivers--;
assert(qd_conn->n_receivers >= 0);
} else {
qd_conn->n_senders--;
assert(qd_conn->n_senders >= 0);
}
qd_link_t *qd_link = (qd_link_t *) pn_link_get_context(pn_link);
if (qd_link) {
if (qd_link->policy_counted) {
qd_link->policy_counted = false;
if (qd_link->direction == QD_OUTGOING) {
qd_conn->n_receivers--;
assert(qd_conn->n_receivers >= 0);
} else {
qd_conn->n_senders--;
assert(qd_conn->n_senders >= 0);
}
qd_log(LOG_CONTAINER, QD_LOG_DEBUG,
"Aborting link '%s' due to parent session end", pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
}
qd_log(LOG_CONTAINER, QD_LOG_DEBUG,
"Aborting link '%s' due to parent session end", pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
qd_link_free(qd_link);
}

if (qd_link)
qd_link->pn_link = 0;
}
pn_link = pn_link_next(pn_link, 0);

pn_link = next_link;
}
if (qd_conn->policy_settings) {
qd_conn->n_sessions--;
}

pn_session_close(ssn);
}
else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
qd_session_t *qd_ssn = qd_session_from_pn(ssn);
if (qd_ssn) {
if (qd_ssn->policy_counted) {
qd_ssn->policy_counted = false;
qd_conn->n_sessions--;
assert(qd_conn->n_sessions >= 0);
}
DEQ_REMOVE(qd_conn->child_sessions, qd_ssn);
qd_session_decref(qd_ssn);
}
Expand All @@ -586,22 +556,29 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
bool policy_counted = false;
if (pn_link_is_sender(pn_link)) {
if (qd_conn->policy_settings) {
if (!qd_policy_approve_amqp_receiver_link(pn_link, qd_conn)) {
break;
}
qd_conn->n_receivers++;
policy_counted = true;
}
setup_outgoing_link(container, pn_link);
qd_link_t *qd_link = setup_outgoing_link(container, pn_link);
if (qd_link)
qd_link->policy_counted = policy_counted;
} else {
if (qd_conn->policy_settings) {
if (!qd_policy_approve_amqp_sender_link(pn_link, qd_conn)) {
break;
}
qd_conn->n_senders++;
policy_counted = true;
}
setup_incoming_link(container, pn_link, qd_connection_max_message_size(qd_conn));
qd_link_t *qd_link = setup_incoming_link(container, pn_link, qd_connection_max_message_size(qd_conn));
if (qd_link)
qd_link->policy_counted = policy_counted;
}
} else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
handle_link_open(container, pn_link);
Expand All @@ -617,24 +594,29 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
if (qd_link->pn_link == pn_link) {
pn_link_close(pn_link);
}
if (qd_conn->policy_counted && qd_conn->policy_settings) {
if (qd_link->policy_counted) {
qd_link->policy_counted = false;
if (pn_link_is_sender(pn_link)) {
qd_conn->n_receivers--;
qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Closed receiver link %s. n_receivers: %d",
pn_link_name(pn_link), qd_conn->n_receivers);
assert (qd_conn->n_receivers >= 0);
assert(qd_conn->n_receivers >= 0);
} else {
qd_conn->n_senders--;
qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Closed sender link %s. n_senders: %d",
pn_link_name(pn_link), qd_conn->n_senders);
assert (qd_conn->n_senders >= 0);
assert(qd_conn->n_senders >= 0);
}
}

container->ntype->link_detach_handler(container->qd_router, qd_link, dt);

if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
// link fully closed
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
qd_link_free(qd_link);
}
container->ntype->link_detach_handler(container->qd_router, qd_link, dt);

} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
Expand All @@ -645,6 +627,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
qd_link_free((qd_link_t *) pn_link_get_context(pn_link));
}
break;

Expand Down
Loading

0 comments on commit ba7af25

Please sign in to comment.