Skip to content

Commit

Permalink
Work for task #1
Browse files Browse the repository at this point in the history
  • Loading branch information
rigazilla committed Feb 13, 2018
1 parent 538a24c commit fda28b2
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 3 deletions.
13 changes: 11 additions & 2 deletions src/main/java/org/infinispan/experimental/CacheClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import org.infinispan.grpc.CacheGrpc;
import org.infinispan.grpc.KeyMsg;
import org.infinispan.grpc.KeyValuePairMsg;
import org.infinispan.grpc.TopologyInfoMsg;
import org.infinispan.grpc.ValueMsg;
import org.infinispan.grpc.VoidMsg;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

public class CacheClient {

Expand Down Expand Up @@ -40,13 +41,19 @@ public ValueMsg get(KeyMsg key) {
response = blockingStub.get(key);
return response;
}

public ValueMsg put(KeyValuePairMsg pair)
{
ValueMsg response;
response = blockingStub.put(pair);
return response;
}

public TopologyInfoMsg topologyGetInfo()
{
VoidMsg request = VoidMsg.getDefaultInstance();
return blockingStub.topologyGetInfo(request);
}

public static void main(String[] args) throws InterruptedException {
CacheClient client = new CacheClient("localhost", 50051);
Expand All @@ -61,6 +68,8 @@ public static void main(String[] args) throws InterruptedException {
KeyValuePairMsg pair = KeyValuePairMsg.newBuilder().setValue(value).setKey(key).build();
client.put(pair);
ValueMsg retVal = client.get(key);
// With the topology info client must setup an appropriate routing policy
TopologyInfoMsg topologyInfo = client.topologyGetInfo();
System.out.println("Value "+retVal.getMessage()+" for key "+key.getName());
} finally {
client.shutdown();
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/infinispan/experimental/ServerNode.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package org.infinispan.experimental;

import java.io.IOException;
import java.util.List;

import org.infinispan.Cache;
import org.infinispan.grpc.KeyMsg;
import org.infinispan.grpc.KeyValuePairMsg;
import org.infinispan.grpc.TopologyInfoMsg;
import org.infinispan.grpc.ValueMsg;
import org.infinispan.grpc.VoidMsg;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;

import io.grpc.Server;
import io.grpc.ServerBuilder;
Expand All @@ -29,6 +33,18 @@ public void put(KeyValuePairMsg request, StreamObserver<ValueMsg> responseObserv
private static EmbeddedCacheManager cacheManager;
private static Cache<Object,Object> cache;


@Override
public void topologyGetInfo(VoidMsg request, StreamObserver<TopologyInfoMsg> responseObserver) {
List<Address> addressList = cacheManager.getMembers();
TopologyInfoMsg retVal = TopologyInfoMsg.getDefaultInstance();
responseObserver.onNext(retVal);
responseObserver.onCompleted();
}

public void topologyGetServerList()
{
}
@Override
public void get(KeyMsg request, StreamObserver<ValueMsg> responseObserver) {
Object oVal = cache.get(request);
Expand Down
10 changes: 9 additions & 1 deletion src/main/proto/cache.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ option java_outer_classname = "CacheProto";
option objc_class_prefix = "HLW";

service Cache {
rpc topologyGetInfo (VoidMsg) returns (TopologyInfoMsg) {}
rpc get (KeyMsg) returns (ValueMsg) {}
rpc put (KeyValuePairMsg) returns (ValueMsg) {}
}
Expand All @@ -21,4 +22,11 @@ message ValueMsg {
message KeyValuePairMsg {
KeyMsg key = 1;
ValueMsg value = 2;
}
}

message VoidMsg {
}

message TopologyInfoMsg {
repeated string serverId = 1;
}

0 comments on commit fda28b2

Please sign in to comment.