From 2146cff4589dce23ffbe33047e6a215d6a2a8df0 Mon Sep 17 00:00:00 2001 From: Michael Rebello Date: Fri, 15 Sep 2023 09:53:55 -0700 Subject: [PATCH] Handle `grpc-status` in headers rather than only trailers (#174) "Trailers-only" responses can actually be just headers, in which case we should read `grpc-status` from them. This PR: - Updates the logic for both unary and streaming APIs to properly handle cases where "trailers-only" responses are returned using the headers block - Updates the conformance test suite to test against grpc-go in addition to connect-go in order to catch this case. (grpc-go returns "trailers-only" responses in headers, and connect-go returns them in trailers) Here's the new logic for gRPC: - If non-HTTP 200, report an invalid response - If no body data, assume "trailers-only" - Look for the gRPC status and error details in the headers - Look for the gRPC status and error details in the trailers - If the status is an error, don't process the body Resolves #168. --- .../ConnectNIO/Internal/GRPCInterceptor.swift | 47 +++++++++++-------- .../AsyncAwaitConformance.swift | 34 +++++++++----- .../CallbackConformance.swift | 30 +++++++----- .../ConformanceConfiguration.swift | 36 +++++++++----- 4 files changed, 93 insertions(+), 54 deletions(-) diff --git a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift index 92763974..6ecfde95 100644 --- a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift +++ b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift @@ -49,16 +49,16 @@ extension GRPCInterceptor: Connect.Interceptor { return response } - guard let responseData = response.message, !responseData.isEmpty else { - let code = response.trailers.grpcStatus() ?? response.code + let (grpcCode, connectError) = self.grpcResult( + fromHeaders: response.headers, trailers: response.trailers + ) + guard grpcCode == .ok, let rawData = response.message, !rawData.isEmpty else { return Connect.HTTPResponse( - code: code, + code: grpcCode, headers: response.headers, message: response.message, trailers: response.trailers, - error: response.error ?? ConnectError.fromGRPCTrailers( - response.trailers, code: code - ), + error: connectError ?? response.error, tracingInfo: response.tracingInfo ) } @@ -69,18 +69,14 @@ extension GRPCInterceptor: Connect.Interceptor { .flatMap { self.config.responseCompressionPool(forName: $0) } do { let messageData = try Connect.Envelope.unpackMessage( - responseData, - compressionPool: compressionPool + rawData, compressionPool: compressionPool ).unpacked - let grpcCode = response.trailers.grpcStatus() ?? .unknown return Connect.HTTPResponse( code: grpcCode, headers: response.headers, message: messageData, trailers: response.trailers, - error: grpcCode == .ok - ? nil - : ConnectError.fromGRPCTrailers(response.trailers, code: grpcCode), + error: nil, tracingInfo: response.tracingInfo ) } catch let error { @@ -120,14 +116,13 @@ extension GRPCInterceptor: Connect.Interceptor { responseHeaders.value = headers return result - case .message(let data): + case .message(let rawData): do { let responseCompressionPool = responseHeaders.value?[ Connect.HeaderConstants.grpcContentEncoding ]?.first.flatMap { self.config.responseCompressionPool(forName: $0) } return .message(try Connect.Envelope.unpackMessage( - data, - compressionPool: responseCompressionPool + rawData, compressionPool: responseCompressionPool ).unpacked) } catch let error { // TODO: Close the stream here? @@ -140,7 +135,9 @@ extension GRPCInterceptor: Connect.Interceptor { return .complete(code: code, error: error, trailers: trailers) } - let grpcCode = trailers?.grpcStatus() ?? .unknown + let (grpcCode, connectError) = self.grpcResult( + fromHeaders: responseHeaders.value, trailers: trailers + ) if grpcCode == .ok { return .complete( code: .ok, @@ -150,9 +147,7 @@ extension GRPCInterceptor: Connect.Interceptor { } else { return .complete( code: grpcCode, - error: trailers - .map { ConnectError.fromGRPCTrailers($0, code: grpcCode) } - ?? error, + error: connectError ?? error, trailers: trailers ) } @@ -160,4 +155,18 @@ extension GRPCInterceptor: Connect.Interceptor { } ) } + + private func grpcResult( + fromHeaders headers: Headers?, trailers: Trailers? + ) -> (code: Code, error: ConnectError?) { + // "Trailers-only" responses can be sent in the headers or trailers block. + // Check for a valid gRPC status in the headers first, then in the trailers. + if let headers = headers, let grpcCode = headers.grpcStatus() { + return (grpcCode, .fromGRPCTrailers(headers, code: grpcCode)) + } else if let trailers = trailers, let grpcCode = trailers.grpcStatus() { + return (grpcCode, .fromGRPCTrailers(trailers, code: grpcCode)) + } else { + return (.unknown, nil) + } + } } diff --git a/Tests/ConnectLibraryTests/ConnectConformance/AsyncAwaitConformance.swift b/Tests/ConnectLibraryTests/ConnectConformance/AsyncAwaitConformance.swift index e442d28e..87b74242 100644 --- a/Tests/ConnectLibraryTests/ConnectConformance/AsyncAwaitConformance.swift +++ b/Tests/ConnectLibraryTests/ConnectConformance/AsyncAwaitConformance.swift @@ -274,20 +274,32 @@ final class AsyncAwaitConformance: XCTestCase { } } - func testUnimplementedMethod() async { - await self.executeTestWithClients { client in + func testUnimplementedMethod() async throws { + let validErrorMessages = [ + // connect-go + "connectrpc.conformance.v1.TestService.UnimplementedCall is not implemented", + // grpc-go + "method UnimplementedCall not implemented", + ] + try await self.executeTestWithClients { client in let response = await client.unimplementedCall( request: SwiftProtobuf.Google_Protobuf_Empty() ) XCTAssertEqual(response.code, .unimplemented) - XCTAssertEqual( - response.error?.message, - "connectrpc.conformance.v1.TestService.UnimplementedCall is not implemented" - ) + XCTAssertTrue(validErrorMessages.contains(try XCTUnwrap(response.error?.message))) } } func testUnimplementedServerStreamingMethod() async throws { + let validErrorMessages = [ + // connect-go + """ + connectrpc.conformance.v1.TestService.UnimplementedStreamingOutputCall is \ + not implemented + """, + // grpc-go + "method UnimplementedStreamingOutputCall not implemented", + ] try await self.executeTestWithClients { client in let expectation = self.expectation(description: "Stream completes") let stream = client.unimplementedStreamingOutputCall() @@ -299,13 +311,9 @@ final class AsyncAwaitConformance: XCTestCase { case .complete(let code, let error, _): XCTAssertEqual(code, .unimplemented) - XCTAssertEqual( - (error as? ConnectError)?.message, - """ - connectrpc.conformance.v1.TestService.UnimplementedStreamingOutputCall is \ - not implemented - """ - ) + XCTAssertTrue(validErrorMessages.contains( + try XCTUnwrap((error as? ConnectError)?.message) + )) expectation.fulfill() } } diff --git a/Tests/ConnectLibraryTests/ConnectConformance/CallbackConformance.swift b/Tests/ConnectLibraryTests/ConnectConformance/CallbackConformance.swift index f6b2a7e5..28de45e5 100644 --- a/Tests/ConnectLibraryTests/ConnectConformance/CallbackConformance.swift +++ b/Tests/ConnectLibraryTests/ConnectConformance/CallbackConformance.swift @@ -301,14 +301,17 @@ final class CallbackConformance: XCTestCase { } func testUnimplementedMethod() { + let validErrorMessages = [ + // connect-go + "connectrpc.conformance.v1.TestService.UnimplementedCall is not implemented", + // grpc-go + "method UnimplementedCall not implemented", + ] self.executeTestWithClients { client in let expectation = self.expectation(description: "Request completes") client.unimplementedCall(request: SwiftProtobuf.Google_Protobuf_Empty()) { response in XCTAssertEqual(response.code, .unimplemented) - XCTAssertEqual( - response.error?.message, - "connectrpc.conformance.v1.TestService.UnimplementedCall is not implemented" - ) + XCTAssertTrue(validErrorMessages.contains(response.error?.message ?? "")) expectation.fulfill() } @@ -317,6 +320,15 @@ final class CallbackConformance: XCTestCase { } func testUnimplementedServerStreamingMethod() throws { + let validErrorMessages = [ + // connect-go + """ + connectrpc.conformance.v1.TestService.UnimplementedStreamingOutputCall is \ + not implemented + """, + // grpc-go + "method UnimplementedStreamingOutputCall not implemented", + ] try self.executeTestWithClients { client in let expectation = self.expectation(description: "Stream completes") let stream = client.unimplementedStreamingOutputCall { result in @@ -326,13 +338,9 @@ final class CallbackConformance: XCTestCase { case .complete(let code, let error, _): XCTAssertEqual(code, .unimplemented) - XCTAssertEqual( - (error as? ConnectError)?.message, - """ - connectrpc.conformance.v1.TestService.UnimplementedStreamingOutputCall is \ - not implemented - """ - ) + XCTAssertTrue(validErrorMessages.contains( + (error as? ConnectError)?.message ?? "" + )) expectation.fulfill() } } diff --git a/Tests/ConnectLibraryTests/ConnectConformance/ConformanceConfiguration.swift b/Tests/ConnectLibraryTests/ConnectConformance/ConformanceConfiguration.swift index 4e89b70a..ef9b1113 100644 --- a/Tests/ConnectLibraryTests/ConnectConformance/ConformanceConfiguration.swift +++ b/Tests/ConnectLibraryTests/ConnectConformance/ConformanceConfiguration.swift @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +// swiftlint:disable number_separator + import Connect import ConnectNIO import Foundation @@ -34,27 +36,39 @@ final class ConformanceConfiguration { /// - returns: A list of configurations to use for conformance tests. static func all(timeout: TimeInterval) -> [ConformanceConfiguration] { let urlSessionClient = ConformanceURLSessionHTTPClient(timeout: timeout) - let nioClient = ConformanceNIOHTTPClient( - // swiftlint:disable:next number_separator + let nioClient8081 = ConformanceNIOHTTPClient( host: "https://localhost", port: 8081, timeout: timeout ) - let matrix: [(networkProtocol: NetworkProtocol, httpClients: [HTTPClientInterface])] = [ - (.connect, [urlSessionClient, nioClient]), - (.grpcWeb, [urlSessionClient, nioClient]), - (.grpc, [nioClient]), // URLSession client does not support gRPC + let nioClient8083 = ConformanceNIOHTTPClient( + host: "https://localhost", port: 8083, timeout: timeout + ) + // swiftlint:disable:next large_tuple + let matrix: [( + networkProtocol: NetworkProtocol, + httpClients: [HTTPClientInterface], + codecs: [Codec], + port: Int + )] = [ + // Port 8081 is used to test against connect-go + (.connect, [urlSessionClient, nioClient8081], [JSONCodec(), ProtoCodec()], 8081), + (.grpcWeb, [urlSessionClient, nioClient8081], [JSONCodec(), ProtoCodec()], 8081), + // URLSession does not support gRPC + (.grpc, [nioClient8081], [JSONCodec(), ProtoCodec()], 8081), + // gRPC should also be tested against grpc-go, which runs on port 8083 + (.grpc, [nioClient8083], [ProtoCodec()], 8083), ] - let codecs: [Codec] = [JSONCodec(), ProtoCodec()] - return matrix.reduce(into: []) { configurations, tuple in + return matrix.reduce(into: [ConformanceConfiguration]()) { configurations, tuple in for httpClient in tuple.httpClients { - for codec in codecs { + for codec in tuple.codecs { configurations.append(.init( description: """ - \(tuple.networkProtocol) + \(type(of: codec)) via \(type(of: httpClient)) + \(tuple.networkProtocol) + \(type(of: codec)) via \(type(of: httpClient)) \ + on port \(tuple.port) """, protocolClient: ProtocolClient( httpClient: httpClient, config: ProtocolClientConfig( - host: "https://localhost:8081", + host: "https://localhost:\(tuple.port)", networkProtocol: tuple.networkProtocol, codec: codec, requestCompression: .init(minBytes: 10, pool: GzipCompressionPool())