diff --git a/server/src/main/java/org/opensearch/transport/Header.java b/server/src/main/java/org/opensearch/transport/Header.java index ac30df8dda02c..fcfeb9c632075 100644 --- a/server/src/main/java/org/opensearch/transport/Header.java +++ b/server/src/main/java/org/opensearch/transport/Header.java @@ -55,6 +55,7 @@ public class Header { private static final String RESPONSE_NAME = "NO_ACTION_NAME_FOR_RESPONSES"; + private final TransportProtocol protocol; private final int networkMessageSize; private final Version version; private final long requestId; @@ -64,13 +65,18 @@ public class Header { Tuple, Map>> headers; Set features; - Header(int networkMessageSize, long requestId, byte status, Version version) { + Header(TransportProtocol protocol, int networkMessageSize, long requestId, byte status, Version version) { + this.protocol = protocol; this.networkMessageSize = networkMessageSize; this.version = version; this.requestId = requestId; this.status = status; } + TransportProtocol getTransportProtocol() { + return protocol; + } + public int getNetworkMessageSize() { return networkMessageSize; } @@ -142,6 +148,8 @@ void finishParsingHeader(StreamInput input) throws IOException { @Override public String toString() { return "Header{" + + protocol + + "}{" + networkMessageSize + "}{" + version diff --git a/server/src/main/java/org/opensearch/transport/InboundAggregator.java b/server/src/main/java/org/opensearch/transport/InboundAggregator.java index f52875d880b4f..c19ecd1f72b60 100644 --- a/server/src/main/java/org/opensearch/transport/InboundAggregator.java +++ b/server/src/main/java/org/opensearch/transport/InboundAggregator.java @@ -114,7 +114,7 @@ public void aggregate(ReleasableBytesReference content) { } } - public NativeInboundMessage finishAggregation() throws IOException { + public ProtocolInboundMessage finishAggregation() throws IOException { ensureOpen(); final ReleasableBytesReference releasableContent; if (isFirstContent()) { diff --git a/server/src/main/java/org/opensearch/transport/InboundDecoder.java b/server/src/main/java/org/opensearch/transport/InboundDecoder.java index d6b7a98e876b3..3e735d4be2420 100644 --- a/server/src/main/java/org/opensearch/transport/InboundDecoder.java +++ b/server/src/main/java/org/opensearch/transport/InboundDecoder.java @@ -187,11 +187,12 @@ private int headerBytesToRead(BytesReference reference) { // exposed for use in tests static Header readHeader(Version version, int networkMessageSize, BytesReference bytesReference) throws IOException { try (StreamInput streamInput = bytesReference.streamInput()) { - streamInput.skip(TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE); + TransportProtocol protocol = TransportProtocol.fromBytes(streamInput.readByte(), streamInput.readByte()); + streamInput.skip(TcpHeader.MESSAGE_LENGTH_SIZE); long requestId = streamInput.readLong(); byte status = streamInput.readByte(); Version remoteVersion = Version.fromId(streamInput.readInt()); - Header header = new Header(networkMessageSize, requestId, status, remoteVersion); + Header header = new Header(protocol, networkMessageSize, requestId, status, remoteVersion); final IllegalStateException invalidVersion = ensureVersionCompatibility(remoteVersion, version, header.isHandshake()); if (invalidVersion != null) { throw invalidVersion; diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index f77c44ea362cf..9de80eea073ad 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -56,7 +56,7 @@ public class InboundHandler { private volatile long slowLogThresholdMs = Long.MAX_VALUE; - private final Map protocolMessageHandlers; + private final Map protocolMessageHandlers; InboundHandler( String nodeName, @@ -75,7 +75,7 @@ public class InboundHandler { ) { this.threadPool = threadPool; this.protocolMessageHandlers = Map.of( - NativeInboundMessage.NATIVE_PROTOCOL, + TransportProtocol.NATIVE, new NativeMessageHandler( nodeName, version, @@ -114,9 +114,9 @@ void inboundMessage(TcpChannel channel, ProtocolInboundMessage message) throws E } private void messageReceivedFromPipeline(TcpChannel channel, ProtocolInboundMessage message, long startTime) throws IOException { - ProtocolMessageHandler protocolMessageHandler = protocolMessageHandlers.get(message.getProtocol()); + ProtocolMessageHandler protocolMessageHandler = protocolMessageHandlers.get(message.getTransportProtocol()); if (protocolMessageHandler == null) { - throw new IllegalStateException("No protocol message handler found for protocol: " + message.getProtocol()); + throw new IllegalStateException("No protocol message handler found for protocol: " + message.getTransportProtocol()); } protocolMessageHandler.messageReceived(channel, message, startTime, slowLogThresholdMs, messageListener); } diff --git a/server/src/main/java/org/opensearch/transport/InboundPipeline.java b/server/src/main/java/org/opensearch/transport/InboundPipeline.java index 5cee3bb975223..597ab0673ab4b 100644 --- a/server/src/main/java/org/opensearch/transport/InboundPipeline.java +++ b/server/src/main/java/org/opensearch/transport/InboundPipeline.java @@ -63,8 +63,7 @@ public class InboundPipeline implements Releasable { private final ArrayDeque pending = new ArrayDeque<>(2); private boolean isClosed = false; private final BiConsumer messageHandler; - private final List protocolBytesHandlers; - private InboundBytesHandler currentHandler; + private final InboundBytesHandler bytesHandler; public InboundPipeline( Version version, @@ -95,17 +94,14 @@ public InboundPipeline( this.statsTracker = statsTracker; this.decoder = decoder; this.aggregator = aggregator; - this.protocolBytesHandlers = List.of(new NativeInboundBytesHandler(pending, decoder, aggregator, statsTracker)); + this.bytesHandler = new NativeInboundBytesHandler(pending, decoder, aggregator, statsTracker); this.messageHandler = messageHandler; } @Override public void close() { isClosed = true; - if (currentHandler != null) { - currentHandler.close(); - currentHandler = null; - } + bytesHandler.close(); Releasables.closeWhileHandlingException(decoder, aggregator); Releasables.closeWhileHandlingException(pending); pending.clear(); @@ -127,22 +123,6 @@ public void doHandleBytes(TcpChannel channel, ReleasableBytesReference reference channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong()); statsTracker.markBytesRead(reference.length()); pending.add(reference.retain()); - - // If we don't have a current handler, we should try to find one based on the protocol of the incoming bytes. - if (currentHandler == null) { - for (InboundBytesHandler handler : protocolBytesHandlers) { - if (handler.canHandleBytes(reference)) { - currentHandler = handler; - break; - } - } - } - - // If we have a current handler determined based on protocol, we should continue to use it for the fragmented bytes. - if (currentHandler != null) { - currentHandler.doHandleBytes(channel, reference, messageHandler); - } else { - throw new IllegalStateException("No bytes handler found for the incoming transport protocol"); - } + bytesHandler.doHandleBytes(channel, reference, messageHandler); } } diff --git a/server/src/main/java/org/opensearch/transport/ProtocolInboundMessage.java b/server/src/main/java/org/opensearch/transport/ProtocolInboundMessage.java index 43c2d5ffe4c96..d4ecb0f5d2941 100644 --- a/server/src/main/java/org/opensearch/transport/ProtocolInboundMessage.java +++ b/server/src/main/java/org/opensearch/transport/ProtocolInboundMessage.java @@ -9,6 +9,9 @@ package org.opensearch.transport; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; /** * Base class for inbound data as a message. @@ -17,11 +20,89 @@ * @opensearch.internal */ @PublicApi(since = "2.14.0") -public interface ProtocolInboundMessage { +public abstract class ProtocolInboundMessage implements Releasable { - /** - * @return the protocol used to encode this message - */ - public String getProtocol(); + protected final Header header; + protected final ReleasableBytesReference content; + protected final Exception exception; + protected final boolean isPing; + private Releasable breakerRelease; + public ProtocolInboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) { + this.header = header; + this.content = content; + this.breakerRelease = breakerRelease; + this.exception = null; + this.isPing = false; + } + + public ProtocolInboundMessage(Header header, Exception exception) { + this.header = header; + this.content = null; + this.breakerRelease = null; + this.exception = exception; + this.isPing = false; + } + + public ProtocolInboundMessage(Header header, boolean isPing) { + this.header = header; + this.content = null; + this.breakerRelease = null; + this.exception = null; + this.isPing = isPing; + } + + TransportProtocol getTransportProtocol() { + return header.getTransportProtocol(); + } + + public String getProtocol() { + return header.getTransportProtocol().toString(); + } + + public Header getHeader() { + return header; + } + + public int getContentLength() { + if (content == null) { + return 0; + } else { + return content.length(); + } + } + + public Exception getException() { + return exception; + } + + public boolean isPing() { + return isPing; + } + + public boolean isShortCircuit() { + return exception != null; + } + + public Releasable takeBreakerReleaseControl() { + final Releasable toReturn = breakerRelease; + breakerRelease = null; + if (toReturn != null) { + return toReturn; + } else { + return () -> {}; + } + } + + + + @Override + public void close() { + Releasables.closeWhileHandlingException(content, breakerRelease); + } + + @Override + public String toString() { + return "InboundMessage{" + header + "}"; + } } diff --git a/server/src/main/java/org/opensearch/transport/TransportProtocol.java b/server/src/main/java/org/opensearch/transport/TransportProtocol.java new file mode 100644 index 0000000000000..0e653d0bbf33f --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/TransportProtocol.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +enum TransportProtocol { + NATIVE; + + public static TransportProtocol fromBytes(byte b1, byte b2) { + if (b1 == 'E' && b2 == 'S') { + return NATIVE; + } + + throw new IllegalArgumentException("Unknown transport protocol: [" + b1 + ", " + b2 + "]"); + } +} diff --git a/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java index 97981aeb6736e..9290ec8985161 100644 --- a/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java +++ b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java @@ -151,7 +151,7 @@ private void forwardFragments( messageHandler.accept(channel, PING_MESSAGE); } else if (fragment == InboundDecoder.END_CONTENT) { assert aggregator.isAggregating(); - try (NativeInboundMessage aggregated = aggregator.finishAggregation()) { + try (ProtocolInboundMessage aggregated = aggregator.finishAggregation()) { statsTracker.markMessageReceived(); messageHandler.accept(channel, aggregated); } diff --git a/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundMessage.java b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundMessage.java index 1143f129b6319..448b21b5d3668 100644 --- a/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundMessage.java +++ b/server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundMessage.java @@ -49,81 +49,25 @@ * @opensearch.api */ @PublicApi(since = "2.14.0") -public class NativeInboundMessage implements Releasable, ProtocolInboundMessage { +public class NativeInboundMessage extends ProtocolInboundMessage { /** * The protocol used to encode this message */ public static String NATIVE_PROTOCOL = "native"; - private final Header header; - private final ReleasableBytesReference content; - private final Exception exception; - private final boolean isPing; - private Releasable breakerRelease; private StreamInput streamInput; public NativeInboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) { - this.header = header; - this.content = content; - this.breakerRelease = breakerRelease; - this.exception = null; - this.isPing = false; + super(header, content, breakerRelease); } public NativeInboundMessage(Header header, Exception exception) { - this.header = header; - this.content = null; - this.breakerRelease = null; - this.exception = exception; - this.isPing = false; + super(header, exception); } public NativeInboundMessage(Header header, boolean isPing) { - this.header = header; - this.content = null; - this.breakerRelease = null; - this.exception = null; - this.isPing = isPing; - } - - @Override - public String getProtocol() { - return NATIVE_PROTOCOL; - } - - public Header getHeader() { - return header; - } - - public int getContentLength() { - if (content == null) { - return 0; - } else { - return content.length(); - } - } - - public Exception getException() { - return exception; - } - - public boolean isPing() { - return isPing; - } - - public boolean isShortCircuit() { - return exception != null; - } - - public Releasable takeBreakerReleaseControl() { - final Releasable toReturn = breakerRelease; - breakerRelease = null; - if (toReturn != null) { - return toReturn; - } else { - return () -> {}; - } + super(header, isPing); } public StreamInput openOrGetStreamInput() throws IOException { @@ -138,12 +82,6 @@ public StreamInput openOrGetStreamInput() throws IOException { @Override public void close() { IOUtils.closeWhileHandlingException(streamInput); - Releasables.closeWhileHandlingException(content, breakerRelease); + super.close(); } - - @Override - public String toString() { - return "InboundMessage{" + header + "}"; - } - } diff --git a/server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java b/server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java index 4ac78366360d7..8c8641cc63cf6 100644 --- a/server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java @@ -79,7 +79,7 @@ public void setUp() throws Exception { public void testInboundAggregation() throws IOException { long requestId = randomNonNegativeLong(); - Header header = new Header(randomInt(), requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT); + Header header = new Header(TransportProtocol.NATIVE, randomInt(), requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT); header.headers = new Tuple<>(Collections.emptyMap(), Collections.emptyMap()); header.actionName = "action_name"; // Initiate Message @@ -108,7 +108,7 @@ public void testInboundAggregation() throws IOException { } // Signal EOS - NativeInboundMessage aggregated = aggregator.finishAggregation(); + ProtocolInboundMessage aggregated = aggregator.finishAggregation(); assertThat(aggregated, notNullValue()); assertFalse(aggregated.isPing()); @@ -126,7 +126,7 @@ public void testInboundAggregation() throws IOException { public void testInboundUnknownAction() throws IOException { long requestId = randomNonNegativeLong(); - Header header = new Header(randomInt(), requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT); + Header header = new Header(TransportProtocol.NATIVE, randomInt(), requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT); header.headers = new Tuple<>(Collections.emptyMap(), Collections.emptyMap()); header.actionName = unknownAction; // Initiate Message @@ -139,7 +139,7 @@ public void testInboundUnknownAction() throws IOException { assertEquals(0, content.refCount()); // Signal EOS - NativeInboundMessage aggregated = aggregator.finishAggregation(); + ProtocolInboundMessage aggregated = aggregator.finishAggregation(); assertThat(aggregated, notNullValue()); assertTrue(aggregated.isShortCircuit()); @@ -150,7 +150,7 @@ public void testInboundUnknownAction() throws IOException { public void testCircuitBreak() throws IOException { circuitBreaker.startBreaking(); // Actions are breakable - Header breakableHeader = new Header(randomInt(), randomNonNegativeLong(), TransportStatus.setRequest((byte) 0), Version.CURRENT); + Header breakableHeader = new Header(TransportProtocol.NATIVE, randomInt(), randomNonNegativeLong(), TransportStatus.setRequest((byte) 0), Version.CURRENT); breakableHeader.headers = new Tuple<>(Collections.emptyMap(), Collections.emptyMap()); breakableHeader.actionName = "action_name"; // Initiate Message @@ -162,7 +162,7 @@ public void testCircuitBreak() throws IOException { content1.close(); // Signal EOS - NativeInboundMessage aggregated1 = aggregator.finishAggregation(); + ProtocolInboundMessage aggregated1 = aggregator.finishAggregation(); assertEquals(0, content1.refCount()); assertThat(aggregated1, notNullValue()); @@ -170,7 +170,7 @@ public void testCircuitBreak() throws IOException { assertThat(aggregated1.getException(), instanceOf(CircuitBreakingException.class)); // Actions marked as unbreakable are not broken - Header unbreakableHeader = new Header(randomInt(), randomNonNegativeLong(), TransportStatus.setRequest((byte) 0), Version.CURRENT); + Header unbreakableHeader = new Header(TransportProtocol.NATIVE, randomInt(), randomNonNegativeLong(), TransportStatus.setRequest((byte) 0), Version.CURRENT); unbreakableHeader.headers = new Tuple<>(Collections.emptyMap(), Collections.emptyMap()); unbreakableHeader.actionName = unBreakableAction; // Initiate Message @@ -181,7 +181,7 @@ public void testCircuitBreak() throws IOException { content2.close(); // Signal EOS - NativeInboundMessage aggregated2 = aggregator.finishAggregation(); + ProtocolInboundMessage aggregated2 = aggregator.finishAggregation(); assertEquals(1, content2.refCount()); assertThat(aggregated2, notNullValue()); @@ -189,7 +189,7 @@ public void testCircuitBreak() throws IOException { // Handshakes are not broken final byte handshakeStatus = TransportStatus.setHandshake(TransportStatus.setRequest((byte) 0)); - Header handshakeHeader = new Header(randomInt(), randomNonNegativeLong(), handshakeStatus, Version.CURRENT); + Header handshakeHeader = new Header(TransportProtocol.NATIVE, randomInt(), randomNonNegativeLong(), handshakeStatus, Version.CURRENT); handshakeHeader.headers = new Tuple<>(Collections.emptyMap(), Collections.emptyMap()); handshakeHeader.actionName = "handshake"; // Initiate Message @@ -200,7 +200,7 @@ public void testCircuitBreak() throws IOException { content3.close(); // Signal EOS - NativeInboundMessage aggregated3 = aggregator.finishAggregation(); + ProtocolInboundMessage aggregated3 = aggregator.finishAggregation(); assertEquals(1, content3.refCount()); assertThat(aggregated3, notNullValue()); @@ -209,7 +209,7 @@ public void testCircuitBreak() throws IOException { public void testCloseWillCloseContent() { long requestId = randomNonNegativeLong(); - Header header = new Header(randomInt(), requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT); + Header header = new Header(TransportProtocol.NATIVE, randomInt(), requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT); header.headers = new Tuple<>(Collections.emptyMap(), Collections.emptyMap()); header.actionName = "action_name"; // Initiate Message @@ -249,7 +249,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException { } else { actionName = "action_name"; } - Header header = new Header(randomInt(), requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT); + Header header = new Header(TransportProtocol.NATIVE, randomInt(), requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT); // Initiate Message aggregator.headerReceived(header); @@ -264,7 +264,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException { content.close(); // Signal EOS - NativeInboundMessage aggregated = aggregator.finishAggregation(); + ProtocolInboundMessage aggregated = aggregator.finishAggregation(); assertThat(aggregated, notNullValue()); assertFalse(header.needsToReadVariableHeader()); diff --git a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java index 2553e7740990b..7a2e79fa8cc1b 100644 --- a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java @@ -215,7 +215,7 @@ public TestResponse read(StreamInput in) throws IOException { false ); BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); - Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); + Header requestHeader = new Header(TransportProtocol.NATIVE, fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); NativeInboundMessage requestMessage = new NativeInboundMessage( requestHeader, ReleasableBytesReference.wrap(requestContent), @@ -240,7 +240,7 @@ public TestResponse read(StreamInput in) throws IOException { BytesReference fullResponseBytes = channel.getMessageCaptor().get(); BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize); - Header responseHeader = new Header(fullResponseBytes.length() - 6, requestId, responseStatus, version); + Header responseHeader = new Header(TransportProtocol.NATIVE, fullResponseBytes.length() - 6, requestId, responseStatus, version); NativeInboundMessage responseMessage = new NativeInboundMessage( responseHeader, ReleasableBytesReference.wrap(responseContent), @@ -267,6 +267,7 @@ public void testSendsErrorResponseToHandshakeFromCompatibleVersion() throws Exce final Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), version); final long requestId = randomNonNegativeLong(); final Header requestHeader = new Header( + TransportProtocol.NATIVE, between(0, 100), requestId, TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), @@ -307,6 +308,7 @@ public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws final Version remoteVersion = Version.fromId(randomIntBetween(0, version.minimumCompatibilityVersion().id - 1)); final long requestId = randomNonNegativeLong(); final Header requestHeader = new Header( + TransportProtocol.NATIVE, between(0, 100), requestId, TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), @@ -338,6 +340,7 @@ public void testLogsSlowInboundProcessing() throws Exception { final Version remoteVersion = Version.CURRENT; final long requestId = randomNonNegativeLong(); final Header requestHeader = new Header( + TransportProtocol.NATIVE, between(0, 100), requestId, TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), @@ -424,7 +427,7 @@ public void onResponseSent(long requestId, String action, Exception error) { BytesReference fullRequestBytes = BytesReference.fromByteBuffer((ByteBuffer) buffer.flip()); BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); - Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); + Header requestHeader = new Header(TransportProtocol.NATIVE, fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); NativeInboundMessage requestMessage = new NativeInboundMessage( requestHeader, ReleasableBytesReference.wrap(requestContent), @@ -493,7 +496,7 @@ public void onResponseSent(long requestId, String action, Exception error) { ); // Create the request payload by intentionally stripping 1 byte away BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize - 1); - Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); + Header requestHeader = new Header(TransportProtocol.NATIVE, fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); NativeInboundMessage requestMessage = new NativeInboundMessage( requestHeader, ReleasableBytesReference.wrap(requestContent), @@ -561,7 +564,7 @@ public TestResponse read(StreamInput in) throws IOException { false ); BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); - Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); + Header requestHeader = new Header(TransportProtocol.NATIVE, fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); NativeInboundMessage requestMessage = new NativeInboundMessage( requestHeader, ReleasableBytesReference.wrap(requestContent), @@ -587,7 +590,7 @@ public TestResponse read(StreamInput in) throws IOException { BytesReference fullResponseBytes = BytesReference.fromByteBuffer((ByteBuffer) buffer.flip()); BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize); - Header responseHeader = new Header(fullResponseBytes.length() - 6, requestId, responseStatus, version); + Header responseHeader = new Header(TransportProtocol.NATIVE, fullResponseBytes.length() - 6, requestId, responseStatus, version); NativeInboundMessage responseMessage = new NativeInboundMessage( responseHeader, ReleasableBytesReference.wrap(responseContent), @@ -655,7 +658,7 @@ public TestResponse read(StreamInput in) throws IOException { false ); BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize); - Header requestHeader = new Header(fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); + Header requestHeader = new Header(TransportProtocol.NATIVE, fullRequestBytes.length() - 6, requestId, TransportStatus.setRequest((byte) 0), version); NativeInboundMessage requestMessage = new NativeInboundMessage( requestHeader, ReleasableBytesReference.wrap(requestContent), @@ -676,7 +679,7 @@ public TestResponse read(StreamInput in) throws IOException { BytesReference fullResponseBytes = channel.getMessageCaptor().get(); // Create the response payload by intentionally stripping 1 byte away BytesReference responseContent = fullResponseBytes.slice(headerSize, fullResponseBytes.length() - headerSize - 1); - Header responseHeader = new Header(fullResponseBytes.length() - 6, requestId, responseStatus, version); + Header responseHeader = new Header(TransportProtocol.NATIVE, fullResponseBytes.length() - 6, requestId, responseStatus, version); NativeInboundMessage responseMessage = new NativeInboundMessage( responseHeader, ReleasableBytesReference.wrap(responseContent),