Skip to content

Commit

Permalink
Context service (#1408)
Browse files Browse the repository at this point in the history
* Adds helper traits

* Alternative context methods on client calls

* Alternetive context methods on fs2 client calls

* Alternetive context methods on fs2 server calls

* WIP: Update macro

* Removes some unused methods

* Updates internals and tests

* Make changes compatible with previous versions

* Adds nowarn on test

* Minor esthetics code changes

* Makes the client context work as a resource

* Refactors some params and vals

* Fixes context param

* Adds tests

* Removes unused method

* Renames implicit dependencies and improve deprectated message
  • Loading branch information
fedefernandez authored Jan 20, 2022
1 parent 65efe5b commit 73930c4
Show file tree
Hide file tree
Showing 14 changed files with 580 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import cats.effect.std.Dispatcher
import cats.syntax.all._
import fs2.Stream
import fs2.grpc.client.{ClientOptions, Fs2ClientCall}
import higherkindness.mu.rpc.internal.client.tracingKernelToHeaders
import higherkindness.mu.rpc.internal.context.ClientContext
import io.grpc.{CallOptions, Channel, Metadata, MethodDescriptor}
import natchez.Span

object calls {

Expand Down Expand Up @@ -86,72 +85,64 @@ object calls {
}
.pure[F]

def tracingClientStreaming[F[_]: Async, Req, Res](
input: Stream[Kleisli[F, Span[F], *], Req],
def contextClientStreaming[F[_]: Async, C, Req, Res](
input: Stream[Kleisli[F, C, *], Req],
descriptor: MethodDescriptor[Req, Res],
channel: Channel,
options: CallOptions
): Kleisli[F, Span[F], Res] =
Kleisli[F, Span[F], Res] { parentSpan =>
parentSpan.span(descriptor.getFullMethodName()).use { span =>
span.kernel.flatMap { kernel =>
val headers = tracingKernelToHeaders(kernel)
val streamF: Stream[F, Req] =
input.translate(Kleisli.applyK[F, Span[F]](span))
clientStreaming[F, Req, Res](
streamF,
descriptor,
channel,
options,
headers
)
}
)(implicit clientContext: ClientContext[F, C]): Kleisli[F, C, Res] =
Kleisli[F, C, Res] { context =>
clientContext[Req, Res](descriptor, channel, options, context).use { c =>
val streamF: Stream[F, Req] =
input.translate(Kleisli.applyK[F, C](c.context))
clientStreaming[F, Req, Res](
streamF,
descriptor,
channel,
options,
c.metadata
)
}
}

def tracingServerStreaming[F[_]: Async, Req, Res](
def contextServerStreaming[F[_]: Async, C, Req, Res](
request: Req,
descriptor: MethodDescriptor[Req, Res],
channel: Channel,
options: CallOptions
): Kleisli[F, Span[F], Stream[Kleisli[F, Span[F], *], Res]] =
Kleisli[F, Span[F], Stream[Kleisli[F, Span[F], *], Res]] { parentSpan =>
parentSpan.span(descriptor.getFullMethodName()).use { span =>
span.kernel.map { kernel =>
val headers = tracingKernelToHeaders(kernel)
Stream
.resource(Dispatcher[F])
.flatMap { disp =>
Stream
.eval(Fs2ClientCall[F](channel, descriptor, disp, clientOptions(options)))
.flatMap(_.unaryToStreamingCall(request, headers))
}
.translate(Kleisli.liftK[F, Span[F]])
}
)(implicit clientContext: ClientContext[F, C]): Kleisli[F, C, Stream[Kleisli[F, C, *], Res]] =
Kleisli[F, C, Stream[Kleisli[F, C, *], Res]] { context =>
clientContext[Req, Res](descriptor, channel, options, context).use { c =>
Stream
.resource(Dispatcher[F])
.flatMap { disp =>
Stream
.eval(Fs2ClientCall[F](channel, descriptor, disp, clientOptions(options)))
.flatMap(_.unaryToStreamingCall(request, c.metadata))
}
.translate(Kleisli.liftK[F, C])
.pure[F]
}
}

def tracingBidiStreaming[F[_]: Async, Req, Res](
input: Stream[Kleisli[F, Span[F], *], Req],
def contextBidiStreaming[F[_]: Async, C, Req, Res](
input: Stream[Kleisli[F, C, *], Req],
descriptor: MethodDescriptor[Req, Res],
channel: Channel,
options: CallOptions
): Kleisli[F, Span[F], Stream[Kleisli[F, Span[F], *], Res]] =
Kleisli[F, Span[F], Stream[Kleisli[F, Span[F], *], Res]] { parentSpan =>
parentSpan.span(descriptor.getFullMethodName()).use { span =>
span.kernel.map { kernel =>
val headers = tracingKernelToHeaders(kernel)
val streamF: Stream[F, Req] =
input.translate(Kleisli.applyK[F, Span[F]](span))
Stream
.resource(Dispatcher[F])
.flatMap { disp =>
Stream
.eval(Fs2ClientCall[F](channel, descriptor, disp, clientOptions(options)))
.flatMap(_.streamingToStreamingCall(streamF, headers))
}
.translate(Kleisli.liftK[F, Span[F]])
}
)(implicit clientContext: ClientContext[F, C]): Kleisli[F, C, Stream[Kleisli[F, C, *], Res]] =
Kleisli[F, C, Stream[Kleisli[F, C, *], Res]] { context =>
clientContext[Req, Res](descriptor, channel, options, context).use { c =>
val streamF: Stream[F, Req] = input.translate(Kleisli.applyK[F, C](c.context))
Stream
.resource(Dispatcher[F])
.flatMap { disp =>
Stream
.eval(Fs2ClientCall[F](channel, descriptor, disp, clientOptions(options)))
.flatMap(_.streamingToStreamingCall(streamF, c.metadata))
}
.translate(Kleisli.liftK[F, C])
.pure[F]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import cats.effect.std.Dispatcher
import cats.syntax.functor._
import fs2.Stream
import fs2.grpc.server.{Fs2ServerCallHandler, GzipCompressor, ServerOptions}
import higherkindness.mu.rpc.internal.server.extractTracingKernel
import higherkindness.mu.rpc.internal.context.ServerContext
import higherkindness.mu.rpc.protocol.{CompressionType, Gzip, Identity}
import io.grpc.{Metadata, MethodDescriptor, ServerCallHandler}
import natchez.{EntryPoint, Span}

object handlers {

Expand Down Expand Up @@ -72,68 +71,57 @@ object handlers {
Fs2ServerCallHandler[F](disp, serverCallOptions(compressionType))
.streamingToStreamingCall[Req, Res]((stream, metadata) => Stream.force(f(stream, metadata)))

private type Traced[F[_], A] = Kleisli[F, Span[F], A]
private type StreamOfTraced[F[_], A] = Stream[Kleisli[F, Span[F], *], A]

def tracingClientStreaming[F[_]: Async, Req, Res](
f: StreamOfTraced[F, Req] => Traced[F, Res],
def contextClientStreaming[F[_]: Async, C, Req, Res](
f: Stream[Kleisli[F, C, *], Req] => Kleisli[F, C, Res],
descriptor: MethodDescriptor[Req, Res],
entrypoint: EntryPoint[F],
disp: Dispatcher[F],
compressionType: CompressionType
): ServerCallHandler[Req, Res] =
)(implicit serverContext: ServerContext[F, C]): ServerCallHandler[Req, Res] =
clientStreaming[F, Req, Res](
{ (req: Stream[F, Req], metadata: Metadata) =>
val kernel = extractTracingKernel(metadata)
val streamK: StreamOfTraced[F, Req] = req.translate(Kleisli.liftK[F, Span[F]])
entrypoint.continueOrElseRoot(descriptor.getFullMethodName(), kernel).use[Res] { span =>
f(streamK).run(span)
val streamK: Stream[Kleisli[F, C, *], Req] = req.translate(Kleisli.liftK[F, C])
serverContext[Req, Res](descriptor, metadata).use[Res] { context =>
f(streamK).run(context)
}
},
disp,
compressionType
)

def tracingServerStreaming[F[_]: Async, Req, Res](
f: Req => Traced[F, StreamOfTraced[F, Res]],
def contextServerStreaming[F[_]: Async, C, Req, Res](
f: Req => Kleisli[F, C, Stream[Kleisli[F, C, *], Res]],
descriptor: MethodDescriptor[Req, Res],
entrypoint: EntryPoint[F],
disp: Dispatcher[F],
compressionType: CompressionType
): ServerCallHandler[Req, Res] =
)(implicit serverContext: ServerContext[F, C]): ServerCallHandler[Req, Res] =
serverStreaming[F, Req, Res](
{ (req: Req, metadata: Metadata) =>
val kernel = extractTracingKernel(metadata)
entrypoint
.continueOrElseRoot(descriptor.getFullMethodName(), kernel)
.use[Stream[F, Res]] { span =>
val kleisli: Traced[F, StreamOfTraced[F, Res]] = f(req)
val fStreamK: F[StreamOfTraced[F, Res]] = kleisli.run(span)
fStreamK.map(_.translate(Kleisli.applyK[F, Span[F]](span)))
serverContext[Req, Res](descriptor, metadata)
.use[Stream[F, Res]] { context =>
val kleisli: Kleisli[F, C, Stream[Kleisli[F, C, *], Res]] = f(req)
val fStreamK: F[Stream[Kleisli[F, C, *], Res]] = kleisli.run(context)
fStreamK.map(_.translate(Kleisli.applyK[F, C](context)))
}
},
disp,
compressionType
)

def tracingBidiStreaming[F[_]: Async, Req, Res](
f: StreamOfTraced[F, Req] => Traced[F, StreamOfTraced[F, Res]],
def contextBidiStreaming[F[_]: Async, C, Req, Res](
f: Stream[Kleisli[F, C, *], Req] => Kleisli[F, C, Stream[Kleisli[F, C, *], Res]],
descriptor: MethodDescriptor[Req, Res],
entrypoint: EntryPoint[F],
disp: Dispatcher[F],
compressionType: CompressionType
): ServerCallHandler[Req, Res] =
)(implicit serverContext: ServerContext[F, C]): ServerCallHandler[Req, Res] =
bidiStreaming[F, Req, Res](
{ (req: Stream[F, Req], metadata: Metadata) =>
val kernel = extractTracingKernel(metadata)
val reqStreamK: StreamOfTraced[F, Req] =
req.translate(Kleisli.liftK[F, Span[F]])
entrypoint
.continueOrElseRoot(descriptor.getFullMethodName(), kernel)
.use[Stream[F, Res]] { span =>
val kleisli: Traced[F, StreamOfTraced[F, Res]] = f(reqStreamK)
val fStreamK: F[StreamOfTraced[F, Res]] = kleisli.run(span)
fStreamK.map(_.translate(Kleisli.applyK[F, Span[F]](span)))
val reqStreamK: Stream[Kleisli[F, C, *], Req] =
req.translate(Kleisli.liftK[F, C])
serverContext[Req, Res](descriptor, metadata)
.use[Stream[F, Res]] { context =>
val kleisli: Kleisli[F, C, Stream[Kleisli[F, C, *], Res]] = f(reqStreamK)
val fStreamK: F[Stream[Kleisli[F, C, *], Res]] = kleisli.run(context)
fStreamK.map(_.translate(Kleisli.applyK[F, C](context)))
}
},
disp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package higherkindness.mu.rpc.internal.client

import cats.data.Kleisli
import cats.effect.{Async, Sync}
import cats.syntax.flatMap._
import higherkindness.mu.rpc.internal.context.ClientContext
import io.grpc.stub.ClientCalls
import io.grpc.{CallOptions, Channel, Metadata, MethodDescriptor}
import natchez.Span

object calls {

Expand All @@ -42,18 +41,15 @@ object calls {
)
)

def tracingUnary[F[_]: Async, Req, Res](
def contextUnary[F[_]: Async, C, Req, Res](
request: Req,
descriptor: MethodDescriptor[Req, Res],
channel: Channel,
options: CallOptions
): Kleisli[F, Span[F], Res] =
Kleisli[F, Span[F], Res] { parentSpan =>
parentSpan.span(descriptor.getFullMethodName).use { span =>
span.kernel.flatMap { kernel =>
val headers = tracingKernelToHeaders(kernel)
unary[F, Req, Res](request, descriptor, channel, options, headers)
}
)(implicit clientContext: ClientContext[F, C]): Kleisli[F, C, Res] =
Kleisli[F, C, Res] { context =>
clientContext[Req, Res](descriptor, channel, options, context).use { c =>
unary[F, Req, Res](request, descriptor, channel, options, c.metadata)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package higherkindness.mu.rpc.internal
import cats.effect.Async
import cats.syntax.all._
import com.google.common.util.concurrent.ListenableFuture
import io.grpc.Metadata
import io.grpc.Metadata.{ASCII_STRING_MARSHALLER, Key}
import natchez.Kernel

import scala.concurrent.ExecutionException
import scala.util.Try
Expand Down Expand Up @@ -52,12 +49,4 @@ package object client {
back.as(None)
}

private[internal] def tracingKernelToHeaders(kernel: Kernel): Metadata = {
val headers = new Metadata()
kernel.toHeaders.foreach { case (k, v) =>
headers.put(Key.of(k, ASCII_STRING_MARSHALLER), v)
}
headers
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package higherkindness.mu.rpc.internal.context

import cats.effect.Resource
import io.grpc.{CallOptions, Channel, Metadata, MethodDescriptor}

final case class ClientContextMetaData[C](context: C, metadata: Metadata)

trait ClientContext[F[_], C] {

def apply[Req, Res](
descriptor: MethodDescriptor[Req, Res],
channel: Channel,
options: CallOptions,
current: C
): Resource[F, ClientContextMetaData[C]]

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package higherkindness.mu.rpc.internal.context

import cats.effect._
import io.grpc.{Metadata, MethodDescriptor}

trait ServerContext[F[_], C] {

def apply[Req, Res](descriptor: MethodDescriptor[Req, Res], metadata: Metadata): Resource[F, C]

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import cats.data.Kleisli
import cats.effect.Async
import cats.effect.std.Dispatcher
import cats.syntax.all._
import higherkindness.mu.rpc.internal.context.ServerContext
import higherkindness.mu.rpc.protocol.CompressionType
import io.grpc.ServerCall.Listener
import io.grpc._
import io.grpc.stub.ServerCalls.UnaryMethod
import io.grpc.stub.{ServerCalls, StreamObserver}
import natchez.{EntryPoint, Span}

object handlers {

Expand All @@ -36,24 +36,20 @@ object handlers {
): ServerCallHandler[Req, Res] =
ServerCalls.asyncUnaryCall(unaryMethod[F, Req, Res](f, compressionType, disp))

def tracingUnary[F[_]: Async, Req, Res](
f: Req => Kleisli[F, Span[F], Res],
def contextUnary[F[_]: Async, C, Req, Res](
f: Req => Kleisli[F, C, Res],
methodDescriptor: MethodDescriptor[Req, Res],
entrypoint: EntryPoint[F],
compressionType: CompressionType,
disp: Dispatcher[F]
): ServerCallHandler[Req, Res] =
)(implicit serverContext: ServerContext[F, C]): ServerCallHandler[Req, Res] =
new ServerCallHandler[Req, Res] {
def startCall(
call: ServerCall[Req, Res],
metadata: Metadata
): Listener[Req] = {
val kernel = extractTracingKernel(metadata)
val spanResource =
entrypoint.continueOrElseRoot(methodDescriptor.getFullMethodName(), kernel)

val contextResource = serverContext[Req, Res](methodDescriptor, metadata)
val method = unaryMethod[F, Req, Res](
req => spanResource.use(span => f(req).run(span)),
req => contextResource.use(span => f(req).run(span)),
compressionType,
disp
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
package higherkindness.mu.rpc.internal

import cats.effect.Sync
import io.grpc.{Metadata, Status, StatusException, StatusRuntimeException}
import io.grpc.Metadata.{ASCII_STRING_MARSHALLER, BINARY_HEADER_SUFFIX, Key}
import io.grpc.{Status, StatusException, StatusRuntimeException}
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import higherkindness.mu.rpc.protocol._
import natchez.Kernel
import scala.jdk.CollectionConverters._

package object server {

Expand Down Expand Up @@ -59,16 +56,4 @@ package object server {
}
}

private[internal] def extractTracingKernel(headers: Metadata): Kernel = {
val asciiHeaders = headers
.keys()
.asScala
.collect {
case k if !k.endsWith(BINARY_HEADER_SUFFIX) =>
k -> headers.get(Key.of(k, ASCII_STRING_MARSHALLER))
}
.toMap
Kernel(asciiHeaders)
}

}
Loading

0 comments on commit 73930c4

Please sign in to comment.