Skip to content

Commit

Permalink
Fixes 1136: add additional message integrity tests to the legacy adap…
Browse files Browse the repository at this point in the history
…tor (#1253)

* Fixes 1136: add additional message integrity tests to the legacy adaptor

* fixup: more comments!
  • Loading branch information
kgiusti authored Oct 25, 2023
1 parent ddfcb11 commit a3a0045
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 21 deletions.
58 changes: 54 additions & 4 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,10 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r
conn->read_eos_seen = true;
break;
case QD_MESSAGE_STREAM_DATA_INVALID:
// Corrupted message, treat like EOS since there is no way to undo what has already been sent
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR,
"[C%" PRIu64 "] Invalid body data for streaming message", conn->conn_id);
"[C%" PRIu64 "] Invalid body data for streaming message, closing connection", conn->conn_id);
conn->read_eos_seen = true;
break;
default:
break;
Expand Down Expand Up @@ -1996,15 +1998,19 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
DLV_FMT " tcp_adaptor egress message incomplete, waiting for more", DLV_ARGS(delivery));
return 0; // retry later
}
assert(depth_ok == QD_MESSAGE_DEPTH_OK); // otherwise bug in message encoding?
if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding?
qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(delivery));
qd_message_set_send_complete(msg);
return PN_REJECTED;
}

// ISSUE-1136: check if the message format is correct. For this adaptor the content-type field must be
// unset. See comment in handle_incoming().
//
qd_iterator_t *ctype = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE);
if (ctype) {
qd_iterator_free(ctype);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured tcpConnector (wrong version)",
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured tcpConnector (wrong encapsulation)",
DLV_ARGS(delivery));
qd_message_set_send_complete(msg);
return PN_RELEASED; // allow it to be re-forwarded to a different adaptor
Expand All @@ -2028,6 +2034,50 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
} else if (!tc->out_dlv_stream) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " tcp_adaptor delivery arrived on non-egress dispatcher connection", DLV_ARGS(delivery));

if (tc->ingress) {
// Egress (connector-side) outgoing messages are validated when they arrive on the dispatcher link (see
// above). Ingress (client-side) outbound reply messages do not arrive on the dispatch link so these
// messages need to be validated here.

qd_message_depth_status_t depth_ok = qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES);
if (depth_ok == QD_MESSAGE_DEPTH_INCOMPLETE) {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " tcp_adaptor reply message incomplete, waiting for more", DLV_ARGS(delivery));
return 0; // retry later
}
if (depth_ok != QD_MESSAGE_DEPTH_OK) { // otherwise bug? corrupted message encoding?
qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING, DLV_FMT " Malformed TCP message - discarding!", DLV_ARGS(delivery));
qd_message_set_send_complete(msg);
return PN_REJECTED;
}

// ISSUE-1136: check if the message format is correct. For this adaptor the content-type field must be
// unset. See comment in handle_incoming().
//
qd_iterator_t *ctype = qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE);
if (ctype) {
qd_iterator_free(ctype);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_ERROR, DLV_FMT " Misconfigured tcpListener (wrong outgoing encapsulation)",
DLV_ARGS(delivery));
qd_message_set_send_complete(msg);

// What to do? This is a reply message, so it cannot be re-delivered to another service.

if (tc->pn_raw_conn) {
// set the raw connection condition info so it will appear in the vanflow logs
// when the connection disconnects
pn_condition_t *cond = pn_raw_connection_condition(tc->pn_raw_conn);
if (!!cond) {
(void) pn_condition_set_name(cond, "delivery-failed");
(void) pn_condition_set_description(cond, "invalid message encapsulation");
}
pn_raw_connection_close(tc->pn_raw_conn);
}
return PN_REJECTED;
}
}

tc->out_dlv_stream = delivery;
qdr_delivery_incref(delivery, "tcp_adaptor - new out_dlv_stream");
if (tc->ingress) {
Expand Down Expand Up @@ -2115,7 +2165,7 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t
pn_condition_t *cond = pn_raw_connection_condition(tc->pn_raw_conn);
if (!!cond) {
(void) pn_condition_set_name(cond, "delivery-failed");
(void) pn_condition_set_description(cond, "destination unreachable");
(void) pn_condition_set_description(cond, (disp == PN_REJECTED) ? "invalid/corrupt message" : "destination unreachable");
}
pn_raw_connection_close(tc->pn_raw_conn);
}
Expand Down
194 changes: 177 additions & 17 deletions tests/system_tests_tcp_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from subprocess import STDOUT
from typing import List, Optional, Mapping, Tuple

from proton import Message
from proton import Message, Disposition
from proton.handlers import MessagingHandler
from proton.reactor import Container

Expand Down Expand Up @@ -2236,53 +2236,113 @@ def check_connection_deleted():
client_conn.close()


class TcpInvalidEncodingTest(TestCase):
class TcpLegacyInvalidEncodingTest(TestCase):
"""
Ensure that the TCP adaptor can recover from receiving an improperly
formatted/wrong version AMQP encoded stream message.
"""
@classmethod
def setUpClass(cls):
super(TcpInvalidEncodingTest, cls).setUpClass()
super(TcpLegacyInvalidEncodingTest, cls).setUpClass()

config = [
('router', {'mode': 'interior', 'id': 'TcpInvalidEncoding'}),
# Listener for handling router management requests.
('listener', {'role': 'normal',
'port': cls.tester.get_port()}),
('tcpConnector', {'host': "localhost",
('tcpConnector', {'host': "127.0.0.1",
'port': cls.tester.get_port(),
'address': 'tcp-adaptor',
'address': 'tcp-connector',
'encapsulation': 'legacy',
'siteId': "mySite"}),
('tcpListener', {'host': "0.0.0.0",
'port': cls.tester.get_port(),
'address': 'tcp-listener',
'encapsulation': 'legacy',
'siteId': "mySite"}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]

cls.router = cls.tester.qdrouterd('TcpInvalidEncoding',
Qdrouterd.Config(config), wait=True)
cls.address = cls.router.addresses[0]

def test_invalid_amqp_message(self):
cls.amqp_address = cls.router.addresses[0]
cls.listener_address = cls.router.tcp_addresses[0]

def test_invalid_egress_client_request_encaps(self):
"""
Send an AMQP message addressed to the TCP service via the amqp
listener. Set values in the AMQP header which will conflict with what
is expected by the adaptor. Verify the message is RELEASED and an an
error has been logged.
Simulate an message with an incompatible ecapsulation sent by a client
that arrives at the egress connector. Verify that the egress connector
RELEASED the message and an error has been logged. The message should
be released so it can be delivered to another (compatible) connector.
"""

# send a request message with an incompatible encapsulation

msg = Message()
msg.to = "tcp-adaptor"
msg.to = "tcp-connector"
msg.subject = "stuff"
msg.reply_to = "invalid/reply/to"
msg.content_type = "application/octet-stream"
test = SendAMQPMessage(msg, self.address, 'tcp-adaptor')
msg.content_type = "This-is-wrong"
test = InvalidClientSendRequest(msg, self.amqp_address, 'tcp-connector')
test.run()
self.assertIsNone(test.error)
self.router.wait_log_message(pattern=r"Misconfigured tcpConnector \(wrong version\)")
self.router.wait_log_message(pattern=r"Misconfigured tcpConnector \(wrong encapsulation\)")

def test_invalid_ingress_server_reply_encaps(self):
"""
Simulate an invalid reply message arriving at the ingress
listener. Verify the message is REJECTED and an error has been
logged. The message is rejected because the link is a reply-to stream
with a unique address for the client - it cannot be redelivered to
another client.
"""

class SendAMQPMessage(MessagingHandler):
# send a reply message with an incompatible encapsulation

msg = Message()
msg.subject = "Subject"
msg.annotations = {":flowid": "whatever"}
msg.content_type = "This-is-wrong"
test = InvalidServerSendReply(msg, self.amqp_address,
self.listener_address, 'tcp-listener',
Disposition.REJECTED)
test.run()
self.assertIsNone(test.error)
self.router.wait_log_message(pattern=r"Misconfigured tcpListener \(wrong outgoing encapsulation\)")

def test_invalid_ingress_server_reply_body(self):
"""
Simulate a reply message arriving at the ingress listener that has an
invalid body structure. The client should close the connection and set
the outcome to ACCEPTED. The reason REJECTED is not used is because
data may already have been sent to the TCP client and it is too late to
reject the message.
"""

# send a reply message with an incompatible body format

msg = Message()
msg.subject = "Subject"
msg.annotations = {":flowid": "whatever"}
msg.body = "This is a STRING, NOT VBIN!"
test = InvalidServerSendReply(msg, self.amqp_address,
self.listener_address, 'tcp-listener',
Disposition.ACCEPTED)
test.run()
self.assertIsNone(test.error)
self.router.wait_log_message(pattern=r"Invalid body data for streaming message")


class InvalidClientSendRequest(MessagingHandler):
"""
Builds a legacy TCP adaptor client request message with an incompatible
encapsulation format. Expect the TCP adaptor connector to release the
message so it can be re-delivered to another (compatible) connector.
"""
def __init__(self, msg, address, destination):
super(SendAMQPMessage, self).__init__(auto_settle=False)
super(InvalidClientSendRequest, self).__init__(auto_settle=False)
self.msg = msg
self.address = address
self.destination = destination
Expand Down Expand Up @@ -2324,6 +2384,106 @@ def run(self):
Container(self).run()


class InvalidServerSendReply(MessagingHandler):
"""
Simulate a TCP adaptor reply message that is invalid. Expect the client
(egress) to ignore the message and set its disposition to dispo.
"""
def __init__(self, msg, server_address, listener_address, service_address, dispo):
super(InvalidServerSendReply, self).__init__(auto_settle=False)
self.msg = msg
self.service_address = service_address
self.error = None
self.timer = None
self.expected_dispo = dispo

# fake server connection, receive link for request, send link for reply-to
self.server_address = server_address
self.server_conn = None
self.server_sender = None
self.server_receiver = None
self.server_sent = False

# The request message that arrives at the "server" is streaming. Proton
# does not give us an "on_message" callback since it never
# completes. Wait long enough for the headers to arrive so we can
# extract the reply-to
self.request_dlv = None
self.dlv_drain_timer = None

# fake tcp client, just sends a request message
self.listener_address = listener_address
self.client_conn = None
self.client_sent = False

def done(self, error=None):
self.error = error
if self.timer:
self.timer.cancel()
self.server_conn.close()
if self.client_conn is not None:
self.client_conn.close()
if self.dlv_drain_timer:
self.dlv_drain_timer.cancel()

def timeout(self):
self.timer = None
self.done(error=f"Timeout Expired: sent={self.sent}")

def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.server_conn = event.container.connect(self.server_address)
self.server_receiver = event.container.create_receiver(self.server_conn,
self.service_address)

def on_timer_task(self, event):
# at this point we expect the reply-to header to have arrived
try:
data = self.server_receiver.recv(self.request_dlv.pending)
#print(f"len={len(xxx)}\nBODY=[{xxx}]", flush=True)
msg = Message()
msg.decode(data)
self.server_sender = event.container.create_sender(self.server_conn,
msg.reply_to)
except Exception as exc:
self.bail(error=f"Incomplete request msg headers {data}")

self.request_dlv.settle()

def on_delivery(self, event):
if event.receiver == self.server_receiver:
if self.request_dlv is None and event.delivery.readable:
# sleep a bit to allow all the header data to arrive on the
# delivery
self.request_dlv = event.delivery
self.dlv_drain_timer = event.reactor.schedule(1.0, self)

def on_link_opened(self, event):
if event.receiver == self.server_receiver:
# "server" ready to take requests, fire up the "client". All we
# need is to connect since that will activate the tcp adaptor
self.client_conn = event.container.connect(self.listener_address)

def on_sendable(self, event):
if event.sender == self.server_sender:
if not self.server_sent:
# send the invalid reply
self.server_sender.send(self.msg)
self.server_sent = True

def on_released(self, event):
self.done(None if self.expected_dispo == Disposition.RELEASED else "Unexpected PN_RELEASED")

def on_accepted(self, event):
self.done(None if self.expected_dispo == Disposition.ACCEPTED else "Unexpected PN_ACCEPTED")

def on_rejected(self, event):
self.done(None if self.expected_dispo == Disposition.REJECTED else "Unexpected PN_REJECTED")

def run(self):
Container(self).run()


class TcpAdaptorConnCounter(TestCase):
"""
Validate the TCP service connection counter
Expand Down

0 comments on commit a3a0045

Please sign in to comment.