diff --git a/src/main/java/org/infinispan/experimental/CacheClient.java b/src/main/java/org/infinispan/experimental/CacheClient.java index d2c694d..3509f05 100644 --- a/src/main/java/org/infinispan/experimental/CacheClient.java +++ b/src/main/java/org/infinispan/experimental/CacheClient.java @@ -5,11 +5,12 @@ import org.infinispan.grpc.CacheGrpc; import org.infinispan.grpc.KeyMsg; import org.infinispan.grpc.KeyValuePairMsg; +import org.infinispan.grpc.TopologyInfoMsg; import org.infinispan.grpc.ValueMsg; +import org.infinispan.grpc.VoidMsg; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.StatusRuntimeException; public class CacheClient { @@ -40,13 +41,19 @@ public ValueMsg get(KeyMsg key) { response = blockingStub.get(key); return response; } - + public ValueMsg put(KeyValuePairMsg pair) { ValueMsg response; response = blockingStub.put(pair); return response; } + + public TopologyInfoMsg topologyGetInfo() + { + VoidMsg request = VoidMsg.getDefaultInstance(); + return blockingStub.topologyGetInfo(request); + } public static void main(String[] args) throws InterruptedException { CacheClient client = new CacheClient("localhost", 50051); @@ -61,6 +68,8 @@ public static void main(String[] args) throws InterruptedException { KeyValuePairMsg pair = KeyValuePairMsg.newBuilder().setValue(value).setKey(key).build(); client.put(pair); ValueMsg retVal = client.get(key); + // With the topology info client must setup an appropriate routing policy + TopologyInfoMsg topologyInfo = client.topologyGetInfo(); System.out.println("Value "+retVal.getMessage()+" for key "+key.getName()); } finally { client.shutdown(); diff --git a/src/main/java/org/infinispan/experimental/ServerNode.java b/src/main/java/org/infinispan/experimental/ServerNode.java index 619ca86..3808fb8 100644 --- a/src/main/java/org/infinispan/experimental/ServerNode.java +++ b/src/main/java/org/infinispan/experimental/ServerNode.java @@ -1,14 +1,18 @@ package org.infinispan.experimental; import java.io.IOException; +import java.util.List; import org.infinispan.Cache; import org.infinispan.grpc.KeyMsg; import org.infinispan.grpc.KeyValuePairMsg; +import org.infinispan.grpc.TopologyInfoMsg; import org.infinispan.grpc.ValueMsg; +import org.infinispan.grpc.VoidMsg; import org.infinispan.manager.CacheContainer; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.remoting.transport.Address; import io.grpc.Server; import io.grpc.ServerBuilder; @@ -29,6 +33,18 @@ public void put(KeyValuePairMsg request, StreamObserver responseObserv private static EmbeddedCacheManager cacheManager; private static Cache cache; + + @Override + public void topologyGetInfo(VoidMsg request, StreamObserver responseObserver) { + List
addressList = cacheManager.getMembers(); + TopologyInfoMsg retVal = TopologyInfoMsg.getDefaultInstance(); + responseObserver.onNext(retVal); + responseObserver.onCompleted(); + } + + public void topologyGetServerList() + { + } @Override public void get(KeyMsg request, StreamObserver responseObserver) { Object oVal = cache.get(request); diff --git a/src/main/proto/cache.proto b/src/main/proto/cache.proto index 31ab729..e4583df 100644 --- a/src/main/proto/cache.proto +++ b/src/main/proto/cache.proto @@ -6,6 +6,7 @@ option java_outer_classname = "CacheProto"; option objc_class_prefix = "HLW"; service Cache { + rpc topologyGetInfo (VoidMsg) returns (TopologyInfoMsg) {} rpc get (KeyMsg) returns (ValueMsg) {} rpc put (KeyValuePairMsg) returns (ValueMsg) {} } @@ -21,4 +22,11 @@ message ValueMsg { message KeyValuePairMsg { KeyMsg key = 1; ValueMsg value = 2; -} \ No newline at end of file +} + +message VoidMsg { +} + +message TopologyInfoMsg { + repeated string serverId = 1; +}