Skip to content

Commit

Permalink
Fixes #1136: Add test for mismatched TCP adaptor encapsulation
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Oct 25, 2023
1 parent a3a0045 commit 6fab0e1
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 11 deletions.
79 changes: 73 additions & 6 deletions src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ static void on_connection_event_CSIDE_IO(pn_event_t *e, qd_server_t *qd_server,
static void connection_run_LSIDE_IO(tcplite_connection_t *conn);
static void connection_run_CSIDE_IO(tcplite_connection_t *conn);
static void connection_run_XSIDE_IO(tcplite_connection_t *conn);
static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv);


//=================================================================================
Expand Down Expand Up @@ -917,14 +918,29 @@ static void extract_metadata_from_stream_CSIDE(tcplite_connection_t *conn)
}
}


static void handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_link_t *link, qdr_delivery_t *delivery)
// Handle delivery of outbound message to the client.
//
// @return 0 on success, otherwise a terminal outcome indicating that the message cannot be delivered.
//
static uint64_t handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_link_t *link, qdr_delivery_t *delivery)
{
ASSERT_RAW_IO;
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] handle_outbound_delivery_LSIDE_IO - receive_complete=%s",
conn->common.conn_id, qd_message_receive_complete(conn->outbound_stream) ? "true" : "false");

if (!conn->outbound_delivery) {
// newly arrived delivery: validate it
//
uint64_t dispo = validate_outbound_message(delivery);
if (dispo != PN_RECEIVED) {
// PN_RELEASED: since this message was delivered to this listener's unique reply-to, it cannot be
// redelivered to another consumer. PN_RELEASED means incompatible encapsulation so this is a
// misconfiguration. Reject the delivery.
if (dispo == PN_RELEASED)
dispo = PN_REJECTED;
return dispo;
}

qdr_delivery_incref(delivery, "handle_outbound_delivery_LSIDE_IO");
conn->outbound_delivery = delivery;
conn->outbound_stream = qdr_delivery_message(delivery);
Expand All @@ -942,18 +958,28 @@ static void handle_outbound_delivery_LSIDE_IO(tcplite_connection_t *conn, qdr_li
}

connection_run_LSIDE_IO(conn);
return 0;
}


/**
* Handle the first indication of a new outbound delivery on CSIDE. This is where the raw connection to the
* external service is established. This function executes in an IO thread not associated with a raw connection.
*
* @return disposition. MOVED_TO_NEW_LINK on success, 0 if more message needed, else error outcome
*/
static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_link_t *link, qdr_delivery_t *delivery)
static uint64_t handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_link_t *link, qdr_delivery_t *delivery)
{
ASSERT_TIMER_IO;
assert(!qdr_delivery_get_context(delivery));

// Verify the message properties have arrived and are valid
//
uint64_t dispo = validate_outbound_message(delivery);
if (dispo != PN_RECEIVED) {
return dispo;
}

tcplite_connection_t *conn = new_tcplite_connection_t();
ZERO(conn);

Expand Down Expand Up @@ -1009,6 +1035,8 @@ static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_li
//
SET_ATOMIC_FLAG(&conn->raw_opened);
pn_proactor_raw_connect(tcplite_context->proactor, conn->raw_conn, cr->adaptor_config->host_port);

return QD_DELIVERY_MOVED_TO_NEW_LINK;
}


Expand Down Expand Up @@ -1288,6 +1316,46 @@ static void connection_run_XSIDE_IO(tcplite_connection_t *conn)
}
}

// Validate the outbound message associated with out_dlv
//
// @return a disposition value indicating the validity of the message:
// 0: message headers incomplete, wait for more data to arrive
// PN_REJECTED: corrupt headers, cannot be re-delivered
// PN_RELEASED: headers ok, incompatible body format: deliver elsewhere
// PN_RECEIVED: headers & body ok
//
static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv)
{
qd_message_t *msg = qdr_delivery_message(out_dlv);
qd_message_depth_status_t depth_ok = qd_message_check_depth(msg, QD_DEPTH_PROPERTIES);
if (depth_ok == QD_MESSAGE_DEPTH_INCOMPLETE) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " tcp_adaptor egress message incomplete, waiting for more", DLV_ARGS(out_dlv));
return 0; // retry later
}
if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding?
qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(out_dlv));
qd_message_set_send_complete(msg);
return PN_REJECTED;
}

// ISSUE-1136: ensure the message body is using the proper encapsulation.
//
bool encaps_ok = false;
qd_iterator_t *encaps = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE);
if (encaps) {
encaps_ok = qd_iterator_equal(encaps, (unsigned char *) QD_CONTENT_TYPE_APP_OCTETS);
qd_iterator_free(encaps);
}
if (!encaps_ok) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured TCP adaptor (wrong encapsulation)",
DLV_ARGS(out_dlv));
qd_message_set_send_complete(msg);
return PN_RELEASED; // allow it to be re-forwarded to a different adaptor
}
return PN_RECEIVED;
}


//=================================================================================
// Handlers for events from the Raw Connections
Expand Down Expand Up @@ -1513,12 +1581,11 @@ static uint64_t CORE_deliver_outbound(void *context, qdr_link_t *link, qdr_deliv
}

if (common->context_type == TL_CONNECTOR) {
handle_first_outbound_delivery_CSIDE((tcplite_connector_t*) common, link, delivery);
return QD_DELIVERY_MOVED_TO_NEW_LINK;
return handle_first_outbound_delivery_CSIDE((tcplite_connector_t*) common, link, delivery);
} else if (common->context_type == TL_CONNECTION) {
tcplite_connection_t *conn = (tcplite_connection_t*) common;
if (conn->listener_side) {
handle_outbound_delivery_LSIDE_IO(conn, link, delivery);
return handle_outbound_delivery_LSIDE_IO(conn, link, delivery);
} else {
handle_outbound_delivery_CSIDE(conn, link, delivery, settled);
}
Expand Down
103 changes: 98 additions & 5 deletions tests/system_tests_tcp_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2239,7 +2239,7 @@ def check_connection_deleted():
class TcpLegacyInvalidEncodingTest(TestCase):
"""
Ensure that the TCP adaptor can recover from receiving an improperly
formatted/wrong version AMQP encoded stream message.
formatted AMQP encoded stream message.
"""
@classmethod
def setUpClass(cls):
Expand Down Expand Up @@ -2386,8 +2386,9 @@ def run(self):

class InvalidServerSendReply(MessagingHandler):
"""
Simulate a TCP adaptor reply message that is invalid. Expect the client
(egress) to ignore the message and set its disposition to dispo.
Simulate via AMQP a TCP client/server flow and have the fake server send
"msg" as the TCP adaptor reply message to the client. Expect the client to
set the reply messages disposition to "dispo".
"""
def __init__(self, msg, server_address, listener_address, service_address, dispo):
super(InvalidServerSendReply, self).__init__(auto_settle=False)
Expand All @@ -2411,7 +2412,8 @@ def __init__(self, msg, server_address, listener_address, service_address, dispo
self.request_dlv = None
self.dlv_drain_timer = None

# fake tcp client, just sends a request message
# fake tcp client, just opens an AMQP connection to the TCP
# listener. This initiates the ingress streaming request message.
self.listener_address = listener_address
self.client_conn = None
self.client_sent = False
Expand All @@ -2431,13 +2433,17 @@ def timeout(self):
self.done(error=f"Timeout Expired: sent={self.sent}")

def on_start(self, event):
# Create an AMQP receiver for the service address. This will simulate a
# fake server and activate the TCP adaptor listener port
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.server_conn = event.container.connect(self.server_address)
self.server_receiver = event.container.create_receiver(self.server_conn,
self.service_address)

def on_timer_task(self, event):
# at this point we expect the reply-to header to have arrived
# At this point we expect the reply-to header to have arrived. Use the
# reply-to address to send the 'msg' parameter to the client as the
# response streaming message.
try:
data = self.server_receiver.recv(self.request_dlv.pending)
#print(f"len={len(xxx)}\nBODY=[{xxx}]", flush=True)
Expand All @@ -2451,6 +2457,9 @@ def on_timer_task(self, event):
self.request_dlv.settle()

def on_delivery(self, event):
# We've received the start of the client request message. In order to
# set up a reply-to link wait until the message's reply-to field has
# arrived.
if event.receiver == self.server_receiver:
if self.request_dlv is None and event.delivery.readable:
# sleep a bit to allow all the header data to arrive on the
Expand All @@ -2462,9 +2471,11 @@ def on_link_opened(self, event):
if event.receiver == self.server_receiver:
# "server" ready to take requests, fire up the "client". All we
# need is to connect since that will activate the tcp adaptor
# client-side due to the AMQP @open handshake.
self.client_conn = event.container.connect(self.listener_address)

def on_sendable(self, event):
# Have the fake server send 'msg' to the reply-to of the fake client
if event.sender == self.server_sender:
if not self.server_sent:
# send the invalid reply
Expand Down Expand Up @@ -2688,5 +2699,87 @@ def test_01_check_delayed_deliveries(self):
f"Expected delay counter to be zero, got {counters}")


class TcpMisconfiguredLegacyLiteEncapsTest(TestCase):
"""
Ensure that the TCP adaptor can detect misconfiguration of the
encapsulation setting by creating a tcpListener and tcpConnector pair that
use different encaps.
"""
@classmethod
def setUpClass(cls):
super(TcpMisconfiguredLegacyLiteEncapsTest, cls).setUpClass()

config = [
('router', {'mode': 'interior', 'id': 'TcpMisconfiguredEncaps'}),
# Listener for handling router management requests.
('listener', {'role': 'normal',
'port': cls.tester.get_port()}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]

cls.router = cls.tester.qdrouterd('TcpInvalidEncoding',
Qdrouterd.Config(config), wait=True)
cls.address = cls.router.addresses[0]

def _test(self, ingress_encaps, egress_encaps):
ingress_port = self.tester.get_port()
egress_port = self.tester.get_port()
mgmt = self.router.qd_manager

mgmt.create(TCP_CONNECTOR_TYPE,
{"name": "EncapsConnector",
"address": "EncapsTest",
"host": "localhost",
"port": egress_port,
"encapsulation": egress_encaps})
mgmt.create(TCP_LISTENER_TYPE,
{"name": "EncapsListener",
"address": "EncapsTest",
"port": ingress_port,
"encapsulation": ingress_encaps})
wait_tcp_listeners_up(self.address)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.settimeout(TIMEOUT)
server.bind(("", egress_port))
server.listen(1)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_conn:
client_conn.settimeout(TIMEOUT)
while True:
try:
client_conn.connect(('127.0.0.1', ingress_port))
break
except ConnectionRefusedError:
# There may be a delay between the operStatus going up and
# the actual listener socket availability, so allow that:
time.sleep(0.1)
continue

# send data to kick off the flow, but expect connection to fail
# in recv (either close or reset)

client_conn.sendall(b' test ')
try:
data = client_conn.recv(4096)
except ConnectionResetError:
data = b''

self.assertEqual(b'', data, "expected recv to fail")

mgmt.delete(TCP_CONNECTOR_TYPE, name="EncapsConnector")
mgmt.delete(TCP_LISTENER_TYPE, name="EncapsListener")

def test_01_encaps_mismatch(self):
"""
Attempt to configure incompatible TCP encapsulation on each side of the
TCP path
"""
self._test(ingress_encaps="legacy", egress_encaps="lite")
self._test(ingress_encaps="lite", egress_encaps="legacy")


if __name__ == '__main__':
unittest.main(main_module())

0 comments on commit 6fab0e1

Please sign in to comment.