diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index de5726c67..f5d27a0f8 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -392,6 +392,7 @@ static void free_connection_IO(void *context) sys_atomic_destroy(&conn->core_activation); sys_atomic_destroy(&conn->raw_opened); qd_timer_free(conn->close_timer); + sys_mutex_free(&conn->activation_lock); free_tcplite_connection_t(conn); } @@ -406,7 +407,11 @@ static void close_raw_connection_XSIDE_IO(tcplite_connection_t *conn) pn_raw_connection_close(conn->raw_conn); drain_read_buffers_XSIDE_IO(conn->raw_conn); drain_write_buffers_XSIDE_IO(conn->raw_conn); + + sys_mutex_lock(&conn->activation_lock); + pn_raw_connection_set_context(conn->raw_conn, 0); conn->raw_conn = 0; + sys_mutex_unlock(&conn->activation_lock); } } } @@ -1020,9 +1025,11 @@ static void handle_outbound_delivery_CSIDE(tcplite_connection_t *conn, qdr_link_ // It is not guaranteed that this function will be called on the proper IO thread. Wake the raw connection for // continued processing in the correct context. // + sys_mutex_lock(&conn->activation_lock); if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { pn_raw_connection_wake(conn->raw_conn); } + sys_mutex_unlock(&conn->activation_lock); } @@ -1349,6 +1356,7 @@ void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listener, void conn->common.context_type = TL_CONNECTION; conn->common.parent = (tcplite_common_t*) li; + sys_mutex_init(&conn->activation_lock); sys_atomic_init(&conn->core_activation, 0); sys_atomic_init(&conn->raw_opened, 1); @@ -1393,10 +1401,12 @@ static void CORE_activate(void *context, qdr_connection_t *core_conn) case TL_CONNECTION: conn = (tcplite_connection_t*) common; + sys_mutex_lock(&conn->activation_lock); if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { SET_ATOMIC_FLAG(&conn->core_activation); pn_raw_connection_wake(conn->raw_conn); } + sys_mutex_unlock(&conn->activation_lock); break; } } diff --git a/src/adaptors/tcp_lite/tcp_lite.h b/src/adaptors/tcp_lite/tcp_lite.h index 9ba80c7ff..0f4868435 100644 --- a/src/adaptors/tcp_lite/tcp_lite.h +++ b/src/adaptors/tcp_lite/tcp_lite.h @@ -118,6 +118,7 @@ typedef struct tcplite_connection_t { tcplite_common_t common; DEQ_LINKS(tcplite_connection_t); pn_raw_connection_t *raw_conn; + sys_mutex_t activation_lock; sys_atomic_t core_activation; sys_atomic_t raw_opened; qd_timer_t *close_timer; diff --git a/src/cutthrough_utils.c b/src/cutthrough_utils.c index c6e1b4ec3..fe486c307 100644 --- a/src/cutthrough_utils.c +++ b/src/cutthrough_utils.c @@ -64,8 +64,12 @@ static void activate_connection(qd_message_activation_t *activation, qd_directio case QD_ACTIVATION_TCP: { tcplite_connection_t *conn = safe_deref_tcplite_connection_t(activation->safeptr); - if (!!conn && IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { - pn_raw_connection_wake(conn->raw_conn); + if (!!conn) { + sys_mutex_lock(&conn->activation_lock); + if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { + pn_raw_connection_wake(conn->raw_conn); + } + sys_mutex_unlock(&conn->activation_lock); } break; }