From 74c2184dc4daf55ac82af0a51c6a3dce77c99825 Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Mon, 16 Oct 2023 11:55:02 -0400 Subject: [PATCH 1/2] Fixes #1255: Introduce an activation lock to protect the raw connection from being activated when it is being torn down --- src/adaptors/tcp_lite/tcp_lite.c | 10 ++++++++++ src/adaptors/tcp_lite/tcp_lite.h | 1 + src/cutthrough_utils.c | 2 ++ 3 files changed, 13 insertions(+) diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index de5726c67..f5d27a0f8 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -392,6 +392,7 @@ static void free_connection_IO(void *context) sys_atomic_destroy(&conn->core_activation); sys_atomic_destroy(&conn->raw_opened); qd_timer_free(conn->close_timer); + sys_mutex_free(&conn->activation_lock); free_tcplite_connection_t(conn); } @@ -406,7 +407,11 @@ static void close_raw_connection_XSIDE_IO(tcplite_connection_t *conn) pn_raw_connection_close(conn->raw_conn); drain_read_buffers_XSIDE_IO(conn->raw_conn); drain_write_buffers_XSIDE_IO(conn->raw_conn); + + sys_mutex_lock(&conn->activation_lock); + pn_raw_connection_set_context(conn->raw_conn, 0); conn->raw_conn = 0; + sys_mutex_unlock(&conn->activation_lock); } } } @@ -1020,9 +1025,11 @@ static void handle_outbound_delivery_CSIDE(tcplite_connection_t *conn, qdr_link_ // It is not guaranteed that this function will be called on the proper IO thread. Wake the raw connection for // continued processing in the correct context. // + sys_mutex_lock(&conn->activation_lock); if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { pn_raw_connection_wake(conn->raw_conn); } + sys_mutex_unlock(&conn->activation_lock); } @@ -1349,6 +1356,7 @@ void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listener, void conn->common.context_type = TL_CONNECTION; conn->common.parent = (tcplite_common_t*) li; + sys_mutex_init(&conn->activation_lock); sys_atomic_init(&conn->core_activation, 0); sys_atomic_init(&conn->raw_opened, 1); @@ -1393,10 +1401,12 @@ static void CORE_activate(void *context, qdr_connection_t *core_conn) case TL_CONNECTION: conn = (tcplite_connection_t*) common; + sys_mutex_lock(&conn->activation_lock); if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { SET_ATOMIC_FLAG(&conn->core_activation); pn_raw_connection_wake(conn->raw_conn); } + sys_mutex_unlock(&conn->activation_lock); break; } } diff --git a/src/adaptors/tcp_lite/tcp_lite.h b/src/adaptors/tcp_lite/tcp_lite.h index 9ba80c7ff..0f4868435 100644 --- a/src/adaptors/tcp_lite/tcp_lite.h +++ b/src/adaptors/tcp_lite/tcp_lite.h @@ -118,6 +118,7 @@ typedef struct tcplite_connection_t { tcplite_common_t common; DEQ_LINKS(tcplite_connection_t); pn_raw_connection_t *raw_conn; + sys_mutex_t activation_lock; sys_atomic_t core_activation; sys_atomic_t raw_opened; qd_timer_t *close_timer; diff --git a/src/cutthrough_utils.c b/src/cutthrough_utils.c index c6e1b4ec3..d88bb382f 100644 --- a/src/cutthrough_utils.c +++ b/src/cutthrough_utils.c @@ -64,9 +64,11 @@ static void activate_connection(qd_message_activation_t *activation, qd_directio case QD_ACTIVATION_TCP: { tcplite_connection_t *conn = safe_deref_tcplite_connection_t(activation->safeptr); + sys_mutex_lock(&conn->activation_lock); if (!!conn && IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { pn_raw_connection_wake(conn->raw_conn); } + sys_mutex_unlock(&conn->activation_lock); break; } } From 9607aa5a0685972327ba5ca8e55bc3ab1949e28b Mon Sep 17 00:00:00 2001 From: Ganesh Murthy Date: Mon, 23 Oct 2023 11:27:51 -0400 Subject: [PATCH 2/2] Moved the connection check before locking using activation lock in src/cutthrough_utils.c --- src/cutthrough_utils.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/cutthrough_utils.c b/src/cutthrough_utils.c index d88bb382f..fe486c307 100644 --- a/src/cutthrough_utils.c +++ b/src/cutthrough_utils.c @@ -64,11 +64,13 @@ static void activate_connection(qd_message_activation_t *activation, qd_directio case QD_ACTIVATION_TCP: { tcplite_connection_t *conn = safe_deref_tcplite_connection_t(activation->safeptr); - sys_mutex_lock(&conn->activation_lock); - if (!!conn && IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { - pn_raw_connection_wake(conn->raw_conn); + if (!!conn) { + sys_mutex_lock(&conn->activation_lock); + if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { + pn_raw_connection_wake(conn->raw_conn); + } + sys_mutex_unlock(&conn->activation_lock); } - sys_mutex_unlock(&conn->activation_lock); break; } }