Skip to content

Commit

Permalink
Flow based server streaming client call (#101)
Browse files Browse the repository at this point in the history
* flow based inbound message backpressure

* update tests

* introduce rpc call state interceptor

* add runTest utility function

* add state assertions for server streaming test

* update testing utilities

* delete obsolete response stream class
  • Loading branch information
marcoferrer authored Dec 30, 2019
1 parent 93f9fae commit aad7b71
Show file tree
Hide file tree
Showing 15 changed files with 941 additions and 233 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
install:
- ./gradlew assemble
script:
- ./gradlew check
- ./gradlew -Dkotlinx.coroutines.debug=on check
- cd example-project && ./gradlew test && ./gradlew clean test -PuseKrotoConfigDsl=true
after_success:
- bash <(curl -s https://codecov.io/bash)
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ subprojects{ subproject ->
}

tasks.withType(Test) {

testLogging {
// set options for log level LIFECYCLE
events (
Expand Down
2 changes: 2 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
kotlin.code.style=official


Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package com.github.marcoferrer.krotoplus.coroutines

import com.github.marcoferrer.krotoplus.coroutines.call.newProducerScope
import com.github.marcoferrer.krotoplus.coroutines.call.toRpcException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -60,7 +60,7 @@ public fun <T> CoroutineScope.launchProducerJob(
}
}

internal suspend fun Channel<*>.awaitCloseOrThrow(){
internal suspend fun SendChannel<*>.awaitCloseOrThrow(){
suspendCancellableCoroutine<Unit> { cont ->
invokeOnClose { error ->
if(error == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2019 Kroto+ Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.marcoferrer.krotoplus.coroutines.client

import io.grpc.ClientCall
import io.grpc.ForwardingClientCall

internal inline fun <ReqT, RespT, C : ClientCall<ReqT, RespT>> C.beforeCancellation(
crossinline block: C.(message: String?, cause: Throwable?) -> Unit
): ClientCall<ReqT, RespT> {
return object : ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(this) {
override fun cancel(message: String?, cause: Throwable?){
block(message, cause)
super.cancel(message, cause)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,33 @@ import com.github.marcoferrer.krotoplus.coroutines.call.newRpcScope
import com.github.marcoferrer.krotoplus.coroutines.withCoroutineContext
import io.grpc.CallOptions
import io.grpc.MethodDescriptor
import io.grpc.Status
import io.grpc.stub.AbstractStub
import io.grpc.stub.ClientCallStreamObserver
import io.grpc.stub.ClientCalls.asyncBidiStreamingCall
import io.grpc.stub.ClientCalls.asyncClientStreamingCall
import io.grpc.stub.ClientCalls.asyncServerStreamingCall
import io.grpc.stub.ClientCalls.asyncUnaryCall
import io.grpc.stub.ClientResponseObserver
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

internal const val MESSAGE_CLIENT_CANCELLED_CALL = "Client has cancelled the call"

/**
* Executes a unary rpc call using the [io.grpc.Channel] and [io.grpc.CallOptions] attached to the
Expand Down Expand Up @@ -101,22 +117,70 @@ public fun <ReqT, RespT, T : AbstractStub<T>> T.clientCallServerStreaming(
public fun <ReqT, RespT> clientCallServerStreaming(
request: ReqT,
method: MethodDescriptor<ReqT, RespT>,
channel: io.grpc.Channel,
grpcChannel: io.grpc.Channel,
callOptions: CallOptions = CallOptions.DEFAULT
): ReceiveChannel<RespT> {

val initialContext = callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT)
with(newRpcScope(initialContext, method)) {
val call = channel.newCall(method, callOptions.withCoroutineContext(coroutineContext))
val responseObserverChannel = ClientResponseStreamChannel<ReqT, RespT>(coroutineContext)
asyncServerStreamingCall<ReqT, RespT>(
call,
request,
responseObserverChannel
)
bindScopeCancellationToCall(call)
return responseObserverChannel
val observerAdapter = ResponseObserverChannelAdapter<ReqT, RespT>()
val rpcScope = newRpcScope(callOptions.getOption(CALL_OPTION_COROUTINE_CONTEXT), method)
val responseFlow = callbackFlow<RespT> flow@ {
observerAdapter.scope = this

val call = grpcChannel
.newCall(method, callOptions.withCoroutineContext(coroutineContext))
.beforeCancellation { message, cause ->
observerAdapter.beforeCallCancellation(message, cause)
}

val job = coroutineContext[Job]!!

// Start the RPC Call
asyncServerStreamingCall<ReqT, RespT>(call, request, observerAdapter)

// If our parent job is cancelled before we can
// start the call then we need to propagate the
// cancellation to the underlying call
job.invokeOnCompletion { error ->
// Our job can be cancelled after completion due to the inner machinery
// of kotlinx.coroutines.flow.Channels.kt.emitAll(). Its final operation
// after receiving a close is a call to channel.cancelConsumed(cause).
// Even if it doesnt encounter an exception it will cancel with null.
// We will only invoke cancel on the call
if(job.isCancelled && observerAdapter.isActive){
call.cancel(MESSAGE_CLIENT_CANCELLED_CALL, error)
}
}

suspendCancellableCoroutine<Unit> { cont ->
// Here we need to handle not only parent job cancellation
// but calls to `channel.cancel(...)` as well.
cont.invokeOnCancellation { error ->
if (observerAdapter.isActive) {
call.cancel(MESSAGE_CLIENT_CANCELLED_CALL, error)
}
}
invokeOnClose { error ->
if (error == null)
cont.resume(Unit) else
cont.resumeWithException(error)
}
}
}

// Use buffer UNLIMITED so that we dont drop any inbound messages
return flow { emitAll(responseFlow.buffer(Channel.UNLIMITED)) }
.onEach {
if(observerAdapter.isActive){
observerAdapter.callStreamObserver.request(1)
}
}
// We use buffer RENDEZVOUS on the outer flow so that our
// `onEach` operator is only invoked each time a message is
// collected instead of each time a message is received from
// from the underlying call.
.buffer(Channel.RENDEZVOUS)
.produceIn(rpcScope)

}

public fun <ReqT, RespT, T : AbstractStub<T>> T.clientCallBidiStreaming(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ internal class ClientStreamingCallChannelImpl<ReqT,RespT>(

attachOutboundChannelCompletionHandler(
callStreamObserver, outboundChannel,
onSuccess = { outboundMessageHandler.close() },
onError = { error -> completableResponse.completeExceptionally(error) }
onSuccess = { outboundMessageHandler.close() }
)
completableResponse.invokeOnCompletion {
// If the client prematurely cancels the response
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2019 Kroto+ Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.marcoferrer.krotoplus.coroutines.client

import io.grpc.Status
import io.grpc.stub.ClientCallStreamObserver
import io.grpc.stub.ClientResponseObserver
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ProducerScope
import java.util.concurrent.atomic.AtomicBoolean

internal class ResponseObserverChannelAdapter<ReqT, RespT>: ClientResponseObserver<ReqT, RespT> {

private val isAborted = AtomicBoolean()
private val isCompleted = AtomicBoolean()

lateinit var scope: ProducerScope<RespT>

lateinit var callStreamObserver: ClientCallStreamObserver<ReqT>
private set

val isActive: Boolean
get() = !(isAborted.get() || isCompleted.get())

override fun beforeStart(requestStream: ClientCallStreamObserver<ReqT>) {
require(::scope.isInitialized){ "Producer scope was not initialized" }
callStreamObserver = requestStream.apply { disableAutoInboundFlowControl() }
}

fun beforeCallCancellation(message: String?, cause: Throwable?){
if(!isAborted.getAndSet(true)) {
val cancellationStatus = Status.CANCELLED
.withDescription(message)
.withCause(cause)
.asRuntimeException()

scope.close(CancellationException(message, cancellationStatus))
}
}

override fun onNext(value: RespT) {
scope.offer(value)
}

override fun onError(t: Throwable) {
isAborted.set(true)
scope.close(t)
scope.cancel(CancellationException(t.message,t))
}

override fun onCompleted() {
isCompleted.set(true)
scope.close()
}
}

Loading

0 comments on commit aad7b71

Please sign in to comment.