Skip to content

Commit

Permalink
add streaming rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
brambg committed Oct 25, 2023
1 parent 0a94da1 commit b876ff4
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -844,8 +844,15 @@ class AnnoRepoClient @JvmOverloads constructor(
)

suspend fun <R> usingGrpc(block: suspend (AnnoRepoGrpcClient) -> R): R {
val channel: ManagedChannel = ManagedChannelBuilder.forAddress(grcpHost, grcpPort!!).usePlaintext().build()
return AnnoRepoGrpcClient(channel, apiKey!!).use { client -> block(client) }
if (apiKey == null) {
throw RuntimeException("apiKey == null")
}
val channel: ManagedChannel = ManagedChannelBuilder
.forAddress(grcpHost, grcpPort!!)
.usePlaintext()
.build()
return AnnoRepoGrpcClient(channel, apiKey)
.use { client -> block(client) }
}

// private functions
Expand Down Expand Up @@ -988,6 +995,7 @@ class AnnoRepoClient @JvmOverloads constructor(
private val log: Logger = LoggerFactory.getLogger(AnnoRepoClient::class.java)
private val oMapper: ObjectMapper = ObjectMapper().registerKotlinModule()
private const val PROPERTY_FILE = "annorepo-client.properties"
const val TWO_HUNDRED_MB = 200 * 1024 * 1024

private val classVersion: String by lazy {
val resourceAsStream = AnnoRepoClient::class.java.getResourceAsStream(PROPERTY_FILE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import io.grpc.ManagedChannel
import io.grpc.Metadata
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import nl.knaw.huc.annorepo.api.WebAnnotationAsMap
import nl.knaw.huc.annorepo.grpc.AddAnnotationsResponse
import nl.knaw.huc.annorepo.grpc.AnnotationIdentifier
import nl.knaw.huc.annorepo.grpc.AnnotationUploadServiceGrpcKt
import nl.knaw.huc.annorepo.grpc.HelloServiceGrpcKt
import nl.knaw.huc.annorepo.grpc.addAnnotationRequest
import nl.knaw.huc.annorepo.grpc.addAnnotationsRequest
import nl.knaw.huc.annorepo.grpc.sayHelloRequest

Expand All @@ -34,6 +39,21 @@ class AnnoRepoGrpcClient(private val channel: ManagedChannel, private val apiKey
return response.annotationIdentifierList
}

fun addContainerAnnotation(
containerName: String,
annotations: List<WebAnnotationAsMap>
): Flow<AnnotationIdentifier> {
val requests = annotations.map {
addAnnotationRequest {
this.containerName = containerName
this.annotationJson = objectMapper.writeValueAsString(it)
}
}.asFlow()
val metadata = Metadata()
metadata.put(Metadata.Key.of("api-key", Metadata.ASCII_STRING_MARSHALLER), "xxxx")
return uploadStub.addAnnotation(requests, metadata).map { it.annotationIdentifier }
}

suspend fun sayHello(name: String, apiKey: String) {
val request = sayHelloRequest {
this.name = name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nl.knaw.huc.annorepo.grpc
import java.net.URI
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import kotlinx.coroutines.flow.count
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import nl.knaw.huc.annorepo.api.WebAnnotation
Expand All @@ -13,7 +14,7 @@ class AnnoRepoGrpcClientTest {
@Disabled
@Test
fun `connect to grpc server via annorepo client`() = runBlocking {
val arc = AnnoRepoClient(serverURI = URI("http://localhost:8080"), apiKey = "something-or-other")
val arc = AnnoRepoClient(serverURI = URI("http://localhost:2023"), apiKey = "something-or-other")
val annotations1 = listOf(
webAnnotation("annotation-1")
)
Expand All @@ -31,6 +32,12 @@ class AnnoRepoGrpcClientTest {
client.sayHello(
"GRPC", "xxxxxxxxx"
)
val annotations2 = listOf(
webAnnotation("annotation-1"),
webAnnotation("annotation-2")
)
val resultFlow = client.addContainerAnnotation("mycontainer", annotations2)
assertThat(resultFlow.count()).isEqualTo(2)
}
arc.usingGrpc { client ->
client.sayHello("WORLD", "aaaaa")
Expand Down
10 changes: 10 additions & 0 deletions common/src/main/proto/annotation_upload_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ option java_package = "nl.knaw.huc.annorepo.grpc";

service AnnotationUploadService {
rpc AddAnnotations(AddAnnotationsRequest) returns (AddAnnotationsResponse);
rpc AddAnnotation(stream AddAnnotationRequest) returns (stream AddAnnotationResponse);
}

message AddAnnotationsRequest {
Expand All @@ -16,6 +17,11 @@ message AddAnnotationsRequest {
repeated string annotation = 3;
}

message AddAnnotationRequest {
string container_name = 1;
string annotation_json = 2;
}

message AnnotationIdentifier {
string id = 1;
string etag = 2;
Expand All @@ -25,6 +31,10 @@ message AddAnnotationsResponse {
repeated AnnotationIdentifier annotation_identifier = 1;
}

message AddAnnotationResponse {
AnnotationIdentifier annotation_identifier = 1;
}

message NamedAnnotation {
string preferred_name = 1;
string annotation = 2;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package nl.knaw.huc.annorepo.grpc

import java.util.UUID
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.slf4j.LoggerFactory
import nl.knaw.huc.annorepo.api.WebAnnotationAsMap

Expand All @@ -24,6 +27,14 @@ class AnnotationUploadService : AnnotationUploadServiceGrpcKt.AnnotationUploadSe
).build()
}

override fun addAnnotation(requests: Flow<AddAnnotationRequest>): Flow<AddAnnotationResponse> {
return requests.map {
addAnnotationResponse {
annotationIdentifier = annotationIdentifier { id = UUID.randomUUID().toString(); etag = "my-etag" }
}
}
}

private fun processRequest(request: AddAnnotationsRequest) {
val containerName = request.containerName
val annotationJsonList = request.annotationList
Expand Down

0 comments on commit b876ff4

Please sign in to comment.