From 86ef5dc8542f6ab22114fef55c834c4b08db8cf5 Mon Sep 17 00:00:00 2001 From: Ken Giusti Date: Wed, 1 Nov 2023 16:00:04 -0400 Subject: [PATCH] Fixes #1136: Add test for mismatched TCP adaptor encapsulation (#1279) --- src/adaptors/tcp_lite/tcp_lite.c | 79 +++++++++++++++++++++-- tests/system_tests_tcp_adaptor.py | 103 ++++++++++++++++++++++++++++-- 2 files changed, 171 insertions(+), 11 deletions(-) diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index f5d27a0f8..43cd5b96e 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -92,6 +92,7 @@ static void on_connection_event_CSIDE_IO(pn_event_t *e, qd_server_t *qd_server, static void connection_run_LSIDE_IO(tcplite_connection_t *conn); static void connection_run_CSIDE_IO(tcplite_connection_t *conn); static void connection_run_XSIDE_IO(tcplite_connection_t *conn); +static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv); //================================================================================= @@ -917,14 +918,29 @@ static void extract_metadata_from_stream_CSIDE(tcplite_connection_t *conn) } } - -static void handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_link_t *link, qdr_delivery_t *delivery) +// Handle delivery of outbound message to the client. +// +// @return 0 on success, otherwise a terminal outcome indicating that the message cannot be delivered. +// +static uint64_t handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_link_t *link, qdr_delivery_t *delivery) { ASSERT_RAW_IO; qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] handle_outbound_delivery_LSIDE_IO - receive_complete=%s", conn->common.conn_id, qd_message_receive_complete(conn->outbound_stream) ? "true" : "false"); if (!conn->outbound_delivery) { + // newly arrived delivery: validate it + // + uint64_t dispo = validate_outbound_message(delivery); + if (dispo != PN_RECEIVED) { + // PN_RELEASED: since this message was delivered to this listener's unique reply-to, it cannot be + // redelivered to another consumer. PN_RELEASED means incompatible encapsulation so this is a + // misconfiguration. Reject the delivery. + if (dispo == PN_RELEASED) + dispo = PN_REJECTED; + return dispo; + } + qdr_delivery_incref(delivery, "handle_outbound_delivery_LSIDE_IO"); conn->outbound_delivery = delivery; conn->outbound_stream = qdr_delivery_message(delivery); @@ -942,18 +958,28 @@ static void handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_li } connection_run_LSIDE_IO(conn); + return 0; } /** * Handle the first indication of a new outbound delivery on CSIDE. This is where the raw connection to the * external service is established. This function executes in an IO thread not associated with a raw connection. + * + * @return disposition. MOVED_TO_NEW_LINK on success, 0 if more message needed, else error outcome */ -static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_link_t *link, qdr_delivery_t *delivery) +static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_link_t *link, qdr_delivery_t *delivery) { ASSERT_TIMER_IO; assert(!qdr_delivery_get_context(delivery)); + // Verify the message properties have arrived and are valid + // + uint64_t dispo = validate_outbound_message(delivery); + if (dispo != PN_RECEIVED) { + return dispo; + } + tcplite_connection_t *conn = new_tcplite_connection_t(); ZERO(conn); @@ -1009,6 +1035,8 @@ static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_li // SET_ATOMIC_FLAG(&conn->raw_opened); pn_proactor_raw_connect(tcplite_context->proactor, conn->raw_conn, cr->adaptor_config->host_port); + + return QD_DELIVERY_MOVED_TO_NEW_LINK; } @@ -1288,6 +1316,46 @@ static void connection_run_XSIDE_IO(tcplite_connection_t *conn) } } +// Validate the outbound message associated with out_dlv +// +// @return a disposition value indicating the validity of the message: +// 0: message headers incomplete, wait for more data to arrive +// PN_REJECTED: corrupt headers, cannot be re-delivered +// PN_RELEASED: headers ok, incompatible body format: deliver elsewhere +// PN_RECEIVED: headers & body ok +// +static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv) +{ + qd_message_t *msg = qdr_delivery_message(out_dlv); + qd_message_depth_status_t depth_ok = qd_message_check_depth(msg, QD_DEPTH_PROPERTIES); + if (depth_ok == QD_MESSAGE_DEPTH_INCOMPLETE) { + qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, + DLV_FMT " tcp_adaptor egress message incomplete, waiting for more", DLV_ARGS(out_dlv)); + return 0; // retry later + } + if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding? + qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(out_dlv)); + qd_message_set_send_complete(msg); + return PN_REJECTED; + } + + // ISSUE-1136: ensure the message body is using the proper encapsulation. + // + bool encaps_ok = false; + qd_iterator_t *encaps = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE); + if (encaps) { + encaps_ok = qd_iterator_equal(encaps, (unsigned char *) QD_CONTENT_TYPE_APP_OCTETS); + qd_iterator_free(encaps); + } + if (!encaps_ok) { + qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured TCP adaptor (wrong encapsulation)", + DLV_ARGS(out_dlv)); + qd_message_set_send_complete(msg); + return PN_RELEASED; // allow it to be re-forwarded to a different adaptor + } + return PN_RECEIVED; +} + //================================================================================= // Handlers for events from the Raw Connections @@ -1513,12 +1581,11 @@ static uint64_t CORE_deliver_outbound(void *context, qdr_link_t *link, qdr_deliv } if (common->context_type == TL_CONNECTOR) { - handle_first_outbound_delivery_CSIDE((tcplite_connector_t*) common, link, delivery); - return QD_DELIVERY_MOVED_TO_NEW_LINK; + return handle_first_outbound_delivery_CSIDE((tcplite_connector_t*) common, link, delivery); } else if (common->context_type == TL_CONNECTION) { tcplite_connection_t *conn = (tcplite_connection_t*) common; if (conn->listener_side) { - handle_outbound_delivery_LSIDE_IO(conn, link, delivery); + return handle_outbound_delivery_LSIDE_IO(conn, link, delivery); } else { handle_outbound_delivery_CSIDE(conn, link, delivery, settled); } diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 6b25577ed..766e981e9 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -2261,7 +2261,7 @@ def check_connection_deleted(): class TcpLegacyInvalidEncodingTest(TestCase): """ Ensure that the TCP adaptor can recover from receiving an improperly - formatted/wrong version AMQP encoded stream message. + formatted AMQP encoded stream message. """ @classmethod def setUpClass(cls): @@ -2408,8 +2408,9 @@ def run(self): class InvalidServerSendReply(MessagingHandler): """ - Simulate a TCP adaptor reply message that is invalid. Expect the client - (egress) to ignore the message and set its disposition to dispo. + Simulate via AMQP a TCP client/server flow and have the fake server send + "msg" as the TCP adaptor reply message to the client. Expect the client to + set the reply messages disposition to "dispo". """ def __init__(self, msg, server_address, listener_address, service_address, dispo): super(InvalidServerSendReply, self).__init__(auto_settle=False) @@ -2433,7 +2434,8 @@ def __init__(self, msg, server_address, listener_address, service_address, dispo self.request_dlv = None self.dlv_drain_timer = None - # fake tcp client, just sends a request message + # fake tcp client, just opens an AMQP connection to the TCP + # listener. This initiates the ingress streaming request message. self.listener_address = listener_address self.client_conn = None self.client_sent = False @@ -2453,13 +2455,17 @@ def timeout(self): self.done(error=f"Timeout Expired: sent={self.sent}") def on_start(self, event): + # Create an AMQP receiver for the service address. This will simulate a + # fake server and activate the TCP adaptor listener port self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) self.server_conn = event.container.connect(self.server_address) self.server_receiver = event.container.create_receiver(self.server_conn, self.service_address) def on_timer_task(self, event): - # at this point we expect the reply-to header to have arrived + # At this point we expect the reply-to header to have arrived. Use the + # reply-to address to send the 'msg' parameter to the client as the + # response streaming message. try: data = self.server_receiver.recv(self.request_dlv.pending) #print(f"len={len(xxx)}\nBODY=[{xxx}]", flush=True) @@ -2473,6 +2479,9 @@ def on_timer_task(self, event): self.request_dlv.settle() def on_delivery(self, event): + # We've received the start of the client request message. In order to + # set up a reply-to link wait until the message's reply-to field has + # arrived. if event.receiver == self.server_receiver: if self.request_dlv is None and event.delivery.readable: # sleep a bit to allow all the header data to arrive on the @@ -2484,9 +2493,11 @@ def on_link_opened(self, event): if event.receiver == self.server_receiver: # "server" ready to take requests, fire up the "client". All we # need is to connect since that will activate the tcp adaptor + # client-side due to the AMQP @open handshake. self.client_conn = event.container.connect(self.listener_address) def on_sendable(self, event): + # Have the fake server send 'msg' to the reply-to of the fake client if event.sender == self.server_sender: if not self.server_sent: # send the invalid reply @@ -2710,5 +2721,87 @@ def test_01_check_delayed_deliveries(self): f"Expected delay counter to be zero, got {counters}") +class TcpMisconfiguredLegacyLiteEncapsTest(TestCase): + """ + Ensure that the TCP adaptor can detect misconfiguration of the + encapsulation setting by creating a tcpListener and tcpConnector pair that + use different encaps. + """ + @classmethod + def setUpClass(cls): + super(TcpMisconfiguredLegacyLiteEncapsTest, cls).setUpClass() + + config = [ + ('router', {'mode': 'interior', 'id': 'TcpMisconfiguredEncaps'}), + # Listener for handling router management requests. + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + + cls.router = cls.tester.qdrouterd('TcpInvalidEncoding', + Qdrouterd.Config(config), wait=True) + cls.address = cls.router.addresses[0] + + def _test(self, ingress_encaps, egress_encaps): + ingress_port = self.tester.get_port() + egress_port = self.tester.get_port() + mgmt = self.router.qd_manager + + mgmt.create(TCP_CONNECTOR_TYPE, + {"name": "EncapsConnector", + "address": "EncapsTest", + "host": "localhost", + "port": egress_port, + "encapsulation": egress_encaps}) + mgmt.create(TCP_LISTENER_TYPE, + {"name": "EncapsListener", + "address": "EncapsTest", + "port": ingress_port, + "encapsulation": ingress_encaps}) + wait_tcp_listeners_up(self.address) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server: + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.settimeout(TIMEOUT) + server.bind(("", egress_port)) + server.listen(1) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_conn: + client_conn.settimeout(TIMEOUT) + while True: + try: + client_conn.connect(('127.0.0.1', ingress_port)) + break + except ConnectionRefusedError: + # There may be a delay between the operStatus going up and + # the actual listener socket availability, so allow that: + time.sleep(0.1) + continue + + # send data to kick off the flow, but expect connection to fail + # in recv (either close or reset) + + client_conn.sendall(b' test ') + try: + data = client_conn.recv(4096) + except ConnectionResetError: + data = b'' + + self.assertEqual(b'', data, "expected recv to fail") + + mgmt.delete(TCP_CONNECTOR_TYPE, name="EncapsConnector") + mgmt.delete(TCP_LISTENER_TYPE, name="EncapsListener") + + def test_01_encaps_mismatch(self): + """ + Attempt to configure incompatible TCP encapsulation on each side of the + TCP path + """ + self._test(ingress_encaps="legacy", egress_encaps="lite") + self._test(ingress_encaps="lite", egress_encaps="legacy") + + if __name__ == '__main__': unittest.main(main_module())