diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index b6ae307bf..2a78c013d 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1262,19 +1262,15 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det if (rlink) { // - // This is the last event for this link that we will send into the core. Remove the - // core linkage. Note that the core->qd linkage is still in place. - // - qd_link_set_context(link, 0); + // If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent + // connection/session loss then this is the last proton event that will be generated for this link. The qd_link + // will be freed on return from this call so remove the cross linkage between it and the qdr_link peer. - // - // If the link was lost (due to connection drop), or the linkage from the core - // object is already gone, finish disconnecting the linkage and free the qd_link - // because the core will silently free its own resources. - // if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) { + // note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is + // the second detach! + qd_link_set_context(link, 0); qdr_link_set_context(rlink, 0); - qd_link_free(link); } qdr_error_t *error = qdr_error_from_pn(cond); @@ -1949,21 +1945,18 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error } } - qd_link_close(qlink); - // - // This is the last event for this link that we are going to send into Proton. - // Remove the core->proton linkage. Note that the proton->core linkage may still - // be intact and needed. + // This is the last event for this link that the core is going to send into Proton so remove the core => adaptor + // linkage. If this is the response attach then there will be no further proton link events to send to the core so + // remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so + // we can notify the core when the second (response) detach arrives // qdr_link_set_context(link, 0); - - // - // If this is the second detach, free the qd_link - // if (!first) { - qd_link_free(qlink); + qd_link_set_context(qlink, 0); } + + qd_link_close(qlink); } diff --git a/src/adaptors/amqp/container.c b/src/adaptors/amqp/container.c index cfb0ffe17..e18824496 100644 --- a/src/adaptors/amqp/container.c +++ b/src/adaptors/amqp/container.c @@ -54,10 +54,11 @@ struct qd_link_t { qd_alloc_safe_ptr_t incoming_msg; // DISPATCH-1690: for cleanup pn_snd_settle_mode_t remote_snd_settle_mode; qd_link_ref_list_t ref_list; - bool q2_limit_unbounded; - bool q3_blocked; DEQ_LINKS_N(Q3, qd_link_t); ///< Q3 blocked links uint64_t link_id; + bool q2_limit_unbounded; + bool q3_blocked; + bool policy_counted; // has this been counted by policy? }; ALLOC_DEFINE_SAFE(qd_link_t); @@ -83,6 +84,9 @@ struct qd_session_t { // See qd_session_get_outgoing_capacity() for details. size_t outgoing_bytes_high_threshold; size_t outgoing_bytes_low_threshold; + + // Has this session been counted by policy? Only remotely initiated sessions can be counted by policy + bool policy_counted; }; @@ -148,7 +152,7 @@ static inline qd_session_t *qd_session_from_pn(pn_session_t *pn_ssn) return (qd_session_t *)pn_session_get_context(pn_ssn); } -static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) +static qd_link_t *setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) { qd_link_t *link = new_qd_link_t(); if (!link) { @@ -156,7 +160,7 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) pn_condition_set_name(cond, QD_AMQP_COND_INTERNAL_ERROR); pn_condition_set_description(cond, "Insufficient memory"); pn_link_close(pn_link); - return; + return 0; } ZERO(link); @@ -174,10 +178,11 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) pn_link_set_context(pn_link, link); container->ntype->outgoing_handler(container->qd_router, link); + return link; } -static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, uint64_t max_size) +static qd_link_t *setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, uint64_t max_size) { qd_link_t *link = new_qd_link_t(); if (!link) { @@ -185,7 +190,7 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, u pn_condition_set_name(cond, QD_AMQP_COND_INTERNAL_ERROR); pn_condition_set_description(cond, "Insufficient memory"); pn_link_close(pn_link); - return; + return 0; } ZERO(link); @@ -205,6 +210,7 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link, u } pn_link_set_context(pn_link, link); container->ntype->incoming_handler(container->qd_router, link); + return link; } @@ -269,26 +275,25 @@ static void notify_closed(qd_container_t *container, qd_connection_t *conn, void container->ntype->conn_closed_handler(container->qd_router, conn, context); } + +// The given connection has dropped. There will be no further link events for this connection so manually clean up all +// links static void close_links(qd_container_t *container, pn_connection_t *conn, bool print_log) { pn_link_t *pn_link = pn_link_head(conn, 0); while (pn_link) { qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link); - - if (qd_link && qd_link_get_context(qd_link) == 0) { - pn_link_set_context(pn_link, 0); - pn_link = pn_link_next(pn_link, 0); - qd_link_free(qd_link); - continue; - } + pn_link_t *next_link = pn_link_next(pn_link, 0); if (qd_link) { if (print_log) qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent connection end", pn_link_name(pn_link)); container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST); + qd_link_free(qd_link); } - pn_link = pn_link_next(pn_link, 0); + + pn_link = next_link; } } @@ -472,6 +477,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, qd_session_decref(qd_ssn); break; } + qd_ssn->policy_counted = true; qd_conn->n_sessions++; } DEQ_INSERT_TAIL(qd_conn->child_sessions, qd_ssn); @@ -490,35 +496,8 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } break; + case PN_SESSION_REMOTE_CLOSE : // fallthrough case PN_SESSION_LOCAL_CLOSE : - ssn = pn_event_session(event); - for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) { - if (qd_conn->qd_sessions[i] && ssn == qd_conn->qd_sessions[i]->pn_session) { - qd_session_decref(qd_conn->qd_sessions[i]); - qd_conn->qd_sessions[i] = 0; - break; - } - } - pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); - while (pn_link) { - if (pn_link_session(pn_link) == ssn) { - qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link); - if (qd_link) - qd_link->pn_link = 0; - } - pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); - } - - if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - qd_session_t *qd_ssn = qd_session_from_pn(ssn); - if (qd_ssn) { - DEQ_REMOVE(qd_conn->child_sessions, qd_ssn); - qd_session_decref(qd_ssn); - } - } - break; - - case PN_SESSION_REMOTE_CLOSE : ssn = pn_event_session(event); for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) { if (qd_conn->qd_sessions[i] && ssn == qd_conn->qd_sessions[i]->pn_session) { @@ -529,52 +508,43 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) { - - - // remote has nuked our session. Check for any links that were + // Remote has nuked our session. Check for any links that were // left open and forcibly detach them, since no detaches will // arrive on this session. - pn_connection_t *conn = pn_session_connection(ssn); - - //Sweep thru every pn_link in this connection and a matching session and zero out the - // qd_link->pn_link reference. We do this in order to not miss any pn_links pn_link = pn_link_head(conn, 0); while (pn_link) { + pn_link_t *next_link = pn_link_next(pn_link, 0); if (pn_link_session(pn_link) == ssn) { - qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link); - - if ((pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE))) { - if (qd_link) { - if (qd_conn->policy_settings) { - if (qd_link->direction == QD_OUTGOING) { - qd_conn->n_receivers--; - assert(qd_conn->n_receivers >= 0); - } else { - qd_conn->n_senders--; - assert(qd_conn->n_senders >= 0); - } + qd_link_t *qd_link = (qd_link_t *) pn_link_get_context(pn_link); + if (qd_link) { + if (qd_link->policy_counted) { + qd_link->policy_counted = false; + if (qd_link->direction == QD_OUTGOING) { + qd_conn->n_receivers--; + assert(qd_conn->n_receivers >= 0); + } else { + qd_conn->n_senders--; + assert(qd_conn->n_senders >= 0); } - qd_log(LOG_CONTAINER, QD_LOG_DEBUG, - "Aborting link '%s' due to parent session end", pn_link_name(pn_link)); - container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST); } + qd_log(LOG_CONTAINER, QD_LOG_DEBUG, + "Aborting link '%s' due to parent session end", pn_link_name(pn_link)); + container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST); + qd_link_free(qd_link); } - - if (qd_link) - qd_link->pn_link = 0; } - pn_link = pn_link_next(pn_link, 0); - + pn_link = next_link; } - if (qd_conn->policy_settings) { - qd_conn->n_sessions--; - } - pn_session_close(ssn); } else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { qd_session_t *qd_ssn = qd_session_from_pn(ssn); if (qd_ssn) { + if (qd_ssn->policy_counted) { + qd_ssn->policy_counted = false; + qd_conn->n_sessions--; + assert(qd_conn->n_sessions >= 0); + } DEQ_REMOVE(qd_conn->child_sessions, qd_ssn); qd_session_decref(qd_ssn); } @@ -586,22 +556,29 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { pn_link = pn_event_link(event); if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) { + bool policy_counted = false; if (pn_link_is_sender(pn_link)) { if (qd_conn->policy_settings) { if (!qd_policy_approve_amqp_receiver_link(pn_link, qd_conn)) { break; } qd_conn->n_receivers++; + policy_counted = true; } - setup_outgoing_link(container, pn_link); + qd_link_t *qd_link = setup_outgoing_link(container, pn_link); + if (qd_link) + qd_link->policy_counted = policy_counted; } else { if (qd_conn->policy_settings) { if (!qd_policy_approve_amqp_sender_link(pn_link, qd_conn)) { break; } qd_conn->n_senders++; + policy_counted = true; } - setup_incoming_link(container, pn_link, qd_connection_max_message_size(qd_conn)); + qd_link_t *qd_link = setup_incoming_link(container, pn_link, qd_connection_max_message_size(qd_conn)); + if (qd_link) + qd_link->policy_counted = policy_counted; } } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE) handle_link_open(container, pn_link); @@ -617,24 +594,29 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, if (qd_link->pn_link == pn_link) { pn_link_close(pn_link); } - if (qd_conn->policy_counted && qd_conn->policy_settings) { + if (qd_link->policy_counted) { + qd_link->policy_counted = false; if (pn_link_is_sender(pn_link)) { qd_conn->n_receivers--; qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Closed receiver link %s. n_receivers: %d", pn_link_name(pn_link), qd_conn->n_receivers); - assert (qd_conn->n_receivers >= 0); + assert(qd_conn->n_receivers >= 0); } else { qd_conn->n_senders--; qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Closed sender link %s. n_senders: %d", pn_link_name(pn_link), qd_conn->n_senders); - assert (qd_conn->n_senders >= 0); + assert(qd_conn->n_senders >= 0); } } + container->ntype->link_detach_handler(container->qd_router, qd_link, dt); + if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) { + // link fully closed add_link_to_free_list(&qd_conn->free_link_list, pn_link); + qd_link_free(qd_link); } - container->ntype->link_detach_handler(container->qd_router, qd_link, dt); + } else { add_link_to_free_list(&qd_conn->free_link_list, pn_link); } @@ -645,6 +627,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, pn_link = pn_event_link(event); if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { add_link_to_free_list(&qd_conn->free_link_list, pn_link); + qd_link_free((qd_link_t *) pn_link_get_context(pn_link)); } break; diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index ad7a99eb4..ca8c44d01 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -1093,6 +1093,14 @@ def test_51_unsupported_link_reattach(self): test.run() self.assertIsNone(test.error) + def test_52_amqp_session_flap_test(self): + """ + Test creating and tearing down active sessions + """ + test = AMQPSessionFlapTest(self.address) + test.run() + self.assertIsNone(test.error) + class Entity: def __init__(self, status_code, status_description, attrs): @@ -3362,6 +3370,95 @@ def run(self): Container(self).run() +class AMQPSessionFlapTest(MessagingHandler): + """ + This test stresses the creation and deletion of AMQP Sessions. + It repeatedly creates sessions then tears them down while links are + actively sending messages + """ + def __init__(self, router_address): + super(AMQPSessionFlapTest, self).__init__() + self.router_address = router_address + self.target = "session/flap/test" + self.error = None + self.rx_conn = None + self.receiver = None + self.tx_conn = None + self.tx_session = None + + # repeat using a new session tx_session_limit iterations + self.tx_session_limit = 10 + self.tx_session_count = 0 + + # repeat using a new link tx_link_limit iterations + self.tx_link_limit = 100 + self.tx_links = [] + + # count number of messages in flight + self.tx_messages = 0 + self.timer = None + + def done(self, error=None): + self.error = error + if self.timer: + self.timer.cancel() + if self.rx_conn: + self.rx_conn.close() + if self.tx_conn: + self.tx_conn.close() + + def timeout(self): + self.done(error="Test timed out") + + def on_start(self, event): + self.rx_conn = event.container.connect(self.router_address) + self.receiver = event.container.create_receiver(self.rx_conn, self.target) + self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) + + def on_link_opened(self, event): + if event.receiver: + assert self.tx_conn is None + self.tx_conn = event.container.connect(self.router_address) + self.tx_session = self.tx_conn.session() + # open initial session + self.tx_session.open() + + def on_session_opened(self, event): + if event.session == self.tx_session: + self.tx_messages = 0 + for index in range(self.tx_link_limit): + link = self.tx_session.sender(name=f"Link-{self.tx_session_count}-{index}") + link.target.address = self.target + link.open() + self.tx_links.append(link) + + def on_link_flow(self, event): + if event.sender and event.sender in self.tx_links: + sender = event.sender + if sender.current is None and sender.credit > 0: + if self.tx_messages < self.tx_link_limit: + sender.delivery(sender.delivery_tag()) + sender.stream(b'Mary had a little french bulldog') + self.tx_messages += 1 + if self.tx_messages >= self.tx_link_limit: + self.tx_session.close() + + def on_session_closed(self, event): + if event.session == self.tx_session: + self.tx_session_count += 1 + self.tx_links.clear() + if self.tx_session_count < self.tx_session_limit: + # repeat with new session + self.tx_session = self.tx_conn.session() + self.tx_session.open() + else: + # done + self.done() + + def run(self): + Container(self).run() + + class DataConnectionCountTest(TestCase): """ Start the router with different numbers of worker threads and make sure