From 7de433aaf264d1cf3ad3fc13fadf8108ee46976d Mon Sep 17 00:00:00 2001 From: Richard Bair Date: Fri, 13 Dec 2024 17:57:08 -0800 Subject: [PATCH] Significant new tests and bug fixes to the PbjProtocolHandler Signed-off-by: Richard Bair --- .../pbj/grpc/helidon/PbjProtocolHandler.java | 443 +++--- .../grpc/helidon/PbjProtocolHandlerTest.java | 1387 +++++++++++++---- .../runtime/io/WritableSequentialData.java | 14 +- .../hedera/pbj/runtime/io/buffer/Bytes.java | 16 +- 4 files changed, 1313 insertions(+), 547 deletions(-) diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index 8bd00bd0..48ed585c 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -23,6 +23,11 @@ import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC; import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC_JSON; import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC_PROTO; +import static io.helidon.http.Status.OK_200; +import static io.helidon.http.http2.Http2StreamState.CLOSED; +import static io.helidon.http.http2.Http2StreamState.HALF_CLOSED_LOCAL; +import static io.helidon.http.http2.Http2StreamState.HALF_CLOSED_REMOTE; +import static io.helidon.http.http2.Http2StreamState.OPEN; import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; import static java.util.Collections.emptyList; @@ -66,7 +71,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; /** * Implementation of gRPC based on PBJ. This class specifically contains the glue logic for bridging @@ -84,16 +88,20 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa private static final Header GRPC_ENCODING_IDENTITY = HeaderValues.createCached("grpc-encoding", IDENTITY); - /** The regular expression used to parse the grpc-timeout header. */ - private static final String GRPC_TIMEOUT_REGEX = "(\\d{1,8})([HMSmun])"; - - private static final Pattern GRPC_TIMEOUT_PATTERN = Pattern.compile(GRPC_TIMEOUT_REGEX); + /** Simple implementation of the {@link ServiceInterface.RequestOptions} interface. */ + private record Options( + Optional authority, boolean isProtobuf, boolean isJson, String contentType) + implements ServiceInterface.RequestOptions {} - // Helidon-specific fields related to the connection itself + /** The headers that were sent with the request. */ private final Http2Headers headers; + /** The stream writer that is used to send data back to the client. */ private final Http2StreamWriter streamWriter; + /** The stream id of the connection. */ private final int streamId; + /** The flow control for the connection, just a pass-through variable, we don't use it directly */ private final FlowControl.Outbound flowControl; + /** The current state of the stream, initially OPEN */ private final AtomicReference currentStreamState; /** @@ -102,11 +110,13 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa */ private final PbjMethodRoute route; + /** The configuration for the PBJ implementation. */ private final PbjConfig config; /** * If there is a timeout defined for the request, then this detector is used to determine when - * the timeout deadline has been met. The detector runs on a background thread/timer. + * the timeout deadline has been met. The detector will schedule a callback to be invoked when + * the deadline has been exceeded. */ private final DeadlineDetector deadlineDetector; @@ -125,42 +135,19 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa /** * The bytes of the next incoming message. This is created dynamically as a message is received, - * and is never larger than the system configured {@link PbjConfig#maxMessageSizeBytes()}. + * and is never larger than the system configured {@link PbjConfig#maxMessageSizeBytes()}. We have to + * use a {@link BytesHolder} because the bytes for the entity may be split across multiple frames. * *

This member is only accessed by the {@link #data} method, which is called sequentially. */ - private byte[] entityBytes = null; + private BytesHolder entityBytes = null; /** - * The current index into {@link #entityBytes} into which data is to be read. - * - *

This member is only accessed by the {@link #data} method, which is called sequentially. + * The bytes we have so far read that form the "Length-Prefix", which is exactly 5 bytes in length. + * We have to use a {@link BytesHolder} because the bytes for the "length prefix" (a combination of + * a byte for compression and 4 bytes for length) may be split across multiple frames. */ - private int entityBytesIndex = 0; - - /** States for currentReadState state ,machine */ - enum ReadState { - /** - * Start state, when we are looking for first byte that says if data is compressed or not - */ - START, - /** - * State were we are reading length, can be partial length of final point when we have all - * length bytes - */ - READ_LENGTH, - /** State where we are reading the protobuf entity bytes */ - READ_ENTITY_BYTES - } - - /** State machine as we read bytes from incoming data */ - private ReadState currentReadState = ReadState.START; - - /** Number of read bytes between 0 and {@code Integer.BYTES} = 4 */ - private int numOfPartReadBytes = 0; - - /** Byte array to store bytes as we build up to a full 4 byte integer */ - private final byte[] partReadLengthBytes = new byte[Integer.BYTES]; + private BytesHolder lengthPrefixBytes = null; /** * The communication pipeline between server and client @@ -170,7 +157,7 @@ enum ReadState { * *

Method calls on this object are thread-safe. */ - private Pipeline pipeline; + private final AtomicReference> pipeline = new AtomicReference<>(); /** Create a new instance */ PbjProtocolHandler( @@ -192,6 +179,22 @@ enum ReadState { this.deadlineDetector = requireNonNull(deadlineDetector); } + @Override + @NonNull + public Http2StreamState streamState() { + return currentStreamState.get(); + } + + @Override + public void rstStream(@NonNull final Http2RstStream rstStream) { + pipeline.get().onComplete(); + } + + @Override + public void windowUpdate(@NonNull final Http2WindowUpdate update) { + // Nothing to do because we're not passing flow control to the pipeline yet + } + /** * Called at the very beginning of the request, before any data has arrived. At this point we * can look at the request headers and determine whether we have a valid request, and do any @@ -209,8 +212,7 @@ public void init() { // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md // In addition, "application/grpc" is interpreted as "application/grpc+proto". final var requestHeaders = headers.httpHeaders(); - final var requestContentType = - requestHeaders.contentType().orElse(null); + final var requestContentType = requestHeaders.contentType().orElse(null); final var ct = requestContentType == null ? "" : requestContentType.text(); final var contentType = switch (ct) { @@ -273,16 +275,9 @@ public void init() { // any compression. // If the grpc-timeout header is present, determine when that timeout would occur, or - // default to a future that is so far in the future it will never happen. + // default to a value that will be understood as to never happen. final var timeout = requestHeaders.value(GRPC_TIMEOUT); - deadlineFuture = - timeout.map(this::scheduleDeadline).orElse(new NoopScheduledFuture<>()); - - // At this point, the request itself is valid. Maybe it will still fail to be handled by - // the service interface, but as far as the protocol is concerned, this was a valid - // request. Send the headers back to the client (the messages and trailers will be sent - // later). - sendResponseHeaders(GRPC_ENCODING_IDENTITY, requestContentType, emptyList()); + final long deadline = timeout.map(this::determineDeadline).orElse(Long.MIN_VALUE); // NOTE: The specification mentions the "Message-Type" header. Like everyone else, we're // just going to ignore that header. See https://github.com/grpc/grpc/issues/12468. @@ -292,6 +287,12 @@ public void init() { // We should have a wrapper around them, such that we don't process the custom headers // ourselves, but allow the service interface to look up special headers based on key. + // At this point, the request itself is valid. Maybe it will still fail to be handled by + // the service interface, but as far as the protocol is concerned, this was a valid + // request. Send the headers back to the client (the messages and trailers will be sent + // later). + sendResponseHeaders(GRPC_ENCODING_IDENTITY, contentType, emptyList()); + // Create the "options" to make available to the ServiceInterface. These options are // used to decide on the best way to parse or handle the request. final var options = @@ -305,7 +306,12 @@ public void init() { // This is given to the "open" method on the service to allow it to send messages to // the client. final Pipeline outgoing = new SendToClientSubscriber(); - pipeline = route.service().open(route.method(), options, outgoing); + pipeline.set(route.service().open(route.method(), options, outgoing)); + + // We can now create the deadline future because we have a pipeline. + deadlineFuture = deadline == Long.MIN_VALUE + ? new NoopScheduledFuture<>() + : deadlineDetector.scheduleDeadline(deadline, this::deadlineExceeded); } catch (final GrpcException grpcException) { route.failedGrpcRequestCounter().increment(); new TrailerOnlyBuilder() @@ -329,22 +335,6 @@ public void init() { } } - @Override - @NonNull - public Http2StreamState streamState() { - return currentStreamState.get(); - } - - @Override - public void rstStream(@NonNull final Http2RstStream rstStream) { - pipeline.onComplete(); - } - - @Override - public void windowUpdate(@NonNull final Http2WindowUpdate update) { - // Nothing to do - } - /** * Called by the webserver whenever some additional data is available on the stream. The data * comes in chunks, it may be that an entire message is available in the chunk, or it may be @@ -352,112 +342,110 @@ public void windowUpdate(@NonNull final Http2WindowUpdate update) { */ @Override public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferData data) { - Objects.requireNonNull(header); - Objects.requireNonNull(data); + requireNonNull(header); + requireNonNull(data); + + if (currentStreamState.get() != OPEN) { + // We shouldn't have received this data. If the stream was CLOSED or HALF_CLOSED_REMOTE, then we definitely + // should not have received it. If it is HALF_CLOSED_LOCAL, then it is possible to receive it because the + // client didn't know any better yet, but in that case, we can just bail. No point handling it. + return; + } + + // If END_STREAM is set on the frame, then there is no remaining data from the client, so we can set + // the stream to HALF_CLOSED_REMOTE (we might still send the client data, but they're half closed now) + final var eos = header.flags(Http2FrameTypes.DATA).endOfStream(); + if (eos) { + currentStreamState.set(HALF_CLOSED_REMOTE); + } try { - // NOTE: if the deadline is exceeded, then the stream will be closed and data will no - // longer flow. + // NOTE: if the deadline is exceeded, then the stream will be closed and data will no longer flow. // There is some asynchronous behavior here, but in the worst case, we handle a few more // bytes before the stream is closed. while (data.available() > 0) { - switch (currentReadState) { - case START: - { - // Read whether this message is compressed. We do not currently support - // compression. - final var isCompressed = (data.read() == 1); - if (isCompressed) { - // The error will eventually result in the stream being closed - throw new GrpcException( - GrpcStatus.UNIMPLEMENTED, "Compression is not supported"); - } - currentReadState = ReadState.READ_LENGTH; - numOfPartReadBytes = 0; - break; - } - case READ_LENGTH: - { - // if I have not read a full int yet then read more from available bytes - if (numOfPartReadBytes < Integer.BYTES) { - // we do not have enough bytes yet to read a 4 byte int - // read the bytes we do have and store them for next time - final int bytesToRead = - Math.min( - data.available(), - Integer.BYTES - numOfPartReadBytes); - data.read(partReadLengthBytes, numOfPartReadBytes, bytesToRead); - numOfPartReadBytes += bytesToRead; - } - // check if we have read all the 4 bytes of the length int32 - if (numOfPartReadBytes == Integer.BYTES) { - final long length = - ((long) partReadLengthBytes[0] & 0xFF) << 24 - | ((long) partReadLengthBytes[1] & 0xFF) << 16 - | ((long) partReadLengthBytes[2] & 0xFF) << 8 - | ((long) partReadLengthBytes[3] & 0xFF); - if (length > config.maxMessageSizeBytes()) { - throw new GrpcException( - GrpcStatus.INVALID_ARGUMENT, - "Message size exceeds maximum allowed size"); - } - // Create a buffer to hold the message. We sadly cannot reuse this buffer - // because once we have filled it and wrapped it in Bytes and sent it to the - // handler, some user code may grab and hold that Bytes object for an arbitrary - // amount of time, and if we were to scribble into the same byte array, we - // would break the application. So we need a new buffer each time :-( - entityBytes = new byte[(int) length]; - entityBytesIndex = 0; - // done with length now, so move on to next state - currentReadState = ReadState.READ_ENTITY_BYTES; - } - break; - } - case READ_ENTITY_BYTES: - { - // By the time we get here, entityBytes is no longer null. It may be empty, or it - // may already have been partially populated from a previous iteration. It may be - // that the number of bytes available to be read is larger than just this one - // message. So we need to be careful to read, from what is available, only up to - // the message length, and to leave the rest for the next iteration. - final int available = data.available(); - final int numBytesToRead = - Math.min(entityBytes.length - entityBytesIndex, available); - data.read(entityBytes, entityBytesIndex, numBytesToRead); - entityBytesIndex += numBytesToRead; - - // If we have completed reading the message, then we can proceed. - if (entityBytesIndex == entityBytes.length) { - currentReadState = ReadState.START; - // Grab and wrap the bytes and reset to being reading the next - // message - final var bytes = Bytes.wrap(entityBytes); - pipeline.onNext(bytes); - entityBytesIndex = 0; - entityBytes = null; - } - break; - } + // If we don't have a length prefix yet, then we need to read the length prefix. We reset this to + // null after we have read all the message bytes in preparation for the next message. + if (lengthPrefixBytes == null) { + lengthPrefixBytes = new BytesHolder(5); + } + + // It will be full after all 5 bytes have been read. Otherwise, we have to keep reading. + if (!lengthPrefixBytes.isFull()) { + // Try to read all remaining bytes + lengthPrefixBytes.read(data); + if (!lengthPrefixBytes.isFull()) break; + + // We have now read all 5 of the bytes. Check whether this message is compressed. + // We do not currently support compression. + final var prefix = lengthPrefixBytes.toBytes(); + final var isCompressed = prefix.getByte(0) == 1; + if (isCompressed) { + // The error will eventually result in the stream being closed + throw new GrpcException( + GrpcStatus.UNIMPLEMENTED, "Compression is not supported"); + } + + // Read the length of the message + final var length = prefix.getInt(1); + if (length > config.maxMessageSizeBytes()) { + throw new GrpcException( + GrpcStatus.INVALID_ARGUMENT, + "Message size exceeds maximum allowed size"); + } + + // Create a buffer to hold the message. We sadly cannot reuse this buffer + // because once we have filled it and wrapped it in Bytes and sent it to the + // handler, some user code may grab and hold that Bytes object for an arbitrary + // amount of time, and if we were to scribble into the same byte array, we + // would break the application. So we need a new buffer each time :-( + entityBytes = new BytesHolder(length); + } + + // By the time we get here, entityBytes is no longer null. It may be empty, or it + // may already have been partially populated from a previous iteration. It may be + // that the number of bytes available to be read is larger than just this one + // message. So we need to be careful to read, from what is available, only up to + // the message length, and to leave the rest for the next iteration. + if (!entityBytes.isFull()) { + entityBytes.read(data); + if (entityBytes.isFull()) { + final var bytes = entityBytes.toBytes(); + pipeline.get().onNext(bytes); + // We've finished reading this message, so we reset these fields to be + // prepared for the next message. + entityBytes = null; + lengthPrefixBytes = null; + } } } // The end of the stream has been reached! It is possible that a bad client will send - // end of stream before all the message data we sent. In that case, it is as if the + // end of stream before all the message data was sent. In that case, it is as if the // message were never sent. - if (header.flags(Http2FrameTypes.DATA).endOfStream()) { - entityBytesIndex = 0; + if (eos) { + // I don't need to clear these, but it makes me feel better entityBytes = null; - currentStreamState.set(Http2StreamState.HALF_CLOSED_REMOTE); - pipeline.clientEndStreamReceived(); + lengthPrefixBytes = null; + // Let the app know that the stream has ended so it can clean up any state it has + pipeline.get().clientEndStreamReceived(); } } catch (final Exception e) { // I have to propagate this error through the service interface, so it can respond to // errors in the connection, tear down resources, etc. It will also forward this on // to the client, causing the connection to be torn down. - pipeline.onError(e); + pipeline.get().onError(e); } } + /** + * Called when the deadline has been exceeded. + */ + private void deadlineExceeded() { + route.deadlineExceededCounter().increment(); + pipeline.get().onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED)); + } + /** * An error has occurred. Cancel the deadline future if it's still active, and set the stream * state accordingly. @@ -468,16 +456,13 @@ private void error() { // Canceling a future that has already completed has no effect. So by canceling here, we are saying: // "If you have not yet executed, never execute. If you have already executed, then just ignore me". // The "isCancelled" flag is set if the future was canceled before it was executed. - - // cancel is threadsafe deadlineFuture.cancel(false); - currentStreamState.set(Http2StreamState.CLOSED); + advanceTowardsClosed(); } /** - * Helper function. Given a string of digits followed by a unit, schedule a callback to be - * invoked when the deadline is exceeded, and return the associated future. The proper format of - * the string is defined in the specification as: + * Helper function. Given a string of digits followed by a unit, compute the number of nanoseconds until the + * deadline will be exceeded. The proper format of the string is defined in the specification as: * *

      *      Timeout → "grpc-timeout" TimeoutValue TimeoutUnit
@@ -491,43 +476,56 @@ private void error() {
      *             Nanosecond → "n"
      * 
* - *

Illegal values result in the deadline being ignored. + *

Illegal values will result in an error (INVALID_ARGUMENT). The spec is not clear on the expected + * behavior in this case, but the go GRPC implementation throws an error (although, it throws a 400 BAD REQUEST + * HTTP response code and an INTERNAL grpc code, both of which seem wrong). * * @param timeout The timeout value. Cannot be null. - * @return The future representing the task that will be executed if/when the deadline is - * reached. + * @return the number of nanoseconds into the future at which point this request is considered to have timed out. */ - @NonNull - private ScheduledFuture scheduleDeadline(@NonNull final String timeout) { - final var matcher = GRPC_TIMEOUT_PATTERN.matcher(timeout); - if (matcher.matches()) { - final var num = Integer.parseInt(matcher.group(1)); - final var unit = matcher.group(2); - final var deadline = - System.nanoTime() - * TimeUnit.NANOSECONDS.convert( - num, - switch (unit) { - case "H" -> TimeUnit.HOURS; - case "M" -> TimeUnit.MINUTES; - case "S" -> TimeUnit.SECONDS; - case "m" -> TimeUnit.MILLISECONDS; - case "u" -> TimeUnit.MICROSECONDS; - case "n" -> TimeUnit.NANOSECONDS; - // This should NEVER be reachable, because the matcher - // would not have matched. - default -> throw new GrpcException( - GrpcStatus.INTERNAL, "Invalid unit: " + unit); - }); - return deadlineDetector.scheduleDeadline( - deadline, - () -> { - route.deadlineExceededCounter().increment(); - pipeline.onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED)); + private long determineDeadline(@NonNull final String timeout) { + if (timeout.length() > 1 && timeout.length() <= 9) { + final var num = parseTimeoutValue(timeout); + final var unit = timeout.charAt(timeout.length() - 1); + return TimeUnit.NANOSECONDS.convert( + num, + switch (unit) { + case 'H' -> TimeUnit.HOURS; + case 'M' -> TimeUnit.MINUTES; + case 'S' -> TimeUnit.SECONDS; + case 'm' -> TimeUnit.MILLISECONDS; + case 'u' -> TimeUnit.MICROSECONDS; + case 'n' -> TimeUnit.NANOSECONDS; + // This should NEVER be reachable, because the matcher + // would not have matched. + default -> throw new GrpcException( + GrpcStatus.INVALID_ARGUMENT, "Invalid unit: " + unit); }); } - return new NoopScheduledFuture<>(); + throw new GrpcException(GrpcStatus.INVALID_ARGUMENT, "Invalid timeout: " + timeout); + } + + /** + * Simple function that parses the timeout value from the string, or throws a {@link GrpcException} if parsing + * fails. The constraints on the timeout value are tighter than for normal numbers, so this function is very + * simple. + * + * @param timeout The timeout, cannot be null, must have at least 2 chars, the value will be all chars but the last. + * @return the parsed number + * @throws GrpcException if the timeout is not a valid number + */ + private int parseTimeoutValue(@NonNull final String timeout) { + int num = 0; + for (int i = 0; i < timeout.length() - 1; i++) { + final var c = timeout.charAt(i); + if (c < '0' || c > '9') { + throw new GrpcException(GrpcStatus.INVALID_ARGUMENT, "Invalid timeout: " + timeout); + } + num = num * 10 + (c - '0'); + } + + return num; } /** @@ -544,7 +542,7 @@ private ScheduledFuture scheduleDeadline(@NonNull final String timeout) { */ private void sendResponseHeaders( @Nullable final Header messageEncoding, - @NonNull final HttpMediaType contentType, + @NonNull final String contentType, @NonNull final List

customMetadata) { // Some headers are http2 specific, the rest are used for the grpc protocol @@ -553,18 +551,13 @@ private void sendResponseHeaders( // Since this has to be sent before we have any data to send, we must know ahead of time // which custom headers are to be returned. grpcHeaders.set(HeaderNames.TRAILER, "grpc-status, grpc-message"); - grpcHeaders.set(Http2Headers.STATUS_NAME, Status.OK_200.code()); - grpcHeaders.contentType(contentType); + grpcHeaders.set(Http2Headers.STATUS_NAME, OK_200.code()); + grpcHeaders.contentType(HttpMediaType.create(contentType)); grpcHeaders.set(GRPC_ACCEPT_ENCODING, IDENTITY); customMetadata.forEach(grpcHeaders::set); - if (messageEncoding != null) { - grpcHeaders.set(messageEncoding); - } - - final var http2Headers = Http2Headers.create(grpcHeaders); - + grpcHeaders.set(Objects.requireNonNullElse(messageEncoding, GRPC_ENCODING_IDENTITY)); streamWriter.writeHeaders( - http2Headers, + Http2Headers.create(grpcHeaders), streamId, Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), flowControl); @@ -649,7 +642,7 @@ protected void send( * It extends {@link TrailerBuilder} and delegates to its parent to send common headers. */ private class TrailerOnlyBuilder extends TrailerBuilder { - private Status httpStatus = Status.OK_200; + private Status httpStatus = OK_200; private final HttpMediaType contentType = APPLICATION_GRPC_PROTO_TYPE; /** The HTTP Status to return in these trailers. The status will default to 200 OK. */ @@ -733,30 +726,25 @@ public void onError(@NonNull final Throwable throwable) { @Override public void onComplete() { - new TrailerBuilder().send(); - deadlineFuture.cancel(false); - - currentStreamState.getAndUpdate( - currentValue -> { - if (requireNonNull(currentValue) == Http2StreamState.OPEN) { - return Http2StreamState.HALF_CLOSED_LOCAL; - } - return Http2StreamState.CLOSED; - }); + new TrailerBuilder().send(); + advanceTowardsClosed(); } } - /** Simple implementation of the {@link ServiceInterface.RequestOptions} interface. */ - private record Options( - Optional authority, boolean isProtobuf, boolean isJson, String contentType) - implements ServiceInterface.RequestOptions {} + /** Transitions to HALF_CLOSED_LOCAL, or CLOSED, based on the rules for state transitions. */ + private void advanceTowardsClosed() { + currentStreamState.getAndUpdate(s -> s == OPEN ? HALF_CLOSED_LOCAL : CLOSED); + } /** * A {@link ScheduledFuture} that does nothing. This is used when there is no deadline set for * the request. A new instance of this must be created (or we need a "reset" method) for each * {@link PbjProtocolHandler} instance, because it can become "corrupted" if canceled from any * particular call. + * + *

Instances of {@link NoopScheduledFuture} are NEVER scheduled. They just exist to avoid + * NullPointerExceptions or complicated logic in the code. */ private static final class NoopScheduledFuture extends CompletableFuture implements ScheduledFuture { @@ -773,4 +761,29 @@ public int compareTo(@NonNull final Delayed o) { return (int) (o.getDelay(TimeUnit.NANOSECONDS)); } } + + /** + * A utility class that will read data from a {@link BufferData} object, until it has read all it needed to. + */ + private static final class BytesHolder { + private int readLen = 0; + private final byte[] bytes; + + BytesHolder(int len) { + bytes = new byte[len]; + } + + void read(BufferData data) { + int len = data.read(bytes, readLen, bytes.length - readLen); + readLen += len; + } + + boolean isFull() { + return readLen == bytes.length; + } + + Bytes toBytes() { + return Bytes.wrap(bytes); + } + } } diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandlerTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandlerTest.java index f5a878a3..c6eeb192 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandlerTest.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandlerTest.java @@ -16,15 +16,29 @@ package com.hedera.pbj.grpc.helidon; +import static com.hedera.pbj.runtime.grpc.GrpcStatus.DEADLINE_EXCEEDED; +import static com.hedera.pbj.runtime.grpc.GrpcStatus.INVALID_ARGUMENT; +import static com.hedera.pbj.runtime.grpc.GrpcStatus.OK; +import static io.helidon.http.Status.OK_200; +import static io.helidon.http.http2.Http2StreamState.CLOSED; +import static io.helidon.http.http2.Http2StreamState.HALF_CLOSED_LOCAL; +import static java.time.Duration.ofHours; +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofMinutes; +import static java.time.Duration.ofNanos; +import static java.time.Duration.ofSeconds; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.entry; +import static org.assertj.core.api.Assertions.fail; +import com.google.protobuf.InvalidProtocolBufferException; import com.hedera.pbj.grpc.helidon.config.PbjConfig; -import com.hedera.pbj.runtime.grpc.GrpcException; import com.hedera.pbj.runtime.grpc.GrpcStatus; import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.grpc.ServiceInterface; +import com.hedera.pbj.runtime.io.buffer.BufferedData; import com.hedera.pbj.runtime.io.buffer.Bytes; import edu.umd.cs.findbugs.annotations.NonNull; import greeter.HelloReply; @@ -45,10 +59,10 @@ import io.helidon.http.http2.Http2StreamWriter; import io.helidon.metrics.api.Metrics; import io.netty.handler.codec.http2.Http2Flags; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Delayed; import java.util.concurrent.Flow; import java.util.concurrent.ScheduledFuture; @@ -56,362 +70,1068 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +/** + * The primary set of test cases for gRPC handling in Helidon with PBJ. + * + *

The tests in this class were created by reviewing the + * gRPC over HTTP2 specification, and by + * checking on the behavior of the go gRPC server implementation. The specification and primary implementation were not + * always in agreement. Where there was a discrepancy, it was noted in these tests. + * + *

These tests assume Helidon handles HTTP2 correctly. However, the {@link PbjProtocolHandler} is responsible for + * setting some HTTP2 values, such as advancing the flow control window correctly, and handling the stream status + * correctly, so those things are checked. + * + *

To understand these tests, you need to be familiar with the gRPC over HTTP2 specification, and with the HTTP/2 + * specification. At a high level, the HTTP/2 specification defines "streams", "frames", and "states". Over a single + * physical connection a client and server can multiplex many streams. Data sent or received for each stream is "framed" + * and transmitted over the connection. The connection "state" indicates the lifecycle phase of the stream. In these + * tests, it is critical to verify the state of the server during various scenarios, and to verify that the server is + * sending the correct frames back to the client. We are not worried with basic HTTP/2 framing and rules, since Helidon + * handles that. We only need to worry about the header and data frames that make up the main part of the lifecycle of + * the stream, since this is the part that {@link PbjProtocolHandler} impacts. + */ class PbjProtocolHandlerTest { - private Http2Headers headers; - private StreamWriterStub streamWriter; - private int streamId; - private OutboundFlowControlStub flowControl; - private Http2StreamState currentStreamState; - private PbjConfigStub config; - private PbjMethodRoute route; - private DeadlineDetectorStub deadlineDetector; - private ServiceInterfaceStub service; @BeforeEach void setUp() { - headers = Http2Headers.create(WritableHeaders.create() - .add(HeaderNames.CONTENT_TYPE, "application/grpc+proto")); - streamWriter = new StreamWriterStub(); - streamId = 1; - flowControl = new OutboundFlowControlStub(); - currentStreamState = Http2StreamState.OPEN; - config = new PbjConfigStub(); - service = new ServiceInterfaceStub(); - route = new PbjMethodRoute(service, ServiceInterfaceStub.METHOD); - deadlineDetector = new DeadlineDetectorStub(); - - Metrics.globalRegistry().close(); // reset the counters - assertThat(route.requestCounter().count()).isZero(); - assertThat(route.failedGrpcRequestCounter().count()).isZero(); - assertThat(route.failedHttpRequestCounter().count()).isZero(); - assertThat(route.failedUnknownRequestCounter().count()).isZero(); + // The Helidon metrics are global, so we need to reset them before each test, because we want to verify + // in EVERY TEST the impact of that test on the metrics, to make sure the metrics are useful and correct. + Metrics.globalRegistry().close(); } /** - * If the content-type is missing, or does not start with "application/grpc", the server should respond with a 415 - * Unsupported Media Type and the stream state should end up CLOSED. See - * - */ - @ValueSource(strings = {"", "text/plain", "application/json"}) - @ParameterizedTest - void unsupportedContentType(String contentType) { - final var h = WritableHeaders.create(); - if (!contentType.isBlank()) h.add(HeaderNames.CONTENT_TYPE, contentType); - headers = Http2Headers.create(h); - - // Initializing the handler will throw an error because the content types are unsupported - final var handler = new PbjProtocolHandler( - headers, streamWriter, streamId, flowControl, currentStreamState, config, route, deadlineDetector); - handler.init(); - - // Even though the request failed, it was made, and should be counted - assertThat(route.requestCounter().count()).isEqualTo(1); - // And since it failed the failed counter should be incremented - assertThat(route.failedGrpcRequestCounter().count()).isZero(); - assertThat(route.failedHttpRequestCounter().count()).isEqualTo(1); - assertThat(route.failedUnknownRequestCounter().count()).isZero(); - - // Check the HTTP2 response header frame was error 415 - assertThat(streamWriter.writtenHeaders).hasSize(1); - assertThat(streamWriter.writtenDataFrames).isEmpty(); - final var responseHeaderFrame = streamWriter.writtenHeaders.getFirst(); - assertThat(responseHeaderFrame.status()).isEqualTo(Status.UNSUPPORTED_MEDIA_TYPE_415); - - // I verified with the go GRPC server its behavior in this scenario. The following headers should be - // available in the response - // Content-Type: application/grpc - // Grpc-Message: invalid gRPC request content-type "" - // Grpc-Status: 3 - final var responseHeaders = responseHeaderFrame.httpHeaders().stream() - .collect(Collectors.toMap(Header::name, Header::values)); - assertThat(responseHeaders).contains( - entry("grpc-status", "" + GrpcStatus.INVALID_ARGUMENT.ordinal()), - entry("grpc-message", UriEncoding.encodeUri("invalid gRPC request content-type \"" + contentType + "\"")), - entry("Content-Type", "application/grpc"), - entry("grpc-accept-encoding", "identity")); - - // The stream should be closed - assertThat(handler.streamState()).isEqualTo(Http2StreamState.CLOSED); - } - - /** - * If the content type is "application/grpc", then it is treated as "application/grpc+proto". - */ - @Test - void contentTypeIsNormalized() { - final var h = WritableHeaders.create(); - h.add(HeaderNames.CONTENT_TYPE, "application/grpc"); - headers = Http2Headers.create(h); - - // Initialize will succeed! - final var handler = new PbjProtocolHandler( - headers, streamWriter, streamId, flowControl, currentStreamState, config, route, deadlineDetector); - handler.init(); - assertThat(route.requestCounter().count()).isEqualTo(1); - - // This will call the service - final var data = createRequestData("Alice"); - sendAllData(handler, data); - - assertThat(service.calledMethod).isSameAs(ServiceInterfaceStub.METHOD); - assertThat(service.receivedBytes).contains(data); - assertThat(service.opts.contentType()).isEqualTo("application/grpc+proto"); - - assertThat(route.failedGrpcRequestCounter().count()).isZero(); - assertThat(route.failedHttpRequestCounter().count()).isZero(); - assertThat(route.failedUnknownRequestCounter().count()).isZero(); - assertThat(handler.streamState()).isEqualTo(Http2StreamState.HALF_CLOSED_REMOTE); - } - - /** - * These are perfectly valid encodings, but none of them are supported at this time. + * Tests the behavior of the {@link PbjProtocolHandler} when handling the headers of a gRPC request. * - * @param encoding the encoding to test with. - */ - @ValueSource(strings = {"gzip", "compress", "deflate", "br", "zstd", "gzip, deflate;q=0.5"}) - @ParameterizedTest - void unsupportedGrpcEncodings(String encoding) { - final var h = WritableHeaders.create(); - h.add(HeaderNames.CONTENT_TYPE, "application/grpc+proto"); - h.add(HeaderNames.create("grpc-encoding"), encoding); - headers = Http2Headers.create(h); - - // Initializing the handler will throw an error because the content types are unsupported - final var handler = new PbjProtocolHandler( - headers, streamWriter, streamId, flowControl, currentStreamState, config, route, deadlineDetector); - handler.init(); - - // Even though the request failed, it was made, and should be counted - assertThat(route.requestCounter().count()).isEqualTo(1); - // And since it failed the failed counter should be incremented - assertThat(route.failedGrpcRequestCounter().count()).isEqualTo(1); - assertThat(route.failedHttpRequestCounter().count()).isZero(); - assertThat(route.failedUnknownRequestCounter().count()).isZero(); - - // The HTTP2 response itself was successful, but the GRPC response was not - assertThat(streamWriter.writtenHeaders).hasSize(1); - assertThat(streamWriter.writtenDataFrames).isEmpty(); - final var responseHeaderFrame = streamWriter.writtenHeaders.getFirst(); - assertThat(responseHeaderFrame.status()).isEqualTo(Status.OK_200); - - // I verified with the go GRPC server its behavior in this scenario. The following headers should be - // available in the response - // Content-Type: application/grpc - // Grpc-Message: grpc: Decompressor is not installed for grpc-encoding "[bad encoding here]" - // Grpc-Status: 12 - final var responseHeaders = responseHeaderFrame.httpHeaders().stream() - .collect(Collectors.toMap(Header::name, Header::values)); - assertThat(responseHeaders).contains( - entry("grpc-status", "" + GrpcStatus.UNIMPLEMENTED.ordinal()), - entry("grpc-message", UriEncoding.encodeUri("Decompressor is not installed for grpc-encoding \"" + encoding + "\"")), - entry("Content-Type", "application/grpc"), - entry("grpc-accept-encoding", "identity")); - - // The stream should be closed - assertThat(handler.streamState()).isEqualTo(Http2StreamState.CLOSED); - } - - /** - * These are encodings we support. They all contain "identity". + *

NOTE: The pseudo-headers :method, :scheme, and :path are required, but they are not handled by the + * PbjProtocolHandler. They are handled by the PbjProtocolSelector. So we don't test for them here. + * + *

NOTE: The "TE" header, although specified in the gRPC over HTTP2 spec, is not required. The HTTP2 + * specification says: "the TE header field, which MAY be present in an HTTP/2 request [...] MUST NOT + * contain any value other than 'trailers'". So we don't need to worry about handling it, the HTTP/2 server + * implementation should. We'll just ignore it. + * + *

NOTE: Another pseudo-header, :authority, is optional. It doesn't matter to use. It could be used for logging, + * but we leave that to any reverse proxies, this server implementation doesn't care about the Authority header. + * + *

NOTE: The gRPC specification mentions the optional "grpc-message-type" header. This implementation does + * not use that field at all. It is ignored. * - *

See for information - * on the encoding header syntax. + *

NOTE: While the gRPC specification defines the "user-agent" header, we don't use it for anything, so we + * ignore it. * - * @param encoding + * @see gRPC over HTTP2 spec */ - @ValueSource(strings = { - // Simple identity strings with qualifiers - "identity", "identity;q=0.5", "identity;", "identity;nonsense", - // an identity with and without a qualifier in a list of encodings - "gzip, deflate;q=0.5, identity;q=0.1", - "gzip, deflate;q=0.5, identity", - "gzip, identity;q=0.1, deflate;q=0.5", - "gzip, identity, deflate;q=0.5", - "identity;q=.9, deflate;q=0.5, gzip;q=0.1, br;q=0.1", - "identity, deflate;q=0.5, gzip;q=0.1, br;q=0.1"}) - @ParameterizedTest - void supportedComplexEncodingsWithIdentity(String encoding) { - final var h = WritableHeaders.create(); - h.add(HeaderNames.CONTENT_TYPE, "application/grpc+proto"); - h.add(HeaderNames.create("grpc-encoding"), encoding); - headers = Http2Headers.create(h); - - // Initializing the handler will throw an error because the content types are unsupported - final var handler = new PbjProtocolHandler( - headers, streamWriter, streamId, flowControl, currentStreamState, config, route, deadlineDetector); - handler.init(); - - // Even though the request failed, it was made, and should be counted - assertThat(route.requestCounter().count()).isEqualTo(1); - // And since it failed the failed counter should be incremented - assertThat(route.failedGrpcRequestCounter().count()).isZero(); - assertThat(route.failedHttpRequestCounter().count()).isZero(); - assertThat(route.failedUnknownRequestCounter().count()).isZero(); - - final var data = createRequestData("Alice"); - sendAllData(handler, data); - - // The HTTP2 response itself was successful, and so was the gRPC response - assertThat(streamWriter.writtenHeaders).hasSize(1); - assertThat(streamWriter.writtenDataFrames).isEmpty(); - final var responseHeaderFrame = streamWriter.writtenHeaders.getFirst(); - assertThat(responseHeaderFrame.status()).isEqualTo(Status.OK_200); - - final var responseHeaders = responseHeaderFrame.httpHeaders().stream() - .collect(Collectors.toMap(Header::name, Header::values)); - assertThat(responseHeaders).contains( - entry("Content-Type", "application/grpc+proto"), - entry("grpc-accept-encoding", "identity")); - - // The stream should be closed - assertThat(handler.streamState()).isEqualTo(Http2StreamState.HALF_CLOSED_REMOTE); + @Nested + final class HeaderTests { + /** + * The "grpc-timeout" header is optional. If not specified, the timeout is considered to be infinite. If it is + * specified, then the server will sever the connection with the client if the timeout is exceeded. + */ + @Nested + final class GrpcTimeoutTests { + private static Stream provideGoodTimeouts() { + return Stream.of( + Arguments.of("2H", NANOSECONDS.convert(ofHours(2))), + Arguments.of("2M", NANOSECONDS.convert(ofMinutes(2))), + Arguments.of("2S", NANOSECONDS.convert(ofSeconds(2))), + Arguments.of("2m", NANOSECONDS.convert(ofMillis(2))), + Arguments.of("2u", NANOSECONDS.convert(ofNanos(2 * 1000))), + Arguments.of("2n", NANOSECONDS.convert(ofNanos(2))), + // "positive integer as ASCII string of at most 8 digits" + Arguments.of("12345678H", NANOSECONDS.convert(ofHours(12345678))), + Arguments.of("12345678M", NANOSECONDS.convert(ofMinutes(12345678))), + Arguments.of("12345678S", NANOSECONDS.convert(ofSeconds(12345678))), + Arguments.of("12345678m", NANOSECONDS.convert(ofMillis(12345678))), + Arguments.of("12345678u", NANOSECONDS.convert(ofNanos(1000 * 12345678L))), + Arguments.of("12345678n", NANOSECONDS.convert(ofNanos(12345678)))); + } + + @MethodSource("provideGoodTimeouts") + @ParameterizedTest + void validTimeouts(String timeout, long expectedNanos) { + // Given a valid grpc-timeout + final var conn = new ConnectionBuilder() + .withHeader("grpc-timeout", timeout) + .build(); + + // When we make a request + conn.request("Alice"); + + // The deadline that was set matches the grpc-timeout. This proves it was parsed correctly. + // And since we never advanced the clock in our fake deadline detector, the deadline should + // have not been exceeded. + assertThat(conn.deadlineDetector.futures).hasSize(1); + assertThat(conn.deadlineDetector.futures.getFirst().delay).isEqualTo(expectedNanos); + conn.assertSuccessfulUnaryResponse(); + } + + @ValueSource(strings = { + // Missing the timeout unit + "2", + // Unknown timeout unit + "2X", + // Random nonsense characters + "-", "_", "*", "!", "@", "#", "$", "%", "^", "&", "(", ")", "+", "=", "[", "]", "{", "}", ";", ":", + "'", "\"", ",", "<", ">", ".", "?", "/", "\\", "|", "`", "~", " ", "\t", "\n", "\r", + // Cannot be missing the timeout value + "H", "M", "S", "m", "u", "n", + "-H", "-M", "-S", "-m", "-u", "-n", + // Must only use digits for the timeout value + "abcH", "abcM", "abcS", "abcm", "abcu", "abcn", + "1bcH", "a2cM", "ab3S", + // Fractional timeout values + "2.5", "2.5X", "2.5H", "2.5M", "2.5S", "2.5m", "2.5u", "2.5n", + // Very large timeout values (cannot be more than 8 ascii digits) + "123456789", "123456789X", "123456789H", "123456789M", "123456789S", "123456789m", "123456789u", + "123456789n", + // Cannot have negative timeout values + "-2", "-2X", "-2H", "-2M", "-2S", "-2m", "-2u", "-2n", + // Or really large negative values + "-123456789", "-123456789X", "-123456789H", "-123456789M", "-123456789S", "-123456789m", + "-123456789u", "-123456789n", + // And whitespace shouldn't be allowed + " 2H", "2H ", "\r2H", "2H\r", "\n2H", "2H\n", "\t2H", "2H\t" + }) + @ParameterizedTest + void invalidTimeouts(String timeout) { + // Given an INVALID grpc-timeout + final var conn = new ConnectionBuilder() + .withHeader("grpc-timeout", timeout) + .build(); + + // When the connection is opened + conn.open(); + + // It will fail because the timeout is invalid + // Even though the request failed, it was made, and should be counted + assertThat(conn.route.requestCounter().count()).isEqualTo(1); + + // It passed HTTP/2, but failed during GRPC processing + assertThat(conn.route.failedGrpcRequestCounter().count()).isEqualTo(1); + assertThat(conn.route.failedHttpRequestCounter().count()).isZero(); + assertThat(conn.route.failedUnknownRequestCounter().count()).isZero(); + + // There is only a single response header because the stream is closed immediately upon initialization, + // and therefore no data is sent. + assertThat(conn.streamWriter.responseHeaderFrames).hasSize(1); + assertThat(conn.streamWriter.responseDataFrames).isEmpty(); + + // The go GRPC server returns HTTP status 400, and grpc-status of 13 (INTERNAL error)! But, according + // to the specification, the HTTP response code should always be 200 if the request was valid from an + // HTTP/2 perspective, with the gRPC related error in the grpc-status header: + // + // "Implementations should expect broken deployments to send non-200 HTTP status codes in responses as + // well as a variety of non-GRPC Content-Types". Emphasis on the word "broken"! + // + // In addition, the go GRPC server sends status code 13, INTERNAL error, as the grpc status! I think + // the correct code would be INVALID_ARGUMENT. + // + // So we will return a 200 OK response, with a grpc-status of 3 (INVALID_ARGUMENT). + conn.assertHttpStatusEquals(OK_200); + conn.assertGrpcStatusEquals(INVALID_ARGUMENT); + conn.assertResponseHeaderEquals("Content-Type", "application/grpc"); + + // The stream should be HALF_CLOSED_LOCAL because we sent END_STREAM, but not RST_STREAM + conn.assertStreamStateEquals(HALF_CLOSED_LOCAL); + } + + @Test + void timeoutExceededBeforeDataSent() { + // Given a VALID grpc-timeout + final var conn = new ConnectionBuilder() + .withHeader("grpc-timeout", "1n") + .build(); + + // When the connection is opened and then the deadline is exceeded before the request is made + conn.open(); + conn.deadlineDetector.advanceTime(ofNanos(2)); + + // The request will fail because the deadline was exceeded + assertThat(conn.route.requestCounter().count()).isEqualTo(1); + assertThat(conn.route.failedGrpcRequestCounter().count()).isZero(); + assertThat(conn.route.failedHttpRequestCounter().count()).isZero(); + assertThat(conn.route.failedUnknownRequestCounter().count()).isZero(); + assertThat(conn.route.deadlineExceededCounter().count()).isEqualTo(1); + + // The HTTP status is OK but the grpc status is DEADLINE_EXCEEDED + conn.assertHttpStatusEquals(OK_200); + conn.assertGrpcStatusEquals(DEADLINE_EXCEEDED); + conn.assertResponseHeaderEquals("Content-Type", "application/grpc+proto"); + conn.assertResponseHeaderEquals("grpc-encoding", "identity"); + conn.assertResponseHeaderEquals("grpc-accept-encoding", "identity"); + + // The stream should be HALF_CLOSED_LOCAL because the client didn't terminate the stream, the server did + conn.assertStreamStateEquals(HALF_CLOSED_LOCAL); + } + + @Test + void timeoutExceededAfterResponseSent() { + // Given a VALID grpc-timeout + final var conn = new ConnectionBuilder() + .withHeader("grpc-timeout", "1S") + .build(); + + // When a request is made and completes BEFORE the expiration + conn.request("Alice"); + conn.deadlineDetector.advanceTime(ofSeconds(2)); + + // Then the request will succeed (it is too late to fail!!) + conn.assertSuccessfulUnaryResponse(); + } + + @Test + void timeoutExceededAfterSomeDataReceivedButNotAll() { + // Given a VALID grpc-timeout + final var conn = new ConnectionBuilder() + .withHeader("grpc-timeout", "1n") + .build(); + + // When the connection is opened and then the deadline is exceeded before all data is received + final var proto = createRequestData("Alice"); + conn.open(); + conn.sendIncompleteData(proto.slice(0, 3), (int) proto.length()); + conn.deadlineDetector.advanceTime(ofNanos(2)); + + // The request will fail because the deadline was exceeded + assertThat(conn.route.requestCounter().count()).isEqualTo(1); + assertThat(conn.route.failedGrpcRequestCounter().count()).isZero(); + assertThat(conn.route.failedHttpRequestCounter().count()).isZero(); + assertThat(conn.route.failedUnknownRequestCounter().count()).isZero(); + assertThat(conn.route.deadlineExceededCounter().count()).isEqualTo(1); + + // The HTTP status is OK but the grpc status is DEADLINE_EXCEEDED + conn.assertHttpStatusEquals(OK_200); + conn.assertGrpcStatusEquals(DEADLINE_EXCEEDED); + conn.assertResponseHeaderEquals("Content-Type", "application/grpc+proto"); + conn.assertResponseHeaderEquals("grpc-encoding", "identity"); + conn.assertResponseHeaderEquals("grpc-accept-encoding", "identity"); + + // The stream should be HALF_CLOSED_LOCAL because the client didn't terminate the stream, the server did + conn.assertStreamStateEquals(HALF_CLOSED_LOCAL); + } + + @Test + void timeoutExceededWhenReceivingStreamFromServer() throws InvalidProtocolBufferException { + // Given a VALID grpc-timeout and a server-streaming request + final AtomicReference clockAdvancer = new AtomicReference<>(); + final var svc = new ServiceInterfaceStub() { + @Override + public void sayHelloStreamReply(HelloRequest request, Pipeline replies) { + // Send three responses, and then wait for the timeout to expire + for (int i = 1; i < 4; i++) { + replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build()); + } + + // Advance the clock to force the timeout to happen BEFORE this method terminated. + clockAdvancer.get().run(); + } + }; + + final var conn = new ConnectionBuilder() + .withHeader("grpc-timeout", "1n") + .withService(svc) + .withServiceMethod(GreeterService.GreeterMethod.sayHelloStreamReply) + .build(); + + clockAdvancer.set(() -> { + // The server was created to send responses for the first three but not the fourth. So now we can + // advance the time beyond what the deadline detector will allow, so it should terminate. + conn.deadlineDetector.advanceTime(ofNanos(2)); + }); + + // When the request is made and the deadline is exceeded before we've received all responses + conn.open(); + conn.sendAllData(createRequestData("Alice")); + + // We should see three responses, but not the fourth. + assertThat(conn.streamWriter.responseDataFrames).hasSize(3); + for (int i = 0; i < 3; i++) { + final var frameBytes = Bytes.wrap(conn.streamWriter.responseDataFrames.get(i).data().readBytes()); + final var len = frameBytes.length() - 5; // 5 bytes for the "compress" and length prefix + assertThat(frameBytes.getByte(0)).isZero(); // The compression flag + assertThat(frameBytes.getInt(1)).isEqualTo(len); // The length of the message + final var proto = frameBytes.slice(5, len); + final var response = HelloReply.parseFrom(proto.toByteArray()); + assertThat(response.getMessage()).isEqualTo("Hello!"); + } + + // Then the request will fail because the deadline was exceeded + assertThat(conn.route.requestCounter().count()).isEqualTo(1); + assertThat(conn.route.failedGrpcRequestCounter().count()).isZero(); + assertThat(conn.route.failedHttpRequestCounter().count()).isZero(); + assertThat(conn.route.failedUnknownRequestCounter().count()).isZero(); + assertThat(conn.route.deadlineExceededCounter().count()).isEqualTo(1); + + // The HTTP status is OK but the grpc status is DEADLINE_EXCEEDED + conn.assertHttpStatusEquals(OK_200); + conn.assertGrpcStatusEquals(DEADLINE_EXCEEDED); + conn.assertResponseHeaderEquals("Content-Type", "application/grpc+proto"); + conn.assertResponseHeaderEquals("grpc-encoding", "identity"); + conn.assertResponseHeaderEquals("grpc-accept-encoding", "identity"); + + // The stream should be CLOSED because the client sent END_STREAM before the timeout happened + conn.assertStreamStateEquals(CLOSED); + } + + @Test + void timeoutExceededWithBidiStreaming() throws InvalidProtocolBufferException { + // Given a VALID grpc-timeout and a bidi-streaming request + final var conn = new ConnectionBuilder() + .withHeader("grpc-timeout", "1n") + .withServiceMethod(GreeterService.GreeterMethod.sayHelloStreamBidi) + .build(); + + // When the request is made and some requests have been made, but we are not yet done, + // and the deadline is exceeded + conn.open(); + conn.sendBytes(createRequestData("Alice")); + conn.sendBytes(createRequestData("Bob")); + conn.sendBytes(createRequestData("Carol")); + conn.deadlineDetector.advanceTime(ofNanos(2)); + conn.sendBytes(createRequestData("Dave")); // This should NEVER BE HANDLED + + // We should see three responses, but not the fourth. + assertThat(conn.streamWriter.responseDataFrames).hasSize(3); + for (int i = 0; i < 3; i++) { + final var frameBytes = Bytes.wrap(conn.streamWriter.responseDataFrames.get(i).data().readBytes()); + final var len = frameBytes.length() - 5; // 5 bytes for the "compress" and length prefix + assertThat(frameBytes.getByte(0)).isZero(); // The compression flag + assertThat(frameBytes.getInt(1)).isEqualTo(len); // The length of the message + final var proto = frameBytes.slice(5, len); + final var response = HelloReply.parseFrom(proto.toByteArray()); + assertThat(response.getMessage()).startsWith("Hello"); + } + + // Then the request will fail because the deadline was exceeded + assertThat(conn.route.requestCounter().count()).isEqualTo(1); + assertThat(conn.route.failedGrpcRequestCounter().count()).isZero(); + assertThat(conn.route.failedHttpRequestCounter().count()).isZero(); + assertThat(conn.route.failedUnknownRequestCounter().count()).isZero(); + assertThat(conn.route.deadlineExceededCounter().count()).isEqualTo(1); + + // The HTTP status is OK but the grpc status is DEADLINE_EXCEEDED + conn.assertHttpStatusEquals(OK_200); + conn.assertGrpcStatusEquals(DEADLINE_EXCEEDED); + conn.assertResponseHeaderEquals("Content-Type", "application/grpc+proto"); + conn.assertResponseHeaderEquals("grpc-encoding", "identity"); + conn.assertResponseHeaderEquals("grpc-accept-encoding", "identity"); + + // The stream should be HALF_CLOSED_LOCAL because the client never sent END_STREAM before the timeout + // happened, so the server was the one that closed the connection. + conn.assertStreamStateEquals(HALF_CLOSED_LOCAL); + } + } + + @Nested + final class ContentTypeTests { + /** + * If the Content-Type is missing, or does not start with "application/grpc", the server should + * respond with a 415 Unsupported Media Type and the stream state should end up CLOSED. See + * + */ + @ValueSource(strings = {"", "text/plain", "application/json"}) + @ParameterizedTest + void unsupportedContentType(String contentType) { + // Given a handler with a content type that is not supported + final var b = new ConnectionBuilder().withoutHeader("Content-Type"); + if (!contentType.isBlank()) b.withContentType(contentType); + final var conn = b.build(); + + // When the handler is initialized it will fail. + conn.open(); + + // Even though the request failed, it was made, and should be counted + assertThat(conn.route.requestCounter().count()).isEqualTo(1); + + // It failed at the HTTP/2 level, it didn't even get to the GRPC level! + assertThat(conn.route.failedGrpcRequestCounter().count()).isZero(); + assertThat(conn.route.failedHttpRequestCounter().count()).isEqualTo(1); + assertThat(conn.route.failedUnknownRequestCounter().count()).isZero(); + + // There is only a single response header because the stream is closed immediately upon initialization, + // and therefore no data is sent. + assertThat(conn.streamWriter.responseHeaderFrames).hasSize(1); + assertThat(conn.streamWriter.responseDataFrames).isEmpty(); + + // I verified with the go GRPC server its behavior in this scenario. The following headers should be + // available in the response + // Content-Type: application/grpc + // Grpc-Message: invalid gRPC request Content-Type "" + // Grpc-Status: 3 + final var responseHeaderFrame = conn.streamWriter.responseHeaderFrames.getFirst(); + assertThat(responseHeaderFrame.status()).isEqualTo(Status.UNSUPPORTED_MEDIA_TYPE_415); + final var responseHeaders = responseHeaderFrame.httpHeaders().stream() + .collect(Collectors.toMap(Header::name, Header::values)); + assertThat(responseHeaders).contains( + entry("grpc-status", "" + INVALID_ARGUMENT.ordinal()), + entry("grpc-message", UriEncoding.encodeUri("invalid gRPC request content-type \"" + contentType + "\"")), + entry("Content-Type", "application/grpc"), + entry("grpc-accept-encoding", "identity")); + + // The stream should be HALF_CLOSED_LOCAL because we sent END_STREAM, but not RST_STREAM + assertThat(conn.serverStreamState()).isEqualTo(HALF_CLOSED_LOCAL); + } + + /** + * If the content type is "application/grpc", then it is treated as "application/grpc+proto". + */ + @Test + void contentTypeIsNormalized() { + // Given a request with a default content type + final var conn = new ConnectionBuilder() + .withContentType("application/grpc") + .build(); + + // When the request is made + conn.open(); + final var data = createRequestData("Alice"); + conn.sendAllData(data); + + // Then the request will succeed and the "opts" passed to the service specifies proto + conn.assertSuccessfulUnaryResponse(); + assertThat(conn.service.opts.contentType()).isEqualTo("application/grpc+proto"); + } + } + + @Nested + final class GrpcEncodingTests { + /** + * These are perfectly valid encodings, but none of them are supported at this time. + * + * @param encoding the encoding to test with. + */ + @ValueSource(strings = {"gzip", "compress", "deflate", "br", "zstd", "gzip, deflate;q=0.5"}) + @ParameterizedTest + void unsupportedGrpcEncodings(String encoding) { + final var conn = new ConnectionBuilder() + .withHeader("grpc-encoding", encoding) + .build(); + + // Initializing the handler will throw an error because the content types are unsupported + conn.open(); + + // Even though the request failed, it was made, and should be counted + assertThat(conn.route.requestCounter().count()).isEqualTo(1); + // And since it failed the failed counter should be incremented + assertThat(conn.route.failedGrpcRequestCounter().count()).isEqualTo(1); + assertThat(conn.route.failedHttpRequestCounter().count()).isZero(); + assertThat(conn.route.failedUnknownRequestCounter().count()).isZero(); + + // The HTTP2 response itself was successful, but the GRPC response was not + assertThat(conn.streamWriter.responseHeaderFrames).hasSize(1); + assertThat(conn.streamWriter.responseDataFrames).isEmpty(); + final var responseHeaderFrame = conn.streamWriter.responseHeaderFrames.getFirst(); + assertThat(responseHeaderFrame.status()).isEqualTo(OK_200); + + // I verified with the go GRPC server its behavior in this scenario. The following headers should be + // available in the response + // Content-Type: application/grpc + // Grpc-Message: grpc: Decompressor is not installed for grpc-encoding "[bad encoding here]" + // Grpc-Status: 12 + final var responseHeaders = responseHeaderFrame.httpHeaders().stream() + .collect(Collectors.toMap(Header::name, Header::values)); + assertThat(responseHeaders).contains( + entry("grpc-status", "" + GrpcStatus.UNIMPLEMENTED.ordinal()), + entry("grpc-message", UriEncoding.encodeUri("Decompressor is not installed for grpc-encoding \"" + encoding + "\"")), + entry("Content-Type", "application/grpc"), + entry("grpc-accept-encoding", "identity")); + + // The stream should be HALF_CLOSED_LOCAL because we sent END_STREAM, but not RST_STREAM + assertThat(conn.serverStreamState()).isEqualTo(HALF_CLOSED_LOCAL); + } + + /** + * These are encodings we support. They all contain "identity". + * + *

See for information + * on the encoding header syntax. + * + * @param encoding + */ + @ValueSource(strings = { + // Simple identity strings with qualifiers + "identity", "identity;q=0.5", "identity;", "identity;nonsense", + // an identity with and without a qualifier in a list of encodings + "gzip, deflate;q=0.5, identity;q=0.1", + "gzip, deflate;q=0.5, identity", + "gzip, identity;q=0.1, deflate;q=0.5", + "gzip, identity, deflate;q=0.5", + "identity;q=.9, deflate;q=0.5, gzip;q=0.1, br;q=0.1", + "identity, deflate;q=0.5, gzip;q=0.1, br;q=0.1"}) + @ParameterizedTest + void supportedComplexEncodingsWithIdentity(String encoding) { + // Given a valid encoding + final var conn = new ConnectionBuilder() + .withHeader("grpc-encoding", encoding) + .build(); + + // When the request is mode + conn.open(); + conn.sendAllData(createRequestData("Alice")); + + // Then the request succeeds + conn.assertSuccessfulUnaryResponse(); + } + } + + /** + * This is an optional header. If specified, it MUST contain one of "application/grpc", + * "application/grpc+proto", or "application/grpc+json". If it does not, the server should respond with a + * "415 Unsupported Media Type". However, the Go GRPC server seems to totally ignore invalid values for this + * header, or might ignore it entirely. The gRPC specification doesn't describe the correct behavior for this + * header. + */ + @Nested + final class GrpcAcceptEncodingTests { + + } + + /** + * gRPC supports custom headers, referred to as "Custom-Metadata". The header, if suffixed with "-bin", is + * considered to be binary data and is base64 encoded. Otherwise, it is a normal textual data header. + * All such headers should be passed to the application handling code so it can handle those fields as + * needed (for example, the "authentication" header can be passed this way). + */ + @Nested + final class MetadataTests { + + } } - @Test - void errorThrownForOnNextWhenStreamIsClosed() { - // Use a custom streamWriter that will throw an exception when "streamClosed" is set to true, and it is - // asked to write something. This can be used to simulate what happens when the network connection fails. - final var streamClosed = new AtomicBoolean(false); - streamWriter = new StreamWriterStub() { - @Override - public void writeData(Http2FrameData frame, FlowControl.Outbound flowControl) { - if (streamClosed.get()) { - throw new IllegalStateException("Stream is closed"); + @Nested + final class Http2Tests { + /** + * When sending multiple messages to the server, the HTTP/2 client may break the stream of messages over + * multiple data frames. Each data frame will have some of the bytes. Multiple messages are sent such + * that each message is a length-delimited set of Length-Prefixed-Message, where each has a "compressed" + * byte followed by a 4-byte length prefix, followed by the message bytes. + * + *

Now, it may be that the prefix of the message (those 5 bytes) end up on a data frame boundary, so that + * some bytes are in frame N and the rest in frame N+1. It is critical that when the + * {@link PbjProtocolHandler} handles a frame, if it has some subset of those 5 bytes, that it waits for the + * next frame before it proceeds with accumulating the remaining bytes of the message. + * + *

This test simulates this scenario. + */ + @Nested + final class FramingTests { + private static Stream provideSplitFrames() { + // I'm going to construct a single byte array that is the concatenation of three protobuf messages. + // I'm then going to run a series of tests, where each test sends a subset of the bytes to the server. + // Initially each subset will be of length 1, then of length 2, then of length 3, and so on until + // eventually we send the entire three protobuf messages to the server in a single frame. + // + // For each iteration, after all frames are sent, there should be valid responses that indicate all + // three protobuf messages were received. + final var args = new ArrayList(); + final var allBytes = + createLengthPrefixedMessage(createRequestData("Alice")).append( + createLengthPrefixedMessage(createRequestData("Bob")).append( + createLengthPrefixedMessage(createRequestData("Carol")))); + + for (int i = 0; i < allBytes.length(); i++) { + final var frames = new ArrayList(); + final var frameBytes = new ArrayList(); + final int numBytesPerFrame = i + 1; + for (int j = 0; j < allBytes.length(); j += numBytesPerFrame) { + final var len = Math.min(numBytesPerFrame, allBytes.length() - j); + final var bytes = allBytes.slice(j, len); + frames.add(createDataFrameHeader((int) bytes.length(), (j + len) == allBytes.length(), 1)); + frameBytes.add(bytes); + } + args.add(Arguments.of(frames, frameBytes)); } + + return args.stream(); } - }; - - // Within this test, the replyRef will be set once when the setup is complete, and then - // will be available for the test code to use to call onNext, onError, etc. as required. - final var replyRef = new AtomicReference>(); - route = new PbjMethodRoute(new GreeterServiceImpl() { - @Override - public void sayHelloStreamReply(HelloRequest request, Pipeline replies) { - replyRef.set(replies); + + @MethodSource("provideSplitFrames") + @ParameterizedTest + void protobufIsSpreadAcrossMultipleFrames(List frameHeaders, List frameBytes) throws InvalidProtocolBufferException { + final var conn = new ConnectionBuilder() + .withServiceMethod(GreeterService.GreeterMethod.sayHelloStreamBidi) + .build(); + + conn.open(); + + for (int i = 0; i < frameHeaders.size(); i++) { + conn.handler.data(frameHeaders.get(i), BufferData.create(frameBytes.get(i).toByteArray())); + } + + // Now we can assert that we have successfully handled 3 different protobuf messages + // by checking the responses + assertThat(conn.streamWriter.responseDataFrames).hasSize(3); + for (final var frame : conn.streamWriter.responseDataFrames) { + final var bytes = Bytes.wrap(frame.data().readBytes()); + final var len = bytes.length() - 5; // 5 bytes for the "compress" and length prefix + assertThat(bytes.getByte(0)).isZero(); // The compression flag + assertThat(bytes.getInt(1)).isEqualTo(len); // The length of the message + final var proto = bytes.slice(5, len); + final var response = HelloReply.parseFrom(proto.toByteArray()); + assertThat(response.getMessage()).startsWith("Hello"); + } + + // Then the request will fail because the deadline was exceeded + assertThat(conn.route.requestCounter().count()).isEqualTo(1); + assertThat(conn.route.failedGrpcRequestCounter().count()).isZero(); + assertThat(conn.route.failedHttpRequestCounter().count()).isZero(); + assertThat(conn.route.failedUnknownRequestCounter().count()).isZero(); + assertThat(conn.route.deadlineExceededCounter().count()).isZero(); + + // The HTTP status is OK but the grpc status is DEADLINE_EXCEEDED + conn.assertHttpStatusEquals(OK_200); + conn.assertGrpcStatusEquals(OK); + conn.assertResponseHeaderEquals("Content-Type", "application/grpc+proto"); + conn.assertResponseHeaderEquals("grpc-encoding", "identity"); + conn.assertResponseHeaderEquals("grpc-accept-encoding", "identity"); + + // The stream should be CLOSED because the client sent END_OF_STREAM before the server closed + conn.assertStreamStateEquals(CLOSED); } - }, GreeterService.GreeterMethod.sayHelloStreamReply); + } + + /** + * The {@link PbjProtocolHandler} is responsible for advancing the stream state correctly. Periodically + * the Helidon code will look up the state and use it for interacting with the client. So we need to make + * sure it is updated correctly as per + * the spec. + */ + @Nested + final class StateTests { + // Test that for a unary method invocation, the server ends in CLOSED state. The client must have received + // an END_STREAM flag on the response. + + // Test that for a client-streaming method invocation, when the client sends an END_STREAM flag, the server + // will transition to HALF_CLOSED_REMOTE, then it will respond with data, and then send an END_STREAM + // and transition to CLOSED. - final var handler = new PbjProtocolHandler(headers, streamWriter, streamId, flowControl, currentStreamState, config, route, deadlineDetector); - handler.init(); - sendAllData(handler, createRequestData("Alice")); + // Test that for a server-streaming method invocation, the server will eventually send END_STREAM when it + // is done sending data, and that it then ends up in CLOSED. - final var replies = replyRef.get(); - assertThat(replies).isNotNull(); + // Test that for a bidi-streaming method invocation, the server will transition to HALF_CLOSED_REMOTE if + // it receives an END_STREAM flag from the client, and it will then notify the application so it can + // finish streaming to the client, and then send END_STREAM. - replies.onNext(HelloReply.newBuilder().setMessage("Good").build()); - streamClosed.set(true); + // Test that for a bidi-streaming method invocation, the server will transaction to HALF_CLOSED_LOCAL if + // it is done and send END_STREAM, and then when the client finally sends an END_STREAM, the server will + // transition to CLOSED. - final var failingReply = HelloReply.newBuilder().setMessage("Bad").build(); - assertThatThrownBy(() -> replies.onNext(failingReply)) - .isInstanceOf(Exception.class); + // Test that for a unary method, if the client sends an RST_STREAM before all data has been received, then + // the server will transition to CLOSED and NOT respond. - assertThat(route.requestCounter().count()).isEqualTo(1); - assertThat(route.failedGrpcRequestCounter().count()).isEqualTo(0); - assertThat(route.failedHttpRequestCounter().count()).isEqualTo(0); - assertThat(route.failedUnknownRequestCounter().count()).isEqualTo(0); - assertThat(route.failedResponseCounter().count()).isEqualTo(1); + // Test that for a client-streaming method, if the client sends RST_STREAM then the server will transition + // to CLOSED and not respond. + + // Test that for server-streaming, if the client sends RST_STREAM before all data was originally sent, then + // the connection is CLOSED and there is no response. + + // Test that for bidi-streaming, if the client sends RST_STREAM then the server closes and terminates the + // connection and ends up in CLOSED state. + + // Test that for bidi-streaming, if the server sends RST_STREAM, then it also stops receiving data from + // the client (this should be handled by Helidon). + } } - private Bytes createRequestData(String name) { + @Nested + final class StreamClosedTests { + @Test + void errorThrownForOnNextWhenStreamIsClosed() { + // Use a custom streamWriter that will throw an exception when "streamClosed" is set to true, and it is + // asked to write something. This can be used to simulate what happens when the network connection fails. + final var streamClosed = new AtomicBoolean(false); + final var streamWriter = new StreamWriterStub() { + @Override + public void writeData(Http2FrameData frame, FlowControl.Outbound flowControl) { + if (streamClosed.get()) { + throw new IllegalStateException("Stream is closed"); + } + } + }; + + // Within this test, the replyRef will be set once when the setup is complete, and then + // will be available for the test code to use to call onNext, onError, etc. as required. + final var replyRef = new AtomicReference>(); + final var service = new ServiceInterfaceStub() { + @Override + public void sayHelloStreamReply(HelloRequest request, Pipeline replies) { + replyRef.set(replies); + } + }; + + final var conn = new ConnectionBuilder() + .withStreamWriter(streamWriter) + .withService(service) + .withServiceMethod(GreeterService.GreeterMethod.sayHelloStreamReply) + .build(); + + + conn.open(); + conn.sendBytes(createRequestData("Alice")); + + final var replies = replyRef.get(); + assertThat(replies).isNotNull(); + + replies.onNext(HelloReply.newBuilder().setMessage("Good").build()); + streamClosed.set(true); + + final var failingReply = HelloReply.newBuilder().setMessage("Bad").build(); + assertThatThrownBy(() -> replies.onNext(failingReply)) + .isInstanceOf(Exception.class); + + assertThat(conn.route.requestCounter().count()).isEqualTo(1); + assertThat(conn.route.failedGrpcRequestCounter().count()).isZero(); + assertThat(conn.route.failedHttpRequestCounter().count()).isZero(); + assertThat(conn.route.failedUnknownRequestCounter().count()).isZero(); + assertThat(conn.route.failedResponseCounter().count()).isEqualTo(1); + } + } + + private static Bytes createRequestData(String name) { return createData(HelloRequest.newBuilder().setName(name).build()); } - private Bytes createData(HelloRequest request) { + private static Bytes createData(HelloRequest request) { return Bytes.wrap(request.toByteArray()); } - private void sendAllData(PbjProtocolHandler handler, Bytes bytes) { - final var frameHeader = createDataFrameHeader((int) bytes.length()); - final var buf = createDataFrameBytes(bytes); - handler.data(frameHeader, buf); + private static Bytes createLengthPrefixedMessage(Bytes data) { + return createLengthPrefixedMessage(data, (int) data.length()); + } + + private static Bytes createLengthPrefixedMessage(Bytes data, int protoLen) { + final var buf = BufferedData.allocate((int) data.length() + 5); + buf.writeByte((byte) 0); + buf.writeInt(protoLen); + buf.writeBytes(data); + return buf.getBytes(0, buf.length()); + } + + private static Http2FrameHeader createDataFrameHeader(int length, int streamId) { + return createDataFrameHeader(length, true, streamId); + } + + private static Http2FrameHeader createDataFrameHeader(int length, boolean eos, int streamId) { + return Http2FrameHeader.create(length + 5, Http2FrameTypes.DATA, eos ? Http2Flag.DataFlags.create(Http2Flags.END_STREAM) : Http2Flag.DataFlags.create(0), streamId); } - private BufferData createDataFrameBytes(Bytes data) { - try { - final var buf = new ByteArrayOutputStream((int) data.length() + 5); - final var s = new DataOutputStream(buf); - s.writeByte(0); - s.writeInt((int) data.length()); - data.writeTo(s); - return BufferData.create(buf.toByteArray()); - } catch (Exception e) { - throw new RuntimeException(e); + private static final class Connection { + private final StreamWriterStub streamWriter; + private final int streamId; + private final OutboundFlowControlStub flowControl; + private final Http2StreamState currentStreamState; + private final PbjConfigStub config; + private final PbjMethodRoute route; + private final DeadlineDetectorStub deadlineDetector; + private final ServiceInterfaceStub service; + private final PbjProtocolHandler handler; + private Bytes sentBytes = Bytes.EMPTY; + + public Connection(Http2Headers headers, StreamWriterStub streamWriter, int streamId, OutboundFlowControlStub flowControl, Http2StreamState currentStreamState, PbjConfigStub config, PbjMethodRoute route, DeadlineDetectorStub deadlineDetector, ServiceInterfaceStub service) { + this.streamWriter = streamWriter; + this.streamId = streamId; + this.flowControl = flowControl; + this.currentStreamState = currentStreamState; + this.config = config; + this.route = route; + this.deadlineDetector = deadlineDetector; + this.service = service; + + this.handler = new PbjProtocolHandler( + headers, + streamWriter, + streamId, + flowControl, + currentStreamState, + config, + route, + deadlineDetector); + } + + public void open() { + handler.init(); + } + + public HelloReply request(String name) { + open(); + sendAllData(createRequestData(name)); + + var responseBytes = Bytes.EMPTY; + for (final var data : service.receivedBytes) { + responseBytes = responseBytes.append(data); + } + + try { + return HelloReply.parseFrom(responseBytes.toByteArray()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + + public void sendIncompleteData(Bytes bytes, int protoLen) { + sentBytes = sentBytes.append(bytes); + final var frameHeader = createDataFrameHeader((int) bytes.length(), false, 1); + final var buf = createLengthPrefixedMessage(bytes, protoLen); + handler.data(frameHeader, BufferData.create(buf.toByteArray())); + } + + public void sendBytes(Bytes bytes) { + sentBytes = sentBytes.append(bytes); + final var frameHeader = createDataFrameHeader((int) bytes.length(), false, 1); + final var buf = createLengthPrefixedMessage(bytes, (int) bytes.length()); + handler.data(frameHeader, BufferData.create(buf.toByteArray())); + } + + public void terminateForcefully() { + + } + + public void close() { + + } + + public Http2StreamState serverStreamState() { + return handler.streamState(); + } + + private void sendAllData(Bytes bytes) { + sentBytes = bytes; + final var protoLen = (int) bytes.length(); + final var frameHeader = createDataFrameHeader(protoLen, 1); + final var buf = createLengthPrefixedMessage(bytes, protoLen); + handler.data(frameHeader, BufferData.create(buf.toByteArray())); + } + + public void assertSuccessfulUnaryResponse() { + // Request was made + assertThat(route.requestCounter().count()).isEqualTo(1); + + // It succeeded! + assertThat(route.failedGrpcRequestCounter().count()).isZero(); + assertThat(route.failedHttpRequestCounter().count()).isZero(); + assertThat(route.failedUnknownRequestCounter().count()).isZero(); + assertThat(route.deadlineExceededCounter().count()).isZero(); + + // The method was called + assertThat(service.calledMethod).isSameAs(route.method()); + assertThat(service.receivedBytes).contains(sentBytes); + + // There are two response headers -- the one at the start of the request, and the final trailer + assertThat(streamWriter.responseHeaderFrames).hasSize(2); + assertThat(streamWriter.responseDataFrames).hasSize(1); // Not necessarily true + + // The first response header gives the HTTP/2 OK status, the second one gives the gRPC OK status code + assertHttpStatusEquals(OK_200); + assertGrpcStatusEquals(GrpcStatus.OK); + assertResponseHeaderEquals("Content-Type", "application/grpc+proto"); + assertResponseHeaderEquals("grpc-encoding", "identity"); + assertResponseHeaderEquals("grpc-accept-encoding", "identity"); + + // The stream should be CLOSED because the client sent an END_STREAM which transitioned us to + // HALF_CLOSE_REMOTE after accepting all the bytes, and then we needed to transition to CLOSED + // when we were done with the response. + assertStreamStateEquals(CLOSED); + } + + void assertHttpStatusEquals(Status expectedStatus) { + assertThat(streamWriter.responseHeaderFrames).hasSizeGreaterThanOrEqualTo(1); + final var responseHeaderFrame = streamWriter.responseHeaderFrames.getFirst(); + assertThat(responseHeaderFrame.status()).isEqualTo(expectedStatus); + } + + void assertGrpcStatusEquals(GrpcStatus expectedStatus) { + assertThat(streamWriter.responseHeaderFrames).hasSizeGreaterThanOrEqualTo(1); + final var responseHeaderFrame = streamWriter.responseHeaderFrames.getLast(); + final var responseHeaders = responseHeaderFrame.httpHeaders().stream() + .collect(Collectors.toMap(Header::name, Header::values)); + assertThat(responseHeaders).contains( + entry("grpc-status", "" + expectedStatus.ordinal())); + } + + void assertResponseHeaderEquals(String headerName, String expectedValue) { + assertThat(streamWriter.responseHeaderFrames).hasSizeGreaterThanOrEqualTo(1); + for (final var responseHeaderFrame : streamWriter.responseHeaderFrames) { + final var responseHeaders = responseHeaderFrame.httpHeaders().stream() + .collect(Collectors.toMap(Header::name, Header::values)); + if (responseHeaders.containsKey(headerName)) { + assertThat(responseHeaders).contains( + entry(headerName, expectedValue)); + return; + } + } + fail("No response header with name " + headerName + " found"); + } + + void assertStreamStateEquals(Http2StreamState expectedState) { + assertThat(serverStreamState()).isEqualTo(expectedState); + } + + public int receivedBytesLength() { + final var headerBytes = streamWriter.responseHeaderFrames.size() * StreamWriterStub.FAKE_ENCODED_HEADER_SIZE; + final var dataBytes = streamWriter.responseDataFrames.stream().map(Http2FrameData::data) + .map(buf -> Bytes.wrap(buf.readBytes())) + .reduce(Bytes::append) + .orElse(Bytes.EMPTY); + return (int) dataBytes.length() + headerBytes; } } - private Http2FrameHeader createDataFrameHeader(int length) { - return Http2FrameHeader.create(length + 5, Http2FrameTypes.DATA, Http2Flag.DataFlags.create(Http2Flags.END_STREAM), streamId); + /** + * A convenient builder for {@link PbjProtocolHandler} for testing purposes. By default, a fully valid call + * is assembled for the call to the handler. You can override different headers, etc. to see how the handler + * behaves. + */ + private static final class ConnectionBuilder { + private WritableHeaders headers; + private ServiceInterface.Method method = GreeterService.GreeterMethod.sayHello; + private ServiceInterfaceStub service = new ServiceInterfaceStub(); + private StreamWriterStub streamWriter = new StreamWriterStub(); + + private ConnectionBuilder() { + // :method: + // :path: + // :authority: + // :scheme: + // Content-Type + // user-agent + // te + headers = WritableHeaders.create() + .set(HeaderNames.CONTENT_TYPE, "application/grpc+proto") + .set(HeaderNames.create("user-agent"), "java-test/1.0") + .set(HeaderNames.create("te"), "trailers"); + } + + ConnectionBuilder withHeader(String name, String value) { + headers.set(HeaderNames.create(name), value); + return this; + } + + ConnectionBuilder withUserAgent(String userAgent) { + return withHeader("user-agent", userAgent); + } + + ConnectionBuilder withContentType(String contentType) { + headers.set(HeaderNames.CONTENT_TYPE, contentType); + return this; + } + + ConnectionBuilder withoutHeader(String name) { + headers.remove(HeaderNames.create(name)); + return this; + } + + ConnectionBuilder withService(ServiceInterfaceStub service) { + this.service = service; + return this; + } + + ConnectionBuilder withServiceMethod(ServiceInterface.Method method) { + this.method = method; + return this; + } + + Connection build() { + return new Connection( + Http2Headers.create(headers), + streamWriter, + 1, + new OutboundFlowControlStub(), + Http2StreamState.OPEN, + new PbjConfigStub(), + new PbjMethodRoute(service, method), + new DeadlineDetectorStub(), + service); + } + + public ConnectionBuilder withStreamWriter(StreamWriterStub streamWriter) { + this.streamWriter = streamWriter; + return this; + } } private static final class OutboundFlowControlStub implements FlowControl.Outbound { + public static final int INITIAL_WINDOW_SIZE = 1000; + private int windowSize = INITIAL_WINDOW_SIZE; @Override public long incrementStreamWindowSize(int increment) { - return 0; + return windowSize + increment; } @Override public Http2FrameData[] cut(Http2FrameData frame) { - return new Http2FrameData[0]; + throw new UnsupportedOperationException(); } @Override public void blockTillUpdate() { - + throw new UnsupportedOperationException(); } @Override public int maxFrameSize() { - return 0; + return Integer.MAX_VALUE; } @Override public void decrementWindowSize(int decrement) { - + windowSize -= decrement; } @Override public void resetStreamWindowSize(int size) { - + windowSize = size; } @Override public int getRemainingWindowSize() { - return 0; + return windowSize; } } private static class StreamWriterStub implements Http2StreamWriter { - private final List writtenDataFrames = new ArrayList<>(); - private final List writtenHeaders = new ArrayList<>(); + public static final int FAKE_ENCODED_HEADER_SIZE = 100; + private final List responseDataFrames = new ArrayList<>(); + private final List responseHeaderFrames = new ArrayList<>(); @Override public void write(Http2FrameData frame) { - writtenDataFrames.add(frame); + responseDataFrames.add(frame); } @Override public void writeData(Http2FrameData frame, FlowControl.Outbound flowControl) { - writtenDataFrames.add(frame); + responseDataFrames.add(frame); + flowControl.decrementWindowSize(frame.data().available()); } @Override public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlags flags, FlowControl.Outbound flowControl) { - writtenHeaders.add(headers); - return 0; + // OK, I need to update the flow control. But I don't know how big the encoded headers will be on the wire. + // I can try to make this real, or I can cheat. I'm going to cheat unless there is some reason I cannot. + // I'm going to pretend that headers are 100 bytes long as far as flow control is concerned!! + responseHeaderFrames.add(headers); + flowControl.decrementWindowSize(FAKE_ENCODED_HEADER_SIZE); + return FAKE_ENCODED_HEADER_SIZE; } @Override public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlags flags, Http2FrameData dataFrame, FlowControl.Outbound flowControl) { - writtenHeaders.add(headers); - writtenDataFrames.add(dataFrame); - return 0; + throw new UnsupportedOperationException(); +// responseHeaderFrames.add(headers); +// responseDataFrames.add(dataFrame); +// return 0; } } @@ -429,102 +1149,123 @@ public String name() { } private static final class DeadlineDetectorStub implements DeadlineDetector { + private long currentTime = 0; + private final List futures = new ArrayList<>(); + @NonNull @Override public ScheduledFuture scheduleDeadline(long deadlineNanos, @NonNull Runnable onDeadlineExceeded) { - return new ScheduledFuture<>() { - @Override - public long getDelay(@NonNull TimeUnit unit) { - return 0; - } + final var future = new ScheduledFutureStub(deadlineNanos, onDeadlineExceeded); + this.futures.add(future); + return future; + } - @Override - public int compareTo(@NonNull Delayed o) { - return 0; - } + public void advanceTime(Duration duration) { + currentTime += duration.toNanos(); - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } + // Find all futures that have expired and complete them + final var completed = futures.stream() + .filter(f -> f.getDelay(NANOSECONDS) <= currentTime) + .toList(); - @Override - public boolean isCancelled() { - return false; - } + for (var f : completed) { + f.complete(null); + } - @Override - public boolean isDone() { - return false; - } + // And remove them so we don't fire them off again + futures.removeAll(completed); + } + } - @Override - public Object get() { - return null; - } + private static final class ScheduledFutureStub extends CompletableFuture implements ScheduledFuture { + private final long delay; + private final Runnable onDeadlineExceeded; - @Override - public Object get(long timeout, @NonNull TimeUnit unit) { - return null; - } - }; + public ScheduledFutureStub(long delay, @NonNull Runnable onDeadlineExceeded) { + this.delay = delay; + this.onDeadlineExceeded = onDeadlineExceeded; + } + + @Override + public long getDelay(@NonNull TimeUnit unit) { + return unit.convert(delay, NANOSECONDS); + } + + @Override + public int compareTo(@NonNull Delayed o) { + return Long.compare(delay, o.getDelay(NANOSECONDS)); + } + + @Override + public boolean complete(Void value) { + boolean b = super.complete(value); + if (b) { + onDeadlineExceeded.run(); + } + return b; } } - private static final class ServiceInterfaceStub implements ServiceInterface { + private static class ServiceInterfaceStub implements GreeterService { private Method calledMethod; private RequestOptions opts; private List receivedBytes = new ArrayList<>(); private Throwable error; private boolean completed; - static final Method METHOD = () -> "m"; - + @Override @NonNull + public Pipeline open(@NonNull Method method, @NonNull RequestOptions options, @NonNull Pipeline replies) { + this.calledMethod = method; + this.opts = options; + return GreeterService.super.open(method, options, replies); + } + @Override - public String serviceName() { - return "s"; + public HelloReply sayHello(HelloRequest request) { + this.receivedBytes.add(Bytes.wrap(request.toByteArray())); + return HelloReply.newBuilder().build(); } - @NonNull @Override - public String fullName() { - return "s/m"; + public Pipeline sayHelloStreamRequest(Pipeline replies) { + throw new UnsupportedOperationException(); } - @NonNull @Override - public List methods() { - return List.of(METHOD); + public void sayHelloStreamReply(HelloRequest request, Pipeline replies) { + throw new UnsupportedOperationException(); } - @NonNull @Override - public Pipeline open( - @NonNull Method method, - @NonNull RequestOptions opts, - @NonNull Pipeline responses) throws GrpcException { - this.calledMethod = method; - this.opts = opts; + public Pipeline sayHelloStreamBidi(Pipeline replies) { + // Here we receive info from the client. In this case, it is a stream of requests with + // names. We will respond with a stream of replies. return new Pipeline<>() { + @Override + public void clientEndStreamReceived() { + onComplete(); + } + @Override public void onSubscribe(Flow.Subscription subscription) { - // ignored + subscription.request(Long.MAX_VALUE); // turn off flow control } @Override - public void onNext(Bytes item) { - receivedBytes.add(item); + public void onNext(HelloRequest item) { + replies.onNext( + HelloReply.newBuilder().setMessage("Hello " + item.getName()).build()); } @Override public void onError(Throwable throwable) { - error = throwable; + replies.onError(throwable); } @Override public void onComplete() { - completed = true; + replies.onComplete(); } }; } diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/WritableSequentialData.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/WritableSequentialData.java index 28c82ab9..0080fcb6 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/WritableSequentialData.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/WritableSequentialData.java @@ -16,7 +16,10 @@ package com.hedera.pbj.runtime.io; +import static java.util.Objects.requireNonNull; + import com.hedera.pbj.runtime.io.buffer.BufferedData; +import com.hedera.pbj.runtime.io.buffer.Bytes; import com.hedera.pbj.runtime.io.buffer.RandomAccessData; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; @@ -197,7 +200,7 @@ default void writeBytes(@NonNull final RandomAccessData src) throws BufferOverfl */ default int writeBytes(@NonNull final InputStream src, final int maxLength) throws UncheckedIOException { // Check for a bad length or a null src - Objects.requireNonNull(src); + requireNonNull(src); if (maxLength < 0) { throw new IllegalArgumentException("The length must be >= 0"); } @@ -234,6 +237,15 @@ default int writeBytes(@NonNull final InputStream src, final int maxLength) thro } } + /** + * Writes the entire content of the given {@link Bytes} to this buffer. + * @param bytes The source {@link Bytes} to write + */ + default void writeBytes(@NonNull final Bytes bytes) { + requireNonNull(bytes); + bytes.writeTo(this); + } + /** * Write a string as UTF8 bytes to this {@link WritableSequentialData}. * diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/buffer/Bytes.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/buffer/Bytes.java index ab5370a9..7a2707e5 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/buffer/Bytes.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/buffer/Bytes.java @@ -234,7 +234,7 @@ public void writeTo(@NonNull final ByteBuffer dstBuffer) { * @param length The number of bytes to extract. */ public void writeTo(@NonNull final ByteBuffer dstBuffer, final int offset, final int length) { - dstBuffer.put(buffer, Math.toIntExact(start + offset), length); + dstBuffer.put(buffer, Math.toIntExact(start + (long) offset), length); } /** @@ -255,7 +255,7 @@ public void writeTo(@NonNull final OutputStream outStream) { @Override public void writeTo(@NonNull final OutputStream outStream, final int offset, final int length) { try { - outStream.write(buffer, Math.toIntExact(start + offset), length); + outStream.write(buffer, Math.toIntExact(start + (long) offset), length); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -280,7 +280,7 @@ public void writeTo(@NonNull final WritableSequentialData wsd) { * @param length The number of bytes to extract. */ public void writeTo(@NonNull final WritableSequentialData wsd, final int offset, final int length) { - wsd.writeBytes(buffer, Math.toIntExact(start + offset), length); + wsd.writeBytes(buffer, Math.toIntExact(start + (long) offset), length); } /** @@ -302,7 +302,7 @@ public void writeTo(@NonNull final MessageDigest digest) { * @param length The number of bytes to extract. */ public void writeTo(@NonNull final MessageDigest digest, final int offset, final int length) { - digest.update(buffer, Math.toIntExact(start + offset), length); + digest.update(buffer, Math.toIntExact(start + (long) offset), length); } /** @@ -687,10 +687,10 @@ private static Comparator valueSorter(@NonNull final Comparator byt public Bytes append(@NonNull final Bytes bytes) { // The length field of Bytes is int. The length() returns always an int, // so safe to cast. - long length = this.length(); - byte[] newBytes = new byte[(int)(length + (int)bytes.length())]; - this.getBytes(0, newBytes, 0, (int) length); - bytes.getBytes(0, newBytes, (int) length, (int)bytes.length()); + final long len = this.length(); + byte[] newBytes = new byte[(int)(len + (int)bytes.length())]; + this.getBytes(0, newBytes, 0, (int) len); + bytes.getBytes(0, newBytes, (int) len, (int)bytes.length()); return Bytes.wrap(newBytes); }