Skip to content

Commit

Permalink
Hobbits binary fix (#786)
Browse files Browse the repository at this point in the history
* changed RPCMethod identifiers to match spec

* converted all message values to the binary format indicated in the spec

* mainly a fix to gossip

* now all packets are the correct size and no hex strings are sent over the wire

* last thing to do is fix attestation and block format

* added custom serialization to attestations and beaconblocks
  • Loading branch information
jrhea authored Jul 22, 2019
1 parent 57b6045 commit 9dc8556
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 226 deletions.
2 changes: 1 addition & 1 deletion config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# "hobbits": use HobbitsP2PNetwork
networkMode = "mock"
# Gossip options: floodsub,gossipsub,plumtree,none
gossipProtocol = "plumtree"
gossipProtocol = "floodsub"
identity = "0x00"
timer="QuartzTimer"
networkInterface = "0.0.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@
import org.apache.logging.log4j.Level;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.crypto.Hash;
import org.apache.tuweni.hobbits.Message;
import org.apache.tuweni.hobbits.Protocol;
import org.apache.tuweni.plumtree.State;
import tech.pegasys.artemis.datastructures.blocks.BeaconBlock;
import tech.pegasys.artemis.datastructures.blocks.BeaconBlockHeader;
import tech.pegasys.artemis.datastructures.operations.Attestation;
import tech.pegasys.artemis.networking.p2p.api.P2PNetwork;
import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipCodec;
import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipMessage;
import tech.pegasys.artemis.networking.p2p.hobbits.rpc.AttestationMessage;
import tech.pegasys.artemis.networking.p2p.hobbits.rpc.BlockBodiesMessage;
import tech.pegasys.artemis.networking.p2p.hobbits.rpc.GetStatusMessage;
import tech.pegasys.artemis.networking.p2p.hobbits.rpc.HelloMessage;
import tech.pegasys.artemis.networking.p2p.hobbits.rpc.RPCCodec;
Expand All @@ -61,7 +61,7 @@ public abstract class AbstractSocketHandler {
protected final Peer peer;
protected final ChainStorageClient store;
protected final State p2pState;
protected final Set<Long> pendingResponses = new HashSet<>();
protected final Set<BigInteger> pendingResponses = new HashSet<>();
protected final AtomicBoolean status = new AtomicBoolean(true);
protected final Consumer<Bytes> messageSender;
protected final Runnable handlerTermination;
Expand Down Expand Up @@ -136,33 +136,46 @@ public synchronized void handleMessage(Buffer message) {
}

protected void handleRPCMessage(RPCMessage rpcMessage) {
if (RPCMethod.GOODBYE.equals(rpcMessage.method())) {
if (RPCMethod.GOODBYE.code() == rpcMessage.method()) {
closed(null);
} else if (RPCMethod.HELLO.equals(rpcMessage.method())) {
} else if (RPCMethod.HELLO.code() == rpcMessage.method()) {
replyHello(rpcMessage.id());
} else if (RPCMethod.GET_STATUS.equals(rpcMessage.method())) {
} else if (RPCMethod.GET_STATUS.code() == rpcMessage.method()) {
replyStatus(rpcMessage.id());
} else if (RPCMethod.GET_ATTESTATION.equals(rpcMessage.method())) {
} else if (RPCMethod.GET_ATTESTATION.code() == rpcMessage.method()) {
replyAttestation(rpcMessage);
} else if (RPCMethod.GET_BLOCK_BODIES.equals(rpcMessage.method())) {
} else if (RPCMethod.GET_BLOCK_BODIES.code() == rpcMessage.method()) {
replyBlockBodies(rpcMessage);
} else if (RPCMethod.ATTESTATION.equals(rpcMessage.method())) {
Attestation attestation = Attestation.fromBytes(rpcMessage.bodyAs(Bytes.class));
this.eventBus.post(attestation);
} else if (RPCMethod.BLOCK_BODIES.equals(rpcMessage.method())) {
BeaconBlock beaconBlock = BeaconBlock.fromBytes(rpcMessage.bodyAsList().get(0));
this.eventBus.post(beaconBlock);
} else if (RPCMethod.ATTESTATION.code() == rpcMessage.method()) {
AttestationMessage rb = rpcMessage.bodyAs(AttestationMessage.class);
Attestation attestation = rb.body();
String key = attestation.toBytes().toHexString();
if (!receivedMessages.containsKey(key)) {
this.eventBus.post(attestation);
receivedMessages.put(key, true);
}
} else if (RPCMethod.BLOCK_BODIES.code() == rpcMessage.method()) {
BlockBodiesMessage rb = rpcMessage.bodyAs(BlockBodiesMessage.class);
BeaconBlock beaconBlock = rb.bodies().get(0);
String key = beaconBlock.toBytes().toHexString();
if (!receivedMessages.containsKey(key)) {
this.eventBus.post(beaconBlock);
receivedMessages.put(key, true);
}
}
}

public abstract void gossipMessage(
int method, String topic, long timestamp, Bytes messageHash, Bytes32 hash, Bytes body);

protected abstract void handleGossipMessage(GossipMessage gossipMessage);

protected void sendReply(RPCMethod method, Object payload, long id) {
sendBytes(RPCCodec.encode(method, payload, id).toBytes());
protected void sendReply(RPCMethod method, Object payload, BigInteger id) {
sendBytes(RPCCodec.encode(method.code(), payload, id).toBytes());
}

protected void sendMessage(RPCMethod method, Object payload) {
sendBytes(RPCCodec.encode(method, payload, pendingResponses).toBytes());
sendBytes(RPCCodec.encode(method.code(), payload, pendingResponses).toBytes());
}

protected void sendBytes(Bytes bytes) {
Expand All @@ -176,21 +189,7 @@ public void disconnect() {
}
}

public void gossipMessage(
int method, String topic, long timestamp, Bytes messageHash, Bytes32 hash, Bytes body) {
Bytes bytes =
GossipCodec.encode(
method,
topic,
BigInteger.valueOf(timestamp),
messageHash.toArrayUnsafe(),
hash.toArrayUnsafe(),
body.toArrayUnsafe())
.toBytes();
sendBytes(bytes);
}

public void replyHello(long requestId) {
public void replyHello(BigInteger requestId) {
if (!peer.peerHello()) {
HelloMessage msg =
new HelloMessage(
Expand Down Expand Up @@ -220,7 +219,7 @@ public void sendHello() {
peer.setPeerHello(true);
}

public void replyStatus(long requestId) {
public void replyStatus(BigInteger requestId) {
sendReply(
RPCMethod.GET_STATUS,
new GetStatusMessage(
Expand All @@ -239,62 +238,31 @@ public void sendStatus() {

public void replyAttestation(RPCMessage rpcMessage) {
RequestAttestationMessage rb = rpcMessage.bodyAs(RequestAttestationMessage.class);
Bytes32 signature = Hash.sha2_256(Bytes32.wrap(rb.hash()));
store
.getUnprocessedAttestation(signature)
.ifPresent(a -> sendReply(RPCMethod.ATTESTATION, a.toBytes(), rpcMessage.id()));
Optional<Attestation> attestation = store.getUnprocessedAttestation(Bytes32.wrap(rb.hash()));
if (attestation.isPresent()) {
sendReply(RPCMethod.ATTESTATION, new AttestationMessage(attestation.get()), rpcMessage.id());
}
}

public void sendGetAttestation(Bytes32 attestationHash) {
sendMessage(
RPCMethod.GET_ATTESTATION, new RequestAttestationMessage(attestationHash.toArrayUnsafe()));
}

public void replyBlockHeaders(RPCMessage rpcMessage) {
RequestBlocksMessage rb = rpcMessage.bodyAs(RequestBlocksMessage.class);
List<Optional<BeaconBlock>> blocks =
store.getUnprocessedBlock(
Bytes32.wrap(rb.startRoot()), rb.max().longValue(), rb.skip().longValue());
List<Bytes> blockHeaders = new ArrayList<>();
blocks.forEach(
block -> {
if (block.isPresent()) {
blockHeaders.add(
new BeaconBlockHeader(
block.get().getSlot(),
block.get().getParent_root(),
block.get().getState_root(),
block.get().getBody().hash_tree_root(),
block.get().getSignature())
.toBytes());
}
});
if (blockHeaders.size() > 0) {
sendReply(RPCMethod.BLOCK_HEADERS, blockHeaders, rpcMessage.id());
}
}

public void sendGetBlockHeaders(Bytes32 root) {
sendMessage(
RPCMethod.GET_BLOCK_HEADERS,
new RequestBlocksMessage(
root.toArrayUnsafe(), BigInteger.ONE, BigInteger.ONE, BigInteger.ZERO, (short) 0));
}

public void replyBlockBodies(RPCMessage rpcMessage) {
RequestBlocksMessage rb = rpcMessage.bodyAs(RequestBlocksMessage.class);
List<Optional<BeaconBlock>> blocks =
store.getUnprocessedBlock(
Bytes32.wrap(rb.startRoot()), rb.max().longValue(), rb.skip().longValue());
List<Bytes> blockBodies = new ArrayList<>();
List<BeaconBlock> blockBodies = new ArrayList<>();
blocks.forEach(
block -> {
if (block.isPresent()) {
blockBodies.add(block.get().toBytes());
blockBodies.add(block.get());
}
});
if (blockBodies.size() > 0) {
sendReply(RPCMethod.BLOCK_BODIES, blockBodies, rpcMessage.id());
sendReply(RPCMethod.BLOCK_BODIES, new BlockBodiesMessage(blockBodies), rpcMessage.id());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import io.vertx.core.net.NetSocket;
import java.math.BigInteger;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.Level;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.plumtree.MessageSender;
import org.apache.tuweni.plumtree.State;
import tech.pegasys.artemis.datastructures.blocks.BeaconBlock;
import tech.pegasys.artemis.datastructures.operations.Attestation;
import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipCodec;
import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipMessage;
import tech.pegasys.artemis.storage.ChainStorageClient;
import tech.pegasys.artemis.util.alogger.ALogger;
Expand All @@ -44,20 +45,36 @@ public FloodsubSocketHandler(
super(eventBus, netSocket, userAgent, peer, store, p2pState, receivedMessages);
}

@Override
public void gossipMessage(
int method, String topic, long timestamp, Bytes messageHash, Bytes32 hash, Bytes body) {
// TODO: Hack to make compatible with HOBBITS FloodSub spec
method = 0;
hash = Bytes32.wrap(body);
Bytes bytes =
GossipCodec.encode(
method,
topic,
BigInteger.valueOf(timestamp),
messageHash.toArrayUnsafe(),
hash.toArrayUnsafe(),
Bytes.EMPTY.toArrayUnsafe())
.toBytes();
sendBytes(bytes);
}

@Override
@SuppressWarnings("StringSplitter")
protected void handleGossipMessage(GossipMessage gossipMessage) {
if (MessageSender.Verb.GOSSIP.ordinal() == gossipMessage.method()) {
Bytes body = Bytes.wrap(gossipMessage.body());
String key = body.toHexString();
if (gossipMessage.method() == 0) {
Bytes32 hash = Bytes32.wrap(gossipMessage.hash());
String key = hash.toHexString();
if (!receivedMessages.containsKey(key)) {
peer.setPeerGossip(body);
peer.setPeerGossip(hash);
if (gossipMessage.getTopic().equalsIgnoreCase("ATTESTATION")) {
Bytes32 attestationHash = Bytes32.wrap(gossipMessage.body());
this.sendGetAttestation(attestationHash);
this.sendGetAttestation(hash);
} else if (gossipMessage.getTopic().equalsIgnoreCase("BLOCK")) {
Bytes32 blockRoot = Bytes32.wrap(gossipMessage.body());
this.sendGetBlockBodies(blockRoot);
this.sendGetBlockBodies(hash);
}
receivedMessages.put(key, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import io.vertx.core.net.NetSocket;
import java.math.BigInteger;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.Level;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.plumtree.MessageSender;
import org.apache.tuweni.plumtree.State;
import tech.pegasys.artemis.datastructures.blocks.BeaconBlock;
import tech.pegasys.artemis.datastructures.operations.Attestation;
import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipCodec;
import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipMessage;
import tech.pegasys.artemis.storage.ChainStorageClient;
import tech.pegasys.artemis.util.alogger.ALogger;
Expand All @@ -43,6 +46,21 @@ public PlumtreeSocketHandler(
super(eventBus, netSocket, userAgent, peer, store, p2pState, receivedMessages);
}

@Override
public void gossipMessage(
int method, String topic, long timestamp, Bytes messageHash, Bytes32 hash, Bytes body) {
Bytes bytes =
GossipCodec.encode(
method,
topic,
BigInteger.valueOf(timestamp),
messageHash.toArrayUnsafe(),
hash.toArrayUnsafe(),
body.toArrayUnsafe())
.toBytes();
sendBytes(bytes);
}

@Override
protected void handleGossipMessage(GossipMessage gossipMessage) {
if (MessageSender.Verb.GOSSIP.ordinal() == gossipMessage.method()) {
Expand Down
Loading

0 comments on commit 9dc8556

Please sign in to comment.